Taming Hot Partitions: Advanced DynamoDB Write Sharding Patterns

Goh Ling Yong

The Inevitable Collision: Time-Series Data and DynamoDB Hot Partitions

For any system ingesting high-velocity, time-ordered data—be it IoT metrics, application logs, or financial trades—DynamoDB presents a compelling offering. Yet, its greatest strength, predictable performance at scale, hinges on a single, crucial principle: uniform data access. Time-series data is the antithesis of this principle. By its nature, it arrives in order, creating a write pattern that targets a single logical partition key over and over again. This creates a 'hot partition,' a scenario where a single underlying physical storage node is overwhelmed, leading to the dreaded ProvisionedThroughputExceededException, throttling, and a cascade of failures in upstream services.

This article is not about the 'what' but the 'how'. We assume you understand DynamoDB's partitioning basics. Instead, we will dissect three production-proven write sharding patterns to solve the hot partition problem for time-series workloads. We will explore their implementation nuances, read-path complexities, performance trade-offs, and the specific scenarios where each one excels.

Anatomy of a Time-Series Hot Partition

DynamoDB partitions data based on the Partition Key (PK). A naive but common approach for time-series data is to use an entity identifier as the PK and the timestamp as the Sort Key (SK).

* PK: device_id

* SK: timestamp

This works perfectly for retrieving data for a single device in chronological order. However, if you have a different access pattern, such as ingesting all events for all devices as they happen, you might model it like this:

* PK: event_type (e.g., SENSOR_READING)

* SK: timestamp#device_id

In this second model, every single write for SENSOR_READING events goes to the same logical partition. DynamoDB's underlying physical partitions have hard limits (currently ~1000 WCU or 3000 RCU per second). When your ingest velocity exceeds this for a single PK, you are throttled. Period.

Let's simulate this failure scenario in Python:

python
# WARNING: This code is designed to CAUSE a hot partition.
# Do not run against a production table without appropriate capacity planning.

import boto3
import time
import uuid
import threading
from datetime import datetime, timezone
from botocore.exceptions import ClientError

# Configure your table name and region
TABLE_NAME = 'TimeSeriesData'
AWS_REGION = 'us-east-1'

# A low WCU setting to demonstrate throttling easily
PROVISIONED_WCU = 10

dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION)
table = dynamodb.Table(TABLE_NAME)

def create_table_if_not_exists():
    client = boto3.client('dynamodb', region_name=AWS_REGION)
    try:
        client.describe_table(TableName=TABLE_NAME)
        print(f"Table '{TABLE_NAME}' already exists.")
    except client.exceptions.ResourceNotFoundException:
        print(f"Creating table '{TABLE_NAME}'...")
        client.create_table(
            TableName=TABLE_NAME,
            KeySchema=[
                {'AttributeName': 'PK', 'KeyType': 'HASH'},
                {'AttributeName': 'SK', 'KeyType': 'RANGE'}
            ],
            AttributeDefinitions=[
                {'AttributeName': 'PK', 'AttributeType': 'S'},
                {'AttributeName': 'SK', 'AttributeType': 'S'}
            ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': PROVISIONED_WCU
            }
        )
        waiter = client.get_waiter('table_exists')
        waiter.wait(TableName=TABLE_NAME)
        print(f"Table '{TABLE_NAME}' created.")

def write_hot_partition_data(thread_id, num_records):
    hot_pk = "SENSOR_READING#2023-10-27"
    throttled_count = 0
    for i in range(num_records):
        try:
            timestamp = datetime.now(timezone.utc).isoformat()
            item_id = str(uuid.uuid4())
            table.put_item(
                Item={
                    'PK': hot_pk,
                    'SK': f"{timestamp}#{item_id}",
                    'thread_id': thread_id,
                    'payload': {'value': i}
                }
            )
            if i % 10 == 0:
                print(f"[Thread {thread_id}] Wrote {i+1}/{num_records} records.")
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                throttled_count += 1
            else:
                print(f"[Thread {thread_id}] Error: {e}")
    print(f"--- [Thread {thread_id}] Finished. Throttled {throttled_count} times. ---")

if __name__ == "__main__":
    create_table_if_not_exists()
    
    num_threads = 5
    records_per_thread = 100
    threads = []

    print(f"Starting {num_threads} threads to write to a single partition key...")
    for i in range(num_threads):
        thread = threading.Thread(target=write_hot_partition_data, args=(i, records_per_thread))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    print("All writes completed. Check CloudWatch for 'WriteThrottleEvents'.")

Running this code against a table with low WCU will quickly generate a significant number of WriteThrottleEvents, proving the concept. The solution is to artificially create more partition keys for the same logical group of data. This is write sharding.

Pattern 1: Random Suffix Sharding

This is the simplest and often the most effective strategy for pure write distribution. The core idea is to append a random suffix from a fixed set to the partition key.

Data Model:

* PK: <base_pk>#_<random_suffix> (e.g., SENSOR_READING#2023-10-27#_7)

* SK: <timestamp>#<unique_id>

The random_suffix is an integer between 0 and N-1, where N is your chosen shard count.

Implementation: Write Path

The write path is straightforward. Before writing, you simply append a random suffix to your base PK.

python
import random

class ShardedTimeSeriesWriter:
    def __init__(self, table, shard_count=10):
        self.table = table
        self.shard_count = shard_count
        print(f"Writer initialized with {self.shard_count} shards.")

    def write_event(self, base_pk, payload):
        shard_id = random.randint(0, self.shard_count - 1)
        sharded_pk = f"{base_pk}#_{shard_id}"
        
        timestamp = datetime.now(timezone.utc).isoformat()
        item_id = str(uuid.uuid4())
        sort_key = f"{timestamp}#{item_id}"
        
        try:
            self.table.put_item(
                Item={
                    'PK': sharded_pk,
                    'SK': sort_key,
                    'payload': payload
                }
            )
            return True, sharded_pk
        except Exception as e:
            print(f"Error writing to shard {sharded_pk}: {e}")
            return False, None

# Usage:
# writer = ShardedTimeSeriesWriter(table, shard_count=20)
# for i in range(1000):
#     success, pk = writer.write_event("SENSOR_READING#2023-10-27", {'value': i})
#     if not success:
#         # Implement retry logic with exponential backoff and jitter here
#         time.sleep(0.1)

With shard_count=20 as in the usage example, writes are spread across 20 distinct logical partition keys, dramatically increasing the aggregate write throughput available for that base key.

Implementation: Read Path (The Scatter-Gather Problem)

Herein lies the critical trade-off. To read all data for SENSOR_READING#2023-10-27 within a specific time range, you no longer have a single partition to query. You must query all N partitions in parallel and merge the results in your application layer. This is a classic scatter-gather pattern.

python
from boto3.dynamodb.conditions import Key
from concurrent.futures import ThreadPoolExecutor

class ShardedTimeSeriesReader:
    def __init__(self, table, shard_count=10):
        self.table = table
        self.shard_count = shard_count

    def _query_shard(self, base_pk, shard_id, start_time, end_time):
        sharded_pk = f"{base_pk}#_{shard_id}"
        print(f"Querying shard: {sharded_pk}")
        try:
            response = self.table.query(
                KeyConditionExpression=Key('PK').eq(sharded_pk) & Key('SK').between(
                    start_time.isoformat(), 
                    end_time.isoformat()
                )
            )
            # Note: a production reader should also follow LastEvaluatedKey to
            # paginate past the 1 MB query response limit.
            return response.get('Items', [])
        except Exception as e:
            print(f"Error querying shard {sharded_pk}: {e}")
            return []

    def get_events_for_range(self, base_pk, start_time, end_time):
        all_items = []
        with ThreadPoolExecutor(max_workers=self.shard_count) as executor:
            futures = [
                executor.submit(self._query_shard, base_pk, i, start_time, end_time)
                for i in range(self.shard_count)
            ]
            for future in futures:
                all_items.extend(future.result())

        # Results come back from N different shards, so they must be merged and sorted by SK
        all_items.sort(key=lambda item: item['SK'])
        return all_items

# Usage:
# from datetime import datetime, timedelta, timezone
# reader = ShardedTimeSeriesReader(table, shard_count=20)
# end_ts = datetime.now(timezone.utc)
# start_ts = end_ts - timedelta(minutes=5)
# items = reader.get_events_for_range("SENSOR_READING#2023-10-27", start_ts, end_ts)
# print(f"Retrieved {len(items)} items across all shards.")

Analysis of Random Suffix Sharding:

* Pros:

* Excellent write distribution. Almost perfectly uniform if the random number generator is good.

* Simple to implement the write path.

* Cons:

* Extremely inefficient reads. A single logical query fans out into N queries against DynamoDB; each shard query consumes RCU (with a per-query minimum even when a shard returns little data), so read cost and request volume grow with the shard count.

* Higher read latency. The overall latency is dictated by the slowest of the N parallel queries.

* Application-side complexity for merging and sorting results.

* When to Use: Ideal for write-intensive, read-light workloads. Think of systems that archive massive amounts of data that is rarely queried in real-time, or where write availability is far more critical than read performance (e.g., collecting raw logs for offline batch processing).

Pattern 2: Calculated Suffix Sharding

This pattern refines random sharding by using a high-cardinality attribute from the data itself to deterministically calculate the shard suffix. This can dramatically improve read performance if your access patterns align.

Imagine our SENSOR_READING payload also contains a device_id.

Data Model:

* Payload: { "device_id": "dev-abc-123", "temperature": 23.5 }

* PK: <base_pk>#_<calculated_suffix> (e.g., SENSOR_READING#2023-10-27#_4)

* SK: <timestamp>#<device_id>

Implementation: Write Path

The write path now involves hashing a field from the payload to determine the shard.

python
import hashlib

class CalculatedShardedWriter:
    def __init__(self, table, shard_count=10):
        self.table = table
        self.shard_count = shard_count

    def _calculate_shard_id(self, key_string):
        # Use a stable hashing algorithm like SHA-256
        hashed = hashlib.sha256(key_string.encode('utf-8')).hexdigest()
        # Convert a portion of the hex hash to an integer and modulo
        return int(hashed[:8], 16) % self.shard_count

    def write_event(self, base_pk, sharding_key, payload):
        shard_id = self._calculate_shard_id(sharding_key)
        sharded_pk = f"{base_pk}#_{shard_id}"
        
        timestamp = datetime.now(timezone.utc).isoformat()
        # Include the sharding key in the SK for uniqueness and retrieval
        sort_key = f"{timestamp}#{sharding_key}"
        
        try:
            self.table.put_item(
                Item={
                    'PK': sharded_pk,
                    'SK': sort_key,
                    'payload': payload
                }
            )
            return True, sharded_pk
        except Exception as e:
            print(f"Error writing to shard {sharded_pk}: {e}")
            return False, None

# Usage:
# writer = CalculatedShardedWriter(table, shard_count=20)
# device_id = "dev-xyz-789"
# payload = {"device_id": device_id, "pressure": 1012}
# success, pk = writer.write_event("SENSOR_READING#2023-10-27", device_id, payload)

Implementation: Read Path Advantage

The magic happens when your read query includes the same high-cardinality key. If you want to retrieve all readings for dev-xyz-789 within a time range, you can calculate the exact shard to query, avoiding the scatter-gather entirely.

python
class CalculatedShardedReader:
    def __init__(self, table, shard_count=10):
        self.table = table
        self.shard_count = shard_count
        # Share the same hashing logic with the writer
        self._writer_logic = CalculatedShardedWriter(table, shard_count)

    def get_events_for_key(self, base_pk, sharding_key, start_time, end_time):
        # Calculate the specific shard to target
        shard_id = self._writer_logic._calculate_shard_id(sharding_key)
        sharded_pk = f"{base_pk}#_{shard_id}"
        print(f"Targeting specific shard: {sharded_pk}")
        
        try:
            response = self.table.query(
                KeyConditionExpression=Key('PK').eq(sharded_pk) & Key('SK').between(
                    start_time.isoformat(), 
                    end_time.isoformat()
                )
            )
            # Other sharding keys can hash to the same shard, so filter results
            # down to the requested key (SK format: timestamp#sharding_key).
            items = response.get('Items', [])
            return [item for item in items if item['SK'].endswith(f"#{sharding_key}")]
        except Exception as e:
            print(f"Error querying shard {sharded_pk}: {e}")
            return []

    def get_all_events_for_range(self, base_pk, start_time, end_time):
        # If the sharding key is not available, we must fall back to scatter-gather
        print("Sharding key not provided. Falling back to scatter-gather.")
        # (Implementation would be identical to Random Suffix reader)
        pass

Analysis of Calculated Suffix Sharding:

* Pros:

* Excellent balance of write distribution and read performance.

* Enables highly efficient, single-partition reads for specific access patterns.

* Cons:

* Distribution Quality: The effectiveness depends entirely on the cardinality and distribution of your chosen sharding key. If 90% of your events come from one device_id, you will still have a hot partition (a quick distribution check is sketched after this list).

* Access Pattern Dependency: It only provides a read advantage if the query pattern includes the sharding key. For queries that don't, you must fall back to the costly scatter-gather.

* When to Use: This is a powerful pattern for multi-tenant systems or any scenario where you have a well-distributed, high-cardinality identifier (user_id, session_id, device_id) that is present in both writes and your most common read queries.
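
To gauge the Distribution Quality concern above before committing to a sharding key, it helps to replay a sample of real traffic through the shard calculation offline. The sketch below is a minimal, hypothetical check (it reuses the same SHA-256 scheme as CalculatedShardedWriter._calculate_shard_id and assumes you can pull a recent sample of per-event sharding keys); a heavily skewed histogram means the chosen key will not save you from hot partitions.

python
from collections import Counter
import hashlib

def calculate_shard_id(key_string, shard_count):
    # Same stable hashing scheme as CalculatedShardedWriter._calculate_shard_id
    hashed = hashlib.sha256(key_string.encode('utf-8')).hexdigest()
    return int(hashed[:8], 16) % shard_count

def check_shard_distribution(event_sharding_keys, shard_count=20):
    # Tally one entry per *event* (not per unique key) so traffic skew is visible
    counts = Counter(calculate_shard_id(k, shard_count) for k in event_sharding_keys)
    total = sum(counts.values()) or 1
    for shard_id in range(shard_count):
        n = counts.get(shard_id, 0)
        print(f"shard {shard_id:>2}: {n:>7} events ({n / total:.1%})")
    return counts

# Usage (hypothetical sample pulled from a recent slice of your event stream):
# check_shard_distribution(["dev-abc-123", "dev-abc-123", "dev-xyz-789"], shard_count=20)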

Pattern 3: GSI-Based Write Sharding and Read Aggregation

This is the most sophisticated pattern, offering the best of both worlds: fully distributed writes and highly efficient, aggregated reads, at the cost of increased complexity and write cost. It uses a sharded base table for writes and a Global Secondary Index (GSI) to create a non-sharded, queryable view of the data.

Data Model:

* Base Table: Optimized for writes.

* PK: <entity_id>#_<shard_id> (e.g., device-group-A#_15)

* SK: timestamp

* Other attributes: payload, GSI1PK, GSI1SK

* Global Secondary Index (GSI1): Optimized for reads.

* GSI1PK: <entity_id>#<time_bucket> (e.g., device-group-A#2023-10-27T14:00:00Z)

* GSI1SK: timestamp

The time_bucket is a truncated version of the full timestamp (e.g., rounded down to the nearest hour or day).

Implementation: Table and GSI Definition (Infrastructure as Code)

Defining this structure is best done via IaC like CloudFormation or Terraform.

yaml
# Example CloudFormation template snippet
Resources:
  TimeSeriesTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: TimeSeriesShardedWithGSI
      AttributeDefinitions:
        - AttributeName: PK
          AttributeType: S
        - AttributeName: SK
          AttributeType: S
        - AttributeName: GSI1PK
          AttributeType: S
        - AttributeName: GSI1SK
          AttributeType: S
      KeySchema:
        - AttributeName: PK
          KeyType: HASH
        - AttributeName: SK
          KeyType: RANGE
      ProvisionedThroughput:
        ReadCapacityUnits: 50
        WriteCapacityUnits: 200 # Capacity for base table writes
      GlobalSecondaryIndexes:
        - IndexName: GSI1-Read-Aggregator
          KeySchema:
            - AttributeName: GSI1PK
              KeyType: HASH
            - AttributeName: GSI1SK
              KeyType: RANGE
          Projection:
            ProjectionType: ALL # Or INCLUDE specific attributes
          ProvisionedThroughput:
            ReadCapacityUnits: 100 # Capacity for read queries
            WriteCapacityUnits: 200 # Should keep pace with base-table writes, or GSI back-pressure will throttle base-table puts

Implementation: Write and Read Paths

The application logic must now construct both the base table keys and the GSI keys on every write.

python
class GsiShardedService:
    def __init__(self, table, shard_count=16):
        self.table = table
        self.shard_count = shard_count

    def _get_time_bucket(self, dt_object, granularity='hour'):
        if granularity == 'hour':
            return dt_object.strftime('%Y-%m-%dT%H:00:00Z')
        elif granularity == 'day':
            return dt_object.strftime('%Y-%m-%d')
        # Add other granularities as needed
        return dt_object.isoformat()

    def write_event(self, entity_id, payload):
        shard_id = random.randint(0, self.shard_count - 1)
        sharded_pk = f"{entity_id}#_{shard_id}"
        
        now = datetime.now(timezone.utc)
        timestamp = now.isoformat()
        
        # Calculate GSI keys
        gsi1_pk = f"{entity_id}#{self._get_time_bucket(now, 'hour')}"
        gsi1_sk = timestamp

        try:
            self.table.put_item(
                Item={
                    'PK': sharded_pk,
                    'SK': timestamp,
                    'GSI1PK': gsi1_pk,
                    'GSI1SK': gsi1_sk,
                    'payload': payload
                }
            )
            return True
        except Exception as e:
            print(f"Error writing item: {e}")
            return False

    def get_events_for_range(self, entity_id, start_time, end_time):
        # This is now a GSI query
        # For simplicity, this example assumes the range is within a single hour bucket.
        # A production implementation must handle ranges spanning multiple buckets.
        time_bucket = self._get_time_bucket(start_time, 'hour')
        gsi_pk = f"{entity_id}#{time_bucket}"
        
        print(f"Querying GSI with PK: {gsi_pk}")
        try:
            response = self.table.query(
                IndexName='GSI1-Read-Aggregator',
                KeyConditionExpression=Key('GSI1PK').eq(gsi_pk) & Key('GSI1SK').between(
                    start_time.isoformat(), 
                    end_time.isoformat()
                )
            )
            return response.get('Items', [])
        except Exception as e:
            print(f"Error querying GSI: {e}")
            return []

# Usage:
# service = GsiShardedService(table, shard_count=50)
# service.write_event("device-group-A", {"temp": 35.1})
# ... many writes
# items = service.get_events_for_range("device-group-A", start_ts, end_ts)
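
The comment inside get_events_for_range above glosses over ranges that span more than one time bucket. A production implementation must enumerate every bucket touched by the range and query each one. The sketch below is one minimal way to do that under the assumptions of this article (hourly buckets, the GsiShardedService class above, pagination and error handling omitted):

python
from datetime import timedelta
from boto3.dynamodb.conditions import Key

def get_events_spanning_buckets(service, entity_id, start_time, end_time):
    # Walk every hourly bucket touched by [start_time, end_time] and query each one.
    current = start_time.replace(minute=0, second=0, microsecond=0)
    all_items = []
    while current <= end_time:
        gsi_pk = f"{entity_id}#{service._get_time_bucket(current, 'hour')}"
        response = service.table.query(
            IndexName='GSI1-Read-Aggregator',
            KeyConditionExpression=Key('GSI1PK').eq(gsi_pk) & Key('GSI1SK').between(
                start_time.isoformat(),
                end_time.isoformat()
            )
        )
        all_items.extend(response.get('Items', []))
        current += timedelta(hours=1)
    # Buckets are visited in chronological order and items within a bucket are
    # returned sorted by GSI1SK, so the merged list is already time-ordered.
    return all_items

# Usage:
# items = get_events_spanning_buckets(service, "device-group-A", start_ts, end_ts)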

Analysis of GSI Fan-out Pattern:

* Pros:

* Optimal for both reads and writes. Achieves high write distribution on the base table and efficient, single-partition (per time bucket) reads on the GSI.

* Decouples write scaling from read scaling. You can provision capacity for the base table and GSI independently (in provisioned mode).

* Cons:

* Increased Write Cost: Every put_item now incurs the cost of writing to both the base table and the GSI. With On-Demand capacity, this can roughly double your write costs.

* GSI Throttling: The GSI itself can become a hot partition. If the write rate for a single entity_id within one time bucket (e.g., one hour) exceeds a GSI partition's throughput limit, the GSI throttles, which in turn throttles writes to the base table.

* Eventual Consistency: GSIs are updated asynchronously. Reads from the GSI may not reflect the absolute latest writes to the base table (typically a lag of milliseconds, but it's not guaranteed).

* Complexity: Requires careful selection of time bucket granularity and handling queries that span multiple buckets.

Production Considerations and Decision Matrix

Choosing the right pattern is a critical architectural decision.

Determining Shard Count (N)

Don't guess your shard count. Calculate it based on your peak expected write traffic. A conservative formula is:

N = Ceil( (Peak Writes per Second for a Base Key * Avg Item Size in KB) / 1000 WCU )

For example, if you expect a peak of 5,000 writes/sec for a single base key (e.g., SENSOR_READING#<date>), and your average item size is 1KB, you need 5,000 WCU. Since a single partition maxes out at 1,000 WCU, you need at least Ceil(5000 / 1000) = 5 shards. It's wise to add a buffer, so choosing N=10 or N=16 would be a safe starting point.
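
Expressed as code, that sizing calculation might look like the following minimal sketch (the names and the 2x buffer factor are assumptions to tune, not recommendations):

python
import math

# Per-partition write limit currently documented by DynamoDB (WCU per second)
PARTITION_WCU_LIMIT = 1000

def calculate_shard_count(peak_writes_per_sec, avg_item_size_kb, buffer_factor=2.0):
    # 1 WCU covers one standard write of up to 1 KB, so round the item size up
    wcu_per_write = math.ceil(max(avg_item_size_kb, 1))
    required_wcu = peak_writes_per_sec * wcu_per_write
    # Minimum shards needed to stay under the per-partition limit, plus headroom
    min_shards = math.ceil(required_wcu / PARTITION_WCU_LIMIT)
    return math.ceil(min_shards * buffer_factor)

# Example from the text: 5,000 writes/sec of ~1 KB items -> 5 shards minimum,
# 10 with a 2x buffer.
# print(calculate_shard_count(5000, 1.0))  # -> 10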

Monitoring and Error Handling

* Key Metrics: Monitor WriteThrottleEvents and ReadThrottleEvents for both the base table and any GSIs. Use CloudWatch Contributor Insights to identify which keys are 'hot'.

* Robust Client: Your application must implement exponential backoff with jitter for retrying throttled requests. boto3 has a built-in retry mechanism, but ensure its configuration is aggressive enough for your latency requirements (see the configuration sketch below).
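
As a concrete starting point for that client configuration, boto3 exposes retry behaviour through botocore's Config object. The values below are illustrative assumptions, not recommendations; tune max_attempts against your latency budget:

python
import boto3
from botocore.config import Config

# 'adaptive' mode layers client-side rate limiting on top of exponential backoff
retry_config = Config(
    retries={
        'max_attempts': 10,  # total attempts, including the initial request
        'mode': 'adaptive'
    }
)

dynamodb = boto3.resource('dynamodb', region_name='us-east-1', config=retry_config)
table = dynamodb.Table('TimeSeriesData')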

Decision Matrix

| Feature / Concern | Random Suffix | Calculated Suffix | GSI Fan-out |
| --- | --- | --- | --- |
| Write Distribution | Excellent | Good (depends on key distribution) | Excellent |
| Read Efficiency | Very Poor (Scatter-Gather) | Excellent (for targeted queries) | Excellent (for time-range queries) |
| Implementation Cost | Low | Medium | High |
| Operational Cost | Low Write / High Read | Balanced | High Write / Low Read |
| Consistency | Strongly Consistent (on read) | Strongly Consistent (on read) | Eventually Consistent (on GSI read) |
| Best For | Write-heavy, read-light archival workloads | Multi-tenant systems with predictable queries | Balanced R/W workloads needing fast reads |

Conclusion

There is no silver bullet for solving the hot partition problem in DynamoDB. The optimal solution is a direct function of your application's specific access patterns, latency requirements, and cost constraints.

* Start with the simplest pattern that meets your needs. If your reads are infrequent, Random Suffix Sharding is simple and robust.

* If your system has a natural, high-cardinality key that aligns with your primary read path, Calculated Suffix Sharding offers a powerful and cost-effective balance.

* For demanding applications that require both high write throughput and low-latency range queries, the GSI Fan-out pattern, despite its complexity and cost, is the superior architectural choice.

By understanding the deep trade-offs of each approach, you can design a time-series data layer on DynamoDB that is not only scalable and performant but also resilient to the very traffic patterns that would cripple a naive implementation.
