
看这个client-go的架构图,DeltaFIFO是一个生产者-消费者队列,生产者是Reflector,消费者是Pop()方法。
一、DeltaFIFO
Delta是存储的元数据,能够了解到数据的变化。
1
2
3
4
5
6
  | 
type Delta struct {
	Type   DeltaType
	Object interface{}
}
type Deltas []Delta
  | 
 
DeltaType记录了数据变化的类型
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
  | 
type DeltaType string
// Change type definition
const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Replaced DeltaType = "Replaced"
	Sync DeltaType = "Sync"
)
  | 
 
DeltaFIFO的定义 :
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  | 
type DeltaFIFO struct {
	lock sync.RWMutex
	cond sync.Cond
	items map[string]Deltas
	queue []string
	populated bool
	initialPopulationCount int
	keyFunc KeyFunc
	knownObjects KeyListerGetter
	closed bool
	emitDeltaTypeReplaced bool
}
  | 
 
item:按照key-value存储对象,value是Delta数组 
queue:先入先出的队列,存储的是对象的key 
populated:标识第一次将对象放入队列,或第一次调用增删改接口时为true 
initialPopulationCount:第一次对象放入队列的数量 
keyFunc:对象键计算函数 
knownObjects:指向Indexer 
1.1、KeyListerGetter
KeyListerGetter是一个通用的接口,定义了返回所有的key和通过key获取对象两个方法。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  | 
type KeyListerGetter interface {
	KeyLister
	KeyGetter
}
type KeyLister interface {
	ListKeys() []string
}
type KeyGetter interface {
	GetByKey(key string) (value interface{}, exists bool, err error)
}
  | 
 
1.2、Queue
DeltaFIFO是实现了Queue接口,Queue继承了Store接口,可以让对象有序弹出。
1
2
3
4
5
6
7
  | 
type Queue interface {
	Store
	Pop(PopProcessFunc) (interface{}, error)
	AddIfNotPresent(interface{}) error
	HasSynced() bool
	Close()
}
  | 
 
1.3、数据状态
Deltas是数组,存储数据变更状态,例如0号位存储最老的状态,末位存储最新的状态。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
  | 
func (d Deltas) Oldest() *Delta {
	if len(d) > 0 {
		return &d[0]
	}
	return nil
}
func (d Deltas) Newest() *Delta {
	if n := len(d); n > 0 {
		return &d[n-1]
	}
	return nil
}
  | 
 
1.4、初始化
有两种方法初始化,一种是通过Indexer的keyFunc和它的存储方法初始化
1
2
3
4
5
6
  | 
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KeyFunction:  keyFunc,
		KnownObjects: knownObjects,
	})
}
  | 
 
另一种方法是通过选项初始化
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
  | 
type DeltaFIFOOptions struct {
	KeyFunction KeyFunc
	KnownObjects KeyListerGetter
	EmitDeltaTypeReplaced bool
}
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
	if opts.KeyFunction == nil {
		opts.KeyFunction = MetaNamespaceKeyFunc
	}
	f := &DeltaFIFO{
		items:        map[string]Deltas{},
		queue:        []string{},
		keyFunc:      opts.KeyFunction,
		knownObjects: opts.KnownObjects,
		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
	}
	f.cond.L = &f.lock
	return f
}
  | 
 
二、Queue
2.1、KeyOf
计算Deltas对象的key,最终使用的是Indexer的keyFunc方法。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  | 
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
	if d, ok := obj.(Deltas); ok {
		if len(d) == 0 {
			return "", KeyError{obj, ErrZeroLengthDeltasObject}
		}
		obj = d.Newest().Object
	}
	if d, ok := obj.(DeletedFinalStateUnknown); ok {
		return d.Key, nil
	}
	return f.keyFunc(obj)
}
  | 
 
2.2、List
List返回对象的最新版本
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
  | 
func (f *DeltaFIFO) List() []interface{} {
	f.lock.RLock()
	defer f.lock.RUnlock()
	return f.listLocked()
}
func (f *DeltaFIFO) listLocked() []interface{} {
	list := make([]interface{}, 0, len(f.items))
	for _, item := range f.items {
		list = append(list, item.Newest().Object)
	}
	return list
}
  | 
 
ListKeys返回对象的键
1
2
3
4
5
6
7
8
9
  | 
