在 Kubernetes 中,Pod 是最小的可部署单元,它包含一个或多个容器,这些容器共享同一个网络空间和存储空间。Kubernetes 可以根据不同的调度策略将 Pod 调度到不同的节点上运行。在一些需要特定硬件特性的场景中,例如需要使用 GPU 进行计算的任务,我们需要将 Pod 调度到有 GPU 的节点上运行。本文将介绍 Kubernetes 中的硬件特性调度,包括节点标签、调度器扩展器和 CRD,以及如何使用它们来实现硬件特性调度。
1. 节点标签
Kubernetes 中的节点标签可以用来标识节点的属性,例如 CPU、内存、GPU 等硬件特性。我们可以通过给节点打标签的方式来实现硬件特性调度。下面是一个示例,我们给一个节点打上了标签 gpu=true
,表示这个节点有 GPU:
apiVersion: v1 kind: Node metadata: name: gpu-node labels: gpu: "true"
接下来,我们可以在 Pod 的调度模板中加入硬件特性的标签要求,例如:
// javascriptcn.com 代码示例 apiVersion: v1 kind: Pod metadata: name: gpu-pod spec: nodeSelector: gpu: "true" containers: - name: gpu-container image: gpu-image
这样,Kubernetes 调度器就会将这个 Pod 调度到有 GPU 的节点上运行。
2. 调度器扩展器
除了节点标签,Kubernetes 还提供了调度器扩展器的机制,可以通过编写调度器扩展器的方式来实现硬件特性调度。调度器扩展器是一个独立的程序,它可以订阅 Kubernetes 调度器的事件,拦截和修改调度器的决策,从而实现自定义的调度逻辑。下面是一个调度器扩展器的示例,它会将需要 GPU 的 Pod 调度到有 GPU 的节点上运行:
// javascriptcn.com 代码示例 package main import ( "context" "fmt" "strings" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/scheduler/framework" ) const ( gpuLabel = "gpu" ) type gpuScheduler struct { clientset kubernetes.Interface podQueue workqueue.RateLimitingInterface } func NewGPUScheduler() framework.Plugin { config, err := rest.InClusterConfig() if err != nil { panic(err.Error()) } clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } s := &gpuScheduler{ clientset: clientset, podQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "gpu-scheduler"), } go s.run() return s } func (s *gpuScheduler) Name() string { return "gpu-scheduler" } func (s *gpuScheduler) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { if !needsGPU(pod) { return framework.NewStatus(framework.Success, "") } if hasGPU(nodeInfo) { return framework.NewStatus(framework.Success, "") } return framework.NewStatus(framework.Unschedulable, "Node does not have GPU") } func (s *gpuScheduler) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { return 0, framework.NewStatus(framework.Success, "") } func (s *gpuScheduler) Bind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { err := s.clientset.CoreV1().Pods(pod.Namespace).Bind(ctx, &v1.Binding{ ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{ Kind: "Node", Name: nodeName, }, }) if err != nil { return framework.NewStatus(framework.Error, err.Error()) } return framework.NewStatus(framework.Success, "") } func (s *gpuScheduler) run() { handler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { if node, ok := obj.(*v1.Node); ok { s.podQueue.Add(node.Name) } }, DeleteFunc: func(obj interface{}) { if node, ok := obj.(*v1.Node); ok { s.podQueue.Add(node.Name) } }, UpdateFunc: func(oldObj, newObj interface{}) { if node, ok := newObj.(*v1.Node); ok { s.podQueue.Add(node.Name) } }, } nodeListWatcher := cache.NewListWatchFromClient(s.clientset.CoreV1().RESTClient(), "nodes", "", labels.Everything()) _, controller := cache.NewInformer(nodeListWatcher, &v1.Node{}, 0, handler) controller.Run(workqueue.NeverStop) for { key, quit := s.podQueue.Get() if quit { return } nodeName := key.(string) s.podQueue.Forget(key) pods, err := s.clientset.CoreV1().Pods("").List(context.Background(), v1.ListOptions{}) if err != nil { s.podQueue.AddRateLimited(nodeName) continue } for _, pod := range pods.Items { if pod.Spec.NodeName == "" && needsGPU(&pod) { nodeInfo, found := controller.GetNodeInfo(nodeName) if !found { s.podQueue.AddRateLimited(nodeName) continue } if hasGPU(nodeInfo) { err := s.clientset.CoreV1().Pods(pod.Namespace).Bind(context.Background(), &v1.Binding{ ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{ Kind: "Node", Name: nodeName, }, }) if err != nil { s.podQueue.AddRateLimited(nodeName) continue } } } } } } func needsGPU(pod *v1.Pod) bool { for _, container := range pod.Spec.Containers { if strings.Contains(container.Image, "gpu") { return true } } return false } func hasGPU(nodeInfo *framework.NodeInfo) bool { labels := sets.NewString() for _, nodeLabel := range nodeInfo.Node().Labels { labels.Insert(fmt.Sprintf("%s=%s", nodeLabel.Key, nodeLabel.Value)) } return labels.Has(fmt.Sprintf("%s=true", gpuLabel)) }
上述代码中,我们编写了一个名为 gpuScheduler
的调度器扩展器,它会订阅 Kubernetes 节点的事件,如果有节点的标签中包含 gpu=true
,则将需要 GPU 的 Pod 调度到该节点上运行。我们可以使用下面的命令来启动这个调度器扩展器:
$ kubectl apply -f gpu-scheduler.yaml
其中,gpu-scheduler.yaml
是上述代码的 YAML 配置文件。
3. CRD
除了节点标签和调度器扩展器,Kubernetes 还支持自定义资源定义(CRD)的方式来实现硬件特性调度。CRD 是 Kubernetes 中的一种扩展机制,它可以让用户定义自己的 API 资源类型,从而扩展 Kubernetes 的功能。我们可以通过定义一个名为 GPU
的 CRD,来标识节点是否有 GPU,并将需要 GPU 的 Pod 调度到有 GPU 的节点上运行。下面是一个 GPU
CRD 的示例:
// javascriptcn.com 代码示例 apiVersion: apiextensions.k8s.io/v1beta1 kind: CustomResourceDefinition metadata: name: gpus.devices.example.com spec: group: devices.example.com versions: - name: v1alpha1 served: true storage: true schema: openAPIV3Schema: type: object properties: spec: type: object properties: gpu: type: string pattern: "^(true|false)$" scope: Namespaced names: plural: gpus singular: gpu kind: GPU shortNames: - gp
上述代码中,我们定义了一个名为 gpus.devices.example.com
的 CRD,它有一个名为 GPU
的资源类型,可以用来标识节点是否有 GPU。我们可以通过下面的命令来创建这个 CRD:
$ kubectl apply -f gpu-crd.yaml
接下来,我们可以在节点上创建一个名为 gpu-node
的 GPU
资源,表示这个节点有 GPU:
apiVersion: devices.example.com/v1alpha1 kind: GPU metadata: name: gpu-node spec: gpu: "true"
最后,我们可以在 Pod 的调度模板中加入 GPU
资源的要求,例如:
// javascriptcn.com 代码示例 apiVersion: v1 kind: Pod metadata: name: gpu-pod spec: containers: - name: gpu-container image: gpu-image nodeSelector: devices.example.com/gpu: "true"
这样,Kubernetes 调度器就会将这个 Pod 调度到有 GPU 的节点上运行。
总结
本文介绍了 Kubernetes 中的硬件特性调度,包括节点标签、调度器扩展器和 CRD,以及如何使用它们来实现硬件特性调度。通过使用这些机制,我们可以将需要特定硬件特性的 Pod 调度到有相应硬件特性的节点上运行,从而提高计算效率和资源利用率。
来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/6553d90ad2f5e1655dd8be75