Crafting a Custom K8s Operator in Go for Stateful Workloads
The Operator Pattern: Moving Beyond Off-the-Shelf Controllers
As senior engineers, we've all hit the ceiling of what standard Kubernetes resources like Deployments and even StatefulSets can manage out-of-the-box. While powerful, they are generic primitives. When you're managing a complex, stateful system—a sharded database, a distributed cache, or a message queue cluster—you have domain-specific operational knowledge that can't be expressed in a simple YAML manifest. How do you handle orchestrated rolling upgrades? What about coordinated backup and restore procedures? How do you rebalance shards when a node is added?
This is where the Operator Pattern becomes indispensable. An Operator is an application-specific controller that extends the Kubernetes API to create, configure, and manage instances of complex stateful applications on behalf of a Kubernetes user. It codifies human operational knowledge into software that is more reliable than a human operator.
This article is not an introduction. We assume you understand what an Operator is and have a working knowledge of Go and Kubernetes concepts. We will dive directly into building a production-grade DatabaseCluster operator using Kubebuilder, focusing on the nuanced, critical aspects that separate a toy project from a reliable, production-ready system.
Our goal is to implement an operator that manages a DatabaseCluster custom resource. This operator will:
- Create and manage a StatefulSet to run the database pods.
- Create a Headless Service for stable network identity and discovery.
- Report its observed state through the .status subresource.
- Be optimized to avoid unnecessary reconciliation cycles.
Let's begin by defining our API.
Defining a Robust API with CRDs and Validation
The foundation of any operator is its Custom Resource Definition (CRD). A well-designed API makes the operator intuitive to use and robust against misconfiguration. We'll use Kubebuilder's code-generation markers to define our schema and add server-side validation.
First, we scaffold our project and API:
# Ensure you have Kubebuilder installed
kubebuilder init --domain my.domain --repo my.domain/database-operator
kubebuilder create api --group database --version v1alpha1 --kind DatabaseCluster
Now, let's edit api/v1alpha1/databasecluster_types.go to define a robust spec and status.
// api/v1alpha1/databasecluster_types.go
package v1alpha1
import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// DatabaseClusterSpec defines the desired state of DatabaseCluster
type DatabaseClusterSpec struct {
// Number of desired pods. This is a pointer to distinguish between explicit
// zero and not specified. Minimum 1, maximum 9.
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Maximum=9
// +kubebuilder:validation:Required
Replicas *int32 `json:"replicas"`
// The container image for the database nodes.
// +kubebuilder:validation:Required
// +kubebuilder:validation:Pattern=`^.+$`
Image string `json:"image"`
// Resources required by each database node.
// +kubebuilder:validation:Required
Resources corev1.ResourceRequirements `json:"resources"`
// Storage configuration for the database nodes.
// +kubebuilder:validation:Required
Storage StorageSpec `json:"storage"`
}
// StorageSpec defines the storage configuration for the database nodes
type StorageSpec struct {
// The storage class to use for the persistent volumes.
// +kubebuilder:validation:Required
StorageClassName string `json:"storageClassName"`
// The size of the persistent volume, e.g., "10Gi".
// +kubebuilder:validation:Required
// +kubebuilder:validation:Pattern=`^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$`
Size string `json:"size"`
}
// DatabaseClusterStatus defines the observed state of DatabaseCluster
type DatabaseClusterStatus struct {
// Total number of non-terminated pods targeted by this cluster (their labels match the selector).
// +optional
Replicas int32 `json:"replicas,omitempty"`
// Total number of ready pods targeted by this cluster.
// +optional
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
// Represents the latest available observations of a DatabaseCluster's current state.
// +optional
// +patchMergeKey=type
// +patchStrategy=merge
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas"
//+kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyReplicas"
//+kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.conditions[?(@.type==\"Available\")].status"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// DatabaseCluster is the Schema for the databaseclusters API
type DatabaseCluster struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec DatabaseClusterSpec `json:"spec,omitempty"`
Status DatabaseClusterStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// DatabaseClusterList contains a list of DatabaseCluster
type DatabaseClusterList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []DatabaseCluster `json:"items"`
}
func init() {
SchemeBuilder.Register(&DatabaseCluster{}, &DatabaseClusterList{})
}
Key production-ready details:
- The // +kubebuilder:validation markers (Minimum, Maximum, Required, Pattern) generate an OpenAPI v3 schema for our CRD, so the Kubernetes API server rejects invalid DatabaseCluster resources before our controller even sees them. This is a critical first line of defense against bad configuration.
- Replicas *int32 is a deliberate choice. It allows us to distinguish between a user explicitly setting replicas: 0 and not specifying the field at all, which is crucial for defaulting logic.
- //+kubebuilder:subresource:status is vital. It tells the API server that the .status field is managed by the controller. This prevents users from modifying it and creates a separate endpoint for status updates, which improves RBAC granularity and prevents race conditions where a user's update to the spec overwrites a controller's update to the status.
- The Conditions field in the status (using metav1.Condition) is a standard Kubernetes pattern. It provides a structured, extensible way to report the state of the resource (e.g., Available, Progressing, Degraded).
After modifying the types, run make manifests generate to update the CRD and generated code.
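Since the pointer exists to support defaulting, here is a minimal sketch of what that defaulting could look like, assuming a defaulting webhook has been scaffolded with kubebuilder create webhook --group database --version v1alpha1 --kind DatabaseCluster --defaulting (the exact scaffold shape varies by Kubebuilder version). Note that with the Required marker above, a missing replicas field is rejected by the API server anyway; if you prefer webhook defaulting, you would drop that marker. The default of 3 is purely illustrative.
// api/v1alpha1/databasecluster_webhook.go (illustrative sketch)
package v1alpha1

// Default applies defaults before the object is persisted. Because
// Spec.Replicas is a *int32, nil means "not specified"; only in that case do
// we apply a default. An explicit replicas: 0 is left untouched (and would be
// rejected by the Minimum=1 marker).
func (r *DatabaseCluster) Default() {
	if r.Spec.Replicas == nil {
		defaultReplicas := int32(3) // illustrative default
		r.Spec.Replicas = &defaultReplicas
	}
}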
The Core: A Robust Reconciliation Loop
The heart of any operator is the Reconcile function in controllers/databasecluster_controller.go. This function is invoked whenever a DatabaseCluster resource changes, or any of the resources it owns (like a StatefulSet) change. The key principle is idempotency: the loop must be able to run multiple times and converge on the same desired state without causing issues.
Let's structure our reconciler.
// controllers/databasecluster_controller.go
package controllers
import (
"context"
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
databasev1alpha1 "my.domain/database-operator/api/v1alpha1"
)
const databaseClusterFinalizer = "database.my.domain/finalizer"
// DatabaseClusterReconciler reconciles a DatabaseCluster object
type DatabaseClusterReconciler struct {
client.Client
Scheme *runtime.Scheme
}
//+kubebuilder:rbac:groups=database.my.domain,resources=databaseclusters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=database.my.domain,resources=databaseclusters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=database.my.domain,resources=databaseclusters/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
func (r *DatabaseClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// 1. Fetch the DatabaseCluster instance
cluster := &databasev1alpha1.DatabaseCluster{}
err := r.Get(ctx, req.NamespacedName, cluster)
if err != nil {
if errors.IsNotFound(err) {
logger.Info("DatabaseCluster resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
logger.Error(err, "Failed to get DatabaseCluster")
return ctrl.Result{}, err
}
// 2. Handle deletion with Finalizers
isMarkedForDeletion := cluster.GetDeletionTimestamp() != nil
if isMarkedForDeletion {
if controllerutil.ContainsFinalizer(cluster, databaseClusterFinalizer) {
// Run our finalizer logic
if err := r.finalizeDatabaseCluster(ctx, cluster); err != nil {
// Don't remove the finalizer if cleanup fails, so we can retry.
return ctrl.Result{}, err
}
// Remove finalizer. Once all finalizers are removed, the object will be deleted.
controllerutil.RemoveFinalizer(cluster, databaseClusterFinalizer)
if err := r.Update(ctx, cluster); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
// Add finalizer for this CR if it doesn't exist
if !controllerutil.ContainsFinalizer(cluster, databaseClusterFinalizer) {
controllerutil.AddFinalizer(cluster, databaseClusterFinalizer)
if err := r.Update(ctx, cluster); err != nil {
return ctrl.Result{}, err
}
}
// 3. Reconcile owned resources (Service, StatefulSet)
if err := r.reconcileHeadlessService(ctx, cluster); err != nil {
return ctrl.Result{}, err
}
sts, err := r.reconcileStatefulSet(ctx, cluster)
if err != nil {
return ctrl.Result{}, err
}
// 4. Update the status
if err := r.updateStatus(ctx, cluster, sts); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
// ... (helper functions finalizeDatabaseCluster, reconcileHeadlessService, etc. will be defined below)
func (r *DatabaseClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&databasev1alpha1.DatabaseCluster{}).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.Service{}).
Complete(r)
}
Edge Case: Graceful Deletion with Finalizers
What happens when a user runs kubectl delete databasecluster my-cluster? Without a finalizer, Kubernetes will immediately delete the CR and then garbage collect the owned StatefulSet and Service. This is often not what you want for a stateful application. You might need to:
- Tell the database leader to perform a clean shutdown.
- Take a final backup of the persistent volumes.
- Deregister the cluster from an external monitoring service.
A finalizer is a string in the metadata.finalizers list. When a resource has finalizers, a kubectl delete command only sets the metadata.deletionTimestamp. The object is not actually removed from etcd until its finalizers list is empty. This gives our controller a chance to perform cleanup actions.
Here is the implementation of our finalizeDatabaseCluster function:
// controllers/databasecluster_controller.go
func (r *DatabaseClusterReconciler) finalizeDatabaseCluster(ctx context.Context, cluster *databasev1alpha1.DatabaseCluster) error {
logger := log.FromContext(ctx)
logger.Info("Performing finalizer logic for DatabaseCluster", "name", cluster.Name)
// This is where you would add your complex cleanup logic.
// For example, calling a backup API, draining connections, etc.
// For this example, we'll just log a message.
logger.Info("Simulating graceful shutdown and backup before deletion.")
// In a real-world scenario, you might have to wait for an external job to complete.
// If that job fails, you would return an error here, and the reconciliation
// would be retried, preventing the finalizer from being removed.
logger.Info("Finalizer logic completed successfully.")
return nil
}
This pattern is critical for preventing data loss or orphaned external resources when managing stateful systems.
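The comment above mentions waiting for an external job to complete. One hedged way to structure that is to have finalizeDatabaseCluster check a cleanup Job and keep returning "not done yet" (or an error) until it succeeds, so the finalizer stays in place and reconciliation retries. The Job name <cluster>-final-backup and the helper below are illustrative assumptions, not part of the scaffolded project.
// controllers/databasecluster_controller.go
import (
	// ... other imports
	batchv1 "k8s.io/api/batch/v1"
)

// finalBackupComplete reports whether a hypothetical "<cluster>-final-backup"
// Job has succeeded. finalizeDatabaseCluster could call it and return an error
// while the Job is still running, so the reconcile is retried and the finalizer
// is only removed once cleanup has finished.
func (r *DatabaseClusterReconciler) finalBackupComplete(ctx context.Context, cluster *databasev1alpha1.DatabaseCluster) (bool, error) {
	job := &batchv1.Job{}
	key := client.ObjectKey{Name: cluster.Name + "-final-backup", Namespace: cluster.Namespace}
	if err := r.Get(ctx, key, job); err != nil {
		if errors.IsNotFound(err) {
			// The backup Job hasn't been created yet; the caller should create it first.
			return false, nil
		}
		return false, err
	}
	return job.Status.Succeeded > 0, nil
}
A real implementation would also need RBAC markers for batch Jobs and logic to create the Job if it does not already exist.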
Reconciling Child Resources
The core of the reconciliation is ensuring the child resources (Service, StatefulSet) match the state defined in our DatabaseCluster spec. A common pattern is to construct the desired object in memory and then use the controller-runtime client to manage it.
Crucially, we must set the owner reference. controllerutil.SetControllerReference sets the DatabaseCluster as the owner of the child resource. This has two effects:
- Garbage collection: when the DatabaseCluster is deleted, Kubernetes automatically deletes the owned Service and StatefulSet.
- Event routing: because Owns() in SetupWithManager watches these resources, changes to them are mapped back (via the owner reference) to the parent DatabaseCluster and trigger a reconciliation.
Here's the implementation for the Headless Service:
// controllers/databasecluster_controller.go
func (r *DatabaseClusterReconciler) reconcileHeadlessService(ctx context.Context, cluster *databasev1alpha1.DatabaseCluster) error {
logger := log.FromContext(ctx)
svc := &corev1.Service{}
svcName := cluster.Name + "-headless"
err := r.Get(ctx, client.ObjectKey{Name: svcName, Namespace: cluster.Namespace}, svc)
if err != nil && errors.IsNotFound(err) {
// Define a new service
desiredSvc := r.serviceForDatabaseCluster(cluster)
logger.Info("Creating a new Headless Service", "Service.Namespace", desiredSvc.Namespace, "Service.Name", desiredSvc.Name)
if err := r.Create(ctx, desiredSvc); err != nil {
logger.Error(err, "Failed to create new Headless Service")
return err
}
return nil
} else if err != nil {
logger.Error(err, "Failed to get Headless Service")
return err
}
// Service already exists - no updates needed for a simple headless service, but you could add update logic here if needed.
return nil
}
// serviceForDatabaseCluster returns a DatabaseCluster Service object
func (r *DatabaseClusterReconciler) serviceForDatabaseCluster(cluster *databasev1alpha1.DatabaseCluster) *corev1.Service {
labels := labelsForDatabaseCluster(cluster.Name)
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: cluster.Name + "-headless",
Namespace: cluster.Namespace,
Labels: labels,
},
Spec: corev1.ServiceSpec{
ClusterIP: "None", // Headless
Selector: labels,
Ports: []corev1.ServicePort{{
Port: 9000,
Name: "db",
}},
},
}
// Set DatabaseCluster instance as the owner and controller
controllerutil.SetControllerReference(cluster, svc, r.Scheme)
return svc
}
func labelsForDatabaseCluster(name string) map[string]string {
return map[string]string{"app": "databasecluster", "databasecluster_cr": name}
}
The logic for the StatefulSet is similar but more complex, as we need to handle updates (e.g., scaling replicas, changing the image).
// controllers/databasecluster_controller.go
import (
// ... other imports
"reflect"
"k8s.io/apimachinery/pkg/api/resource"
)
func (r *DatabaseClusterReconciler) reconcileStatefulSet(ctx context.Context, cluster *databasev1alpha1.DatabaseCluster) (*appsv1.StatefulSet, error) {
logger := log.FromContext(ctx)
foundSts := &appsv1.StatefulSet{}
desiredSts := r.statefulSetForDatabaseCluster(cluster)
err := r.Get(ctx, client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}, foundSts)
if err != nil && errors.IsNotFound(err) {
logger.Info("Creating a new StatefulSet", "StatefulSet.Namespace", desiredSts.Namespace, "StatefulSet.Name", desiredSts.Name)
if err := r.Create(ctx, desiredSts); err != nil {
logger.Error(err, "Failed to create new StatefulSet")
return nil, err
}
return desiredSts, nil
} else if err != nil {
logger.Error(err, "Failed to get StatefulSet")
return nil, err
}
// StatefulSet already exists, check for updates
// A simple deep equal might be too sensitive. We check specific fields.
if *foundSts.Spec.Replicas != *desiredSts.Spec.Replicas || foundSts.Spec.Template.Spec.Containers[0].Image != desiredSts.Spec.Template.Spec.Containers[0].Image {
logger.Info("Updating StatefulSet", "name", desiredSts.Name)
foundSts.Spec.Replicas = desiredSts.Spec.Replicas
foundSts.Spec.Template.Spec.Containers = desiredSts.Spec.Template.Spec.Containers
// Note: In a real operator, you'd need much more sophisticated update logic,
// especially for fields that are immutable after creation. One alternative,
// controllerutil.CreateOrUpdate, is sketched after this listing.
if err := r.Update(ctx, foundSts); err != nil {
logger.Error(err, "Failed to update StatefulSet")
return nil, err
}
}
return foundSts, nil
}
func (r *DatabaseClusterReconciler) statefulSetForDatabaseCluster(cluster *databasev1alpha1.DatabaseCluster) *appsv1.StatefulSet {
labels := labelsForDatabaseCluster(cluster.Name)
storageSize := resource.MustParse(cluster.Spec.Storage.Size)
sts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: cluster.Name,
Namespace: cluster.Namespace,
},
Spec: appsv1.StatefulSetSpec{
Replicas: cluster.Spec.Replicas,
Selector: &metav1.LabelSelector{MatchLabels: labels},
ServiceName: cluster.Name + "-headless",
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: labels},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Image: cluster.Spec.Image,
Name: "database",
Resources: cluster.Spec.Resources,
Ports: []corev1.ContainerPort{{ContainerPort: 9000, Name: "db"}},
VolumeMounts: []corev1.VolumeMount{{
Name: "data",
MountPath: "/var/lib/database",
}},
}},
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
ObjectMeta: metav1.ObjectMeta{Name: "data"},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
StorageClassName: &cluster.Spec.Storage.StorageClassName,
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{corev1.ResourceStorage: storageSize},
},
},
}},
},
}
controllerutil.SetControllerReference(cluster, sts, r.Scheme)
return sts
}
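As the note in reconcileStatefulSet points out, hand-rolled update logic gets intricate quickly. A common alternative is controllerutil.CreateOrUpdate, which folds the get/create/update branches into a single call around a mutate function. The sketch below is one way to apply it here; it reuses statefulSetForDatabaseCluster, and restricting updates to replicas and the container list is an assumption you would adjust for your database.
// controllers/databasecluster_controller.go
// Alternative sketch: reconcile the StatefulSet with controllerutil.CreateOrUpdate.
func (r *DatabaseClusterReconciler) reconcileStatefulSetWithCreateOrUpdate(ctx context.Context, cluster *databasev1alpha1.DatabaseCluster) (*appsv1.StatefulSet, error) {
	sts := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{Name: cluster.Name, Namespace: cluster.Namespace},
	}
	_, err := controllerutil.CreateOrUpdate(ctx, r.Client, sts, func() error {
		desired := r.statefulSetForDatabaseCluster(cluster)
		if sts.CreationTimestamp.IsZero() {
			// On creation we can set everything, including fields that are
			// immutable afterwards (selector, serviceName, volumeClaimTemplates).
			sts.Spec = desired.Spec
		} else {
			// On update, only touch mutable fields we own.
			sts.Spec.Replicas = desired.Spec.Replicas
			sts.Spec.Template.Spec.Containers = desired.Spec.Template.Spec.Containers
		}
		return controllerutil.SetControllerReference(cluster, sts, r.Scheme)
	})
	return sts, err
}
CreateOrUpdate issues an Update whenever the mutated object differs from what it read, so keeping the mutate function limited to fields you own avoids fighting the API server's defaulting or other controllers.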
The Status Subresource: A Single Source of Truth
How does a user (or another automated system) know the state of our DatabaseCluster? They look at the .status field. It is the controller's job to keep this field up-to-date with the observed state of the world.
A critical implementation detail: You must use the r.Status().Update() client method, not r.Update(). Using the former targets the /status subresource endpoint, which is essential for preventing conflicts and respecting RBAC.
Our updateStatus function will fetch the latest state from the owned StatefulSet and populate our CR's status.
// controllers/databasecluster_controller.go
import (
// ... other imports
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func (r *DatabaseClusterReconciler) updateStatus(ctx context.Context, cluster *databasev1alpha1.DatabaseCluster, sts *appsv1.StatefulSet) error {
logger := log.FromContext(ctx)
// Create a deep copy to avoid modifying the cache
clusterToUpdate := cluster.DeepCopy()
clusterToUpdate.Status.Replicas = sts.Status.Replicas
clusterToUpdate.Status.ReadyReplicas = sts.Status.ReadyReplicas
// Set the Available condition based on the StatefulSet's ready replicas
var newCondition metav1.Condition
if clusterToUpdate.Status.ReadyReplicas == *cluster.Spec.Replicas {
newCondition = metav1.Condition{
Type: "Available",
Status: metav1.ConditionTrue,
Reason: "AllReplicasReady",
Message: fmt.Sprintf("All %d/%d replicas are ready", clusterToUpdate.Status.ReadyReplicas, *cluster.Spec.Replicas),
}
} else {
newCondition = metav1.Condition{
Type: "Available",
Status: metav1.ConditionFalse,
Reason: "ReplicasNotReady",
Message: fmt.Sprintf("%d/%d replicas are ready", clusterToUpdate.Status.ReadyReplicas, *cluster.Spec.Replicas),
}
}
// meta.SetStatusCondition is a helper to manage the conditions list.
// It will update the existing condition or add a new one.
meta.SetStatusCondition(&clusterToUpdate.Status.Conditions, newCondition)
// Use equality check to avoid unnecessary updates
if !reflect.DeepEqual(cluster.Status, clusterToUpdate.Status) {
logger.Info("Updating DatabaseCluster status")
err := r.Status().Update(ctx, clusterToUpdate)
if err != nil {
logger.Error(err, "Failed to update DatabaseCluster status")
return err
}
}
return nil
}
This function demonstrates several best practices:
* Deep Copy: We update a deep copy of the object to avoid race conditions with the controller-runtime's shared informer cache.
* Condition Management: meta.SetStatusCondition is a standard library helper that correctly adds or updates a condition in the list, handling timestamps and transitions properly.
* Avoid Unnecessary Updates: We perform a reflect.DeepEqual check. If the status hasn't actually changed, we don't call the API server. This reduces load on the API server and prevents noisy event streams.
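Even with these precautions, r.Status().Update() can still fail with a 409 Conflict if the cached object is stale by the time the write lands. A common remedy is to wrap the write in retry.RetryOnConflict from client-go, re-fetching the object on each attempt. The helper below is an illustrative sketch that mirrors the fields set in updateStatus.
// controllers/databasecluster_controller.go
import (
	// ... other imports
	"k8s.io/client-go/util/retry"
)

// updateStatusWithRetry re-fetches the latest DatabaseCluster before each
// attempt and retries the status write on 409 Conflict errors.
func (r *DatabaseClusterReconciler) updateStatusWithRetry(ctx context.Context, cluster *databasev1alpha1.DatabaseCluster, sts *appsv1.StatefulSet) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		latest := &databasev1alpha1.DatabaseCluster{}
		if err := r.Get(ctx, client.ObjectKeyFromObject(cluster), latest); err != nil {
			return err
		}
		latest.Status.Replicas = sts.Status.Replicas
		latest.Status.ReadyReplicas = sts.Status.ReadyReplicas
		return r.Status().Update(ctx, latest)
	})
}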
Performance and Scalability: Avoiding Noisy Reconciles
In a busy cluster, your operator could be triggered constantly. An update to a StatefulSet's status (e.g., a pod becoming ready) will trigger a reconciliation of our DatabaseCluster. But what if nothing in our spec has changed? The reconciliation loop will run, fetch all objects, and conclude there's nothing to do. This is wasteful.
We can dramatically improve performance by using predicates to filter which events trigger a reconciliation.
A common and highly effective optimization is to ignore status-only updates on child resources. We only care if the spec of our StatefulSet changes (which should only happen if our controller changes it) or if it's deleted. The metadata.generation field is perfect for this; it's an integer that increments only when the .spec of an object is changed.
We can apply a predicate in our SetupWithManager function:
// controllers/databasecluster_controller.go
import (
// ... other imports
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
func (r *DatabaseClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&databasev1alpha1.DatabaseCluster{}).
Owns(&appsv1.StatefulSet{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&corev1.Service{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
}
With predicate.GenerationChangedPredicate{}, our operator will no longer be woken up every time a pod in the StatefulSet flaps or its status is updated. It will only reconcile if the StatefulSet.spec changes, which is a much rarer event. This simple change can reduce the CPU and memory consumption of your operator by an order of magnitude in a large, active cluster.
Another key performance knob is MaxConcurrentReconciles. It is a per-controller setting (part of controller.Options), typically applied in SetupWithManager and often plumbed through from a flag in main.go. The default is 1. Increasing it allows the controller to process multiple DatabaseCluster resources in parallel, which is crucial if you expect to manage hundreds of clusters. The workqueue guarantees that a single object is never reconciled by two workers at once, but your reconciliation logic must still be safe to run concurrently for different objects (which it should be by design, as each reconciliation operates on a specific namespaced name).
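Concretely, the option is set on the controller builder via WithOptions. A sketch of the same SetupWithManager with a higher worker count might look like this; the value 4 is arbitrary and should be tuned for your environment.
// controllers/databasecluster_controller.go
import (
	// ... other imports
	"sigs.k8s.io/controller-runtime/pkg/controller"
)

func (r *DatabaseClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&databasev1alpha1.DatabaseCluster{}).
		Owns(&appsv1.StatefulSet{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Owns(&corev1.Service{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		WithOptions(controller.Options{MaxConcurrentReconciles: 4}). // illustrative value
		Complete(r)
}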
Conclusion: From Automation to True Cloud-Native Operation
We've journeyed through the core components of building a production-grade Kubernetes operator. We moved beyond the basic scaffolding to implement critical patterns for stateful application management: robust API validation, graceful deletion with finalizers, idempotent reconciliation of child resources, and accurate reporting via the status subresource. We also saw how a single line of code—adding a predicate—can have a massive impact on performance and scalability.
The real power of the Operator Pattern is not just in automating kubectl apply. It's in capturing the complex, stateful, and often-error-prone logic of a human operator and encoding it into a resilient, self-healing system that is native to the Kubernetes control plane. By building operators, you are not just using Kubernetes; you are extending it to understand and manage your specific applications, creating a truly cloud-native operational model.