Temporal Sagas: Implementing Compensating Distributed Transactions
The Inescapable Problem: Atomicity in a Microservices World
In a monolithic architecture, achieving atomicity is a solved problem: database transactions. The BEGIN, COMMIT, and ROLLBACK commands provide ACID guarantees that are the bedrock of reliable systems. However, as we decompose systems into distributed microservices, each with its own private database, this bedrock crumbles. A single business process, like booking a trip, might now span multiple services: BookingService, PaymentService, FlightService, and HotelService. A failure in the FlightService after a successful payment creates a state of partial completion, violating business integrity.
Traditional distributed transaction protocols like Two-Phase Commit (2PC) are often a poor fit for modern microservices. They introduce tight temporal coupling and rely on locks, which can severely degrade system availability and performance. A failure in the transaction coordinator can bring the entire process to a halt.
This is where the Saga pattern emerges. A Saga is a sequence of local transactions where each transaction updates data within a single service. If a local transaction fails, the Saga executes a series of compensating transactions to semantically undo the preceding successful transactions. The critical challenge with Sagas has always been the implementation complexity. Engineers are forced to build and maintain complex, ad-hoc state machines using database tables, message queues (like Kafka or RabbitMQ), and timers to track the Saga's progress, handle retries, and trigger compensations. This infrastructure is brittle, hard to test, and distracts from the core business logic.
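To make the pattern concrete, here is a minimal in-memory sketch in plain Go. It does not use Temporal, and the names `Step`, `RunSaga`, and `demoSaga` are illustrative assumptions rather than part of any library. It runs local transactions in order and, on the first failure, runs the compensations of the completed steps in reverse.

```go
package main

import (
	"errors"
	"fmt"
)

// Step pairs a local transaction with the compensation that semantically undoes it.
type Step struct {
	Name       string
	Action     func() error
	Compensate func()
}

// RunSaga executes steps in order. On the first failure it runs the
// compensations of the already-completed steps in reverse order and
// returns the original error, wrapped with the failing step's name.
func RunSaga(steps []Step) error {
	var done []Step
	for _, s := range steps {
		if err := s.Action(); err != nil {
			for i := len(done) - 1; i >= 0; i-- {
				done[i].Compensate()
			}
			return fmt.Errorf("step %s failed: %w", s.Name, err)
		}
		done = append(done, s)
	}
	return nil
}

// demoSaga runs a three-step trip booking in which the flight step fails,
// and returns a log of everything that happened, in order.
func demoSaga() ([]string, error) {
	var log []string
	undo := func(msg string) func() { return func() { log = append(log, msg) } }
	ok := func(msg string) func() error {
		return func() error { log = append(log, msg); return nil }
	}
	steps := []Step{
		{"booking", ok("create booking"), undo("cancel booking")},
		{"payment", ok("take payment"), undo("refund payment")},
		{"flight", func() error { return errors.New("no seats left") }, undo("cancel flight")},
	}
	err := RunSaga(steps)
	return log, err
}

func main() {
	log, err := demoSaga()
	fmt.Println(log, err)
}
```

The flight step never completed, so its compensation is skipped, and the log ends with the refund followed by the booking cancellation. What this sketch leaves out is the hard part: persisting progress across crashes, retrying failed steps, and driving timers.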
Temporal.io changes this paradigm entirely. By providing a durable, stateful workflow-as-code abstraction, it handles the state management, retries, and failure detection, allowing us to implement a complex Saga as if it were a single, straightforward function. This article will demonstrate how to build a production-grade compensating Saga using Temporal and Go, focusing on the advanced patterns and edge cases you'll encounter in the real world.
Core Architecture: A Trip Booking Saga
Let's model a trip booking workflow. The process involves three services:
Our Saga's happy path is CreateBooking -> TakePayment -> BookFlight. If BookFlight fails, we must compensate by executing RefundPayment and CancelBooking. If TakePayment fails, we only need to CancelBooking.
Defining the Workflow and Activities
In Temporal, the Saga logic resides in a Workflow, and each interaction with an external service is an Activity. Workflows are deterministic and resumable, while Activities are where non-deterministic, fallible side-effects happen.
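The split between deterministic workflows and side-effecting activities can be illustrated with a toy model in plain Go. This is only a sketch of the idea of replaying recorded results; it is not how the Temporal SDK is implemented, and `History`, `Execute`, `Rewind`, and `workflowFn` are hypothetical names.

```go
package main

import "fmt"

// History is a toy event log: the recorded result of each activity call, in order.
type History struct {
	results []string
	cursor  int
}

// Execute returns the recorded result when one exists (replay); otherwise it
// runs the activity once and appends its result to the log.
func (h *History) Execute(activity func() string) string {
	if h.cursor < len(h.results) {
		r := h.results[h.cursor]
		h.cursor++
		return r
	}
	r := activity()
	h.results = append(h.results, r)
	h.cursor++
	return r
}

// Rewind simulates a worker restart: the log survives, the position resets.
func (h *History) Rewind() { h.cursor = 0 }

// workflowFn is deterministic: given the same history it makes the same calls
// in the same order. calls counts how often a side effect really runs.
func workflowFn(h *History, calls *int) string {
	booking := h.Execute(func() string { *calls++; return "booking-1" })
	payment := h.Execute(func() string { *calls++; return "txn-1" })
	return booking + "/" + payment
}

func main() {
	h := &History{}
	calls := 0
	first := workflowFn(h, &calls)
	h.Rewind() // "crash" and resume on another worker
	second := workflowFn(h, &calls)
	fmt.Println(first, second, calls)
}
```

The second run produces the same result as the first without executing either side effect again, which is why the workflow function itself must avoid non-deterministic calls.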
Here is our initial setup in Go:
// file: shared/activities.go
package shared
import (
"context"
"fmt"
"time"
)
// Mock external services
func CreateBooking(ctx context.Context, bookingID string) (string, error) {
fmt.Printf("Activity: Creating booking %s\n", bookingID)
// Simulate DB call
time.Sleep(100 * time.Millisecond)
return fmt.Sprintf("booking-confirmation-%s", bookingID), nil
}
func TakePayment(ctx context.Context, bookingID string, amount int) (string, error) {
fmt.Printf("Activity: Taking payment of %d for booking %s\n", amount, bookingID)
// Simulate payment gateway call
time.Sleep(100 * time.Millisecond)
// To test failure, we can add a condition here
// if amount > 1000 {
// return "", fmt.Errorf("payment gateway declined for amount: %d", amount)
// }
return fmt.Sprintf("payment-txn-%s", bookingID), nil
}
func BookFlight(ctx context.Context, bookingID string) (string, error) {
fmt.Printf("Activity: Booking flight for booking %s\n", bookingID)
// Simulate airline API call
time.Sleep(100 * time.Millisecond)
return "", fmt.Errorf("airline API unavailable - no seats left") // Simulate failure
}
// Compensation Activities
func CancelBooking(ctx context.Context, bookingID string) error {
fmt.Printf("Compensation: Cancelling booking %s\n", bookingID)
time.Sleep(100 * time.Millisecond)
return nil
}
func RefundPayment(ctx context.Context, transactionID string) error {
fmt.Printf("Compensation: Refunding payment with transaction ID %s\n", transactionID)
time.Sleep(100 * time.Millisecond)
return nil
}
func CancelFlight(ctx context.Context, flightBookingID string) error {
// This activity is never reached in our failure scenario but is good practice to have
fmt.Printf("Compensation: Cancelling flight booking %s\n", flightBookingID)
time.Sleep(100 * time.Millisecond)
return nil
}
Implementing the Compensation Logic
The most elegant way to implement compensation in a Temporal Go workflow is to use defer. Deferred calls are pushed onto a stack and run in LIFO order when the surrounding function returns. Temporal does not persist the defer stack itself. If the worker process crashes, another worker replays the workflow's event history, re-executes the function deterministically, and in doing so registers the same deferred calls again. The effect is that the compensation logic survives worker failures.
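The two language features this approach relies on can be seen in plain Go, without Temporal: deferred calls run in reverse order of registration, and a deferred closure can inspect a named error result to decide whether to act. The function names below are illustrative only.

```go
package main

import (
	"errors"
	"fmt"
)

// unwind registers three deferred calls and returns the order in which they ran.
// The deferred closures append to the named result after the return statement.
func unwind() (order []string) {
	defer func() { order = append(order, "cancel booking") }()
	defer func() { order = append(order, "refund payment") }()
	defer func() { order = append(order, "cancel flight") }()
	return nil
}

// book shows a deferred closure that only acts when the function fails.
func book(fail bool) (log []string, err error) {
	defer func() {
		if err != nil {
			log = append(log, "compensate")
		}
	}()
	log = append(log, "work")
	if fail {
		return log, errors.New("boom")
	}
	return log, nil
}

func main() {
	fmt.Println(unwind())
	fmt.Println(book(true))
	fmt.Println(book(false))
}
```

The last registered call ("cancel flight") runs first, and `book` compensates only on the failing path.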
We'll build a Saga struct to manage the compensation stack. This pattern provides a clean, testable, and explicit way to manage our Saga's state.
// file: workflow/saga.go
package workflow
import "go.temporal.io/sdk/workflow"
// A Saga is a sequence of actions that can be compensated.
type Saga struct {
compensations []func()
compensated bool
}
// NewSaga creates a new Saga.
func NewSaga() *Saga {
return &Saga{}
}
// AddCompensation adds a compensation function to the saga.
// The function will be executed in reverse order of addition.
func (s *Saga) AddCompensation(compensation func()) {
if s.compensated {
// Already compensated, just run the new compensation immediately.
compensation()
return
}
s.compensations = append(s.compensations, compensation)
}
// Compensate runs all the compensation functions in reverse order.
func (s *Saga) Compensate(ctx workflow.Context) {
if s.compensated {
return
}
s.compensated = true
for i := len(s.compensations) - 1; i >= 0; i-- {
// Each compensation is a closure that already schedules its own
// activity through the workflow API, so we call it directly.
// Passing it to workflow.ExecuteLocalActivity would run it outside
// the workflow context, where workflow API calls are not allowed.
s.compensations[i]()
}
}
Now, let's use this Saga utility in our main workflow. Note how we add a compensation function immediately after a successful activity call. The defer block ensures that saga.Compensate is called if the workflow function returns an error.
// file: workflow/workflow.go
package workflow
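import (
"fmt"
"time"
"github.com/your-repo/shared"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
func TripBookingWorkflow(ctx workflow.Context, bookingID string, amount int) (string, error) {
// Keep the timeout short and cap the attempts so that failures surface quickly.
// Without MaximumAttempts, the default retry policy keeps retrying a failing activity.
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)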
logger := workflow.GetLogger(ctx)
logger.Info("Trip booking workflow started", "BookingID", bookingID)
saga := NewSaga()
var err error
// Defer the compensation logic. This will run if the workflow function returns an error.
defer func() {
if err != nil {
logger.Error("Workflow failed, starting compensation.", "Error", err)
saga.Compensate(ctx)
}
}()
// 1. Create Booking
var bookingConfirmationID string
err = workflow.ExecuteActivity(ctx, shared.CreateBooking, bookingID).Get(ctx, &bookingConfirmationID)
if err != nil {
logger.Error("Failed to create booking.", "Error", err)
return "", err
}
saga.AddCompensation(func() {
_ = workflow.ExecuteActivity(ctx, shared.CancelBooking, bookingID).Get(ctx, nil)
})
// 2. Take Payment
var transactionID string
err = workflow.ExecuteActivity(ctx, shared.TakePayment, bookingID, amount).Get(ctx, &transactionID)
if err != nil {
logger.Error("Failed to take payment.", "Error", err)
return "", err
}
saga.AddCompensation(func() {
_ = workflow.ExecuteActivity(ctx, shared.RefundPayment, transactionID).Get(ctx, nil)
})
// 3. Book Flight - This activity is designed to fail
var flightBookingID string
err = workflow.ExecuteActivity(ctx, shared.BookFlight, bookingID).Get(ctx, &flightBookingID)
if err != nil {
logger.Error("Failed to book flight.", "Error", err)
return "", err
}
// In a real scenario, you would add flight cancellation compensation here
// saga.AddCompensation(func() { ... })
result := fmt.Sprintf("Trip booked successfully! Confirmation: %s", flightBookingID)
logger.Info("Workflow completed successfully.", "Result", result)
return result, nil
}
When this workflow runs, the BookFlight activity fails. Once its retries are exhausted, the error is returned to the workflow and TripBookingWorkflow exits with that error. The deferred function sees the non-nil err and calls saga.Compensate(), which executes RefundPayment and then CancelBooking in LIFO (last-in, first-out) order, unwinding the transaction.
Advanced Edge Case: Idempotency and Non-Compensatable Failures
Real-world systems are messy. What happens if our RefundPayment activity times out, the worker crashes, and Temporal retries it? Without idempotency, we might issue multiple refunds. What if the refund activity itself fails permanently?
Handling Idempotency
Every activity that mutates state, especially compensation activities, must be idempotent. The caller should be able to retry the activity multiple times with the same input and get the same result without causing duplicate side effects.
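The best practice is to pass an idempotency key from the workflow to the activity. The key must identify the logical operation, not the individual attempt: every retry of the same activity has to carry the same key. Generate it once in the workflow, or derive it from stable values such as the workflow ID.
Temporal's workflow.SideEffect suits this. It runs a non-deterministic function (such as a UUID generator) once, records the result in the workflow history, and returns the recorded value on every replay, so the key stays stable for the lifetime of the workflow execution.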
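On the receiving side, the service has to remember which keys it has already processed. The following is a minimal in-memory sketch of that check, assuming a hypothetical `PaymentGateway` type; a real service would keep the key in durable storage, typically under a unique constraint.

```go
package main

import (
	"fmt"
	"sync"
)

// PaymentGateway is a hypothetical in-memory stand-in for a payment service
// that deduplicates requests by idempotency key.
type PaymentGateway struct {
	mu        sync.Mutex
	processed map[string]string // idempotency key -> transaction ID
	charges   int               // number of real charges performed
}

// NewPaymentGateway returns a gateway with an empty deduplication table.
func NewPaymentGateway() *PaymentGateway {
	return &PaymentGateway{processed: make(map[string]string)}
}

// Charge performs the charge at most once per key. A retry with the same
// key returns the transaction ID recorded by the first call.
func (g *PaymentGateway) Charge(key string, amount int) string {
	g.mu.Lock()
	defer g.mu.Unlock()
	if txn, ok := g.processed[key]; ok {
		return txn
	}
	g.charges++
	txn := fmt.Sprintf("txn-%d", g.charges)
	g.processed[key] = txn
	return txn
}

func main() {
	g := NewPaymentGateway()
	first := g.Charge("key-abc", 500)
	retry := g.Charge("key-abc", 500) // simulated retry after a timeout
	fmt.Println(first, retry, g.charges)
}
```

Both calls return the same transaction ID and only one charge is recorded, which is the property the workflow relies on when Temporal retries an activity.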
Let's refactor the payment activity:
// shared/activities.go - Modified TakePayment
func TakePayment(ctx context.Context, bookingID string, amount int, idempotencyKey string) (string, error) {
fmt.Printf(
"Activity: Taking payment for booking %s with idempotency key %s\n",
bookingID,
idempotencyKey,
)
// In your service, you would check if a payment with this idempotencyKey has already been processed.
// db.Find(&payment, "idempotency_key = ?", idempotencyKey)
// if payment.Exists() { return payment.TransactionID, nil }
// ... proceed with payment processing ...
return fmt.Sprintf("payment-txn-%s", bookingID), nil
}
// shared/activities.go - Modified RefundPayment
func RefundPayment(ctx context.Context, transactionID string, idempotencyKey string) error {
fmt.Printf(
"Compensation: Refunding payment %s with idempotency key %s\n",
transactionID,
idempotencyKey,
)
// Your refund service must also use this key to prevent duplicate refunds.
return nil
}
And update the workflow to provide these keys:
// workflow/workflow.go - Snippet from TripBookingWorkflow
// ... inside workflow
// 2. Take Payment
var transactionID string
var paymentIdempotencyKey string
workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return uuid.NewString() // Use a real UUID library
}).Get(&paymentIdempotencyKey)
err = workflow.ExecuteActivity(ctx, shared.TakePayment, bookingID, amount, paymentIdempotencyKey).Get(ctx, &transactionID)
if err != nil {
// ... handle error
}
saga.AddCompensation(func() {
var refundIdempotencyKey string
workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return uuid.NewString()
}).Get(&refundIdempotencyKey)
// Note: We're using a *new* idempotency key for the refund.
// This is crucial. The refund is a separate operation from the payment.
_ = workflow.ExecuteActivity(ctx, shared.RefundPayment, transactionID, refundIdempotencyKey).Get(ctx, nil)
})
// ... rest of workflow
The Non-Compensatable Failure Problem
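What if RefundPayment fails repeatedly with a non-retriable error (e.g., the payment gateway's API returns 400 Bad Request - Invalid Transaction ID)? Temporal's default retry policy places no limit on the number of attempts, so it keeps retrying until a timeout such as ScheduleToCloseTimeout expires. This is a business logic failure, not a transient one, and no amount of retrying will fix it.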
We need to configure a custom RetryPolicy for our compensation activities to handle this. We can specify non-retriable error types and a maximum number of attempts.
// workflow/workflow.go - Snippet for compensation with custom retry
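saga.AddCompensation(func() {
// Requires the "go.temporal.io/sdk/temporal" import for RetryPolicy.
// refundIdempotencyKey is generated with workflow.SideEffect, as in the previous snippet.
// Define a retry policy for critical compensation.
// If this fails, we need manual intervention.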
compensationAo := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 5, // Only try 5 times
NonRetryableErrorTypes: []string{"InvalidTransactionError"}, // Business-level error
},
}
compensationCtx := workflow.WithActivityOptions(ctx, compensationAo)
refundErr := workflow.ExecuteActivity(compensationCtx, shared.RefundPayment, transactionID, refundIdempotencyKey).Get(ctx, nil)
if refundErr != nil {
// CRITICAL: The compensation failed permanently.
// The system is now in an inconsistent state.
logger.Error("FATAL: Could not compensate payment.", "TransactionID", transactionID, "Error", refundErr)
// This is where you trigger a human-in-the-loop process.
// For example, execute another activity to create a ticket in JIRA or PagerDuty.
_ = workflow.ExecuteActivity(ctx, shared.CreateManualInterventionTicket, "Payment refund failed for booking: "+bookingID).Get(ctx, nil)
}
})
This pattern—a limited retry policy followed by an alert/ticketing activity—is a robust way to handle the dreaded non-compensatable failure. It acknowledges that not all errors can be resolved automatically and provides a durable, auditable hand-off to a human operator.
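The control flow behind this pattern can be sketched in plain Go, independent of Temporal: bounded attempts with exponential backoff, an early stop on an error that retrying cannot fix, and a single escalation when the operation never succeeds. `retryWithEscalation`, `ErrInvalidTransaction`, and `demoRetry` are illustrative names, and in a Temporal workflow the retry loop itself is handled by the RetryPolicy rather than written by hand.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrInvalidTransaction marks a business-level failure that retrying cannot fix.
var ErrInvalidTransaction = errors.New("invalid transaction ID")

// retryWithEscalation calls op up to maxAttempts times (maxAttempts >= 1),
// doubling the delay between attempts. It stops early on a non-retryable
// error and calls escalate once if the operation never succeeds.
func retryWithEscalation(maxAttempts int, initial time.Duration, op func() error, escalate func(error)) (int, error) {
	delay := initial
	attempts := 0
	var err error
	for attempts < maxAttempts {
		attempts++
		err = op()
		if err == nil {
			return attempts, nil
		}
		if errors.Is(err, ErrInvalidTransaction) {
			break // retrying cannot help
		}
		if attempts < maxAttempts {
			time.Sleep(delay)
			delay *= 2
		}
	}
	escalate(err)
	return attempts, err
}

// demoRetry returns the attempts used for a transient failure, the attempts
// used for a permanent failure, and the number of escalations raised.
func demoRetry() (transient, permanent, tickets int) {
	var raised []string
	escalate := func(err error) { raised = append(raised, "manual refund needed: "+err.Error()) }
	transient, _ = retryWithEscalation(3, time.Millisecond, func() error {
		return errors.New("gateway timeout")
	}, escalate)
	permanent, _ = retryWithEscalation(3, time.Millisecond, func() error {
		return fmt.Errorf("refund rejected: %w", ErrInvalidTransaction)
	}, escalate)
	return transient, permanent, len(raised)
}

func main() {
	fmt.Println(demoRetry())
}
```

The transient failure uses all three attempts, the permanent one stops after the first, and both end in an escalation.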
Advanced Edge Case: Concurrent External Events with Signals
Our Saga handles internal failures well, but what if an external event occurs? Imagine the user clicks the 'Cancel' button on the website while our TripBookingWorkflow is in the middle of execution (e.g., it has taken payment but is waiting for the flight booking to complete).
We cannot simply start a new cancellation workflow, as that would create a race condition. The correct approach is to Signal the existing workflow. A Temporal Signal is a one-way, asynchronous message sent to a running workflow execution.
Let's define a cancel signal.
// workflow/workflow.go - Modified TripBookingWorkflow
func TripBookingWorkflow(ctx workflow.Context, bookingID string, amount int) (string, error) {
// ... (setup code as before; this version also imports go.temporal.io/sdk/temporal)
// Channel to receive cancellation signal
cancellationChan := workflow.GetSignalChannel(ctx, "cancelTrip")
// The saga runs under its own cancellable context so that a cancel signal can stop it.
sagaCtx, cancelSaga := workflow.WithCancel(ctx)
// The future carries the saga's outcome; settable is its write side.
executionResult, settable := workflow.NewFuture(ctx)
// Execute the main saga logic in a coroutine
workflow.Go(sagaCtx, func(gctx workflow.Context) {
// ... (The entire saga logic from before goes here, running activities on gctx)
// err := workflow.ExecuteActivity(gctx, ...).Get(gctx, &...)
// if err != nil { settable.Set(nil, err); return }
// saga.AddCompensation(...) // compensations must capture the outer ctx: gctx is cancelled by the signal
// ... etc.
// On success, set the future
settable.Set("Trip booked successfully!", nil)
})
// Use a selector to react to either saga completion or cancellation
var workflowErr error
selector := workflow.NewSelector(ctx)
selector.AddReceive(cancellationChan, func(c workflow.Channel, more bool) {
var signalVal string
c.Receive(ctx, &signalVal)
logger.Info("Cancellation signal received", "Reason", signalVal)
// Stop the in-flight saga and record why the workflow is ending
cancelSaga()
workflowErr = temporal.NewCanceledError("Cancelled by user request")
})
selector.AddFuture(executionResult, func(f workflow.Future) {
// Nothing to do here; the result is read below
})
// Wait for either the saga to complete or a cancellation signal to arrive
selector.Select(ctx)
if workflowErr != nil {
logger.Warn("Workflow interrupted, starting compensation.", "Reason", workflowErr)
saga.Compensate(ctx)
return "", workflowErr
}
var finalResult string
if err := executionResult.Get(ctx, &finalResult); err != nil {
// This branch is hit if an activity within the coroutine fails
saga.Compensate(ctx)
return "", err
}
return finalResult, nil
}
This implementation is significantly more complex but also more robust. It uses workflow.Go to run the main Saga logic in a background coroutine and a workflow.Selector to wait on two possible events: completion of the Saga logic (observed through the executionResult future) and the arrival of a cancel signal on cancellationChan.
If the cancel signal arrives first, we create a CanceledError, the workflow leaves its main path, and saga.Compensate runs. This ensures that even if a user cancels mid-flight, we correctly refund their payment and cancel the booking.
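The wait-on-two-events idea has a direct analogue in plain Go's select statement, sketched below with the hypothetical names `awaitOutcome` and `ErrCancelled`. This is an analogy only: workflow code must use workflow.Go, workflow.Channel, and workflow.Selector instead of native goroutines, channels, and select, because the native constructs are not deterministic under replay.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrCancelled reports that a cancel request won the race.
var ErrCancelled = errors.New("cancelled by user request")

// awaitOutcome blocks until either the saga reports a result or a cancel
// request arrives, and reports which one happened.
func awaitOutcome(result <-chan string, cancel <-chan string) (string, error) {
	select {
	case r := <-result:
		return r, nil
	case reason := <-cancel:
		return "", fmt.Errorf("%w: %s", ErrCancelled, reason)
	}
}

func main() {
	result := make(chan string, 1)
	cancel := make(chan string, 1)

	// The cancel request arrives before the saga finishes.
	cancel <- "user clicked Cancel"
	_, err := awaitOutcome(result, cancel)
	fmt.Println(errors.Is(err, ErrCancelled))

	// No cancel request is pending, so the saga result is returned.
	result <- "Trip booked successfully!"
	out, err := awaitOutcome(result, cancel)
	fmt.Println(out, err)
}
```

In the first call only the cancel channel is ready, so the caller learns it must compensate. In the second only the result is ready, so the booking goes through.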
Testing Strategies for Complex Sagas
Testing complex failure and compensation logic is paramount. The Go SDK's testsuite package provides TestWorkflowEnvironment, a powerful framework for this.
We can write unit tests that mock specific activities to force failures and then assert that the correct compensation activities were called.
// workflow/workflow_test.go
package workflow
import (
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/your-repo/shared"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/testsuite"
)
func TestTripBookingSaga_FlightFails(t *testing.T) {
s := testsuite.WorkflowTestSuite{}
env := s.NewTestWorkflowEnvironment()
// Mock activities
env.OnActivity(shared.CreateBooking, mock.Anything, "booking-123").Return("booking-confirmation-123", nil).Once()
env.OnActivity(shared.TakePayment, mock.Anything, "booking-123", 500, mock.Anything).Return("txn-123", nil).Once()
// Mock the flight booking to fail. The error is non-retryable so that the
// activity is attempted exactly once, matching the Once() expectation.
// "NoSeatsError" is an illustrative error type name.
env.OnActivity(shared.BookFlight, mock.Anything, "booking-123").Return("", temporal.NewNonRetryableApplicationError("no seats left", "NoSeatsError", nil)).Once()
// Mock the compensation activities that we expect to be called
env.OnActivity(shared.RefundPayment, mock.Anything, "txn-123", mock.Anything).Return(nil).Once()
env.OnActivity(shared.CancelBooking, mock.Anything, "booking-123").Return(nil).Once()
env.ExecuteWorkflow(TripBookingWorkflow, "booking-123", 500)
require.True(t, env.IsWorkflowCompleted())
require.Error(t, env.GetWorkflowError())
// Assert that all mocked functions were called as expected
env.AssertExpectations(t)
}
This test precisely simulates the scenario where the BookFlight activity fails. It then verifies that the workflow completes with an error and, crucially, that RefundPayment and CancelBooking were invoked exactly once with the correct arguments. This allows you to test every conceivable failure path in your Saga with confidence, without needing to run any downstream dependencies.
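The same idea extends to every failure point. As a rough plain-Go sketch, not using the Temporal test environment, a table of cases can inject a failure at each step and state which compensations are expected; `runTrip` and `checkAllPaths` are hypothetical names used only for this illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// runTrip simulates the three-step saga and fails at step failAt
// (0 = booking, 1 = payment, 2 = flight, -1 = no failure). It returns the
// compensations that ran, in order.
func runTrip(failAt int) (compensations []string, err error) {
	steps := []struct{ name, undo string }{
		{"CreateBooking", "CancelBooking"},
		{"TakePayment", "RefundPayment"},
		{"BookFlight", "CancelFlight"},
	}
	var pending []string
	defer func() {
		if err != nil {
			for i := len(pending) - 1; i >= 0; i-- {
				compensations = append(compensations, pending[i])
			}
		}
	}()
	for i, s := range steps {
		if i == failAt {
			return nil, errors.New(s.name + " failed")
		}
		pending = append(pending, s.undo)
	}
	return nil, nil
}

// checkAllPaths injects a failure at every step and compares the
// compensations that ran against the expected list.
func checkAllPaths() error {
	cases := []struct {
		failAt int
		want   []string
	}{
		{-1, nil},
		{0, nil},
		{1, []string{"CancelBooking"}},
		{2, []string{"RefundPayment", "CancelBooking"}},
	}
	for _, c := range cases {
		got, _ := runTrip(c.failAt)
		if strings.Join(got, ",") != strings.Join(c.want, ",") {
			return fmt.Errorf("failAt=%d: got %v, want %v", c.failAt, got, c.want)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkAllPaths())
}
```

With the Temporal test environment, each row of such a table would become one test that mocks the failing activity and asserts the expected compensation calls.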
Conclusion: Sagas as First-Class Citizens
By leveraging a durable execution engine like Temporal, the Saga pattern transforms from a complex, infrastructure-heavy architectural burden into a clean, testable, and maintainable business logic implementation. The patterns discussed here—using defer for compensation, managing idempotency, handling non-compensatable failures, and reacting to external signals—provide a robust toolkit for building resilient distributed systems.
The key takeaway is that the workflow code itself becomes the single source of truth for the entire business process. There are no separate state machine definitions, no message queue topics to manage, and no database tables for tracking Saga state. The logic is just code, but code that is guaranteed to execute to completion, no matter how long it takes or what failures occur along the way.