网站流量图怎么做的,wordpress单号管理系统,黄冈做网站价格,php双语网站源码欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码)#xff1a;https://github.com/zq2599/blog_demos 《controller-manager学习三部曲》完整链接
通过脚本文件寻找程序入口源码学习deployment的controller启动分析
本篇概览 本文是《controller-manager学习三…欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码)https://github.com/zq2599/blog_demos 《controller-manager学习三部曲》完整链接
通过脚本文件寻找程序入口源码学习deployment的controller启动分析
本篇概览 本文是《controller-manager学习三部曲》的终篇前面咱们从启动到运行已经分析了controller-manager的详细工作对controller-manager有了详细了解也知道controller-manager最重要的任务是调用各controller的初始化方法使它们进入正常的工作状态也就是下面代码的黄色箭头位置 这些初始化方法又是哪来的呢不得不再次提到前文多次遇到的NewControllerInitializers方法这里面有所有controller的初始化方法 现在问题来了controller有这么多它们的初始化到底做了些什么 篇幅所限自然不可能把每个controller的初始化方法都看一遍所以咱们还是挑一个典型的来看看吧就选deployment的controller也就是上图黄色箭头指向的那行
register(names.DeploymentController, startDeploymentController)不过在正式阅读deployment的controller启动代码之前先巩固一下基础弄清楚register方法是什么
register方法
所有controller都要通过register方法注册所以这个register有必要了解一下
// controllers是个mapkey就是controller的名称了value是初始化方法
controllers : map[string]InitFunc{}
// register在此定义
register : func(name string, fn InitFunc) {// 同一个key不能注册多次 if _, found : controllers[name]; found {panic(fmt.Sprintf(controller name %q was registered twice, name))}controllers[name] fn
}作为value的InitFunc也非常重要它对初始化方法的入参和返回值做了定义保证了一致性
type InitFunc func(ctx context.Context, controllerCtx ControllerContext) (controller controller.Interface, enabled bool, err error)InitFunc会在StartControllers方法中被执行每个InitFunc执行结束意味着对应controller初始化完成来看看它的三个返回值
返回值说明controller创建的controller对象这是个接口定义只要求实现Name方法enabled用于描述创建的controller对象是否可用如果可用就会做健康检查相关的判断和注册工作err如果创建过程有错误发生就在此返回此err一旦不为空就会导致整个controller-manager进程退出
现在准备工作算是完成了该研究deployment的启动代码了也就是startDeploymentController方法
且看deployment的controller是如何创建的
如果您看过欣宸之前的《client-go实战》系列应该对自定义controller的套路非常熟悉主要是下面这几件事情
创建队列并指定处理队列数据的方法监听指定类型的资源待其发生变化的时候将其放入队列由前面指定的方法来做具体的处理
正因为熟悉了这个套路才可以提前猜测deployment的controller做的也是这些事情然后再来验证猜测startDeploymentController方法的源码很简单先创建对象再调用Run方法启动业务处理逻辑要注意的是NewDeploymentController的入参有deployment、replicaset、pod等三种Informer所以controller会监听这三种资源的变更然后还有个client在请求api-server时会用到
func startDeploymentController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {// 实例化对象dc, err : deployment.NewDeploymentController(ctx,controllerContext.InformerFactory.Apps().V1().Deployments(),controllerContext.InformerFactory.Apps().V1().ReplicaSets(),controllerContext.InformerFactory.Core().V1().Pods(),controllerContext.ClientBuilder.ClientOrDie(deployment-controller),)if err ! nil {return nil, true, fmt.Errorf(error creating Deployment controller: %v, err)}go dc.Run(ctx, int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs))return nil, true, nil
}打开方法看看deployment的controller具体是如何创建的
func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {eventBroadcaster : record.NewBroadcaster()logger : klog.FromContext(ctx)// 创建deployment的controller对象注意queue被用来存入要监听的业务变更然后有对应的processor来处理这是套路client也传进去了里面请求api-server会用到(主要是资源的写操作)dc : DeploymentController{client: client,eventBroadcaster: eventBroadcaster,eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: deployment-controller}),queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), deployment),}// rsControl提供PatchReplicaSet方法可用于ReplicaSet资源的patch操作dc.rsControl controller.RealRSControl{KubeClient: client,Recorder: dc.eventRecorder,}// 监听Deployment资源的变化增删改都绑定了对应的处理方法dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {dc.addDeployment(logger, obj)},UpdateFunc: func(oldObj, newObj interface{}) {dc.updateDeployment(logger, oldObj, newObj)},// This will enter the sync loop and no-op, because the deployment has been deleted from the store.DeleteFunc: func(obj interface{}) {dc.deleteDeployment(logger, obj)},})// 监听ReplicaSet资源的变化增删改都绑定了对应的处理方法rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {dc.addReplicaSet(logger, obj)},UpdateFunc: func(oldObj, newObj interface{}) {dc.updateReplicaSet(logger, oldObj, newObj)},DeleteFunc: func(obj interface{}) {dc.deleteReplicaSet(logger, obj)},})// 监听Pod资源的变化增删改都绑定了对应的处理方法podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{DeleteFunc: func(obj interface{}) {dc.deletePod(logger, obj)},})// 这是整个controller的核心业务代码就是收到deployment资源的变化后所做的各种操作dc.syncHandler dc.syncDeployment// 入参是对象功能是将对象的key放入队列dc.enqueueDeployment dc.enqueue// 这一堆lister都会用在各种查询的场景dc.dLister dInformer.Lister()dc.rsLister rsInformer.Lister()dc.podLister podInformer.Lister()// 是否已经同步的标志dc.dListerSynced dInformer.Informer().HasSynceddc.rsListerSynced rsInformer.Informer().HasSynceddc.podListerSynced podInformer.Informer().HasSyncedreturn dc, nil
}在上面的代码中果然看到了队列queue这就是连接生产和消费的关键对象至于dInformer.Informer().AddEventHandler、rsInformer.Informer().AddEventHandler等方面里面的AddFunc、UpdateFunc等肯定是监听了deployment、pod的变化然后将key放入deployment中为了验证这个猜测咱们挑一个看看就看ReplicaSet的AddFunc中的(logger, obj)dc.addReplicaSet果然这些资源的变化都会导致相关的资源被放入队列queue 至此咱们对deployment的controller创建算是了解了接下来要了解controller如何运行也就是下图黄色箭头所指的方法做了些什么按照套路这里面要做的就是让queue的生产和消费正常运转起来 方法的代码如下
func (dc *DeploymentController) Run(ctx context.Context, workers int) {defer utilruntime.HandleCrash()// Start events processing pipeline.dc.eventBroadcaster.StartStructuredLogging(0)dc.eventBroadcaster.StartRecordingToSink(v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events()})defer dc.eventBroadcaster.Shutdown()defer dc.queue.ShutDown()logger : klog.FromContext(ctx)logger.Info(Starting controller, controller, deployment)defer logger.Info(Shutting down controller, controller, deployment)// 确保本地与api-server的同步已经完成if !cache.WaitForNamedCacheSync(deployment, ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {return}// 多个协程并行执行每个一秒钟执行一次dc.worker方法其实就是消费queue的业务逻辑for i : 0; i workers; i {go wait.UntilWithContext(ctx, dc.worker, time.Second)}-ctx.Done()
}可见是定时执行dc.worker方法那就看看这个worker是啥吧如下所示其实就是processNextWorkItem这个咱们在《client-go实战》系列中已经写了太多次了就是消费queue的业务代码也是整个controller的核心业务代码
func (dc *DeploymentController) worker(ctx context.Context) {for dc.processNextWorkItem(ctx) {}
}// 这是整个deployment的controller的核心业务逻辑对deployment资源的变化进行响应
func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {// 从queue中取出对象的keykey, quit : dc.queue.Get()if quit {return false}defer dc.queue.Done(key)// 对指定key对应的资源进行处理也就是此deployment的主要工作err : dc.syncHandler(ctx, key.(string))dc.handleErr(ctx, err, key)return true
}读到这里不知您是否有一种豁然开朗的感觉kubernetes规范了queue、生产、消费的模式并且在自身controller中践行此模式这就使得开发controller和阅读controller代码都变得更加容易了甚至在本文章我也数次尝试在阅读代码前先猜测再验证结果都和猜测的一致或许您可能会有疑问代码都分析到这里了咋不继续读dc.syncHandler的源码把这个controller搞明白?呃…这里必须要打住了本文的重点的controller-manager的学习也就是controller是如何创建和启动的而并非研究controller的具体业务所以dc.syncHandler就不展开看了毕竟每个controller都有自己独特的业务处理逻辑但我相信现在的您已经可以轻松读懂dc.syncHandler从而彻底掌握deployment的controller了毕竟咱们一起经历了太多的套路对这套逻辑已经熟悉至此《controller-manager学习三部曲》就已经完成了希望这个系列能够帮您梳理和熟悉kubernetes的controller管理和启动逻辑让您能开发出更加契合原生kubernetes系统的controller
你不孤单欣宸原创一路相伴
Java系列Spring系列Docker系列kubernetes系列数据库中间件系列DevOps系列