Building a Stateful K8s Operator in Go with Finalizers & Sub-resources
Beyond StatefulSets: The Case for a Stateful Application Operator
Standard Kubernetes controllers like StatefulSet and Deployment are powerful, but they operate on a generic model of application lifecycle. They excel at managing pods, persistent volumes, and network identity, but their knowledge ends there. When managing a complex stateful application—a sharded database, a distributed cache, or a legacy system with specific operational needs—the lifecycle often involves application-aware steps that generic controllers cannot handle.
Consider these scenarios:
* Schema Migrations: Before upgrading an application image, a database schema migration script must run to completion. A standard RollingUpdate strategy is blind to this requirement.
* Graceful Shutdown & Data Offloading: Before a node is terminated, it must flush its in-memory cache to persistent storage or hand off its partitions to another node. A simple preStop hook might not be sufficient for complex, multi-minute processes that require coordination.
* Dynamic Re-sharding: As load increases, a sharded database might need to add new shards and rebalance data. This is a complex workflow, not a simple scaling event.
* Backup Orchestration: Deleting a database instance shouldn't just remove the pods and PVCs; it should trigger a final backup to object storage for disaster recovery.
This is the domain of the Operator Pattern. An operator encodes this domain-specific operational knowledge into a custom Kubernetes controller. For senior engineers, the challenge isn't just building a controller that creates a StatefulSet; it's building one that handles the entire lifecycle, especially the complex edge cases of deletion and state reporting. This article focuses on two advanced, non-negotiable patterns for production-ready operators: Finalizers for managing complex deletion workflows and Status Sub-resources for providing a clear, reliable API surface for your application's state.
We will build an operator for a fictional ClusteredCache custom resource. This operator will manage a StatefulSet but will add critical logic for final backups on deletion and provide detailed status reporting.
The `ClusteredCache` Custom Resource Definition (CRD)
Everything starts with the API. We'll define a ClusteredCacheSpec for the desired state and a ClusteredCacheStatus for the observed state. We use kubebuilder markers to generate the CRD manifest and boilerplate Go code.
api/v1/clusteredcache_types.go
package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ClusteredCacheSpec defines the desired state of ClusteredCache
type ClusteredCacheSpec struct {
	// +kubebuilder:validation:Minimum=1
	// Replicas is the number of cache nodes.
	Replicas int32 `json:"replicas"`

	// Image is the container image for the cache node.
	Image string `json:"image"`

	// BackupConfig specifies the configuration for final backups.
	// +optional
	BackupConfig *BackupConfig `json:"backupConfig,omitempty"`
}

// BackupConfig defines parameters for the final backup job.
type BackupConfig struct {
	// Image for the backup job container.
	Image string `json:"image"`

	// S3Bucket is the target bucket for the backup.
	S3Bucket string `json:"s3Bucket"`
}

// ClusteredCacheStatus defines the observed state of ClusteredCache
type ClusteredCacheStatus struct {
	// ReadyReplicas is the number of cache nodes that are fully ready.
	ReadyReplicas int32 `json:"readyReplicas"`

	// Conditions represent the latest available observations of the ClusteredCache's state.
	// +optional
	// +patchMergeKey=type
	// +patchStrategy=merge
	Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas"
//+kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyReplicas"
//+kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.conditions[?(@.type==\"Available\")].status"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// ClusteredCache is the Schema for the clusteredcaches API
type ClusteredCache struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   ClusteredCacheSpec   `json:"spec,omitempty"`
	Status ClusteredCacheStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// ClusteredCacheList contains a list of ClusteredCache
type ClusteredCacheList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []ClusteredCache `json:"items"`
}

func init() {
	SchemeBuilder.Register(&ClusteredCache{}, &ClusteredCacheList{})
}
Two critical kubebuilder markers are present:
* //+kubebuilder:subresource:status: This is the most important annotation. It tells the Kubernetes API server to treat the .status field as a separate sub-resource. This means updates to the main object (.spec) and the status object (.status) are handled via different API endpoints (/clusteredcaches/my-cache vs /clusteredcaches/my-cache/status). This prevents race conditions where an operator's status update overwrites a user's spec change that happened concurrently.
* //+kubebuilder:printcolumn: These provide a better user experience with kubectl get clusteredcaches.
The Core Reconciliation Loop: Idempotency is Key
The heart of any operator is the Reconcile function. Its core principle is idempotency. It should be callable at any time, with the same input CR, and converge the system to the desired state without causing unintended side effects. It's not a one-time script; it's a continuous control loop.
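To make the idempotency requirement concrete, here is a minimal standard-library sketch. The cluster type and the reconcile function are hypothetical stand-ins for the Kubernetes API and the real reconciler; the only point is the compare-before-write shape that makes repeated calls harmless.

```go
package main

import "fmt"

// cluster is a hypothetical in-memory stand-in for the Kubernetes API:
// it maps a workload name to its current replica count.
type cluster map[string]int32

// reconcile drives the observed state toward the desired replica count and
// reports whether it had to change anything. It compares before it writes,
// so calling it again with the same input is a no-op.
func reconcile(c cluster, name string, desired int32) (changed bool) {
	current, exists := c[name]
	if exists && current == desired {
		return false // already converged, nothing to do
	}
	c[name] = desired // create or update in one step
	return true
}

func main() {
	c := cluster{}
	fmt.Println(reconcile(c, "my-cache", 3)) // first pass creates the workload
	fmt.Println(reconcile(c, "my-cache", 3)) // second pass changes nothing
	fmt.Println(reconcile(c, "my-cache", 5)) // a spec change is picked up
}
```

The real reconciler follows the same shape: read the current state, compare it with the spec, and write only when they differ.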
Here's the skeleton of our reconciler, focusing on managing the child StatefulSet.
internal/controller/clusteredcache_controller.go
package controller

import (
	"context"
	"fmt"     // used by the finalizer and status snippets later in this file
	"reflect" // used by the status snippet later in this file

	appsv1 "k8s.io/api/apps/v1"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/log"

	cachev1 "github.com/your-repo/clusteredcache-operator/api/v1"
)

// ClusteredCacheReconciler reconciles a ClusteredCache object
type ClusteredCacheReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=cache.my.domain,resources=clusteredcaches,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=cache.my.domain,resources=clusteredcaches/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=cache.my.domain,resources=clusteredcaches/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete

func (r *ClusteredCacheReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// 1. Fetch the ClusteredCache instance
	instance := &cachev1.ClusteredCache{}
	err := r.Get(ctx, req.NamespacedName, instance)
	if err != nil {
		if errors.IsNotFound(err) {
			logger.Info("ClusteredCache resource not found. Ignoring since object must be deleted")
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Failed to get ClusteredCache")
		return ctrl.Result{}, err
	}

	// ... Finalizer and Deletion logic will go here ...

	// 2. Reconcile the child StatefulSet
	foundSts := &appsv1.StatefulSet{}
	err = r.Get(ctx, types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, foundSts)
	if err != nil && errors.IsNotFound(err) {
		// Define a new StatefulSet
		sts, err := r.statefulSetForClusteredCache(instance)
		if err != nil {
			logger.Error(err, "Failed to build StatefulSet")
			return ctrl.Result{}, err
		}
		logger.Info("Creating a new StatefulSet", "StatefulSet.Namespace", sts.Namespace, "StatefulSet.Name", sts.Name)
		if err := r.Create(ctx, sts); err != nil {
			logger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", sts.Namespace, "StatefulSet.Name", sts.Name)
			return ctrl.Result{}, err
		}
		// StatefulSet created successfully - return and requeue
		return ctrl.Result{Requeue: true}, nil
	} else if err != nil {
		logger.Error(err, "Failed to get StatefulSet")
		return ctrl.Result{}, err
	}

	// 3. Ensure the StatefulSet spec is up to date
	size := instance.Spec.Replicas
	image := instance.Spec.Image
	updateNeeded := false

	// Spec.Replicas is a pointer; guard against nil before dereferencing.
	if foundSts.Spec.Replicas == nil || *foundSts.Spec.Replicas != size {
		foundSts.Spec.Replicas = &size
		updateNeeded = true
	}
	if foundSts.Spec.Template.Spec.Containers[0].Image != image {
		foundSts.Spec.Template.Spec.Containers[0].Image = image
		updateNeeded = true
	}

	if updateNeeded {
		logger.Info("Updating StatefulSet spec", "StatefulSet.Namespace", foundSts.Namespace, "StatefulSet.Name", foundSts.Name)
		err = r.Update(ctx, foundSts)
		if err != nil {
			logger.Error(err, "Failed to update StatefulSet", "StatefulSet.Namespace", foundSts.Namespace, "StatefulSet.Name", foundSts.Name)
			return ctrl.Result{}, err
		}
	}

	// ... Status update logic will go here ...

	return ctrl.Result{}, nil
}

// statefulSetForClusteredCache returns a ClusteredCache StatefulSet object
func (r *ClusteredCacheReconciler) statefulSetForClusteredCache(cc *cachev1.ClusteredCache) (*appsv1.StatefulSet, error) {
	labels := map[string]string{"app": cc.Name}
	replicas := cc.Spec.Replicas

	sts := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      cc.Name,
			Namespace: cc.Namespace,
		},
		Spec: appsv1.StatefulSetSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{
						Image: cc.Spec.Image,
						Name:  "cache",
						Ports: []corev1.ContainerPort{{
							ContainerPort: 6379, // Example port
							Name:          "client",
						}},
					}},
				},
			},
		},
	}

	// Mark the ClusteredCache as the owner so the StatefulSet is garbage collected with it.
	if err := ctrl.SetControllerReference(cc, sts, r.Scheme); err != nil {
		return nil, err
	}
	return sts, nil
}

func (r *ClusteredCacheReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&cachev1.ClusteredCache{}).
		Owns(&appsv1.StatefulSet{}).
		Owns(&batchv1.Job{}). // Also watch jobs for our backup logic
		Complete(r)
}
A key function here is ctrl.SetControllerReference. This establishes an ownership relationship between our ClusteredCache CR and the StatefulSet. This is crucial for Kubernetes garbage collection: if the ClusteredCache is deleted, the Kubernetes garbage collector will automatically delete the StatefulSet it owns. But what if we need to run a backup before that deletion happens? This is where garbage collection is insufficient.
Advanced Deletion Logic with Finalizers
A finalizer is a key in the metadata.finalizers list of an object. When a user requests to delete an object that has finalizers, the API server does not delete it immediately. Instead, it sets the metadata.deletionTimestamp field and leaves the object in a "terminating" state. The object is only truly removed from etcd after its finalizers list is empty.
This mechanism gives our operator a hook to perform pre-delete actions. Our reconciliation loop will now have a new branch: if deletionTimestamp is set, we execute our cleanup logic. Once complete, we remove our finalizer, signaling to the API server that it can proceed with the deletion.
Let's implement this.
Step 1: Define a finalizer name
// internal/controller/clusteredcache_controller.go
const clusteredCacheFinalizer = "cache.my.domain/finalizer"
Step 2: Modify the Reconcile function
We will add the logic at the beginning of the function, right after fetching the instance.
// ... inside Reconcile function, after fetching the instance

// Check if the object is being deleted
isMarkedForDeletion := instance.GetDeletionTimestamp() != nil
if isMarkedForDeletion {
	if controllerutil.ContainsFinalizer(instance, clusteredCacheFinalizer) {
		// Run our finalization logic. If it fails, we'll try again later.
		if err := r.finalizeClusteredCache(ctx, instance); err != nil {
			// Returning the error keeps the finalizer in place and makes
			// controller-runtime requeue the request with backoff.
			logger.Error(err, "Failed to finalize ClusteredCache")
			return ctrl.Result{}, err
		}

		// Remove finalizer. Once all finalizers are removed, the object will be deleted.
		logger.Info("Removing finalizer after successful finalization")
		controllerutil.RemoveFinalizer(instance, clusteredCacheFinalizer)
		err := r.Update(ctx, instance)
		if err != nil {
			return ctrl.Result{}, err
		}
	}
	return ctrl.Result{}, nil // Stop reconciliation as the item is being deleted
}

// Add finalizer for this CR if it doesn't exist
if !controllerutil.ContainsFinalizer(instance, clusteredCacheFinalizer) {
	logger.Info("Adding finalizer for the ClusteredCache")
	controllerutil.AddFinalizer(instance, clusteredCacheFinalizer)
	err = r.Update(ctx, instance)
	if err != nil {
		return ctrl.Result{}, err
	}
}

// ... rest of the Reconcile function (StatefulSet logic, etc.)
Step 3: Implement the finalization logic
Our finalizeClusteredCache function will check if a backup is configured. If so, it will create a Kubernetes Job to perform the backup. The key here is that we don't wait for the job to finish synchronously. We create it, and on the next reconciliation, if the object is still terminating, we check the status of the job. Only when the job succeeds do we consider finalization complete.
// internal/controller/clusteredcache_controller.go

func (r *ClusteredCacheReconciler) finalizeClusteredCache(ctx context.Context, cc *cachev1.ClusteredCache) error {
	logger := log.FromContext(ctx)

	if cc.Spec.BackupConfig == nil {
		logger.Info("No backup config found, skipping final backup.")
		return nil
	}

	// Check if a backup job already exists
	jobName := cc.Name + "-final-backup"
	backupJob := &batchv1.Job{}
	err := r.Get(ctx, types.NamespacedName{Name: jobName, Namespace: cc.Namespace}, backupJob)

	if err != nil && errors.IsNotFound(err) {
		// Create the backup job
		logger.Info("Creating final backup job")
		job := r.newBackupJob(cc)
		if err := r.Create(ctx, job); err != nil {
			logger.Error(err, "Failed to create final backup job")
			return err
		}
		// Job created, requeue to check its status later
		return fmt.Errorf("backup job created, waiting for completion")
	} else if err != nil {
		logger.Error(err, "Failed to get backup job")
		return err
	}

	// Check job status
	if isJobFinished(backupJob) {
		if backupJob.Status.Succeeded > 0 {
			logger.Info("Final backup job completed successfully.")
			return nil // Success!
		} else {
			logger.Error(fmt.Errorf("backup job failed"), "Final backup job did not succeed")
			// You might want to add more sophisticated error handling here,
			// e.g., inspect job logs, or decide to proceed with deletion anyway after N retries.
			return fmt.Errorf("backup job failed")
		}
	}

	// Job is still running
	logger.Info("Waiting for final backup job to complete")
	return fmt.Errorf("backup job still running")
}

func (r *ClusteredCacheReconciler) newBackupJob(cc *cachev1.ClusteredCache) *batchv1.Job {
	// ... implementation to create a batchv1.Job object ...
	// This job would contain the logic to connect to the cache and back it up to S3.
	// For brevity, the full struct is omitted, but it would use cc.Spec.BackupConfig.
	job := &batchv1.Job{ /* ... definition ... */ }
	ctrl.SetControllerReference(cc, job, r.Scheme)
	return job
}

func isJobFinished(job *batchv1.Job) bool {
	for _, c := range job.Status.Conditions {
		if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}
Edge Case Analysis:
* Job Fails: Our current logic will block deletion forever if the backup job fails. In a real-world scenario, you might add a timeout or a retry limit, after which the operator gives up and removes the finalizer to unblock deletion, perhaps logging a critical error or firing an alert.
* Operator Restarts: The state is persisted in the Kubernetes API. If the operator restarts mid-finalization, on its next reconciliation for this CR, it will see the deletionTimestamp is set, the finalizer is present, and the backup job exists. It will simply resume checking the job's status.
* User Force Deletion: A user with sufficient RBAC can manually patch the CR to remove the finalizer. This is an escape hatch, but it bypasses our safety logic. This is an accepted risk in the Kubernetes model.
Exposing Rich State via Status Sub-resources
Simply knowing a StatefulSet exists is not enough. We need to report the true state of our ClusteredCache. Is it available? Is it being upgraded? Is it degraded?
The best practice is to use the status sub-resource and populate it with a list of Conditions, a standard Kubernetes API pattern.
Step 1: Update the Reconcile loop to update status
At the end of the Reconcile function, after all actions have been taken, we will gather the current state and perform a status update.
// ... inside Reconcile, at the end

// 4. Update the ClusteredCache status
// Re-read the CR so the status update starts from a recent resourceVersion.
// Note that this read is served from the informer cache, so it can still lag slightly.
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
	return ctrl.Result{}, client.IgnoreNotFound(err)
}

// Create a deep copy to modify. This is a best practice.
status := instance.Status.DeepCopy()

// Update ReadyReplicas from the StatefulSet's status
status.ReadyReplicas = foundSts.Status.ReadyReplicas

// Update Conditions
// Condition: Available
availableCond := metav1.Condition{
	Type:    "Available",
	Status:  metav1.ConditionFalse,
	Reason:  "ReplicasNotReady",
	Message: fmt.Sprintf("Waiting for all %d replicas to be ready", instance.Spec.Replicas),
}
if status.ReadyReplicas == instance.Spec.Replicas {
	availableCond.Status = metav1.ConditionTrue
	availableCond.Reason = "AllReplicasReady"
	availableCond.Message = "All cache replicas are ready and available."
}
// meta.SetStatusCondition (import "k8s.io/apimachinery/pkg/api/meta") sets or
// updates the condition in the list and maintains LastTransitionTime.
meta.SetStatusCondition(&status.Conditions, availableCond)

// If the status has changed, update it
if !reflect.DeepEqual(instance.Status, *status) {
	instance.Status = *status
	logger.Info("Updating ClusteredCache status")
	err = r.Status().Update(ctx, instance)
	if err != nil {
		logger.Error(err, "Failed to update ClusteredCache status")
		return ctrl.Result{}, err
	}
}

return ctrl.Result{}, nil
}
The most critical line is r.Status().Update(ctx, instance). This uses the status writer from the controller-runtime client, which sends a PUT request to the /status sub-resource endpoint. Because the CRD enables the status sub-resource, a plain r.Update(ctx, instance) would target the main resource endpoint, where the API server ignores changes to .status, so the new status would never be persisted.
Why is this so important?
Imagine this sequence of events:
1. The operator reconciles, calculates a new status, and prepares to update it.
2. A user runs kubectl apply -f to change spec.replicas from 3 to 5.
3. The operator's r.Update() call, which contains the old spec (replicas: 3) and the new status, reaches the API server.
4. The user's apply request, with the new spec (replicas: 5), reaches the API server.
If both use the main endpoint, the two writes contend for the same fields and one can overwrite the other: the operator might accidentally revert the user's scale-up request. By separating the writes (r.Update for spec changes, r.Status().Update for status changes), each endpoint accepts only its own part of the object. The status endpoint ignores spec changes and the main endpoint ignores status changes, so both changes can be applied correctly.
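The effect of the two endpoints can be illustrated with a small in-memory model. Everything here (the cache and server types, updateMain, updateStatus) is hypothetical, and the sketch deliberately leaves out resourceVersion checks, which the real API server also uses to reject stale writes.

```go
package main

import "fmt"

// cache is a hypothetical, flattened view of a ClusteredCache object.
type cache struct {
	SpecReplicas int32
	StatusReady  int32
}

// server is a hypothetical stand-in for the API server's stored copy.
type server struct {
	stored cache
}

// updateMain models a write to the main resource endpoint when the status
// sub-resource is enabled: only the spec portion is taken from the request.
func (s *server) updateMain(in cache) {
	s.stored.SpecReplicas = in.SpecReplicas
}

// updateStatus models a write to the /status endpoint: only the status
// portion is taken from the request.
func (s *server) updateStatus(in cache) {
	s.stored.StatusReady = in.StatusReady
}

func main() {
	s := &server{stored: cache{SpecReplicas: 3, StatusReady: 0}}

	// The operator reads the object and computes a new status.
	operatorCopy := s.stored
	operatorCopy.StatusReady = 3

	// Meanwhile the user scales up through the main endpoint.
	s.updateMain(cache{SpecReplicas: 5})

	// The operator's write still carries the old spec (3), but the status
	// endpoint ignores it, so the scale-up survives.
	s.updateStatus(operatorCopy)

	fmt.Printf("spec.replicas=%d status.readyReplicas=%d\n", s.stored.SpecReplicas, s.stored.StatusReady)
}
```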
Performance and Production Considerations
* Reconciliation Rate Limiting: Controller-runtime's manager uses a workqueue with exponential backoff for rate limiting by default. If your reconciliation loop fails, it won't be retried immediately, preventing a tight loop that could overwhelm the Kubernetes API server. This is configurable in main.go if you have specific needs, such as a much faster or slower retry cycle.
* Informer Caches: The r.Get() calls in our reconciler are extremely fast. They do not hit etcd on every call. The controller-runtime manager sets up informers that watch for changes to ClusteredCache and StatefulSet objects and maintains a local cache. Your reconciler reads from this cache. This is highly efficient but introduces a small amount of eventual consistency. Be aware that the state you read might be a few milliseconds out of date from what's in etcd.
* Leader Election: When you deploy your operator with more than one replica for high availability, only one pod should be actively reconciling at a time. Controller-runtime implements leader election using Kubernetes Lease objects, but it must be enabled in the manager options (the kubebuilder scaffold exposes it through the --leader-elect flag). The non-leader replicas stay idle but ready to take over if the leader fails.
* Metrics: Controller-runtime automatically exposes a /metrics endpoint with many useful Prometheus metrics about the controller's performance (e.g., controller_runtime_reconcile_time_seconds, controller_runtime_reconcile_errors_total). You should add custom metrics for your operator's specific logic, such as a counter for successful backup jobs or a gauge for the number of degraded ClusteredCache instances.
Advanced Testing: `envtest` for Integration
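Mocking the client for unit tests is useful, but to test finalizers and interactions between objects, you need a more realistic environment. envtest from controller-runtime starts a real kube-apiserver and etcd as local processes for your tests to run against. It does not run the built-in Kubernetes controllers, so nothing executes Jobs or performs garbage collection; the test has to simulate those effects itself.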
Here's a sketch of an envtest for our finalizer logic:
internal/controller/suite_test.go (setup boilerplate)
internal/controller/clusteredcache_controller_test.go
var _ = Describe("ClusteredCache Controller", func() {
	Context("When handling deletion with finalizer", func() {
		It("Should run a backup job before deleting the resource", func() {
			ctx := context.Background()
			cache := &cachev1.ClusteredCache{
				// ... definition with BackupConfig
			}
			Expect(k8sClient.Create(ctx, cache)).Should(Succeed())

			// 1. Verify the finalizer was added
			createdCache := &cachev1.ClusteredCache{}
			Eventually(func() bool {
				_ = k8sClient.Get(ctx, /* key */, createdCache)
				return controllerutil.ContainsFinalizer(createdCache, clusteredCacheFinalizer)
			}, "5s", "100ms").Should(BeTrue())

			// 2. Delete the resource
			Expect(k8sClient.Delete(ctx, createdCache)).Should(Succeed())

			// 3. Verify backup job was created
			backupJob := &batchv1.Job{}
			Eventually(func() error {
				return k8sClient.Get(ctx, /* job key */, backupJob)
			}, "5s", "100ms").Should(Succeed())

			// 4. Manually update the job to be 'Complete'
			// (envtest runs no Job controller, so the test plays that role)
			backupJob.Status.Conditions = append(backupJob.Status.Conditions, batchv1.JobCondition{
				Type: batchv1.JobComplete, Status: corev1.ConditionTrue,
			})
			backupJob.Status.Succeeded = 1
			Expect(k8sClient.Status().Update(ctx, backupJob)).Should(Succeed())

			// 5. Verify the ClusteredCache object is eventually removed
			// once the operator clears its finalizer
			Eventually(func() bool {
				err := k8sClient.Get(ctx, /* key */, createdCache)
				return errors.IsNotFound(err)
			}, "5s", "100ms").Should(BeTrue())
		})
	})
})
This test provides high confidence that our finalizer logic is working correctly without needing a full Kubernetes cluster.
Conclusion
Building a Kubernetes operator moves beyond simple application deployment into the realm of true automated operations. While the basic reconciliation of a child resource is straightforward, production-grade reliability hinges on advanced patterns. By correctly implementing finalizers, you gain control over the entire resource lifecycle, enabling critical pre-delete actions like backups and data offloading. By strictly using status sub-resources, you create a clean, non-conflicting API that accurately reports the state of your application, making it observable and composable within the broader Kubernetes ecosystem. These patterns are the foundation upon which robust, resilient, and truly automated stateful application management is built.