Kubernetes之ListOption参数源码分析

书接上一篇Kubernetes之List参数使用不当引发的ETCD网络风暴说最近排查了一个因业务层使用List接口时因参数使用不当引起的etcd压力极速增长的问题, 该篇将按图索骥来看看ListOption在源码是如何处理的

处理逻辑

kube-apiserver LIST 请求处理逻辑可以看到下图原图地址

以上可以看到,系统路径中存在两级 List/ListWatch但数据是同一份):

  1. apiserver List/ListWatch etcd
  2. 其它对象如controller/operator List/ListWatch apiserver

因此,从最简形式上来说,apiserver 就是挡在 etcd 前面的一个代理(proxy),

1
2
3
4
5
  +--------+              +---------------+                 +------------+
| Client | -----------> | Proxy (cache) | --------------> | Data store |
+--------+ +---------------+ +------------+

infra services apiserver etcd

对于List请求可归类为两种:

  1. apiserver直接从自己的缓存中读数据
  2. apiserver跳过缓存,直接从etcd读数据

还是以使用client-go中listJob为例, 常见写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// ListJobs lists all jobs details.
func ListJobs() error {
config, err := util.BuildConfig(listJobFlags.Master, listJobFlags.Kubeconfig)
if err != nil {
return err
}
if listJobFlags.allNamespace {
listJobFlags.Namespace = ""
}
jobClient := versioned.NewForConfigOrDie(config)
jobs, err := jobClient.BatchV1alpha1().Jobs(listJobFlags.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return err
}

if len(jobs.Items) == 0 {
fmt.Printf("No resources found\n")
return nil
}
PrintJobs(jobs, os.Stdout)

return nil
}

来看看ListOptions的struct, 这里因篇幅有限,所以将注释去除了

kubernetes based on v1.22

ListOptions

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// pkg/apis/meta/v1/types.go
type ListOptions struct {
TypeMeta `json:",inline"`

LabelSelector string `json:"labelSelector,omitempty" protobuf:"bytes,1,opt,name=labelSelector"`

FieldSelector string `json:"fieldSelector,omitempty" protobuf:"bytes,2,opt,name=fieldSelector"`

Watch bool `json:"watch,omitempty" protobuf:"varint,3,opt,name=watch"`

AllowWatchBookmarks bool `json:"allowWatchBookmarks,omitempty" protobuf:"varint,9,opt,name=allowWatchBookmarks"`


ResourceVersion string `json:"resourceVersion,omitempty" protobuf:"bytes,4,opt,name=resourceVersion"`

ResourceVersionMatch ResourceVersionMatch `json:"resourceVersionMatch,omitempty" protobuf:"bytes,10,opt,name=resourceVersionMatch,casttype=ResourceVersionMatch"`

TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty" protobuf:"varint,5,opt,name=timeoutSeconds"`

Limit int64 `json:"limit,omitempty" protobuf:"varint,7,opt,name=limit"`

Continue string `json:"continue,omitempty" protobuf:"bytes,8,opt,name=continue"`
}
  1. LabelSelector/FieldSelector: 标签选择器与字段选择器, 上文提到, etcd只是KV存储,并不理解label/field这些信息,因此在etcd层面无法处理这些过滤条件。 所以如果是没走apiserver的缓存直接到达etcd的实际的过程是:apiserver 从 etcd 拉全量数据,然后在内存做过滤,再返回给客户端
  2. Watch及AllowWatchBookmarks: AllowWatchBookmarks也是个很有用的功能,但并不是本文的重点,并不展开说明
  3. ResourceVersion/ResourceVersionMatch: 资源版本
  4. Limit: 分页功能, 最常见的例子是kubectl命令行工具,会自动将请求加上limit=500这个查询参数, 可以使用kubectl –v=8看到
  5. Continue: 是个token,是否期望从server端返回最多的结果

这里主要会聚焦ResourceVersion

从上面可以看出ResousrceVersion的默认类型为string,所以默认值为空,来看看apiserver List() 操作源码分析

