Custom Kubernetes Schedulers for GPU Topology-Aware ML Workloads


The Performance Cliff: Why the Default Scheduler Fails Distributed ML

The kube-scheduler is a marvel of general-purpose orchestration, but its abstractions break down under the demanding, hardware-specific requirements of large-scale distributed machine learning. For a senior engineer tasked with running multi-GPU training jobs for frameworks like PyTorch DistributedDataParallel (DDP) or Horovod, the default scheduler's core limitation becomes a critical performance bottleneck.

The scheduler sees a GPU node's resources like this:

yaml
allocatable:
  cpu: "96"
  memory: "512Gi"
  nvidia.com/gpu: "8"

To the scheduler, nvidia.com/gpu: "8" is just a number. It understands that it can't place a pod requesting 4 GPUs on a node with only 3 available. However, it has zero awareness that not all GPUs are created equal in their connectivity. On a modern DGX A100 server, those 8 GPUs are interconnected by high-speed NVLink and NVSwitch fabric, offering bandwidth in the range of 600 GB/s. Communication over the node's PCIe bus is an order of magnitude slower, and communication over the data center's Ethernet network is slower still.

Consider a pod requesting 4 GPUs for a DDP training job. The default scheduler might place it on a node with 8 GPUs, but the kubelet's device plugin could then hand it GPUs 0, 2, 5, and 7, which on many systems do not share a direct NVLink path. Worse, in a multi-job scenario, two pods from the same 8-GPU training job could be scheduled onto two different 4-GPU nodes. The inter-pod communication for gradient synchronization would then be forced over the 100GbE network fabric, completely negating the benefits of the on-node NVLink interconnect.

This topology-agnostic scheduling leads to a performance cliff. Jobs that should leverage 600 GB/s interconnects are throttled by 12.5 GB/s network speeds, extending training times, increasing costs, and creating unpredictable performance profiles. The solution is not to abandon Kubernetes, but to teach it about our hardware topology by extending its brain: the scheduler itself.

The Kubernetes Scheduling Framework: Our Toolkit for Extension

Instead of forking and maintaining the entire kube-scheduler, we can leverage the Scheduling Framework. It exposes a set of extension points (Go plugin interfaces) that allow us to inject custom logic into the scheduling cycle. For our topology-aware scheduler, we'll focus on a few key extension points:

* Filter: This is a predicate function. It answers the question: "Can this pod run on this node?" We will implement a Filter plugin that rejects nodes which cannot satisfy the pod's GPU request within a single high-bandwidth NVLink clique.

* Score: After filtering, one or more nodes may be deemed viable. The Score extension ranks these nodes, answering: "Which of these valid nodes is the best fit?" We can implement scoring logic to prefer nodes that, for example, minimize GPU fragmentation.

* Reserve: This is a critical extension for managing state and avoiding race conditions. Once a node is selected for a pod, Reserve is called. This is our chance to mark the specific GPUs we've chosen as "allocated" in our plugin's state, so that another pod being scheduled concurrently doesn't try to claim the same specific GPUs.

By implementing these interfaces, we can build a lightweight plugin that runs as part of a scheduler process, precisely controlling GPU placement without reinventing the wheel.
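
For reference, the relevant interfaces in k8s.io/kubernetes/pkg/scheduler/framework have roughly the following shape (abridged, and accurate for recent releases; check the version you vendor, since signatures have shifted over time):

go
// Abridged from k8s.io/kubernetes/pkg/scheduler/framework.
type FilterPlugin interface {
	Plugin
	Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
}

type ScorePlugin interface {
	Plugin
	Score(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (int64, *Status)
	ScoreExtensions() ScoreExtensions
}

type ReservePlugin interface {
	Plugin
	Reserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
	Unreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
}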

Designing Our GPU Topology-Aware Scheduler

Our solution consists of two main components:

  • Topology Discovery DaemonSet: A background process that runs on every GPU-enabled node. Its job is to inspect the local hardware, determine the GPU connectivity matrix, and publish this information where the scheduler can see it: as a Node annotation.
  • Custom Scheduler Plugin: The Go code that implements the Filter and Score logic, reading the topology from the Node annotations to make intelligent placement decisions.

    Step 1: GPU Topology Discovery and Annotation

    The scheduler itself doesn't run on the worker nodes, so it cannot directly inspect hardware. We need an agent to do this for us.

    A DaemonSet is the perfect tool. We'll create a simple container that includes the NVIDIA Management Library (NVML) or can shell out to nvidia-smi. On startup, and perhaps periodically, this agent will execute a command like nvidia-smi topo -m to get the GPU connectivity matrix.

    The output looks something like this:

    text
            GPU0    GPU1    GPU2    GPU3    GPU4    GPU5    GPU6    GPU7    mlx5_0  mlx5_1
    GPU0     X      NV12    NV12    NV12    NV12    NV12    NV12    NV12    PHB     PHB
    GPU1    NV12     X      NV12    NV12    NV12    NV12    NV12    NV12    PHB     PHB
    ...

    Our agent will parse this matrix. We can represent any NV# entry (NV12 means the pair is connected by a bonded set of 12 NVLinks) as a 1 and other connection types (like PHB, a PCIe Host Bridge hop) as a 0. The agent will then serialize this information and patch its own Node object with an annotation.

    Example Node Annotation:

    yaml
    apiVersion: v1
    kind: Node
    metadata:
      name: dgx-node-1
      annotations:
        gpu-topology.acme.com/nvlink-matrix: "[[0,1,1,1,1,1,1,1],[1,0,1,1,1,1,1,1],...]]"
        gpu-topology.acme.com/gpu-states: "[\"free\",\"free\",\"free\",\"free\",\"free\",\"free\",\"free\",\"free\"]"

    We add two annotations: one for the static NVLink connectivity matrix and another to track the allocation state of individual GPUs, which our scheduler will manage. Using annotations is simpler than a CRD for this use case as the data is tightly coupled to a specific Node.
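
    To make this concrete, here is a minimal sketch of what the discovery agent could look like. It assumes client-go with an in-cluster config, a NODE_NAME environment variable injected via the Downward API, and shelling out to nvidia-smi rather than linking against NVML; the annotation keys match the example above.

    go
    package main

    import (
    	"context"
    	"encoding/json"
    	"os"
    	"os/exec"
    	"strings"

    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/apimachinery/pkg/types"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/rest"
    )

    func main() {
    	nodeName := os.Getenv("NODE_NAME") // injected via the Downward API

    	out, err := exec.Command("nvidia-smi", "topo", "-m").Output()
    	if err != nil {
    		panic(err)
    	}
    	matrix := parseTopologyMatrix(string(out))

    	// All GPUs start as "free"; the scheduler owns this state afterwards.
    	states := make([]string, len(matrix))
    	for i := range states {
    		states[i] = "free"
    	}

    	matrixJSON, _ := json.Marshal(matrix)
    	statesJSON, _ := json.Marshal(states)
    	patch, _ := json.Marshal(map[string]interface{}{
    		"metadata": map[string]interface{}{
    			"annotations": map[string]string{
    				"gpu-topology.acme.com/nvlink-matrix": string(matrixJSON),
    				"gpu-topology.acme.com/gpu-states":    string(statesJSON),
    			},
    		},
    	})

    	cfg, err := rest.InClusterConfig()
    	if err != nil {
    		panic(err)
    	}
    	client := kubernetes.NewForConfigOrDie(cfg)
    	if _, err := client.CoreV1().Nodes().Patch(
    		context.Background(), nodeName, types.StrategicMergePatchType, patch, metav1.PatchOptions{}); err != nil {
    		panic(err)
    	}
    }

    // parseTopologyMatrix converts `nvidia-smi topo -m` output into a 0/1 adjacency
    // matrix: NV* entries count as connected; PHB, PXB, PIX, NODE, and SYS do not.
    // It assumes the GPU columns come before the NIC columns, which matches current output.
    func parseTopologyMatrix(output string) [][]int {
    	var matrix [][]int
    	for _, line := range strings.Split(output, "\n") {
    		if !strings.HasPrefix(line, "GPU") { // skip the header row and the legend
    			continue
    		}
    		var row []int
    		for _, f := range strings.Fields(line)[1:] {
    			switch {
    			case strings.HasPrefix(f, "NV"):
    				row = append(row, 1)
    			case f == "X" || f == "PHB" || f == "PXB" || f == "PIX" || f == "NODE" || f == "SYS":
    				row = append(row, 0)
    			}
    		}
    		matrix = append(matrix, row)
    	}
    	// Trim the NIC columns so the matrix stays square (GPU x GPU).
    	for i := range matrix {
    		matrix[i] = matrix[i][:len(matrix)]
    	}
    	return matrix
    }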

    Step 2: The Scheduler Plugin Architecture

    We will deploy our plugin as part of a secondary scheduler. This is a crucial production pattern. The default kube-scheduler continues to handle all standard workloads, while our specialized scheduler, let's call it topology-aware-scheduler, handles only the pods that explicitly request it via the schedulerName field in their spec. This minimizes risk and isolates our custom logic.

    Implementing the Scheduler Plugin in Go

    Now, let's dive into the core implementation. We'll be using the k8s.io/kubernetes/pkg/scheduler/framework package.

    Project Structure:

    text
    /cmd
      /scheduler
        main.go
    /pkg
      /scheduler
        scheduler.go
        state.go
        filter.go
        score.go

    `main.go`: Registering the Plugin

    This is the entrypoint that builds the scheduler command and registers our custom plugin.

    go
    package main
    
    import (
    	"os"
    
    	"k8s.io/component-base/cli"
    	"k8s.io/kubernetes/cmd/kube-scheduler/app"
    
    	"github.com/your-org/topology-scheduler/pkg/scheduler"
    )
    
    func main() {
    	// Register our custom plugin with the scheduler framework's registry.
    	command := app.NewSchedulerCommand(
    		app.WithPlugin(scheduler.Name, scheduler.New),
    	)
    
    	if err := cli.RunNoErrOutput(command); err != nil {
    		os.Exit(1)
    	}
    }

    `scheduler.go`: Plugin Boilerplate

    This file defines our plugin's name and the constructor that satisfies the framework's PluginFactory interface.

    go
    package scheduler
    
    import (
    	"k8s.io/apimachinery/pkg/runtime"
    	"k8s.io/kubernetes/pkg/scheduler/framework"
    )
    
    const (
    	Name = "GPUTopologyAwareScheduler"
    	AnnotationGPUStates = "gpu-topology.acme.com/gpu-states"
    	AnnotationNVLinkMatrix = "gpu-topology.acme.com/nvlink-matrix"
    )
    
    // GPUTopologyPlugin is our custom plugin.
    type GPUTopologyPlugin struct {
    	handle framework.Handle
    	// We need a shared state to track GPU reservations across extension points.
    	state  *SharedState
    }
    
    // Ensure our plugin implements all the necessary interfaces.
    var _ framework.FilterPlugin = &GPUTopologyPlugin{}
    var _ framework.ScorePlugin = &GPUTopologyPlugin{}
    var _ framework.ReservePlugin = &GPUTopologyPlugin{}
    
    // New creates a new instance of our plugin.
    func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
    	return &GPUTopologyPlugin{
    		handle: h,
    		state:  NewSharedState(),
    	}, nil
    }
    
    func (p *GPUTopologyPlugin) Name() string {
    	return Name
    }
    
    // The Filter, Score, and Reserve implementations live in filter.go, score.go, and state.go.

    `filter.go`: The Core Placement Logic

    This is where the magic happens. The Filter method will reject any node that cannot provide the requested number of GPUs within a fully-connected NVLink clique.

    go
    package scheduler
    
    import (
    	"context"
    	"encoding/json"
    	"fmt"
    
    	corev1 "k8s.io/api/core/v1"
    	"k8s.io/klog/v2"
    	"k8s.io/kubernetes/pkg/scheduler/framework"
    )
    
    // Filter is the core logic of our scheduler.
    func (p *GPUTopologyPlugin) Filter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    	requestedGPUs, ok := getRequestedGPUs(pod)
    	if !ok || requestedGPUs == 0 {
    		// This pod doesn't request GPUs, so we don't interfere.
    		return framework.NewStatus(framework.Success)
    	}
    
    	node := nodeInfo.Node()
    	if node == nil {
    		return framework.NewStatus(framework.Error, "node not found")
    	}
    
    	// 1. Parse topology and state from annotations.
    	matrix, states, err := parseGPUAnnotations(node.Annotations)
    	if err != nil {
    		klog.Errorf("Error parsing annotations for node %s: %v", node.Name, err)
    		return framework.NewStatus(framework.Error, err.Error())
    	}
    
    	// 2. Find a valid placement.
    	// This is a non-trivial algorithm. We need to find a subgraph (clique)
    	// of `requestedGPUs` size where all nodes (GPUs) are 'free' and
    	// interconnected.
    	placement, err := findNVLinkClique(requestedGPUs, matrix, states)
    	if err != nil {
    		// No suitable clique found on this node.
    		return framework.NewStatus(framework.Unschedulable, err.Error())
    	}
    
    	// 3. Store the potential placement in the cycle state.
    	// This allows other extension points (like Reserve) to access it without recalculating.
    	writeCycleState(cycleState, node.Name, placement)
    	klog.Infof("Found valid GPU placement for pod %s on node %s: GPUs %v", pod.Name, node.Name, placement)
    	
    	return framework.NewStatus(framework.Success)
    }
    
    // findNVLinkClique implements the combinatorial logic to find a set of interconnected, free GPUs.
    // This is a simplified example; a production implementation might use more advanced graph algorithms.
    func findNVLinkClique(required int, matrix [][]int, states []string) ([]int, error) {
    	numGPUs := len(states)
    	if numGPUs == 0 {
    		return nil, fmt.Errorf("no GPUs reported on node")
    	}
    
    	freeGPUs := []int{}
    	for i, state := range states {
    		if state == "free" {
    			freeGPUs = append(freeGPUs, i)
    		}
    	}
    
    	if len(freeGPUs) < required {
    		return nil, fmt.Errorf("not enough free GPUs available")
    	}
    
    	// Iterate through all combinations of `required` size from the `freeGPUs` list.
    	combinations := generateCombinations(freeGPUs, required)
    
    	for _, combo := range combinations {
    		if isClique(combo, matrix) {
    			return combo, nil // Found a valid, fully NVLink-connected clique
    		}
    	}
    
    	return nil, fmt.Errorf("no NVLink-connected clique of size %d found", required)
    }
    
    // isClique checks if all GPUs in a given combination are interconnected.
    func isClique(combo []int, matrix [][]int) bool {
    	if len(combo) < 2 {
    		return true
    	}
    	for i := 0; i < len(combo); i++ {
    		for j := i + 1; j < len(combo); j++ {
    			gpu1 := combo[i]
    			gpu2 := combo[j]
    			if matrix[gpu1][gpu2] == 0 { // Assuming 1 means NVLink connected
    				return false
    			}
    		}
    	}
    	return true
    }
    
    // (Utility functions like getRequestedGPUs, parseGPUAnnotations, generateCombinations, writeCycleState not shown for brevity)
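
    The helper functions referenced above are omitted from the listing, so here is one possible shape for them, suitable for filter.go. The cycle-state key prefix and the placementState type are choices made for this sketch, not something the framework prescribes.

    go
    // getRequestedGPUs sums the nvidia.com/gpu limits across the pod's containers.
    func getRequestedGPUs(pod *corev1.Pod) (int, bool) {
    	total := 0
    	for _, c := range pod.Spec.Containers {
    		if q, ok := c.Resources.Limits["nvidia.com/gpu"]; ok {
    			total += int(q.Value())
    		}
    	}
    	return total, total > 0
    }

    // parseGPUAnnotations decodes the NVLink matrix and per-GPU state annotations.
    func parseGPUAnnotations(annotations map[string]string) ([][]int, []string, error) {
    	var matrix [][]int
    	var states []string
    	if err := json.Unmarshal([]byte(annotations[AnnotationNVLinkMatrix]), &matrix); err != nil {
    		return nil, nil, fmt.Errorf("parsing NVLink matrix annotation: %w", err)
    	}
    	if err := json.Unmarshal([]byte(annotations[AnnotationGPUStates]), &states); err != nil {
    		return nil, nil, fmt.Errorf("parsing GPU states annotation: %w", err)
    	}
    	return matrix, states, nil
    }

    // generateCombinations returns every k-sized subset of items.
    func generateCombinations(items []int, k int) [][]int {
    	var result [][]int
    	var recurse func(start int, current []int)
    	recurse = func(start int, current []int) {
    		if len(current) == k {
    			result = append(result, append([]int(nil), current...))
    			return
    		}
    		for i := start; i < len(items); i++ {
    			recurse(i+1, append(current, items[i]))
    		}
    	}
    	recurse(0, nil)
    	return result
    }

    // placementState carries the chosen GPU indices through the scheduling cycle.
    type placementState struct{ gpus []int }

    func (s *placementState) Clone() framework.StateData {
    	return &placementState{gpus: append([]int(nil), s.gpus...)}
    }

    func stateKey(nodeName string) framework.StateKey {
    	return framework.StateKey("gpu-placement/" + nodeName)
    }

    func writeCycleState(cs *framework.CycleState, nodeName string, gpus []int) {
    	cs.Write(stateKey(nodeName), &placementState{gpus: gpus})
    }

    func readCycleState(cs *framework.CycleState, nodeName string) ([]int, error) {
    	data, err := cs.Read(stateKey(nodeName))
    	if err != nil {
    		return nil, err
    	}
    	ps, ok := data.(*placementState)
    	if !ok {
    		return nil, fmt.Errorf("unexpected cycle state type for node %s", nodeName)
    	}
    	return ps.gpus, nil
    }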

    Edge Case Handling in Filter: The combinatorial complexity of findNVLinkClique can be significant. For a node with 8 GPUs and a request for 4, we check C(8,4) = 70 combinations; on a 16-GPU node, an 8-GPU request means C(16,8) = 12,870. A production implementation should include optimizations such as caching clique results or more efficient graph-search techniques.
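
    One illustrative optimization, purely as an assumption on top of the design above: precompute a neighbor bitmask per GPU so that checking a candidate set becomes a few bitwise operations instead of nested matrix lookups.

    go
    // buildNeighborMasks folds each GPU's NVLink row into a bitmask (bit j set
    // means GPU i has a direct NVLink to GPU j).
    func buildNeighborMasks(matrix [][]int) []uint64 {
    	masks := make([]uint64, len(matrix))
    	for i, row := range matrix {
    		for j, connected := range row {
    			if connected == 1 {
    				masks[i] |= 1 << uint(j)
    			}
    		}
    	}
    	return masks
    }

    // isCliqueMask reports whether every GPU in combo is linked to all the others.
    func isCliqueMask(combo []int, masks []uint64) bool {
    	var set uint64
    	for _, g := range combo {
    		set |= 1 << uint(g)
    	}
    	for _, g := range combo {
    		others := set &^ (1 << uint(g))
    		if others&masks[g] != others {
    			return false
    		}
    	}
    	return true
    }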

    `score.go`: Ranking Viable Nodes

    If multiple nodes pass the Filter stage, we need to decide which is best. A good strategy is to minimize fragmentation. We prefer the node that, after placing the pod, will be left with the largest possible remaining NVLink clique.

    go
    package scheduler

    import (
    	"context"
    	"fmt"

    	corev1 "k8s.io/api/core/v1"
    	"k8s.io/kubernetes/pkg/scheduler/framework"
    )

    // Score extension point.
    func (p *GPUTopologyPlugin) Score(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) (int64, *framework.Status) {
    	// We will implement a "Least Fragmented" scoring strategy.
    	nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    	if err != nil {
    		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from snapshot: %s", nodeName, err))
    	}
    
    	matrix, states, err := parseGPUAnnotations(nodeInfo.Node().Annotations)
    	if err != nil {
    		return 0, framework.NewStatus(framework.Error, err.Error())
    	}
    
    	placement, err := readCycleState(state, nodeName)
    	if err != nil {
    		// This shouldn't happen if Filter ran correctly.
    		return 0, framework.NewStatus(framework.Error, err.Error())
    	}
    
    	// Create a hypothetical future state of the node after this pod is placed.
    	futureStates := make([]string, len(states))
    	copy(futureStates, states)
    	for _, gpuIndex := range placement {
    		futureStates[gpuIndex] = "allocated"
    	}
    
    	// Find the largest remaining NVLink clique in this future state.
    	largestRemainingClique := 0
    	for i := len(futureStates); i > 0; i-- {
    		clique, _ := findNVLinkClique(i, matrix, futureStates)
    		if clique != nil {
    			largestRemainingClique = i
    			break
    		}
    	}
    
    	// The score is the size of the largest remaining clique.
    	// The framework will prefer nodes with higher scores.
    	return int64(largestRemainingClique), framework.NewStatus(framework.Success)
    }
    
    // ScoreExtensions must be implemented by any ScorePlugin; returning the plugin
    // itself exposes NormalizeScore to the framework.
    func (p *GPUTopologyPlugin) ScoreExtensions() framework.ScoreExtensions {
    	return p
    }

    // NormalizeScore scales the raw clique sizes to the framework's [0, 100] range.
    func (p *GPUTopologyPlugin) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, scores framework.NodeScoreList) *framework.Status {
    	var maxScore int64
    	for _, s := range scores {
    		if s.Score > maxScore {
    			maxScore = s.Score
    		}
    	}
    	if maxScore == 0 {
    		return framework.NewStatus(framework.Success)
    	}
    	for i := range scores {
    		scores[i].Score = scores[i].Score * framework.MaxNodeScore / maxScore
    	}
    	return framework.NewStatus(framework.Success)
    }

    `state.go` and `Reserve`: Handling Concurrency

    This is the most critical part for production stability. The Filter and Score methods are read-only. But what happens when the scheduler decides to place Pod A on Node X using GPUs [0,1,2,3]? Before Pod A is bound, the scheduler might start scheduling Pod B, which also wants 4 GPUs. It will run our Filter logic again on Node X and see that GPUs [0,1,2,3] are still available according to the Node annotation, because Pod A hasn't been bound yet and the annotation hasn't been updated.

    The Reserve extension point solves this. It's called after a node is chosen but before the pod is bound.

    go
    package scheduler
    
    import (
    	"context"
    	"sync"
    	corev1 "k8s.io/api/core/v1"
    	"k8s.io/kubernetes/pkg/scheduler/framework"
    )
    
    // SharedState holds in-memory reservations.
    type SharedState struct {
    	mu sync.Mutex
    	// map[nodeName]map[podUID]reservedGPUs
    	reservations map[string]map[string][]int
    }
    
    func NewSharedState() *SharedState {
    	return &SharedState{
    		reservations: make(map[string]map[string][]int),
    	}
    }
    
    // Reserve is called after Score and before Bind.
    func (p *GPUTopologyPlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
    	placement, err := readCycleState(state, nodeName)
    	if err != nil {
    		return framework.NewStatus(framework.Error, err.Error())
    	}
    
    	p.state.mu.Lock()
    	defer p.state.mu.Unlock()
    
    	if _, ok := p.state.reservations[nodeName]; !ok {
    		p.state.reservations[nodeName] = make(map[string][]int)
    	}
    	p.state.reservations[nodeName][string(pod.UID)] = placement
    
    	// Here, we would also trigger an asynchronous update to the Node annotation.
    	// This ensures that if our scheduler restarts, it can recover the state.
    	// The update logic is complex: it must use optimistic locking (ResourceVersion)
    	// and handle conflicts gracefully.
    	go p.updateNodeAnnotation(nodeName, pod.UID, placement)
    
    	return framework.NewStatus(framework.Success)
    }
    
    // Unreserve is called if a later plugin fails or binding fails.
    func (p *GPUTopologyPlugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) {
    	p.state.mu.Lock()
    	defer p.state.mu.Unlock()
    
    	if nodeReservations, ok := p.state.reservations[nodeName]; ok {
    		delete(nodeReservations, string(pod.UID))
    	}
    
    	// Also trigger an annotation update to release the GPUs.
    	go p.releaseNodeAnnotation(nodeName, pod.UID)
    }

    The actual implementation of updateNodeAnnotation is non-trivial. It needs a client-go clientset to patch the node, and it must handle retries and conflicts since other controllers might be updating the node's annotations simultaneously.
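
    As a rough illustration, the write-back could look like the sketch below. It reuses the clientset exposed by the framework handle and leans on client-go's RetryOnConflict helper; the error handling and the "allocated"-by-index bookkeeping are simplifications for this example.

    go
    // Additional imports assumed for state.go in this sketch:
    import (
    	"encoding/json"

    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/apimachinery/pkg/types"
    	"k8s.io/client-go/util/retry"
    	"k8s.io/klog/v2"
    )

    // updateNodeAnnotation marks the reserved GPUs as allocated on the Node object.
    func (p *GPUTopologyPlugin) updateNodeAnnotation(nodeName string, podUID types.UID, placement []int) {
    	client := p.handle.ClientSet()
    	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
    		node, err := client.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
    		if err != nil {
    			return err
    		}
    		var states []string
    		if err := json.Unmarshal([]byte(node.Annotations[AnnotationGPUStates]), &states); err != nil {
    			return err
    		}
    		for _, idx := range placement {
    			states[idx] = "allocated"
    		}
    		updated, err := json.Marshal(states)
    		if err != nil {
    			return err
    		}
    		node.Annotations[AnnotationGPUStates] = string(updated)
    		// Update fails with a Conflict error if our ResourceVersion is stale;
    		// RetryOnConflict re-reads and retries in that case.
    		_, err = client.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{})
    		return err
    	})
    	if err != nil {
    		klog.Errorf("failed to persist GPU reservation for pod %s on node %s: %v", podUID, nodeName, err)
    	}
    }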

    Production Deployment and Configuration

  • Containerize: Build the Go binary and package it into a minimal container image.
  • Configuration (KubeSchedulerConfiguration): Define a config file that enables our plugin at the filter, score, and reserve extension points for the topology-aware-scheduler profile (in a real deployment you may also need to adjust the default resource-fit plugins so they don't conflict with our GPU decisions):

        yaml
        apiVersion: kubescheduler.config.k8s.io/v1
        kind: KubeSchedulerConfiguration
        leaderElection:
          leaderElect: true
        profiles:
          - schedulerName: topology-aware-scheduler
            plugins:
              filter:
                enabled:
                  - name: GPUTopologyAwareScheduler
              score:
                enabled:
                  - name: GPUTopologyAwareScheduler
              reserve:
                enabled:
                  - name: GPUTopologyAwareScheduler
            pluginConfig:
              - name: GPUTopologyAwareScheduler
                args: {}
  • Deployment: Deploy the scheduler as a Deployment in the kube-system namespace, mounting the configuration file from a ConfigMap. It needs a ClusterRole comparable to the default scheduler's: get/list/watch on Nodes and Pods, create on pods/binding, update/patch on Pod status and on Nodes (for our annotations), plus access to coordination.k8s.io Leases for leader election.
  • Usage: ML engineers can now opt in to this scheduler by setting schedulerName in their pod spec:

        yaml
        apiVersion: v1
        kind: Pod
        metadata:
          name: distributed-training-job-1
        spec:
          schedulerName: topology-aware-scheduler # Opt-in!
          containers:
          - name: trainer
            image: my-ml-image:latest
            resources:
              limits:
                nvidia.com/gpu: 4 # Request 4 GPUs

    Advanced Consideration: Gang Scheduling for Multi-Node Training

    Our scheduler now handles single-node, multi-GPU placement well. But what about a larger training job that needs, for example, 4 pods of 8 GPUs each?

    This requires gang scheduling: all 32 GPUs across 4 nodes must be secured before any of the pods start running. If we only secure 3 nodes and the 4th isn't available, the first 3 sit idle, wasting expensive resources.

    The Scheduling Framework provides the Permit extension point for this. The workflow is:

  • A pod arrives at the Permit phase.
  • Our plugin checks if this pod belongs to a group (e.g., via a pod-group: my-job-id label).
  • It checks an internal or CRD-based state to see how many pods from this group have arrived at the Permit phase.
  • If the group is not yet complete (e.g., only 3 of 4 pods have arrived), the plugin returns a Wait status with a timeout. The pod is parked as a "waiting" pod inside the scheduler; it is not yet bound, and it is rejected if the timeout expires.
  • When the final pod of the group arrives, the plugin's logic sees the group is complete. It then iterates through all waiting pods for that group and tells the framework to Allow them, releasing them all to the Bind phase at once.

    Gang scheduling requires more state management than our simple in-memory map and is usually implemented with a custom controller and a PodGroup CRD, as the upstream coscheduling plugin does. A minimal sketch of the Permit flow follows.
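
    Below is a stripped-down sketch of the Permit logic, assuming a pod-group label, a pod-group-size label, and a fixed timeout (all three are choices for this example rather than anything the framework prescribes):

    go
    // Gang-scheduling sketch for the same scheduler package.
    import (
    	"context"
    	"strconv"
    	"time"

    	corev1 "k8s.io/api/core/v1"
    	"k8s.io/kubernetes/pkg/scheduler/framework"
    )

    const (
    	podGroupLabel     = "pod-group"
    	podGroupSizeLabel = "pod-group-size"
    	permitTimeout     = 2 * time.Minute
    )

    var _ framework.PermitPlugin = &GPUTopologyPlugin{}

    func (p *GPUTopologyPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) (*framework.Status, time.Duration) {
    	group, ok := pod.Labels[podGroupLabel]
    	if !ok {
    		return framework.NewStatus(framework.Success), 0 // Not part of a gang.
    	}
    	required, err := strconv.Atoi(pod.Labels[podGroupSizeLabel])
    	if err != nil {
    		return framework.NewStatus(framework.Error, "invalid pod-group-size label"), 0
    	}

    	// Count group members already waiting at Permit, plus this pod.
    	arrived := 1
    	p.handle.IterateOverWaitingPods(func(wp framework.WaitingPod) {
    		if wp.GetPod().Labels[podGroupLabel] == group {
    			arrived++
    		}
    	})

    	if arrived < required {
    		// Park this pod as a waiting pod; it is released below or rejected on timeout.
    		return framework.NewStatus(framework.Wait), permitTimeout
    	}

    	// The gang is complete: release every waiting member of the group.
    	p.handle.IterateOverWaitingPods(func(wp framework.WaitingPod) {
    		if wp.GetPod().Labels[podGroupLabel] == group {
    			wp.Allow(Name)
    		}
    	})
    	return framework.NewStatus(framework.Success), 0
    }

    To use this, the plugin would also be enabled under the permit extension point in the KubeSchedulerConfiguration profile shown earlier.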

    Conclusion

    By moving beyond the default Kubernetes scheduler, we can transform Kubernetes from a generic container orchestrator into a high-performance computing platform truly optimized for specialized hardware. Building a custom scheduler plugin for GPU topology awareness is a prime example of extending the Kubernetes control plane to solve a specific, high-value problem. While the implementation involves delving into complex areas like combinatorial algorithms, concurrent state management, and the scheduler's internal lifecycle, the payoff is immense: predictable high performance, efficient resource utilization, and faster training times for critical ML workloads. This pattern of targeted extension is a hallmark of advanced Kubernetes engineering, demonstrating how its flexible architecture can be adapted to nearly any technical domain.
