DynamoDB Write Sharding for High-Velocity Time-Series Data

Goh Ling Yong

The Inevitable Bottleneck: Understanding Time-Series Hot Partitions

In any high-throughput system ingesting time-series data—be it IoT metrics, application logs, or user events—Amazon DynamoDB presents itself as a compelling choice due to its scalability and performance. However, a naive table design quickly collides with a fundamental architectural constraint: partition throughput limits. When ingesting time-series data, the most intuitive partition key is often the entity identifier (device_id, user_id) and the sort key is a timestamp. This works until a single entity becomes 'hot' and exceeds the per-partition limit of 1,000 Write Capacity Units (WCUs) per second.

More insidiously, if the partition key is based on the event time itself (e.g., YYYY-MM-DD), all incoming data for that day will target a single logical partition, creating a massive, predictable bottleneck. This phenomenon, known as a 'hot partition', is not a flaw in DynamoDB but a direct consequence of its hashing-based partitioning scheme. The partition key is hashed to determine which of the underlying storage partitions will handle the read/write operation. Sequential or low-cardinality keys at high velocity guarantee that one partition will bear the brunt of the load, leading to throttling (ProvisionedThroughputExceededException) and system failure.

This article bypasses introductory concepts and dives directly into the advanced, production-tested strategies for resolving this specific problem through write sharding. We will dissect three primary patterns, analyze their performance characteristics, and provide complete implementation code for a real-world IoT sensor data scenario.

Prerequisite: A Quick Refresher on Partition Mechanics

Before we architect solutions, let's align on the mechanics. A DynamoDB table is composed of one or more partitions. DynamoDB allocates an item to a partition based on the hash value of its partition key. Each partition has a hard limit:

* Storage: ~10 GB of data.

* Throughput: 3,000 Read Capacity Units (RCUs) or 1,000 Write Capacity Units (WCUs) per second (or a linear combination).

When you provision throughput for a table, it is divided evenly among its partitions. However, DynamoDB's adaptive capacity allows a high-traffic partition to consume unused throughput provisioned for the table, temporarily boosting it beyond its even share. While useful, this is a reactive, best-effort mechanism and should never be relied upon as the primary scaling strategy for predictable, high-velocity workloads. Our goal is proactive, architectural distribution of writes, with adaptive capacity as a fallback, not a feature.
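
To make these limits concrete, here is a back-of-envelope sketch (an illustration only; DynamoDB manages partition splits internally, so treat the function below as a mental model rather than an API):

python
import math

def estimate_partition_count(table_rcu: int, table_wcu: int, storage_gb: float) -> int:
    """
    Rough estimate of how many partitions a table needs, derived from the
    per-partition limits above (3,000 RCU / 1,000 WCU, ~10 GB of storage).
    """
    by_throughput = math.ceil(table_rcu / 3000 + table_wcu / 1000)
    by_storage = math.ceil(storage_gb / 10)
    return max(by_throughput, by_storage)

# Example: 6,000 RCU and 4,000 WCU over 35 GB of data
# estimate_partition_count(6000, 4000, 35)  # -> max(6, 4) = 6 partitions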

Core Strategy: Artificial Cardinality via Composite Partition Keys

The fundamental solution to a hot partition is to artificially increase the cardinality of the partition key. Instead of a single partition key that becomes hot, we create a composite key that distributes writes for the same logical entity across a predetermined number of logical partitions. The pattern is simple in concept:

new_partition_key = base_partition_key + "#" + shard_id

Here, base_partition_key is our original identifier (e.g., sensor-A1B2), and shard_id is a suffix calculated to distribute the load. The challenge lies in how we determine this shard_id and the profound impact that choice has on our ability to read the data back efficiently.

Let's model our scenario: We are ingesting data from millions of IoT sensors. Each sensor reports its temperature, humidity, and pressure every second.

* Peak Global Ingestion Rate: 50,000 writes/second.

* Peak Single Sensor Rate: 60 writes/minute (can burst).

* Average Item Size: ~500 bytes (0.5 KB).

* Problematic Sensor: sensor-alpha-001 is located in a critical environment and sometimes bursts to 2,000 writes/second during an event.

A single sensor bursting to 2,000 writes/second requires 2,000 WCUs (since each write is <= 1KB). This is double the 1,000 WCU limit of a single partition. Sharding is not optional; it's a requirement.

Pattern 1: Random Suffix Sharding

This is the simplest sharding strategy to implement. For each incoming write, we append a random suffix from a fixed range to the base partition key.

* Partition Key: sensor_id#random_integer

* Sort Key: timestamp

Implementation

First, we need to determine our shard count (N). A good starting point is to calculate it based on the peak expected throughput for a single entity.

N = Ceil(Peak WCU for single entity / Max WCU per partition)

N = Ceil(2000 / 1000) = 2

To be safe and account for uneven distribution, we should choose a higher number. A common practice is to use a factor of 2x to 10x the calculated minimum. Let's choose N=10.
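
That sizing logic can be captured in a small helper (a sketch; the 5x safety factor is one choice within the 2x to 10x range mentioned above):

python
import math

def required_shard_count(peak_writes_per_sec: int, item_size_bytes: int,
                         max_wcu_per_partition: int = 1000, safety_factor: int = 5) -> int:
    """
    Derive a shard count from a single entity's peak write rate.
    Each write of up to 1 KB costs 1 WCU, so round the item size up to whole KB.
    """
    wcu_per_write = math.ceil(item_size_bytes / 1024)
    peak_wcu = peak_writes_per_sec * wcu_per_write
    return math.ceil(peak_wcu / max_wcu_per_partition) * safety_factor

# For sensor-alpha-001: 2,000 writes/second of ~500-byte items
# required_shard_count(2000, 500)  # -> Ceil(2000 / 1000) * 5 = 10 shards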

Here's a Python implementation using boto3:

python
import boto3
import random
import time
import uuid
from decimal import Decimal

# Configuration
DYNAMODB_TABLE_NAME = 'TimeSeriesData'
SHARD_COUNT = 10
HOT_SENSOR_ID = 'sensor-alpha-001'

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE_NAME)

def write_random_sharded_item(sensor_id: str, temperature: float, humidity: float):
    """
    Writes a single item to DynamoDB using random suffix sharding.
    """
    shard_id = random.randint(0, SHARD_COUNT - 1)
    partition_key = f"{sensor_id}#{shard_id}"
    now = time.time()  # capture the clock once so the seconds and millisecond parts agree
    iso_timestamp = time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime(now)) + f"{int(now * 1000) % 1000:03d}Z"
    
    item = {
        'PK': partition_key,
        'SK': iso_timestamp, # Sort key is the precise timestamp
        'sensor_id': sensor_id, # Store original ID for GSI or filtering
        'temperature': Decimal(str(temperature)),
        'humidity': Decimal(str(humidity)),
        'ttl': int(time.time()) + (86400 * 30) # Expire after 30 days
    }
    
    try:
        table.put_item(Item=item)
        print(f"Successfully wrote item with PK: {partition_key}")
    except Exception as e:
        print(f"Error writing item: {e}")

# --- Usage Example: Simulating a burst from the hot sensor ---
def simulate_burst_write():
    print("--- Simulating Burst Write with Random Suffix Sharding ---")
    for _ in range(20):
        write_random_sharded_item(
            sensor_id=HOT_SENSOR_ID,
            temperature=random.uniform(90.0, 110.0),
            humidity=random.uniform(10.0, 20.0)
        )

# simulate_burst_write()
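
For completeness, here is a hedged sketch of creating the TimeSeriesData table assumed by these examples (generic string PK/SK keys, on-demand billing so the demo needs no capacity planning, and TTL enabled for the ttl attribute written above; adjust names and billing mode for your environment):

python
def create_timeseries_table(table_name: str = DYNAMODB_TABLE_NAME):
    """
    One-time setup for the generic PK/SK schema used throughout this article.
    """
    client = boto3.client('dynamodb')
    client.create_table(
        TableName=table_name,
        KeySchema=[
            {'AttributeName': 'PK', 'KeyType': 'HASH'},
            {'AttributeName': 'SK', 'KeyType': 'RANGE'},
        ],
        AttributeDefinitions=[
            {'AttributeName': 'PK', 'AttributeType': 'S'},
            {'AttributeName': 'SK', 'AttributeType': 'S'},
        ],
        BillingMode='PAY_PER_REQUEST'  # on-demand; use PROVISIONED if you manage WCUs yourself
    )
    client.get_waiter('table_exists').wait(TableName=table_name)
    client.update_time_to_live(
        TableName=table_name,
        TimeToLiveSpecification={'Enabled': True, 'AttributeName': 'ttl'}
    )

# create_timeseries_table()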

Analysis

* Write Performance: Excellent. The random nature of the suffix ensures that burst writes from sensor-alpha-001 are distributed evenly across 10 different logical partitions. The probability of hitting the 1,000 WCU limit is virtually eliminated.

* Read Performance: Terrible for time-range queries. To fetch all data for sensor-alpha-001 within a specific hour, you have no choice but to perform a scatter-gather query. You must issue N parallel Query operations—one for each possible shard—and merge the results in your application.

python
def read_random_sharded_items(sensor_id: str, start_time: str, end_time: str):
    """
    Reads items for a sensor by querying all possible shards in parallel.
    This is INEFFICIENT and demonstrates the drawback of this pattern.
    """
    print(f"\n--- Performing Scatter-Gather Read for {sensor_id} ---")
    from concurrent.futures import ThreadPoolExecutor

    all_items = []

    def query_shard(shard_id):
        partition_key = f"{sensor_id}#{shard_id}"
        print(f"Querying shard: {partition_key}")
        response = table.query(
            KeyConditionExpression='PK = :pk AND SK BETWEEN :start AND :end',
            ExpressionAttributeValues={
                ':pk': partition_key,
                ':start': start_time,
                ':end': end_time
            }
        )
        return response.get('Items', [])

    with ThreadPoolExecutor(max_workers=SHARD_COUNT) as executor:
        futures = [executor.submit(query_shard, i) for i in range(SHARD_COUNT)]
        for future in futures:
            all_items.extend(future.result())
    
    # Sort results chronologically as they come from different shards
    all_items.sort(key=lambda x: x['SK'])
    return all_items

# --- Usage Example: Reading data back ---
# start_iso = "2023-10-27T10:00:00.000Z"
# end_iso = "2023-10-27T11:00:00.000Z"
# items = read_random_sharded_items(HOT_SENSOR_ID, start_iso, end_iso)
# print(f"Found {len(items)} items across {SHARD_COUNT} shards.")

Verdict: Use Random Suffix Sharding only when the access pattern is exclusively write-heavy or involves fetching single items (if you also store the full sharded key elsewhere). It's a poor choice for workloads requiring frequent range queries on the sharded entity.

Pattern 2: Calculated Suffix Sharding

This pattern improves on random sharding by making the shard ID deterministic, based on a high-cardinality attribute of the data item itself. For our IoT example, we could use a unique event_id that comes with each payload.

* Partition Key: sensor_id#hash(event_id) % N

* Sort Key: timestamp

Implementation

The key change is that we can now target a specific item for a read or update if we know its event_id, as we can recalculate the full partition key. This eliminates the need for a scatter-gather for single-item lookups.

python
import hashlib

def get_calculated_shard_id(event_id: str, shard_count: int) -> int:
    """
    Calculates a deterministic shard ID from a high-cardinality string.
    """
    # Use a stable hashing algorithm like SHA-256
    hashed = hashlib.sha256(event_id.encode('utf-8')).hexdigest()
    # Convert hex hash to an integer and modulo by shard count
    return int(hashed, 16) % shard_count

def write_calculated_sharded_item(sensor_id: str, event_id: str, temperature: float, humidity: float):
    """
    Writes a single item using a calculated suffix from the event_id.
    """
    shard_id = get_calculated_shard_id(event_id, SHARD_COUNT)
    partition_key = f"{sensor_id}#{shard_id}"
    now = time.time()  # capture the clock once so the seconds and millisecond parts agree
    iso_timestamp = time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime(now)) + f"{int(now * 1000) % 1000:03d}Z"
    
    item = {
        'PK': partition_key,
        'SK': iso_timestamp,
        'sensor_id': sensor_id,
        'event_id': event_id,
        'temperature': Decimal(str(temperature)),
        'humidity': Decimal(str(humidity)),
    }
    
    try:
        table.put_item(Item=item)
        print(f"Successfully wrote item with PK: {partition_key} (from event_id: {event_id})")
    except Exception as e:
        print(f"Error writing item: {e}")

# --- Usage Example ---
def simulate_burst_with_calculated_shards():
    print("\n--- Simulating Burst Write with Calculated Suffix Sharding ---")
    for _ in range(20):
        # In a real system, the event_id would come from the device
        unique_event_id = str(uuid.uuid4())
        write_calculated_sharded_item(
            sensor_id=HOT_SENSOR_ID,
            event_id=unique_event_id,
            temperature=random.uniform(90.0, 110.0),
            humidity=random.uniform(10.0, 20.0)
        )

# simulate_burst_with_calculated_shards()

Analysis

* Write Performance: Excellent. Assuming the event_id is sufficiently unique and random (like a UUID), the hash function will provide a uniform distribution across the shards.

* Read Performance (Single Item): Greatly improved. If you need to fetch a specific event, you can reconstruct the full partition key (PK = f"{sensor_id}#{get_calculated_shard_id(event_id, N)}") and issue a single targeted request against that one partition: a GetItem if the timestamp sort key is also known, otherwise a Query on the reconstructed PK filtered by event_id (sketched after this list).

* Read Performance (Time Range): Still terrible. The fundamental problem remains: to get all data for a sensor in a time range, you don't know which shards contain that data. You are still forced into a costly scatter-gather operation.
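
A minimal sketch of that targeted lookup, reusing get_calculated_shard_id, SHARD_COUNT, and the table handle defined earlier (pagination and error handling omitted):

python
def find_event(sensor_id: str, event_id: str):
    """
    Look up a single event without scatter-gather: reconstruct the one shard
    that can contain it, then query only that partition, filtering on event_id.
    """
    shard_id = get_calculated_shard_id(event_id, SHARD_COUNT)
    partition_key = f"{sensor_id}#{shard_id}"
    response = table.query(
        KeyConditionExpression='PK = :pk',
        # The filter runs after the read, so RCUs are still consumed for the
        # partition's items, but only one shard is touched instead of all N.
        FilterExpression='event_id = :eid',
        ExpressionAttributeValues={':pk': partition_key, ':eid': event_id}
    )
    items = response.get('Items', [])
    return items[0] if items else None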

Verdict: Calculated Suffix Sharding is ideal for use cases where write distribution is paramount and reads are primarily focused on fetching specific, known items. It's an improvement over the random pattern but still fails to address the time-series range query problem efficiently.

Pattern 3: Hybrid Time-Bucket Sharding (The Production Standard)

This is the most robust and commonly used pattern for high-velocity time-series data because it solves both the write distribution and the read-range query problem, albeit with its own set of trade-offs.

The strategy is to create a composite partition key that includes both a time bucket and a shard identifier.

* Partition Key: entity_id#YYYY-MM-DD-HH (or a more granular bucket like HH:MM)

* Sort Key: nanosecond_timestamp#event_id

Wait, this doesn't look sharded! If we use this key, all data for sensor-alpha-001 in a given hour will go to the same partition (sensor-alpha-001#2023-10-27-10). We've solved for time-based queries but re-introduced the hot partition problem for a single entity.

The true hybrid pattern combines the time bucket with a calculated shard suffix. The entity_id is no longer part of the partition key; it's part of the sort key or a separate attribute indexed by a Global Secondary Index (GSI).

The Advanced Schema:

* Table Partition Key (PK): shard_id#YYYY-MM-DD-HH

* Table Sort Key (SK): sensor_id#iso_timestamp

* Attributes: event_id, temperature, humidity, etc.

* Shard ID Calculation: shard_id = hash(sensor_id) % N

Let's break down why this works:

* Write Distribution: A write for sensor-alpha-001 is first hashed to determine its shard ID (e.g., hash('sensor-alpha-001') % 16 -> 7). Then, the current time bucket is determined (e.g., 2023-10-27-15). The resulting PK is 7#2023-10-27-15. This distributes writes for different sensors across different shard prefixes; writes for the same sensor always land on the same shard prefix in any given hour.

* Time-Series Locality: All data for a given sensor within the same hour is co-located under the same partition key, sorted by timestamp. This makes range queries extremely efficient.

* Preventing Single-Sensor Hotspots: A burst from sensor-alpha-001 will be contained within its designated shard for that hour (e.g., 7#2023-10-27-15). As long as the WCU requirement for that single sensor within its shard does not exceed 1,000 WCU, the system remains healthy. For an entity that can exceed 1,000 WCU on its own (like the 2,000 writes/second burst modeled earlier), the shard ID can additionally incorporate a per-entity sub-shard suffix, trading a small amount of read fan-out for write headroom.
Implementation

This implementation is more complex, encapsulating the key generation logic.

python
class TimeSeriesManager:
    def __init__(self, table_name: str, shard_count: int):
        self.table = boto3.resource('dynamodb').Table(table_name)
        self.shard_count = shard_count

    def _get_shard_id(self, sensor_id: str) -> int:
        hashed = hashlib.sha256(sensor_id.encode('utf-8')).hexdigest()
        return int(hashed, 16) % self.shard_count

    def _get_time_bucket(self, timestamp: float) -> str:
        return time.strftime('%Y-%m-%dT%H', time.gmtime(timestamp))

    def write_item(self, sensor_id: str, event_id: str, data: dict):
        current_time = time.time()
        shard_id = self._get_shard_id(sensor_id)
        time_bucket = self._get_time_bucket(current_time)

        partition_key = f"{shard_id}#{time_bucket}"
        # Add microsecond precision for uniqueness in the sort key
        iso_timestamp = time.strftime('%Y-%m-%dT%H:%M:%S.', time.gmtime(current_time)) + f"{int(current_time * 1_000_000) % 1_000_000:06d}Z"
        sort_key = f"{sensor_id}#{iso_timestamp}"

        item = {
            'PK': partition_key,
            'SK': sort_key,
            'event_id': event_id,
            **{k: Decimal(str(v)) for k, v in data.items()}
        }

        try:
            self.table.put_item(Item=item)
            print(f"Wrote to PK: {partition_key}, SK starts with: {sensor_id}")
        except Exception as e:
            print(f"Error: {e}")

    def query_sensor_for_hour(self, sensor_id: str, timestamp: float):
        shard_id = self._get_shard_id(sensor_id)
        time_bucket = self._get_time_bucket(timestamp)
        partition_key = f"{shard_id}#{time_bucket}"

        print(f"\n--- Querying for {sensor_id} in hour {time_bucket} (PK: {partition_key}) ---")

        # Pagination via LastEvaluatedKey is omitted for brevity
        response = self.table.query(
            KeyConditionExpression='PK = :pk AND begins_with(SK, :sk_prefix)',
            ExpressionAttributeValues={
                ':pk': partition_key,
                ':sk_prefix': f"{sensor_id}#"
            }
        )
        return response.get('Items', [])

# --- Usage Example ---
manager = TimeSeriesManager(DYNAMODB_TABLE_NAME, shard_count=16)

# Simulate writes from two different sensors
# They will be hashed to different shard prefixes
manager.write_item('sensor-alpha-001', str(uuid.uuid4()), {'temp': 95.5})
manager.write_item('sensor-beta-002', str(uuid.uuid4()), {'temp': 34.1})
manager.write_item('sensor-alpha-001', str(uuid.uuid4()), {'temp': 96.0})

# Query for one sensor's data in the current hour
items = manager.query_sensor_for_hour('sensor-alpha-001', time.time())
print(f"Found {len(items)} items for sensor-alpha-001.")
for item in items:
    print(item)

Analysis

* Write Performance: Very good. Writes are distributed based on the sensor_id hash. A global traffic spike is spread across N partitions. The risk is now a 'hot time bucket', where a single bucket (e.g., 7#2023-10-27-15) receives too many writes from many different sensors that all happen to hash to shard 7.

* Read Performance: Excellent for the primary access pattern (get data for a sensor in a time window). This is a single, highly efficient Query operation.

Edge Cases and Production Considerations

Choosing a pattern is only the beginning. Senior engineers must plan for the second-order effects and future evolution of the system.

Edge Case 1: Querying Across Time Buckets

Problem: A user wants the last 3 hours of data for sensor-alpha-001. This data now lives in three different partitions (e.g., 7#...-15, 7#...-14, 7#...-13).

Solution: The application layer is responsible for this orchestration. It must:

* Calculate the required time buckets based on the query range.

* Calculate the deterministic shard ID for the sensor (it is the same for every bucket, since it depends only on the sensor_id).

* Construct one partition key per bucket.

* Issue the per-bucket Query operations in parallel (e.g., using asyncio or thread pools).

* Merge, sort, and paginate the results before returning them to the client.

This adds application complexity but keeps the database operations maximally efficient.
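
A minimal sketch of that orchestration, extending the TimeSeriesManager above (per-bucket pagination is omitted; edge buckets may return a few items just outside the requested window, which can be trimmed by SK):

python
from concurrent.futures import ThreadPoolExecutor

def query_sensor_range(manager: TimeSeriesManager, sensor_id: str,
                       start_ts: float, end_ts: float):
    """
    Fetch a sensor's data across multiple hourly buckets: one Query per bucket,
    all against the same deterministic shard ID, merged and sorted by SK.
    """
    # Enumerate the hourly (UTC) buckets covered by [start_ts, end_ts]
    bucket_starts = []
    t = start_ts - (start_ts % 3600)
    while t <= end_ts:
        bucket_starts.append(t)
        t += 3600

    with ThreadPoolExecutor(max_workers=len(bucket_starts)) as executor:
        futures = [executor.submit(manager.query_sensor_for_hour, sensor_id, ts)
                   for ts in bucket_starts]
        items = [item for f in futures for item in f.result()]

    return sorted(items, key=lambda x: x['SK'])

# Last 3 hours for the hot sensor:
# results = query_sensor_range(manager, 'sensor-alpha-001', time.time() - 3 * 3600, time.time())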

Edge Case 2: Evolving the Shard Count

Problem: Your system grows, and your initial shard_count=16 is no longer sufficient. You need to increase it to 32 to handle increased load. How do you do this without downtime or a massive data migration?

Solution: This is a complex but solvable problem using a phased rollout.

* Introduce Dual Writes: Update your application logic. For a period, write to both the old and new shard locations (the sketch after this list shows the core logic).

* Update Read Logic: The read path must now query both potential locations and merge the results. It checks the new location first, then the old one.

* Backfill Data: Run a background job (e.g., AWS Glue, EMR) that scans the old data and writes it to the new sharded locations. This is the most intensive step.

* Cutover: Once the backfill is complete and all new data is being dual-written, you can update the application to read from and write to only the new shard_count=32 locations.

* Cleanup: After a verification period, the dual-write logic can be removed, and the old data can be expired via TTL or deleted.

The dual-write step uses the stable get_calculated_shard_id helper from earlier rather than Python's built-in hash(), which is not consistent across processes; write_to_pk stands in for the actual put:

python
def write_migrating_item(...):
    # ...
    old_shard_id = get_calculated_shard_id(sensor_id, 16)
    new_shard_id = get_calculated_shard_id(sensor_id, 32)
    # ...
    write_to_pk(f"{old_shard_id}#{time_bucket}", ...)
    if old_shard_id != new_shard_id:
        write_to_pk(f"{new_shard_id}#{time_bucket}", ...)
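
To complement the Update Read Logic step, here is a minimal sketch of the migration-window read path, assuming the hybrid schema from Pattern 3, the same table handle used earlier, and that dual-written items carry the same event_id so duplicates can be dropped:

python
def read_during_migration(sensor_id: str, time_bucket: str):
    """
    Hypothetical read path used only while migrating from 16 to 32 shards:
    query the new shard location first, then the old one, and de-duplicate.
    """
    new_id = get_calculated_shard_id(sensor_id, 32)
    old_id = get_calculated_shard_id(sensor_id, 16)
    # If both shard counts map the sensor to the same ID, one query suffices.
    shard_ids = [new_id] if new_id == old_id else [new_id, old_id]

    merged, seen = [], set()
    for shard_id in shard_ids:
        response = table.query(
            KeyConditionExpression='PK = :pk AND begins_with(SK, :prefix)',
            ExpressionAttributeValues={
                ':pk': f"{shard_id}#{time_bucket}",
                ':prefix': f"{sensor_id}#"
            }
        )
        for item in response.get('Items', []):
            if item['event_id'] not in seen:  # skip dual-written duplicates
                seen.add(item['event_id'])
                merged.append(item)
    return sorted(merged, key=lambda x: x['SK'])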

Edge Case 3: The 'Thundering Herd'

Problem: In the hybrid model, what happens if a global event causes all sensors to report at the exact same second (e.g., at the top of the hour)? Even though sensors are sharded, all writes for a given time bucket will arrive simultaneously, potentially causing a temporary spike in demand across all partitions for that bucket.

Solution:

* Client-Side Jitter: The best solution is to prevent the behavior at the source. Clients (the IoT devices) should add a small, random delay (jitter) to their reporting schedule to smear the writes over a few seconds rather than a single millisecond (a minimal sketch follows this list).

* On-Demand Capacity: For unpredictable spikes, DynamoDB's On-Demand capacity mode is a good fit. It automatically scales to handle the load, albeit at a higher cost than well-utilized provisioned throughput. Note that a table runs in one capacity mode at a time, so the practical choice is provisioned capacity with auto scaling for a predictable baseline, or On-Demand when spikes are genuinely unpredictable.

* Ingestion Queuing: Place an SQS queue in front of DynamoDB. The devices write to the queue (which can absorb massive, sudden spikes), and a fleet of Lambda functions or ECS tasks consumes from the queue at a controlled rate, smoothing the writes into DynamoDB.
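
As an illustration of client-side jitter, a minimal sketch (report_with_jitter and the 5-second window are assumptions; real devices would fold this into their scheduling loop, and the publish_fn callable could be an SQS send or a direct DynamoDB put):

python
import random
import time

def report_with_jitter(publish_fn, max_jitter_seconds: float = 5.0):
    """
    Smear a scheduled report over a small random window instead of firing it
    exactly at the top of the hour.
    """
    time.sleep(random.uniform(0, max_jitter_seconds))
    publish_fn()

# Example: delay a hot sensor's write by up to 5 seconds
# report_with_jitter(lambda: write_random_sharded_item(HOT_SENSOR_ID, 98.6, 15.0))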

Conclusion: A Trade-off Analysis

There is no single 'best' sharding strategy. The optimal choice is a deliberate engineering decision based on the specific read and write patterns of your application.

| Strategy | Write Distribution | Read Complexity (Range Scan) | Best For... |
| --- | --- | --- | --- |
| Random Suffix | Excellent | Very High (Scatter-Gather) | Write-only or key-value lookup workloads where range scans are rare. |
| Calculated Suffix | Excellent | Very High (Scatter-Gather) | Write-heavy workloads where reads are for specific, known items via their unique ID. |
| Hybrid Time-Bucket | Very Good | Low (Targeted Query) | Classic time-series workloads requiring both high-throughput writes and efficient time-range queries for specific entities. |

For most advanced time-series use cases, the Hybrid Time-Bucket Sharding pattern provides the most effective balance. It requires more complex application logic for key generation and cross-bucket queries, but this is the necessary trade-off for achieving scalable write ingestion while preserving the efficient read patterns that make DynamoDB a powerful tool for time-series data. Proactively architecting for these patterns is the difference between a system that scales effortlessly and one that requires constant, reactive fire-fighting.
