DynamoDB Idempotency: Conditional Writes for Resilient Event Consumers

Goh Ling Yong

The Inescapable Problem: The Illusion of Exactly-Once Delivery

In distributed, event-driven architectures, we are perpetually haunted by the specter of duplicate message processing. Services like AWS SQS, Kinesis, and EventBridge offer at-least-once delivery guarantees. This is a pragmatic and resilient choice for the message broker, but it delegates a significant responsibility to the consumer: the ability to process the same message multiple times without creating inconsistent state or erroneous side effects.

For a senior engineer, the consequences are stark and familiar: a customer is charged twice for a single order; a critical notification is sent multiple times; an analytics counter is incorrectly incremented. The business impact is non-trivial. The solution is to enforce idempotency at the consumer level, transforming the system's behavior from at-least-once delivery to exactly-once processing.

This article dissects a highly effective and scalable pattern for achieving this using DynamoDB. We will forgo simplistic GetItem followed by PutItem checks—a classic race condition—and dive directly into a production-hardened approach using atomic conditional writes, lifecycle status tracking, and TTL for automated state cleanup.

Prerequisites

This is not an introduction. It assumes you have production experience with:

  • AWS Lambda, SQS, and DynamoDB.
  • Event-driven and serverless architecture principles.
  • The challenges of distributed systems (e.g., retries, network partitions, race conditions).
  • The AWS SDK (Boto3 for Python is used here, but the principles are universal).

The Core Architecture: An Idempotency Tracking Table

The foundation of our pattern is a dedicated DynamoDB table to record the processing state of each unique event. A naive implementation might seem simple, but the details are what separate a fragile solution from a resilient one.

Table Schema and Design Considerations

Let's define a robust schema for our IdempotencyStore table:

  • Partition Key: idempotencyKey (String). This uniquely identifies a single execution attempt. Its source is critical—it could be the messageId from an SQS message, a dedicated eventId from an EventBridge detail payload, or a composite key derived from the event data itself (e.g., tenantId#orderId).
  • Status Attribute: status (String). Tracks the processing lifecycle. We'll use IN_PROGRESS and COMPLETED.
  • Expiry Attribute: expiryTimestamp (Number). A Unix epoch timestamp used by DynamoDB's Time to Live (TTL) feature to automatically purge expired records. This is crucial for both cost management and for handling crashed/timed-out executions.
  • Response Attribute: responseData (String/Map, optional). Can be used to store the result of the execution. If a duplicate request arrives, we can return the cached response immediately without re-executing the business logic.
  • Data Attribute: eventData (String/Map, optional). Storing the raw event can be useful for debugging and manual recovery scenarios.
Here is a CloudFormation template snippet for such a table:

    yaml
    Resources:
      IdempotencyStoreTable:
        Type: AWS::DynamoDB::Table
        Properties:
          TableName: IdempotencyStore
          AttributeDefinitions:
            - AttributeName: idempotencyKey
              AttributeType: S
          KeySchema:
            - AttributeName: idempotencyKey
              KeyType: HASH
          BillingMode: PAY_PER_REQUEST # Often ideal for spiky, event-driven workloads
          TimeToLiveSpecification:
            AttributeName: expiryTimestamp
            Enabled: true
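
    For reference, a record in this table might look like the following once processing has finished. This is a hypothetical example; the key format and payload depend on your event source and business logic.

    python
    # Hypothetical example of a COMPLETED idempotency record
    completed_item_example = {
        'idempotencyKey': 'tenant-123#order-456',   # partition key
        'status': 'COMPLETED',                      # lifecycle state
        'expiryTimestamp': 1735689600,              # Unix epoch; purged by TTL after this time
        'responseData': {'status': 'success', 'message': 'Order processed'},
        'eventData': '{"orderId": "order-456", "amount": "42.00"}'
    }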

    The Atomic Heart: `PutItem` with `ConditionExpression`

    A common but flawed approach is to GetItem to check for the key's existence, and if it's not present, PutItem to create it. This two-step process opens a window for a race condition: two concurrent invocations of a consumer could both execute the GetItem call, find nothing, and then both proceed to PutItem and execute the business logic.
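
    For contrast, here is a minimal sketch of that flawed read-then-write approach (illustrative only, assuming an idempotency_table boto3 Table resource like the one used below):

    python
    # ANTI-PATTERN: non-atomic check-then-write (race condition between the two calls)
    def flawed_acquire(idempotency_key):
        response = idempotency_table.get_item(Key={'idempotencyKey': idempotency_key})
        if 'Item' in response:
            return False  # looks like a duplicate
        # Another invocation can slip in here, also see "no item", and also proceed
        idempotency_table.put_item(Item={'idempotencyKey': idempotency_key})
        return True  # both concurrent invocations can reach this point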

    The correct approach is to use a single, atomic database operation. DynamoDB's ConditionExpression allows us to execute a write operation only if a specific condition on the item is met. If the condition fails, the write is rejected, and the database throws a ConditionalCheckFailedException.

    Our first atomic lock is to ensure we only start processing if no record for the idempotencyKey exists.

    The condition is simple: attribute_not_exists(idempotencyKey).

    Let's implement this in a Python Lambda handler. This first version handles the basic lock acquisition.

    python
    # a_first_pass_handler.py
    import os
    import time
    import logging
    import boto3
    from botocore.exceptions import ClientError
    
    dynamodb = boto3.resource('dynamodb')
    table_name = os.environ.get('IDEMPOTENCY_TABLE')
    idempotency_table = dynamodb.Table(table_name)
    
    # Assume Lambda timeout is 30 seconds
    LAMBDA_TIMEOUT_SECONDS = 30
    
    class DuplicateEventException(Exception):
        """Custom exception to indicate a duplicate event was detected."""
        pass
    
    def process_event(event):
        """Placeholder for your actual business logic."""
        logging.info("Starting business logic...")
        time.sleep(5) # Simulate work
        logging.info("Business logic completed.")
        return {"status": "success", "message": "Order processed"}
    
    def lambda_handler(event, context):
        # For SQS, the event is in the 'Records' list
        # For simplicity, we'll assume a single event here
        sqs_message = event['Records'][0]
        idempotency_key = sqs_message['messageId']
        
        try:
            # --- ATOMIC LOCK ACQUISITION ---
            # Set an expiry timestamp slightly longer than our Lambda timeout
            expiry_ts = int(time.time()) + (LAMBDA_TIMEOUT_SECONDS * 2)
            
            idempotency_table.put_item(
                Item={
                    'idempotencyKey': idempotency_key,
                    'expiryTimestamp': expiry_ts
                },
                ConditionExpression='attribute_not_exists(idempotencyKey)'
            )
            
        except ClientError as e:
            if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                logging.warning(f"Duplicate event detected: {idempotency_key}. Skipping.")
                # This is not an error, it's a successful detection of a duplicate.
                # We can return success to the caller (e.g., SQS) to have the message deleted.
                return {'statusCode': 200, 'body': 'Duplicate event, skipped.'}
            else:
                # Any other DynamoDB error is a real problem
                logging.error(f"DynamoDB error on lock acquisition: {e}")
                raise
    
        # --- BUSINESS LOGIC EXECUTION ---
        try:
            result = process_event(sqs_message)
            # Note: In this simple version, if the Lambda crashes here,
            # the lock exists but the work was not completed. The next redelivery
            # will be detected as a duplicate and skipped, silently dropping the event.
            # This is the problem we solve in the next section.
            return {'statusCode': 200, 'body': result}
        except Exception as e:
            # If business logic fails, we should ideally clean up the idempotency record
            # to allow a retry to proceed. This adds complexity.
            logging.error(f"Business logic failed: {e}")
            # Deleting the key allows the next retry to work.
            idempotency_table.delete_item(Key={'idempotencyKey': idempotency_key})
            raise
    

    This is a good start, but it has a critical flaw. If the Lambda times out or crashes after acquiring the lock but before completing, the idempotency record exists. The next SQS redelivery will see the lock, assume the event was processed, and incorrectly discard the message.


    The Full Lifecycle: A Two-Phase Commit Pattern

    To solve the crash/timeout problem, we need to track the state of the processing. This turns our simple lock into a state machine with two primary states: IN_PROGRESS and COMPLETED. This mimics a two-phase commit pattern.

    Phase 1: Acquire Lock and Set to IN_PROGRESS

  • Atomically write the record with status = 'IN_PROGRESS'.
  • The condition remains attribute_not_exists(idempotencyKey).
  • Set a short TTL, typically aligned with the function's timeout. If the function crashes, the record will eventually expire, allowing a subsequent attempt to acquire the lock again.

    Phase 2: Execute Logic and Set to COMPLETED

  • After the business logic successfully completes, update the existing item.
  • Set status = 'COMPLETED'.
  • The condition is now attribute_exists(idempotencyKey) AND status = 'IN_PROGRESS'. This ensures we are only updating a lock we currently hold.
  • Set a longer TTL. This determines how long we remember this event to prevent future duplicates. This could be hours or days, depending on business requirements.
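
    In isolation, the two conditional writes look like this (a condensed sketch assuming the idempotency_table resource from earlier; the full handler below wraps the same calls in a context manager):

    python
    # Condensed sketch of the two phases (key and expiry values are placeholders)
    def acquire_lock(key, short_expiry):
        # Phase 1: create the record only if no prior attempt exists
        idempotency_table.put_item(
            Item={'idempotencyKey': key, 'status': 'IN_PROGRESS', 'expiryTimestamp': short_expiry},
            ConditionExpression='attribute_not_exists(idempotencyKey)'
        )

    def mark_completed(key, long_expiry):
        # Phase 2: flip to COMPLETED only if we still hold the IN_PROGRESS lock
        idempotency_table.update_item(
            Key={'idempotencyKey': key},
            UpdateExpression='SET #s = :completed, expiryTimestamp = :long_expiry',
            ExpressionAttributeNames={'#s': 'status'},  # 'status' is a DynamoDB reserved word
            ExpressionAttributeValues={
                ':completed': 'COMPLETED',
                ':long_expiry': long_expiry,
                ':in_progress': 'IN_PROGRESS'
            },
            ConditionExpression='#s = :in_progress'
        )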

    Let's refactor our handler to implement this robust, two-phase pattern.

    python
    # b_two_phase_handler.py
    import os
    import time
    import logging
    import json
    import boto3
    from botocore.exceptions import ClientError
    
    dynamodb = boto3.resource('dynamodb')
    table_name = os.environ.get('IDEMPOTENCY_TABLE', 'IdempotencyStore')
    idempotency_table = dynamodb.Table(table_name)
    
    # Get timeout from context, with a fallback
    LAMBDA_TIMEOUT_SECONDS = 30 # A sensible default
    # How long to remember a completed transaction
    COMPLETED_TRANSACTION_TTL_HOURS = 24 
    
    class IdempotencyException(Exception):
        pass
    
    class DuplicateInProgressException(IdempotencyException):
        """Indicates another invocation is processing this event."""
        pass
    
    class DuplicateCompletedException(IdempotencyException):
        """Indicates this event has already been successfully processed."""
        def __init__(self, message, response_data=None):
            super().__init__(message)
            self.response_data = response_data
    
    def process_event(event_body):
        """Placeholder for your actual business logic."""
        logging.info(f"Starting business logic for: {event_body}")
        time.sleep(2) # Simulate work
        result = {"status": "success", "message": f"Processed order {event_body.get('orderId')}"}
        logging.info("Business logic completed.")
        return result
    
    class IdempotencyHandler:
        def __init__(self, event, context):
            self.idempotency_key = event['Records'][0]['messageId']
            self.event_body = json.loads(event['Records'][0]['body'])
            self.context = context
            # Get actual remaining time from context if available
            self.timeout_seconds = self.context.get_remaining_time_in_millis() / 1000 if hasattr(self.context, 'get_remaining_time_in_millis') else LAMBDA_TIMEOUT_SECONDS
    
        def _check_existing_record(self):
            """Checks for an existing record if the initial put fails."""
            try:
                response = idempotency_table.get_item(Key={'idempotencyKey': self.idempotency_key})
                item = response.get('Item')
                if not item:
                    # This is a rare race condition: item expired/deleted between our put and get
                    # We can choose to fail and let SQS retry, which is safest.
                    raise IdempotencyException("Record disappeared unexpectedly.")
                
                status = item.get('status')
                if status == 'COMPLETED':
                    raise DuplicateCompletedException(
                        f"Event {self.idempotency_key} already completed.",
                        response_data=item.get('responseData')
                    )
                elif status == 'IN_PROGRESS':
                    # Check if the existing lock has expired
                    expiry = item.get('expiryTimestamp', 0)
                    if expiry < int(time.time()):
                        # The lock has expired. We can attempt to take it over.
                        # This requires a more complex UpdateItem call with a condition on the expiry.
                        # For simplicity here, we will just fail and let a future invocation handle it.
                        logging.warning(f"Found expired IN_PROGRESS record for {self.idempotency_key}")
                        raise IdempotencyException("Expired IN_PROGRESS record found. Retrying.")
                    else:
                        raise DuplicateInProgressException(f"Event {self.idempotency_key} is already in progress.")
            except ClientError as e:
                logging.error(f"DynamoDB error checking existing record: {e}")
                raise IdempotencyException("Failed to check existing record.")
    
        def __enter__(self):
            # PHASE 1: Acquire Lock
            in_progress_expiry = int(time.time()) + int(self.timeout_seconds * 0.9) # Expire just before timeout
    
            try:
                idempotency_table.put_item(
                    Item={
                        'idempotencyKey': self.idempotency_key,
                        'status': 'IN_PROGRESS',
                        'expiryTimestamp': in_progress_expiry,
                    },
                    ConditionExpression='attribute_not_exists(idempotencyKey)'
                )
                logging.info(f"Acquired lock for {self.idempotency_key}")
                return self
            except ClientError as e:
                if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                    logging.warning(f"Potential duplicate detected for {self.idempotency_key}. Checking status.")
                    self._check_existing_record()
                else:
                    logging.error(f"DynamoDB error on lock acquisition: {e}")
                    raise IdempotencyException("Failed to acquire lock.")
    
        def execute(self):
            # Capture the result so __exit__ can persist it as responseData
            self.result = process_event(self.event_body)
            return self.result
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            if exc_type is not None:
                # Business logic failed, do not update the record to COMPLETED.
                # The IN_PROGRESS record will expire via TTL, allowing a clean retry.
                logging.error(f"Exception occurred during processing: {exc_val}. Leaving lock to expire.")
                return # Do not suppress the exception
    
            # PHASE 2: Mark as Completed
            completed_expiry = int(time.time()) + (COMPLETED_TRANSACTION_TTL_HOURS * 3600)
            try:
                # Persist the business logic result captured by execute()
                result = getattr(self, 'result', None) or {}

                idempotency_table.update_item(
                    Key={'idempotencyKey': self.idempotency_key},
                    UpdateExpression='SET #status = :status, #expiry = :expiry, #resp = :resp',
                    ExpressionAttributeNames={
                        '#status': 'status',
                        '#expiry': 'expiryTimestamp',
                        '#resp': 'responseData'
                    },
                    ExpressionAttributeValues={
                        ':status': 'COMPLETED',
                        ':expiry': completed_expiry,
                        ':resp': result,
                        ':in_progress_status': 'IN_PROGRESS'
                    },
                    # IMPORTANT: Ensure we still hold the lock before overwriting it
                    ConditionExpression='#status = :in_progress_status'
                )
                logging.info(f"Marked {self.idempotency_key} as COMPLETED.")
            except ClientError as e:
                logging.error(f"Failed to mark {self.idempotency_key} as completed: {e}")
                # This is a critical failure state. The work is done but we couldn't record it.
                # The event will be reprocessed. Your business logic MUST be re-entrant.
                raise IdempotencyException("Failed to persist completion state.")
    
    def lambda_handler(event, context):
        try:
            with IdempotencyHandler(event, context) as handler:
                result = handler.execute()
                return {'statusCode': 200, 'body': json.dumps(result)}
        except DuplicateCompletedException as e:
            logging.warning(str(e))
            # Return the cached response if available
            return {'statusCode': 200, 'body': json.dumps(e.response_data or {'message': 'Duplicate, already completed'})}
        except (DuplicateInProgressException, IdempotencyException) as e:
            # These are transient errors. We want SQS to retry the message after the visibility timeout.
            # To do this, we must raise an exception from the Lambda handler.
            logging.error(str(e))
            raise e
        except Exception as e:
            logging.critical(f"Unhandled exception in handler: {e}")
            raise

    This context manager-based implementation provides a clean separation of concerns and handles the lifecycle correctly. If the business logic (handler.execute()) fails, __exit__ is called with an exception, we log it, and crucially, we do not update the record to COMPLETED. The IN_PROGRESS record simply expires, and the next SQS redelivery can attempt the entire process again.


    Edge Cases and Production Hardening

    The real world is messier. Let's consider the failure modes.

    1. Partial Failure: Business Logic Succeeds, update_item Fails

    This is the most challenging scenario. The payment was processed, but the update_item call to DynamoDB failed due to a network blip or throttling. The IN_PROGRESS record will expire, and SQS will redeliver the message. The next invocation will re-acquire the lock and re-run the business logic.

    Solution: Your core business logic must be designed to be idempotent itself, if at all possible. For example, a payment processor's API might accept an idempotency_key of its own. If not, you may need to build a reconciliation or cleanup process to handle the side effects of repeated operations.
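
    For example, many payment APIs accept a caller-supplied idempotency key, often passed as a header. A minimal sketch is shown below; the endpoint, header name, and payload are hypothetical and depend on your provider, and requests is assumed to be packaged with the function.

    python
    import requests  # assumed to be bundled with the Lambda deployment package

    def charge_customer(order, idempotency_key):
        # Reuse our idempotency key so the provider deduplicates retried charges
        response = requests.post(
            'https://api.payments.example.com/v1/charges',   # hypothetical endpoint
            json={'orderId': order['orderId'], 'amount': order['amount']},
            headers={'Idempotency-Key': idempotency_key},    # header name varies by provider
            timeout=10
        )
        response.raise_for_status()
        return response.json()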

    2. Idempotency Key Selection

  • SQS messageId: Generally a good choice. Unique per message sent to the queue.
  • Event Payload ID: If your event payload has a canonical eventId (e.g., a UUID generated by the producer), this is often better. It makes your consumer idempotent across different message sources, not just SQS. If the same logical event arrives via SQS and a direct API call, using the payload ID ensures it's processed only once.
  • Composite Keys: For operations tied to a specific entity, a key like tenant-123#order-456 can be highly effective and human-readable.
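
    A small helper for the composite-key case might look like this (a sketch; the payload field names are assumptions about your event shape):

    python
    def derive_idempotency_key(event_body: dict) -> str:
        # Composite key scoped to the entity the operation acts on
        tenant_id = event_body['tenantId']   # assumed payload field
        order_id = event_body['orderId']     # assumed payload field
        # If components are long or user-controlled, consider hashing the result
        return f"{tenant_id}#{order_id}"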

    3. Consumer Timeouts and Expired Locks

    In our _check_existing_record function, we noted that an IN_PROGRESS record could be found that has passed its expiryTimestamp. This means a previous invocation timed out. The safest immediate action is to fail the current invocation, allowing SQS to redeliver after the visibility timeout. This prevents a potential thundering herd of consumers trying to take over an expired lock.

    A more aggressive strategy would be to use an UpdateItem call with a ConditionExpression that checks expiryTimestamp < :now to atomically take over the lock. This is more complex and should only be used if processing delays are unacceptable.
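
    If you do need the aggressive takeover, the atomic form might look like the sketch below. It assumes the module-level idempotency_table from the handler above and re-stamps the lock only when the stored expiry has passed.

    python
    import time
    from botocore.exceptions import ClientError

    def try_take_over_expired_lock(idempotency_key, new_expiry_ts):
        """Atomically re-stamp an IN_PROGRESS lock whose expiry has passed."""
        try:
            idempotency_table.update_item(
                Key={'idempotencyKey': idempotency_key},
                UpdateExpression='SET #s = :in_progress, expiryTimestamp = :new_expiry',
                ExpressionAttributeNames={'#s': 'status'},
                ExpressionAttributeValues={
                    ':in_progress': 'IN_PROGRESS',
                    ':new_expiry': new_expiry_ts,
                    ':now': int(time.time())
                },
                # Only steal the lock if the previous holder's expiry has passed
                ConditionExpression='#s = :in_progress AND expiryTimestamp < :now'
            )
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                return False  # the lock is still live, or the event already completed
            raise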


    Performance and Cost Optimization

    This pattern introduces at least two DynamoDB writes per successful message. Understanding the performance and cost implications is vital.

    1. Capacity Mode: On-Demand vs. Provisioned

  • On-Demand: For most serverless, event-driven workloads where traffic is spiky and unpredictable, On-Demand (Pay Per Request) is the ideal choice. It eliminates the need for capacity planning for the idempotency table.
  • Provisioned: If you have a very high-throughput, predictable workload (e.g., processing a constant stream from Kinesis), Provisioned capacity with auto-scaling can be more cost-effective. You would need to monitor for throttling (ProvisionedThroughputExceededException).
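
    On provisioned tables you can also lean on the SDK's retry behavior for transient throttling. A minimal sketch using botocore's adaptive retry mode:

    python
    import boto3
    from botocore.config import Config

    # Adaptive mode adds client-side rate limiting on top of retries with backoff
    dynamodb = boto3.resource(
        'dynamodb',
        config=Config(retries={'max_attempts': 10, 'mode': 'adaptive'})
    )
    idempotency_table = dynamodb.Table('IdempotencyStore')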

    2. TTL is a Cost Control Mechanism

    Do not underestimate the importance of TTL. Without it, your idempotency table will grow indefinitely, increasing storage costs and potentially slowing down scans (which should be avoided anyway). The TTL on COMPLETED records should be set based on the business requirement for how long duplicates must be detected. Is it 24 hours? 7 days? 30 days? This has a direct impact on cost.

    3. Hot Partitions

    A poorly designed system could create a hot partition. Imagine a poison-pill message that constantly fails processing. SQS will redeliver it repeatedly, causing thousands of writes to the same idempotencyKey in DynamoDB. This can throttle a single partition, impacting the entire table.

    Solution: This is primarily a consumer-side problem. Implement a Dead Letter Queue (DLQ) on your SQS queue. After a message fails a configured number of times, SQS will move it to the DLQ for offline analysis, preventing it from hammering your idempotency table.
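
    A redrive policy is usually declared in your IaC alongside the queue; for illustration, the equivalent boto3 call looks like this (the queue URL and DLQ ARN are placeholders):

    python
    import json
    import boto3

    sqs = boto3.client('sqs')
    sqs.set_queue_attributes(
        QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789012/orders-queue',  # placeholder
        Attributes={
            'RedrivePolicy': json.dumps({
                'deadLetterTargetArn': 'arn:aws:sqs:us-east-1:123456789012:orders-dlq',  # placeholder
                'maxReceiveCount': '5'  # move to the DLQ after 5 failed receives
            })
        }
    )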

    4. Caching Responses

    By storing the responseData in the DynamoDB item when marking it COMPLETED, you can create a write-through cache. When a duplicate is detected (DuplicateCompletedException), you can retrieve the stored response and return it to the original caller. This is extremely powerful for synchronous APIs that might trigger your asynchronous flow, allowing them to get a consistent response even on retries.


    Conclusion: A Foundational Pattern for Resiliency

    Implementing an idempotency layer is not an optional extra; it is a fundamental requirement for building resilient and correct distributed systems. While the concept is simple, a production-grade implementation requires careful attention to atomicity, state management, and failure modes.

    The two-phase pattern using DynamoDB's PutItem and UpdateItem with ConditionExpression provides the atomicity needed to prevent race conditions. Combining this with status tracking (IN_PROGRESS, COMPLETED) and TTL for automatic cleanup creates a robust, scalable, and largely self-managing system. This pattern elevates a basic event consumer into an enterprise-grade component capable of safely navigating the inherent uncertainties of at-least-once message delivery, ensuring that each logical event is processed exactly once.
