Orchestrating Stateful Sets with Custom Kubernetes Operators & CRDs
The Limits of Declarative Primitives for Stateful Workloads
As senior engineers, we appreciate the declarative power of Kubernetes. Deployments, Services, and StatefulSets provide a robust foundation for stateless and simple stateful applications. However, when managing complex, distributed stateful systems—like a custom NoSQL cluster, a sharded message queue, or a multi-node search engine—these primitives fall short.
A StatefulSet can ensure ordered, graceful deployment and scaling, stable network identifiers, and persistent storage. But it cannot natively handle application-specific operational logic such as:
* Coordinated rolling upgrades that require per-node pre- and post-upgrade steps (for example, migrating leadership away from a node before restarting it).
* Automated, schedule-driven backups to external object storage.
* Quorum-aware scaling, cluster membership changes, and data rebalancing.
Attempting to solve these problems with Helm charts and init containers often leads to brittle, complex scripts that are difficult to test and maintain. The state of the application becomes decoupled from the state declared in Kubernetes. This is the operational gap the Operator Pattern is designed to fill.
An Operator is an application-specific controller that extends the Kubernetes API to create, configure, and manage instances of complex applications on behalf of a user. It codifies human operational knowledge into software that can reliably manage the application's lifecycle.
This article will guide you through building a production-grade Operator in Go to manage a hypothetical DistributedDB cluster. We won't focus on the boilerplate of setting up a project with kubebuilder or operator-sdk. Instead, we'll dive straight into the heart of the matter: the advanced logic within the reconciliation loop that distinguishes a toy operator from a production-ready one.
1. Defining the API: The `DistributedDB` Custom Resource Definition (CRD)
The foundation of any Operator is its Custom Resource Definition (CRD). This defines our new API object, DistributedDB, which users will interact with. A well-designed CRD spec is declarative and intuitive, while the status subresource provides crucial feedback about the system's state.
Here is our DistributedDB CRD definition. Pay close attention to the spec for user-defined state and the status for operator-managed state.
api/v1alpha1/distributeddb_types.go
package v1alpha1
import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// DistributedDBSpec defines the desired state of DistributedDB
type DistributedDBSpec struct {
// Number of desired pods. This is a pointer to distinguish between explicit
// zero and not specified. Defaults to 3.
// +kubebuilder:validation:Minimum=1
// +kubebuilder:default=3
Replicas *int32 `json:"replicas"`
// The container image for the database.
// +kubebuilder:validation:Required
Image string `json:"image"`
// Storage configuration for the database pods.
// +kubebuilder:validation:Required
Storage StorageConfig `json:"storage"`
// BackupPolicy defines the automated backup schedule and destination.
// +optional
BackupPolicy *BackupPolicy `json:"backupPolicy,omitempty"`
}
// StorageConfig defines the persistent storage for database nodes.
type StorageConfig struct {
// The Kubernetes storage class to use for persistent volumes.
StorageClassName string `json:"storageClassName"`
// The size of the persistent volume, e.g., "10Gi".
// +kubebuilder:validation:Pattern=`^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$`
Size string `json:"size"`
}
// BackupPolicy defines the backup strategy.
type BackupPolicy struct {
// Cron schedule for backups, e.g., "0 0 * * *" for daily at midnight.
// +kubebuilder:validation:Pattern=`^(@(annually|yearly|monthly|weekly|daily|hourly|reboot))|(@every (\d+(ns|us|ms|s|m|h))+)|((((\d+,)+\d+|(\d+(\/|-)\d+)|\d+|\*) ?){5,7})$`
Schedule string `json:"schedule"`
// S3-compatible storage endpoint for backups.
S3Endpoint string `json:"s3Endpoint"`
// Secret name containing S3 credentials (accessKey, secretKey).
S3CredentialsSecret string `json:"s3CredentialsSecret"`
}
// DistributedDBStatus defines the observed state of DistributedDB
type DistributedDBStatus struct {
// Conditions show the state of the cluster in a structured way.
// +optional
// +patchMergeKey=type
// +patchStrategy=merge
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
// The current version of the database cluster.
Version string `json:"version,omitempty"`
// Total number of non-terminated pods in this database cluster (their labels match the selector).
// +optional
Replicas int32 `json:"replicas,omitempty"`
// Total number of ready pods in this database cluster.
// +optional
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
// ObservedGeneration is the most recent generation observed for this DistributedDB. It corresponds to the
// DistributedDB's generation, which is updated on mutation by the API Server.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas"
//+kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.readyReplicas"
//+kubebuilder:printcolumn:name="Version",type="string",JSONPath=".status.version"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// DistributedDB is the Schema for the distributeddbs API
type DistributedDB struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec DistributedDBSpec `json:"spec,omitempty"`
Status DistributedDBStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// DistributedDBList contains a list of DistributedDB
type DistributedDBList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []DistributedDB `json:"items"`
}
func init() {
SchemeBuilder.Register(&DistributedDB{}, &DistributedDBList{})
}
Key Design Choices:
* +kubebuilder markers: These are crucial for generating the CRD manifest, validation rules, and even kubectl print columns (+kubebuilder:printcolumn).
* status subresource: The //+kubebuilder:subresource:status marker is critical. It splits .spec and .status onto separate API endpoints: writes to the main resource ignore status changes, and writes to the /status endpoint ignore spec changes. This keeps the controller's status updates from clobbering user modifications to .spec, and it means metadata.generation is bumped only when the spec changes.
* Conditions: Using metav1.Condition is a Kubernetes best practice. It provides a standard, extensible way to report the status of the resource, making it easy for humans and other controllers to understand its state (e.g., Type: Available, Status: True, Reason: QuorumReached).
* ObservedGeneration: This field in the status is essential for robust reconciliation. It allows the controller to know if it has already processed the latest changes to the spec.
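To make the Conditions point concrete, here is a minimal sketch of how the controller might record the Available condition using apimachinery's meta helpers, assuming it lives in the controllers package where dbv1alpha1 is already imported. The condition type and reason strings are this article's examples, and setAvailableCondition is a hypothetical helper, not a fixed API.
import (
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// setAvailableCondition records (or updates) the Available condition on the
// DistributedDB status; SetStatusCondition fills in LastTransitionTime for us.
func setAvailableCondition(db *dbv1alpha1.DistributedDB, ready bool, reason, message string) {
	status := metav1.ConditionFalse
	if ready {
		status = metav1.ConditionTrue
	}
	meta.SetStatusCondition(&db.Status.Conditions, metav1.Condition{
		Type:               "Available",
		Status:             status,
		Reason:             reason,
		Message:            message,
		ObservedGeneration: db.Generation,
	})
}
The reconciler would call this just before r.Status().Update, for example setAvailableCondition(db, db.Status.ReadyReplicas == *db.Spec.Replicas, "QuorumReached", "all replicas are ready").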
2. The Core Logic: The Reconciliation Loop
The Reconcile function in our controller is the brain of the operator. It's invoked whenever there's a change to a DistributedDB resource or any of the resources it owns (like a StatefulSet). The goal of each reconciliation is to drive the current state of the cluster towards the desired state defined in the DistributedDB spec.
A production-grade reconciliation loop follows a clear, idempotent pattern:
1. Fetch the DistributedDB instance for the given request.
2. Handle deletion via finalizers if the resource is marked for deletion.
3. Ensure the owned resources (StatefulSet, Service, etc.) exist and match the spec.
4. Update the .status field to reflect what was observed.
Here's the skeleton of our Reconcile method in controllers/distributeddb_controller.go:
package controllers
import (
"context"
"time"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
dbv1alpha1 "my.domain/distdb/api/v1alpha1"
)
// DistributedDBReconciler reconciles a DistributedDB object
type DistributedDBReconciler struct {
client.Client
Scheme *runtime.Scheme
Log logr.Logger
}
const distributedDBFinalizer = "db.my.domain/finalizer"
//+kubebuilder:rbac:groups=db.my.domain,resources=distributeddbs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=db.my.domain,resources=distributeddbs/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=db.my.domain,resources=distributeddbs/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
func (r *DistributedDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// 1. Fetch the DistributedDB instance
db := &dbv1alpha1.DistributedDB{}
err := r.Get(ctx, req.NamespacedName, db)
if err != nil {
if errors.IsNotFound(err) {
logger.Info("DistributedDB resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
logger.Error(err, "Failed to get DistributedDB")
return ctrl.Result{}, err
}
// 2. Handle Deletion Logic with Finalizers
isMarkedForDeletion := db.GetDeletionTimestamp() != nil
if isMarkedForDeletion {
if containsString(db.GetFinalizers(), distributedDBFinalizer) {
if err := r.finalizeDistributedDB(ctx, db); err != nil {
// Don't remove finalizer if cleanup fails, so we can retry.
return ctrl.Result{}, err
}
// Cleanup successful, remove the finalizer.
db.SetFinalizers(removeString(db.GetFinalizers(), distributedDBFinalizer))
if err := r.Update(ctx, db); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
// Add finalizer if it doesn't exist
if !containsString(db.GetFinalizers(), distributedDBFinalizer) {
db.SetFinalizers(append(db.GetFinalizers(), distributedDBFinalizer))
if err := r.Update(ctx, db); err != nil {
return ctrl.Result{}, err
}
}
// 3. Reconcile owned resources
// Reconcile Headless Service
headlessSvc, err := r.reconcileHeadlessService(ctx, db)
if err != nil {
return ctrl.Result{}, err
}
// Reconcile StatefulSet
sts, err := r.reconcileStatefulSet(ctx, db, headlessSvc)
if err != nil {
return ctrl.Result{}, err
}
// 4. Update Status
// This is a simplified status update. A production operator would have more logic.
db.Status.ReadyReplicas = sts.Status.ReadyReplicas
db.Status.Replicas = sts.Status.Replicas
db.Status.Version = db.Spec.Image // Simplified, real logic would check pod images
db.Status.ObservedGeneration = db.Generation
// ... more complex status condition logic would go here ...
if err := r.Status().Update(ctx, db); err != nil {
logger.Error(err, "Failed to update DistributedDB status")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *DistributedDBReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&dbv1alpha1.DistributedDB{}).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.Service{}).
Complete(r)
}
// (Helper functions like containsString, removeString, and finalizeDistributedDB would be defined elsewhere)
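For completeness, here is a minimal sketch of the two slice helpers referenced above. Note that controller-runtime also ships controllerutil.ContainsFinalizer, AddFinalizer, and RemoveFinalizer, which a production operator may prefer over hand-rolled helpers.
// containsString reports whether s is present in slice.
func containsString(slice []string, s string) bool {
	for _, item := range slice {
		if item == s {
			return true
		}
	}
	return false
}

// removeString returns a copy of slice with every occurrence of s removed.
func removeString(slice []string, s string) []string {
	result := make([]string, 0, len(slice))
	for _, item := range slice {
		if item != s {
			result = append(result, item)
		}
	}
	return result
}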
3. Advanced Pattern: Idempotent Resource Reconciliation
A core task is to create and manage the underlying StatefulSet. This must be done idempotently. If the StatefulSet doesn't exist, we create it. If it does exist, we check if its spec has drifted from what our DistributedDB CR requires. If so, we update it.
Crucially, we must also establish ownership. By setting our DistributedDB instance as the owner of the StatefulSet, we ensure that when the DistributedDB is deleted, the StatefulSet is automatically garbage-collected by Kubernetes.
Here’s a robust implementation for reconciling the StatefulSet:
// reconcileStatefulSet ensures the StatefulSet for the DistributedDB exists and is configured correctly.
func (r *DistributedDBReconciler) reconcileStatefulSet(ctx context.Context, db *dbv1alpha1.DistributedDB, headlessSvc *corev1.Service) (*appsv1.StatefulSet, error) {
logger := log.FromContext(ctx)
desiredSts := r.desiredStatefulSet(db, headlessSvc)
// Set DistributedDB instance as the owner and controller
if err := ctrl.SetControllerReference(db, desiredSts, r.Scheme); err != nil {
return nil, err
}
// Check if the StatefulSet already exists
foundSts := &appsv1.StatefulSet{}
err := r.Get(ctx, client.ObjectKey{Name: desiredSts.Name, Namespace: desiredSts.Namespace}, foundSts)
if err != nil && errors.IsNotFound(err) {
logger.Info("Creating a new StatefulSet", "StatefulSet.Namespace", desiredSts.Namespace, "StatefulSet.Name", desiredSts.Name)
err = r.Create(ctx, desiredSts)
if err != nil {
return nil, err
}
// StatefulSet created successfully; the Owns() watch registered in SetupWithManager
// will trigger another reconcile as its status changes.
return desiredSts, nil
} else if err != nil {
return nil, err
}
// StatefulSet exists, check for drift and update if needed
// A simple comparison of replicas and image. A real-world operator would need a much more sophisticated diffing mechanism.
if *foundSts.Spec.Replicas != *desiredSts.Spec.Replicas || foundSts.Spec.Template.Spec.Containers[0].Image != desiredSts.Spec.Template.Spec.Containers[0].Image {
logger.Info("Updating existing StatefulSet", "StatefulSet.Namespace", foundSts.Namespace, "StatefulSet.Name", foundSts.Name)
foundSts.Spec = desiredSts.Spec // Overwrite the spec with our desired state
if err = r.Update(ctx, foundSts); err != nil {
return nil, err
}
}
return foundSts, nil
}
// desiredStatefulSet returns a StatefulSet object for the given DistributedDB
func (r *DistributedDBReconciler) desiredStatefulSet(db *dbv1alpha1.DistributedDB, headlessSvc *corev1.Service) *appsv1.StatefulSet {
labels := map[string]string{"app": "distributed-db", "cluster": db.Name}
replicas := db.Spec.Replicas
return &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: db.Name,
Namespace: db.Namespace,
},
Spec: appsv1.StatefulSetSpec{
Replicas: replicas,
Selector: &metav1.LabelSelector{MatchLabels: labels},
ServiceName: headlessSvc.Name,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: labels},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Image: db.Spec.Image,
Name: "database",
Ports: []corev1.ContainerPort{{
ContainerPort: 8080,
Name: "db-port",
}},
VolumeMounts: []corev1.VolumeMount{{
Name: "data",
MountPath: "/var/lib/data",
}},
}},
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
ObjectMeta: metav1.ObjectMeta{Name: "data"},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
StorageClassName: &db.Spec.Storage.StorageClassName,
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse(db.Spec.Storage.Size),
},
},
},
}},
},
}
}
Production Consideration: The simple spec comparison (foundSts.Spec.Replicas != desiredSts.Spec.Replicas || ...) is naive. A production operator should use a more robust method, such as equality.Semantic.DeepEqual, and carefully consider which fields are mutable and which should be ignored, so that unnecessary updates are not triggered by Kubernetes defaulting or mutating webhooks.
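As one possible shape for such a diff, the sketch below compares only the fields this operator actually manages using apimachinery's semantic equality helper. The statefulSetNeedsUpdate name and its field selection are illustrative assumptions, not a complete drift detector; it assumes the controllers package where appsv1 is already imported.
import apiequality "k8s.io/apimachinery/pkg/api/equality"

// statefulSetNeedsUpdate compares only operator-managed fields, so server-side
// defaults elsewhere in the spec never trigger a spurious update.
func statefulSetNeedsUpdate(found, desired *appsv1.StatefulSet) bool {
	if !apiequality.Semantic.DeepEqual(found.Spec.Replicas, desired.Spec.Replicas) {
		return true
	}
	if found.Spec.Template.Spec.Containers[0].Image != desired.Spec.Template.Spec.Containers[0].Image {
		return true
	}
	return !apiequality.Semantic.DeepEqual(found.Spec.Template.Labels, desired.Spec.Template.Labels)
}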
4. Advanced Pattern: Coordinated Rolling Upgrades
This is where an Operator truly outperforms a standard StatefulSet. Imagine our DistributedDB requires a specific, coordinated upgrade procedure when the spec.image changes:
1. Switch the StatefulSet update strategy to OnDelete to prevent automatic updates.
2. Iterate through the pods in reverse ordinal order (db-2, then db-1, then db-0).
3. For each pod:
a. Perform a pre-upgrade action (e.g., tell the cluster to migrate leadership away from this node).
b. Delete the pod.
c. Wait for the Kubernetes controller to recreate the pod with the new image and for it to become Ready.
d. Perform a post-upgrade action (e.g., verify the node has rejoined the cluster successfully).
4. Once every pod runs the new image and is healthy, update status.version and set the status condition to Available.
This logic resides within the Reconcile function, creating a state machine that progresses through the upgrade. It requires careful status management to track progress and ensure the process can be resumed if the operator restarts.
Here’s a conceptual sketch of the upgrade logic within the main Reconcile loop:
// Inside Reconcile function, after fetching the StatefulSet...
// Check if an upgrade is needed
upgradeNeeded := db.Spec.Image != db.Status.Version && db.Status.Version != ""
if upgradeNeeded {
logger.Info("Starting coordinated upgrade", "FromVersion", db.Status.Version, "ToVersion", db.Spec.Image)
// 1. Ensure StatefulSet is in OnDelete mode
if sts.Spec.UpdateStrategy.Type != appsv1.OnDeleteStatefulSetStrategyType {
sts.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{Type: appsv1.OnDeleteStatefulSetStrategyType}
if err := r.Update(ctx, sts); err != nil {
logger.Error(err, "Failed to switch StatefulSet to OnDelete strategy for upgrade")
return ctrl.Result{}, err
}
// Requeue to ensure the update has propagated before we start deleting pods
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
// 2. Iterate pods in reverse order
for i := *db.Spec.Replicas - 1; i >= 0; i-- {
pod := &corev1.Pod{}
podName := fmt.Sprintf("%s-%d", db.Name, i)
err := r.Get(ctx, client.ObjectKey{Name: podName, Namespace: db.Namespace}, pod)
if err != nil {
if errors.IsNotFound(err) {
// The pod was deleted but not yet recreated; wait instead of moving on,
// so we never take more than one node down at a time.
logger.Info("Waiting for pod to be recreated during upgrade", "PodName", podName)
return ctrl.Result{RequeueAfter: 15 * time.Second}, nil
}
logger.Error(err, "Could not get pod for upgrade check", "PodName", podName)
return ctrl.Result{}, err
}
// Check if this pod needs upgrading
if pod.Spec.Containers[0].Image != db.Spec.Image {
logger.Info("Upgrading pod", "PodName", podName)
// a. Perform pre-upgrade action (e.g., via exec or an API call to the pod)
// if err := r.executePreUpgradeAction(ctx, pod); err != nil { ... }
// b. Delete the pod
if err := r.Delete(ctx, pod); err != nil {
logger.Error(err, "Failed to delete pod for upgrade", "PodName", podName)
return ctrl.Result{}, err
}
// c. Requeue and wait for the pod to be ready. This is critical.
// We stop the loop here and let the next reconciliation check its status.
logger.Info("Pod deleted for upgrade, requeueing to wait for it to become ready", "PodName", podName)
return ctrl.Result{RequeueAfter: 15 * time.Second}, nil
}
// d. Check if the pod is ready after upgrade before moving to the next one
if !isPodReady(pod) {
logger.Info("Waiting for upgraded pod to become ready", "PodName", podName)
return ctrl.Result{RequeueAfter: 15 * time.Second}, nil
}
}
// 4. All pods are upgraded. Finalize the upgrade.
logger.Info("Coordinated upgrade complete.")
// Switch StatefulSet back to RollingUpdate (optional, but good practice)
sts.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{Type: appsv1.RollingUpdateStatefulSetStrategyType}
if err := r.Update(ctx, sts); err != nil {
return ctrl.Result{}, err
}
// Update status to reflect completion
db.Status.Version = db.Spec.Image
// ... update conditions ...
if err := r.Status().Update(ctx, db); err != nil {
return ctrl.Result{}, err
}
// Upgrade is done, no need to requeue.
return ctrl.Result{}, nil
}
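The isPodReady helper used above is not from a library; a minimal sketch that inspects the pod's Ready condition could look like this (corev1 is already imported by the controller):
// isPodReady returns true once the kubelet reports the Ready condition as True.
func isPodReady(pod *corev1.Pod) bool {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == corev1.PodReady {
			return cond.Status == corev1.ConditionTrue
		}
	}
	return false
}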
5. Edge Cases and Performance Optimizations
A. Safe Deletion with Finalizers
What if a user runs kubectl delete distributeddb my-cluster? We don't want Kubernetes to delete the CR immediately, before the operator has had a chance to run cleanup such as taking a final backup or releasing external resources. This is where finalizers come in.
A finalizer is a key in the resource's metadata that tells Kubernetes to wait for a controller to perform cleanup actions before fully deleting the resource. Our Reconcile loop already has the skeleton for this.
The finalizeDistributedDB function would contain the critical pre-deletion logic:
func (r *DistributedDBReconciler) finalizeDistributedDB(ctx context.Context, db *dbv1alpha1.DistributedDB) error {
logger := log.FromContext(ctx)
// Example: Trigger a final backup before deletion
if db.Spec.BackupPolicy != nil {
logger.Info("Performing final backup before deletion...")
// ... logic to trigger a final backup job ...
// This could involve creating a Kubernetes Job and waiting for it to complete.
// If the backup fails, we return an error, and the finalizer is NOT removed,
// ensuring the deletion is retried.
}
logger.Info("Successfully finalized DistributedDB resources")
return nil
}
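To illustrate the elided backup step, here is a sketch of the one-shot Job the finalizer might create and then poll for completion. The container arguments, the reuse of the database image as a backup client, and the finalBackupJob name are assumptions for this hypothetical DistributedDB, not a prescribed interface.
import (
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// finalBackupJob builds a Job that streams one last backup to the configured
// S3-compatible endpoint before the cluster is torn down.
func finalBackupJob(db *dbv1alpha1.DistributedDB) *batchv1.Job {
	return &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      db.Name + "-final-backup",
			Namespace: db.Namespace,
		},
		Spec: batchv1.JobSpec{
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{{
						Name:  "backup",
						Image: db.Spec.Image, // assumption: the DB image ships a backup CLI
						Args:  []string{"backup", "--target", db.Spec.BackupPolicy.S3Endpoint},
						EnvFrom: []corev1.EnvFromSource{{
							SecretRef: &corev1.SecretEnvSource{
								LocalObjectReference: corev1.LocalObjectReference{
									Name: db.Spec.BackupPolicy.S3CredentialsSecret,
								},
							},
						}},
					}},
				},
			},
		},
	}
}
The operator would also need RBAC for batch/v1 Jobs and logic to wait for the Job to succeed before removing the finalizer.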
B. API Server Load and Watch Predicates
By default, our controller reconciles on any change to the DistributedDB CR or the resources it owns. That includes the frequent status-only updates on the owned StatefulSet (driven by its pods churning), which cause our operator to reconcile needlessly and add load on the API server.
We can filter these events using predicate functions in SetupWithManager. A common and highly effective strategy is to only trigger reconciliation when the metadata.generation of a resource changes. The generation is only incremented when the resource's spec is modified.
// In SetupWithManager...
import "sigs.k8s.io/controller-runtime/pkg/predicate"
func (r *DistributedDBReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&dbv1alpha1.DistributedDB{}).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.Service{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(r)
}
This simple change dramatically reduces unnecessary reconciliation loops caused by status updates, making our operator a better citizen in the cluster.
C. Requeue Strategy and Exponential Backoff
If an operation fails (e.g., an API call to a cloud provider for a backup fails), we should not immediately retry. This can lead to tight loops that overwhelm the external system or the API server.
controller-runtime provides exponential backoff for retries by default when you return an error. However, for controlled delays where there isn't an error (like waiting for a pod to become ready), you should use ctrl.Result{RequeueAfter: duration}. This signals to the manager that the reconciliation was successful for now, but should be re-run after a specific time.
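A compact illustration of the two signaling styles described above, written as a fragment inside Reconcile (triggerBackup is a hypothetical helper standing in for any fallible external call):
// Returning an error lets controller-runtime retry with its default exponential backoff.
if err := triggerBackup(ctx, db); err != nil {
	logger.Error(err, "backup failed, retrying with backoff")
	return ctrl.Result{}, err
}

// Waiting on an expected condition is not an error: requeue after a fixed delay instead.
if db.Status.ReadyReplicas < *db.Spec.Replicas {
	logger.Info("waiting for all replicas to become ready")
	return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}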
Conclusion
Building a Kubernetes Operator is a significant step up from using built-in controllers. It allows you to encapsulate complex, domain-specific operational knowledge into a robust, automated system that integrates seamlessly with the Kubernetes control plane.
We've moved beyond the basics to explore production-critical patterns: idempotent reconciliation with owner references, coordinated stateful upgrades that primitives like StatefulSet cannot handle alone, safe resource lifecycle management with finalizers, and performance optimizations using watch predicates.
By mastering these advanced techniques, you can transform manual, error-prone operational tasks into reliable, declarative, and automated processes, unlocking the full power of Kubernetes for even the most complex stateful applications.