List()调用链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
store.List
|-store.ListPredicate
|-if opt == nil
| opt = ListOptions{ResourceVersion: ""}
|-Init SelectionPredicate.Limit/Continue fileld
|-list := e.NewListFunc() // objects will be stored in this list
|-storageOpts := storage.ListOptions{opt.ResourceVersion, opt.ResourceVersionMatch, Predicate: p}
|
|-if MatchesSingle ok // 1. when "metadata.name" is specified, get single obj
| // Get single obj from cache or etcd
|
|-return e.Storage.List(KeyRootFunc(ctx), storageOpts) // 2. get all objs and perform filtering
|-cacher.List()
| // case 1: list all from etcd and filter in apiserver
|-if shouldDelegateList(opts) // true if resourceVersion == ""
| return c.storage.List // list from etcd
| |- fromRV *int64 = nil
| |- if len(storageOpts.ResourceVersion) > 0
| | rv = ParseResourceVersion
| | fromRV = &rv
| |
| |- for hasMore {
| | objs := etcdclient.KV.Get()
| | filter(objs) // filter by labels or filelds
| | }
|
| // case 2: list & filter from apiserver local cache (memory)
|-if cache.notready()
| return c.storage.List // get from etcd
|
| // case 3: list & filter from apiserver local cache (memory)
|-obj := watchCache.WaitUntilFreshAndGet
|-for elem in obj.(*storeElement)
| listVal.Set() // append results to listOjb
|-return // results stored in listObj

store

List()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go

// List returns a list of items matching labels and field according to the
// store's PredicateFunc.
func (e *Store) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
label := labels.Everything()
if options != nil && options.LabelSelector != nil {
label = options.LabelSelector
}
field := fields.Everything()
if options != nil && options.FieldSelector != nil {
field = options.FieldSelector
}
out, err := e.ListPredicate(ctx, e.PredicateFunc(label, field), options)
if err != nil {
return nil, err
}
if e.Decorator != nil {
e.Decorator(out)
}
return out, nil
}

ListPredicate()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go

// ListPredicate returns a list of all the items matching the given
// SelectionPredicate.
func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, options *metainternalversion.ListOptions) (runtime.Object, error) {
if options == nil {
// By default we should serve the request from etcd.
options = &metainternalversion.ListOptions{ResourceVersion: ""}
}
p.Limit = options.Limit
p.Continue = options.Continue
list := e.NewListFunc()
qualifiedResource := e.qualifiedResourceFromContext(ctx)
storageOpts := storage.ListOptions{ResourceVersion: options.ResourceVersion, ResourceVersionMatch: options.ResourceVersionMatch, Predicate: p}
if name, ok := p.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil {
err := e.Storage.GetToList(ctx, key, storageOpts, list)
return list, storeerr.InterpretListError(err, qualifiedResource)
}
// if we cannot extract a key based on the current context, the optimization is skipped
}

err := e.Storage.List(ctx, e.KeyRootFunc(ctx), storageOpts, list)
return list, storeerr.InterpretListError(err, qualifiedResource)
}

ListPredicate()中:

  1. 如果客户端没传 **ListOption**,则初始化一个默认值,其中的 ResourceVersion 设置为空字符串;

  2. 用 listoptions 中的字段分别初始化过滤器(SelectionPredicate)的 limit/continue 字段;

  3. 初始化返回结果,list := e.NewListFunc()

  4. 将 API 侧的 ListOption 转成底层存储的 ListOption;

  5. p.MatchesSingle()判断请求中是否指定了metadata.name,如果指定了,说明是查询单个对象,因为 Name 是唯一的,接下来转入查询单个 object 的逻辑;

  6. 如果p.MatchesSingle()不成立,则需要获取全量数据,然后在 apiserver 内存中根据 SelectionPredicate 中的过滤条件进行过滤,将最终结果返回给客户端;

最终e.Storage.GetToList()/e.Storage.List()会执行到cacher。这两个funciont很相似

