网站开发中都引用什么文献,网站建设备案不通过,天津手机版建站系统价格,两学一做 官方网站1 何为virtual-kubelet#xff1f;
kubelet是k8s的agent#xff0c;负责监听Pod的调度情况#xff0c;并运行Pod。而virtual-kubelet不是真实跑在宿主机上的#xff0c;而是一个可以跑在任何地方的进程#xff0c;该进程向k8s伪装成一个真实的Node#xff0c;但是实际的…1 何为virtual-kubelet
kubelet是k8s的agent负责监听Pod的调度情况并运行Pod。而virtual-kubelet不是真实跑在宿主机上的而是一个可以跑在任何地方的进程该进程向k8s伪装成一个真实的Node但是实际的操作可以自定义。
也就是说virtual-kubelet向k8s提供与kubelet兼容的接口而可以自定义底层的具体实现通常可以用于不同架构之间的配合使用例如virtual-kubelet的底层的具体实现是采用kvm实现或者用另一种方式实现Pod。
virtual-kubelet的使用场景
对接原有的平台Kubernetes Virtual Kubelet with ACI资源的自动扩容UCloud UK8S虚拟节点 让用户不再担心集群没有资源
2 virtual-kubelet的整体架构
virtual-kubelet
整个virtual-kubelet仓库重要的目录是
cmd/virtual-kubelet/ main.go 主函数负责自动virtual-kubelet依旧使用了Cobra实现register.go 注册provider这里实现了个Mock的providerinternal/ commands/ 程序命令的定义provider/ provider.go provider的接口定义mock/ mock的provider的实现 internal/ 额外实现的一些包供内部调用node/ kubelet中的一些逻辑
因此整个仓库的整体调用路径是
main.go使用Cobra构建命令行操作然后调用register.go注册provider进入事件循环node/目录下有PodController的实现这里面就会调用注册的provider
3 基于virtual-kubelet库扩展k8s
使用virtual-kubelet扩展k8s有2种方式
1 直接克隆virtual-kubelet仓库然后在cmd/virtual-kubelet/internal/provider下面新建一个自己的provider然后实现provider接口的各种方法然后在cmd/virtual-kubelet/register.go中注册自己实现的provider即可2 新开一个仓库在里面使用virtual-kubelet包同样的只需要实现provider在主函数中注册然后将provider的接口对接到自己的实现就行
如果是自己实现virtual-kubelet通常会使用方式2。
当前部分云厂商已经开发了自己的virtual-kubelet用于对接自己的容器平台例如微软开发了对接ACI的azure-aci下面重点来分析azure-aci的实现自己实现的方式也类似。
4 Microsoft的azure-aci
virtual-kubelet/azure-aci的目录结构如下
charts/helm打包生成的压缩包client/封装对接ACI的接口 aci/对ACI容器组的接口封装api/封装ACI接口时使用的一些功能函数network/封装ACI的subnet的接口resourcegroups/封装ACI的资源组的接口 cmd/virtual-kubelet/main.go主函数日志和配置处理注册ACIProvider并启动helm/helm的配置provider/ACIProvider的实现调用client中的函数实现provider
main.go中就是一些环境配置和启动函数
// 获取环境变量中的配置
// 例如节点名、kubeconfig文件、污点
o, err : opts.FromEnv()
if err ! nil {log.G(ctx).Fatal(err)
}
o.Provider azure
o.Version strings.Join([]string{k8sVersion, vk-azure-aci, buildVers, -)
o.PodSyncWorkers numberOfWork// 初始化node节点
node, err : cli.New(ctx,cli.WithBaseOpts(o),cli.WithCLIVersion(buildVersion, buildTime),cli.WithProvider(azure, func(cfg provider.InitConfig) (proviProvider, error) {return azprovider.NewACIProvider(cfg.ConfigPath, ResourceManager, cfg.NodeName, cfg.OperatingSystem, InternalIP, cfg.DaemonPort, cfg.KubeClusterDomain)}),cli.WithPersistentFlags(logConfig.FlagSet()),cli.WithPersistentPreRunCallback(func() error {return logruscli.Configure(logConfig, logger)}),cli.WithPersistentFlags(traceConfig.FlagSet()),cli.WithPersistentPreRunCallback(func() error {return opencensuscli.Configure(ctx, traceConfig, o)}),if err ! nil {log.G(ctx).Fatal(err)if err : node.Run(ctx); err ! nil {log.G(ctx).Fatal(err)
}上面的核心代码就是WithProvider()该函数有两个参数一个是provider的名称另一个是provider的初始化函数这里传的初始化函数就是创建ACIProviderazprovider.NewACIProvider()。
func WithProvider(name string, f provider.InitFunc) Option {return func(c *Command) {if c.s nil {c.s provider.NewStore()}c.s.Register(name, f)}
}这个地方需要注意的是初始化函数的参数cfg provider.InitConfig
type InitConfig struct {ConfigPath stringNodeName string // 注册到k8s到节点名称OperatingSystem string // 节点的操作系统InternalIP string // 节点的IPDaemonPort int32 // 节点的端口KubeClusterDomain string // k8s集群域名ResourceManager *manager.ResourceManager
}provider.InitConfig里面大部分都是VK节点向k8s集群声明自己的一些信息。这些信息是通过初始化函数直接给到provider到初始化函数的那么这些参数从哪里获得呢
第一种方式前面调用了opts.FromEnv()该函数会从环境变量中获取一些信息但是这个只有很少量的信息节点名、端口、kubconfig、污点。
第二种方式在执行azure-aci时传一些命令行参数通过查看cli.New()函数的实现发现该函数返回的实际上是个Command该类型的cmd是个cobra.Commandcmd在创建命令时调用了installFlags(cmd.Flags(), o)该函数会添加很多命令行选项其中包含常见的的cluster-domain、nodename、provider等。其实直接编译执行也能发现该程序等命令行参数。
这些命令行参数里面比较重要的是
disable-taint关闭污点如果VK节点需要调度Pod就需要启用该选项nodename节点名称cluster-domain集群域名连接k8s集群时使用no-verify-clients当请求访问VK节点时不验证客户端证书
接下来的重点就是azprovider.NewACIProvider()的实现从上面的目录结构也可以看出provider是对ACIProvider对provider接口的实现client是对ACI接口的封装在实现ACIProvider过程中调用client进行对接
provider - client - ACINewACIProvider()函数在provider/aci.go中实现其中的核心逻辑是
// 创建ACI客户端
p.aciClient, err aci.NewClient(azAuth, p.extraUserAgent, p.retryConfig)
if err ! nil {return nil, err
}// 设置节点容量
if err : p.setupCapacity(context.TODO()); err ! nil {return nil, err
}接下来就是接口实现kubelet最本质的工作就是监听Pod的状态变更然后执行相应的动作因此当然是需要实现Pod的相关操作。
下面是VK所有的接口
// pod控制器调用的接口用于管理pod的生命周期
type PodLifecycleHandler interface {// 创建PodCreatePod(ctx context.Context, pod *corev1.Pod) error// 更新PodUpdatePod(ctx context.Context, pod *corev1.Pod) error// 删除PodDeletePod(ctx context.Context, pod *corev1.Pod) error// 查询单个Pod返回的Pod有可能被多个goroutine并发访问// 因此最好使用DeepCopy深拷贝GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error)// 查询单个Pod对应的状态同样的需要使用DeepCopyGetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error)// 查询provider上运行的所有Pod同样的需要使用DeepCopyGetPods(context.Context) ([]*corev1.Pod, error)
}// 下面是必须要实现的函数除了Pod还包含其他的相关函数
type Provider interface {node.PodLifecycleHandler// 返回某个容器的日志(kubectl logs)GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error)// 在容器中执行命令(kubectl exec)RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error// 设置节点的参数包含容量、condition等ConfigureNode(context.Context, *v1.Node)
}// 返回Pod的统计
type PodMetricsProvider interface {GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error)
}// 用于支持Pod状态的异步更新
type PodNotifier interface {// 异步通知Pod的状态注册回调函数当Pod状态发生变化时就会调用回调函数NotifyPods(context.Context, func(*corev1.Pod))
}type NodeProvider interface {// 用于探测节点是否存活k8s周期调用该函数确定节点是否存活Ping(context.Context) error// 异步通知节点的状态注册回调函数当节点状态发生变化时就会调用回调函数NotifyNodeStatus(ctx context.Context, cb func(*corev1.Node))
}以上接口中除了Provider接口必须实现其他接口都是可选的。
下面以CreatePod()为例看下azure的具体实现
func (p *ACIProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {...return p.createContainerGroup(ctx, pod.Namespace, pod.Name, containerGroup)
}func (p *ACIProvider) createContainerGroup(ctx context.Context, podNS, podName string, cg *aci.ContainerGroup) error {ctx addAzureAttributes(ctx, span, p)cgName : containerGroupName(podNS, podName)_, err : p.aciClient.CreateContainerGroup(ctx,p.resourceGroup,cgName,*cg,)if err ! nil {log.G(ctx).WithError(err).Errorf(failed to create container group %v, cgName)}return err
}CreatePod()首先准备创建ACI容器组的资源然后调用createContainerGroup()该函数对接口调用再次封装然后调用了aciClient的CreateContainerGroup()创建ACI容器组。而CreateContainerGroup()就是调用ACI的API接口创建容器组。PodLifecycleHandler中的函数实现方式都类似只需要对接后端的接口即可。
实现了Provider接口剩下的只需实现PodNotifier中的NotifyPods()。
NotifyPods()用于异步通知Pod的状态变化。设想下k8s展示Pod状态的实现k8s如何知道Pod的状态呢一种方式是k8s定时调用GetPods()接口就得到当前节点的所有Pod当Node和Pod较多时资源消耗还是有些多的。另一种方式就是节点通过比较k8s认为节点有的Pod和ACI上实际有的容器组就得到应该更新哪些Pod的状态。
Pod的异步更新实现在provider/podsTracker.go
type PodsTrackerHandler interface {// 查询存活的PodListActivePods(ctx context.Context) ([]PodIdentifier, error)// 查询Pod的状态FetchPodStatus(ctx context.Context, ns, name string) (*v1.PodStatus, error)// 清理PodCleanupPod(ctx context.Context, ns, name string) error
}type PodsTracker struct {rm *manager.ResourceManagerupdateCb func(*v1.Pod)handler PodsTrackerHandler
}// NotifyPods函数的实现该函数只在VK节点的PodController启动时调用一次
func (p *ACIProvider) NotifyPods(ctx context.Context, notifierCb func(*v1.Pod)) {// Capture the notifier to be used for communicating updates to VKp.tracker PodsTracker{rm: p.resourceManager,updateCb: notifierCb,handler: p,}go p.tracker.StartTracking(ctx)
}而在StartTracking()函数中会定时执行Pod的更新(updatePodsLoop)和删除(cleanupDanglingPods)
func (pt *PodsTracker) updatePodsLoop(ctx context.Context) {// 从资源管理器获取当前节点的Podk8sPods : pt.rm.GetPods()for _, pod : range k8sPods {updatedPod : pod.DeepCopy()ok : pt.processPodUpdates(ctx, updatedPod)if ok {pt.updateCb(updatedPod)}}
}// 处理Pod的更新返回值表明Pod的状态是否更新
func (pt *PodsTracker) processPodUpdates(ctx context.Context, pod *v1.Pod) bool {// 调用ACI的接口获取Pod的状态podStatusFromProvider, err : pt.handler.FetchPodStatus(ctx, pod.Namespace, pod.Name)if err nil podStatusFromProvider ! nil {// 如果获取状态没有出错并且返回了状态则将状态信息更新到PodpodStatusFromProvider.DeepCopyInto(pod.Status)return true}if errdef.IsNotFound(err) || (err nil podStatusFromProvider nil) {if pod.Status.Phase v1.PodRunning {// 如果k8s中的状态是Running但是ACI容器组不存在则将k8s中容器的状态设置为Failed此时会重建Podpod.Status.Phase v1.PodFailedpod.Status.Reason statusReasonNotFoundpod.Status.Message statusMessageNotFoundnow : metav1.NewTime(time.Now())for i : range pod.Status.ContainerStatuses {if pod.Status.ContainerStatuses[i].State.Running nil {continue}// 更新Pod的状态pod.Status.ContainerStatuses[i].State.Terminated v1.ContainerStateTerminated{ExitCode: containerExitCodeNotFound,Reason: statusReasonNotFound,Message: statusMessageNotFound,FinishedAt: now,StartedAt: pod.Status.ContainerStatuses[i].State.Running.StartedAt,ContainerID: pod.Status.ContainerStatuses[i].ContainerID,}pod.Status.ContainerStatuses[i].State.Running nil}return true}return false}if err ! nil {log.G(ctx).WithError(err).Errorf(failed to retrieve pod %v status from provider, pod.Name)}return false
}// 删除Pod
func (pt *PodsTracker) cleanupDanglingPods(ctx context.Context) {// 获取k8s中的Pod和ACI容器组k8sPods : pt.rm.GetPods()activePods, err : pt.handler.ListActivePods(ctx)if err ! nil {log.G(ctx).WithError(err).Errorf(failed to retrive active container groups list)return}if len(activePods) 0 {for i : range activePods {// 遍历ACI容器组存活的Pod如果k8s中没有对应的Pod则删除ACI容器组pod : getPodFromList(k8sPods, activePods[i].namespace, activePods[i].name)if pod ! nil {continue}err : pt.handler.CleanupPod(ctx, activePods[i].namespace, activePods[i].name)if err ! nil !errdef.IsNotFound(err) {log.G(ctx).WithError(err).Errorf(failed to cleanup pod %v, activePods[i].name)}}}
}updatePodsLoop处理k8s中有ACI容器组不存在的情况cleanupDanglingPods处理ACI中有k8s中不存在的情况
那么virtual kubelet的整体结构如下 总结下上面的三条路径
PodController通过informer机制监听Pod的变化然后执行Pod的增删改查操作virtual kubelet提供http接口当用户执行kubectl logs/exec时就调用对应的函数然后会调用Provider接口中对应的函数这里主要难点在于需要实时将数据回传展示给用户PodNotifier提供了异步更新Pod的接口apiserver为了让etcd中Pod的数据与节点上Pod的数据保持一致会定时调用节点的接口查询Pod的状态当节点和Pod比较多时比较消耗apiserver的资源。为了节省资源节点会比较k8s中的Pod的数据和后端实际Pod的数据如果发现有不一致(k8s中有该Pod后端没有k8s中没有该Pod后端有)则执行状态的更新或者后端容器组的操作
5 关于logs和exec
Provider中剩下logs和exec则比较麻烦
GetContainerLogs 读取日志如果需要支持-f选项就比较麻烦RunInContainer 执行命令需要实时将命令的结果传送回来
azure-aci中的logs不支持-f选项因此只需要调用ACI容器组的接口获取日志就行。而exec则需要使用websocket进行实时的命令传送和结果回传。
具体的数据流向如下 当用户在终端输入命令时首先kubectl会先拿到命令然后再交给kube-apiserverkube-apiserver再将命令发送给kubelet在这里就是virtual kubelet。那么VK要做的就是读取命令然后将命令发送给后端的容器组去执行容器组执行完成后再将结果推送给VKVK则将结果推送给kube-apiserverkube-apiserver将结果推送给kubectlkubectl打印出来。这时候用户看到的就类似于输入命令然后输出结果。
而这些组件之间的数据流转需要实时推送比较适合的方式就是使用websocket这些组件之间通过websocket连接连接之后通过readmessage()和writemessage()进行数据传输。
从VK的角度看需要做的就是
使用websocket连接后端的容器组从kube-apiserver读取数据然后发送给后端的容器组从后端的容器组接收数据并输出给kube-apiserver
// api.AttachIO是个接口Stdin()和Stdout()分别返回标准输入和标准输出
// VK需要从Stdin()中读取数据然后写给后端的容器组
// 同时需要接收容器组返回的数据然后发送给Stdout()
func (p *ACIProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {out : attach.Stdout()if out ! nil {defer out.Close()}// 根据namespace和name获取容器组cg, err : p.getContainerGroup(ctx, namespace, name)if err ! nil {return err}// 设置终端默认大小size : api.TermSize{Height: 60,Width: 120,}resize : attach.Resize()if resize ! nil {select {case size -resize:case -ctx.Done():return ctx.Err()}}// 获取ACI容器组的websocket的URI和密码ts : aci.TerminalSizeRequest{Height: int(size.Height), Width: int(size.Width)}xcrsp, err : p.aciClient.LaunchExec(p.resourceGroup, cg.Name, container, strings.Join(cmd, ), ts)if err ! nil {return err}wsURI : xcrsp.WebSocketURIpassword : xcrsp.Password// 连接ACI的websocket并输入密码c, _, _ : websocket.DefaultDialer.Dial(wsURI, nil)if err : c.WriteMessage(websocket.TextMessage, []byte(password)); err ! nil {panic(err)}defer c.Close()in : attach.Stdin()if in ! nil {// 将读取命令并写入后端的容器组的逻辑放在后台的goroutinego func() {for {// 如果父协程结束直接退出select {case -ctx.Done():returndefault:}// 读取kube-apiserver发送的命令然后发送给后端的容器组var msg make([]byte, 512)n, err : in.Read(msg)if err ! nil {// Handle errorsreturn}if n 0 {if err : c.WriteMessage(websocket.BinaryMessage, msg[:n]); err ! nil {panic(err)}}}}()}if out ! nil {// 将接收容器组数据并写到kube-apiserver的逻辑放在前台任务for {// 如果父携程结束则推出循环select {case -ctx.Done():breakdefault:}// 从容器组读取数据然后发送给kube-apiserver_, cr, err : c.NextReader()if err ! nil {break}if _, err : io.Copy(out, cr); err ! nil {panic(err)}}}return ctx.Err()
}从上面的实现上看VK的角色是kubelet用于连接kube-apiserver和Pod因此如果需要实时通信就需要用websocket分别连接两端作为桥梁对两边的数据进行中转。