Idempotent Lambda Processing with DynamoDB Conditional Writes

19 min read
Goh Ling Yong
Technology enthusiast and software architect specializing in AI-driven development tools and modern software engineering practices. Passionate about the intersection of artificial intelligence and human creativity in building tomorrow's digital solutions.

The Inevitability of Duplicates in Event-Driven Architectures

In a perfect world, every event in a distributed system would be delivered exactly once. In reality, network partitions and process failures make "exactly-once" delivery an expensive, and often unattainable, guarantee. Most message brokers and event buses, including AWS SQS, SNS, and EventBridge, therefore opt for a more resilient guarantee: at-least-once delivery.

This design choice prioritizes durability over uniqueness. The system guarantees your message will be processed, but to achieve this, it might occasionally deliver the same message more than once. For a senior engineer designing a critical backend process, this is not a theoretical edge case—it's a production certainty. A retry from a client, a network blip in the message broker, or a consumer failing to acknowledge a message in time can all trigger a redelivery.

Consider the consequences for non-idempotent operations:

  • Payment Processing: A duplicate chargeCustomer event could result in double billing.
  • Email Notifications: A duplicate sendWelcomeEmail event annoys users and damages brand perception.
  • Inventory Management: A duplicate decrementStock event leads to inaccurate stock counts and overselling.
Simply hoping duplicates won't happen is not a strategy. The responsibility for handling them falls to the consumer. This article presents a robust, scalable, and battle-tested pattern for achieving idempotency in AWS Lambda functions using DynamoDB's powerful conditional write capabilities.

    The Idempotency Key Pattern with DynamoDB

    The core principle is to track the processing state of each unique event. We can achieve this by creating a dedicated DynamoDB table to serve as an idempotency store. The workflow looks like this:

  • Identify a Unique Key: Every incoming event must have a unique identifier we can use as an idempotency key. This could be an SQS messageId, a custom X-Request-Id header, or a UUID generated by the event producer.
  • Check and Set State: Before executing the core business logic, the Lambda function attempts to write a record to the idempotency table using the unique key. This write is conditional: it will only succeed if a record with that key does not already exist.
  • Handle the Outcome:

    * Success: The write succeeds. This is the first time we've seen this event. We mark its status as INPROGRESS, execute the business logic, and then update the record to COMPLETED.

    * Failure (ConditionalCheckFailedException): The write fails because the key already exists. This signals a duplicate event. We can then check the status of the existing record. If it's COMPLETED, we can safely return the stored result. If it's INPROGRESS, another invocation is currently handling it, and we can terminate gracefully.

    Why DynamoDB?

    DynamoDB is exceptionally well-suited for this pattern:

  • Low Latency: GetItem and PutItem operations typically complete in single-digit milliseconds, minimizing overhead.
  • Scalability: It scales seamlessly to handle virtually any request volume.
  • Conditional Writes: The ConditionExpression parameter provides an atomic test-and-set operation at the database level, which is the cornerstone of this pattern for preventing race conditions.
  • Time To Live (TTL): We can automatically purge old idempotency records to manage table size and cost.

    Idempotency Table Schema

    A minimal but effective schema for our IdempotencyStore table would be:

  • id (Partition Key, String): The unique idempotency key from the event.
  • expiry (Number): A Unix timestamp used for DynamoDB's TTL feature. Records will be automatically deleted after this time.
  • status (String): The current processing state (e.g., INPROGRESS, COMPLETED, FAILED).
  • result (String/Map): The serialized result of the Lambda execution. Storing this allows us to return the exact same response for duplicate requests without re-executing the logic.
    json
    // Example DynamoDB Record
    {
      "id": "sqs-message-id-abcdef123456",
      "expiry": 1704128400, // Timestamp for 24 hours from now
      "status": "COMPLETED",
      "result": "{\"orderId\": \"ORD-98765\", \"status\": \"CONFIRMED\"}"
    }
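Provisioning this table with TTL enabled on the expiry attribute could look like the following CloudFormation resource (a sketch; the logical resource name is arbitrary, and the property values mirror the schema above):

```json
{
  "IdempotencyStore": {
    "Type": "AWS::DynamoDB::Table",
    "Properties": {
      "TableName": "IdempotencyStore",
      "AttributeDefinitions": [
        { "AttributeName": "id", "AttributeType": "S" }
      ],
      "KeySchema": [
        { "AttributeName": "id", "KeyType": "HASH" }
      ],
      "BillingMode": "PAY_PER_REQUEST",
      "TimeToLiveSpecification": {
        "AttributeName": "expiry",
        "Enabled": true
      }
    }
  }
}
```

On-demand billing is a sensible default here because the idempotency table's traffic directly tracks your event volume.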

    Production-Grade Implementation in Python

    Let's build a complete, production-ready Lambda handler in Python using boto3. This example assumes the Lambda is triggered by an SQS queue.

    python
    import os
    import json
    import time
    import logging
    from decimal import Decimal
    
    import boto3
    from botocore.exceptions import ClientError
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    dynamodb = boto3.resource('dynamodb')
    IDEMPOTENCY_TABLE_NAME = os.environ.get('IDEMPOTENCY_TABLE_NAME')
    IDEMPOTENCY_TTL_MINUTES = int(os.environ.get('IDEMPOTENCY_TTL_MINUTES', 60))
    
    idempotency_table = dynamodb.Table(IDEMPOTENCY_TABLE_NAME)
    
    class DuplicateRequestError(Exception):
        """Custom exception for a duplicate request that is already completed."""
        def __init__(self, message, stored_result):
            super().__init__(message)
            self.stored_result = stored_result
    
    class ConcurrentRequestError(Exception):
        """Custom exception for a request being processed by another invocation."""
        pass
    
    def start_processing(idempotency_key: str):
        """Attempt to mark an event as INPROGRESS."""
        expiry_timestamp = int(time.time()) + (IDEMPOTENCY_TTL_MINUTES * 60)
        try:
            idempotency_table.put_item(
                Item={
                    'id': idempotency_key,
                    'expiry': expiry_timestamp,
                    'status': 'INPROGRESS'
                },
                # This is the core of the idempotency check
                ConditionExpression='attribute_not_exists(id)'
            )
            logger.info(f"Successfully locked idempotency key: {idempotency_key}")
        except ClientError as e:
            if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                logger.warning(f"Idempotency key conflict: {idempotency_key}")
                # Key already exists, we need to check its status
                handle_duplicate(idempotency_key)
            else:
                logger.error(f"DynamoDB error on start_processing: {e}")
                raise
    
    def handle_duplicate(idempotency_key: str):
        """Handles the case where a ConditionalCheckFailedException occurred."""
        try:
            response = idempotency_table.get_item(Key={'id': idempotency_key})
            item = response.get('Item')
    
            if not item:
                # This is a rare race condition: item was deleted between put and get.
                # We can treat it as a transient error and let the caller retry.
                logger.warning(f"Item for key {idempotency_key} disappeared. Retrying may be necessary.")
                raise ConcurrentRequestError(f"Race condition detected for key: {idempotency_key}")
    
            status = item.get('status')
            if status == 'COMPLETED':
                logger.info(f"Duplicate request detected. Returning stored result for key: {idempotency_key}")
                stored_result = json.loads(item.get('result', '{}'))
                raise DuplicateRequestError("Request already completed", stored_result)
            elif status == 'INPROGRESS':
                logger.warning(f"Concurrent request detected for key: {idempotency_key}")
                raise ConcurrentRequestError(f"Request is already in progress: {idempotency_key}")
            else:
                # Handle other statuses if you have them (e.g., FAILED)
                logger.error(f"Idempotency key {idempotency_key} in unhandled state: {status}")
                raise Exception(f"Unhandled status for key {idempotency_key}: {status}")
    
        except ClientError as e:
            logger.error(f"DynamoDB error on handle_duplicate: {e}")
            raise
    
    def complete_processing(idempotency_key: str, result: dict):
        """Update the idempotency record to COMPLETED and store the result."""
        expiry_timestamp = int(time.time()) + (IDEMPOTENCY_TTL_MINUTES * 60)
        try:
            # We use update_item to be more efficient and specific
            idempotency_table.update_item(
                Key={'id': idempotency_key},
                UpdateExpression="SET #status = :status, #result = :result, #expiry = :expiry",
                ExpressionAttributeNames={
                    '#status': 'status',
                    '#result': 'result',
                    '#expiry': 'expiry'
                },
                ExpressionAttributeValues={
                    ':status': 'COMPLETED',
                    ':result': json.dumps(result, cls=DecimalEncoder),
                    ':expiry': expiry_timestamp
                }
            )
            logger.info(f"Successfully marked idempotency key as COMPLETED: {idempotency_key}")
        except ClientError as e:
            logger.error(f"DynamoDB error on complete_processing: {e}")
            # This is a critical failure. The logic executed, but we failed to record it.
            # This could lead to re-processing on retry. Requires monitoring/alerting.
            raise
    
    class DecimalEncoder(json.JSONEncoder):
        """Helper class to convert a DynamoDB item to JSON."""
        def default(self, o):
            if isinstance(o, Decimal):
                if o % 1 > 0:
                    return float(o)
                else:
                    return int(o)
            return super(DecimalEncoder, self).default(o)
    
    # --- THE LAMBDA HANDLER ---
    
    def lambda_handler(event, context):
        # Assuming SQS trigger, we process records in a loop
        for record in event['Records']:
            idempotency_key = record['messageId']
            logger.info(f"Processing message with idempotency key: {idempotency_key}")
    
            try:
                # 1. Start: Attempt to acquire the lock
                start_processing(idempotency_key)
    
                # 2. Core Business Logic (if lock is acquired)
                # This is where your actual work goes.
                # Example: process an order from the message body
                message_body = json.loads(record['body'])
                order_id = message_body.get('order_id')
                logger.info(f"Executing core business logic for order: {order_id}")
                # Simulate work
                time.sleep(2)
                business_result = {
                    'orderId': order_id,
                    'status': 'CONFIRMED',
                    'confirmation_timestamp': int(time.time())
                }
    
                # 3. Complete: Mark as completed and store result
                complete_processing(idempotency_key, business_result)
    
                # SQS Lambda integration will automatically delete the message on successful return
                # so we don't need to do anything else here.
    
            except DuplicateRequestError as e:
                # This is a successful outcome for a duplicate message.
                # We don't want SQS to retry, so we exit gracefully.
                logger.info(f"Handled duplicate request for key {idempotency_key}. Result: {e.stored_result}")
                # You could optionally use the stored result here.
                # Return success to the caller.
                continue # Move to the next record
    
            except ConcurrentRequestError as e:
                # Another invocation is handling this. This should be treated as a transient error.
                # By raising an exception, we tell the SQS Lambda integration to not delete the message.
                # It will become visible in the queue again after the visibility timeout and be retried.
                logger.error(f"Concurrent processing detected. Failing to force retry for key {idempotency_key}")
                # IMPORTANT: This will cause the entire batch to fail and be retried.
                # See 'Advanced Considerations' for batch handling.
                raise e
    
            except Exception as e:
                # Any other exception during business logic should also trigger a retry.
                logger.error(f"An unexpected error occurred for key {idempotency_key}: {e}", exc_info=True)
                # We don't update the idempotency record, it remains INPROGRESS.
                # This allows a subsequent retry to proceed after the TTL expires.
                raise e
    
        return {
            'statusCode': 200,
            'body': json.dumps('Batch processed successfully')
        }

    Advanced Considerations and Edge Cases

    This pattern is robust, but in a distributed system, the devil is in the details. Senior engineers must consider the following scenarios.

    1. Lambda Timeouts and Partial Failures

    Problem: What happens if the Lambda function times out after start_processing succeeds but before complete_processing is called? The idempotency record will be left in the INPROGRESS state.

    Solution: This is where the expiry TTL attribute is critical. When a subsequent invocation for the same event arrives, it will find the record INPROGRESS. Our current handle_duplicate logic treats this as a ConcurrentRequestError and fails, forcing a retry. This is the correct behavior. The message will return to the queue and be picked up again after its visibility timeout.

However, if the first invocation truly died, we don't want the event to be blocked forever. The TTL on the DynamoDB record acts as a safety valve. If we set IDEMPOTENCY_TTL_MINUTES to 60, a stuck INPROGRESS record will eventually be deleted, allowing a future retry to proceed as a fresh request. Be aware that DynamoDB TTL deletion is a background process that can lag the expiry timestamp by hours, so treat it as an eventual cleanup mechanism rather than a precise timer. The TTL should be configured to be longer than your maximum expected processing time plus any retry delays.
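As a concrete sizing sanity check, you can verify that the TTL comfortably exceeds the worst-case retry window. The visibility timeout, redrive policy, and processing time below are illustrative assumptions, not values from this article:

```python
# Illustrative sizing check: the idempotency TTL must outlast the
# worst-case retry window. These queue settings are example assumptions.
visibility_timeout_seconds = 300   # SQS visibility timeout (5 minutes)
max_receive_count = 5              # redrive policy: attempts before the DLQ
max_processing_seconds = 120       # worst-case Lambda processing time

# Worst case: every attempt stalls and the message reappears only after
# the visibility timeout, so the retry window spans all receive cycles.
retry_window_seconds = (max_receive_count * visibility_timeout_seconds
                        + max_processing_seconds)

idempotency_ttl_minutes = 60
print(retry_window_seconds)  # 1620
assert idempotency_ttl_minutes * 60 > retry_window_seconds
```

If this assertion fails for your queue configuration, a stuck INPROGRESS record could outlive all retries and the event would be lost until the TTL fires.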

    2. SQS Batch Processing Failures

    Problem: The default SQS Lambda integration processes messages in a batch. If our handler raises an exception for any single message (like our ConcurrentRequestError), the entire batch is considered a failure. All messages in that batch, including those that were successfully processed, will be returned to the queue for reprocessing.

    This can lead to cascading failures and unnecessary re-invocation of business logic for already completed items.

    Solution: Report Batch Item Failures

    The SQS Lambda integration supports a newer response format that allows you to specify which messages in the batch failed. This is a critical optimization.

    To enable this, you must configure the Lambda trigger with a FunctionResponseType of ReportBatchItemFailures.
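In CloudFormation, that setting lives on the event source mapping. A sketch (the queue and function resource names here are placeholders, not from this article):

```json
{
  "SqsEventSourceMapping": {
    "Type": "AWS::Lambda::EventSourceMapping",
    "Properties": {
      "EventSourceArn": { "Fn::GetAtt": ["OrdersQueue", "Arn"] },
      "FunctionName": { "Ref": "OrderProcessorFunction" },
      "FunctionResponseTypes": ["ReportBatchItemFailures"]
    }
  }
}
```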

    Your handler would then need to be modified to track failures and return them:

    python
    # Modified lambda_handler for batch item failure reporting
    def lambda_handler(event, context):
        batch_item_failures = []
    
        for record in event['Records']:
            idempotency_key = record['messageId']
            try:
                # ... same logic as before ...
                pass
            except DuplicateRequestError:
                # This is NOT a failure; it's the successful handling of a duplicate.
                # It must be caught before the generic handler below.
                logger.info(f"Successfully handled duplicate {idempotency_key}, not reporting as failure.")
                continue
            except Exception:
                # Any error that requires a retry (including ConcurrentRequestError)
                # is reported as a batch item failure instead of failing the whole batch.
                logger.error(f"Adding message {idempotency_key} to batch failure report.")
                batch_item_failures.append({"itemIdentifier": record['messageId']})
    
        return {
            "batchItemFailures": batch_item_failures
        }

    With this change, only the specific messages that failed (e.g., due to a temporary concurrency issue) will be returned to the queue, while the successfully processed ones are deleted.

    3. Performance and Cost Analysis

    This pattern is not free. It introduces overhead that must be evaluated.

  • Latency Overhead: Each successful invocation adds two DynamoDB API calls: one PutItem and one UpdateItem. In most regions, the p99 latency for these operations is under 10ms. For a Lambda function doing non-trivial work (e.g., lasting > 100ms), this is often an acceptable overhead for the safety it provides.
  • Cost Overhead:

    * Write Capacity Units (WCUs): You will consume at least two WCUs per successful invocation (one for the initial put_item, one for the update_item).

    * Read Capacity Units (RCUs): A duplicate invocation will consume one RCU to check the status.

    * Storage: The cost is generally negligible unless you are storing very large results and have a very long TTL.

    Benchmarking: Before deploying this to a high-throughput, latency-sensitive system, benchmark it. Use AWS X-Ray to visualize the trace and see exactly how much time is spent on the DynamoDB calls versus your business logic.

    For a system processing 1 million events per day:

    • Successful Invocations: 1,000,000
    • Write request units consumed: 2,000,000
    • Cost (us-east-1, On-Demand): 2,000,000 write request units * ($1.25 per million) = $2.50 / day

    This is a small price to pay for guaranteed idempotency in most business-critical applications.
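The arithmetic above can be reproduced directly. The price is the us-east-1 on-demand figure quoted here; verify it against current AWS pricing before relying on it:

```python
# Reproduce the daily cost estimate for 1M events/day.
events_per_day = 1_000_000
write_requests_per_event = 2        # one put_item + one update_item
price_per_million_writes = 1.25     # USD, on-demand (us-east-1, as quoted above)

total_writes = events_per_day * write_requests_per_event
daily_cost = (total_writes / 1_000_000) * price_per_million_writes
print(f"${daily_cost:.2f} / day")   # $2.50 / day
```

Note this assumes each record fits within one write request unit (1 KB); storing large results in the result attribute would multiply the write cost.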

    Alternative Patterns and When to Use Them

    While the DynamoDB pattern is a fantastic general-purpose solution, it's not the only one.

  • Designing Naturally Idempotent Business Logic:

    This is the ideal solution if achievable. Instead of tracking state externally, make the operation itself safe to repeat.

    - Example: Instead of INSERTing a new record, use an UPSERT (e.g., INSERT ... ON CONFLICT DO UPDATE in PostgreSQL). Instead of a relative update like amount = amount - 10, use an absolute assignment like SET amount = 90.

    - When to use: When your data model and business logic allow for it. This is often the most performant and simplest solution.
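The upsert example can be sketched concretely. Here SQLite stands in for PostgreSQL purely for illustration (its ON CONFLICT syntax is close), and the table and SKU names are invented:

```python
import sqlite3

# A naturally idempotent write: an absolute-value upsert. Replaying the
# same event any number of times converges on the same state, unlike a
# relative decrement.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inventory (sku TEXT PRIMARY KEY, amount INTEGER)")

def set_stock(sku: str, amount: int) -> None:
    conn.execute(
        "INSERT INTO inventory (sku, amount) VALUES (?, ?) "
        "ON CONFLICT(sku) DO UPDATE SET amount = excluded.amount",
        (sku, amount),
    )

set_stock("WIDGET-1", 90)
set_stock("WIDGET-1", 90)  # duplicate event: state is unchanged
row = conn.execute(
    "SELECT amount FROM inventory WHERE sku = ?", ("WIDGET-1",)
).fetchone()
print(row[0])  # 90
```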

  • Transactional Database Locks (RDBMS):

    If your business logic already involves writing to a relational database like PostgreSQL or MySQL, you can leverage its transactional capabilities.

    - Example:

    1. BEGIN TRANSACTION;

    2. SELECT id FROM idempotency_keys WHERE id = :key FOR UPDATE;

    3. If row exists, ROLLBACK and exit.

    4. If not, INSERT INTO idempotency_keys ...

    5. Execute business logic (which may involve other tables).

    6. COMMIT;

    - When to use: When your Lambda is already tightly coupled to an RDBMS and the business logic and idempotency check should be part of the same atomic transaction.

    - Downside: Can be slower and less scalable than DynamoDB, and can introduce lock contention issues under high load.
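A minimal sketch of the transactional variant, again with SQLite standing in for a real RDBMS. SQLite has no SELECT ... FOR UPDATE, so this sketch leans on the primary-key constraint inside the same transaction as the business write, which achieves the same atomicity; with PostgreSQL you could use row locks as described above. Table and key names are invented:

```python
import sqlite3

# isolation_level=None puts the connection in autocommit mode so we can
# manage BEGIN/COMMIT/ROLLBACK explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE idempotency_keys (id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, status TEXT)")

def process_once(idempotency_key: str, order_id: str) -> bool:
    """Returns True if the business logic ran, False for a duplicate."""
    conn.execute("BEGIN")
    try:
        # The key insert and the business write share one transaction,
        # so they commit (or roll back) atomically.
        conn.execute("INSERT INTO idempotency_keys (id) VALUES (?)",
                     (idempotency_key,))
        conn.execute("INSERT INTO orders (order_id, status) VALUES (?, 'CONFIRMED')",
                     (order_id,))
        conn.execute("COMMIT")
        return True
    except sqlite3.IntegrityError:
        conn.execute("ROLLBACK")  # key already exists: duplicate event
        return False

print(process_once("msg-1", "ORD-1"))  # True
print(process_once("msg-1", "ORD-1"))  # False (duplicate)
```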

    Conclusion: A Reusable Layer for Serverless Resiliency

    The at-least-once delivery model of modern event-driven systems is a feature, not a bug. It prioritizes durability and resilience. By implementing a robust idempotency layer using DynamoDB's conditional writes, we can build consumers that are immune to the side effects of duplicate events.

    This pattern provides an atomic, low-latency, and highly scalable mechanism to track event processing state. By carefully considering edge cases like timeouts, batch failures, and performance overhead, you can create a production-ready component that can be abstracted into a decorator or middleware, making it a reusable asset in your serverless toolkit. For any critical, state-changing operation triggered by an event, this pattern should be a default consideration in your system design.
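As suggested above, the pattern abstracts naturally into a decorator. A minimal sketch of that shape, using an in-memory dict in place of the DynamoDB table purely for illustration (a real implementation would use the conditional put and update shown earlier, which are what make the check atomic across concurrent invocations):

```python
import functools
import json

# In-memory stand-in for the DynamoDB idempotency table. Unlike the
# conditional write, a plain dict check is NOT safe across concurrent
# invocations; this only illustrates the control flow.
_store: dict = {}

def idempotent(key_fn):
    """Decorator: replay the stored result for duplicates instead of re-executing."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(event):
            key = key_fn(event)
            record = _store.get(key)
            if record and record["status"] == "COMPLETED":
                return json.loads(record["result"])  # duplicate: replay result
            _store[key] = {"status": "INPROGRESS", "result": None}
            result = fn(event)
            _store[key] = {"status": "COMPLETED", "result": json.dumps(result)}
            return result
        return wrapper
    return decorator

calls = []

@idempotent(key_fn=lambda event: event["messageId"])
def handle(event):
    calls.append(event["messageId"])  # track real executions
    return {"orderId": event["orderId"], "status": "CONFIRMED"}

event = {"messageId": "m-1", "orderId": "ORD-1"}
print(handle(event))   # runs the business logic
print(handle(event))   # duplicate: replayed without re-execution
print(len(calls))      # 1
```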
