Implementing the Saga Pattern with Temporal for Resilient Workflows
The Inherent Challenge of Distributed Transactions
In a monolithic architecture, maintaining data consistency is a solved problem: ACID-compliant database transactions. When you commit, all changes succeed or all fail. In a distributed microservices environment, this guarantee evaporates. A single business process, like an e-commerce order, might span multiple services—Inventory, Payments, Shipping—each with its own database. A simple COMMIT is no longer an option.
The classic solution, two-phase commit (2PC), is often dismissed in modern architectures due to its synchronous, blocking nature. It introduces tight coupling and a single point of failure (the transaction coordinator), undermining the very reasons for adopting microservices.
This is where the Saga pattern emerges. A Saga is a sequence of local transactions where each transaction updates data within a single service. If a local transaction fails, the Saga executes a series of compensating transactions to semantically undo the preceding successful transactions. While this achieves eventual consistency, implementing a Saga is notoriously complex.
Developers often start with a choreography-based approach using message queues. Service A completes its work and emits an event. Service B listens for that event, does its work, and emits another. This is decentralized but creates a nightmare of distributed debugging. Which service failed? Where is the state of the overall process? What happens if a compensation event is missed?
This article focuses on the superior alternative for complex workflows: Orchestration-based Sagas. We will demonstrate how to use Temporal, an open-source durable execution system, to build a resilient, observable, and maintainable Saga. Temporal acts as the orchestrator, allowing you to write your entire distributed transaction as a single piece of code—a Workflow—while it handles the durability, retries, state management, and timeouts required to make it fault-tolerant.
We will move beyond the conceptual and dive into a production-grade implementation, tackling the difficult edge cases that separate trivial examples from robust systems.
The Scenario: A Multi-Service E-commerce Order
Our Saga will model a simplified but realistic e-commerce order placement process. The business process involves four distinct microservices:
* Inventory Service: reserves the ordered items (compensation: release the reservation).
* Payment Service: charges the customer's card (compensation: refund the charge).
* Shipping Service: creates the shipment for the order.
* Notification Service: informs the customer of the outcome.
Each step can fail. If the payment fails, we must release the inventory. If creating the shipment fails, we must refund the payment and release the inventory. The entire process must be atomic from a business perspective.
Why Temporal is the Ideal Saga Orchestrator
Before we write code, it's crucial to understand why Temporal is so effective for this pattern. A Temporal Workflow is a regular function written in a general-purpose language (we'll use Go), but its execution is durable: its state, including local variables and the execution stack, is preserved—via event history and replay—by the Temporal Cluster across any process or server failure.
This model elegantly solves the core problems of Saga implementation:
* State Management: The current state of the Saga is simply the current execution point of your workflow code. There is no need for a separate saga_state table in a database that you must manually manage.
* Retries and Timeouts: Calling a microservice is modeled as an Activity. Temporal handles the retry logic (with exponential backoff) and timeouts for you with simple configuration, as sketched after this list.
* Compensation Logic: We can use standard programming constructs like defer or try/catch/finally to build clear and reliable compensation logic. If the workflow function exits due to an error, the deferred compensation functions are executed.
* Observability: The state and history of every Saga execution are fully recorded and queryable, making debugging distributed processes trivial compared to hunting through logs across multiple services.
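For concreteness, here is a minimal sketch of that configuration in the Go SDK—activity options with an explicit retry policy. The timeout and backoff values are illustrative, not recommendations.
// Inside a workflow function.
// Imports assumed: "time", "go.temporal.io/sdk/workflow", "go.temporal.io/sdk/temporal".
ao := workflow.ActivityOptions{
	StartToCloseTimeout: 10 * time.Second, // per-attempt timeout
	RetryPolicy: &temporal.RetryPolicy{
		InitialInterval:    time.Second,
		BackoffCoefficient: 2.0,
		MaximumInterval:    time.Minute,
		MaximumAttempts:    5, // 0 means unlimited attempts
	},
}
ctx = workflow.WithActivityOptions(ctx, ao)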
Let's build this.
Implementation Deep Dive: Building the Order Workflow
We'll structure our Go project with clear separation of concerns: activities, workflow, and the worker/starter executables.
1. Defining the Activities
Activities are the functions that interact with the outside world—in our case, calling the microservices. They should be idempotent, meaning calling them multiple times with the same input yields the same result without unintended side effects. Temporal will re-execute an activity when it retries, so idempotency must be provided by the activity itself—typically by deriving a stable idempotency key (for example, from the workflow ID and activity ID) and passing it to the downstream service.
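As a hedged sketch of that approach, a small helper like the following could derive such a key from the activity's execution info; MakeIdempotencyKey is our own illustrative name, not an SDK function.
// idempotency.go — illustrative helper, not part of the Temporal SDK.
package activities

import (
	"context"
	"fmt"

	"go.temporal.io/sdk/activity"
)

// MakeIdempotencyKey returns a key that stays stable across retries of the
// same activity invocation, suitable for passing to a downstream service
// that deduplicates requests on it.
func MakeIdempotencyKey(ctx context.Context) string {
	info := activity.GetInfo(ctx)
	return fmt.Sprintf("%s-%s", info.WorkflowExecution.ID, info.ActivityID)
}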
First, let's define the input/output structures and the activity interfaces.
// shared/model.go
package shared
type CartItem struct {
ItemID string
Quantity int
}
type OrderDetails struct {
UserID string
Items []CartItem
CardToken string
}
type OrderResult struct {
OrderID string
ShipmentID string
}
Now for the activity definitions. We'll include both the primary actions and their corresponding compensations.
// activities/activities.go
package activities
import (
"context"
"errors"
"fmt"
"time"
"github.com/google/uuid"
"go.temporal.io/sdk/activity"
"your_project/shared"
)
// Mock external services
type Services struct{}
func (s *Services) ReserveInventory(ctx context.Context, items []shared.CartItem) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Reserving inventory...", "items", items)
// Simulate a transient failure 25% of the time
if time.Now().Unix()%4 == 0 {
return "", errors.New("inventory service unavailable")
}
reservationID := uuid.New().String()
logger.Info("Inventory reserved", "reservationID", reservationID)
return reservationID, nil
}
func (s *Services) ProcessPayment(ctx context.Context, userID, cardToken string, amount float64) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Processing payment...", "amount", amount)
// Simulate a hard failure for a specific card token
if cardToken == "tok_declined" {
return "", errors.New("payment declined: insufficient funds")
}
transactionID := fmt.Sprintf("ch_%s", uuid.New().String())
logger.Info("Payment successful", "transactionID", transactionID)
return transactionID, nil
}
func (s *Services) CreateShipment(ctx context.Context, userID string, items []shared.CartItem) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Creating shipment...", "user", userID)
// Simulate a failure
if userID == "user_fail_shipment" {
return "", errors.New("invalid address for shipment")
}
shipmentID := fmt.Sprintf("shp_%s", uuid.New().String())
logger.Info("Shipment created", "shipmentID", shipmentID)
return shipmentID, nil
}
func (s *Services) NotifyUser(ctx context.Context, userID string, message string) error {
logger := activity.GetLogger(ctx)
logger.Info("Sending notification...", "user", userID, "message", message)
// Notifications are important but failure shouldn't roll back the order
return nil
}
// --- Compensation Activities ---
func (s *Services) ReleaseInventory(ctx context.Context, reservationID string) error {
logger := activity.GetLogger(ctx)
logger.Info("Releasing inventory...", "reservationID", reservationID)
// This should be an idempotent operation in the real service
return nil
}
func (s *Services) RefundPayment(ctx context.Context, transactionID string) error {
logger := activity.GetLogger(ctx)
logger.Info("Refunding payment...", "transactionID", transactionID)
// This should also be idempotent
return nil
}
// Note: We don't have a CancelShipment because we assume CreateShipment is the point of no return.
// If it could be cancelled, we would add that activity here.
2. The Saga Workflow Implementation
This is where the magic happens. We'll write the entire business process as a single function. Each successful step appends a compensation closure to a slice, and a single deferred function walks that slice in reverse (last-in, first-out) order if the workflow function returns an error, naturally implementing the Saga's rollback logic.
// workflow/order_workflow.go
package workflow
import (
"time"
"go.temporal.io/sdk/workflow"
"your_project/activities"
"your_project/shared"
)
func OrderWorkflow(ctx workflow.Context, order shared.OrderDetails) (*shared.OrderResult, error) {
// Configure activity options with retries
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
logger := workflow.GetLogger(ctx)
logger.Info("Order workflow started", "UserID", order.UserID)
var compensations []func()
var a *activities.Services
var err error
// Defer the execution of all compensations.
// This will run if the workflow function returns, either normally or with an error.
defer func() {
if err != nil {
// Execute compensations in reverse order
logger.Error("Workflow failed, starting compensation.", "error", err)
for i := len(compensations) - 1; i >= 0; i-- {
compensations[i]()
}
}
}()
// 1. Reserve Inventory
var reservationID string
err = workflow.ExecuteActivity(ctx, a.ReserveInventory, order.Items).Get(ctx, &reservationID)
if err != nil {
return nil, err
}
compensations = append(compensations, func() {
// Use a disconnected context for compensations as the original might be cancelled
compensationCtx, _ := workflow.NewDisconnectedContext(ctx)
wfErr := workflow.ExecuteActivity(compensationCtx, a.ReleaseInventory, reservationID).Get(compensationCtx, nil)
if wfErr != nil {
logger.Error("Failed to compensate inventory reservation", "error", wfErr)
}
})
// 2. Process Payment (e.g., calculate total amount)
var transactionID string
// In a real app, this would be calculated from order.Items
const paymentAmount = 125.50
err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order.UserID, order.CardToken, paymentAmount).Get(ctx, &transactionID)
if err != nil {
return nil, err
}
compensations = append(compensations, func() {
compensationCtx, _ := workflow.NewDisconnectedContext(ctx)
wfErr := workflow.ExecuteActivity(compensationCtx, a.RefundPayment, transactionID).Get(compensationCtx, nil)
if wfErr != nil {
logger.Error("Failed to compensate payment", "error", wfErr)
}
})
// 3. Create Shipment
var shipmentID string
err = workflow.ExecuteActivity(ctx, a.CreateShipment, order.UserID, order.Items).Get(ctx, &shipmentID)
if err != nil {
return nil, err
}
// No compensation for shipment - this is our point of no return for this example.
// 4. Notify User
// This is a non-critical step: we deliberately ignore its error so a failed
// notification never rolls back the order. (It could also be dispatched without
// blocking by not calling Get on the returned Future.)
msg := "Your order has been confirmed!"
_ = workflow.ExecuteActivity(ctx, a.NotifyUser, order.UserID, msg).Get(ctx, nil)
result := &shared.OrderResult{
OrderID: workflow.GetInfo(ctx).WorkflowExecution.ID,
ShipmentID: shipmentID,
}
logger.Info("Workflow completed successfully!")
return result, nil
}
This workflow code is remarkably clear. It reads like a standard sequential program, yet it orchestrates a complex, fault-tolerant distributed transaction. The compensations slice acts as our compensation stack, and the defer block is our rollback engine.
3. Worker and Starter
Finally, we need code to host and run our workflows and activities (the Worker) and code to start new workflow executions (the Starter).
// worker/main.go
package main
import (
"log"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"your_project/activities"
"your_project/workflow"
)
func main() {
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
w := worker.New(c, "order-processing", worker.Options{})
w.RegisterWorkflow(workflow.OrderWorkflow)
a := &activities.Services{}
w.RegisterActivity(a)
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
// starter/main.go
package main
import (
"context"
"fmt"
"log"
"github.com/google/uuid"
"go.temporal.io/sdk/client"
"your_project/shared"
"your_project/workflow"
)
func main() {
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
// Example 1: Successful order
orderDetailsSuccess := shared.OrderDetails{
UserID: "user-123",
Items: []shared.CartItem{{ItemID: "item-abc", Quantity: 1}},
CardToken: "tok_valid",
}
startWorkflow(c, orderDetailsSuccess)
// Example 2: Order fails at payment, triggering compensation
orderDetailsFail := shared.OrderDetails{
UserID: "user-456",
Items: []shared.CartItem{{ItemID: "item-def", Quantity: 2}},
CardToken: "tok_declined",
}
startWorkflow(c, orderDetailsFail)
}
func startWorkflow(c client.Client, details shared.OrderDetails) {
workflowOptions := client.StartWorkflowOptions{
ID: "order-" + uuid.New().String(),
TaskQueue: "order-processing",
}
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, workflow.OrderWorkflow, details)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
var result shared.OrderResult
err = we.Get(context.Background(), &result)
if err != nil {
log.Printf("Workflow for user %s failed: %v\n", details.UserID, err)
} else {
log.Printf("Workflow for user %s completed. Result: %+v\n", details.UserID, result)
}
}
Running this code against a local Temporal server will show the successful completion of the first workflow and the failure and subsequent compensation of the second.
Advanced Edge Case Handling
A simple success/fail path is just the beginning. Production systems must handle more complex scenarios.
1. Compensation Failures
What if RefundPayment itself fails? This is a critical failure. In our current implementation, the log message Failed to compensate payment is our only recourse. This is insufficient for a production system.
Solution: Configure robust retry policies specifically for compensation activities. These should likely be more aggressive than the forward-path activities, potentially retrying indefinitely, as a failed compensation can lead to data inconsistency and financial loss.
// Inside the compensation closure
compensationCtx, _ := workflow.NewDisconnectedContext(ctx)
// Use a more aggressive retry policy for critical compensations
compensationActivityOptions := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 100 * time.Second,
// Keep retrying until it succeeds or a human intervenes.
MaximumAttempts: 0,
},
}
compensationCtx = workflow.WithActivityOptions(compensationCtx, compensationActivityOptions)
wfErr := workflow.ExecuteActivity(compensationCtx, a.RefundPayment, transactionID).Get(compensationCtx, nil)
if wfErr != nil {
// This is now a much more serious condition.
// The workflow will effectively be stuck here, retrying the refund.
// This is where business logic for manual intervention comes in.
logger.Error("CRITICAL: Indefinite failure in payment refund compensation", "error", wfErr)
// You could emit a metric, send an alert, or use a Temporal Signal to notify an operator.
}
Using a workflow.NewDisconnectedContext is crucial here. It ensures that the compensation activity is not cancelled if the parent workflow context is, for example, due to a workflow timeout.
2. Workflow Versioning for Long-Running Sagas
Imagine an order workflow that can take days to complete (e.g., waiting for a backordered item). What if you need to deploy a new version of the workflow logic during that time? A naive deployment would break the in-flight workflows because their execution history would not match the new code's logic (a non-deterministic error in Temporal).
Solution: Use Temporal's built-in versioning API. In the Go SDK this is workflow.GetVersion; other SDKs expose the same mechanism as a patch API.
Let's say we want to add a new fraud check step between reserving inventory and processing payment.
// ... after ReserveInventory ...
// VERSIONING: Introduce a new step in the workflow
v := workflow.GetVersion(ctx, "add-fraud-check", workflow.DefaultVersion, 1)
if v == 1 {
var fraudCheckResult string
// RunFraudCheck is a new activity introduced alongside this change.
err = workflow.ExecuteActivity(ctx, a.RunFraudCheck, order.UserID).Get(ctx, &fraudCheckResult)
if err != nil {
return nil, err
}
if fraudCheckResult == "DENY" {
err = errors.New("fraud check failed")
return nil, err
}
}
// 2. Process Payment
// ... rest of the workflow
The workflow.GetVersion call with a change ID ("add-fraud-check") records a version marker in the workflow history.
* For workflows reaching this code for the first time: GetVersion writes the marker with version 1 and returns 1. The new if block is executed.
* For old workflows replaying history recorded before the change: no marker exists, so GetVersion returns workflow.DefaultVersion (-1). The new if block is skipped, maintaining determinism.
* For workflows that have already recorded the marker: GetVersion returns the recorded version on every subsequent replay, so they keep taking the same path they originally took.
This allows you to safely evolve complex, long-running Sagas without downtime or data corruption.
Performance and Scalability Considerations
* Worker Tuning: The worker.Options{} struct has numerous fields to control performance. MaxConcurrentActivityExecutionSize and MaxConcurrentWorkflowTaskExecutionSize cap how many tasks a worker runs concurrently, while MaxConcurrentActivityTaskPollers and MaxConcurrentWorkflowTaskPollers control how aggressively it polls the task queue. If your activities are I/O-bound (like calling a service), you can raise the activity execution size (and, if needed, the pollers) significantly to improve throughput on a single worker instance; see the sketch after this list.
* Task Queues: Don't run all your workflows on a single task queue. You can create separate task queues for different business domains (e.g., order-processing, user-onboarding). This allows you to scale worker fleets independently. You could even have a high-priority payment-activities task queue with dedicated workers to ensure payment processing is never starved for resources.
* Activity Heartbeating: If an activity can run for a long time (e.g., a batch processing step), it should heartbeat its progress back to Temporal using activity.RecordHeartbeat, and the caller should set a HeartbeatTimeout in the activity options. Temporal can then detect a crashed activity worker after a missed heartbeat instead of waiting for the full StartToCloseTimeout to expire, enabling quicker retries; a sketch also follows this list.
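A minimal sketch of the worker tuning mentioned above, assuming an I/O-bound activity workload; the numbers are illustrative only.
// worker/main.go (excerpt) — illustrative tuning values, not recommendations.
w := worker.New(c, "order-processing", worker.Options{
	MaxConcurrentActivityExecutionSize:     200, // many in-flight I/O-bound activities
	MaxConcurrentWorkflowTaskExecutionSize: 50,
	MaxConcurrentActivityTaskPollers:       10, // poll the task queue more aggressively
})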
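And a sketch of heartbeating itself. ProcessBatch is a hypothetical long-running activity, not part of the order example; the key points are the RecordHeartbeat call inside the activity and the HeartbeatTimeout set by the caller.
// A hypothetical long-running activity that reports progress.
func (s *Services) ProcessBatch(ctx context.Context, records []string) error {
	for i := range records {
		// ... process records[i] ...
		activity.RecordHeartbeat(ctx, i) // progress detail; recoverable on retry via activity.GetHeartbeatDetails
	}
	return nil
}

// In the workflow, pair the activity with a heartbeat timeout so a dead worker is detected quickly.
ao := workflow.ActivityOptions{
	StartToCloseTimeout: 30 * time.Minute,
	HeartbeatTimeout:    time.Minute, // missed heartbeats trigger a retry sooner than the full timeout
}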
Testing Your Saga
Testing distributed systems is hard. Temporal makes it dramatically easier with its testsuite package.
// workflow/order_workflow_test.go
package workflow
import (
"errors"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.temporal.io/sdk/testsuite"
"your_project/activities"
"your_project/shared"
)
type UnitTestSuite struct {
suite.Suite
*testsuite.WorkflowTestSuite
env *testsuite.TestWorkflowEnvironment
}
func (s *UnitTestSuite) SetupTest() {
s.WorkflowTestSuite = &testsuite.WorkflowTestSuite{}
s.env = s.NewTestWorkflowEnvironment()
}
func (s *UnitTestSuite) AfterTest(suiteName, testName string) {
s.env.AssertExpectations(s.T())
}
func TestUnitTestSuite(t *testing.T) {
suite.Run(t, new(UnitTestSuite))
}
func (s *UnitTestSuite) TestOrderWorkflow_Success() {
order := shared.OrderDetails{UserID: "test-user", CardToken: "tok_valid"}
a := &activities.Services{}
s.env.OnActivity(a.ReserveInventory, mock.Anything, mock.Anything).Return("res-123", nil).Once()
s.env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("ch-123", nil).Once()
s.env.OnActivity(a.CreateShipment, mock.Anything, mock.Anything, mock.Anything).Return("shp-123", nil).Once()
s.env.OnActivity(a.NotifyUser, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
s.env.ExecuteWorkflow(OrderWorkflow, order)
s.True(s.env.IsWorkflowCompleted())
s.NoError(s.env.GetWorkflowError())
var result shared.OrderResult
s.env.GetWorkflowResult(&result)
s.Equal("shp-123", result.ShipmentID)
}
func (s *UnitTestSuite) TestOrderWorkflow_PaymentFails_CompensationCalled() {
order := shared.OrderDetails{UserID: "test-user-fail", CardToken: "tok_declined"}
a := &activities.Services{}
// Mock the forward path
s.env.OnActivity(a.ReserveInventory, mock.Anything, mock.Anything).Return("res-456", nil).Once()
s.env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("", errors.New("payment failed")).Once()
// Crucially, assert that the compensation is called
s.env.OnActivity(a.ReleaseInventory, mock.Anything, "res-456").Return(nil).Once()
s.env.ExecuteWorkflow(OrderWorkflow, order)
s.True(s.env.IsWorkflowCompleted())
s.Error(s.env.GetWorkflowError()) // The workflow itself should report an error
}
This test suite runs the entire workflow logic in-memory, mocking the activity executions. It allows you to simulate any failure scenario and assert that the correct compensation logic is triggered, all without needing to spin up microservices or even a Temporal cluster. This provides an incredibly powerful and fast feedback loop for developing complex, resilient systems.
Conclusion
The Saga pattern is a powerful tool for maintaining data consistency in a microservices world, but its manual implementation is fraught with peril. By leveraging a durable execution system like Temporal, we can elevate the implementation from a complex, state-machine-and-queue-based system to a straightforward, testable, and observable piece of code.
We've demonstrated how to build an orchestration-based Saga in Go, handling not just the happy path but also critical production concerns like compensation failures and safe deployment of new logic via workflow versioning. By abstracting the infrastructure of resilience, Temporal allows senior engineers to focus on what matters: the business logic of the workflow itself. The result is a more robust, maintainable, and ultimately more reliable distributed system.