目录

client-go架构

以下是client-go的官方架构图

https://cdn.jsdelivr.net/gh/betterfor/cloudImage/images/2022/01/10/client-go-controller-interaction.jpeg

一、Indexer

Indexer保存来自api-server的资源,使用list-watch方式来维护资源的增量变化。通过这种方式可以减少对api-server的访问,减少api-server端的压力。

Indexer继承Store接口,使用多个索引扩展Store并限制每个累加器仅保存当前对象(在Delete后为空)。

Store接口定义了对象的增删改查等方法。

cache实现了Indexer接口,它定义了线程安全和key的序列化方法

1
2
3
4
type cache struct {
	cacheStorage ThreadSafeStore
	keyFunc KeyFunc
}

ThreadSafeStore是一个允许对后台存储进行并发访问的接口,类似于Indexer,但是不需要知道如何从对象obj中取出key。

通过NewStoreNewIndexer初始化cache返回StoreIndexer指针。

  • indexer实际的对象存储在threadSafeMap
  • indexers划分了不同类型的索引值(indexName,如namespace),并按照索引类型进行索引(indexFunc,如MetaNamespaceIndexFunc),得到符合该对象的索引键,一个对象在一个索引类型中可以有多个索引键
  • items的键值对保存了实际对象

namespace为索引类型,

1、从indexers获取计算namespaceIndexFunc

2、用IndexFunc计算入参obj相关的所有namespaces,indeices中保存了所有namespaces下面的对象键,可以获取特定namespace下的对象键

3、在items中可以通过对象键得到对象

二、DeltaFIFO

DeltaFIFO是一个生产者-消费者队列,生产者为Reflector,消费者为Pop()方法。

DeltaFIFO实现了QueueStore接口,使用Deltas保存对象状态的变更(Add/Delete/Update),Deltas缓存了针对相同对象的多个状态变更信息,如Delta[0]存储了pod的更新,最老的状态变更为Oldest(),最新的状态变更为Newest

其中knownObjects类型为KeyListerGetter,接口方法ListKeysGetByKey也是Store接口中的方法,而实际上,knownObjects也是使用Store/Indexer来初始化的,也就是说可以通过ListKeysGetByKey方法获取Store/Indexer中的对象键。

initialPopulationCount表示是否完成全量同步,在Replace增加,在Pop减少,当initialPopulationCount=0populated=true表示Pop了所有的Replace添加到DeltaFIFO对象,populated用于判断是DeltaFIFO中是否为初始化状态(即没有处理过任何对象).

NewSharedIndexInformer中初始化了sharedIndexInformer,用NewIndexer初始化Indexer,在Run方法中用s.indexer作为DeltaFIFOknowObjects

DeltaFIFO实现了Queue接口。

list-watchlist步骤中,会调用Replace方法来对DeltaFIFO进行全量更新:

1、Sync所有DeltaFIFO中的对象,全部加入到DeltaFIFO

2、如果knownObjects为空,则删除DeltaFIFO中不存在的对象,

3、如果knownObjects非空,获取knownObjects中不存在的对象,在DeltaFIFO删除

三、List-Watch

Lister用于获取某个资源的全量,Watcher用于获取某个资源的增量变化。

实际使用中Lister和Watcher都从apiServer获取资源信息,Lister一般用于首次获取某资源(如Pod)的全量信息,而Watcher用于持续获取该资源的增量变化信息。

在例子的workqueue可以看到调用NewListWatchFromClient的地方

1
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

而这个资源名称在k8s.io/api/core/v1/types.go中定义。

其中clientset中定义API group,这些都定义在client-go/kubernetes/typed中。

返回的RESTClient()返回值为Interface接口类型,封装了对资源的操作方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type Interface interface {
	GetRateLimiter() flowcontrol.RateLimiter
	Verb(verb string) *Request
	Post() *Request
	Put() *Request
	Patch(pt types.PatchType) *Request
	Get() *Request
	Delete() *Request
	APIVersion() schema.GroupVersion
}

而在List-WatchList时就调用了Get方法。

四、Reflector

Reflector使用ListerWatcher获取资源,将其保存在store中,这里的store就是DeltaFIFO

ListAndWatchReflector.Run启动,并以resyncPeriod周期性调度。

在List时获取资源的首个resourceVersion值,在Watch时获取资源的增量变化,然后将获取到的resourceVersion保存起来

五、WorkQueue

用于保存infromer中handler处理后的数据。

六、Controller

1
2
3
4
5
6
type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

config中的Queue就是DeltaFIFO。

Controller定义了如何调度Reflector。

Controller的运行就是执行了Reflector.Run,相当于启动了一个DeltaFIFO的生产者,使用wait.Until启动一个消费者。

1
2
3
wg.StartWithChannel(stopCh, r.Run)

wait.Until(c.processLoop, time.Second, stopCh)

processLoop运行了DeltaFIFO.Pop方法,消费DeltaFIFO对象,如果运行失败会重新添加AddIfNotPresent

七、ShareInformer

一个是启动了Controller,一个是注册了process的handler方法,run方法接受nextCh的值,将其作为参数传给handler处理,而popaddCh中的元素传送给pendingNotifications,并读取元素,传给nextCh

也就是说pop负责管理数据,run负责处理数据。

这个handler就是例子中写的AddFuncUpdateFuncDeleteFunc

七、总结