Z.S.K.'s Records

Kube-batch学习(nodeorder插件使用)

AI场景跟大多数的业务不太一样的是: 网络端需要尽可能地靠近,对于大多数业务来说,为了保证其可用性,一般副本都会分散地部署在不同node,而AI业务通常伴随着海量的数据交换,一个job中的多个pod需要协同处理,如果分散在多个node上,task间的任务交换的快慢就得依赖于网络的传输的快慢,而如果是在一台node上的话,那就没有这部分的消耗,一个job中的pods如何做到尽可能地调度在同一台机器上呢, kube-batch除了能够支持poggroup外,也是能够支持的podaffinit的.

一个job中的pods如何做到尽可能地调度在同一台机器上呢, 最容易让人想到的是podAffinit,podAffinit是针对于在集群中已经存在的pod,其它的pod可以通过podAffinit来让他们部署在一起,这里有个很大的问题在于, 如果同时使用了podgroup,也就是说在podgroup中的pod在绑定节点之前在集群中是不存在的,也就是办法通过labelsector找到这些pod,那要怎么办呢?

为解决这个问题,kube-batch进行了详细的issue讨论,为此,kube-batch引入了一个全新的plugins, 最开始叫Prioritize,后改名为nodeorder.

假使kube-batch使用以下的配置:

1
2
3
4
5
6
7
actions: "allocate, backfill"
tiers:
- plugins:
- name: gang
- plugins:
- name: nodeorder
- name: predicates

之前提到过,actions指定了kube-batch在调度时需要执行的操作,同时,这些操作会关联一些plugins(简单来说就是一些算法)来实现相关功能,比如说,actions是allocate,allocate意为分配资源,但是在分配资源时有时也会有一些要求,比如优先级高的先分配,或者把某个任务当成一个整体进行分配(gang)等等,同时,不同的actions可能关联同一个plugins,比如对于资源回收时,也可能存在先回收优先级低的pod的资源,这就是actions及plugins之间的关系

这里将nodeorder放在靠前的位置,同时启用gang插件.

nodeorder的流程如下:

从图中可看出Prioritize只发生在Allocate及Preempt actions中, 上面的配置中只启用了Allocate,当kube-batch从所有机器中筛选出所有符合条件的node列表后,在Allocate中开始执行Prioritize的function,详细流程:

  • 并行地在筛选出来的node列表中执行所有的priority functions
  • 根据优先级规则是否满足工作负载调度标准对节点进行评分
  • 一旦从所有优先级返回分数,则聚合分数并确定得分最高的节点
  • 将最后一步中选定的节点委托给AllocateFn,以将工作负载绑定到该节点

从上图中可以看出priority functions包含interpodAffinityFn,从这个命名来看是跟podAffinity有关的,假如我的job定义了这样的podAffinity

1
2
3
4
5
6
7
8
9
10
11
12
affinity:
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: argo-workflow/mpi-task
operator: In
values:
- '{{workflow.parameters.CURRENT_OP_RUN_ID}}'
topologyKey: "kubernetes.io/hostname"

使用了preferredDuringSchedulingIgnoredDuringExecution,尽可能地调度到一台节点上,确实是interpodAffinityFn起了作用

Talk is cheap, show me the code

主要的逻辑代码位于kube-batch/vendor/k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/interpod_affinity.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
processNode := func(i int) {
nodeInfo := nodeNameToInfo[allNodeNames[i]]
if nodeInfo.Node() != nil {
if hasAffinityConstraints || hasAntiAffinityConstraints {
// We need to process all the nodes.
for _, existingPod := range nodeInfo.Pods() {
if err := processPod(existingPod); err != nil {
pm.setError(err)
}
}
} else {
// The pod doesn't have any constraints - we need to check only existing
// ones that have some.
for _, existingPod := range nodeInfo.PodsWithAffinity() {
if err := processPod(existingPod); err != nil {
pm.setError(err)
}
}
}
}
}

其中

pod 一个需被调度的Pod
hasAffinityConstraints “被调度的pod”是否有定义亲和配置
hasAntiAffinityConstraints “被调度的pod”是否有定义亲和配置
existingPod 一个待处理的亲和目标pod
existingPodNode 运行此“亲和目标pod”的节点–“目标Node
existingHasAffinityConstraints “亲和目标pod”是否存在亲和约束
existingHasAntiAffinityConstraints “亲和目标pod”是否存在反亲和约束

上面调用的processPod,传入的是一个existingPod,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
processPod := func(existingPod *v1.Pod) error {
existingPodNode, err := ipa.info.GetNodeInfo(existingPod.Spec.NodeName)
if err != nil {
if apierrors.IsNotFound(err) {
klog.Errorf("Node not found, %v", existingPod.Spec.NodeName)
return nil
}
return err
}
existingPodAffinity := existingPod.Spec.Affinity
existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil
existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil

if hasAffinityConstraints {
// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPods>`s node by the term`s weight.
terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
pm.processTerms(terms, pod, existingPod, existingPodNode, 1)
}

第一次循环的时候通过亲和性规则显然是找不到pod的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight float64) {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(podDefiningAffinityTerm, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
p.setError(err)
return
}
match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, namespaces, selector)
if match { // 第一次无法匹配
func() {
p.Lock()
defer p.Unlock()
for _, node := range p.nodes {
if priorityutil.NodesHaveSameTopologyKey(node, fixedNode, term.TopologyKey) {
p.counts[node.Name] += weight
}
}
}()
}
}

func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) {
for i := range terms {
term := &terms[i]
p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, float64(term.Weight*int32(multiplier)))
}
}

但是第二次循环的时候就能发现第1个pod了,在整个循环期间需要计算weight值,最后得分最高的node为最终选中的node

更加详细的代码详解可参考: https://www.jianshu.com/p/a931ad4f0242

参考文章:

转载请注明原作者: 周淑科(https://izsk.me)

 wechat
Scan Me To Read on Phone
I know you won't do this,but what if you did?