网站发展的方向,广州市城市建设开发总公司网站,“哈尔滨企业服务平台”公众号,免登录直接玩的游戏client-go 是一个库#xff0c;提供了与 Kubernetes API 服务器交互的基础设施。它提供了诸如 Informer、Lister、ClientSet 等工具#xff0c;用于监听、缓存和操作 Kubernetes 资源。而自定义控制器则利用这些工具来实现特定的业务逻辑和自动化任务。业务逻辑实现#xff…client-go 是一个库提供了与 Kubernetes API 服务器交互的基础设施。它提供了诸如 Informer、Lister、ClientSet 等工具用于监听、缓存和操作 Kubernetes 资源。而自定义控制器则利用这些工具来实现特定的业务逻辑和自动化任务。业务逻辑实现client-go 不包含特定的业务逻辑。自定义控制器允许实现特定于您的应用程序或需求的逻辑。扩展 Kubernetes通过自定义控制器可以扩展 Kubernetes 的功能处理自定义资源或实现特定的自动化任务。响应资源变化自定义控制器可以监听特定资源的变化并据此执行相应的操作。
而这里的workqueue是costromer Controller的一部分 逻辑
当我们创建一个workqueue的时候到底发生了什么
queue : workqueue.New()
该方法调用的new方法又调用了NewWithConfig()以及newQueueWithConfig().可以看到逐级返回以后返回的是一个type类型的数据。
func New() *Type {return NewWithConfig(QueueConfig{Name: ,})
}
func NewTyped[T comparable]() *Typed[T] {return NewTypedWithConfig(TypedQueueConfig[T]{Name: ,})
}func NewWithConfig(config QueueConfig) *Type {return NewTypedWithConfig(config)
}func NewTypedWithConfig[T comparable](config TypedQueueConfig[T]) *Typed[T] {return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
}// newQueueWithConfig constructs a new named workqueue
// with the ability to customize different properties for testing purposes
func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod time.Duration) *Typed[T] {var metricsFactory *queueMetricsFactoryif config.MetricsProvider ! nil {metricsFactory queueMetricsFactory{metricsProvider: config.MetricsProvider,}} else {metricsFactory globalMetricsFactory}if config.Clock nil {config.Clock clock.RealClock{}}if config.Queue nil {config.Queue DefaultQueue[T]()}return newQueue(config.Clock,config.Queue,metricsFactory.newQueueMetrics(config.Name, config.Clock),updatePeriod,)
}TypedInterface
Interface 被标记为废弃Deprecated并建议使用 TypedInterface 代替。这种变化主要是因为 Go 语言引入了泛型特性。TypedInterface[T comparable] 使用了泛型T 是一个类型参数它必须是可比较的comparable。泛型允许在编译时进行类型检查提供了更好的类型安全性。使用 TypedInterface[T] 可以在编译时捕获类型错误而不是在运行时。
这里最后返回了一个newQueue而它的定义如下
func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics, updatePeriod time.Duration) *Typed[T] {t : Typed[T]{clock: c,queue: queue,dirty: set[T]{},processing: set[T]{},cond: sync.NewCond(sync.Mutex{}),metrics: metrics,unfinishedWorkUpdatePeriod: updatePeriod,}// Dont start the goroutine for a type of noMetrics so we dont consume// resources unnecessarilyif _, ok : metrics.(noMetrics); !ok {go t.updateUnfinishedWorkLoop()}return t
}
那么Type类型到底是什么Type 是 Typed[any] 的一个别名。这意味着 Type 可以在任何使用 Typed[any] 的地方使用它们是完全等价的。
type Type Typed[any]
type Typed[t comparable] struct {queue Queue[t]// dirty defines all of the items that need to be processed.dirty set[t]// Things that are currently being processed are in the processing set.// These things may be simultaneously in the dirty set. When we finish// processing something and remove it from this set, well check if// its in the dirty set, and if so, add it to the queue.processing set[t]cond *sync.CondshuttingDown booldrain boolmetrics queueMetricsunfinishedWorkUpdatePeriod time.Durationclock clock.WithTicker
}type empty struct{}
type t interface{}
type set[t comparable] map[t]empty 这里有两个set一个是process一个是dirty一个项目可能同时存在于这两个集合中。这是因为一个正在处理的项目在 processing 中可能在处理过程中被标记为需要重新处理因此也在 dirty 中。如果它在 dirty 集合中说明在处理过程中它被标记为需要重新处理。这时系统会将它重新加入到处理队列中。
这里的t是一个空接口允许存储任何形式的kubernetes资源。
这里还定义了接口而Type实现了这个接口。
type Interface interface {Add(item interface{})Len() intGet() (item interface{}, shutdown bool)Done(item interface{})ShutDown()ShutDownWithDrain()ShuttingDown() bool
}
dirty队列
添加任务当有新任务时首先检查它是否已经在 dirty 中。如果不在就添加进去。开始处理当开始处理一个任务时将它从 dirty 中移除。重新添加如果一个正在处理的任务需要重新处理就把它再次加入 dirty。dirty 帮助工作队列系统更高效地管理需要处理的任务避免重复工作并能快速决定是否需要添加新任务到处理队列中。
各种类型的queue
在k8s.io/client-go/util/workqueue中查看。
从上面的例子可以看到一个queue是有很多参数的如果只是简单的通过new来创建很多参数都是默认的参数。
限速队列
k8s.io/client-go/util/workqueue/default-rate-limiters.go
限速队列应用得非常广泛比如在我们做一些操作失败后希望重试几次但是立刻重试很有可能还是会失败这个时候我们可以延迟一段时间再重试而且失败次数越多延迟时间越长这个其实就是限速。首先我们需要来了解下限速器。
type RateLimiter TypedRateLimiter[any]type TypedRateLimiter[T comparable] interface {// When gets an item and gets to decide how long that item should waitWhen(item T) time.Duration// Forget indicates that an item is finished being retried. Doesnt matter whether its for failing// or for success, well stop tracking itForget(item T)// NumRequeues returns back how many failures the item has hadNumRequeues(item T) int
}
TypedBucketRateLimiter (令牌桶限速器) 这个限速器基于令牌桶算法。想象一个固定容量的桶桶里装着令牌。令牌以固定的速率被加入到桶中。当一个请求或任务到来时它需要从桶中获取一个令牌。如果桶中有令牌请求可以立即处理。如果桶是空的请求必须等待直到新的令牌被加入。这种方法可以很好地控制平均处理速率同时允许短时间的突发流量。 TypedItemExponentialFailureRateLimiter (指数退避限速器) 这个限速器根据失败次数增加等待时间每次失败等待时间会指数增加基础延迟 * 2^失败次数。有一个最大延迟时间防止等待时间无限增长。 TypedItemFastSlowRateLimiter (快慢双速限速器) 这个限速器有两种速率快速和慢速在最初的几次尝试中使用快速延迟。超过设定的尝试次数后切换到慢速延迟。适用于需要快速重试几次然后如果仍然失败就减慢重试频率的场景。 TypedMaxOfRateLimiter (最大值限速器) 这个限速器组合了多个其他限速器 它包含一个限速器的列表。当需要决定延迟时间时它会询问所有的限速器。然后返回所有限速器中最长的延迟时间。这允许你组合多种限速策略总是使用最保守最慢的那个。 TypedWithMaxWaitRateLimiter (最大等待时间限速器)
从代码中可以看到有一个基础的RateLimiter的接口interface然后其余的结构体都是这个端口的实现
type TypedBucketRateLimiter[T comparable] struct {*rate.Limiter
}
ype TypedItemExponentialFailureRateLimiter[T comparable] struct {failuresLock sync.Mutexfailures map[T]intbaseDelay time.DurationmaxDelay time.Duration
}
type TypedItemFastSlowRateLimiter[T comparable] struct {failuresLock sync.Mutexfailures map[T]intmaxFastAttempts intfastDelay time.DurationslowDelay time.Duration
}
type TypedMaxOfRateLimiter[T comparable] struct {limiters []TypedRateLimiter[T]
}
type TypedWithMaxWaitRateLimiter[T comparable] struct {limiter TypedRateLimiter[T]maxDelay time.Duration
}
他们的new函数部分
func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {return NewTypedWithMaxWaitRateLimiter[any](limiter, maxDelay)
}func NewTypedWithMaxWaitRateLimiter[T comparable](limiter TypedRateLimiter[T], maxDelay time.Duration) TypedRateLimiter[T] {return TypedWithMaxWaitRateLimiter[T]{limiter: limiter, maxDelay: maxDelay}
}
接口实现
func (w TypedWithMaxWaitRateLimiter[T]) When(item T) time.Duration {delay : w.limiter.When(item)if delay w.maxDelay {return w.maxDelay}return delay
}func (w TypedWithMaxWaitRateLimiter[T]) Forget(item T) {w.limiter.Forget(item)
}func (w TypedWithMaxWaitRateLimiter[T]) NumRequeues(item T) int {return w.limiter.NumRequeues(item)
} 我们可以看到有的限速器需要一个基础限速器NewTypedWithMaxWaitRateLimiter是从多个限速器中取得最大的限速时间。这里函数名称不同源代码里是NewTypedWithMaxWaitRateLimiter而实际演示代码是NewWithMaxWaitRateLimiter这是因为源码读的是最新版而实际安装的go是1.22所以不一样但是只有增加和缺少Type的区别
baseRateLimiter : workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second)
ratelimiter :workqueue.NewWithMaxWaitRateLimiter(baseRateLimiter,10*time.Second)
ratelimitedQueue : workqueue.NewRateLimitingQueue(ratelimiter)
延迟队列
type DelayingInterface interface {Interface// AddAfter adds an item to the workqueue after the indicated duration has passedAddAfter(item interface{}, duration time.Duration)
}
type delayingType struct {Interface// clock tracks time for delayed firingclock clock.Clock// stopCh lets us signal a shutdown to the waiting loopstopCh chan struct{}// stopOnce guarantees we only signal shutdown a single timestopOnce sync.Once// heartbeat ensures we wait no more than maxWait before firingheartbeat clock.Ticker// waitingForAddCh is a buffered channel that feeds waitingForAddwaitingForAddCh chan *waitFor// metrics counts the number of retriesmetrics retryMetrics
}func NewDelayingQueue() DelayingInterface {return NewDelayingQueueWithConfig(DelayingQueueConfig{})
}
具体实现可以看到NewDelayingQueue()-NewDelayingQueueWithConfig{return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)},然后有一个newDelayingQueue但是带有参数的方法这里的new的n是小写的代表这是一个私有的方法可以看到最后返回的是一个delayingType。而NewDelayingQueue()返回的是一个interface。
func NewDelayingQueue() DelayingInterface {return NewDelayingQueueWithConfig(DelayingQueueConfig{})
}func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {if config.Clock nil {config.Clock clock.RealClock{}}if config.Queue nil {config.Queue NewWithConfig(QueueConfig{Name: config.Name,MetricsProvider: config.MetricsProvider,Clock: config.Clock,})}return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)
}func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {ret : delayingType{Interface: q,clock: clock,heartbeat: clock.NewTicker(maxWait),stopCh: make(chan struct{}),waitingForAddCh: make(chan *waitFor, 1000),metrics: newRetryMetrics(name, provider),}go ret.waitingLoop()return ret
}
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {// dont add if were already shutting downif q.ShuttingDown() {return}q.metrics.retry()// immediately add things with no delayif duration 0 {q.Add(item)return}select {case -q.stopCh:// unblock if ShutDown() is calledcase q.waitingForAddCh - waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:}
}
可以看到还有很多变种但是最后都会调用 NewDelayingQueue但是带有参数的方法。 // NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
// inject custom queue Interface instead of the default one
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name,Queue: q,})
}// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability.
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewNamedDelayingQueue(name string) DelayingInterface {return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name})
}// NewDelayingQueueWithCustomClock constructs a new named workqueue
// with ability to inject real or fake clock for testing purposes.
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name,Clock: clock,})
}
为什么newDelayingQueue返回的是type类型而他的上级返回的是interface类型呢以下面的代码为例advancedAnimal实现的结构体包含一个interfaceanimal和一个trick。这个trick字段是为了实现PerformTrick方法。 func (a *advancedAnimalType) PerformTrick() string { return a.trick } 这个接收advancedAnimal的函数实现了PerformTrick(),所以可以看作是advancedAnimal实现了AdvancedAnimal的interface。 所以在下面的New函数中虽然返回的是advancedAnimalType但是最后NewAdvancedAnimal返回的是interface类型。 func NewAdvancedAnimal(config AdvancedAnimalConfig) AdvancedAnimal { if config.Animal nil { config.Animal NewAnimal(config.Species, config.Sound, config.Movement) } return advancedAnimalType{ Animal: config.Animal, trick: config.Trick, } } type Animal interface {Speak() stringMove() string
}
// 扩展的 AdvancedAnimal 接口
type AdvancedAnimal interface {AnimalPerformTrick() string
}
// 基本的动物实现
type basicAnimal struct {species stringsound stringmovement string
}
func (a *basicAnimal) Speak() string {return a.sound
}
func (a *basicAnimal) Move() string {return a.movement
}
// 高级动物实现
type advancedAnimalType struct {Animaltrick string
}func (a *advancedAnimalType) PerformTrick() string {return a.trick
}
// 创建基本动物的函数
func NewAnimal(species, sound, movement string) Animal {return basicAnimal{species: species,sound: sound,movement: movement,}
}
// 创建高级动物的函数
func NewAdvancedAnimal(config AdvancedAnimalConfig) AdvancedAnimal {if config.Animal nil {config.Animal NewAnimal(config.Species, config.Sound, config.Movement)}return advancedAnimalType{Animal: config.Animal,trick: config.Trick,}
}
// 配置结构体
type AdvancedAnimalConfig struct {Animal AnimalSpecies stringSound stringMovement stringTrick string
}