Implementing Resilient Sagas with Temporal's Compensation Logic

Goh Ling Yong
Technology enthusiast and software architect specializing in AI-driven development tools and modern software engineering practices. Passionate about the intersection of artificial intelligence and human creativity in building tomorrow's digital solutions.

The Inevitable Complexity of Distributed Transactions

In modern microservice architectures, maintaining data consistency across service boundaries is a formidable challenge. The classic ACID-compliant, two-phase commit (2PC) protocol, common in monolithic systems, introduces tight coupling and synchronous blocking, making it an anti-pattern in a distributed world. This leaves us with a critical problem: how do we execute a business transaction that spans multiple services, like an e-commerce order, while ensuring that the system doesn't end up in a corrupt, inconsistent state if one step fails?

Enter the Saga pattern. A Saga is a sequence of local transactions where each transaction updates data within a single service. When a local transaction completes, the Saga triggers the next one. If any local transaction fails, the Saga executes a series of compensating transactions that revert the changes made by the preceding successful transactions.
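
To make the definition concrete, here is a minimal sketch of the pattern in plain Go, without any orchestration framework. The Step type, RunSaga, and demoSteps are hypothetical names introduced only for this illustration, and the sketch deliberately ignores durability and retries, which are the hard parts discussed in the rest of the article.

```go
package main

import (
	"errors"
	"fmt"
)

// Step pairs a local transaction with the compensation that undoes it.
// The type is hypothetical and exists only for this sketch.
type Step struct {
	Name       string
	Action     func() error
	Compensate func() error
}

// RunSaga executes steps in order. On the first failure it runs the
// compensations of the already-completed steps in reverse order and
// returns the original error. The returned slice records what happened.
func RunSaga(steps []Step) ([]string, error) {
	var trace []string
	for i, step := range steps {
		if err := step.Action(); err != nil {
			trace = append(trace, step.Name+":failed")
			for j := i - 1; j >= 0; j-- {
				// A real orchestrator must retry failed compensations; ignored here.
				_ = steps[j].Compensate()
				trace = append(trace, steps[j].Name+":compensated")
			}
			return trace, err
		}
		trace = append(trace, step.Name+":done")
	}
	return trace, nil
}

func ok() error { return nil }

// demoSteps builds a three-step saga whose last step fails.
func demoSteps() []Step {
	return []Step{
		{Name: "order", Action: ok, Compensate: ok},
		{Name: "payment", Action: ok, Compensate: ok},
		{Name: "inventory", Action: func() error { return errors.New("insufficient stock") }, Compensate: ok},
	}
}

func main() {
	trace, err := RunSaga(demoSteps())
	fmt.Println(trace, err)
}
```

Note that the failed step itself is not compensated: only steps that completed successfully have changes to revert.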

While the concept is straightforward, implementing a Saga orchestrator from scratch is fraught with peril. You become responsible for durable state management, retries with exponential backoff, handling process crashes, and managing the complex logic of compensations. This is precisely where a durable execution framework like Temporal shines, abstracting away the infrastructure concerns and allowing you to express the Saga logic as plain code.

This article is not an introduction to the Saga pattern. It assumes you understand why you need it. Instead, we will perform a deep dive into a production-grade implementation of an orchestrated Saga using Temporal and Go, focusing on the nuances of compensation logic, advanced failure handling, and performance tuning.

Our Scenario: A Multi-Service E-Commerce Order

We'll model a simplified but realistic e-commerce order process involving three services:

  • Order Service: Creates and manages the state of the order (e.g., PENDING, COMPLETED, CANCELED).
  • Payment Service: Processes the payment via a third-party gateway.
  • Inventory Service: Reserves items from stock.

The business transaction must execute these steps in order. If the payment succeeds but inventory reservation fails, the payment must be refunded, and the order must be marked as CANCELED. This is our Saga.

    Why Temporal is a Superior Saga Orchestrator

    Before diving into the code, it's crucial to understand why Temporal is the right tool. A naive implementation might use a database table to track the Saga's state or a complex chain of message queue topics. These approaches suffer from several problems:

    * State Management: You have to build a robust state machine, handle concurrent updates, and ensure the state is durably persisted after every step.

    * Worker Crashes: If the orchestrator process crashes mid-Saga, how do you resume it from the exact point of failure without re-executing completed steps?

    * Timeouts and Retries: You are responsible for implementing your own timeout and retry logic, which is notoriously difficult to get right, especially with backoff strategies.

    Temporal solves these problems at the platform level:

    * Durable Execution: Temporal workflows are fully durable. The state of a workflow execution is continuously saved by the Temporal Cluster. If a worker crashes, another available worker will pick up and resume execution from the last known state, preserving local variables and execution progress.

    * Code as Workflow: The entire Saga orchestration logic is just a function. There's no need for YAML, DSLs, or database state machines. This makes the logic highly testable and easy to reason about.

    * Built-in Primitives: Timeouts, retries, and error handling are first-class citizens, configured via simple options.
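
To show what a retry configuration amounts to, here is a minimal sketch in plain Go of how a backoff schedule can be derived from an initial interval, a backoff coefficient, an interval cap, and an attempt limit. RetrySchedule is a hypothetical helper, not part of the Temporal SDK, and it assumes the attempt limit counts the first attempt. A real server may apply further details that are not modeled here.

```go
package main

import (
	"fmt"
	"time"
)

// RetrySchedule returns the waits between attempts for a retry policy.
// maxAttempts includes the first attempt, so a limit of 3 yields 2 waits.
// Each wait is the previous one multiplied by coefficient, capped at maxInterval.
func RetrySchedule(initial time.Duration, coefficient float64, maxInterval time.Duration, maxAttempts int) []time.Duration {
	var delays []time.Duration
	interval := initial
	for attempt := 1; attempt < maxAttempts; attempt++ {
		if interval > maxInterval {
			interval = maxInterval
		}
		delays = append(delays, interval)
		interval = time.Duration(float64(interval) * coefficient)
	}
	return delays
}

func main() {
	// Three attempts with doubling backoff: two waits.
	fmt.Println(RetrySchedule(time.Second, 2.0, 100*time.Second, 3))
	// A low cap flattens the schedule once the cap is reached.
	fmt.Println(RetrySchedule(time.Second, 2.0, 5*time.Second, 6))
}
```

With a low attempt limit the cap never comes into play; it matters mostly for long-lived retries such as compensations.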

    Production-Grade Implementation: The Order Processing Saga

    Let's build our Saga. We will use the Go SDK for Temporal. The core of our implementation will be a single workflow function that orchestrates activities, with a robust compensation strategy.

    Project Structure

    A typical Temporal project might be structured as follows:

    text
    /ecommerce-saga
    ├── activities/         # Business logic implementations
    │   ├── inventory_activity.go
    │   ├── order_activity.go
    │   └── payment_activity.go
    ├── shared/             # Shared data structures
    │   └── types.go
    ├── workflow/           # Workflow orchestration logic
    │   └── order_workflow.go
    ├── worker/             # Worker process setup
    │   └── main.go
    └── starter/            # Client to start workflows
        └── main.go

    Defining Activities and Compensations

    Activities are the building blocks of a workflow, representing the individual local transactions. A crucial pattern for Sagas is to define a corresponding compensation for each fallible forward-moving activity.

    shared/types.go

    go
    package shared
    
    // OrderDetails contains all information for an order.
    type OrderDetails struct {
    	OrderID    string
    	UserID     string
    	ItemID     string
    	Quantity   int
    	TotalPrice float64
    }

    activities/order_activity.go

    go
    package activities
    
    import (
    	"context"
    	"fmt"
    
    	"github.com/your-repo/ecommerce-saga/shared"
    )
    
    // In a real app, this would interact with a database.
    func CreateOrder(ctx context.Context, details shared.OrderDetails) (string, error) {
    	fmt.Printf("Creating order %s with status PENDING\n", details.OrderID)
    	// DB logic to insert order with 'PENDING' status
    	return details.OrderID, nil
    }
    
    func SetOrderStatusToCompleted(ctx context.Context, orderID string) error {
    	fmt.Printf("Setting order %s status to COMPLETED\n", orderID)
    	// DB logic to update order status
    	return nil
    }
    
    // Compensation for CreateOrder
    func CancelOrder(ctx context.Context, orderID string) error {
    	fmt.Printf("COMPENSATION: Canceling order %s\n", orderID)
    	// DB logic to update order status to 'CANCELED'
    	return nil
    }

    activities/payment_activity.go

    Here, we introduce the concept of idempotency keys, which are critical for making activities safe to retry.

    go
    package activities
    
    import (
    	"context"
    	"fmt"
    	"time"
    )
    
    // ProcessPayment simulates calling a payment gateway.
    // It requires an idempotency key to prevent double charges on retry.
    func ProcessPayment(ctx context.Context, orderID string, amount float64, idempotencyKey string) (string, error) {
    	fmt.Printf("Processing payment of $%.2f for order %s (Idempotency Key: %s)\n", amount, orderID, idempotencyKey)
    	// Simulate a potentially failing API call
    	if time.Now().Second()%2 == 0 { // Fail 50% of the time for demonstration
    		// return "", fmt.Errorf("payment gateway timeout") // Uncomment to test failure
    	}
    	transactionID := fmt.Sprintf("txn-%s", idempotencyKey)
    	return transactionID, nil
    }
    
    // Compensation for ProcessPayment
    func RefundPayment(ctx context.Context, transactionID string, orderID string) error {
    	fmt.Printf("COMPENSATION: Refunding payment %s for order %s\n", transactionID, orderID)
    	// Call payment gateway's refund API
    	return nil
    }
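
The idempotency key only helps if the receiving side deduplicates on it. As a rough sketch of that side of the contract, the following plain Go program models a gateway that charges once per key. PaymentGateway and its methods are hypothetical stand-ins, not a real provider API, and a production system would persist the key-to-transaction mapping rather than keep it in memory.

```go
package main

import (
	"fmt"
	"sync"
)

// PaymentGateway is a hypothetical in-memory stand-in for a payment provider
// that deduplicates charges by idempotency key.
type PaymentGateway struct {
	mu      sync.Mutex
	charges map[string]string // idempotency key -> transaction ID
	count   int               // number of charges actually performed
}

func NewPaymentGateway() *PaymentGateway {
	return &PaymentGateway{charges: make(map[string]string)}
}

// Charge performs the charge once per key. A repeated call with the same key
// returns the original transaction ID without charging again.
func (g *PaymentGateway) Charge(idempotencyKey string, amountCents int64) string {
	g.mu.Lock()
	defer g.mu.Unlock()
	if txn, ok := g.charges[idempotencyKey]; ok {
		return txn
	}
	g.count++
	txn := fmt.Sprintf("txn-%d", g.count)
	g.charges[idempotencyKey] = txn
	return txn
}

// ChargeCount reports how many distinct charges were performed.
func (g *PaymentGateway) ChargeCount() int {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.count
}

func main() {
	g := NewPaymentGateway()
	first := g.Charge("order-1-payment", 9999)
	retry := g.Charge("order-1-payment", 9999) // simulated activity retry
	fmt.Println(first == retry, g.ChargeCount())
}
```

The sketch uses integer cents for the amount; that is a choice made for this example, whereas the article's OrderDetails uses a float64 TotalPrice.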

    activities/inventory_activity.go

    go
    package activities
    
    import (
    	"context"
    	"fmt"
    )
    
    // ReserveInventory simulates reserving items from stock.
    func ReserveInventory(ctx context.Context, itemID string, quantity int) error {
    	fmt.Printf("Reserving %d of item %s from inventory\n", quantity, itemID)
    	// DB logic to decrement stock count. This could fail if stock is insufficient.
    	// return fmt.Errorf("insufficient stock for item %s", itemID) // Uncomment to test failure
    	return nil
    }
    
    // Compensation for ReserveInventory
    func RestockInventory(ctx context.Context, itemID string, quantity int) error {
    	fmt.Printf("COMPENSATION: Restocking %d of item %s to inventory\n", quantity, itemID)
    	// DB logic to increment stock count
    	return nil
    }
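
The database logic behind these two activities is elided above. As a rough illustration of the reserve and restock pair, here is an in-memory sketch. The Inventory type and ErrInsufficientStock are hypothetical names, and a mutex-protected map stands in for what would be a transactional stock update in a real service.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrInsufficientStock is a hypothetical business error for this sketch.
var ErrInsufficientStock = errors.New("insufficient stock")

// Inventory is an in-memory stand-in for the inventory service's database.
type Inventory struct {
	mu    sync.Mutex
	stock map[string]int
}

// NewInventory copies the initial counts so callers cannot mutate them later.
func NewInventory(initial map[string]int) *Inventory {
	stock := make(map[string]int, len(initial))
	for item, qty := range initial {
		stock[item] = qty
	}
	return &Inventory{stock: stock}
}

// Reserve decrements stock, or fails without changing anything if too little is left.
func (inv *Inventory) Reserve(itemID string, qty int) error {
	inv.mu.Lock()
	defer inv.mu.Unlock()
	if inv.stock[itemID] < qty {
		return ErrInsufficientStock
	}
	inv.stock[itemID] -= qty
	return nil
}

// Restock is the compensation for Reserve: it puts the quantity back.
func (inv *Inventory) Restock(itemID string, qty int) {
	inv.mu.Lock()
	defer inv.mu.Unlock()
	inv.stock[itemID] += qty
}

// Available returns the current stock for an item.
func (inv *Inventory) Available(itemID string) int {
	inv.mu.Lock()
	defer inv.mu.Unlock()
	return inv.stock[itemID]
}

func main() {
	inv := NewInventory(map[string]int{"item-456": 2})
	fmt.Println(inv.Reserve("item-456", 1), inv.Available("item-456"))
	fmt.Println(inv.Reserve("item-456", 5), inv.Available("item-456"))
	inv.Restock("item-456", 1)
	fmt.Println(inv.Available("item-456"))
}
```

One caveat: Restock as written is not idempotent, so a retried compensation would add the quantity twice. A real implementation would need a reservation ID or similar key to deduplicate.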

    Orchestrating the Saga Workflow

    This is where the magic happens. We'll use a deferred function within our workflow to manage the compensation stack. This is a clean, idiomatic Go pattern: the deferred function runs on every return path, and it triggers the compensations whenever the workflow returns with a non-nil error.

    workflow/order_workflow.go

    go
    package workflow
    
    import (
    	"time"
    
    	"github.com/google/uuid"
    	"github.com/your-repo/ecommerce-saga/activities"
    	"github.com/your-repo/ecommerce-saga/shared"
    	"go.temporal.io/sdk/temporal"
    	"go.temporal.io/sdk/workflow"
    )
    
    func OrderProcessingWorkflow(ctx workflow.Context, orderDetails shared.OrderDetails) (string, error) {
    	// Set up activity options: timeouts and retry policies.
    	// These are crucial for production-grade robustness.
    	ao := workflow.ActivityOptions{
    		StartToCloseTimeout: 10 * time.Second,
    		// RetryPolicy is defined in the temporal package, not the workflow package.
    		RetryPolicy: &temporal.RetryPolicy{
    			InitialInterval:    time.Second,
    			BackoffCoefficient: 2.0,
    			MaximumInterval:    100 * time.Second,
    			MaximumAttempts:    3, // Includes the first attempt
    		},
    	}
    	ctx = workflow.WithActivityOptions(ctx, ao)
    
    	// Compensation stack. We add compensation functions here as activities succeed.
    	var compensationStack []func()
    	var err error
    
    	// Use a disconnected context for compensations to ensure they run even if the main context is canceled.
    	// It is declared at function scope because the compensation closures below capture it.
    	// NewDisconnectedContext also returns a cancel function, which we do not need here.
    	compensationCtx, _ := workflow.NewDisconnectedContext(ctx)
    
    	// Set a longer timeout for compensations.
    	cao := workflow.ActivityOptions{
    		StartToCloseTimeout: 30 * time.Second, // Compensations should be reliable
    	}
    	compensationCtx = workflow.WithActivityOptions(compensationCtx, cao)
    
    	// Defer the execution of compensation functions.
    	// The deferred function always runs; it compensates only if err is non-nil.
    	defer func() {
    		if err != nil {
    			// Workflow failed, run compensations. New entries are prepended to the stack,
    			// so iterating from the front yields reverse (LIFO) order.
    			workflow.GetLogger(ctx).Error("Workflow failed, starting compensation.", "Error", err)
    
    			for _, compensation := range compensationStack {
    				compensation()
    			}
    		}
    	}()
    
    	// 1. Create Order record in a 'PENDING' state
    	err = workflow.ExecuteActivity(ctx, activities.CreateOrder, orderDetails).Get(ctx, nil)
    	if err != nil {
    		return "", err
    	}
    	// Add compensation to the stack if the activity succeeded.
    	compensationStack = append([]func(){func() { 
    		_ = workflow.ExecuteActivity(compensationCtx, activities.CancelOrder, orderDetails.OrderID).Get(compensationCtx, nil)
    	}}, compensationStack...)
    
    	// 2. Process Payment
    	var transactionID string
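    	// Generate a unique idempotency key for the payment activity.
    	// uuid.New() is non-deterministic, so it must not be called directly in workflow code.
    	// workflow.SideEffect records the value in history once and returns the same key on replay.
    	var idempotencyKey string
    	err = workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
    		return uuid.New().String()
    	}).Get(&idempotencyKey)
    	if err != nil {
    		return "", err
    	}
    	err = workflow.ExecuteActivity(ctx, activities.ProcessPayment, orderDetails.OrderID, orderDetails.TotalPrice, idempotencyKey).Get(ctx, &transactionID)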
    	if err != nil {
    		return "", err
    	}
    	compensationStack = append([]func(){func() { 
    		_ = workflow.ExecuteActivity(compensationCtx, activities.RefundPayment, transactionID, orderDetails.OrderID).Get(compensationCtx, nil) 
    	}}, compensationStack...)
    
    	// 3. Reserve Inventory
    	err = workflow.ExecuteActivity(ctx, activities.ReserveInventory, orderDetails.ItemID, orderDetails.Quantity).Get(ctx, nil)
    	if err != nil {
    		return "", err
    	}
    	compensationStack = append([]func(){func() { 
    		_ = workflow.ExecuteActivity(compensationCtx, activities.RestockInventory, orderDetails.ItemID, orderDetails.Quantity).Get(compensationCtx, nil)
    	}}, compensationStack...)
    
    	// 4. All steps succeeded. Mark order as 'COMPLETED'.
    	err = workflow.ExecuteActivity(ctx, activities.SetOrderStatusToCompleted, orderDetails.OrderID).Get(ctx, nil)
    	if err != nil {
    		// This is a critical failure. If we can't update the order status, the Saga is technically complete
    		// but the final state isn't recorded. The compensations will run.
    		// In a real system, you might want more nuanced logic here.
    		return "", err
    	}
    
    	workflow.GetLogger(ctx).Info("Workflow completed successfully.")
    	return "Order completed successfully: " + orderDetails.OrderID, nil
    }

    Key Implementation Details:

  • Compensation Stack: We use a []func() slice. When an activity succeeds, we prepend a function literal containing its compensation call to the slice. Iterating over the slice from the front on failure therefore executes compensations in the correct LIFO (Last-In, First-Out) order.
  • Disconnected Context: Inside the defer block, we use workflow.NewDisconnectedContext(ctx). This is critical. It creates a new context that is not tied to the workflow's main context. This ensures that even if the workflow context is canceled (e.g., by a parent workflow or an external cancellation request), our vital compensation logic will still run to completion.
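  • Error Handling: The deferred function runs on every return path, but it executes compensations only when the err variable is non-nil. This check differentiates a successful completion from a failure, so every failing step must assign to the shared err variable (err = ..., not err := ...) before returning.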
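  • Idempotency Key: The ProcessPayment activity receives an idempotency key from the workflow. Temporal passes the same recorded input on every retry of the activity, so the gateway sees a single key per payment and can reject duplicate charges. Because workflow code must be deterministic and is re-executed on recovery, a random value such as a UUID should be produced through workflow.SideEffect (or derived from a stable value like the workflow ID); calling uuid.New() directly in workflow code would yield a different value on replay.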
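
An alternative to generating a random key is to derive it from identifiers that never change for a given execution. The sketch below shows one way to do that in plain Go. DeriveIdempotencyKey is a hypothetical helper, and the choice of SHA-256 truncated to 16 bytes is an assumption made for this example, not something Temporal prescribes.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// DeriveIdempotencyKey builds a stable key from identifiers that do not change
// across replays or retries, such as the workflow ID and a step name.
// The same inputs always produce the same 32-character hex key.
func DeriveIdempotencyKey(workflowID, step string) string {
	sum := sha256.Sum256([]byte(workflowID + "/" + step))
	return hex.EncodeToString(sum[:16])
}

func main() {
	key := DeriveIdempotencyKey("order-processing-123", "process-payment")
	again := DeriveIdempotencyKey("order-processing-123", "process-payment")
	fmt.Println(key == again, len(key))
}
```

Because the function is pure, it is safe to call from deterministic workflow code without wrapping it in a side effect.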

    Setting up the Worker and Starter

    To run this, you need a worker to host the workflow and activity implementations and a starter to trigger the workflow.

    worker/main.go

    go
    package main
    
    import (
    	"log"
    
    	"github.com/your-repo/ecommerce-saga/activities"
    	"github.com/your-repo/ecommerce-saga/workflow"
    	"go.temporal.io/sdk/client"
    	"go.temporal.io/sdk/worker"
    )
    
    func main() {
    	c, err := client.Dial(client.Options{})
    	if err != nil {
    		log.Fatalln("Unable to create client", err)
    	}
    	defer c.Close()
    
    	w := worker.New(c, "order-processing-task-queue", worker.Options{})
    
    	w.RegisterWorkflow(workflow.OrderProcessingWorkflow)
    	w.RegisterActivity(activities.CreateOrder)
    	w.RegisterActivity(activities.CancelOrder)
    	w.RegisterActivity(activities.ProcessPayment)
    	w.RegisterActivity(activities.RefundPayment)
    	w.RegisterActivity(activities.ReserveInventory)
    	w.RegisterActivity(activities.RestockInventory)
    	w.RegisterActivity(activities.SetOrderStatusToCompleted)
    
    	err = w.Run(worker.InterruptCh())
    	if err != nil {
    		log.Fatalln("Unable to start worker", err)
    	}
    }

    starter/main.go

    go
    package main
    
    import (
    	"context"
    	"fmt"
    	"log"
    
    	"github.com/google/uuid"
    	"github.com/your-repo/ecommerce-saga/shared"
    	"github.com/your-repo/ecommerce-saga/workflow"
    	"go.temporal.io/sdk/client"
    )
    
    func main() {
    	c, err := client.Dial(client.Options{})
    	if err != nil {
    		log.Fatalln("Unable to create client", err)
    	}
    	defer c.Close()
    
    	orderDetails := shared.OrderDetails{
    		OrderID:    uuid.New().String(),
    		UserID:     "user-123",
    		ItemID:     "item-456",
    		Quantity:   1,
    		TotalPrice: 99.99,
    	}
    
    	options := client.StartWorkflowOptions{
    		ID:        "order-processing-" + orderDetails.OrderID,
    		TaskQueue: "order-processing-task-queue",
    	}
    
    	we, err := c.ExecuteWorkflow(context.Background(), options, workflow.OrderProcessingWorkflow, orderDetails)
    	if err != nil {
    		log.Fatalln("Unable to execute workflow", err)
    	}
    
    	log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
    
    	var result string
    	err = we.Get(context.Background(), &result)
    	if err != nil {
    		log.Fatalln("Workflow failed", err)
    	}
    	log.Println("Workflow result:", result)
    }

    Handling Advanced Edge Cases and Failures

    The implementation above is robust, but senior engineers must consider the truly difficult edge cases.

    The Nightmare Scenario: A Failing Compensation

    What happens if ReserveInventory succeeds, but the subsequent SetOrderStatusToCompleted fails, triggering compensations, and then the RefundPayment compensation activity also fails? This is where many naive Saga implementations break down, potentially leaving a customer charged for an order they will never receive.

    Temporal's durability provides a strong foundation for solving this. Here's the strategy:

  • Make Compensations Highly Reliable: Compensation activities should have a more persistent retry policy than forward activities. They represent a promise to revert state, and should be treated as such. You can define separate, more generous ActivityOptions for the compensationCtx.

    go
        // Where the compensation context is created (requires the go.temporal.io/sdk/temporal import)
        cao := workflow.ActivityOptions{
            StartToCloseTimeout: 5 * time.Minute, // Longer timeout
            RetryPolicy: &temporal.RetryPolicy{
                InitialInterval:        time.Second,
                BackoffCoefficient:     2.0,
                MaximumInterval:        5 * time.Minute,
                NonRetryableErrorTypes: []string{"BusinessError"}, // Only retry infrastructure errors
                // MaximumAttempts is left at 0, which means unlimited retries.
            },
        }
        // NewDisconnectedContext returns the context and a cancel function.
        compensationCtx, _ := workflow.NewDisconnectedContext(ctx)
        compensationCtx = workflow.WithActivityOptions(compensationCtx, cao)

  • Manual Intervention via Signals: If a compensation keeps failing (e.g., a payment gateway is down for hours), the Saga is stuck in a partially compensated state. You cannot simply let this fail silently. The solution is to emit a high-priority alert to an observability platform (Datadog, Sentry, etc.), keep the workflow open, and allow for manual intervention using a Temporal Signal. A Signal is an external, asynchronous message sent to a running workflow. We can define a signal that forces a retry of the compensation.

    Code Example: A Workflow with a Signal for Manual Compensation Retry

    First, we refactor the compensation logic into a helper function and add a signal channel.

    go
    // Helper defined alongside your workflow function.
    // Each compensation returns its error so the helper can decide whether to retry.
    func executeCompensations(ctx workflow.Context, stack []func(workflow.Context) error) {
    	// This stack is built with plain append, so walk it backwards for LIFO order.
    	for i := len(stack) - 1; i >= 0; i-- {
    		compensation := stack[i]
    		for {
    			// Try to execute the compensation.
    			err := compensation(ctx)
    			if err == nil {
    				break // Compensation succeeded, move on to the next one.
    			}
    			workflow.GetLogger(ctx).Error("Compensation failed, will retry.", "Error", err)
    			
    			// Wait before retrying.
    			// In a real scenario you would wrap this in a selector to listen to a signal
    			if sleepErr := workflow.Sleep(ctx, 1*time.Minute); sleepErr != nil {
    				return // The compensation context itself was canceled.
    			}
    		}
    	}
    }
    
    // ... inside the workflow ...
    
    // We can make our compensation logic more explicit than a defer
    // by using a try/catch style error handling block
    
    var compensationStack []func(workflow.Context) error
    
    err := func(ctx workflow.Context) error {
        // ... execute activities and build compensation stack ...
        // (code from before, but appending each compensation as its step succeeds)
        return nil
    }(ctx)
    
    if err != nil {
        // Workflow failed, run compensations
        compensationCtx, _ := workflow.NewDisconnectedContext(ctx)
        // ... set up compensation activity options ...
        executeCompensations(compensationCtx, compensationStack)
        return "", err
    }
    
    

    This is a conceptual sketch. A full implementation would involve a workflow.Selector to wait on both the activity completion and a signal channel (workflow.GetSignalChannel). If the activity fails, it would enter a loop, log the error, and wait for a signal to retry.
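
The retry-on-signal control flow can be illustrated without the Temporal SDK. In the following plain Go sketch, an ordinary channel stands in for a Temporal signal channel, and CompensateWithManualRetry and flakyRefund are hypothetical names. It shows only the loop structure; in a real workflow the blocking receive would be a workflow.Selector over the signal channel, and the state would survive worker restarts.

```go
package main

import (
	"errors"
	"fmt"
)

// CompensateWithManualRetry runs compensate until it succeeds. After each
// failure it blocks until an operator sends on retrySignal, which stands in
// for a Temporal signal channel. If the channel is closed, it gives up and
// returns the last error together with the number of attempts made.
func CompensateWithManualRetry(compensate func() error, retrySignal <-chan struct{}) (int, error) {
	attempts := 0
	for {
		attempts++
		err := compensate()
		if err == nil {
			return attempts, nil
		}
		// In a workflow, this is where a high-priority alert would be emitted.
		if _, open := <-retrySignal; !open {
			return attempts, err
		}
	}
}

// flakyRefund returns a compensation that fails the given number of times
// before succeeding, simulating a payment gateway outage.
func flakyRefund(failures int) func() error {
	remaining := failures
	return func() error {
		if remaining > 0 {
			remaining--
			return errors.New("payment gateway unavailable")
		}
		return nil
	}
}

func main() {
	// Buffered so the demo can queue two operator signals without a goroutine.
	signals := make(chan struct{}, 2)
	signals <- struct{}{}
	signals <- struct{}{}
	attempts, err := CompensateWithManualRetry(flakyRefund(2), signals)
	fmt.Println(attempts, err)
}
```

The design choice worth noting is that the loop never gives up on its own: a compensation is retried until it succeeds or a human explicitly ends the wait.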

    Performance and Scalability Considerations

    * Activity Heartbeating: If an activity can run for a long time (e.g., > 1 minute), it should send heartbeats back to the Temporal Cluster. This allows the cluster to detect a worker crash much faster than the StartToCloseTimeout. If a heartbeat isn't received within the HeartbeatTimeout period, the activity is marked as failed and will be retried on another worker.

    go
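        // Inside a long-running activity
        import (
            "context"
            "time"

            "go.temporal.io/sdk/activity"
        )
    
        func LongRunningActivity(ctx context.Context) error {
            for i := 0; i < 100; i++ {
                // Do some work, but stop early if the activity is canceled or times out.
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case <-time.After(30 * time.Second):
                }
                activity.RecordHeartbeat(ctx, i) // Send progress back
            }
            return nil
        }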
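
To illustrate why heartbeats shorten failure detection, here is a small model of the bookkeeping involved, written in plain Go with explicit timestamps so it runs deterministically. HeartbeatMonitor is a hypothetical type for this sketch and says nothing about how the Temporal server is actually implemented.

```go
package main

import (
	"fmt"
	"time"
)

// HeartbeatMonitor is a hypothetical model of heartbeat tracking: it remembers
// the last heartbeat and reports a timeout once too much time has passed.
type HeartbeatMonitor struct {
	timeout time.Duration
	last    time.Time
}

// NewHeartbeatMonitor starts the clock at the activity's start time.
func NewHeartbeatMonitor(start time.Time, timeout time.Duration) *HeartbeatMonitor {
	return &HeartbeatMonitor{timeout: timeout, last: start}
}

// Record notes that a heartbeat arrived at the given time.
func (m *HeartbeatMonitor) Record(at time.Time) { m.last = at }

// TimedOut reports whether more than the timeout has elapsed since the last heartbeat.
func (m *HeartbeatMonitor) TimedOut(now time.Time) bool {
	return now.Sub(m.last) > m.timeout
}

// demoStart returns a fixed start time so the example is reproducible.
func demoStart() time.Time {
	return time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
}

func main() {
	start := demoStart()
	m := NewHeartbeatMonitor(start, 45*time.Second)
	m.Record(start.Add(30 * time.Second))
	// 30s since the last heartbeat: still healthy.
	fmt.Println(m.TimedOut(start.Add(60 * time.Second)))
	// 90s since the last heartbeat: the worker is presumed dead.
	fmt.Println(m.TimedOut(start.Add(120 * time.Second)))
}
```

In this model a crash is noticed within 45 seconds of the last heartbeat, regardless of how long the overall activity timeout is.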

    * Worker Tuning: The worker.Options struct allows you to configure concurrency. MaxConcurrentActivityExecutionSize and MaxConcurrentWorkflowTaskExecutionSize are critical. Don't just set them to high numbers. Profile your workers under load. CPU-bound activities require fewer concurrent executions than I/O-bound ones. A reasonable starting point for CPU-bound workloads is a MaxConcurrentActivityExecutionSize close to the number of CPU cores; I/O-bound workloads can usually go well above that, limited mainly by memory and downstream capacity.
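
The idea behind a concurrency cap can be sketched with a buffered channel used as a semaphore. RunWithLimit below is a hypothetical helper written for this illustration; Temporal workers manage their execution slots internally, and this sketch only shows what the limit means for the tasks themselves.

```go
package main

import (
	"fmt"
	"sync"
)

// RunWithLimit runs every task with at most limit tasks in flight at once
// and returns the highest concurrency it observed.
func RunWithLimit(tasks []func(), limit int) int {
	sem := make(chan struct{}, limit) // one slot per allowed concurrent task
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for _, task := range tasks {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit tasks are already running
		go func(run func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot once the task is done
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			run()

			mu.Lock()
			running--
			mu.Unlock()
		}(task)
	}
	wg.Wait()
	return peak
}

func main() {
	tasks := make([]func(), 10)
	for i := range tasks {
		tasks[i] = func() {} // placeholder for real activity work
	}
	peak := RunWithLimit(tasks, 3)
	fmt.Println(peak >= 1 && peak <= 3)
}
```

Raising the limit only helps while tasks spend their time waiting; once the CPU is saturated, extra slots add contention rather than throughput.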

    * Task Queues: Don't run all your workflows on a single task queue. Isolate them. For our e-commerce example, you could have a high-priority-payments task queue and a standard-orders task queue. This prevents a high volume of standard orders from starving the critical payment-related tasks, ensuring better resource management and latency.

    Testing Your Saga Workflow

    Temporal's Go SDK includes a powerful testing framework (testsuite) that allows you to test your workflow logic in-process without needing a running Temporal Cluster.

    Code Example: A Complete Unit Test for the Saga

    This test will simulate a failure in the ReserveInventory activity and assert that the RefundPayment and CancelOrder compensations are called in the correct order.

    go
    package workflow_test
    
    import (
    	"context"
    	"testing"
    
    	"github.com/stretchr/testify/mock"
    	"github.com/stretchr/testify/suite"
    	"github.com/your-repo/ecommerce-saga/activities"
    	"github.com/your-repo/ecommerce-saga/shared"
    	"github.com/your-repo/ecommerce-saga/workflow"
    	"go.temporal.io/sdk/temporal"
    	"go.temporal.io/sdk/testsuite"
    )
    
    type UnitTestSuite struct {
    	suite.Suite
    	testsuite.WorkflowTestSuite
    
    	// Workflow test environment; activities are mocked on it.
    	env *testsuite.TestWorkflowEnvironment
    }
    
    func (s *UnitTestSuite) SetupTest() {
    	s.env = s.NewTestWorkflowEnvironment()
    }
    
    func TestUnitTestSuite(t *testing.T) {
    	suite.Run(t, new(UnitTestSuite))
    }
    
    func (s *UnitTestSuite) TestOrderWorkflow_InventoryFailure_Compensates() {
    	// Prepare mock activity inputs/outputs
    	orderDetails := shared.OrderDetails{
    		OrderID:    "test-order-123",
    		ItemID:     "item-456",
    		Quantity:   1,
    		TotalPrice: 99.99,
    	}
    	transactionID := "txn-abc-123"
    
    	// Register the activities that the mocks below replace
    	s.env.RegisterActivity(activities.CreateOrder)
    	s.env.RegisterActivity(activities.ProcessPayment)
    	s.env.RegisterActivity(activities.ReserveInventory)
    	s.env.RegisterActivity(activities.RefundPayment)
    	s.env.RegisterActivity(activities.CancelOrder)
    
    	// Forward path: order and payment succeed, inventory fails
    	s.env.OnActivity(activities.CreateOrder, mock.Anything, orderDetails).Return(orderDetails.OrderID, nil).Once()
    	s.env.OnActivity(activities.ProcessPayment, mock.Anything, orderDetails.OrderID, orderDetails.TotalPrice, mock.AnythingOfType("string")).Return(transactionID, nil).Once()
    	// Non-retryable, so the retry policy does not invoke the mock a second time
    	s.env.OnActivity(activities.ReserveInventory, mock.Anything, orderDetails.ItemID, orderDetails.Quantity).
    		Return(temporal.NewNonRetryableApplicationError("insufficient stock", "InsufficientStock", nil)).Once()
    
    	// Record compensation calls so that their REVERSE order can be asserted
    	var compensations []string
    	s.env.OnActivity(activities.RefundPayment, mock.Anything, transactionID, orderDetails.OrderID).
    		Return(func(ctx context.Context, txnID string, orderID string) error {
    			compensations = append(compensations, "RefundPayment")
    			return nil
    		}).Once()
    	s.env.OnActivity(activities.CancelOrder, mock.Anything, orderDetails.OrderID).
    		Return(func(ctx context.Context, orderID string) error {
    			compensations = append(compensations, "CancelOrder")
    			return nil
    		}).Once()
    
    	s.env.ExecuteWorkflow(workflow.OrderProcessingWorkflow, orderDetails)
    
    	s.True(s.env.IsWorkflowCompleted())
    	s.Error(s.env.GetWorkflowError())
    	s.Equal([]string{"RefundPayment", "CancelOrder"}, compensations)
    
    	// Assert that all expected mock calls were made
    	s.env.AssertExpectations(s.T())
    }

    This test provides high confidence that your orchestration logic is correct without the overhead of external dependencies.

    Conclusion

    The Saga pattern is a powerful tool for maintaining consistency in a distributed system. However, its implementation details are non-trivial. By leveraging a durable execution framework like Temporal, you delegate the hard infrastructure problems—state management, retries, and recovery—to the platform. This allows you to focus on writing clear, resilient, and testable business logic. The patterns discussed here—using a deferred compensation stack, disconnected contexts for cleanup, planning for compensation failures with signals, and rigorous testing—are not just best practices; they are essential for building production-grade, fault-tolerant systems that can gracefully handle the inevitable failures of a distributed world.
