Overview: The PV controller is one of the components of kube-controller-manager (kcm). It handles the PVC/PV objects in a cluster and drives their state transitions. This article analyzes the implementation based on Kubernetes 1.23.
Author | 牧琦  Source | Alibaba Tech (阿里技术公众号)
Based on Kubernetes 1.23
I Introduction
The PV controller is one of the components of kcm. It is responsible for processing the PVC/PV objects in the cluster and driving their state transitions.
II pvController initialization
The initialization code lives in pkg/controller/volume/persistentvolume/pv_controller_base.go. NewController mainly does the following:
- Initializes the eventRecorder.
- Initializes the PersistentVolumeController object.
- Calls VolumePluginMgr.InitPlugins() to initialize the storage plugins (the code lives in pkg/volume/plugins.go).
- Creates the informers that watch resources in the cluster: PersistentVolumeInformer, PersistentVolumeClaimInformer, StorageClassInformer, PodInformer and NodeInformer. PV and PVC events are put into volumeQueue and claimQueue respectively.
- To avoid iterating over all pods every time, registers a custom indexer that indexes pods by PVC key (see the sketch below).
- Initializes the manager for the in-tree storage to CSI migration features.
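To make the pod index concrete, here is a minimal sketch (my own illustration, not the upstream code; the index name "byPVC" and the helper name are made up) of how an index function that maps "namespace/pvcName" keys to pods can be registered with client-go, so the controller can ask "which pods use this PVC" without scanning every pod:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

// podPVCIndexFunc returns one "namespace/pvcName" key for every PVC volume a pod mounts.
func podPVCIndexFunc(obj interface{}) ([]string, error) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        return nil, nil
    }
    var keys []string
    for _, vol := range pod.Spec.Volumes {
        if vol.PersistentVolumeClaim != nil {
            keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, vol.PersistentVolumeClaim.ClaimName))
        }
    }
    return keys, nil
}

func main() {
    // The real controller registers an equivalent index function on its pod informer;
    // a plain indexer is enough to show the idea.
    indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"byPVC": podPVCIndexFunc})
    _ = indexer.Add(&v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "web-0", Namespace: "default"},
        Spec: v1.PodSpec{Volumes: []v1.Volume{{
            Name:         "data",
            VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "data-web-0"}},
        }}},
    })
    pods, _ := indexer.ByIndex("byPVC", "default/data-web-0") // all cached pods that reference this PVC
    fmt.Println(len(pods))
}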
NewController is called from the kube-controller-manager code under cmd/kube-controller-manager. Once initialization succeeds, go Run() is called immediately to start the pvController.
III Running the controller
// Run starts the pvController
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer ctrl.claimQueue.ShutDown()
    defer ctrl.volumeQueue.ShutDown()

    klog.Infof("Starting persistent volume controller")
    defer klog.Infof("Shutting down persistent volume controller")

    if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
        return
    }

    ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

    go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
    go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
    go wait.Until(ctrl.claimWorker, time.Second, stopCh)

    metrics.Register(ctrl.volumes.store, ctrl.claims, ctrl.volumePluginMgr)

    <-stopCh
}
After the caches are synced, ctrl.resync, ctrl.volumeWorker and ctrl.claimWorker begin to run periodically. Let's first look at the initializeCaches method.
func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {
    // No apiserver access here: the objects come from the local lister cache
    // and must not be modified by outside callers.
    volumeList, err := volumeLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
        return
    }
    for _, volume := range volumeList {
        // We must not mutate the cached volume object, so operate on a deep copy.
        volumeClone := volume.DeepCopy()
        if _, err = ctrl.storeVolumeUpdate(volumeClone); err != nil {
            klog.Errorf("error updating volume cache: %v", err)
        }
    }

    claimList, err := claimLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
        return
    }
    for _, claim := range claimList {
        if _, err = ctrl.storeClaimUpdate(claim.DeepCopy()); err != nil {
            klog.Errorf("error updating claim cache: %v", err)
        }
    }
    klog.V(4).Infof("controller initialized")
}

type persistentVolumeOrderedIndex struct {
    store cache.Indexer
}
This method copies the objects from the listers' caches into persistentVolumeOrderedIndex, a cache of PersistentVolumes that is indexed by AccessModes and ordered by storage capacity.
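As a rough illustration of what "indexed by AccessModes and ordered by capacity" buys us, here is a simplified sketch (my own code, not the real findBestMatchForClaim; it ignores pre-binding, volumeMode and label selectors): among unbound PVs that offer all requested access modes, pick the smallest one whose capacity still satisfies the claim's request.

package main

import (
    "sort"

    v1 "k8s.io/api/core/v1"
)

// findSmallestMatchingPV returns the smallest unbound PV that offers all requested
// access modes and at least the requested storage capacity, or nil if none fits.
func findSmallestMatchingPV(pvs []*v1.PersistentVolume, claim *v1.PersistentVolumeClaim) *v1.PersistentVolume {
    request := claim.Spec.Resources.Requests[v1.ResourceStorage]

    var candidates []*v1.PersistentVolume
    for _, pv := range pvs {
        if pv.Spec.ClaimRef == nil && hasAccessModes(pv.Spec.AccessModes, claim.Spec.AccessModes) {
            candidates = append(candidates, pv)
        }
    }
    // Sort ascending by capacity so the first candidate that fits wastes the least space.
    sort.Slice(candidates, func(i, j int) bool {
        ci := candidates[i].Spec.Capacity[v1.ResourceStorage]
        cj := candidates[j].Spec.Capacity[v1.ResourceStorage]
        return ci.Cmp(cj) < 0
    })
    for _, pv := range candidates {
        capacity := pv.Spec.Capacity[v1.ResourceStorage]
        if capacity.Cmp(request) >= 0 {
            return pv
        }
    }
    return nil
}

// hasAccessModes reports whether every requested access mode is offered by the PV.
func hasAccessModes(offered, requested []v1.PersistentVolumeAccessMode) bool {
    for _, r := range requested {
        found := false
        for _, o := range offered {
            if o == r {
                found = true
                break
            }
        }
        if !found {
            return false
        }
    }
    return true
}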
1 resync
func (ctrl *PersistentVolumeController) resync() {
    klog.V(4).Infof("resyncing PV controller")

    pvcs, err := ctrl.claimLister.List(labels.NewSelector())
    if err != nil {
        klog.Warningf("cannot list claims: %s", err)
        return
    }
    for _, pvc := range pvcs {
        ctrl.enqueueWork(ctrl.claimQueue, pvc)
    }

    pvs, err := ctrl.volumeLister.List(labels.NewSelector())
    if err != nil {
        klog.Warningf("cannot list persistent volumes: %s", err)
        return
    }
    for _, pv := range pvs {
        ctrl.enqueueWork(ctrl.volumeQueue, pv)
    }
}
This re-enqueues every PVC/PV in the cluster into the corresponding claimQueue and volumeQueue so they are processed again. The resyncPeriod equals a random time.Duration multiplied by the configured duration (set when kcm starts).
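A minimal sketch of how such a jittered period can be derived from a configured base duration (my own simplification of the generic kube-controller-manager helper; the function name is made up):

package main

import (
    "math/rand"
    "time"
)

// jitteredResyncPeriod multiplies the configured base period by a random factor
// in [1.0, 2.0), so that controllers do not all resync at the same instant.
func jitteredResyncPeriod(base time.Duration) time.Duration {
    factor := rand.Float64() + 1
    return time.Duration(float64(base.Nanoseconds()) * factor)
}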
2 volumeWorker
An infinite loop that keeps processing the PersistentVolumes taken from the volumeQueue:
workFunc := func() bool {
    keyObj, quit := ctrl.volumeQueue.Get()
    if quit {
        return true
    }
    defer ctrl.volumeQueue.Done(keyObj)
    key := keyObj.(string)
    klog.V(5).Infof("volumeWorker[%s]", key)

    _, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
        return false
    }
    volume, err := ctrl.volumeLister.Get(name)
    if err == nil {
        // The volume still exists in informer cache, the event must have
        // been add/update/sync
        ctrl.updateVolume(volume)
        return false
    }
    if !errors.IsNotFound(err) {
        klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
        return false
    }

    // The volume is not in informer cache, the event must have been
    // delete
    volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
    if err != nil {
        klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
        return false
    }
    if !found {
        // The controller has already processed the delete event and
        // deleted the volume from its cache
        klog.V(2).Infof("deletion of volume %q was already processed", key)
        return false
    }
    volume, ok := volumeObj.(*v1.PersistentVolume)
    if !ok {
        klog.Errorf("expected volume, got %v", volumeObj)
        return false
    }
    ctrl.deleteVolume(volume)
    return false
}
We mainly focus on the ctrl.updateVolume(volume) method.
updateVolume
updateVolume is the actual handler for PV events in the cluster; internally it mainly calls ctrl.syncVolume to do the work.
func (ctrl *PersistentVolumeController) syncVolume(ctx context.Context, volume *v1.PersistentVolume) error {
    klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))
    ...
    // [Unit test set 4]
    if volume.Spec.ClaimRef == nil {
        // Volume is unused
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
        if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
            // Nothing was saved; we will fall back into the same
            // condition in the next call to this method
            return err
        }
        return nil
    } else /* pv.Spec.ClaimRef != nil */ {
        // Volume is bound to a claim.
        if volume.Spec.ClaimRef.UID == "" {
            // The PV is reserved for a PVC; that PVC has not yet been
            // bound to this PV; the PVC sync will handle it.
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
            if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
                // Nothing was saved; we will fall back into the same
                // condition in the next call to this method
                return err
            }
            return nil
        }
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))

        // Get the PVC by _name_
        var claim *v1.PersistentVolumeClaim
        claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
        obj, found, err := ctrl.claims.GetByKey(claimName)
        if err != nil {
            return err
        }
        if !found {
            if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
                if err != nil && !apierrors.IsNotFound(err) {
                    return err
                }
                found = !apierrors.IsNotFound(err)
                if !found {
                    obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})
                    if err != nil && !apierrors.IsNotFound(err) {
                        return err
                    }
                    found = !apierrors.IsNotFound(err)
                }
            }
        }
        if !found {
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
            // Fall through with claim = nil
        } else {
            var ok bool
            claim, ok = obj.(*v1.PersistentVolumeClaim)
            if !ok {
                return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
            }
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
        }
        if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
            klog.V(4).Infof("Maybe cached claim: %s is not the newest one, we should fetch it from apiserver", claimrefToClaimKey(volume.Spec.ClaimRef))
            claim, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})
            if err != nil && !apierrors.IsNotFound(err) {
                return err
            } else if claim != nil {
                // Treat the volume as bound to a missing claim.
                if claim.UID != volume.Spec.ClaimRef.UID {
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a newer UID than pv.ClaimRef, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
                    claim = nil
                } else {
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a same UID with pv.ClaimRef", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
                }
            }
        }

        if claim == nil {
            if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                // Also, log this only once:
                klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
                if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
                    // Nothing was saved; we will fall back into the same condition
                    // in the next call to this method
                    return err
                }
            }
            if err = ctrl.reclaimVolume(volume); err != nil {
                // Release failed, we will fall back into the same condition
                // in the next call to this method
                return err
            }
            if volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain {
                // volume is being retained, it references a claim that does not exist now.
                klog.V(4).Infof("PersistentVolume[%s] references a claim %q (%s) that is not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), volume.Spec.ClaimRef.UID)
            }
            return nil
        } else if claim.Spec.VolumeName == "" {
            if pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
                volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
                ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)
                claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)
                ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)
                // Skipping syncClaim
                return nil
            }
            if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
                // The binding is not completed; let PVC sync handle it
                klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)
            } else {
                // Dangling PV; try to re-establish the link in the PVC sync
                klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)
            }
            ctrl.claimQueue.Add(claimToClaimKey(claim))
            return nil
        } else if claim.Spec.VolumeName == volume.Name {
            // Volume is bound to a claim properly, update status if necessary
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
            if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
                // Nothing was saved; we will fall back into the same
                // condition in the next call to this method
                return err
            }
            return nil
        } else {
            // Volume is bound to a claim, but the claim is bound elsewhere
            if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
                if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                    // Also, log this only once:
                    klog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)
                    if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
                        // Nothing was saved; we will fall back into the same condition
                        // in the next call to this method
                        return err
                    }
                }
                if err = ctrl.reclaimVolume(volume); err != nil {
                    return err
                }
                return nil
            } else {
                if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
                    if err = ctrl.unbindVolume(volume); err != nil {
                        return err
                    }
                    return nil
                } else {
                    // The PV must have been created with this ptr; leave it alone.
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
                    if err = ctrl.unbindVolume(volume); err != nil {
                        return err
                    }
                    return nil
                }
            }
        }
    }
}
1. When pv.Spec.ClaimRef is nil, the PV is not in use; ctrl.updateVolumePhase is called to move the PV into the Available phase.
2. When pv.Spec.ClaimRef is not nil, the PV is already bound to (or reserved for) a PVC:
- When Spec.ClaimRef.UID is empty, the PVC has not been bound to this PV yet; ctrl.updateVolumePhase moves the PV into the Available phase and the method returns, leaving the rest to the PVC's syncClaim.
- Otherwise, the PVC referenced by Spec.ClaimRef is looked up in the pv_controller's own cache. If it is not found there (possibly because the cluster is under heavy load and the cache has not caught up), the informer cache is searched next; if it is still missing, the apiserver is queried. If a PV that is neither Released nor Failed still cannot find its PVC after these steps, the PVC has been deleted; recent Kubernetes versions then check the reclaimPolicy and adjust the PV's state accordingly. Once the PVC is found:
(1) If the PVC's UID does not match Spec.ClaimRef.UID, the PVC the PV points to was most likely deleted and a PVC with the same name was created right away, while the cache has not been updated; we double-check against the apiserver. If the PVC still does not exist after the double check, the PV is bound to a PVC that no longer exists: claim is set to nil and the "PVC not found" logic above is executed.
(2) If the PVC's volumeName is empty:
- Check whether the PVC's volumeMode matches the PV's volumeMode; if not, emit an event.
- If the PV carries the AnnBoundByController (pv.kubernetes.io/bound-by-controller) annotation, the PVC/PV binding is still in progress.
- Put the PVC into the claimQueue and let claimWorker handle it.
(3) If pvc.Spec.volumeName == pv.Name, set the PV to the Bound phase directly.
(4) If pvc.Spec.volumeName != pv.Name:
- If the PV was dynamically provisioned and its reclaim policy is Delete, the PVC is already bound to another PV; move the PV to the Released phase and leave it to the deleter to remove.
- If the PV was not dynamically provisioned, clear the PV's ClaimRef field and unbind it.
3 claimWorker
An infinite loop that keeps processing the PersistentVolumeClaims taken from the claimQueue:

workFunc := func() bool {
    keyObj, quit := ctrl.claimQueue.Get()
    if quit {
        return true
    }
    defer ctrl.claimQueue.Done(keyObj)
    key := keyObj.(string)
    klog.V(5).Infof("claimWorker[%s]", key)

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
        return false
    }
    claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
    if err == nil {
        // The claim still exists in informer cache, the event must have
        // been add/update/sync
        ctrl.updateClaim(claim)
        return false
    }
    if !errors.IsNotFound(err) {
        klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
        return false
    }

    // The claim is not in informer cache, the event must have been delete
    claimObj, found, err := ctrl.claims.GetByKey(key)
    if err != nil {
        klog.V(2).Infof("error getting claim %q from cache: %v", key, err)
        return false
    }
    if !found {
        // The controller has already processed the delete event and
        // deleted the claim from its cache
        klog.V(2).Infof("deletion of claim %q was already processed", key)
        return false
    }
    claim, ok := claimObj.(*v1.PersistentVolumeClaim)
    if !ok {
        klog.Errorf("expected claim, got %v", claimObj)
        return false
    }
    ctrl.deleteClaim(claim)
    return false
}
We mainly focus on the ctrl.updateClaim(claim) method. As above, it mainly calls ctrl.syncClaim, and inside syncClaim either ctrl.syncUnboundClaim or ctrl.syncBoundClaim is called depending on the PVC's state.
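The dispatch itself is worth spelling out. A condensed sketch (the helper below is my own wrapper; the real check in the 1.23 source lives on *PersistentVolumeController and uses the pvutil.AnnBindCompleted constant): a PVC that has gone through a successful bind carries the pv.kubernetes.io/bind-completed annotation, and that is what decides which path runs.

// isClaimBound mirrors the check syncClaim performs before choosing a sync path.
func isClaimBound(claim *v1.PersistentVolumeClaim) bool {
    return metav1.HasAnnotation(claim.ObjectMeta, "pv.kubernetes.io/bind-completed")
}

// Paraphrased usage inside syncClaim:
//   if !isClaimBound(claim) {
//       return ctrl.syncUnboundClaim(ctx, claim)
//   }
//   return ctrl.syncBoundClaim(claim)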
syncUnboundClaim
func (ctrl *PersistentVolumeController) syncUnboundClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
    if claim.Spec.VolumeName == "" {
        // User did not care which PV they get.
        delayBinding, err := pvutil.IsDelayBindingMode(claim, ctrl.classLister)
        if err != nil {
            return err
        }

        // [Unit test set 1]
        volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
        if err != nil {
            klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
            return fmt.Errorf("error finding PV for claim %q: %w", claimToClaimKey(claim), err)
        }
        if volume == nil {
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
            switch {
            case delayBinding && !pvutil.IsDelayBindingProvisioning(claim):
                if err = ctrl.emitEventForUnboundDelayBindingClaim(claim); err != nil {
                    return err
                }
            case storagehelpers.GetPersistentVolumeClaimClass(claim) != "":
                if err = ctrl.provisionClaim(ctx, claim); err != nil {
                    return err
                }
                return nil
            default:
                ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
            }

            // Mark the claim as Pending and try to find a match in the next
            // periodic syncClaim
            if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                return err
            }
            return nil
        } else /* pv != nil */ {
            // Found a PV for this claim
            // OBSERVATION: pvc is "Pending", pv is "Available"
            claimKey := claimToClaimKey(claim)
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))
            if err = ctrl.bind(volume, claim); err != nil {
                metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
                return err
            }
            metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil)
            return nil
        }
    } else /* pvc.Spec.VolumeName != "" */ {
        klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
        obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
        if err != nil {
            return err
        }
        if !found {
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
            if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                return err
            }
            return nil
        } else {
            volume, ok := obj.(*v1.PersistentVolume)
            if !ok {
                return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %v", claim.Spec.VolumeName, obj)
            }
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
            if volume.Spec.ClaimRef == nil {
                klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
                if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
                    klog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
                    // send an event
                    msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
                    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
                    // volume does not satisfy the requirements of the claim
                    if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                        return err
                    }
                } else if err = ctrl.bind(volume, claim); err != nil {
                    // On any error saving the volume or the claim, subsequent
                    // syncClaim will finish the binding.
                    return err
                }
                // OBSERVATION: pvc is "Bound", pv is "Bound"
                return nil
            } else if pvutil.IsVolumeBoundToClaim(volume, claim) {
                // User asked for a PV that is claimed by this PVC
                // OBSERVATION: pvc is "Pending", pv is "Bound"
                klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))

                // Finish the volume binding by adding claim UID.
                if err = ctrl.bind(volume, claim); err != nil {
                    return err
                }
                // OBSERVATION: pvc is "Bound", pv is "Bound"
                return nil
            } else {
                // User asked for a PV that is claimed by someone else
                // OBSERVATION: pvc is "Pending", pv is "Bound"
                if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) {
                    klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
                    claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
                    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
                    // User asked for a specific PV, retry later
                    if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                        return err
                    }
                    return nil
                } else {
                    klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
                    claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
                    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
                    return fmt.Errorf("invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
                }
            }
        }
    }
}
Let's walk through the overall flow. If the current PVC's volumeName is empty:
- Determine whether the PVC uses delayed binding (see the sketch after this list), then call volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding) to find a matching PV.
- If a volume is found, call ctrl.bind(volume, claim) to bind it.
- If no volume is found:
  - If the PVC uses delayed binding and binding has not been triggered yet (no pod references it), emit an event on the PVC.
  - If the PVC references a StorageClass, call ctrl.provisionClaim(ctx, claim).
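"Delayed binding" deserves a short expansion. A simplified sketch (my own code; the real check is pvutil.IsDelayBindingMode and it also honors a legacy beta StorageClass annotation): a claim uses delayed binding when its StorageClass has volumeBindingMode: WaitForFirstConsumer, so binding and provisioning wait until a pod that uses the claim is scheduled.

import (
    v1 "k8s.io/api/core/v1"
    storagev1 "k8s.io/api/storage/v1"
    storagelisters "k8s.io/client-go/listers/storage/v1"
)

// isDelayBinding reports whether the claim's StorageClass asks for WaitForFirstConsumer binding.
func isDelayBinding(claim *v1.PersistentVolumeClaim, classLister storagelisters.StorageClassLister) (bool, error) {
    if claim.Spec.StorageClassName == nil || *claim.Spec.StorageClassName == "" {
        return false, nil
    }
    class, err := classLister.Get(*claim.Spec.StorageClassName)
    if err != nil {
        return false, err
    }
    return class.VolumeBindingMode != nil && *class.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer, nil
}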
As for provisionClaim: it analyzes the PVC, finds the provisioner driver, and starts a goroutine that calls ctrl.provisionClaimOperation(ctx, claim, plugin, storageClass) to do the actual creation work.
provisionClaimOperation
func (ctrl *PersistentVolumeController) provisionClaimOperation(
    ctx context.Context,
    claim *v1.PersistentVolumeClaim,
    plugin vol.ProvisionableVolumePlugin,
    storageClass *storage.StorageClass) (string, error) {
    claimClass := storagehelpers.GetPersistentVolumeClaimClass(claim)
    klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)

    pluginName := plugin.GetPluginName()
    if pluginName != "kubernetes.io/csi" && claim.Spec.DataSource != nil {
        strerr := fmt.Sprintf("plugin %q is not a CSI plugin. Only CSI plugin can provision a claim with a datasource", pluginName)
        return pluginName, fmt.Errorf(strerr)
    }
    provisionerName := storageClass.Provisioner

    // Add provisioner annotation to be consistent with external provisioner workflow
    newClaim, err := ctrl.setClaimProvisioner(ctx, claim, provisionerName)
    if err != nil {
        // Save failed, the controller will retry in the next sync
        klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
        return pluginName, err
    }
    claim = newClaim

    pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
    volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
    if err != nil && !apierrors.IsNotFound(err) {
        klog.V(3).Infof("error reading persistent volume %q: %v", pvName, err)
        return pluginName, err
    }
    if err == nil && volume != nil {
        // Volume has been already provisioned, nothing to do.
        klog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
        return pluginName, err
    }

    // Prepare a claimRef to the claim early (to fail before a volume is
    // provisioned)
    claimRef, err := ref.GetReference(scheme.Scheme, claim)
    if err != nil {
        klog.V(3).Infof("unexpected error getting claim reference: %v", err)
        return pluginName, err
    }

    // Gather provisioning options
    tags := make(map[string]string)
    tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
    tags[CloudVolumeCreatedForClaimNameTag] = claim.Name
    tags[CloudVolumeCreatedForVolumeNameTag] = pvName

    options := vol.VolumeOptions{
        PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
        MountOptions:                  storageClass.MountOptions,
        CloudTags:                     &tags,
        ClusterName:                   ctrl.clusterName,
        PVName:                        pvName,
        PVC:                           claim,
        Parameters:                    storageClass.Parameters,
    }

    // Refuse to provision if the plugin doesn't support mount options, creation
    // of PV would be rejected by validation anyway
    if !plugin.SupportsMountOption() && len(options.MountOptions) > 0 {
        strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions)
        klog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
        return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName())
    }

    // Provision the volume
    provisioner, err := plugin.NewProvisioner(options)
    if err != nil {
        strerr := fmt.Sprintf("Failed to create provisioner: %v", err)
        klog.V(2).Infof("failed to create provisioner for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
        return pluginName, err
    }

    var selectedNode *v1.Node = nil
    if nodeName, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok {
        selectedNode, err = ctrl.NodeLister.Get(nodeName)
        if err != nil {
            strerr := fmt.Sprintf("Failed to get target node: %v", err)
            klog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err)
            ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
            return pluginName, err
        }
    }
    allowedTopologies := storageClass.AllowedTopologies

    opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
    volume, err = provisioner.Provision(selectedNode, allowedTopologies)
    opComplete(volumetypes.CompleteFuncParam{Err: &err})
    if err != nil {
        ctrl.rescheduleProvisioning(claim)
        strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
        klog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
        return pluginName, err
    }

    klog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))

    // Create Kubernetes PV object for the volume.
    if volume.Name == "" {
        volume.Name = pvName
    }
    // Bind it to the claim
    volume.Spec.ClaimRef = claimRef
    volume.Status.Phase = v1.VolumeBound
    volume.Spec.StorageClassName = claimClass

    // Add AnnBoundByController (used in deleting the volume)
    metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnBoundByController, "yes")
    metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned, plugin.GetPluginName())

    // Try to create the PV object several times
    for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
        klog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
        var newVol *v1.PersistentVolume
        if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(context.TODO(), volume, metav1.CreateOptions{}); err == nil || apierrors.IsAlreadyExists(err) {
            // Save succeeded.
            if err != nil {
                klog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim))
                err = nil
            } else {
                klog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))
                _, updateErr := ctrl.storeVolumeUpdate(newVol)
                if updateErr != nil {
                    // We will get an "volume added" event soon, this is not a big error
                    klog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr)
                }
            }
            break
        }
        // Save failed, try again after a while.
        klog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)
        time.Sleep(ctrl.createProvisionedPVInterval)
    }

    if err != nil {
        strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
        klog.V(3).Info(strerr)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)

        var deleteErr error
        var deleted bool
        for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
            _, deleted, deleteErr = ctrl.doDeleteVolume(volume)
            if deleteErr == nil && deleted {
                // Delete succeeded
                klog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)
                break
            }
            if !deleted {
                klog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName())
                break
            }
            // Delete failed, try again after a while.
            klog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr)
            time.Sleep(ctrl.createProvisionedPVInterval)
        }
        if deleteErr != nil {
            strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr)
            klog.V(2).Info(strerr)
            ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr)
        }
    } else {
        klog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))
        msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName())
        ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg)
    }
    return pluginName, nil
}
The basic logic of provisionClaimOperation is as follows:
- Check the driver: only a CSI driver is allowed to use the dataSource field.
- Add the claim.Annotations["volume.kubernetes.io/storage-provisioner"] = class.Provisioner annotation to the PVC.
- Build the PV name according to the convention "pvc-" + pvc.UID. If a PV with that name already exists, provisioning is skipped.
- Collect the basic PVC/PV information and wrap it into an options struct.
- Validate the plugin: if the plugin does not support mount options, reject the provision request directly.
- Call plugin.NewProvisioner(options) to create a provisioner, which implements the Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) method. Note that this method is synchronous, and it returns the PersistentVolume instance.
- Associate the newly created PV with the PVC object via ClaimRef (sketched below).
- Try to create the PV object (retried several times). If creating the PV fails, try to call the Delete method to delete the provisioned volume resource.
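A small sketch of the naming and annotation conventions listed above (illustrative only; the helper name is made up, and the real logic lives inside provisionClaimOperation and the pvutil constants):

import (
    v1 "k8s.io/api/core/v1"
    storagev1 "k8s.io/api/storage/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// buildProvisionedPV shows how a dynamically provisioned PV is named and annotated.
func buildProvisionedPV(claim *v1.PersistentVolumeClaim, sc *storagev1.StorageClass, claimRef *v1.ObjectReference, pluginName string) *v1.PersistentVolume {
    // Deterministic PV name for dynamic provisioning: "pvc-" + PVC UID.
    pvName := "pvc-" + string(claim.UID)

    pv := &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: pvName}}
    pv.Spec.ClaimRef = claimRef          // pre-bind the new PV to the PVC
    pv.Spec.StorageClassName = sc.Name

    // Annotations consulted later when the volume is unbound or reclaimed.
    metav1.SetMetaDataAnnotation(&pv.ObjectMeta, "pv.kubernetes.io/bound-by-controller", "yes")
    metav1.SetMetaDataAnnotation(&pv.ObjectMeta, "pv.kubernetes.io/provisioned-by", pluginName)
    return pv
}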
syncBoundClaim
func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
    if claim.Spec.VolumeName == "" {
        // Claim was bound before but not any more.
        if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {
            return err
        }
        return nil
    }
    obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
    if err != nil {
        return err
    }
    if !found {
        // Claim is bound to a non-existing volume.
        if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {
            return err
        }
        return nil
    } else {
        volume, ok := obj.(*v1.PersistentVolume)
        if !ok {
            return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
        }

        klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
        if volume.Spec.ClaimRef == nil {
            // Claim is bound but volume has come unbound.
            // Or, a claim was bound and the controller has not received updated
            // volume yet. We can't distinguish these cases.
            // Bind the volume again and set all states to Bound.
            klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
            if err = ctrl.bind(volume, claim); err != nil {
                // Objects not saved, next syncPV or syncClaim will try again
                return err
            }
            return nil
        } else if volume.Spec.ClaimRef.UID == claim.UID {
            // All is well
            // NOTE: syncPV can handle this so it can be left out.
            // NOTE: bind() call here will do nothing in most cases as
            // everything should be already set.
            klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
            if err = ctrl.bind(volume, claim); err != nil {
                // Objects not saved, next syncPV or syncClaim will try again
                return err
            }
            return nil
        } else {
            // Claim is bound but volume has a different claimant.
            // Set the claim phase to 'Lost', which is a terminal
            // phase.
            if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil {
                return err
            }
            return nil
        }
    }
}
1. If pvc.Spec.VolumeName is empty, the PVC was bound before but its PV reference is gone; emit an event and return.
2. Otherwise, look up the PV bound to this PVC in the cache:
- If it is not found, the PVC is bound to a PV that does not exist; emit an event and return.
- If the PV is found, check its pv.Spec.ClaimRef field:
  - If it is nil, the PV has not been bound to the PVC yet; call ctrl.bind(volume, claim) to bind them.
  - If pv.Spec.ClaimRef.UID == pvc.UID, bind is still called, but in most cases it returns immediately because everything has already been done.
  - Otherwise, the volume is bound to a different PVC; update the PVC's phase to Lost and emit an event.
IV Summary
Finally, a PVC/PV state transition diagram sums up the flows above.
This article is original content from Alibaba Cloud and may not be reproduced without permission.