WorkQueue原理
上一节我们学习了informer,我们可以通过informer添加事件处理方法,那为什么我们不直接在事件方法(EventHander)中处理我们的业务逻辑呢?主要是evnt产生的速度要比event创立的数速要快,所以为了解决这种速度不一致的问题,所以引入了workQueue
队列类型
-
通用队列
-
延迟队列
-
限速队列
通常我们会直接使用限速队列,因为它是借助通用队列和延迟队列来实现的
通用队列
type Interface interface {
Add(item interface{}) //添加一个元素
Len() int //队列元素个数
Get() (item interface{}, shutdown bool) //获取一个队列元素
Done(item interface{}) //标记一个元素已经处理完
ShutDown() //关闭队列
ShuttingDown() bool //是否正在关闭
}
type Type struct {
queue []t //定义队列,具有顺序性,待处理元素列表
dirty set //标记所有需要被处理的元素
processing set //当前正在被处理的元素
//为什么需要3个数据结构?
//为了保障队列在同一时间只有一个相同的元素正在被处理
cond *sync.Cond
shuttingDown bool //是否正在关闭
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
延迟队列
type DelayingInterface interface {
Interface
//延迟添加
//除了有通用队列的功能外,还添加了一个AddAfter方法,表示的是在某个时间以后,把item加到通用队列里面
AddAfter(item interface{}, duration time.Duration)
}
type delayingType struct {
Interface //用来嵌套通用Queue
clock clock.Clock //计时器
stopCh chan struct{}
stopOnce sync.Once
heartbeat clock.Ticker
waitingForAddCh chan *waitFor //传递waitFor的channel,默认大小1000
metrics retryMetrics
}
重要方法:waitingLoop/AddAfter
限速队列
type RateLimitingInterface interface {
DelayingInterface //延时队列里包含了普通队列,限速队列里面包含了延时队列,所以通常我们会直接使用这个限速队列,它既有延迟队列的功能,又有限速队列和普通队列的功能
AddRateLimited(item interface{}) //往队列里加入一个元素
Forget(item interface{}) //停止元素重试
NumRequeues(item interface{}) int //记录这个元素被处理多少次了
}
限速器的目的:根据相应的算法获取元素的延迟时间,然后利用延迟队列来控制队列的速度。
type RateLimiter interface {
When(item interface{}) time.Duration //返回一个item需要等待的时长
Forget(item interface{}) //标识一个元素结束重试
NumRequeues(item interface{}) int //标识这个元素被处理多少次了
}
上节课中写了一段former的使用,现在我们在里面添加队列,看如何使用的
package main
import (
"fmt"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
)
您暂时无权查看此隐藏内容!