Custom Kubernetes Schedulers for Stateful, Data-Intensive Workloads
The Limits of `kube-scheduler` for Specialized Workloads
The default Kubernetes scheduler, kube-scheduler, is remarkably effective for stateless applications. Its two-phase process—Filtering (Predicates) and Scoring (Priorities)—efficiently places pods across a cluster based on resource requests, affinity/anti-affinity rules, and topology spread constraints. However, for senior engineers managing complex stateful systems like distributed databases (Cassandra, Vitess), stream processors (Kafka), or large-scale ML training jobs, the limitations of this general-purpose approach become a critical production bottleneck.
These advanced workloads introduce scheduling requirements that transcend standard primitives:
* Strict data locality: When a workload uses node-local storage (a LocalPersistentVolume, or hostPath volumes in specific scenarios), a pod is inextricably tied to the node holding its data. If that pod is rescheduled, it must land on the exact same node to remount its PersistentVolume. The default scheduler has no inherent, foolproof mechanism to guarantee this, especially during complex recovery scenarios.
* Hardware topology: While the kubelet's TopologyManager helps, a custom scheduler can enforce even more granular policies, ensuring that a set of pods for a high-performance computing (HPC) task are all placed on nodes with an identical hardware topology to prevent performance degradation from cross-NUMA memory access.
* Custom topology and gang scheduling: Placement may also depend on custom node labels (e.g., topology.corp.com/rack-id). Furthermore, gang scheduling, where a group of pods must be scheduled simultaneously or not at all, is a concept entirely foreign to the default scheduler's one-pod-at-a-time design.
When these requirements become non-negotiable for performance and reliability, it's time to architect a custom scheduling solution. We will explore the two primary paths: the minimally invasive Scheduler Extender and the all-powerful Full Custom Scheduler.
Architectural Decision: Extender vs. Full Custom Scheduler
Choosing the right approach is a critical architectural decision with significant implications for development complexity, operational overhead, and flexibility.
The Scheduler Extender: A Webhook-Based Approach
A Scheduler Extender allows you to augment the default scheduler's logic via webhooks without replacing it. The kube-scheduler remains in control but calls out to your external HTTP/S endpoint at specific points in its decision-making process.
How it Works:
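- You start kube-scheduler with a configuration file pointing to your extender's endpoints.
- After running its own filters, kube-scheduler calls your /filter endpoint, sending the pod and the candidate nodes. Your extender replies with the subset of nodes that pass (and, optionally, the reason each rejected node failed).
- For scoring, kube-scheduler calls your /prioritize endpoint with the pod and the list of all filtered nodes. Your extender returns a score (0-10) for each node; each score is multiplied by the extender's configured weight and added to the scores from the scheduler's internal priority functions.
Pros: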
* Reduced Complexity: You don't need to reinvent the wheel. You leverage the default scheduler's robust, battle-tested core: its caching mechanisms (informers), handling of pod queues, and standard predicate checks.
* Lower Maintenance: You are only responsible for the specialized logic in your webhook, not the entire scheduling pipeline.
Cons:
* Limited Control: You can only influence the decision; you cannot fundamentally change the scheduling algorithm or implement concepts like gang scheduling.
* Performance Overhead: Every scheduling decision for a relevant pod incurs at least one, and often two, round-trip HTTP calls. High latency in your extender can slow down scheduling for the entire cluster.
* Statelessness: Extenders are best suited for stateless logic. While you can maintain state in your extender service, it's not as tightly integrated as a full scheduler's internal cache.
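To make the scoring step concrete, here is a minimal sketch of how weighted extender scores could be folded into the scheduler's own scores. It is a simplified model, not kube-scheduler's code: `HostPriority` and `combineScores` are local stand-ins, the node names and weight are made up, and the real scheduler's normalization between the two score ranges is omitted.

```go
package main

import (
	"fmt"
	"sort"
)

// HostPriority is a local stand-in for one extender score:
// a node name and a score in the extender range 0-10.
type HostPriority struct {
	Host  string
	Score int64
}

// combineScores adds weighted extender scores to the internal scores and
// returns the best node (ties broken by name so the result is deterministic).
func combineScores(internal map[string]int64, ext []HostPriority, weight int64) (string, map[string]int64) {
	total := make(map[string]int64, len(internal))
	for node, s := range internal {
		total[node] = s
	}
	for _, hp := range ext {
		if _, ok := total[hp.Host]; !ok {
			continue // score for a node that did not survive filtering
		}
		total[hp.Host] += hp.Score * weight
	}
	names := make([]string, 0, len(total))
	for n := range total {
		names = append(names, n)
	}
	sort.Strings(names)
	best := ""
	for _, n := range names {
		if best == "" || total[n] > total[best] {
			best = n
		}
	}
	return best, total
}

func main() {
	internal := map[string]int64{"node-a": 50, "node-b": 55}
	ext := []HostPriority{{"node-a", 10}, {"node-b", 2}, {"node-c", 10}}
	best, total := combineScores(internal, ext, 2)
	fmt.Println(best, total["node-a"], total["node-b"])
}
```

In this run the extender's strong preference for node-a outweighs node-b's slightly higher internal score, which illustrates the "influence, not control" limitation: the extender shifts the ranking but the default scheduler still makes the final choice.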
The Full Custom Scheduler: Ultimate Control
A full custom scheduler is a completely separate control plane component that runs alongside (or in place of) kube-scheduler. It watches the API server for pods that have designated it as their scheduler and takes full responsibility for placing them.
How it Works:
- A pod is created with spec.schedulerName: my-custom-scheduler.
- The default kube-scheduler ignores this pod.
- Your custom scheduler, running as a Deployment in the cluster, is watching for unscheduled pods assigned to it.
- When it sees such a pod, it executes its own internal filtering and scoring logic, which can be arbitrarily complex.
- Once it has chosen a node, it creates a Binding object and POSTs it to the Kubernetes API. The API server records the node in the pod's spec.nodeName, and the kubelet on the target node, which watches for pods assigned to it, starts the pod.
Pros:
* Total Flexibility: You control every aspect of the scheduling logic. You can implement stateful algorithms, gang scheduling, custom resource bin-packing, and integrate with any external system.
* High Performance: With an efficient implementation using client-go informers, a custom scheduler can make decisions with very low latency as all cluster state is held in its local memory cache.
Cons:
* Maximum Complexity: You are now responsible for everything: maintaining a consistent cache of the cluster state (pods, nodes, PVs, etc.), handling race conditions, ensuring high availability via leader election, and implementing preemption if needed.
* Higher Development & Maintenance Cost: This is a significant piece of infrastructure that requires deep Kubernetes internals knowledge to build and operate reliably.
Decision Heuristic: Start with a Scheduler Extender if your logic can be expressed as a simple filter/score function (e.g., checking for data locality). Graduate to a Full Custom Scheduler only when you need to fundamentally alter the scheduling workflow (e.g., gang scheduling) or require a highly stateful, performance-critical scheduling algorithm.
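Gang scheduling is the clearest example of a workflow change an extender cannot express, so a small sketch of its all-or-nothing core may help. Everything here is hypothetical and simplified: `node`, `placeGang`, the slot counts and the first-fit strategy are illustrative assumptions, and no Kubernetes API is involved.

```go
package main

import "fmt"

// node is a simplified, hypothetical view of a node: a name and free slots.
type node struct {
	name string
	free int
}

// placeGang tries to place every pod of a group; need[i] is the number of
// slots pod i requires. It returns pod index -> node name, or ok=false if any
// pod does not fit, in which case nothing is reserved (all-or-nothing).
func placeGang(nodes []node, need []int) (map[int]string, bool) {
	// Work on a copy so a failed attempt leaves the caller's view untouched.
	free := make([]int, len(nodes))
	for i, n := range nodes {
		free[i] = n.free
	}
	placement := make(map[int]string, len(need))
	for pod, want := range need {
		placed := false
		for i := range nodes {
			if free[i] >= want { // first fit
				free[i] -= want
				placement[pod] = nodes[i].name
				placed = true
				break
			}
		}
		if !placed {
			return nil, false // one pod failed, so the whole group is rejected
		}
	}
	return placement, true
}

func main() {
	nodes := []node{{"node-a", 2}, {"node-b", 1}}
	fmt.Println(placeGang(nodes, []int{1, 1, 1})) // the whole group fits
	fmt.Println(placeGang(nodes, []int{2, 2}))    // second pod cannot fit
}
```

A real scheduler would only issue Bind calls after `placeGang` succeeds for the whole group, which is exactly the kind of multi-pod decision the one-pod-at-a-time extender hooks do not offer.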
Production Pattern: Building an Extender for Data Locality
Let's tackle a common and critical problem: ensuring a pod using a LocalPersistentVolume is always scheduled back to the node where its data resides.
Scenario: We have a StatefulSet using a StorageClass that provisions LocalPersistentVolumes. When a node reboots, the pod is evicted. The Kubernetes controller manager will recreate the pod, but the default scheduler has no strict guarantee it will place the new pod on the original node. Our extender will enforce this.
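The key is the annotation volume.kubernetes.io/selected-node, which records the node chosen for a volume. Note that Kubernetes sets this annotation on the PersistentVolumeClaim (the scheduler writes it when the StorageClass uses volumeBindingMode: WaitForFirstConsumer), not on the PersistentVolume; a local PersistentVolume expresses its own node constraint through spec.nodeAffinity. Our extender's filter logic will be: "If a pod uses a PVC and a selected node is recorded for that volume, then only allow the pod to be scheduled on that specific node."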
1. The Extender Go Implementation
We'll build a simple Go HTTP server. For production, you would use a robust web framework, but net/http is sufficient for this example.
// main.go
package main

import (
	"encoding/json"
	"io"
	"log"
	"net/http"

	corev1 "k8s.io/api/core/v1"
	kubeinformers "k8s.io/client-go/informers"
	kubeclientset "k8s.io/client-go/kubernetes"
	kubelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	v1 "k8s.io/kube-scheduler/extender/v1"
)

const (
	// localPVAnnKey is the annotation that records the node selected for a volume.
	localPVAnnKey = "volume.kubernetes.io/selected-node"
)
// SchedulerExtender holds the listers used to resolve a pod's volumes.
type SchedulerExtender struct {
	pvLister  kubelisters.PersistentVolumeLister
	pvcLister kubelisters.PersistentVolumeClaimLister
}

// Filter is the webhook handler for the filter verb.
func (se *SchedulerExtender) Filter(w http.ResponseWriter, r *http.Request) {
	var args v1.ExtenderArgs
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Failed to read request body", http.StatusBadRequest)
		return
	}
	if err := json.Unmarshal(body, &args); err != nil {
		http.Error(w, "Failed to unmarshal request", http.StatusBadRequest)
		return
	}
	// Nodes is only populated when the extender is not registered as
	// nodeCacheCapable, so guard against nil before dereferencing.
	if args.Pod == nil || args.Nodes == nil {
		http.Error(w, "Request must include a pod and a node list", http.StatusBadRequest)
		return
	}
	pod := args.Pod
	nodes := args.Nodes.Items

	// The core logic starts here.
	targetNode := se.findNodeForPod(pod)
	filteredNodes := []corev1.Node{}
	if targetNode != "" {
		// If a target node is identified, filter the list to only that node.
		// If it is not among the candidates, the result stays empty and the pod is unschedulable.
		for _, node := range nodes {
			if node.Name == targetNode {
				filteredNodes = append(filteredNodes, node)
				break
			}
		}
	} else {
		// If no target node, the pod doesn't have a local PV, so it can go anywhere.
		filteredNodes = nodes
	}

	resp := v1.ExtenderFilterResult{
		Nodes: &corev1.NodeList{
			Items: filteredNodes,
		},
	}
	respBody, err := json.Marshal(resp)
	if err != nil {
		http.Error(w, "Failed to marshal response", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write(respBody)
}
// findNodeForPod contains the business logic to identify the target node.
// It returns "" when none of the pod's claims is pinned to a node.
func (se *SchedulerExtender) findNodeForPod(pod *corev1.Pod) string {
	for _, vol := range pod.Spec.Volumes {
		if vol.PersistentVolumeClaim == nil {
			continue
		}
		pvcName := vol.PersistentVolumeClaim.ClaimName
		pvc, err := se.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
		if err != nil {
			log.Printf("WARN: Failed to get PVC %s/%s: %v", pod.Namespace, pvcName, err)
			continue // Or handle error more robustly
		}
		// The selected-node annotation is written to the claim, so check there first.
		if node, ok := pvc.Annotations[localPVAnnKey]; ok && node != "" {
			log.Printf("INFO: Pod %s/%s requires node %s for its claim %s", pod.Namespace, pod.Name, node, pvcName)
			return node
		}
		pvName := pvc.Spec.VolumeName
		if pvName == "" {
			continue
		}
		pv, err := se.pvLister.Get(pvName)
		if err != nil {
			log.Printf("WARN: Failed to get PV %s: %v", pvName, err)
			continue
		}
		// Fall back to the same annotation on the PV, in case it is set there.
		if node, ok := pv.Annotations[localPVAnnKey]; ok && node != "" {
			log.Printf("INFO: Pod %s/%s requires node %s for its local PV %s", pod.Namespace, pod.Name, node, pv.Name)
			return node
		}
	}
	return ""
}
func main() {
	// --- Kubernetes Client-Go Setup ---
	// This is crucial for performance. Instead of hitting the API server for every PV/PVC lookup,
	// we use a shared informer to maintain a local cache.
	// With both arguments empty, BuildConfigFromFlags falls back to the in-cluster config.
	config, err := clientcmd.BuildConfigFromFlags("", "")
	if err != nil {
		log.Fatalf("Failed to build config: %v", err)
	}
	clientset, err := kubeclientset.NewForConfig(config)
	if err != nil {
		log.Fatalf("Failed to create clientset: %v", err)
	}

	factory := kubeinformers.NewSharedInformerFactory(clientset, 0) // 0 disables periodic resync
	// Requesting the listers registers both informers with the factory before Start.
	pvLister := factory.Core().V1().PersistentVolumes().Lister()
	pvcLister := factory.Core().V1().PersistentVolumeClaims().Lister()
	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)

	// Wait for caches to sync
	if !cache.WaitForCacheSync(stopCh, factory.Core().V1().PersistentVolumes().Informer().HasSynced) {
		log.Fatal("Failed to sync PV cache")
	}
	if !cache.WaitForCacheSync(stopCh, factory.Core().V1().PersistentVolumeClaims().Informer().HasSynced) {
		log.Fatal("Failed to sync PVC cache")
	}

	// --- HTTP Server Setup ---
	se := &SchedulerExtender{
		pvLister:  pvLister,
		pvcLister: pvcLister,
	}
	http.HandleFunc("/filter", se.Filter)
	// You would also implement /prioritize, /preempt, etc. here if needed.
	log.Println("INFO: Starting scheduler extender server on :8888")
	if err := http.ListenAndServe(":8888", nil); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}
2. Deployment and Configuration
First, you need to package this Go application into a Docker image and push it to a registry. Then, deploy it to your cluster.
# extender-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: local-pv-extender
  namespace: kube-system
spec:
  replicas: 2 # For HA
  selector:
    matchLabels:
      app: local-pv-extender
  template:
    metadata:
      labels:
        app: local-pv-extender
    spec:
      # RBAC is critical. The extender needs to read PVs and PVCs.
      serviceAccountName: extender-sa
      containers:
      - name: extender
        image: your-registry/local-pv-extender:v1.0.0
        ports:
        - containerPort: 8888
---
apiVersion: v1
kind: Service
metadata:
  name: local-pv-extender-svc
  namespace: kube-system
spec:
  ports:
  - port: 80
    targetPort: 8888
  selector:
    app: local-pv-extender
---
# RBAC configuration
apiVersion: v1
kind: ServiceAccount
metadata:
  name: extender-sa
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: pv-pvc-reader
rules:
- apiGroups: [""]
  resources: ["persistentvolumes", "persistentvolumeclaims"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: extender-pv-pvc-reader-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: pv-pvc-reader
subjects:
- kind: ServiceAccount
  name: extender-sa
  namespace: kube-system
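Finally, and this is the most critical part, you must configure kube-scheduler to use this extender. This is typically done by modifying its static pod manifest on the control plane nodes (/etc/kubernetes/manifests/kube-scheduler.yaml).
Create a scheduler configuration file. The legacy Policy API (kind: Policy) was removed in Kubernetes v1.23, and the --config flag expects a KubeSchedulerConfiguration, which declares extenders with the same fields. The kubeconfig path below mirrors the one used by the manifest's other flags; adjust it and the apiVersion to match your cluster.
// scheduler-policy.json
{
  "kind": "KubeSchedulerConfiguration",
  "apiVersion": "kubescheduler.config.k8s.io/v1",
  "clientConnection": {
    "kubeconfig": "/etc/kubernetes/scheduler.conf"
  },
  "extenders": [
    {
      "urlPrefix": "http://local-pv-extender-svc.kube-system.svc.cluster.local",
      "filterVerb": "filter",
      "enableHTTPS": false,
      "ignorable": false
    }
  ]
}
Mount this file into the kube-scheduler pod and add the --config flag: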
# In /etc/kubernetes/manifests/kube-scheduler.yaml
apiVersion: v1
kind: Pod
metadata:
  # ...
  name: kube-scheduler
  namespace: kube-system
spec:
  containers:
  - command:
    - kube-scheduler
    - --authentication-kubeconfig=/etc/kubernetes/scheduler.conf
    - --authorization-kubeconfig=/etc/kubernetes/scheduler.conf
    # ... other flags
    - --config=/etc/kubernetes/scheduler-policy.json # <-- ADD THIS
    # ...
    volumeMounts:
    # ...
    - name: scheduler-policy-config
      mountPath: /etc/kubernetes/scheduler-policy.json
      readOnly: true
  volumes:
  # ...
  - name: scheduler-policy-config
    hostPath:
      path: /etc/kubernetes/config/scheduler-policy.json #<-- Path where you place the file on the host
      type: File
Performance Edge Case: The use of client-go informers is non-negotiable here. A naive implementation that creates a new clientset and queries the API server on every /filter request would cripple your cluster's scheduling throughput. The informer maintains a local, in-memory cache that is kept in sync with the API server, allowing for sub-millisecond lookups of PVs and PVCs.
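The idea behind the informer cache can be sketched without client-go: watch events write into an in-memory map, and the request path only ever reads from that map. The `claimCache` type and its methods below are hypothetical stand-ins for illustration, not the client-go API; a real informer adds list/watch, resync and indexing on top of this.

```go
package main

import (
	"fmt"
	"sync"
)

// claimCache is a hypothetical stand-in for an informer-backed lister: watch
// events write to it, request handlers read from it without network calls.
type claimCache struct {
	mu    sync.RWMutex
	nodes map[string]string // "namespace/name" of a claim -> selected node
}

func newClaimCache() *claimCache {
	return &claimCache{nodes: map[string]string{}}
}

// onUpsert would be driven by add/update watch events.
func (c *claimCache) onUpsert(key, node string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nodes[key] = node
}

// onDelete would be driven by delete watch events.
func (c *claimCache) onDelete(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.nodes, key)
}

// lookup is the hot path used per /filter request: a map read under a read lock.
func (c *claimCache) lookup(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	n, ok := c.nodes[key]
	return n, ok
}

func main() {
	c := newClaimCache()
	c.onUpsert("db/data-cassandra-0", "node-a")
	fmt.Println(c.lookup("db/data-cassandra-0"))
	c.onDelete("db/data-cassandra-0")
	fmt.Println(c.lookup("db/data-cassandra-0"))
}
```

The cost of a lookup is independent of API server latency, which is why the handler can stay fast even when many pods are being scheduled at once. The trade-off is that reads may briefly lag behind the API server.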
Advanced Implementation: A Full Custom Scheduler for ML Workloads
Now for the more complex scenario. We need to schedule groups of interdependent ML training pods (e.g., a parameter server and several workers) with two strict requirements:
- All pods in a job group must be placed on nodes in the same rack (identified by the node label topology.corp.com/rack-id).
- The scheduler should attempt to balance the number of training jobs across the available racks.
This requires stateful, group-aware logic that is impossible for an extender. We need a full custom scheduler.
1. The Custom Scheduler Go Implementation
This implementation is more involved. It will run in a loop, watching for pods assigned to it, and making binding decisions.
// main.go
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"os"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

const (
	SchedulerName      = "gpu-fabric-scheduler"
	JobGroupAnnotation = "gpu-fabric-scheduler.io/job-group"
	RackLabel          = "topology.corp.com/rack-id"
)
// Scheduler encapsulates all the components needed to run.
type Scheduler struct {
	clientset  *kubernetes.Clientset
	podLister  corelisters.PodLister
	nodeLister corelisters.NodeLister
	podQueue   chan *corev1.Pod
}

// NewScheduler creates a new scheduler instance.
func NewScheduler(clientset *kubernetes.Clientset, factory informers.SharedInformerFactory) *Scheduler {
	podInformer := factory.Core().V1().Pods()
	nodeInformer := factory.Core().V1().Nodes()
	s := &Scheduler{
		clientset:  clientset,
		podLister:  podInformer.Lister(),
		nodeLister: nodeInformer.Lister(),
		podQueue:   make(chan *corev1.Pod, 300),
	}
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			p, ok := obj.(*corev1.Pod)
			if !ok {
				return
			}
			// Only queue pods that are assigned to this scheduler and are not yet scheduled.
			if p.Spec.SchedulerName == SchedulerName && p.Spec.NodeName == "" {
				// This send blocks the event handler while the queue is full;
				// a production scheduler would use a rate-limited work queue instead.
				s.podQueue <- p
			}
		},
	})
	return s
}
// Run starts the scheduler's main loop.
func (s *Scheduler) Run(ctx context.Context) {
	log.Printf("INFO: Starting custom scheduler %s", SchedulerName)
	for {
		select {
		case pod := <-s.podQueue:
			log.Printf("INFO: Scheduling pod %s/%s", pod.Namespace, pod.Name)
			nodeName, err := s.findBestNode(pod)
			if err != nil {
				log.Printf("ERROR: Failed to find node for pod %s/%s: %v", pod.Namespace, pod.Name, err)
				// In a real implementation, you would re-queue the pod with backoff.
				continue
			}
			if err := s.bindPodToNode(ctx, pod, nodeName); err != nil {
				log.Printf("ERROR: Failed to bind pod %s/%s to node %s: %v", pod.Namespace, pod.Name, nodeName, err)
			}
		case <-ctx.Done():
			log.Println("INFO: Shutting down scheduler")
			return
		}
	}
}
// findBestNode implements the core scheduling logic.
func (s *Scheduler) findBestNode(pod *corev1.Pod) (string, error) {
	jobGroup, ok := pod.Annotations[JobGroupAnnotation]
	if !ok {
		return "", fmt.Errorf("pod %s/%s is missing required annotation: %s", pod.Namespace, pod.Name, JobGroupAnnotation)
	}

	// 1. Check if other pods from this job group are already scheduled.
	// Listers can only select on labels, and the job group is stored as an
	// annotation, so list the namespace and match the annotation explicitly.
	// Note that the cache may briefly lag behind a bind issued moments ago.
	podsInNamespace, err := s.podLister.Pods(pod.Namespace).List(labels.Everything())
	if err != nil {
		return "", fmt.Errorf("failed to list pods for job group %s: %v", jobGroup, err)
	}
	var targetRack string
	for _, p := range podsInNamespace {
		if p.Annotations[JobGroupAnnotation] != jobGroup || p.Spec.NodeName == "" {
			continue
		}
		node, err := s.nodeLister.Get(p.Spec.NodeName)
		if err != nil {
			log.Printf("WARN: Failed to get node %s for existing pod %s/%s: %v", p.Spec.NodeName, p.Namespace, p.Name, err)
			continue
		}
		if rack, found := node.Labels[RackLabel]; found {
			targetRack = rack
			log.Printf("INFO: Job group '%s' is already on rack '%s'. Enforcing co-location.", jobGroup, targetRack)
			break
		}
	}

	// 2. Get all nodes from the local cache.
	allNodes, err := s.nodeLister.List(labels.Everything())
	if err != nil {
		return "", fmt.Errorf("failed to list nodes: %v", err)
	}

	// 3. Filter nodes: skip cordoned nodes and, in the co-location case,
	// nodes outside the target rack. For the first pod in a group
	// (targetRack == ""), every schedulable node is potentially feasible.
	var feasibleNodes []*corev1.Node
	for _, node := range allNodes {
		if node.Spec.Unschedulable {
			continue
		}
		if targetRack != "" && node.Labels[RackLabel] != targetRack {
			continue
		}
		feasibleNodes = append(feasibleNodes, node)
	}
	if len(feasibleNodes) == 0 {
		return "", fmt.Errorf("no feasible nodes found for pod %s/%s (rack requirement: %s)", pod.Namespace, pod.Name, targetRack)
	}

	// 4. Score nodes (simple random choice for this example; production would be more complex).
	// A better scoring model would balance jobs across racks.
	return feasibleNodes[rand.Intn(len(feasibleNodes))].Name, nil
}
// bindPodToNode performs the binding action.
func (s *Scheduler) bindPodToNode(ctx context.Context, pod *corev1.Pod, nodeName string) error {
	log.Printf("INFO: Binding pod %s/%s to node %s", pod.Namespace, pod.Name, nodeName)
	binding := &corev1.Binding{
		ObjectMeta: metav1.ObjectMeta{
			Name:      pod.Name,
			Namespace: pod.Namespace,
		},
		Target: corev1.ObjectReference{
			Kind: "Node",
			Name: nodeName,
		},
	}
	return s.clientset.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
}
// main function with Leader Election for High Availability
func main() {
	// ... (standard client-go config setup as in the extender example)
	config, err := clientcmd.BuildConfigFromFlags("", "")
	if err != nil {
		log.Fatal(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Leader election setup
	lockID := "gpu-fabric-scheduler-lock"
	ns := os.Getenv("POD_NAMESPACE") // Get namespace from Downward API
	if ns == "" {
		ns = "kube-system"
		log.Printf("WARN: POD_NAMESPACE env var not set, defaulting to %s", ns)
	}
	// The lock identity must be non-empty and unique per replica. Fall back to
	// the hostname (the pod name by default) if POD_NAME is not injected.
	identity := os.Getenv("POD_NAME") // Get pod name from Downward API
	if identity == "" {
		identity, err = os.Hostname()
		if err != nil {
			log.Fatalf("Failed to determine leader election identity: %v", err)
		}
	}
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      lockID,
			Namespace: ns,
		},
		Client: clientset.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: identity,
		},
	}
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				log.Println("INFO: Became leader, starting scheduler.")
				factory := informers.NewSharedInformerFactory(clientset, 0)
				scheduler := NewScheduler(clientset, factory)
				factory.Start(ctx.Done())
				if !cache.WaitForCacheSync(ctx.Done(), factory.Core().V1().Pods().Informer().HasSynced, factory.Core().V1().Nodes().Informer().HasSynced) {
					log.Fatal("Failed to sync caches")
				}
				scheduler.Run(ctx)
			},
			OnStoppedLeading: func() {
				// Exit so that a restarted replica rejoins the election with a clean state.
				log.Fatal("FATAL: Lost leader election.")
			},
		},
	})
}
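Step 4 of findBestNode picks a node at random and notes that a better model would balance jobs across racks. One possible shape for that scoring step is sketched below. It is an assumption-laden illustration: `pickRack` is a hypothetical helper, and the per-rack job-group counts are taken as given (a real scheduler would derive them from its pod and node caches).

```go
package main

import (
	"fmt"
	"sort"
)

// pickRack returns the rack that currently hosts the fewest job groups.
// racks lists the racks that have at least one feasible node; groupsPerRack
// counts job groups already placed per rack (missing racks count as zero).
// Ties are broken by rack name so the result is stable.
func pickRack(racks []string, groupsPerRack map[string]int) (string, bool) {
	if len(racks) == 0 {
		return "", false
	}
	sorted := append([]string(nil), racks...) // copy; do not reorder the caller's slice
	sort.Strings(sorted)
	best := sorted[0]
	for _, r := range sorted[1:] {
		if groupsPerRack[r] < groupsPerRack[best] {
			best = r
		}
	}
	return best, true
}

func main() {
	racks := []string{"rack-3", "rack-1", "rack-2"}
	load := map[string]int{"rack-1": 3, "rack-2": 1, "rack-3": 1}
	fmt.Println(pickRack(racks, load))
}
```

This choice only matters for the first pod of a job group. Once one pod is bound, the co-location rule pins the remaining pods to the same rack.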
2. Deployment and Usage
This scheduler is deployed as a standard Deployment in the cluster. It requires RBAC to get/list/watch pods and nodes, and to create Binding objects (the pods/binding subresource). Critically, it also needs permissions for leader election (Leases in the coordination.k8s.io API group).
To use it, you would submit your pod YAML with the custom schedulerName and annotations:
# ml-job.yaml
apiVersion: v1
kind: Pod
metadata:
  name: ps-0
  namespace: ml-jobs
  annotations:
    gpu-fabric-scheduler.io/job-group: "resnet50-trial-123"
spec:
  schedulerName: gpu-fabric-scheduler
  containers:
  - name: parameter-server
    image: my-ml-image:ps
---
apiVersion: v1
kind: Pod
metadata:
  name: worker-0
  namespace: ml-jobs
  annotations:
    gpu-fabric-scheduler.io/job-group: "resnet50-trial-123"
spec:
  schedulerName: gpu-fabric-scheduler
  containers:
  - name: worker
    image: my-ml-image:worker
3. Critical Considerations: HA and Race Conditions
* High Availability: A single instance of a custom scheduler is a single point of failure. The example code demonstrates the non-negotiable pattern for HA: leader election. By running multiple replicas of the scheduler deployment and wrapping the main logic in leaderelection.RunOrDie, you ensure that only one instance (the leader) is actively scheduling pods at any given time. If the leader fails, another replica will acquire the LeaseLock and take over.
* Race Conditions: What if the state of the cluster changes between when you make a scheduling decision and when you issue the Bind? For example, your scheduler picks Node A, but before the Bind API call completes, another pod is scheduled on Node A, consuming the last available resources. The API server does not re-check node capacity at bind time, so the Bind may still succeed and the kubelet will then reject the pod at admission; the Bind call itself fails if the pod was deleted or bound in the meantime. A robust scheduler must handle both outcomes gracefully, typically by tracking its own pending placements and by putting the pod back into its work queue to be retried later. The API server remains the ultimate source of truth.
* Preemption: This example doesn't implement preemption. A full-featured custom scheduler would need to. This involves a much more complex logic loop: if a high-priority pod cannot be scheduled, the scheduler must identify nodes where running, lower-priority pods could be evicted to make room. It would then need the RBAC permissions to delete those pods and then attempt to bind the high-priority pod once the resources are freed. This is a highly advanced feature that requires careful implementation to avoid cluster instability.
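The "put the pod back and retry later" advice from the race-condition item usually means retrying with a growing delay rather than in a tight loop. A minimal sketch of such a policy follows. `bindWithBackoff` and `simulate` are hypothetical helpers, the delays are arbitrary, and the bind function is a stub rather than a real API call.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// bindWithBackoff calls bind until it succeeds or attempts run out, doubling
// the delay after each failure (capped at maxDelay). sleep is injected so the
// policy can be exercised without real waiting. It returns the attempts used.
func bindWithBackoff(bind func() error, attempts int, base, maxDelay time.Duration, sleep func(time.Duration)) (int, error) {
	if attempts < 1 {
		attempts = 1
	}
	delay := base
	var err error
	for i := 1; i <= attempts; i++ {
		if err = bind(); err == nil {
			return i, nil
		}
		if i < attempts {
			sleep(delay)
			delay *= 2
			if delay > maxDelay {
				delay = maxDelay
			}
		}
	}
	return attempts, fmt.Errorf("bind failed after %d attempts: %w", attempts, err)
}

// simulate runs the policy against a stub bind that fails `failures` times,
// and reports attempts used plus the requested delays in milliseconds.
func simulate(failures, attempts int) (int, []int64, error) {
	calls := 0
	bind := func() error {
		calls++
		if calls <= failures {
			return errors.New("conflict: pod or node changed")
		}
		return nil
	}
	var slept []int64
	sleep := func(d time.Duration) { slept = append(slept, d.Milliseconds()) }
	used, err := bindWithBackoff(bind, attempts, 100*time.Millisecond, 250*time.Millisecond, sleep)
	return used, slept, err
}

func main() {
	fmt.Println(simulate(3, 5)) // succeeds on the fourth attempt
	fmt.Println(simulate(5, 3)) // gives up after three attempts
}
```

In the scheduler above, a failed attempt should also re-run findBestNode before the next Bind, since the node that was feasible a moment ago may no longer be.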
Conclusion: A Tool of Last Resort
Customizing the Kubernetes scheduler is one of the most powerful ways to tailor a cluster to a specific, demanding workload. It allows you to encode complex business and performance logic directly into your infrastructure's control plane.
However, this power comes with significant complexity and operational responsibility. Before embarking on building a custom scheduler, always exhaust the capabilities of the standard primitives:
* Can your problem be solved with advanced nodeAffinity, podAffinity, and podAntiAffinity rules?
* Can topologySpreadConstraints achieve the distribution you need?
* Can taints and tolerations isolate workloads effectively?
If the answer to these questions is no, then the patterns discussed here provide a roadmap. Start with an extender for simple, stateless filtering and scoring. Only when you need absolute control over the scheduling workflow, stateful logic, or gang scheduling should you take on the challenge of building and maintaining a full custom scheduler. When you do, rely heavily on the patterns established by client-go, especially informers for performance and leaderelection for high availability.