Implementing Resilient Sagas with Temporal's Compensation Logic
The Inevitable Complexity of Distributed Transactions
In modern microservice architectures, maintaining data consistency across service boundaries is a formidable challenge. The classic two-phase commit (2PC) protocol makes a transaction atomic across multiple databases, but every participant must hold its locks while a coordinator decides the outcome. That synchronous blocking and tight coupling make it an anti-pattern in a distributed world, where services are deployed independently and any participant may be unavailable. This leaves us with a critical problem: how do we execute a business transaction that spans multiple services, like an e-commerce order, while ensuring that the system doesn't end up in a corrupt, inconsistent state if one step fails?
Enter the Saga pattern. A Saga is a sequence of local transactions where each transaction updates data within a single service. When a local transaction completes, the Saga triggers the next one. If any local transaction fails, the Saga executes a series of compensating transactions that revert the changes made by the preceding successful transactions.
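The definition above can be sketched without any framework. The sketch uses a list of steps, each paired with a compensation, and a runner that unwinds the completed steps in reverse order when one step fails. It is a minimal in-memory illustration and assumes that compensations always succeed. `Step`, `RunSaga` and `ErrInsufficientStock` are hypothetical names. A real orchestrator must also persist its progress and retry failed compensations, and those are exactly the concerns the rest of this article delegates to Temporal.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrInsufficientStock is a hypothetical failure used to drive the example.
var ErrInsufficientStock = errors.New("insufficient stock")

// Step pairs a local transaction with the compensation that semantically undoes it.
type Step struct {
	Name       string
	Do         func() error
	Compensate func() error
}

// RunSaga executes steps in order. If a step fails, it runs the compensations
// of all previously completed steps in reverse (LIFO) order and returns their
// names together with the original error. The failed step itself is not
// compensated, because its local transaction never committed.
func RunSaga(steps []Step) ([]string, error) {
	var completed []Step
	for _, s := range steps {
		if err := s.Do(); err != nil {
			var compensated []string
			for i := len(completed) - 1; i >= 0; i-- {
				c := completed[i]
				if c.Compensate == nil {
					continue // Nothing to undo for this step.
				}
				// This sketch ignores compensation errors; a real orchestrator must retry or escalate.
				_ = c.Compensate()
				compensated = append(compensated, c.Name)
			}
			return compensated, fmt.Errorf("step %q failed: %w", s.Name, err)
		}
		completed = append(completed, s)
	}
	return nil, nil
}

func main() {
	ok := func() error { return nil }
	steps := []Step{
		{Name: "CreateOrder", Do: ok, Compensate: ok},
		{Name: "ProcessPayment", Do: ok, Compensate: ok},
		{Name: "ReserveInventory", Do: func() error { return ErrInsufficientStock }, Compensate: ok},
	}
	compensated, err := RunSaga(steps)
	fmt.Println(compensated, err) // [ProcessPayment CreateOrder] step "ReserveInventory" failed: insufficient stock
}
```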
While the concept is straightforward, implementing a Saga orchestrator from scratch is fraught with peril. You become responsible for durable state management, retries with exponential backoff, handling process crashes, and managing the complex logic of compensations. This is precisely where a durable execution framework like Temporal shines, abstracting away the infrastructure concerns and allowing you to express the Saga logic as plain code.
This article is not an introduction to the Saga pattern. It assumes you understand why you need it. Instead, we will perform a deep dive into a production-grade implementation of an orchestrated Saga using Temporal and Go, focusing on the nuances of compensation logic, advanced failure handling, and performance tuning.
Our Scenario: A Multi-Service E-Commerce Order
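We'll model a simplified but realistic e-commerce order process involving three services:
* Order Service: creates the order record in a PENDING state and, at the end, marks it COMPLETED (or CANCELED on failure).
* Payment Service: charges the customer through a payment gateway and refunds the charge if needed.
* Inventory Service: reserves stock for the ordered item and restocks it if needed.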
The business transaction must execute these steps in order. If the payment succeeds but inventory reservation fails, the payment must be refunded, and the order must be marked as CANCELED. This is our Saga.
Why Temporal is a Superior Saga Orchestrator
Before diving into the code, it's crucial to understand why Temporal is the right tool. A naive implementation might use a database table to track the Saga's state or a complex chain of message queue topics. These approaches suffer from several problems:
* State Management: You have to build a robust state machine, handle concurrent updates, and ensure the state is durably persisted after every step.
* Worker Crashes: If the orchestrator process crashes mid-Saga, how do you resume it from the exact point of failure without re-executing completed steps?
* Timeouts and Retries: You are responsible for implementing your own timeout and retry logic, which is notoriously difficult to get right, especially with backoff strategies.
Temporal solves these problems at the platform level:
* Durable Execution: Temporal workflows are fully durable. The Temporal Cluster records every step of a workflow execution in an event history. If a worker crashes, another available worker replays that history to rebuild the workflow's state, including local variables. It then resumes from the point of failure without re-executing completed activities.
* Code as Workflow: The entire Saga orchestration logic is just a function. There's no need for YAML, DSLs, or database state machines. This makes the logic highly testable and easy to reason about.
* Built-in Primitives: Timeouts, retries, and error handling are first-class citizens, configured via simple options.
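The sketch below makes the retry options concrete by computing the delay schedule implied by a retry policy's InitialInterval, BackoffCoefficient, MaximumInterval and MaximumAttempts. It assumes that each wait is the previous one multiplied by the coefficient and capped at the maximum interval, with no jitter. `RetryDelays` is a hypothetical helper for reasoning about timing and is not part of the SDK.

```go
package main

import (
	"fmt"
	"time"
)

// RetryDelays returns the waits between consecutive attempts (maxAttempts-1 of
// them) under a simple exponential-backoff model: the first wait is initial,
// each later wait is the previous one times coefficient, capped at maxInterval.
func RetryDelays(initial time.Duration, coefficient float64, maxInterval time.Duration, maxAttempts int) []time.Duration {
	var delays []time.Duration
	next := float64(initial)
	for attempt := 1; attempt < maxAttempts; attempt++ {
		d := time.Duration(next)
		if d > maxInterval {
			d = maxInterval // The cap keeps long outages from producing huge waits.
		}
		delays = append(delays, d)
		next *= coefficient
	}
	return delays
}

func main() {
	// Same values as the forward-activity retry policy in the workflow below.
	fmt.Println(RetryDelays(time.Second, 2.0, 100*time.Second, 3)) // [1s 2s]
	// More attempts show the MaximumInterval cap taking effect.
	fmt.Println(RetryDelays(time.Second, 2.0, 100*time.Second, 10))
}
```

Take the policy used for the forward activities later in this article: a 1 s initial interval, a coefficient of 2.0, a 100 s cap and 3 attempts. Under that policy a failing activity gets two retries, after 1 s and 2 s. Each attempt may also take up to its 10 s StartToCloseTimeout.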
Production-Grade Implementation: The Order Processing Saga
Let's build our Saga. We will use the Go SDK for Temporal. The core of our implementation will be a single workflow function that orchestrates activities, with a robust compensation strategy.
Project Structure
A typical Temporal project might be structured as follows:
/ecommerce-saga
├── activities/ # Business logic implementations
│ ├── inventory_activity.go
│ ├── order_activity.go
│ └── payment_activity.go
├── shared/ # Shared data structures
│ └── types.go
├── workflow/ # Workflow orchestration logic
│ └── order_workflow.go
├── worker/ # Worker process setup
│ └── main.go
└── starter/ # Client to start workflows
    └── main.go
Defining Activities and Compensations
Activities are the building blocks of a workflow, representing the individual local transactions. A crucial pattern for Sagas is to pair every forward activity with a compensation activity if its side effects must be undone when a later step fails.
shared/types.go
package shared
// OrderDetails contains all information for an order.
type OrderDetails struct {
OrderID string
UserID string
ItemID string
Quantity int
TotalPrice float64
}
activities/order_activity.go
package activities
import (
"context"
"fmt"
"github.com/your-repo/ecommerce-saga/shared"
)
// In a real app, this would interact with a database.
func CreateOrder(ctx context.Context, details shared.OrderDetails) (string, error) {
fmt.Printf("Creating order %s with status PENDING\n", details.OrderID)
// DB logic to insert order with 'PENDING' status
return details.OrderID, nil
}
func SetOrderStatusToCompleted(ctx context.Context, orderID string) error {
fmt.Printf("Setting order %s status to COMPLETED\n", orderID)
// DB logic to update order status
return nil
}
// Compensation for CreateOrder
func CancelOrder(ctx context.Context, orderID string) error {
fmt.Printf("COMPENSATION: Canceling order %s\n", orderID)
// DB logic to update order status to 'CANCELED'
return nil
}
activities/payment_activity.go
Here, we introduce the concept of idempotency keys, which are critical for making activities safe to retry.
package activities
import (
"context"
"fmt"
"time"
)
// ProcessPayment simulates calling a payment gateway.
// It requires an idempotency key to prevent double charges on retry.
func ProcessPayment(ctx context.Context, orderID string, amount float64, idempotencyKey string) (string, error) {
fmt.Printf("Processing payment of $%.2f for order %s (Idempotency Key: %s)\n", amount, orderID, idempotencyKey)
// Simulate a flaky gateway: uncommenting the return below makes the call fail
// whenever the current second is even (roughly half of all calls).
if time.Now().Second()%2 == 0 {
// return "", fmt.Errorf("payment gateway timeout")
}
transactionID := fmt.Sprintf("txn-%s", idempotencyKey)
return transactionID, nil
}
// Compensation for ProcessPayment
func RefundPayment(ctx context.Context, transactionID string, orderID string) error {
fmt.Printf("COMPENSATION: Refunding payment %s for order %s\n", transactionID, orderID)
// Call payment gateway's refund API
return nil
}
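An idempotency key only prevents double charges if the payment provider deduplicates on it. The hypothetical `FakeGateway` below is an in-memory sketch of that provider-side behavior. The first request with a given key performs the charge. Any repeat returns the original transaction ID, for example an activity retry after the gateway charged but the worker timed out before reporting success. Real gateways define their own key semantics and retention windows, so treat this only as a model.

```go
package main

import (
	"fmt"
	"sync"
)

// FakeGateway is a hypothetical stand-in for a payment provider that honors
// idempotency keys: a repeated key returns the original transaction.
type FakeGateway struct {
	mu      sync.Mutex
	byKey   map[string]string // idempotency key -> transaction ID
	charges int               // number of real charges performed
}

func NewFakeGateway() *FakeGateway {
	return &FakeGateway{byKey: make(map[string]string)}
}

// Charge performs at most one charge per idempotency key.
func (g *FakeGateway) Charge(orderID string, amount float64, key string) string {
	g.mu.Lock()
	defer g.mu.Unlock()
	if txn, ok := g.byKey[key]; ok {
		return txn // A retry of a request we already processed: no new charge.
	}
	g.charges++
	txn := fmt.Sprintf("txn-%d", g.charges)
	g.byKey[key] = txn
	return txn
}

func main() {
	g := NewFakeGateway()
	first := g.Charge("order-1", 99.99, "key-abc")
	// Simulated activity retry after a timeout: same key, same transaction.
	second := g.Charge("order-1", 99.99, "key-abc")
	fmt.Println(first == second, g.charges) // true 1
}
```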
activities/inventory_activity.go
package activities
import (
"context"
"fmt"
)
// ReserveInventory simulates reserving items from stock.
func ReserveInventory(ctx context.Context, itemID string, quantity int) error {
fmt.Printf("Reserving %d of item %s from inventory\n", quantity, itemID)
// DB logic to decrement stock count. This could fail if stock is insufficient.
// return fmt.Errorf("insufficient stock for item %s", itemID) // Uncomment to test failure
return nil
}
// Compensation for ReserveInventory
func RestockInventory(ctx context.Context, itemID string, quantity int) error {
fmt.Printf("COMPENSATION: Restocking %d of item %s to inventory\n", quantity, itemID)
// DB logic to increment stock count
return nil
}
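ReserveInventory's insufficient-stock failure is a business outcome, not a transient fault. Retrying won't create stock; it only delays compensation. In Temporal you would typically surface such a failure as a non-retryable application error, or as an application error whose type is listed in the retry policy's NonRetryableErrorTypes. The plain-Go sketch below models only the classification step. `TypedError` and `ShouldRetry` are hypothetical stand-ins, not SDK types. Treating untyped errors as retryable is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// TypedError is a hypothetical error carrying a type name, loosely modeled on
// how Temporal application errors carry a type string.
type TypedError struct {
	Type string
	Msg  string
}

func (e *TypedError) Error() string { return e.Type + ": " + e.Msg }

// ShouldRetry reports whether err should be retried, given the type names that
// are non-retryable. Untyped errors are treated as transient and retried.
func ShouldRetry(err error, nonRetryable []string) bool {
	var te *TypedError
	if !errors.As(err, &te) { // errors.As also finds wrapped TypedErrors.
		return true
	}
	for _, t := range nonRetryable {
		if te.Type == t {
			return false
		}
	}
	return true
}

func main() {
	nonRetryable := []string{"InsufficientStock"}
	stockErr := fmt.Errorf("reserve inventory: %w", &TypedError{Type: "InsufficientStock", Msg: "item-456"})
	timeoutErr := errors.New("inventory service timeout")
	fmt.Println(ShouldRetry(stockErr, nonRetryable), ShouldRetry(timeoutErr, nonRetryable)) // false true
}
```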
Orchestrating the Saga Workflow
This is where the magic happens. We'll use a defer block within our workflow to manage the compensation stack. This is a clean, idiomatic Go pattern. The deferred function runs whenever the workflow function returns, so the cleanup logic for every error path lives in a single place.
workflow/order_workflow.go
package workflow
import (
"time"
"github.com/google/uuid"
"github.com/your-repo/ecommerce-saga/activities"
"github.com/your-repo/ecommerce-saga/shared"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
func OrderProcessingWorkflow(ctx workflow.Context, orderDetails shared.OrderDetails) (string, error) {
// Set up activity options: timeouts and retry policies.
// These are crucial for production-grade robustness.
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 100 * time.Second,
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
logger := workflow.GetLogger(ctx)
// Compensation stack. Each entry takes the context it should run in, so the
// deferred block can supply a disconnected context at compensation time.
var compensationStack []func(workflow.Context) error
// Every failure below is assigned (with =, never :=) to this err,
// which the deferred closure inspects.
var err error
// The deferred function runs whenever the workflow function returns,
// but it only compensates if err is non-nil.
defer func() {
if err == nil {
return
}
logger.Error("Workflow failed, starting compensation.", "Error", err)
// A disconnected context is not canceled when the workflow is canceled,
// so compensations still run after a cancellation request.
compensationCtx, _ := workflow.NewDisconnectedContext(ctx)
// Give compensations a longer timeout than the forward steps.
cao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
}
compensationCtx = workflow.WithActivityOptions(compensationCtx, cao)
// The stack is kept newest-first (LIFO), so iterate from the front.
for _, compensate := range compensationStack {
if cErr := compensate(compensationCtx); cErr != nil {
logger.Error("Compensation failed.", "Error", cErr)
}
}
}()
// 1. Create Order record in a 'PENDING' state
err = workflow.ExecuteActivity(ctx, activities.CreateOrder, orderDetails).Get(ctx, nil)
if err != nil {
return "", err
}
// The activity succeeded: push its compensation onto the front of the stack.
compensationStack = append([]func(workflow.Context) error{func(c workflow.Context) error {
return workflow.ExecuteActivity(c, activities.CancelOrder, orderDetails.OrderID).Get(c, nil)
}}, compensationStack...)
// 2. Process Payment
// Generate the idempotency key inside SideEffect. Its result is recorded in the
// event history, so a replay reuses the same key instead of calling uuid.New() again.
var idempotencyKey string
err = workflow.SideEffect(ctx, func(workflow.Context) interface{} {
return uuid.New().String()
}).Get(&idempotencyKey)
if err != nil {
return "", err
}
var transactionID string
err = workflow.ExecuteActivity(ctx, activities.ProcessPayment, orderDetails.OrderID, orderDetails.TotalPrice, idempotencyKey).Get(ctx, &transactionID)
if err != nil {
return "", err
}
compensationStack = append([]func(workflow.Context) error{func(c workflow.Context) error {
return workflow.ExecuteActivity(c, activities.RefundPayment, transactionID, orderDetails.OrderID).Get(c, nil)
}}, compensationStack...)
// 3. Reserve Inventory
err = workflow.ExecuteActivity(ctx, activities.ReserveInventory, orderDetails.ItemID, orderDetails.Quantity).Get(ctx, nil)
if err != nil {
return "", err
}
compensationStack = append([]func(workflow.Context) error{func(c workflow.Context) error {
return workflow.ExecuteActivity(c, activities.RestockInventory, orderDetails.ItemID, orderDetails.Quantity).Get(c, nil)
}}, compensationStack...)
// 4. All steps succeeded. Mark order as 'COMPLETED'.
err = workflow.ExecuteActivity(ctx, activities.SetOrderStatusToCompleted, orderDetails.OrderID).Get(ctx, nil)
if err != nil {
// If the final status update still fails after its retries, every completed step
// is compensated: inventory is restocked, the payment refunded and the order canceled.
// In a real system you might prefer to keep retrying this step instead.
return "", err
}
logger.Info("Workflow completed successfully.")
return "Order completed successfully: " + orderDetails.OrderID, nil
}
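The deferred compensation relies on a Go detail worth isolating: the closure reads the err variable declared before the defer, so every failure must be assigned to that variable with `=`. The framework-free sketch below shows what goes wrong with `:=` inside an if statement. It declares a new, shadowed err, so the deferred closure sees nil and silently skips compensation. `correct`, `buggy` and `failingStep` are illustrative names.

```go
package main

import (
	"errors"
	"fmt"
)

var errStep = errors.New("step failed")

// failingStep stands in for an activity call that returns an error.
func failingStep() (string, error) { return "", errStep }

// correct assigns the failure to the err the deferred closure captured.
func correct() (compensated bool) {
	var err error
	defer func() {
		if err != nil {
			compensated = true // Stand-in for running the compensation stack.
		}
	}()
	_, err = failingStep()
	if err != nil {
		return
	}
	// ...further steps would follow here...
	return
}

// buggy uses := inside the if, declaring a new err that shadows the captured
// one, so the deferred closure sees nil and never compensates.
func buggy() (compensated bool) {
	var err error
	defer func() {
		if err != nil {
			compensated = true
		}
	}()
	if _, err := failingStep(); err != nil {
		return
	}
	return
}

func main() {
	fmt.Println(correct(), buggy()) // true false
}
```

A common alternative is to declare err as a named result, as in `func OrderProcessingWorkflow(...) (result string, err error)`. Every `return "", someErr` then assigns to the variable the defer inspects, even if someErr was shadowed.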
Key Implementation Details:
* Compensation stack: compensationStack is a slice of closures. When an activity succeeds, we prepend a function literal containing its compensation call. Iterating from the front on failure therefore runs compensations in LIFO (Last-In, First-Out) order, undoing the most recent step first.
* Disconnected context: Inside the defer block, we use workflow.NewDisconnectedContext(ctx). This is critical. It returns a child context that is not canceled when the workflow's main context is canceled, for example by a parent workflow or an external cancellation request. Our vital compensation logic can therefore still run to completion.
* Conditional compensation: The deferred function runs whenever the workflow function returns, but it only executes compensations when the err variable is non-nil. This is how it distinguishes a successful completion from a failure.
* Idempotency: The ProcessPayment activity receives an idempotency key. Workflow code is re-executed during replay, so generating the key with a bare uuid.New() would produce a different value on every replay. Wrapping the call in workflow.SideEffect records the key in the event history, so every replay sees the same value. Temporal's own activity retries reuse the originally scheduled arguments, so every attempt sends the gateway the same key and a retried charge is not duplicated.
Setting up the Worker and Starter
To run this, you need a worker to host the workflow and activity implementations and a starter to trigger the workflow.
worker/main.go
package main
import (
"log"
"github.com/your-repo/ecommerce-saga/activities"
"github.com/your-repo/ecommerce-saga/workflow"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
func main() {
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
w := worker.New(c, "order-processing-task-queue", worker.Options{})
w.RegisterWorkflow(workflow.OrderProcessingWorkflow)
w.RegisterActivity(activities.CreateOrder)
w.RegisterActivity(activities.CancelOrder)
w.RegisterActivity(activities.ProcessPayment)
w.RegisterActivity(activities.RefundPayment)
w.RegisterActivity(activities.ReserveInventory)
w.RegisterActivity(activities.RestockInventory)
w.RegisterActivity(activities.SetOrderStatusToCompleted)
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
starter/main.go
package main
import (
"context"
"fmt"
"log"
"github.com/google/uuid"
"github.com/your-repo/ecommerce-saga/shared"
"github.com/your-repo/ecommerce-saga/workflow"
"go.temporal.io/sdk/client"
)
func main() {
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
orderDetails := shared.OrderDetails{
OrderID: uuid.New().String(),
UserID: "user-123",
ItemID: "item-456",
Quantity: 1,
TotalPrice: 99.99,
}
options := client.StartWorkflowOptions{
ID: "order-processing-" + orderDetails.OrderID,
TaskQueue: "order-processing-task-queue",
}
we, err := c.ExecuteWorkflow(context.Background(), options, workflow.OrderProcessingWorkflow, orderDetails)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
var result string
err = we.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Workflow failed", err)
}
log.Println("Workflow result:", result)
}
Handling Advanced Edge Cases and Failures
The implementation above is robust, but senior engineers must consider the truly difficult edge cases.
The Nightmare Scenario: A Failing Compensation
What happens if ReserveInventory succeeds, but the subsequent SetOrderStatusToCompleted fails, triggering compensations, and then the RefundPayment compensation activity also fails? This is where many naive Saga implementations break down, potentially leaving a customer charged for an order they will never receive.
Temporal's durability provides a strong foundation for solving this. Here's the strategy:
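* Persistent retries for compensations: give the compensationCtx its own, more forgiving ActivityOptions so that transient failures keep being retried.
// Inside the defer block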
cao := workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute, // Longer timeout
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 5 * time.Minute,
// Errors returned as temporal.NewApplicationError(msg, "BusinessError") are not retried;
// other failures, such as timeouts or network errors, keep retrying.
NonRetryableErrorTypes: []string{"BusinessError"},
// MaximumAttempts is left at 0, which means unlimited attempts.
},
}
disconnectedCtx, _ := workflow.NewDisconnectedContext(ctx)
compensationCtx := workflow.WithActivityOptions(disconnectedCtx, cao)
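* Manual intervention with Signals: a compensation that fails with a non-retryable error, such as a refund the gateway rejects, needs a human in the loop. A Signal is an external, asynchronous event sent to a running workflow. We can define a signal that tells the workflow to retry the failed compensation once an operator has fixed the underlying problem.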
Code Example: A Workflow with a Signal for Manual Compensation Retry
First, we refactor the compensation logic into a helper that, whenever a compensation fails, logs the error and blocks until a retry signal arrives.
// executeCompensations runs compensations in LIFO order. Here the stack is built
// with plain append, so we iterate from the end. If a compensation fails, the
// workflow waits for a "retry-compensation" signal and then tries it again.
func executeCompensations(ctx workflow.Context, stack []func(workflow.Context) error) {
logger := workflow.GetLogger(ctx)
retryCh := workflow.GetSignalChannel(ctx, "retry-compensation")
for i := len(stack) - 1; i >= 0; i-- {
for {
err := stack[i](ctx)
if err == nil {
break // This compensation succeeded; move on to the next one.
}
logger.Error("Compensation failed; waiting for retry signal.", "Index", i, "Error", err)
// Block until an operator sends the signal; the payload is a free-form reason.
var reason string
retryCh.Receive(ctx, &reason)
logger.Info("Retrying compensation.", "Index", i, "Reason", reason)
}
}
}
// ... inside the workflow ...
// Instead of a defer, run the forward steps in a closure and
// inspect the error it returns (Go has no try/catch).
var compensationStack []func(workflow.Context) error
err := func(ctx workflow.Context) error {
// ... execute activities and append their compensations to compensationStack ...
// (code from before)
return nil
}(ctx)
if err != nil {
// Workflow failed: run compensations in a context that survives cancellation.
compensationCtx, _ := workflow.NewDisconnectedContext(ctx)
compensationCtx = workflow.WithActivityOptions(compensationCtx, cao) // cao as defined above
executeCompensations(compensationCtx, compensationStack)
return "", err
}
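This is still a sketch. A fuller implementation would use a workflow.Selector to wait on several events at once. These could include the retry signal from workflow.GetSignalChannel, a timer from workflow.NewTimer for periodic automatic retries, or a second signal that lets an operator mark a step as resolved by hand. An operator can deliver the signal with the Go client's SignalWorkflow method or with the Temporal CLI.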
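The retry-or-wait loop can be modeled in plain Go to check its control flow outside Temporal. This is an analogy, not workflow code. Here `select`, a Go channel and `time.After` stand in for workflow.Selector, a signal channel and workflow.NewTimer. Real workflow code must use the workflow package equivalents to stay deterministic. `CompensateWithManualRetry` and `ErrRefundRejected` are hypothetical names. The sketch is bounded by `maxAttempts`, whereas a real workflow would typically wait indefinitely.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrRefundRejected is a hypothetical failure used to drive the example.
var ErrRefundRejected = errors.New("gateway rejected refund")

// CompensateWithManualRetry runs compensate until it succeeds or maxAttempts is
// reached. After each failure it blocks until an operator "signal" arrives on
// retrySignal or the autoRetry timer fires, whichever comes first.
func CompensateWithManualRetry(compensate func() error, retrySignal <-chan string, autoRetry time.Duration, maxAttempts int) (int, error) {
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if lastErr = compensate(); lastErr == nil {
			return attempt, nil
		}
		if attempt == maxAttempts {
			break // No point waiting after the final attempt.
		}
		select {
		case reason := <-retrySignal:
			fmt.Printf("attempt %d failed (%v); operator retry: %s\n", attempt, lastErr, reason)
		case <-time.After(autoRetry):
			fmt.Printf("attempt %d failed (%v); automatic retry\n", attempt, lastErr)
		}
	}
	return maxAttempts, fmt.Errorf("compensation failed after %d attempts: %w", maxAttempts, lastErr)
}

func main() {
	failuresLeft := 2
	refund := func() error {
		if failuresLeft > 0 {
			failuresLeft--
			return ErrRefundRejected
		}
		return nil
	}
	// Two operator signals are already queued, so the hour-long timer never fires.
	signals := make(chan string, 2)
	signals <- "gateway config fixed"
	signals <- "retrying again"
	attempts, err := CompensateWithManualRetry(refund, signals, time.Hour, 5)
	fmt.Println(attempts, err)
}
```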
Performance and Scalability Considerations
* Activity Heartbeating: If an activity can run for a long time (e.g., more than a minute), it should send heartbeats to the Temporal Cluster and set a HeartbeatTimeout in its ActivityOptions. This lets the cluster detect a worker crash much sooner than the StartToCloseTimeout would. If no heartbeat arrives within the HeartbeatTimeout period, the attempt is marked as timed out and retried according to its retry policy, possibly on another worker. Heartbeats are also how a running activity learns that it has been canceled.
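// Inside a long-running activity (scheduled with a HeartbeatTimeout)
import (
"context"
"time"
"go.temporal.io/sdk/activity"
)
func LongRunningActivity(ctx context.Context) error {
for i := 0; i < 100; i++ {
select {
case <-ctx.Done():
return ctx.Err() // Canceled, or this attempt timed out
case <-time.After(30 * time.Second): // Stand-in for a unit of real work
}
activity.RecordHeartbeat(ctx, i) // Report progress: step i is done
}
return nil
}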
* Worker Tuning: The worker.Options struct lets you configure concurrency, and MaxConcurrentActivityExecutionSize and MaxConcurrentWorkflowTaskExecutionSize are the critical settings. Don't just set them to high numbers; profile your workers under load. CPU-bound activities saturate a machine quickly, so start their concurrency near the number of CPU cores. I/O-bound activities spend most of their time waiting on the network or a database. They can usually sustain much higher concurrency, bounded by what downstream services and connection pools can handle.
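* Task Queues: Don't run all your workflows and activities on a single task queue; isolate them. In our e-commerce example, you could route payment activities to a high-priority-payments task queue and keep everything else on a standard-orders task queue. To do this, set TaskQueue in the payment activities' ActivityOptions and run a dedicated worker pool that polls that queue. This prevents a high volume of standard orders from starving the critical payment-related tasks, which improves resource isolation and latency.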
Testing Your Saga Workflow
Temporal's Go SDK includes a powerful testing framework (testsuite) that allows you to test your workflow logic in-process without needing a running Temporal Cluster.
Code Example: A Complete Unit Test for the Saga
This test will simulate a failure in the ReserveInventory activity and assert that the RefundPayment and CancelOrder compensations are called in the correct order.
package workflow_test
import (
"context"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/your-repo/ecommerce-saga/activities"
"github.com/your-repo/ecommerce-saga/shared"
"github.com/your-repo/ecommerce-saga/workflow"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/testsuite"
)
type UnitTestSuite struct {
suite.Suite
testsuite.WorkflowTestSuite
// Activities are mocked on the workflow test environment.
env *testsuite.TestWorkflowEnvironment
}
func (s *UnitTestSuite) SetupTest() {
// A fresh in-memory workflow environment for every test.
s.env = s.NewTestWorkflowEnvironment()
}
func (s *UnitTestSuite) AfterTest(suiteName, testName string) {
// Fails the test if any expected activity call was not made.
s.env.AssertExpectations(s.T())
}
func TestUnitTestSuite(t *testing.T) {
suite.Run(t, new(UnitTestSuite))
}
func (s *UnitTestSuite) TestOrderWorkflow_InventoryFailure_Compensates() {
// Prepare mock activity inputs/outputs
orderDetails := shared.OrderDetails{
OrderID: "test-order-123",
ItemID: "item-456",
Quantity: 1,
TotalPrice: 99.99,
}
transactionID := "txn-abc-123"
// Records the order in which compensations actually run.
var compensations []string
s.env.RegisterActivity(activities.CreateOrder)
s.env.RegisterActivity(activities.ProcessPayment)
s.env.RegisterActivity(activities.ReserveInventory)
s.env.RegisterActivity(activities.RefundPayment)
s.env.RegisterActivity(activities.CancelOrder)
// Forward steps: CreateOrder returns (string, error), so the mock must too.
s.env.OnActivity(activities.CreateOrder, mock.Anything, orderDetails).Return(orderDetails.OrderID, nil).Once()
s.env.OnActivity(activities.ProcessPayment, mock.Anything, orderDetails.OrderID, orderDetails.TotalPrice, mock.AnythingOfType("string")).Return(transactionID, nil).Once()
// A non-retryable error, so the retry policy does not call the activity again.
s.env.OnActivity(activities.ReserveInventory, mock.Anything, orderDetails.ItemID, orderDetails.Quantity).
Return(temporal.NewNonRetryableApplicationError("insufficient stock", "InsufficientStock", nil)).Once()
// Compensations return functions with the activity's signature, which the
// test environment invokes, so we can record the call order.
s.env.OnActivity(activities.RefundPayment, mock.Anything, transactionID, orderDetails.OrderID).
Return(func(ctx context.Context, txnID, orderID string) error {
compensations = append(compensations, "RefundPayment")
return nil
}).Once()
s.env.OnActivity(activities.CancelOrder, mock.Anything, orderDetails.OrderID).
Return(func(ctx context.Context, orderID string) error {
compensations = append(compensations, "CancelOrder")
return nil
}).Once()
s.env.ExecuteWorkflow(workflow.OrderProcessingWorkflow, orderDetails)
s.True(s.env.IsWorkflowCompleted())
s.Error(s.env.GetWorkflowError())
// Compensations ran in REVERSE order: refund before cancel.
s.Equal([]string{"RefundPayment", "CancelOrder"}, compensations)
}
This test provides high confidence that your orchestration logic is correct without the overhead of external dependencies.
Conclusion
The Saga pattern is a powerful tool for maintaining consistency in a distributed system. However, its implementation details are non-trivial. By leveraging a durable execution framework like Temporal, you delegate the hard infrastructure problems—state management, retries, and recovery—to the platform. This allows you to focus on writing clear, resilient, and testable business logic. The patterns discussed here—using a deferred compensation stack, disconnected contexts for cleanup, planning for compensation failures with signals, and rigorous testing—are not just best practices; they are essential for building production-grade, fault-tolerant systems that can gracefully handle the inevitable failures of a distributed world.