store.List |-store.ListPredicate |-if opt == nil | opt = ListOptions{ResourceVersion: ""} |-Init SelectionPredicate.Limit/Continue fields |-list := e.NewListFunc() // objects will be stored in this list |-storageOpts := storage.ListOptions{opt.ResourceVersion, opt.ResourceVersionMatch, Predicate: p} | |-if MatchesSingle ok // 1. when "metadata.name" is specified, get single obj | // Get single obj from cache or etcd | |-return e.Storage.List(KeyRootFunc(ctx), storageOpts) // 2. get all objs and perform filtering |-cacher.List() | // case 1: list all from etcd and filter in apiserver |-if shouldDelegateList(opts) // true e.g. if resourceVersion == "" | return c.storage.List // list from etcd | |- fromRV *int64 = nil | |- if len(storageOpts.ResourceVersion) > 0 | | rv = ParseResourceVersion | | fromRV = &rv | | | |- for hasMore { | | objs := etcdclient.KV.Get() | | filter(objs) // filter by labels or fields | | } | | // case 2: apiserver local cache (memory) is not ready yet, fall back to etcd |-if cache.notready() | return c.storage.List // get from etcd | | // case 3: list & filter from apiserver local cache (memory) |-obj := watchCache.WaitUntilFreshAndGet |-for elem in obj.(*storeElement) | listVal.Set() // append results to listObj |-return // results stored in listObj
// ListPredicate returns a list of all the items matching the given // SelectionPredicate. func(e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, options *metainternalversion.ListOptions) (runtime.Object, error) { if options == nil { // By default we should serve the request from etcd. options = &metainternalversion.ListOptions{ResourceVersion: ""} } p.Limit = options.Limit p.Continue = options.Continue list := e.NewListFunc() qualifiedResource := e.qualifiedResourceFromContext(ctx) storageOpts := storage.ListOptions{ResourceVersion: options.ResourceVersion, ResourceVersionMatch: options.ResourceVersionMatch, Predicate: p} if name, ok := p.MatchesSingle(); ok { if key, err := e.KeyFunc(ctx, name); err == nil { err := e.Storage.GetToList(ctx, key, storageOpts, list) return list, storeerr.InterpretListError(err, qualifiedResource) } // if we cannot extract a key based on the current context, the optimization is skipped }
// If resourceVersion is specified, serve it from cache. // It's guaranteed that the returned value is at least that // fresh as the given resourceVersion. listRV, err := c.versioner.ParseResourceVersion(resourceVersion) if err != nil { return err } // If the cache has not been built yet, the only option is to read from etcd. if listRV == 0 && !c.ready.check() { // If Cacher is not yet initialized and we don't require any specific // minimal resource version, simply forward the request to storage. return c.storage.List(ctx, key, opts, listObj) }
// If resourceVersion is not specified, serve it from underlying // storage (for backward compatibility). If a continuation is // requested, serve it from the underlying storage as well. // Limits are only sent to storage when resourceVersion is non-zero // since the watch cache isn't able to perform continuations, and // limits are ignored when resource version is zero return resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact }
// List implements storage.Interface.List. func(s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { resourceVersion := opts.ResourceVersion match := opts.ResourceVersionMatch pred := opts.Predicate
// ...
// set the appropriate clientv3 options to filter the returned data set var paging bool options := make([]clientv3.OpOption, 0, 4) if s.pagingEnabled && pred.Limit > 0 { paging = true options = append(options, clientv3.WithLimit(pred.Limit)) }
// loop until we have filled the requested limit from etcd or there are no more results var lastKey []byte var hasMore bool var getResp *clientv3.GetResponse for { getResp = s.client.KV.Get(ctx, key, options...) // fetch data from etcd numFetched += len(getResp.Kvs) hasMore = getResp.More // ...
// take items from the response until the bucket is full, filtering as we go for _, kv := range getResp.Kvs { if paging && int64(v.Len()) >= pred.Limit { hasMore = true break } lastKey = kv.Key
// instruct the client to begin querying from immediately after the last key we returned // we never return a key that the client wouldn't be allowed to see if hasMore { // we want to start immediately after the last key next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV) if err != nil { return err } var remainingItemCount *int64 // getResp.Count counts in objects that do not match the pred. // Instead of returning inaccurate count for non-empty selectors, we return nil. // Only set remainingItemCount if the predicate is empty. if utilfeature.DefaultFeatureGate.Enabled(features.RemainingItemCount) { if pred.Empty() { c := int64(getResp.Count - pred.Limit) remainingItemCount = &c } } return s.versioner.UpdateList(listObj, uint64(returnedRV), next, remainingItemCount) }
// no continuation return s.versioner.UpdateList(listObj, uint64(returnedRV), "", nil) }