Custom Kubernetes Schedulers for Stateful, Data-Intensive Workloads

Goh Ling Yong

The Limits of `kube-scheduler` for Specialized Workloads

The default Kubernetes scheduler, kube-scheduler, is remarkably effective for stateless applications. Its two-phase process—Filtering (Predicates) and Scoring (Priorities)—efficiently places pods across a cluster based on resource requests, affinity/anti-affinity rules, and topology spread constraints. However, for senior engineers managing complex stateful systems like distributed databases (Cassandra, Vitess), stream processors (Kafka), or large-scale ML training jobs, the limitations of this general-purpose approach become a critical production bottleneck.

These advanced workloads introduce scheduling requirements that transcend standard primitives:

  • Strict Data Locality: When using local storage provisioners (e.g., OpenEBS, TopoLVM, or even raw hostPath volumes in specific scenarios), a pod is inextricably tied to the node holding its data. If that pod is rescheduled, it must land on the exact same node to remount its PersistentVolume. The default scheduler has no inherent, foolproof mechanism to guarantee this, especially during complex recovery scenarios.
  • Fine-Grained Resource Topology: A pod might require not just a GPU, but a specific GPU that is on the same NUMA node as its CPU cores and connected to a specific high-speed network interface. While Kubernetes' TopologyManager helps, a custom scheduler can enforce even more granular policies, ensuring that a set of pods for a high-performance computing (HPC) task are all placed on nodes with an identical hardware topology to prevent performance degradation from cross-NUMA memory access.
  • Complex Co-location & Gang Scheduling: Standard pod affinity is often insufficient. Consider a scenario where you need to co-locate a group of pods on the same physical rack (for low-latency communication) but ensure they are on different physical hosts within that rack (for high availability). This requires awareness of custom node labels representing physical infrastructure (topology.corp.com/rack-id). Furthermore, gang scheduling—where a group of pods must be scheduled simultaneously or not at all—is a concept entirely foreign to the default scheduler's one-pod-at-a-time design.
  • Dynamic Scheduling Based on Custom Metrics: The default scheduler primarily considers resource requests and limits. A more intelligent scheduler might need to make decisions based on real-time metrics from a monitoring system like Prometheus. For example, placing a new video transcoding pod on the node with the lowest current GPU utilization and network I/O, not just the one with available GPU slots.
When these requirements become non-negotiable for performance and reliability, it's time to architect a custom scheduling solution. We will explore the two primary paths: the minimally invasive Scheduler Extender and the all-powerful Full Custom Scheduler. As a baseline, the sketch below shows how far the standard affinity primitives get you with the rack co-location case.
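
A reasonable first attempt with built-in primitives is pod affinity on the rack label combined with anti-affinity on the hostname. The sketch below (label values and image are illustrative) achieves "same rack, different hosts" for pods sharing a job-group label, but it still cannot express the all-or-nothing gang requirement:

yaml
apiVersion: v1
kind: Pod
metadata:
  name: worker-1
  labels:
    job-group: trial-123
spec:
  affinity:
    podAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchLabels:
            job-group: trial-123
        topologyKey: topology.corp.com/rack-id   # land on the same rack as peers
    podAntiAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchLabels:
            job-group: trial-123
        topologyKey: kubernetes.io/hostname       # but never on the same host
  containers:
  - name: worker
    image: my-ml-image:worker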


    Architectural Decision: Extender vs. Full Custom Scheduler

    Choosing the right approach is a critical architectural decision with significant implications for development complexity, operational overhead, and flexibility.

    The Scheduler Extender: A Webhook-Based Approach

    A Scheduler Extender allows you to augment the default scheduler's logic via webhooks without replacing it. The kube-scheduler remains in control but calls out to your external HTTP/S endpoint at specific points in its decision-making process.

    How it Works:

  • You configure kube-scheduler with a KubeSchedulerConfiguration file that lists your extender's endpoints (older releases used the now-removed Policy API).
  • Filter: For each pod, the scheduler first runs its own internal filter plugins (predicates). It then calls your extender's /filter endpoint once, sending the pod and the list of nodes that passed, and your extender replies with the subset it accepts (an abbreviated JSON exchange is shown after this list).
  • Prioritize: After filtering, the scheduler calls your extender's /prioritize endpoint with the pod and the list of all filtered nodes. Your extender returns a list of scores (0-10) for each node, which are added to the scores from the scheduler's internal priority functions.
  • (Optional) Preempt: The extender can influence preemption decisions.
  • (Optional) Bind: Your extender can take over the final binding process, though this is less common.
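
    To make the webhook contract concrete, here is a heavily abbreviated /filter exchange based on the ExtenderArgs and ExtenderFilterResult types from k8s.io/kube-scheduler/extender/v1 (real payloads carry full Pod and Node objects):

    json
    {
      "pod":   { "metadata": { "name": "db-0", "namespace": "default" } },
      "nodes": { "items": [
        { "metadata": { "name": "node-a" } },
        { "metadata": { "name": "node-b" } }
      ] }
    }

    The extender replies with the nodes it accepts and, optionally, a reason for each node it rejects:

    json
    {
      "nodes": { "items": [ { "metadata": { "name": "node-a" } } ] },
      "failedNodes": { "node-b": "local data for the claimed PVC lives on node-a" }
    }
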
    Pros:

    * Reduced Complexity: You don't need to reinvent the wheel. You leverage the default scheduler's robust, battle-tested core: its caching mechanisms (informers), handling of pod queues, and standard predicate checks.

    * Lower Maintenance: You are only responsible for the specialized logic in your webhook, not the entire scheduling pipeline.

    Cons:

    * Limited Control: You can only influence the decision; you cannot fundamentally change the scheduling algorithm or implement concepts like gang scheduling.

    * Performance Overhead: Every scheduling decision for a relevant pod incurs at least one, and often two, round-trip HTTP calls. High latency in your extender can slow down scheduling for the entire cluster.

    * Statelessness: Extenders are best suited for stateless logic. While you can maintain state in your extender service, it's not as tightly integrated as a full scheduler's internal cache.

    The Full Custom Scheduler: Ultimate Control

    A full custom scheduler is a completely separate control plane component that runs alongside (or in place of) kube-scheduler. It watches the API server for pods that have designated it as their scheduler and takes full responsibility for placing them.

    How it Works:

  • A pod is created with spec.schedulerName: my-custom-scheduler.
  • The default kube-scheduler ignores this pod.
  • Your custom scheduler, running as a Deployment in the cluster, watches for unscheduled pods assigned to it.
  • When it sees such a pod, it executes its own internal filtering and scoring logic, which can be arbitrarily complex.
  • Once it chooses a node, it creates a Binding object and POSTs it to the pod's binding subresource on the Kubernetes API, which tells the kubelet on the target node to start the pod (a sketch of this object is shown after this list).
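
    The Binding itself is a small core/v1 object POSTed to the pod's binding subresource (/api/v1/namespaces/<namespace>/pods/<name>/binding); a sketch with illustrative names:

    yaml
    apiVersion: v1
    kind: Binding
    metadata:
      name: worker-0
      namespace: ml-jobs
    target:
      apiVersion: v1
      kind: Node
      name: node-rack-a-07
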
    Pros:

    * Total Flexibility: You control every aspect of the scheduling logic. You can implement stateful algorithms, gang scheduling, custom resource bin-packing, and integrate with any external system.

    * High Performance: With an efficient implementation using client-go informers, a custom scheduler can make decisions with very low latency as all cluster state is held in its local memory cache.

    Cons:

    * Maximum Complexity: You are now responsible for everything: maintaining a consistent cache of the cluster state (pods, nodes, PVs, etc.), handling race conditions, ensuring high availability via leader election, and implementing preemption if needed.

    * Higher Development & Maintenance Cost: This is a significant piece of infrastructure that requires deep Kubernetes internals knowledge to build and operate reliably.

    Decision Heuristic: Start with a Scheduler Extender if your logic can be expressed as a simple filter/score function (e.g., checking for data locality). Graduate to a Full Custom Scheduler only when you need to fundamentally alter the scheduling workflow (e.g., gang scheduling) or require a highly stateful, performance-critical scheduling algorithm.


    Production Pattern: Building an Extender for Data Locality

    Let's tackle a common and critical problem: ensuring a pod using a LocalPersistentVolume is always scheduled back to the node where its data resides.

    Scenario: We have a StatefulSet using a StorageClass that provisions LocalPersistentVolumes. When a node reboots, the pod is evicted. The StatefulSet controller will recreate the pod, but the default scheduler has no strict guarantee it will place the new pod on the original node. Our extender will enforce this.

    When a StorageClass uses WaitForFirstConsumer volume binding, the scheduler records its node choice in the volume.kubernetes.io/selected-node annotation on the PersistentVolumeClaim, and the bound local PersistentVolume is pinned to that node by a required spec.nodeAffinity term on the kubernetes.io/hostname label. These are our keys. Our extender's filter logic will be: "If a pod uses a PVC, check the claim's selected-node annotation and, if the claim is already bound, the PV's node affinity. If either names a node, only allow the pod to be scheduled on that exact node."
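
    For reference, a statically provisioned local PersistentVolume looks roughly like this (node name, path, and StorageClass are illustrative); the nodeAffinity block is what the extender reads back:

    yaml
    apiVersion: v1
    kind: PersistentVolume
    metadata:
      name: local-pv-node-a
    spec:
      capacity:
        storage: 100Gi
      accessModes: ["ReadWriteOnce"]
      persistentVolumeReclaimPolicy: Retain
      storageClassName: local-storage
      local:
        path: /mnt/disks/ssd1
      nodeAffinity:
        required:
          nodeSelectorTerms:
          - matchExpressions:
            - key: kubernetes.io/hostname
              operator: In
              values: ["node-a"]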

    1. The Extender Go Implementation

    We'll build a simple Go HTTP server. For production, you would use a robust web framework, but net/http is sufficient for this example.

    go
    // main.go
    package main
    
    import (
    	"encoding/json"
    	"io"
    	"log"
    	"net/http"
    
    	kubeinformers "k8s.io/client-go/informers"
    	kubeclientset "k8s.io/client-go/kubernetes"
    	kubelisters "k8s.io/client-go/listers/core/v1"
    	"k8s.io/client-go/tools/cache"
    	"k8s.io/client-go/tools/clientcmd"
    	v1 "k8s.io/kube-scheduler/extender/v1"
    
    	corev1 "k8s.io/api/core/v1"
    )
    
    const (
    	// The scheduler sets this annotation on a PVC when the StorageClass uses
    	// WaitForFirstConsumer volume binding, recording the node it chose.
    	selectedNodeAnnKey = "volume.kubernetes.io/selected-node"
    )
    
    // SchedulerExtender holds the necessary clients and listers.
    type SchedulerExtender struct {
    	pvLister  kubelisters.PersistentVolumeLister
    	pvcLister kubelisters.PersistentVolumeClaimLister
    }
    
    // Filter is the webhook handler for the filter verb.
    func (se *SchedulerExtender) Filter(w http.ResponseWriter, r *http.Request) {
    	var args v1.ExtenderArgs
    	body, err := io.ReadAll(r.Body)
    	if err != nil {
    		http.Error(w, "Failed to read request body", http.StatusBadRequest)
    		return
    	}
    
    	if err := json.Unmarshal(body, &args); err != nil {
    		http.Error(w, "Failed to unmarshal request", http.StatusBadRequest)
    		return
    	}
    
    	pod := args.Pod
    	nodes := args.Nodes.Items
    
    	// The core logic starts here.
    	targetNode := se.findNodeForPod(pod)
    	var filteredNodes []corev1.Node
    
    	if targetNode != "" {
    		// If a target node is identified, filter the list to only that node.
    		for _, node := range nodes {
    			if node.Name == targetNode {
    				filteredNodes = append(filteredNodes, node)
    				break
    			}
    		}
    	} else {
    		// If no target node, the pod doesn't have a local PV, so it can go anywhere.
    		filteredNodes = nodes
    	}
    
    	resp := v1.ExtenderFilterResult{
    		Nodes: &corev1.NodeList{
    			Items: filteredNodes,
    		},
    	}
    
    	respBody, err := json.Marshal(resp)
    	if err != nil {
    		http.Error(w, "Failed to marshal response", http.StatusInternalServerError)
    		return
    	}
    
    	w.Header().Set("Content-Type", "application/json")
    	w.WriteHeader(http.StatusOK)
    	_, _ = w.Write(respBody)
    }
    
    // findNodeForPod contains the business logic to identify the target node.
    func (se *SchedulerExtender) findNodeForPod(pod *corev1.Pod) string {
    	for _, vol := range pod.Spec.Volumes {
    		if vol.PersistentVolumeClaim == nil {
    			continue
    		}
    
    		pvcName := vol.PersistentVolumeClaim.ClaimName
    		pvc, err := se.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
    		if err != nil {
    			log.Printf("WARN: Failed to get PVC %s/%s: %v", pod.Namespace, pvcName, err)
    			continue // Or handle error more robustly
    		}
    
    		// The scheduler records its node choice on the PVC for
    		// WaitForFirstConsumer storage classes.
    		if node, ok := pvc.Annotations[selectedNodeAnnKey]; ok && node != "" {
    			log.Printf("INFO: Pod %s/%s requires node %s for PVC %s", pod.Namespace, pod.Name, node, pvcName)
    			return node
    		}
    
    		pvName := pvc.Spec.VolumeName
    		if pvName == "" {
    			continue
    		}
    
    		pv, err := se.pvLister.Get(pvName)
    		if err != nil {
    			log.Printf("WARN: Failed to get PV %s: %v", pvName, err)
    			continue
    		}
    
    		// A bound local PV is pinned to its node via required node affinity.
    		if node := nodeFromPVAffinity(pv); node != "" {
    			log.Printf("INFO: Pod %s/%s requires node %s for its local PV %s", pod.Namespace, pod.Name, node, pv.Name)
    			return node
    		}
    	}
    	return ""
    }
    
    // nodeFromPVAffinity extracts the hostname from a local PV's required node affinity term.
    func nodeFromPVAffinity(pv *corev1.PersistentVolume) string {
    	if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
    		return ""
    	}
    	for _, term := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
    		for _, expr := range term.MatchExpressions {
    			if expr.Key == corev1.LabelHostname && expr.Operator == corev1.NodeSelectorOpIn && len(expr.Values) > 0 {
    				return expr.Values[0]
    			}
    		}
    	}
    	return ""
    }
    
    func main() {
    	// --- Kubernetes Client-Go Setup ---
    	// This is crucial for performance. Instead of hitting the API server for every PV/PVC lookup,
    	// we use a shared informer to maintain a local cache.
    	config, err := clientcmd.BuildConfigFromFlags("", "") // In-cluster config
    	if err != nil {
    		log.Fatalf("Failed to build config: %v", err)
    	}
    	clientset, err := kubeclientset.NewForConfig(config)
    	if err != nil {
    		log.Fatalf("Failed to create clientset: %v", err)
    	}
    
    	factory := kubeinformers.NewSharedInformerFactory(clientset, 0) // 0 disables periodic resync
    	pvLister := factory.Core().V1().PersistentVolumes().Lister()
    	pvcLister := factory.Core().V1().PersistentVolumeClaims().Lister()
    
    	stopCh := make(chan struct{})
    	defer close(stopCh)
    	factory.Start(stopCh)
    
    	// Wait for caches to sync
    	if !cache.WaitForCacheSync(stopCh, factory.Core().V1().PersistentVolumes().Informer().HasSynced) {
    		log.Fatal("Failed to sync PV cache")
    	}
    	if !cache.WaitForCacheSync(stopCh, factory.Core().V1().PersistentVolumeClaims().Informer().HasSynced) {
    		log.Fatal("Failed to sync PVC cache")
    	}
    
    	// --- HTTP Server Setup ---
    	se := &SchedulerExtender{
    		pvLister:  pvLister,
    		pvcLister: pvcLister,
    	}
    
    	http.HandleFunc("/filter", se.Filter)
    	// You would also implement /prioritize, /preempt, etc. here if needed.
    
    	log.Println("INFO: Starting scheduler extender server on :8888")
    	if err := http.ListenAndServe(":8888", nil); err != nil {
    		log.Fatalf("Failed to start server: %v", err)
    	}
    }

    2. Deployment and Configuration

    First, you need to package this Go application into a Docker image and push it to a registry. Then, deploy it to your cluster.

    yaml
    # extender-deployment.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: local-pv-extender
      namespace: kube-system
    spec:
      replicas: 2 # For HA
      selector:
        matchLabels:
          app: local-pv-extender
      template:
        metadata:
          labels:
            app: local-pv-extender
        spec:
          # RBAC is critical. The extender needs to read PVs and PVCs.
          serviceAccountName: extender-sa
          containers:
          - name: extender
            image: your-registry/local-pv-extender:v1.0.0
            ports:
            - containerPort: 8888
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: local-pv-extender-svc
      namespace: kube-system
    spec:
      ports:
      - port: 80
        targetPort: 8888
      selector:
        app: local-pv-extender
    ---
    # RBAC configuration
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: extender-sa
      namespace: kube-system
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRole
    metadata:
      name: pv-pvc-reader
    rules:
    - apiGroups: [""]
      resources: ["persistentvolumes", "persistentvolumeclaims"]
      verbs: ["get", "list", "watch"]
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
    metadata:
      name: extender-pv-pvc-reader-binding
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: ClusterRole
      name: pv-pvc-reader
    subjects:
    - kind: ServiceAccount
      name: extender-sa
      namespace: kube-system

    Finally, and this is the most critical part, you must configure kube-scheduler to use this extender. This is typically done by modifying its static pod manifest on the control plane nodes (/etc/kubernetes/manifests/kube-scheduler.yaml).

    Create a KubeSchedulerConfiguration file (the legacy Policy API that older tutorials reference was removed in Kubernetes 1.23; extenders are now configured here):

    yaml
    # scheduler-config.yaml
    # kubescheduler.config.k8s.io/v1 requires Kubernetes 1.25+; use v1beta3 on 1.23-1.24.
    apiVersion: kubescheduler.config.k8s.io/v1
    kind: KubeSchedulerConfiguration
    clientConnection:
      kubeconfig: /etc/kubernetes/scheduler.conf
    extenders:
      - urlPrefix: "http://local-pv-extender-svc.kube-system.svc.cluster.local"
        filterVerb: "filter"
        enableHTTPS: false
        ignorable: false

    Mount this file into the kube-scheduler pod and add the --config flag:

    yaml
    # In /etc/kubernetes/manifests/kube-scheduler.yaml
    apiVersion: v1
    kind: Pod
    metadata:
      # ...
      name: kube-scheduler
      namespace: kube-system
    spec:
      containers:
      - command:
        - kube-scheduler
        - --authentication-kubeconfig=/etc/kubernetes/scheduler.conf
        - --authorization-kubeconfig=/etc/kubernetes/scheduler.conf
        # ... other flags
        - --config=/etc/kubernetes/scheduler-config.yaml # <-- ADD THIS
        # ...
        volumeMounts:
        # ...
        - name: scheduler-config
          mountPath: /etc/kubernetes/scheduler-config.yaml
          readOnly: true
      volumes:
      # ...
      - name: scheduler-config
        hostPath:
          path: /etc/kubernetes/config/scheduler-config.yaml # <-- Path where you place the file on the host
          type: File

    Performance Edge Case: The use of client-go informers is non-negotiable here. A naive implementation that creates a new clientset and queries the API server on every /filter request would cripple your cluster's scheduling throughput. The informer maintains a local, in-memory cache that is kept in sync with the API server, allowing for sub-millisecond lookups of PVs and PVCs.


    Advanced Implementation: A Full Custom Scheduler for ML Workloads

    Now for the more complex scenario. We need to schedule groups of interdependent ML training pods (e.g., a parameter server and several workers) with two strict requirements:

  • All pods in a training job must land on nodes within the same high-speed network fabric (identified by a node label topology.corp.com/rack-id).
  • The scheduler should attempt to balance the number of training jobs across the available racks.

    This requires stateful, group-aware logic that is impossible for an extender. We need a full custom scheduler.

    1. The Custom Scheduler Go Implementation

    This implementation is more involved. It will run in a loop, watching for pods assigned to it, and making binding decisions.

    go
    // main.go
    package main
    
    import (
    	"context"
    	"fmt"
    	"log"
    	"math/rand"
    	"os"
    	"time"
    
    	corev1 "k8s.io/api/core/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/apimachinery/pkg/labels"
    	"k8s.io/client-go/informers"
    	"k8s.io/client-go/kubernetes"
    	corelisters "k8s.io/client-go/listers/core/v1"
    	"k8s.io/client-go/tools/cache"
    	"k8s.io/client-go/tools/clientcmd"
    	"k8s.io/client-go/tools/leaderelection"
    	"k8s.io/client-go/tools/leaderelection/resourcelock"
    )
    
    const (
    	SchedulerName      = "gpu-fabric-scheduler"
    	JobGroupAnnotation = "gpu-fabric-scheduler.io/job-group"
    	RackLabel          = "topology.corp.com/rack-id"
    )
    
    // Scheduler encapsulates all the components needed to run.
    type Scheduler struct {
    	clientset  *kubernetes.Clientset
    	podLister  corelisters.PodLister
    	nodeLister corelisters.NodeLister
    	podQueue   chan *corev1.Pod
    }
    
    // NewScheduler creates a new scheduler instance.
    func NewScheduler(clientset *kubernetes.Clientset, factory informers.SharedInformerFactory) *Scheduler {
    	podInformer := factory.Core().V1().Pods()
    	nodeInformer := factory.Core().V1().Nodes()
    
    	s := &Scheduler{
    		clientset:  clientset,
    		podLister:  podInformer.Lister(),
    		nodeLister: nodeInformer.Lister(),
    		podQueue:   make(chan *corev1.Pod, 300),
    	}
    
    	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    		AddFunc: func(obj interface{}) {
    			p := obj.(*corev1.Pod)
    			// Only queue pods that are assigned to this scheduler and are not yet scheduled.
    			if p.Spec.SchedulerName == SchedulerName && p.Spec.NodeName == "" {
    				s.podQueue <- p
    			}
    		},
    	})
    
    	return s
    }
    
    // Run starts the scheduler's main loop.
    func (s *Scheduler) Run(ctx context.Context) {
    	log.Printf("INFO: Starting custom scheduler %s", SchedulerName)
    	for {
    		select {
    		case pod := <-s.podQueue:
    			log.Printf("INFO: Scheduling pod %s/%s", pod.Namespace, pod.Name)
    			nodeName, err := s.findBestNode(pod)
    			if err != nil {
    				log.Printf("ERROR: Failed to find node for pod %s/%s: %v", pod.Namespace, pod.Name, err)
    				// In a real implementation, you would re-queue the pod with backoff.
    				continue
    			}
    			if err := s.bindPodToNode(ctx, pod, nodeName); err != nil {
    				log.Printf("ERROR: Failed to bind pod %s/%s to node %s: %v", pod.Namespace, pod.Name, nodeName, err)
    			}
    		case <-ctx.Done():
    			log.Println("INFO: Shutting down scheduler")
    			return
    		}
    	}
    }
    
    // findBestNode implements the core scheduling logic.
    func (s *Scheduler) findBestNode(pod *corev1.Pod) (string, error) {
    	jobGroup, ok := pod.Annotations[JobGroupAnnotation]
    	if !ok {
    		return "", fmt.Errorf("pod %s/%s is missing required annotation: %s", pod.Namespace, pod.Name, JobGroupAnnotation)
    	}
    
    	// 1. Check if other pods from this job group are already scheduled.
    	// Annotations are not selectable, so list the namespace and filter by the
    	// job-group annotation ourselves.
    	allPodsInNS, err := s.podLister.Pods(pod.Namespace).List(labels.Everything())
    	if err != nil {
    		return "", fmt.Errorf("failed to list pods for job group %s: %v", jobGroup, err)
    	}
    	var podsInGroup []*corev1.Pod
    	for _, p := range allPodsInNS {
    		if p.Annotations[JobGroupAnnotation] == jobGroup {
    			podsInGroup = append(podsInGroup, p)
    		}
    	}
    
    	var targetRack string
    	for _, p := range podsInGroup {
    		if p.Spec.NodeName != "" {
    			node, err := s.nodeLister.Get(p.Spec.NodeName)
    			if err != nil {
    				log.Printf("WARN: Failed to get node %s for existing pod %s/%s: %v", p.Spec.NodeName, p.Namespace, p.Name, err)
    				continue
    			}
    			if rack, found := node.Labels[RackLabel]; found {
    				targetRack = rack
    				log.Printf("INFO: Job group '%s' is already on rack '%s'. Enforcing co-location.", jobGroup, targetRack)
    				break
    			}
    		}
    	}
    
    	// 2. Get all schedulable nodes.
    	allNodes, err := s.nodeLister.List(labels.Everything())
    	if err != nil {
    		return "", fmt.Errorf("failed to list nodes: %v", err)
    	}
    
    	// 3. Filter nodes.
    	var feasibleNodes []*corev1.Node
    	if targetRack != "" {
    		// Co-location case: only consider nodes in the target rack.
    		for _, node := range allNodes {
    			if node.Labels[RackLabel] == targetRack {
    				feasibleNodes = append(feasibleNodes, node)
    			}
    		}
    	} else {
    		// First pod in group case: all nodes are potentially feasible.
    		feasibleNodes = allNodes
    	}
    
    	if len(feasibleNodes) == 0 {
    		return "", fmt.Errorf("no feasible nodes found for pod %s/%s (rack requirement: %s)", pod.Namespace, pod.Name, targetRack)
    	}
    
    	// 4. Score nodes (simple random choice for this example; production would be more complex).
    	// A better scoring model would balance jobs across racks.
    	return feasibleNodes[rand.Intn(len(feasibleNodes))].Name, nil
    }
    
    // bindPodToNode performs the binding action.
    func (s *Scheduler) bindPodToNode(ctx context.Context, pod *corev1.Pod, nodeName string) error {
    	log.Printf("INFO: Binding pod %s/%s to node %s", pod.Namespace, pod.Name, nodeName)
    	binding := &corev1.Binding{
    		ObjectMeta: metav1.ObjectMeta{
    			Name:      pod.Name,
    			Namespace: pod.Namespace,
    		},
    		Target: corev1.ObjectReference{
    			Kind: "Node",
    			Name: nodeName,
    		},
    	}
    
    	return s.clientset.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
    }
    
    // main function with Leader Election for High Availability
    func main() {
    	// ... (standard client-go config setup as in the extender example)
    	config, err := clientcmd.BuildConfigFromFlags("", "")
    	if err != nil {
    		log.Fatal(err)
    	}
    	clientset, err := kubernetes.NewForConfig(config)
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()
    
    	// Leader election setup
    	lockID := "gpu-fabric-scheduler-lock"
    	ns := os.Getenv("POD_NAMESPACE") // Get namespace from Downward API
    	if ns == "" {
    		ns = "kube-system"
    		log.Printf("WARN: POD_NAMESPACE env var not set, defaulting to %s", ns)
    	}
    	
    	lock := &resourcelock.LeaseLock{
    		LeaseMeta: metav1.ObjectMeta{
    			Name:      lockID,
    			Namespace: ns,
    		},
    		Client: clientset.CoordinationV1(),
    		LockConfig: resourcelock.ResourceLockConfig{
    			Identity: os.Getenv("POD_NAME"), // Get pod name from Downward API
    		},
    	}
    
    	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    		Lock:          lock,
    		ReleaseOnCancel: true,
    		LeaseDuration:   15 * time.Second,
    		RenewDeadline:   10 * time.Second,
    		RetryPeriod:     2 * time.Second,
    		Callbacks: leaderelection.LeaderCallbacks{
    			OnStartedLeading: func(ctx context.Context) {
    				log.Println("INFO: Became leader, starting scheduler.")
    				factory := informers.NewSharedInformerFactory(clientset, 0)
    				scheduler := NewScheduler(clientset, factory)
    				factory.Start(ctx.Done())
    				if !cache.WaitForCacheSync(ctx.Done(), factory.Core().V1().Pods().Informer().HasSynced, factory.Core().V1().Nodes().Informer().HasSynced) {
    					log.Fatal("Failed to sync caches")
    				}
    				scheduler.Run(ctx)
    			},
    			OnStoppedLeading: func() {
    				log.Fatal("FATAL: Lost leader election.")
    			},
    		},
    	})
    }

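    The example picks randomly among feasible nodes once a rack is fixed; the second requirement (balancing job groups across racks) needs an actual scoring step. Below is a minimal sketch of a helper that could replace the random choice for the first pod of a group. It reuses the RackLabel and JobGroupAnnotation constants and the corev1 import from the scheduler above; pickLeastLoadedRack itself is illustrative, not part of the article's code.

    go
    // pickLeastLoadedRack returns the rack currently hosting the fewest distinct
    // job groups, so that new groups spread across the fabric.
    func pickLeastLoadedRack(nodes []*corev1.Node, pods []*corev1.Pod) string {
    	rackOfNode := map[string]string{}
    	groupsPerRack := map[string]map[string]struct{}{}
    	for _, n := range nodes {
    		if rack, ok := n.Labels[RackLabel]; ok {
    			rackOfNode[n.Name] = rack
    			if groupsPerRack[rack] == nil {
    				groupsPerRack[rack] = map[string]struct{}{}
    			}
    		}
    	}
    	for _, p := range pods {
    		rack, ok := rackOfNode[p.Spec.NodeName]
    		if !ok {
    			continue
    		}
    		if group, ok := p.Annotations[JobGroupAnnotation]; ok {
    			groupsPerRack[rack][group] = struct{}{}
    		}
    	}
    	bestRack, bestCount := "", int(^uint(0)>>1) // start at max int
    	for rack, groups := range groupsPerRack {
    		if len(groups) < bestCount {
    			bestRack, bestCount = rack, len(groups)
    		}
    	}
    	return bestRack
    }

    findBestNode would call this when targetRack is empty and then restrict feasibleNodes to the returned rack before picking a node within it.
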
    2. Deployment and Usage

    This scheduler is deployed as a standard Deployment in the cluster. It requires RBAC to get/list/watch pods and nodes, and to create Binding objects. Critically, it also needs permissions for leader election (Leases in the coordination.k8s.io API group).
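
    A sketch of that wiring with illustrative names follows; the Downward API env vars feed the POD_NAME and POD_NAMESPACE lookups in the code above, and the ClusterRole covers pod/node reads, the pods/binding subresource, and Lease-based leader election (bind it to the ServiceAccount with a ClusterRoleBinding, as in the extender example):

    yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: gpu-fabric-scheduler
      namespace: kube-system
    spec:
      replicas: 2 # standby replicas wait on leader election
      selector:
        matchLabels:
          app: gpu-fabric-scheduler
      template:
        metadata:
          labels:
            app: gpu-fabric-scheduler
        spec:
          serviceAccountName: gpu-fabric-scheduler
          containers:
          - name: scheduler
            image: your-registry/gpu-fabric-scheduler:v1.0.0
            env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: POD_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRole
    metadata:
      name: gpu-fabric-scheduler
    rules:
    - apiGroups: [""]
      resources: ["pods", "nodes"]
      verbs: ["get", "list", "watch"]
    - apiGroups: [""]
      resources: ["pods/binding"]
      verbs: ["create"]
    - apiGroups: ["coordination.k8s.io"]
      resources: ["leases"]
      verbs: ["get", "create", "update"]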

    To use it, you would submit your pod YAML with the custom schedulerName and annotations:

    yaml
    # ml-job.yaml
    apiVersion: v1
    kind: Pod
    metadata:
      name: ps-0
      namespace: ml-jobs
      annotations:
        gpu-fabric-scheduler.io/job-group: "resnet50-trial-123"
    spec:
      schedulerName: gpu-fabric-scheduler
      containers:
      - name: parameter-server
        image: my-ml-image:ps
    ---
    apiVersion: v1
    kind: Pod
    metadata:
      name: worker-0
      namespace: ml-jobs
      annotations:
        gpu-fabric-scheduler.io/job-group: "resnet50-trial-123"
    spec:
      schedulerName: gpu-fabric-scheduler
      containers:
      - name: worker
        image: my-ml-image:worker

    3. Critical Considerations: HA and Race Conditions

    * High Availability: A single instance of a custom scheduler is a single point of failure. The example code demonstrates the non-negotiable pattern for HA: leader election. By running multiple replicas of the scheduler deployment and wrapping the main logic in leaderelection.RunOrDie, you ensure that only one instance (the leader) is actively scheduling pods at any given time. If the leader fails, another replica will acquire the LeaseLock and take over.

    * Race Conditions: What if the state of the cluster changes between when you make a scheduling decision and when you issue the Bind? For example, your scheduler picks Node A, but before the Bind API call completes, another pod is scheduled on Node A, consuming the last available resources. Your Bind call will fail. A robust scheduler must handle this failure gracefully, typically by putting the pod back into its work queue to be retried later. The API server provides the ultimate source of truth and atomicity.
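
    A minimal sketch of that retry loop, using client-go's rate-limited workqueue in place of the example's bare channel (scheduleOne is a hypothetical callback standing in for findBestNode plus bindPodToNode):

    go
    import (
    	"time"
    
    	"k8s.io/client-go/util/workqueue"
    )
    
    func newPodQueue() workqueue.RateLimitingInterface {
    	return workqueue.NewRateLimitingQueue(
    		workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 30*time.Second))
    }
    
    // processQueue drains pod keys ("namespace/name"), retrying with exponential
    // backoff whenever a placement or bind fails, e.g. because the chosen node
    // filled up between the decision and the Bind call.
    func processQueue(queue workqueue.RateLimitingInterface, scheduleOne func(key string) error) {
    	for {
    		item, shutdown := queue.Get()
    		if shutdown {
    			return
    		}
    		key := item.(string)
    		if err := scheduleOne(key); err != nil {
    			queue.AddRateLimited(key) // try again later, with backoff
    		} else {
    			queue.Forget(key) // reset the backoff counter on success
    		}
    		queue.Done(item)
    	}
    }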

    * Preemption: This example doesn't implement preemption. A full-featured custom scheduler would need to. This involves a much more complex logic loop: if a high-priority pod cannot be scheduled, the scheduler must identify nodes where running, lower-priority pods could be evicted to make room. It would then need the RBAC permissions to delete those pods and then attempt to bind the high-priority pod once the resources are freed. This is a highly advanced feature that requires careful implementation to avoid cluster instability.

    Conclusion: A Tool of Last Resort

    Customizing the Kubernetes scheduler is one of the most powerful ways to tailor a cluster to a specific, demanding workload. It allows you to encode complex business and performance logic directly into your infrastructure's control plane.

    However, this power comes with significant complexity and operational responsibility. Before embarking on building a custom scheduler, always exhaust the capabilities of the standard primitives:

    * Can your problem be solved with advanced nodeAffinity, podAffinity, and podAntiAffinity rules?

    * Can topologySpreadConstraints achieve the distribution you need? (See the sketch after this list.)

    * Can taints and tolerations isolate workloads effectively?
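
    For the second question, spreading a labelled workload evenly across racks with the built-in primitive looks roughly like this (label values and image are illustrative):

    yaml
    apiVersion: v1
    kind: Pod
    metadata:
      name: trainer-0
      labels:
        app: trainer
    spec:
      topologySpreadConstraints:
      - maxSkew: 1
        topologyKey: topology.corp.com/rack-id
        whenUnsatisfiable: DoNotSchedule
        labelSelector:
          matchLabels:
            app: trainer
      containers:
      - name: trainer
        image: my-ml-image:worker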

    If the answer to these questions is no, then the patterns discussed here provide a roadmap. Start with an extender for simple, stateless filtering and scoring. Only when you need absolute control over the scheduling workflow, stateful logic, or gang scheduling should you take on the challenge of building and maintaining a full custom scheduler. When you do, rely heavily on the patterns established by client-go, especially informers for performance and leaderelection for high availability.
