LLM Inference at Scale: Custom Kubernetes Schedulers for Dynamic Batching

Goh Ling Yong

The GPU Utilization Dilemma in Production LLM Inference

In the realm of large language models, the chasm between training and inference efficiency is vast. During training, we saturate high-end GPUs like A100s or H100s with massive, uniform batches of data, achieving near-peak TFLOPS. Inference, however, is the wild west of computational workloads. Requests arrive stochastically, demand low latency, and a single-token generation for a user prompt might only utilize a fraction of a GPU's massive parallel processing power for milliseconds.

Deploying an LLM service using a standard Kubernetes Deployment with a resource request of nvidia.com/gpu: 1 is the "hello world" of MLOps, but it's also a recipe for financial ruin at scale. A single request to a 70B parameter model might occupy the GPU for only 100ms, leaving it idle for the next 900ms of that second. This results in GPU utilization in the low single digits: a profoundly inefficient use of hardware that can cost upwards of $4 per GPU-hour.

The obvious first-level optimization is batching. Grouping multiple incoming requests into a single forward() pass through the model dramatically improves throughput. However, this introduces a new, critical trade-off: latency vs. throughput.

* Static Batching: A simple approach where the inference server (e.g., Triton, TGI) is configured with a fixed max_batch_size. The server waits until the batch is full or a timeout is reached. This is problematic. If max_batch_size is 32 and traffic is low, the first request might wait seconds for the batch to fill, violating any reasonable latency SLO. This approach is only viable for offline, high-throughput tasks.

* Dynamic Batching: A more sophisticated approach where a component external or internal to the inference server groups requests based on a dual-condition trigger: a maximum batch size (MAX_BATCH_SIZE) and a maximum wait time (MAX_WAIT_TIME). If the queue receives 32 requests in 5ms, it batches them. If it only receives 5 requests in 50ms (the max wait time), it batches those five and sends them to the GPU. This strategy is essential for interactive applications.

While frameworks like vLLM and TGI implement sophisticated internal batching (often "continuous batching"), they operate within the confines of a single pod. The true scaling challenge arises in a distributed, multi-node, multi-GPU Kubernetes cluster. How does a load balancer or request router know which of the 50 GPU pods has the capacity for another request in its current batching window? The default Kubernetes scheduler and services (like ClusterIP or NodePort) are oblivious to this application-level state. This is where we must descend into the Kubernetes control plane and build our own intelligence.

This article details an architecture for a highly available, topology-aware dynamic batching system for LLM inference by creating and deploying a custom Kubernetes Scheduler Extender.

Architecture of a Distributed Dynamic Batching System

Our goal is to build a system that intelligently routes individual inference requests to GPU worker pods that can accommodate them within an active, non-dispatched batch. This prevents requests from queueing at one saturated pod while another sits idle.

Our system consists of four key components:

  • Request Gateway: A public-facing service (e.g., an Istio Gateway, Nginx Ingress) that receives raw API requests.
  • Batching Orchestrator: A central brain that holds incoming requests in a temporary queue. This is not the batching logic itself, but the component that decides which GPU worker a request should be routed to. It's a stateful component that understands the real-time capacity of every worker.
  • Custom Kubernetes Scheduler Extender: A webhook server that integrates with kube-scheduler. It ensures that our orchestrator pods are scheduled intelligently and are aware of the GPU worker topology.
  • GPU Inference Pods: The workhorses running the LLM. Each pod runs an inference server (like a custom FastAPI/vLLM server) and a lightweight sidecar agent that reports its batching status back to a central state store.
Here is a high-level view of the data flow:

    mermaid
    graph TD
        subgraph User Space
            A[Client Request] --> B{Request Gateway};
        end
    
        subgraph Kubernetes Cluster
            B --> C{Batching Orchestrator Pod};
            C -- 1. Query State Store --> D[Redis/etcd];
            C -- 2. Route to specific pod IP --> E[GPU Worker Pod 1];
            C -- 2. Route to specific pod IP --> F[GPU Worker Pod 2];
            C -- 2. Route to specific pod IP --> G[GPU Worker Pod N];
    
            subgraph GPU Worker Pod
                H[Inference Server e.g., vLLM];
                I[Status Sidecar];
            end
            E --> H & I;
    
            I -- Reports batch status --> D;
    
            subgraph Control Plane
                J[kube-scheduler] -- Scheduling Webhook --> K{Custom Scheduler Extender};
                K -- Queries for capacity --> D;
            end
        end

    The critical insight is the decoupling of request ingestion from batch execution. The Batching Orchestrator doesn't execute the model; it plays a high-speed game of Tetris, fitting incoming requests into the available slots on the worker fleet.

    Implementing the Core Batching Logic

    Before tackling the Kubernetes complexity, let's implement the core dynamic batching logic that would live inside each GPU worker. This logic, often called a "batching loop," is the heart of the performance gains.

    Here is a Python example using asyncio to manage concurrent requests and the batching window (the model call itself is stubbed out).

    python
    # gpu_worker/batch_processor.py
    import asyncio
    import time
    import uuid
    from typing import List, Dict, Any, Tuple
    
    # Configuration
    MAX_BATCH_SIZE = 32
    MAX_WAIT_TIME_S = 0.05  # 50 milliseconds
    
    class InferenceRequest:
        def __init__(self, request_id: str, data: Dict[str, Any]):
            self.request_id = request_id
            self.data = data
            self.future = asyncio.Future()
    
    # In-memory queue for incoming requests
    request_queue = asyncio.Queue()
    
    async def llm_inference_engine(batch: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Placeholder for the actual model inference call (e.g., to vLLM, TGI)."""
        print(f"[{time.time():.4f}] Running inference on batch of size {len(batch)}...")
        # Simulate GPU work that scales with batch size
        processing_time = 0.020 + 0.005 * len(batch)
        await asyncio.sleep(processing_time)
        
        # Simulate generating responses
        responses = [
            {"request_id": req["request_id"], "output": f"Response for {req['prompt'][:10]}..."}
            for req in batch
        ]
        print(f"[{time.time():.4f}] Inference complete for batch of size {len(batch)}.")
        return responses
    
    async def batching_loop():
        """The core loop that collects requests and dispatches batches."""
        while True:
            batch_to_process: List[InferenceRequest] = []
            start_time = time.time()
    
            while True:
                # Calculate remaining time in the current window
                time_since_start = time.time() - start_time
                timeout = max(0, MAX_WAIT_TIME_S - time_since_start)
    
                try:
                    # Wait for a new request, but with a timeout
                    req = await asyncio.wait_for(request_queue.get(), timeout=timeout)
                    batch_to_process.append(req)
    
                    # If batch is full, break the inner loop to process it
                    if len(batch_to_process) >= MAX_BATCH_SIZE:
                        break
                except asyncio.TimeoutError:
                    # Timeout hit, break the inner loop to process the current (possibly empty) batch
                    break
    
            if not batch_to_process:
                continue
    
            # Prepare data for the model
            requests_data = [
                {"request_id": req.request_id, **req.data}
                for req in batch_to_process
            ]
    
            # Dispatch the batch for inference (non-blocking)
            inference_task = asyncio.create_task(llm_inference_engine(requests_data))
    
            # After dispatching, set the futures for the completed requests.
            # Bind the current batch as a default argument: the loop rebinds
            # batch_to_process on its next iteration, so the callback must not
            # rely on the closure variable or it would resolve the wrong batch.
            def when_done(task: asyncio.Task, batch: List[InferenceRequest] = batch_to_process):
                try:
                    responses = task.result()
                    response_map = {res["request_id"]: res for res in responses}

                    for req in batch:
                        if req.request_id in response_map:
                            req.future.set_result(response_map[req.request_id])
                        else:
                            # This is a critical edge case
                            req.future.set_exception(Exception(f"Response for {req.request_id} not found in batch result"))
                except Exception as e:
                    # If the whole batch fails, fail all requests in it
                    for req in batch:
                        if not req.future.done():
                            req.future.set_exception(e)

            inference_task.add_done_callback(when_done)
    
    async def handle_api_request(data: Dict[str, Any]) -> Dict[str, Any]:
        """API endpoint function that adds a request to the queue and waits for its result."""
        req = InferenceRequest(request_id=str(uuid.uuid4()), data=data)
        await request_queue.put(req)
        # This await will pause the handler until the future is resolved by the batching loop
        return await req.future
    
    # Example usage (e.g., in a FastAPI app)
    async def main():
        # Start the batching loop as a background task
        asyncio.create_task(batching_loop())
        
        # Simulate 50 concurrent API requests arriving
        print("Simulating 50 API requests arriving nearly simultaneously...")
        api_tasks = [
            handle_api_request({"prompt": f"This is prompt number {i}"})
            for i in range(50)
        ]
        
        results = await asyncio.gather(*api_tasks)
        print(f"\nReceived {len(results)} results.")
        # print(results[0])
    
    if __name__ == "__main__":
        asyncio.run(main())

    This code demonstrates the core pattern: API handlers don't block on the model but instead place a request object (containing an asyncio.Future) into a queue. A single, separate background task (batching_loop) drains this queue, forms batches, and dispatches them. The key is that the handle_api_request function awaits the Future, effectively pausing its execution until the batch it belongs to has been processed and its specific result is ready.

    Output of the simulation:

    text
    Simulating 50 API requests arriving nearly simultaneously...
    [1678886400.1234] Running inference on batch of size 32...
    [1678886400.3034] Inference complete for batch of size 32.
    [1678886400.3036] Running inference on batch of size 18...
    [1678886400.4136] Inference complete for batch of size 18.
    
    Received 50 results.

    Notice two batches were formed: one full batch of 32, and a subsequent partial batch of 18. This is dynamic batching in action.
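
    In a real worker, this module would sit behind an HTTP server rather than the main() simulation above. Here is a minimal sketch of that wiring, assuming FastAPI; the module path, endpoint name, and payload shape are illustrative:

    python
    # gpu_worker/app.py -- hypothetical FastAPI wiring for the batch processor
    import asyncio
    from contextlib import asynccontextmanager

    from fastapi import FastAPI

    from batch_processor import batching_loop, handle_api_request

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Run the batching loop for the lifetime of the web server process.
        task = asyncio.create_task(batching_loop())
        yield
        task.cancel()

    app = FastAPI(lifespan=lifespan)

    @app.post("/generate")
    async def generate(payload: dict):
        # Suspends until the batch containing this request has been processed.
        return await handle_api_request(payload)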

    The Kubernetes Integration Challenge

    Now, imagine 100 pods each running this batching_loop. A standard ClusterIP Service in Kubernetes will spread requests across these pods (effectively round-robin or random, depending on the kube-proxy mode). What happens?

    * Request 1 -> Pod A: Pod A starts a 50ms batching window.

    * Request 2 -> Pod B: Pod B starts its own independent 50ms batching window.

    * ...

    * Request 32 -> Pod Z: Pod Z starts yet another 50ms window.

    After 50ms, we will have dispatched 32 separate batches of size 1. We have completely defeated the purpose of batching. We've achieved high distribution but zero consolidation. This is the core problem that a custom scheduler must solve. We need to make the infrastructure aware of the application's batching state.

    Solution: A Custom Kubernetes Scheduler Extender

    The Kubernetes scheduler is responsible for assigning pods to nodes. It's highly extensible. A Scheduler Extender is a webhook that the default scheduler (kube-scheduler) calls during its scheduling cycle. We can inject our custom logic at two crucial points:

  • Filter: The extender can tell the scheduler to disqualify certain nodes from consideration for a given pod.
  • Prioritize: The extender can assign a score to each of the remaining, valid nodes, influencing which one the scheduler ultimately chooses.

    Our goal is to create a scheduler extender that schedules our Batching Orchestrator pods onto nodes that have available GPU capacity. This sounds simple, but it requires a feedback loop.

    The Architecture Revisited with the Scheduler:

  • GPU Worker State: Each GPU worker pod's sidecar reports its state to a central, low-latency data store like Redis. The state includes pod_ip, current_batch_size, max_batch_size, and model_loaded.
    json
        // Example key in Redis: 'gpu-worker-status:pod-123-abc'
        {
          "pod_ip": "10.1.2.3",
          "model": "llama2-70b",
          "current_batch_size": 12,
          "max_batch_size": 32,
          "last_updated": 1678886401.0
        }
  • Scheduler Extender Logic: When a new Batching Orchestrator pod needs to be scheduled, the kube-scheduler calls our extender.
  • Filter Predicate: The extender queries Redis for all GPU workers serving the required model. It filters out any node that contains only fully saturated GPU workers (where current_batch_size >= max_batch_size).
  • Prioritize Function: For the remaining nodes, it calculates a score. A good strategy is to prioritize nodes with the most available batch capacity (max_batch_size - current_batch_size). This encourages spreading the load.

  • Orchestrator Awareness: Once the orchestrator pod is scheduled on a suitable node, it can query the same Redis store to get a real-time list of available pod_ips and their batch capacities. When a request arrives, it can perform a smart routing decision, sending the request directly to the IP of a worker pod that has room in its current batch.
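
    Here is a minimal sketch of that routing decision, assuming the Redis key layout shown above, the redis-py asyncio client, and httpx for forwarding; the worker port and endpoint path are illustrative:

    python
    # orchestrator/router.py -- illustrative routing sketch (assumed key layout, port, and endpoint)
    import json
    from typing import Any, Dict, Optional

    import httpx
    import redis.asyncio as redis

    r = redis.Redis(host="redis", port=6379, decode_responses=True)

    async def pick_worker(model: str) -> Optional[Dict[str, Any]]:
        """Return the status of the worker with the most free batch slots for `model`."""
        best_free, best_status = 0, None
        async for key in r.scan_iter(match="gpu-worker-status:*"):
            raw = await r.get(key)
            if not raw:
                continue  # key expired between scan and get
            status = json.loads(raw)
            free = status["max_batch_size"] - status["current_batch_size"]
            if status["model"] == model and free > best_free:
                best_free, best_status = free, status
        return best_status

    async def route_request(model: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        worker = await pick_worker(model)
        if worker is None:
            raise RuntimeError(f"No worker with free batch capacity for {model}")
        # Route straight to the chosen pod IP, bypassing Service-level round-robin.
        async with httpx.AsyncClient() as client:
            resp = await client.post(f"http://{worker['pod_ip']}:8000/generate", json=payload)
            resp.raise_for_status()
            return resp.json()
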
    Implementing the Scheduler Extender Webhook

    Here's a sample implementation of the extender server in Go, which is well-suited for control plane components.

    go
    // scheduler-extender/main.go
    package main
    
    import (
    	"encoding/json"
    	"fmt"
    	"io"
    	"log"
    	"net/http"

    	v1 "k8s.io/api/core/v1"
    	// Extender request/response types live in the kube-scheduler staging repo.
    	extenderv1 "k8s.io/kube-scheduler/extender/v1"
    )
    
    // For simplicity, we'll mock the Redis client
    func getGpuWorkerStats() map[string]int {
        // In a real implementation, this would query Redis
        return map[string]int{
            "node-1-gpu-a": 10, // node name -> available batch slots
            "node-2-gpu-b": 0,
            "node-3-gpu-c": 32,
        }
    }
    
    func filter(w http.ResponseWriter, r *http.Request) {
    	body, err := io.ReadAll(r.Body)
    	if err != nil {
    		http.Error(w, "Failed to read request body", http.StatusBadRequest)
    		return
    	}
    
    	var args extenderv1.ExtenderArgs
    	if err := json.Unmarshal(body, &args); err != nil {
    		http.Error(w, "Failed to unmarshal request", http.StatusBadRequest)
    		return
    	}
    
        // We only care about pods that are our orchestrators
        if args.Pod.Labels["app"] != "batching-orchestrator" {
            // Not our pod, approve all nodes
            resp := extenderv1.ExtenderFilterResult{Nodes: args.Nodes}
            json.NewEncoder(w).Encode(resp)
            return
        }
    
    	gpuStats := getGpuWorkerStats()
    	filteredNodes := v1.NodeList{}
        failedNodes := make(extenderv1.FailedNodesMap)
    
    	for _, node := range args.Nodes.Items {
    		if capacity, ok := gpuStats[node.Name]; ok && capacity > 0 {
    			filteredNodes.Items = append(filteredNodes.Items, node)
    		} else {
    			failedNodes[node.Name] = fmt.Sprintf("Node %s has no available GPU batch capacity", node.Name)
    		}
    	}
    
    	resp := extenderv1.ExtenderFilterResult{
    		Nodes:       &filteredNodes,
    		FailedNodes: failedNodes,
    		Error:       "",
    	}
    
    	if err := json.NewEncoder(w).Encode(resp); err != nil {
    		log.Printf("Error encoding response: %v", err)
    	}
    }
    
    func prioritize(w http.ResponseWriter, r *http.Request) {
        // Score each remaining node by its available batch capacity.
        // Higher score = more preferred; extender priorities are expected in the 0-10 range.
        var args extenderv1.ExtenderArgs
        if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
            http.Error(w, "Failed to unmarshal request", http.StatusBadRequest)
            return
        }

        gpuStats := getGpuWorkerStats()
        hostPriorityList := extenderv1.HostPriorityList{}
        for _, node := range args.Nodes.Items {
            score := int64(gpuStats[node.Name]) // available batch slots; 0 if the node is unknown
            if score > 10 {
                score = 10
            }
            hostPriorityList = append(hostPriorityList, extenderv1.HostPriority{Host: node.Name, Score: score})
        }

        if err := json.NewEncoder(w).Encode(hostPriorityList); err != nil {
            log.Printf("Error encoding response: %v", err)
        }
    }
    
    func main() {
    	http.HandleFunc("/filter", filter)
    	http.HandleFunc("/prioritize", prioritize)
    	log.Println("Starting scheduler extender on :8888")
    	log.Fatal(http.ListenAndServe(":8888", nil))
    }

    Configuring `kube-scheduler`

    To make the default scheduler use our extender, we provide a configuration file.

    yaml
    # scheduler-config.yaml
    apiVersion: kubescheduler.config.k8s.io/v1
    kind: KubeSchedulerConfiguration
    clientConnection:
      kubeconfig: /etc/kubernetes/scheduler.conf
    extenders:
      - urlPrefix: "http://<extender-service-ip>:8888"
        filterVerb: "filter"
        prioritizeVerb: "prioritize"
        weight: 1
        enableHTTPS: false

    You then run kube-scheduler with the --config flag pointing to this file; the legacy Policy API and its --policy-config-file flag were removed in Kubernetes 1.23. This is typically done by modifying the static pod manifest for kube-scheduler in /etc/kubernetes/manifests on your control plane nodes.

    Production-Grade Considerations and Edge Cases

    A working prototype is one thing; a production system is another. Here are critical edge cases to handle.

    1. Request Cancellation and Timeouts:

    What if a client disconnects while its request is sitting in the orchestrator's queue or even in a worker's batching window? You cannot simply ignore it. The Future object in our Python example will remain unresolved, potentially causing a memory leak.

    * Solution: Implement a timeout mechanism in the orchestrator. If a response Future is not resolved within a global timeout (e.g., 30 seconds), it should be cancelled and an error returned. For cancellations, the client API should provide a request ID that can be used to send a cancellation signal. The orchestrator would then have to find the request and mark it as 'to-be-ignored', and the worker, upon completing the batch, would simply discard the result for that request ID.
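
    A minimal sketch of the timeout side of this, building on the earlier batch_processor example; the 30-second budget, the cancelled_ids set, and the module import are illustrative assumptions:

    python
    # gpu_worker/timeouts.py -- illustrative timeout guard around the earlier example
    import asyncio
    import uuid
    from typing import Any, Dict, Set

    from batch_processor import InferenceRequest, request_queue

    REQUEST_TIMEOUT_S = 30.0
    cancelled_ids: Set[str] = set()  # the dispatch/completion path can consult this set

    async def handle_api_request_with_timeout(data: Dict[str, Any]) -> Dict[str, Any]:
        req = InferenceRequest(request_id=str(uuid.uuid4()), data=data)
        await request_queue.put(req)
        try:
            # shield() keeps the shared future usable by the batching loop even if we time out.
            return await asyncio.wait_for(asyncio.shield(req.future), timeout=REQUEST_TIMEOUT_S)
        except asyncio.TimeoutError:
            # Mark the request so the worker discards its result when the batch completes.
            cancelled_ids.add(req.request_id)
            raise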

    2. Heterogeneous GPU Types and Multi-Model Serving:

    Your cluster may have a mix of A100s, H100s, and older T4s. A batch of 32 on a T4 is not the same as a batch of 32 on an H100. Furthermore, you might be serving multiple models (llama2-70b, mistral-7b).

    * Solution: The state reported by worker sidecars must be richer. It should include gpu_type and model_name. The scheduler extender logic must be updated to filter nodes based not just on capacity but also on matching the required hardware and model profile for the incoming orchestrator pod. The orchestrator itself must be model-aware, routing a request for mistral-7b only to pods that have that model loaded and have capacity.
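
    For illustration, here is what a richer sidecar report might look like; the schema fields beyond those shown earlier (gpu_type in particular) and the Redis key/TTL choices are assumptions, and the example uses the redis-py asyncio client:

    python
    # gpu_worker/status_sidecar.py -- illustrative reporter with the richer schema
    import asyncio
    import json
    import time

    import redis.asyncio as redis

    POD_NAME = "pod-123-abc"   # in practice injected via the Downward API
    POD_IP = "10.1.2.3"
    REPORT_INTERVAL_S = 0.5

    r = redis.Redis(host="redis", port=6379)

    async def report_status(get_worker_state) -> None:
        """Periodically publish this worker's state with a short TTL so that
        stale entries vanish automatically if the pod dies."""
        while True:
            state = get_worker_state()  # e.g., scraped from the inference server
            payload = {
                "pod_ip": POD_IP,
                "model": state["model"],              # e.g., "mistral-7b"
                "gpu_type": state["gpu_type"],        # e.g., "A100-80GB"
                "current_batch_size": state["current_batch_size"],
                "max_batch_size": state["max_batch_size"],
                "last_updated": time.time(),
            }
            await r.set(f"gpu-worker-status:{POD_NAME}", json.dumps(payload), ex=5)
            await asyncio.sleep(REPORT_INTERVAL_S)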

    3. Continuous Batching vs. Static Batching Window:

    The simple MAX_WAIT_TIME approach is good, but can be improved. Continuous batching (popularized by frameworks like vLLM) is a more advanced technique. Instead of fixed batches, the system maintains a running list of active sequences. On each iteration of the model, it processes one token for all sequences in the current "batch". As sequences complete (generating an EOS token), they are removed. New requests can be added to this active set on any iteration, as long as there is enough GPU memory (especially for the KV cache).

    Our Architecture's Role: Our custom scheduler and orchestrator are enablers for continuous batching at scale. The state reported by the worker sidecar would change from current_batch_size to active_sequences and available_kv_cache_blocks. The orchestrator's job is now even more critical: it must decide if a new request (with its prompt length and max_new_tokens) can fit into the available KV cache on a specific worker. This is a much more granular and efficient way to pack the GPU.
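
    A rough admission check along these lines is sketched below. The block-size arithmetic mirrors vLLM's paged KV cache (16 tokens per block by default), but the function names and the available_kv_cache_blocks field are illustrative assumptions:

    python
    # orchestrator/kv_admission.py -- illustrative KV-cache admission check
    import math
    from typing import Any, Dict

    BLOCK_SIZE_TOKENS = 16  # tokens per KV-cache block; 16 is vLLM's default

    def blocks_needed(prompt_len: int, max_new_tokens: int) -> int:
        """Upper bound on KV-cache blocks this request can consume over its lifetime."""
        return math.ceil((prompt_len + max_new_tokens) / BLOCK_SIZE_TOKENS)

    def can_admit(worker_status: Dict[str, Any], prompt_len: int, max_new_tokens: int) -> bool:
        """True if the worker can take the request without evicting active sequences.
        `worker_status` is the sidecar-reported dict, assumed to contain
        'available_kv_cache_blocks' as described above."""
        return worker_status["available_kv_cache_blocks"] >= blocks_needed(prompt_len, max_new_tokens)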

    Performance Benchmarking Analysis

    Let's analyze the expected performance of three scenarios under a load of 100 requests per second (RPS) for a 7B parameter model.

    Scenario | Avg. GPU Utilization | Throughput (RPS) | p99 Latency (ms) | Cost per 1M Tokens | Notes
    --- | --- | --- | --- | --- | ---
    1. Naive K8s Deployment (1 pod/req) | < 5% | ~20 (per GPU) | 250 | High | Requires many GPUs to handle load, most of which are idle.
    2. Static Batching (max_size=16, timeout=2s) | 40-60% | ~100 (per GPU) | > 2000 | Low | High throughput but unacceptable latency for interactive use.
    3. Dynamic Batching w/ Custom Scheduler | 70-90% | ~100 (per GPU) | 450 | Very Low | Balances high utilization and acceptable latency via intelligent routing.

    * Scenario 1: The classic money pit. You scale horizontally by adding more idle GPUs.

    * Scenario 2: Great for offline jobs, but the high p99 latency caused by waiting for batches to fill makes it unsuitable for chatbots or copilots.

    * Scenario 3: The optimal solution. By setting a tight MAX_WAIT_TIME (e.g., 25-50ms), we add only a small, predictable amount of latency. The custom scheduler ensures that our orchestrators are routing requests to workers that can immediately accommodate them, maximizing the chance of forming large batches without waiting. This leads to high GPU utilization, high throughput, and controlled latency, dramatically reducing the cost per token.

    Conclusion

    Serving large language models efficiently in a distributed environment is fundamentally a systems and infrastructure problem, not just a machine learning one. While inference servers provide the core batching logic, they cannot solve the cross-node orchestration challenge. By leveraging the extensibility of the Kubernetes control plane with a custom scheduler extender, we can build an application-aware infrastructure that transforms a fleet of isolated GPU workers into a single, cohesive, high-throughput compute fabric. This approach moves beyond generic, resource-agnostic scheduling to a stateful, performance-oriented paradigm, which is the only financially viable path to deploying LLMs at production scale.
