Custom Kubernetes Controllers: Managing Stateful Dependencies with Client-Go
The Orchestration Gap: When StatefulSets Aren't Enough
As senior engineers, we appreciate the power of Kubernetes primitives. StatefulSets are the go-to solution for deploying stateful workloads like databases or message queues, providing stable network identities and persistent storage. However, their capabilities have limits. They excel at managing a homogeneous group of pods but fall short when orchestrating complex, multi-component applications with strict lifecycle dependencies.
Consider a hypothetical distributed database system we'll call ShardDB. A ShardDB cluster has the following requirements:
1. The primary database shards run as a StatefulSet for stable identity.
2. Each shard requires an initialization Job that seeds data or configures replication.
3. A caching Deployment must be provisioned for the entire ShardDB cluster, but only after all primary shards are healthy and have successfully registered with a discovery service.
4. When a ShardDB cluster is deleted, the cache must be flushed and connections drained before the database pods are terminated to prevent data loss.

A standard StatefulSet manifest can't enforce this sequence. You could try to manage it with Helm hooks or a series of manual kubectl apply commands, but that approach is brittle, error-prone, and antithetical to the declarative nature of Kubernetes. This is the orchestration gap where the Operator Pattern, implemented via a custom controller, becomes essential.
This post will guide you through building a production-grade controller to manage our ShardDB application. We won't cover the basics of kubebuilder scaffolding; we assume you've done that before. Instead, we'll focus on the advanced logic within the reconciliation loop, implementing finalizers for safety, and ensuring our controller is robust and self-healing.
Defining the Contract: The `ShardDB` CRD
Our controller's first job is to extend the Kubernetes API with a new resource type that understands our application's domain. This is our Custom Resource Definition (CRD). The spec defines the desired state, and the status reflects the observed state.
Here's the Go type definition for our ShardDB resource, which kubebuilder will use to generate the CRD manifest. Note the specificity: we define specs for the database itself and its dependent cache.
// api/v1alpha1/sharddb_types.go
package v1alpha1
import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// CacheSpec defines the desired state of the dependent cache layer
type CacheSpec struct {
// Image is the container image for the cache pods.
Image string `json:"image"`
// Replicas is the number of cache pods.
Replicas *int32 `json:"replicas"`
}
// ShardDBSpec defines the desired state of ShardDB
type ShardDBSpec struct {
// Shards is the number of primary database shards.
// +kubebuilder:validation:Minimum=1
Shards int32 `json:"shards"`
// DBImage is the container image for the database shards.
DBImage string `json:"dbImage"`
// Cache is the specification for the dependent cache layer.
Cache CacheSpec `json:"cache"`
}
// ShardDBStatus defines the observed state of ShardDB
type ShardDBStatus struct {
// Conditions represent the latest available observations of an object's state.
// +optional
// +patchMergeKey=type
// +patchStrategy=merge
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
// ReadyShards is the number of database shards that are fully initialized and ready.
ReadyShards int32 `json:"readyShards"`
// CacheReady indicates whether the dependent cache layer is ready.
CacheReady bool `json:"cacheReady"`
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Shards",type=integer,JSONPath=".spec.shards"
//+kubebuilder:printcolumn:name="Ready",type=string,JSONPath=".status.conditions[?(@.type==\"Available\")].status"
//+kubebuilder:printcolumn:name="Age",type=date,JSONPath=".metadata.creationTimestamp"
// ShardDB is the Schema for the sharddbs API
type ShardDB struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec ShardDBSpec `json:"spec,omitempty"`
Status ShardDBStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// ShardDBList contains a list of ShardDB
type ShardDBList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []ShardDB `json:"items"`
}
func init() {
SchemeBuilder.Register(&ShardDB{}, &ShardDBList{})
}
Key takeaways from this definition:
* +kubebuilder:subresource:status: This is critical. It tells Kubernetes that the .status field should be treated as a separate subresource. This prevents controllers from fighting over updates to the main object and its status, enforcing a clean separation of concerns. Only the controller should write to status.
* metav1.Condition: We are not using a simple phase: string or ready: bool in our status. We are using the standardized Condition type from meta/v1. This provides a rich, machine-readable format for reporting status, with types like Available, Progressing, and Degraded, along with reasons and messages. This is a production best practice.
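One payoff of the standardized type: any consumer of our API can check readiness with the stock apimachinery helpers instead of parsing ad-hoc status fields. A brief sketch (the helper function name here is ours, not part of any API):

import (
    "k8s.io/apimachinery/pkg/api/meta"

    mygroupv1alpha1 "github.com/example/sharddb-operator/api/v1alpha1"
)

// isShardDBAvailable reports whether the standardized Available condition is True.
func isShardDBAvailable(s *mygroupv1alpha1.ShardDB) bool {
    return meta.IsStatusConditionTrue(s.Status.Conditions, "Available")
}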
The Heart of the Machine: The Reconciliation Loop
The Reconcile function is the core of any controller. It's a state machine that is invoked whenever our ShardDB resource (or a resource it owns) changes. Its sole purpose is to observe the current state of the cluster and take actions to drive it towards the desired state defined in the ShardDB spec.
The logic must be idempotent. It will be called repeatedly, and it should produce the same outcome if the state hasn't changed. Our reconciliation logic will follow this sequence:
1. Fetch the ShardDB instance.
2. Reconcile the StatefulSet for the database shards.
3. Check the readiness of the StatefulSet. If not ready, stop and requeue.
4. Once the StatefulSet is ready, reconcile the Deployment for the cache layer.
5. Update the ShardDB status with the overall health.

Here is a skeleton of our Reconcile function in controllers/sharddb_controller.go:
// controllers/sharddb_controller.go
import (
// All imports used by the snippets in this post, including fmt, logr, and
// controllerutil, which the finalizer logic below relies on.
"context"
"fmt"
"time"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
mygroupv1alpha1 "github.com/example/sharddb-operator/api/v1alpha1"
)
// ShardDBReconciler reconciles a ShardDB object
type ShardDBReconciler struct {
client.Client
Scheme *runtime.Scheme
}
func (r *ShardDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// 1. Fetch the ShardDB instance
shardDB := &mygroupv1alpha1.ShardDB{}
err := r.Get(ctx, req.NamespacedName, shardDB)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
logger.Info("ShardDB resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
logger.Error(err, "Failed to get ShardDB")
return ctrl.Result{}, err
}
// ... Reconciliation logic will go here ...
return ctrl.Result{}, nil
}
Reconciling the StatefulSet
Our first task is to ensure the StatefulSet for our database shards exists and matches the spec. We'll construct the desired StatefulSet in a helper function and then use the controller's client to check if it exists. If not, we create it. If it exists, we could check for differences and update it (though for this example, we'll keep it simple).
A crucial detail here is setting the owner reference. By calling ctrl.SetControllerReference, we are telling Kubernetes that our ShardDB custom resource owns this StatefulSet. When the ShardDB object is deleted, the Kubernetes garbage collector will automatically delete the StatefulSet and its pods.
// controllers/sharddb_controller.go
func (r *ShardDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// ... (fetch ShardDB instance as before)
// 2. Reconcile the StatefulSet for DB shards
sts := &appsv1.StatefulSet{}
err = r.Get(ctx, client.ObjectKey{Name: shardDB.Name + "-db", Namespace: shardDB.Namespace}, sts)
if err != nil && errors.IsNotFound(err) {
// Define a new StatefulSet
logger.Info("Creating a new StatefulSet for DB shards")
desiredSts := r.statefulSetForShardDB(shardDB)
if err := r.Create(ctx, desiredSts); err != nil {
logger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", desiredSts.Namespace, "StatefulSet.Name", desiredSts.Name)
return ctrl.Result{}, err
}
// StatefulSet created successfully - return and requeue to check status later
return ctrl.Result{Requeue: true}, nil
} else if err != nil {
logger.Error(err, "Failed to get StatefulSet")
return ctrl.Result{}, err
}
// ... (rest of the logic)
return ctrl.Result{}, nil
}
// statefulSetForShardDB returns a ShardDB StatefulSet object
func (r *ShardDBReconciler) statefulSetForShardDB(s *mygroupv1alpha1.ShardDB) *appsv1.StatefulSet {
labels := map[string]string{"app": "sharddb", "sharddb_cr": s.Name}
replicas := s.Spec.Shards
sts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: s.Name + "-db",
Namespace: s.Namespace,
},
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Image: s.Spec.DBImage,
Name: "database",
Ports: []corev1.ContainerPort{{
ContainerPort: 5432,
Name: "db",
}},
}},
},
},
},
}
// Set ShardDB instance as the owner and controller
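// (SetControllerReference returns an error, e.g. if another controller already
// owns this object; it's dropped here for brevity, but propagate it in production.)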
ctrl.SetControllerReference(s, sts, r.Scheme)
return sts
}
Checking Status and Reconciling the Cache Deployment
Once the StatefulSet exists, we must check if it's actually ready. We compare sts.Status.ReadyReplicas with our shardDB.Spec.Shards. If they don't match, our work here is done for now. We update our status to reflect we are progressing and requeue the request for a later time. This prevents us from creating the cache Deployment prematurely.
// controllers/sharddb_controller.go ... inside Reconcile function
// ... after getting or creating the StatefulSet ...
// 3. Check StatefulSet status
if sts.Status.ReadyReplicas != shardDB.Spec.Shards {
logger.Info("DB shards not ready yet", "Ready", sts.Status.ReadyReplicas, "Required", shardDB.Spec.Shards)
// Update status to Progressing
// ... (status update logic will be shown later)
// Requeue after a short delay to check again
return ctrl.Result{RequeueAfter: 15 * time.Second}, nil
}
// 4. If StatefulSet is ready, reconcile the cache Deployment
cacheDep := &appsv1.Deployment{}
err = r.Get(ctx, client.ObjectKey{Name: shardDB.Name + "-cache", Namespace: shardDB.Namespace}, cacheDep)
if err != nil && errors.IsNotFound(err) {
logger.Info("Creating a new Deployment for the cache layer")
desiredCacheDep := r.deploymentForCache(shardDB)
if err := r.Create(ctx, desiredCacheDep); err != nil {
logger.Error(err, "Failed to create new Deployment")
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
} else if err != nil {
logger.Error(err, "Failed to get cache Deployment")
return ctrl.Result{}, err
}
// ... (final status update logic)
The deploymentForCache helper function would be similar to statefulSetForShardDB, creating a Deployment object and setting the ShardDB as the owner reference.
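For completeness, here's a minimal sketch of that helper. The label values and container port are illustrative assumptions, not something the spec dictates:

// deploymentForCache returns a Deployment for the ShardDB cache layer,
// owned by the ShardDB resource so it participates in garbage collection.
func (r *ShardDBReconciler) deploymentForCache(s *mygroupv1alpha1.ShardDB) *appsv1.Deployment {
    labels := map[string]string{"app": "sharddb-cache", "sharddb_cr": s.Name}
    dep := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      s.Name + "-cache",
            Namespace: s.Namespace,
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: s.Spec.Cache.Replicas,
            Selector: &metav1.LabelSelector{MatchLabels: labels},
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: labels},
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{
                        Image: s.Spec.Cache.Image,
                        Name:  "cache",
                        // Port 6379 is an assumption (a Redis-style cache); adjust for your image.
                        Ports: []corev1.ContainerPort{{ContainerPort: 6379, Name: "cache"}},
                    }},
                },
            },
        },
    }
    // As with the StatefulSet, the returned error is worth handling in production.
    ctrl.SetControllerReference(s, dep, r.Scheme)
    return dep
}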
Advanced Pattern: Finalizers for Graceful Deletion
Owner references provide automatic garbage collection, but that collection is immediate: when our ShardDB CR is deleted, the StatefulSet and Deployment are targeted for deletion right away. This violates our requirement for a graceful shutdown (flushing the cache, draining connections, and so on).
This is where finalizers are essential. A finalizer is a key in an object's metadata that tells the Kubernetes API server to block deletion of the object until that key is removed. Our controller can add a finalizer to a ShardDB resource upon creation. When a user requests deletion, the API server simply sets a deletionTimestamp on the object and leaves it be. This triggers our Reconcile function.
Inside the reconciler, we check whether shardDB.ObjectMeta.DeletionTimestamp is set (i.e., DeletionTimestamp.IsZero() returns false). If it is set, we know the object is being deleted and can perform our custom cleanup logic. Only when that logic is complete do we remove our finalizer from the object's metadata. This signals to the API server that it can now proceed with the actual deletion.
Let's integrate this logic.
// controllers/sharddb_controller.go
const shardDBFinalizer = "mygroup.example.com/finalizer"
func (r *ShardDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// ... (fetch ShardDB instance as before)
// Check if the ShardDB instance is marked to be deleted
isShardDBMarkedToBeDeleted := shardDB.GetDeletionTimestamp() != nil
if isShardDBMarkedToBeDeleted {
if controllerutil.ContainsFinalizer(shardDB, shardDBFinalizer) {
// Run finalization logic. If it fails, don't remove the finalizer so we can retry.
if err := r.finalizeShardDB(ctx, logger, shardDB); err != nil {
return ctrl.Result{}, err
}
// Remove finalizer. Once all finalizers are removed, the object will be deleted.
controllerutil.RemoveFinalizer(shardDB, shardDBFinalizer)
err := r.Update(ctx, shardDB)
if err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
// Add finalizer for this CR if it doesn't exist
if !controllerutil.ContainsFinalizer(shardDB, shardDBFinalizer) {
controllerutil.AddFinalizer(shardDB, shardDBFinalizer)
err = r.Update(ctx, shardDB)
if err != nil {
return ctrl.Result{}, err
}
}
// ... (rest of the reconciliation logic: StatefulSet, Deployment, etc.)
return ctrl.Result{}, nil
}
func (r *ShardDBReconciler) finalizeShardDB(ctx context.Context, logger logr.Logger, s *mygroupv1alpha1.ShardDB) error {
// This is where you put your custom cleanup logic.
// For example, scaling down the cache deployment to 0 and waiting for pods to terminate.
logger.Info("Performing finalization for ShardDB resource")
cacheDep := &appsv1.Deployment{}
err := r.Get(ctx, client.ObjectKey{Name: s.Name + "-cache", Namespace: s.Namespace}, cacheDep)
if err != nil {
if errors.IsNotFound(err) {
// If cache doesn't exist, there's nothing to do.
logger.Info("Cache deployment not found, finalization complete.")
return nil
}
return err
}
// Scale down the deployment and wait
if cacheDep.Spec.Replicas != nil && *cacheDep.Spec.Replicas > 0 {
logger.Info("Scaling down cache deployment before deletion")
zeroReplicas := int32(0)
cacheDep.Spec.Replicas = &zeroReplicas
if err := r.Update(ctx, cacheDep); err != nil {
return err
}
// We can't wait here synchronously. We requeue and check the replica count on the next reconcile.
// In a real scenario, you might check `cacheDep.Status.Replicas`.
logger.Info("Cache deployment scaled to 0. Requeuing to confirm termination.")
return fmt.Errorf("cache deployment is still terminating") // Using an error forces a requeue
}
logger.Info("Successfully finalized ShardDB")
return nil
}
Note the edge case handling: Our finalizer logic is also idempotent. If it's called while the cache is still scaling down, it returns an error, which causes controller-runtime to requeue the request. The finalizer isn't removed, and the logic runs again later, until the cache Deployment has 0 replicas and the function can return nil.
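One refinement the comments above hint at: before declaring finalization complete, confirm the old pods are actually gone, not just that the spec was scaled down. A sketch of that extra check, placed after the scale-down branch in finalizeShardDB, using the Deployment's status as suggested:

// Spec is already at zero replicas; now wait for the status to catch up.
// Status.Replicas reports pods the Deployment controller still observes.
if cacheDep.Status.Replicas > 0 {
    logger.Info("Cache pods still terminating", "remaining", cacheDep.Status.Replicas)
    // Returning an error keeps the finalizer in place and requeues the request.
    return fmt.Errorf("cache deployment still has %d pods", cacheDep.Status.Replicas)
}
logger.Info("Successfully finalized ShardDB")
return nil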
Self-Healing and Performance: Watches and Leader Election
Our controller is robust, but it can be better. What happens if an administrator manually deletes the StatefulSet our controller manages? As it stands, our controller would only notice this if the ShardDB CR itself were updated, triggering a reconcile.
To make the controller truly self-healing, we need to watch the resources it owns. controller-runtime makes this straightforward in our SetupWithManager function.
// controllers/sharddb_controller.go
func (r *ShardDBReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&mygroupv1alpha1.ShardDB{}).
Owns(&appsv1.StatefulSet{}).
Owns(&appsv1.Deployment{}).
Complete(r)
}
* For(&mygroupv1alpha1.ShardDB{}): This tells the controller to watch for events (create, update, delete) on ShardDB resources.
* Owns(&appsv1.StatefulSet{}): This is the key. It sets up a watch on StatefulSet resources. If a StatefulSet is changed or deleted, the manager will check if it has an owner reference pointing to a ShardDB. If it does, it will trigger a reconciliation request for that *parent* ShardDB object. Our reconciliation loop will then run, notice the StatefulSet is missing, and recreate it.
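On the performance side of this section's title, the same builder accepts tuning options via WithOptions. As one example, a sketch raising reconcile concurrency above the default of 1 (assuming the import sigs.k8s.io/controller-runtime/pkg/controller):

import "sigs.k8s.io/controller-runtime/pkg/controller"

func (r *ShardDBReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&mygroupv1alpha1.ShardDB{}).
        Owns(&appsv1.StatefulSet{}).
        Owns(&appsv1.Deployment{}).
        // Reconcile up to two distinct ShardDB objects in parallel; calls for
        // the same object are still serialized by controller-runtime.
        WithOptions(controller.Options{MaxConcurrentReconciles: 2}).
        Complete(r)
}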
High Availability with Leader Election
In a production environment, you will run multiple replicas of your controller for high availability. However, you only want one replica to be active at any given time to prevent two controllers from operating on the same ShardDB resource simultaneously, which could lead to race conditions and inconsistent state.
Fortunately, controller-runtime handles this for you. When you scaffold a project with kubebuilder, main.go wires up a --leader-elect flag, and the generated manager manifest enables it, so the deployed controller runs leader election out of the box. It uses Kubernetes Lease objects in the controller's namespace to coordinate.
// main.go
// ...
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: true, // This is the magic!
LeaderElectionID: "f8f3b7f5.example.com",
})
// ...
No extra code is needed in your reconciler. The framework ensures that only the elected leader will receive reconciliation requests.
Production-Grade Status Reporting
Finally, let's circle back to the status subresource. Simply creating child resources isn't enough; we must accurately report the aggregate status back to the user on our ShardDB object. We will use the metav1.Condition type we defined earlier.
// A helper for managing conditions is highly recommended; apimachinery ships one
// in k8s.io/apimachinery/pkg/api/meta (see SetStatusCondition).
// For clarity, we'll manage it manually here.
// ... inside the Reconcile function, at the end ...
// 5. Update the ShardDB status
// First, let's reflect the current state
shardDB.Status.ReadyShards = sts.Status.ReadyReplicas
shardDB.Status.CacheReady = cacheDep.Status.ReadyReplicas > 0
// Now, set the overall condition
availableCondition := metav1.Condition{
Type: "Available",
Status: metav1.ConditionFalse,
Reason: "Progressing",
Message: "ShardDB components are being provisioned",
// LastTransitionTime is required; the API server rejects conditions without it.
LastTransitionTime: metav1.Now(),
}
if shardDB.Status.ReadyShards == shardDB.Spec.Shards && shardDB.Status.CacheReady {
availableCondition.Status = metav1.ConditionTrue
availableCondition.Reason = "Ready"
availableCondition.Message = "ShardDB cluster is fully available"
}
// This is a simplified way to set a condition. In a real controller, use a helper
// such as meta.SetStatusCondition to merge conditions and manage LastTransitionTime
// without clobbering other condition types.
// See: https://pkg.go.dev/k8s.io/apimachinery/pkg/api/meta#SetStatusCondition
found := false
for i, cond := range shardDB.Status.Conditions {
if cond.Type == "Available" {
// Don't update if the status and reason are the same
if cond.Status != availableCondition.Status || cond.Reason != availableCondition.Reason {
shardDB.Status.Conditions[i] = availableCondition
}
found = true
break
}
}
if !found {
shardDB.Status.Conditions = append(shardDB.Status.Conditions, availableCondition)
}
err = r.Status().Update(ctx, shardDB)
if err != nil {
logger.Error(err, "Failed to update ShardDB status")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
Note the call to r.Status().Update(), not r.Update(). This specifically targets the /status subresource, which is the correct pattern for updating status fields.
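If you'd rather not hand-roll the merge loop above, the apimachinery helper mentioned earlier does it for you. A sketch of the equivalent tail of Reconcile, assuming meta is imported from k8s.io/apimachinery/pkg/api/meta:

// SetStatusCondition merges by Type and updates LastTransitionTime only
// when the condition's status actually changes.
meta.SetStatusCondition(&shardDB.Status.Conditions, availableCondition)
if err := r.Status().Update(ctx, shardDB); err != nil {
    logger.Error(err, "Failed to update ShardDB status")
    return ctrl.Result{}, err
}
return ctrl.Result{}, nil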
Conclusion
We've moved far beyond a simple controller. By implementing a stateful reconciliation loop, leveraging owner references, ensuring graceful deletion with finalizers, and enabling self-healing with watches, we have constructed a robust, production-ready operator. This controller encapsulates the complex domain knowledge of our ShardDB application, presenting users with a simple, declarative API to manage its entire lifecycle.
This pattern is the foundation of modern Kubernetes automation. When you find yourself writing complex shell scripts or multi-stage CI/CD pipelines to deploy an application, take a step back and ask if this logic could be better encoded into a custom controller. By extending the Kubernetes API itself, you create powerful, resilient, and truly cloud-native solutions.