不管是获取单个 object,还是获取全量数据,都经历类似的过程:

  1. 优先从 apiserver 本地缓存获取(决定因素包括 ResourceVersion 等),
  2. 不得已才到 etcd 去获取;

apiserver cache

List()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

// List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
// 是否必须从 etcd 读
if shouldDelegateList(opts) {
return c.storage.List(ctx, key, opts, listObj)
}

// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return err
}
// 如果缓存还没有建立好,只能从 etcd 读
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.List(ctx, key, opts, listObj)
}

trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
defer trace.LogIfLong(500 * time.Millisecond)

c.ready.wait()
trace.Step("Ready")

// 情况三:apiserver 缓存正常,从缓存读:保证返回的 objects 版本不低于 `listRV`
// List elements with at least 'listRV' from cache.
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil {
return err
}
if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterWithAttrsFunction(key, pred)

objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
if err != nil {
return err
}
trace.Step("Listed items from cache", utiltrace.Field{"count", len(objs)})
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
// Resize the slice appropriately, since we already know that none
// of the elements will be filtered out.
listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
trace.Step("Resized result")
}
for _, obj := range objs {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
// 真正的过滤
if filter(elem.Key, elem.Labels, elem.Fields) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
trace.Step("Filtered items", utiltrace.Field{"count", listVal.Len()})

// 更新最后一次读到的 ResourceVersion
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
return err
}
}
return nil
}

shouldDelegateList()

判断是否必须从 etcd 读数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

func shouldDelegateList(opts storage.ListOptions) bool {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"

// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well.
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero
return resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact
}

这里非常重要:

  1. 客户端未设置 ListOption{} 中的 ResourceVersion 字段,对应到这里的 resourceVersion == ""

  2. 客户端设置了 limit=500&resourceVersion=0 不会导致下次 hasContinuation==true,因为resourceVersion=0 将导致 limit 被忽略hasLimit 那一行代码),也就是说, 虽然指定了 limit=500,但这个请求会返回全量数据

  3. ResourceVersionMatch的作用是用来告诉 apiserver,该如何解读 ResourceVersion。官方的详细说明表格 ,有兴趣可以看看。

接下来再返回到 cacher 的 GetList() 逻辑,来看下具体有哪几种处理情况。

ListOption要求从 etcd 读数据

有两种情况:

  1. 当客户端要求必须从etcd读取数据时,适用于数据一致性要求极其高的场景
  2. 当apiserver缓存还没有创建好时,比如apiserver重启到ready这阶段

apiserver 会直接从 etcd 读取所有 objects 并过滤,然后返回给客户端, 适用于数据一致性要求极其高的场景。 当然,也容易误入这种场景造成 etcd 压力过大

这里将会从cache的GetList()转到etcd的List()

etcd List()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
//staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go

// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion
match := opts.ResourceVersionMatch
pred := opts.Predicate

// ...

// set the appropriate clientv3 options to filter the returned data set
var paging bool
options := make([]clientv3.OpOption, 0, 4)
if s.pagingEnabled && pred.Limit > 0 {
paging = true
options = append(options, clientv3.WithLimit(pred.Limit))
}

newItemFunc := getNewItemFunc(listObj, v)

var fromRV *uint64
// 如果 RV 非空
if len(resourceVersion) > 0 {
parsedRV, err := s.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
fromRV = &parsedRV
}

var returnedRV, continueRV, withRev int64
var continueKey string

// ResourceVersion, ResourceVersionMatch 等处理逻辑
switch {
// ...
}
if withRev != 0 {
options = append(options, clientv3.WithRev(withRev))
}

// loop until we have filled the requested limit from etcd or there are no more results
var lastKey []byte
var hasMore bool
var getResp *clientv3.GetResponse
for {
getResp = s.client.KV.Get(ctx, key, options...) // 从 etcd 拉数据
numFetched += len(getResp.Kvs)
hasMore = getResp.More
// ...

// take items from the response until the bucket is full, filtering as we go
for _, kv := range getResp.Kvs {
if paging && int64(v.Len()) >= pred.Limit {
hasMore = true
break
}
lastKey = kv.Key

data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key))
// ...
}

