Custom Kubernetes Schedulers for GPU Topology-Aware Workloads
The Inadequacy of Default Scheduling for High-Performance GPU Clusters
The default Kubernetes scheduler, kube-scheduler, is a marvel of general-purpose orchestration. It effectively handles stateless applications, databases, and a wide variety of workloads by evaluating a series of predicates (Filter plugins) and priorities (Score plugins). However, when confronted with high-performance computing (HPC) or distributed machine learning training workloads, its abstractions begin to leak, particularly around specialized hardware like GPUs.
The core of the problem lies in its resource model. To the default scheduler, a GPU is just an opaque, integer-based resource: nvidia.com/gpu: 8. It sees eight identical units on a node and assumes they are fungible. This assumption is fundamentally flawed for modern multi-GPU servers.
Consider a DGX A100 node with 8 GPUs. These GPUs are not an undifferentiated pool. They are interconnected in a specific topology via NVLink, and their proximity to particular CPU sockets (NUMA locality) dramatically impacts performance. For a distributed training job using a collective communication library like NCCL, placing two communicating pods on GPUs connected by a high-bandwidth NVLink path can be several times faster than placing them on GPUs that must communicate over the much slower PCIe bus.
The default scheduler has no awareness of this. It might place a 4-GPU training job across two different NUMA nodes or on GPUs with poor interconnects, crippling performance without any indication of misconfiguration in the Kubernetes API. This is not a bug; it's a limitation of its general-purpose design. To achieve maximum performance and efficiency, we must inject our domain-specific knowledge—GPU topology—directly into the scheduling process. This is precisely what the Kubernetes Scheduler Framework enables.
This article will guide you through building, deploying, and utilizing a custom scheduler plugin that makes topology-aware decisions for GPU workloads. We will not be replacing the default scheduler but rather running a second, specialized scheduler alongside it, dedicated to these high-value workloads.
Architecture: The Scheduler Framework and GPU Feature Discovery
Our solution consists of two key components:
*   NVIDIA GPU Feature Discovery (GFD): deployed as a DaemonSet, it inspects each node's GPU hardware and publishes the details as node labels, for example:
    *   nvidia.com/gpu.count=8
    *   nvidia.com/gpu.product=NVIDIA-A100-SXM4-40GB
    *   nvidia.com/gpu.memory=40960
    *   nvidia.com/nvlink.p2p.level=NV8 (Indicates a specific NVLink connection pattern)
*   A custom scheduler plugin: built against the k8s.io/kubernetes/pkg/scheduler/framework interfaces, it consumes those labels to make placement decisions. We'll focus on two extension points:
    *   Filter: This plugin acts as a hard requirement. It inspects a Pod's resource requests and annotations, then checks whether a given node has the hardware to satisfy them. For example: "Does this node have 4 available A100 GPUs with at least an NV4 NVLink connection between them?" If not, the node is rejected for this Pod.
    *   Score: This plugin ranks the nodes that passed the Filter stage. It provides a nuanced preference. For example: "Among all nodes that have the required GPUs, prefer the one that offers the tightest NUMA locality and the highest NVLink level (NV8 is better than NV4). Also, prefer nodes with fewer existing GPU workloads to bin-pack our jobs."
Here is a visual representation of the workflow:
graph TD
    A[Pod with `schedulerName: gpu-topology-scheduler`] --> B{Scheduler Queue}
    B --> C{Custom Scheduler}
    C -- For each Node --> D{Filter Plugin}
    D -- Node has required GPU topology? --> E{Yes}
    D -- No --> F[Node Rejected]
    E --> G{Score Plugin}
    G -- Calculate Score based on best fit --> H[Node Score: 85]
    C -- After all nodes scored --> I{Select Node with Highest Score}
    I --> J[Bind Pod to Node]
Section 1: Implementing the Topology-Aware Scheduler Plugin
We'll write our plugin in Go. Ensure you have a Go environment (1.19 or newer, matching the Dockerfile below) set up.
Project Setup
Initialize a new Go module:
mkdir gpu-topology-scheduler
cd gpu-topology-scheduler
go mod init github.com/your-org/gpu-topology-scheduler
We need the Kubernetes API and scheduler framework dependencies:
go get k8s.io/api/core/v1
go get k8s.io/apimachinery/pkg/runtime
go get k8s.io/kubernetes/pkg/scheduler/framework
go get k8s.io/component-base/logs
Note that importing k8s.io/kubernetes as a library also requires replace directives in go.mod for its k8s.io/* staging modules, pinned to the Kubernetes release you are building against.
The Plugin's Core Structure
Let's define the basic structure of our plugin. We'll create a single struct that implements both Filter and Score interfaces.
main.go
package main
import (
	"context"
	"fmt"
	"os"
	"strconv"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/component-base/logs"
	"k8s.io/kubernetes/cmd/kube-scheduler/app"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1"
)
const (
	// Name is the name of the plugin used in the scheduler configuration.
	Name = "GpuTopologyAware"
	// Annotation keys we'll use on Pods to request specific topologies.
	MinGpusAnnotation      = "gpu.scheduler.example.com/min-gpus"
	MinNvlinkLvlAnnotation = "gpu.scheduler.example.com/min-nvlink-level"
	// Node labels populated by GFD.
	GpuCountLabel   = "nvidia.com/gpu.count"
	NvlinkLvlLabel  = "nvidia.com/nvlink.p2p.level"
)
type GpuTopologyAware struct{}
var _ framework.FilterPlugin = &GpuTopologyAware{}
var _ framework.ScorePlugin = &GpuTopologyAware{}
func (g *GpuTopologyAware) Name() string {
	return Name
}
// New initializes a new plugin and returns it.
func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
	return &GpuTopologyAware{}, nil
}
// Main entry point to run the scheduler.
func main() {
	command := app.NewSchedulerCommand(
		app.WithPlugin(Name, New),
	)
	logs.InitLogs()
	defer logs.FlushLogs()
	if err := command.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
This boilerplate sets up the plugin name, defines the annotations we'll look for on Pods, and provides the main function to compile this plugin into a standalone scheduler binary.
Implementing the `Filter` Plugin
The Filter plugin is our non-negotiable gatekeeper. Its logic will be:
- Check if the Pod requests GPUs and has our custom annotations.
- If not, the pod is not our concern; let it pass.
- If it does, parse the requested minimum GPU count and NVLink level from annotations.
- Check the node's labels (from GFD) to see if it meets these requirements.
- Return Success if it meets the criteria, Unschedulable otherwise.
Let's add the Filter method to our GpuTopologyAware struct.
filter.go (add this method to the struct in main.go or a new file)
// Filter checks if a node has enough GPUs with the required topology.
func (g *GpuTopologyAware) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	// Get the requested GPU count from the pod spec. We only care about pods that request GPUs.
	podGpuRequest := getGpuRequestCount(pod)
	if podGpuRequest == 0 {
		return framework.NewStatus(framework.Success)
	}
	// Get custom requirements from annotations.
	minGpus, _ := getIntFromAnnotation(pod, MinGpusAnnotation)
	minNvlinkLvl, _ := getIntFromAnnotation(pod, MinNvlinkLvlAnnotation)
	// If no custom annotations, this plugin doesn't apply.
	if minGpus == 0 && minNvlinkLvl == 0 {
		return framework.NewStatus(framework.Success)
	}
	// Now, check the node's capabilities from its labels.
	node := nodeInfo.Node()
	if node == nil {
		return framework.NewStatus(framework.Error, "node not found")
	}
	// 1. Check total GPU count on the node against the larger of the
	// resource request and the min-gpus annotation.
	requiredGpus := podGpuRequest
	if int64(minGpus) > requiredGpus {
		requiredGpus = int64(minGpus)
	}
	nodeGpuCount, err := getIntFromLabel(node, GpuCountLabel)
	if err != nil {
		return framework.NewStatus(framework.Unschedulable, "node does not have GPU count label")
	}
	if int64(nodeGpuCount) < requiredGpus {
		return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node has insufficient total GPUs: required %d, node has %d", requiredGpus, nodeGpuCount))
	}
	// 2. Check NVLink level.
	if minNvlinkLvl > 0 {
		nodeNvlinkLvl, err := getNvlinkLevelFromLabel(node.Labels)
		if err != nil {
			return framework.NewStatus(framework.Unschedulable, "node does not have NVLink level label")
		}
		if nodeNvlinkLvl < minNvlinkLvl {
			return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node NVLink level %d is less than required %d", nodeNvlinkLvl, minNvlinkLvl))
		}
	}
	// A more advanced filter would check *available* GPUs, not just total count.
	// This requires tracking allocated resources, which is more complex.
	// For this example, we assume node-level properties are sufficient for filtering.
	return framework.NewStatus(framework.Success)
}
// Helper functions to parse pod and node data.
func getGpuRequestCount(pod *v1.Pod) int64 {
	var count int64
	for _, container := range pod.Spec.Containers {
		if val, ok := container.Resources.Limits["nvidia.com/gpu"]; ok {
			count += val.Value()
		}
	}
	return count
}
func getIntFromAnnotation(pod *v1.Pod, key string) (int, error) {
	if val, ok := pod.Annotations[key]; ok {
		i, err := strconv.Atoi(val)
		if err == nil {
			return i, nil
		}
	}
	return 0, fmt.Errorf("annotation not found or invalid")
}
func getIntFromLabel(node *v1.Node, key string) (int, error) {
	if val, ok := node.Labels[key]; ok {
		i, err := strconv.Atoi(val)
		if err == nil {
			return i, nil
		}
	}
	return 0, fmt.Errorf("label not found or invalid")
}
func getNvlinkLevelFromLabel(labels map[string]string) (int, error) {
	if val, ok := labels[NvlinkLvlLabel]; ok {
		// GFD label is like "NV8". We need the integer part.
		if len(val) > 2 && val[:2] == "NV" {
			return strconv.Atoi(val[2:])
		}
	}
	return 0, fmt.Errorf("nvlink label not found or in wrong format")
}
Implementing the `Score` Plugin
After filtering, we have a list of valid nodes. The Score plugin ranks them; a higher score means a better fit. Our scoring logic will be a weighted sum of two factors:
*   NVLink level: a node with NV8 is superior to one with NV4.
*   GPU availability: among otherwise suitable nodes, prefer the one with more unallocated GPUs, so large jobs are less likely to be blocked by fragmentation.
We need to add the Score and PreScore methods. PreScore is for pre-computation, though we won't need it here.
score.go (add these methods to the struct)
// PreScore is a no-op for this plugin.
func (g *GpuTopologyAware) PreScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
	return framework.NewStatus(framework.Success)
}
// Score ranks nodes that passed the filter stage.
func (g *GpuTopologyAware) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	// In a real implementation, node data comes from the framework.Handle passed to New():
	// the handle exposes a cached snapshot of cluster state (see the production note below).
	// To keep this example focused on the scoring logic itself, we use dummy lookups instead.
	nodeLabels := getNodeLabelsFromName(nodeName) // DUMMY FUNCTION
	// 1. Score based on NVLink level.
	nvlinkScore := 0
	nodeNvlinkLvl, err := getNvlinkLevelFromLabel(nodeLabels)
	if err == nil {
		// Let's say each level is worth 10 points. NV8 = 80 points.
		nvlinkScore = nodeNvlinkLvl * 10
	}
	// 2. Score based on available GPUs: more free GPUs = higher score.
	// A real scheduler derives this from the pods already assigned to the node;
	// here we use a simplified dummy lookup.
	allocatedGpus := getAllocatedGpuCount(nodeName) // DUMMY FUNCTION
	totalGpus, _ := getIntFromLabelFromMap(nodeLabels, GpuCountLabel)
	availableGpus := totalGpus - allocatedGpus
	// Normalize to a 0-20 range: (availableGpus / totalGpus) * 20.
	fragmentationScore := 0
	if totalGpus > 0 {
		fragmentationScore = (availableGpus * 20) / totalGpus
	}
	finalScore := int64(nvlinkScore + fragmentationScore)
	// Final score must be between framework.MinNodeScore and framework.MaxNodeScore (0-100)
	if finalScore > framework.MaxNodeScore {
		finalScore = framework.MaxNodeScore
	}
	return finalScore, framework.NewStatus(framework.Success)
}
// ScoreExtensions returns a ScoreExtensions interface if the plugin implements one.
func (g *GpuTopologyAware) ScoreExtensions() framework.ScoreExtensions {
	return nil // We don't need normalization in this simple case.
}
// Dummy functions for demonstration. In a real plugin, this data comes from the framework.Handle and NodeInfo snapshot.
func getNodeLabelsFromName(nodeName string) map[string]string {
	// In a real scenario, you'd query the cluster state.
	if nodeName == "gpu-node-a100-nv8" {
		return map[string]string{
			"nvidia.com/gpu.count": "8",
			"nvidia.com/nvlink.p2p.level": "NV8",
		}
	}
	if nodeName == "gpu-node-v100-nv4" {
		return map[string]string{
			"nvidia.com/gpu.count": "4",
			"nvidia.com/nvlink.p2p.level": "NV4",
		}
	}
	return nil
}
func getAllocatedGpuCount(nodeName string) int {
	// In a real scheduler, this would inspect the pods assigned to the node.
	if nodeName == "gpu-node-a100-nv8" {
		return 2 // Simulate 2 GPUs already in use
	}
	return 1
}
func getIntFromLabelFromMap(labels map[string]string, key string) (int, error) {
	if val, ok := labels[key]; ok {
		return strconv.Atoi(val)
	}
	return 0, fmt.Errorf("label not found")
}
With these weights, an idle NV8 node scores 8*10 + 20 = 100, while an NV4 node with two of its four GPUs already allocated scores 4*10 + 10 = 50.
Note on Production Implementation: The dummy functions above are crucial to understand. A production-grade plugin would receive a framework.Handle in its New function. This handle provides access to a cached snapshot of the cluster state (handle.SnapshotSharedLister()), which is far more efficient than querying the API server for every scheduling decision.
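To make that concrete, here is a minimal sketch of the handle-based approach. It is illustrative only: it assumes the GpuTopologyAware struct is extended with a handle field, reuses the label helpers defined earlier, and derives the allocated GPU count from the snapshot's aggregated resource requests instead of the dummy lookups.
// Sketch: store the framework.Handle in New, then read node data from the cached snapshot in Score.
type GpuTopologyAware struct {
	handle framework.Handle
}
func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
	return &GpuTopologyAware{handle: h}, nil
}
func (g *GpuTopologyAware) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := g.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		return 0, framework.AsStatus(fmt.Errorf("getting node %q from snapshot: %w", nodeName, err))
	}
	node := nodeInfo.Node()
	// NVLink score from the GFD label, exactly as before.
	nvlinkScore := 0
	if lvl, err := getNvlinkLevelFromLabel(node.Labels); err == nil {
		nvlinkScore = lvl * 10
	}
	// Allocated GPUs come from the snapshot's aggregated requests on this node.
	allocated := nodeInfo.Requested.ScalarResources["nvidia.com/gpu"]
	totalGpus, _ := getIntFromLabel(node, GpuCountLabel)
	fragmentationScore := 0
	if totalGpus > 0 {
		fragmentationScore = int((int64(totalGpus) - allocated) * 20 / int64(totalGpus))
	}
	finalScore := int64(nvlinkScore + fragmentationScore)
	if finalScore > framework.MaxNodeScore {
		finalScore = framework.MaxNodeScore
	}
	return finalScore, framework.NewStatus(framework.Success)
}
Because the snapshot is taken once per scheduling cycle, these lookups stay in memory and add negligible latency compared to querying the API server.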
Section 2: Building and Deploying the Scheduler
Now that we have the Go code, we need to package it as a container and deploy it to Kubernetes.
Dockerfile
We'll use a multi-stage Docker build for an optimized, small final image.
# Build stage
FROM golang:1.19-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
# Build the scheduler binary
RUN CGO_ENABLED=0 GOOS=linux go build -o /gpu-topology-scheduler
# Final stage
FROM alpine:latest
WORKDIR /root/
COPY --from=builder /gpu-topology-scheduler .
ENTRYPOINT ["/root/gpu-topology-scheduler"]Build and push the image to your container registry:
docker build -t your-registry/gpu-topology-scheduler:v1.0.0 .
docker push your-registry/gpu-topology-scheduler:v1.0.0
Kubernetes Deployment Manifests
Deploying a custom scheduler involves three key Kubernetes objects:
1. A ConfigMap containing the scheduler's configuration.
2. A ClusterRole and ClusterRoleBinding to give it the necessary permissions.
3. A Deployment to run the scheduler pod itself.
scheduler-config.yaml
This file tells the scheduler binary to enable our custom plugin.
apiVersion: v1
kind: ConfigMap
metadata:
  name: gpu-topology-scheduler-config
  namespace: kube-system
data:
  scheduler-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1
    kind: KubeSchedulerConfiguration
    leaderElection:
      leaderElect: true
      resourceName: gpu-topology-scheduler
      resourceNamespace: kube-system
    profiles:
      - schedulerName: gpu-topology-scheduler
        plugins:
          filter:
            enabled:
              - name: GpuTopologyAware
          score:
            enabled:
              - name: GpuTopologyAware
        pluginConfig:
          - name: GpuTopologyAware
            args: {}
rbac.yaml
Our scheduler needs permissions to read Pods and Nodes, to create Bindings (to assign Pods to nodes), to update Pod status, to emit Events, and to manage leader-election leases. Because the binary also runs the default plugins, in practice it needs roughly the same access as the built-in kube-scheduler (the system:kube-scheduler ClusterRole is a useful reference); the rules below are a minimal starting point.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: gpu-topology-scheduler-role
rules:
- apiGroups: [""]
  resources: ["nodes"]
  verbs: ["get", "list", "watch"]
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list", "watch", "update"]
- apiGroups: [""]
  resources: ["pods/binding"]
  verbs: ["create"]
- apiGroups: [""]
  resources: ["pods/status"]
  verbs: ["update", "patch"]
- apiGroups: ["", "events.k8s.io"]
  resources: ["events"]
  verbs: ["create", "patch", "update"]
- apiGroups: ["coordination.k8s.io"]
  resources: ["leases"]
  verbs: ["create", "get", "update"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: gpu-topology-scheduler-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: gpu-topology-scheduler-role
subjects:
- kind: ServiceAccount
  name: gpu-topology-scheduler-sa
  namespace: kube-system
deployment.yaml
Finally, the Deployment to run our scheduler.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: gpu-topology-scheduler-sa
  namespace: kube-system
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gpu-topology-scheduler
  namespace: kube-system
  labels:
    app: gpu-topology-scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gpu-topology-scheduler
  template:
    metadata:
      labels:
        app: gpu-topology-scheduler
    spec:
      serviceAccountName: gpu-topology-scheduler-sa
      containers:
      - name: scheduler
        image: your-registry/gpu-topology-scheduler:v1.0.0
        args:
        - --config=/etc/kubernetes/scheduler-config.yaml
        - -v=4 # Verbose logging
        resources:
          requests:
            cpu: "100m"
            memory: "100Mi"
        volumeMounts:
        - name: config-volume
          mountPath: /etc/kubernetes
      volumes:
      - name: config-volume
        configMap:
          name: gpu-topology-scheduler-config
Apply these manifests to your cluster:
kubectl apply -f scheduler-config.yaml
kubectl apply -f rbac.yaml
kubectl apply -f deployment.yaml
You now have a second scheduler running in your cluster, waiting for Pods that specify schedulerName: gpu-topology-scheduler.
Section 3: Using the Custom Scheduler and Advanced Considerations
To use our new scheduler, a developer simply needs to add two things to their Pod spec: the schedulerName and our custom annotations.
Sample Pod Spec
Here is a Pod requesting 4 GPUs on a node that has at least an NV8 level of NVLink interconnectivity.
apiVersion: v1
kind: Pod
metadata:
  name: distributed-training-job-pod-1
  annotations:
    gpu.scheduler.example.com/min-nvlink-level: "8"
spec:
  schedulerName: gpu-topology-scheduler # CRITICAL: This directs the pod to our scheduler
  containers:
  - name: training-container
    image: nvidia/cuda:11.8.0-base-ubuntu22.04
    command: ["/bin/sh", "-c", "sleep 3600"]
    resources:
      limits:
        nvidia.com/gpu: 4
When this Pod is created, the default scheduler will ignore it because of the schedulerName field. Instead, it will be picked up by our gpu-topology-scheduler, which will then execute our custom Filter and Score logic to find the optimal node.
Edge Case: Gang Scheduling and Atomicity
Our current implementation has a significant limitation for distributed workloads: it schedules one pod at a time. A real distributed training job might consist of 4 pods that must be scheduled simultaneously (all-or-nothing). If only 3 can be scheduled, the entire job is useless and wastes resources. This is known as gang scheduling.
Solving this requires more advanced plugins:
*   Permit Plugin: This plugin can delay a Pod's binding. The scheduler would put the first pod of a job into a "waiting" state at the Permit phase. It continues scheduling other pods from the same job. Only when all pods in the job have found a valid node does the Permit plugin allow them all to be bound. If one fails, the Permit plugin can reject all of them, forcing a reschedule.
*   Custom QueueSort Plugin: You can implement a QueueSort plugin to ensure that pods from the same job are evaluated together, preventing other pods from taking the resources they need.
Projects like Kube-batch/Volcano or the Coscheduling plugin in the official scheduler-plugins repository provide robust, production-ready implementations of gang scheduling.
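To make the Permit mechanism concrete, here is a heavily simplified, illustrative sketch; it is not how Volcano or the Coscheduling plugin implement it. It assumes the handle-carrying struct from the earlier sketch, adds "time" to the imports, and invents a hypothetical gang-name label and gang-size annotation purely for demonstration.
// Hypothetical keys, used only for this sketch.
const (
	GangNameLabel      = "gpu.scheduler.example.com/gang-name"
	GangSizeAnnotation = "gpu.scheduler.example.com/gang-size"
)
// Permit holds each pod of a gang at the Permit stage until every member has found a node.
func (g *GpuTopologyAware) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
	gang := pod.Labels[GangNameLabel]
	size, err := strconv.Atoi(pod.Annotations[GangSizeAnnotation])
	if gang == "" || err != nil || size <= 1 {
		return framework.NewStatus(framework.Success), 0 // not a gang-scheduled pod
	}
	// Count gang members already parked at Permit, plus this pod.
	waiting := 1
	g.handle.IterateOverWaitingPods(func(wp framework.WaitingPod) {
		if wp.GetPod().Labels[GangNameLabel] == gang {
			waiting++
		}
	})
	if waiting < size {
		// Not every member has a node yet: park this pod with a timeout.
		return framework.NewStatus(framework.Wait), 30 * time.Second
	}
	// The whole gang has been assigned nodes: release the waiting members.
	g.handle.IterateOverWaitingPods(func(wp framework.WaitingPod) {
		if wp.GetPod().Labels[GangNameLabel] == gang {
			wp.Allow(Name)
		}
	})
	return framework.NewStatus(framework.Success), 0
}
A real implementation also has to be enabled under the permit extension point in the KubeSchedulerConfiguration and must handle timeouts, rejection, and partial failures carefully, which is why the projects above are the better starting point.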
Performance Considerations and Observability
Adding custom logic to the critical scheduling path introduces latency. Every millisecond spent in your Filter or Score function adds to the Pod's scheduling time.
*   Benchmarking: The scheduler framework includes metrics that can be scraped by Prometheus. Monitor the scheduler_plugin_execution_duration_seconds metric to understand the performance impact of your plugin.
*   Efficient Code: Avoid making external API calls within your plugins. Rely on the cached data provided by the framework.Handle. Your plugin logic should be computationally inexpensive.
* Logging: Use structured logging within your plugin to debug scheduling decisions. Log why a node was filtered or what score it received. This is invaluable for troubleshooting.
For example, in our Filter plugin (using the k8s.io/klog/v2 package, imported as klog):
// Inside Filter function
// ...
if nodeNvlinkLvl < minNvlinkLvl {
    klog.V(4).Infof("Node %s rejected for Pod %s: required NVLink %d, got %d", node.Name, pod.Name, minNvlinkLvl, nodeNvlinkLvl)
    return framework.NewStatus(framework.Unschedulable, ...)
}
Conclusion
The default Kubernetes scheduler provides a solid foundation, but for specialized, performance-sensitive hardware like GPUs, it is insufficient. By leveraging the Scheduler Framework, we can encode complex, domain-specific requirements directly into the cluster's decision-making process. We've demonstrated how to build a topology-aware plugin that understands GPU interconnects, moving beyond simple resource counting to intelligent resource placement.
This approach transforms Kubernetes from a general-purpose orchestrator into a high-performance computing platform, capable of maximizing the efficiency and performance of your most critical AI/ML workloads. While we've covered the core implementation, the path to a production system involves integrating more advanced concepts like gang scheduling, robust error handling, and comprehensive observability—all of which are enabled by the powerful extension points of the scheduler framework.