func (f *DeltaFIFO) ListKeys() []string {
	f.lock.RLock()
	defer f.lock.RUnlock()
	list := make([]string, 0, len(f.queue))
	for _, key := range f.queue {
		list = append(list, key)
	}
	return list
}
  | 
 
2.3、GET
Get获取对象接口,用对象获取对象
1
2
3
4
5
6
7
  | 
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
	key, err := f.KeyOf(obj)
	if err != nil {
		return nil, false, KeyError{obj, err}
	}
	return f.GetByKey(key)
}
  | 
 
GetByKey通过对象键获取对象
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
  | 
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
	f.lock.RLock()
	defer f.lock.RUnlock()
	d, exists := f.items[key]
	if exists {
		// Copy item's slice so operations on this slice
		// won't interfere with the object we return.
		d = copyDeltas(d)
	}
	return d, exists, nil
}
  | 
 
2.4、Add/Update/Delete
增删改操作都是调用的queueActionLocked方法,删除方法多了一步就是在item里删除对象
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
  | 
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}
func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Updated, obj)
}
func (f *DeltaFIFO) Delete(obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	if f.knownObjects == nil {
		if _, exists := f.items[id]; !exists {
			return nil
		}
	} else {
		_, exists, err := f.knownObjects.GetByKey(id)
		_, itemsExist := f.items[id]
		if err == nil && !exists && !itemsExist {
			return nil
		}
	}
	return f.queueActionLocked(Deleted, obj)
}
  | 
 
在queueActionLocked方法中,Add/Update/Delete操作其实都需要保存下来,记录在Deltas
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  | 
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	oldDeltas := f.items[id]
	newDeltas := append(oldDeltas, Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)
	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		if oldDeltas == nil {
			return nil
		}
		f.items[id] = newDeltas
		return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
	}
	return nil
}
  | 
 
- 首先计算出资源对象的key
 
- 将
actionType和资源对象构造成Delta,通过dedupDeltas去重 
- 将新的
Deltas添加进items中,通过cond.Broadcast通知所有的消费者接触阻塞 
另一个方法AddIfNotPresent,看名字就能知道,如果不存在就新增
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
  | 
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
	deltas, ok := obj.(Deltas)
	if !ok {
		return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
	}
	id, err := f.KeyOf(deltas)
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.addIfNotPresent(id, deltas)
	return nil
}
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
	f.populated = true
	if _, exists := f.items[id]; exists {
		return
	}
	f.queue = append(f.queue, id)
	f.items[id] = deltas
	f.cond.Broadcast()
}
  | 
 
2.5、Replace
原子性地做两件事:保存数据;删除不存在的数据
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
  | 
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	keys := make(sets.String, len(list))
	action := Sync
	if f.emitDeltaTypeReplaced {
		action = Replaced
	}
	for _, item := range list {
		key, err := f.KeyOf(item)
		if err != nil {
			return KeyError{item, err}
		}
		keys.Insert(key)
		if err := f.queueActionLocked(action, item); err != nil {
			return fmt.Errorf("couldn't enqueue object: %v", err)
		}
	}
	if f.knownObjects == nil {
		queuedDeletions := 0
		for k, oldItem := range f.items {
			if keys.Has(k) {
				continue
			}
			var deletedObj interface{}
			if n := oldItem.Newest(); n != nil {
				deletedObj = n.Object
			}
			queuedDeletions++
			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
				return err
			}
		}
		// 首次添加
		if !f.populated {
			f.populated = true
			f.initialPopulationCount = keys.Len() + queuedDeletions
		}
		return nil
	}
	knownKeys := f.knownObjects.ListKeys()
	queuedDeletions := 0
	for _, k := range knownKeys {
		if keys.Has(k) {
			continue
		}
		deletedObj, exists, err := f.knownObjects.GetByKey(k)
		if err != nil {
			deletedObj = nil
		} else if !exists {
			deletedObj = nil
		}
		queuedDeletions++
		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
			return err
		}
	}
	if !f.populated {
		f.populated = true
		f.initialPopulationCount = keys.Len() + queuedDeletions
	}
	return nil
}
  | 
 
这里对knownObjects也就是是否存在Indexer,如果没有就在items里查找不存在的数据,如果有就去Indexer中查找不存在的数据。
2.6、Resync