// ...
}

// instruct the client to begin querying from immediately after the last key we returned
// we never return a key that the client wouldn't be allowed to see
if hasMore {
// we want to start immediately after the last key
next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
if err != nil {
return err
}
var remainingItemCount *int64
// getResp.Count counts in objects that do not match the pred.
// Instead of returning inaccurate count for non-empty selectors, we return nil.
// Only set remainingItemCount if the predicate is empty.
if utilfeature.DefaultFeatureGate.Enabled(features.RemainingItemCount) {
if pred.Empty() {
c := int64(getResp.Count - pred.Limit)
remainingItemCount = &c
}
}
return s.versioner.UpdateList(listObj, uint64(returnedRV), next, remainingItemCount)
}

// no continuation
return s.versioner.UpdateList(listObj, uint64(returnedRV), "", nil)
}
  • client.KV.Get() 就进入 etcd client 库了
  • appendListItem()对拿到的数据进行过滤,这就是我们第一节提到的 apiserver 内存过滤操作。

apiserver使用本地缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

// GetList implements storage.Interface
func (c *Cacher) List(ctx , key string, opts storage.ListOptions, listObj runtime.Object) error
{
// 情况一:ListOption 要求必须从 etcd 读
// ...
// 情况二:apiserver 缓存未建好,只能从 etcd 读,跟情况一一
// ...
// 情况三:apiserver 缓存正常,从缓存读:保证返回的 objects 版本不低于 `listRV`
listPtr := meta.GetItemsPtr(listObj) // List elements with at least 'listRV' from cache.
listVal := conversion.EnforcePtr(listPtr)
filter := filterWithAttrsFunction(key, pred) // 最终的过滤器

objs, readResourceVersion, indexUsed := c.listItems(listRV, key, pred, ...) // 根据 index 预筛,性能优化
for _, obj := range objs {
elem := obj.(*storeElement)
if filter(elem.Key, elem.Labels, elem.Fields) // 真正的过滤
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem))
}

if c.versioner != nil
c.versioner.UpdateList(listObj, readResourceVersion, "", nil)
return nil
}

总结建议

  1. List请求默认设置 ResourceVersion=0以防etcd拉全量数据再过滤,导致,很慢或者扛不住
  2. 不要滥用List接口,如果能使用watch就使用watch
  3. 优先通过 label/field selector 在服务端做过滤
    如果需要缓存某些资源并监听变动,那需要使用 ListWatch 机制,将数据拉到本地,业务逻辑根据需要自己从 local cache 过滤。 这是 client-go 的 ListWatch/informer 机制。
  4. 优先使用 namespaced API,
    etcd 中 namespace 是前缀的一部分,因此能指定 namespace 过滤资源,速度比不是前缀的 selector 快很多,如果要 LIST 的资源在单个或少数几个 namespace,考虑使用 namespaced API:
  • Namespaced API: /api/v1/namespaces/<ns>/pods?query=xxx
  • Un-namespaced API: /api/v1/pods?query=xxx
  1. 细化etcd/apiserver等核心监控,比如作者之前不怎么关注的网络方面的metrics
  2. 经过上述的方法还是扛不住的话那可以把event等不是很重要的但很具有冲击对象的数据进行etcd分离
  3. 遇到性能问题是最容易出现多方的扯皮, 作者觉得最好的说服方式就是: 直接用数据说话,数据不会骗人

其它

本篇主要追了一下ResourceVersion是怎么处理的,但还有很多细节是没有展开的,比如

  1. ListOption中的其它参数都起到什么作用
  2. Reflector中ListandWatch关于ResourceVersion又是怎样的
  3. relist如何使用

但实在是不想篇幅太长,所以给自己留个作业,下回再探讨

参考文章: