Resilient Real-time Streams: Idempotent SSE with Last-Event-ID
The Deceptive Simplicity of Server-Sent Events
For many developers, Server-Sent Events (SSE) represent a straightforward, unidirectional alternative to WebSockets. The native browser EventSource API and the simplicity of the text-based protocol make it trivial to push data from server to client. A naive implementation in a framework like Express might look something like this:
// DO NOT USE IN PRODUCTION - Naive example
import express from 'express';

const app = express();

app.get('/events', (req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });

  // Push a timestamp every two seconds
  const intervalId = setInterval(() => {
    const message = `data: ${JSON.stringify({ timestamp: new Date().toISOString() })}\n\n`;
    res.write(message);
  }, 2000);

  // Stop the timer when the client goes away
  req.on('close', () => {
    clearInterval(intervalId);
    res.end();
  });
});

app.listen(3000);
This works, but it's a house of cards in any real-world scenario. Its fundamental flaw is its ephemerality. If a client's Wi-Fi drops for 30 seconds, or a server instance is redeployed, any events generated during that downtime are lost forever. The browser's automatic reconnection is a feature, but in this naive model, it simply starts a new, empty stream. This "at-most-once" delivery is unacceptable for financial tickers, live monitoring dashboards, or collaborative applications where data integrity is paramount.
This article dissects a robust, production-grade pattern that transforms SSE from a fragile tool into a fault-tolerant, resumable streaming mechanism. We will leverage the id field and the corresponding Last-Event-ID header, backed by a durable message store like Redis Streams, to guarantee that clients never miss an event, even across disconnects and server failures.
The `Last-Event-ID` Header: The Core of Resumability
The SSE specification includes a mechanism for stream resumption that is powerful but often overlooked. When you send an event, you can optionally include an id field:
data: {"message": "hello world"}
id: 1678886400000-0
The browser, upon receiving an event with an id, automatically stores this value. If the connection is ever lost, the EventSource API will attempt to reconnect. When it does, it will automatically include an HTTP header in its request: Last-Event-ID.
GET /events HTTP/1.1
Host: example.com
Accept: text/event-stream
Last-Event-ID: 1678886400000-0
This is the hook we need. The server's responsibility is now twofold:
1. On every new connection, check for the presence of the Last-Event-ID header.
2. If it exists, the server must query its event history and send the client all events that occurred after that ID, before resuming the live stream.

This simple contract shifts the delivery guarantee from "at-most-once" to "at-least-once," forming the foundation of our resilient system.
Architecting a Production-Grade SSE Server
To fulfill this contract, the server needs a durable, queryable event history. An in-memory array is insufficient; it would be wiped out on a server restart. The ideal tool for this is a persistent message broker. While Kafka or RabbitMQ could work, Redis Streams are exceptionally well-suited for this pattern due to their lightweight nature, time-ordered IDs, and efficient range queries.
Our architecture will look like this:
* A producer appends each event to a Redis Stream with the XADD command. Redis automatically assigns a unique, time-based, monotonic ID (e.g., 1678886400000-0).
* When a client connects, the server checks for Last-Event-ID.
* It uses Redis's XRANGE or XREAD command to fetch historical messages from the stream, starting from the client's last known ID.
* It streams these historical messages to the client first.
* It then begins streaming new, live messages as they are added to the Redis Stream.
This decouples event generation from event delivery and provides the necessary persistence to survive any single-point failure.
Full Implementation: Node.js, Express, and Redis Streams
Let's build this system. You'll need Node.js and a running Redis instance.
Project Setup:
npm init -y
npm install express redis
We'll create two main files: producer.js to simulate event generation and server.js for our SSE endpoint.
producer.js - The Event Generator
This script will connect to Redis and periodically add new events to a stream named event_stream.
// producer.js
import { createClient } from 'redis';

const client = createClient();
// node-redis emits 'error' events; an unhandled one crashes the process.
client.on('error', (err) => console.error('Redis client error:', err));

async function produceEvents() {
  console.log('Producer started. Adding events to `event_stream`...');
  await client.connect();
  let eventCounter = 0;
  setInterval(async () => {
    try {
      eventCounter++;
      const eventData = {
        message: `Event number ${eventCounter}`,
        timestamp: new Date().toISOString(),
        payload: Math.random(),
      };
      // XADD event_stream * field value [field value ...]
      // Using '*' tells Redis to auto-generate the ID
      const eventId = await client.xAdd(
        'event_stream',
        '*',
        { data: JSON.stringify(eventData) }
      );
      console.log(`Produced event ${eventId}:`, eventData.message);
    } catch (err) {
      console.error('Error producing event:', err);
    }
  }, 2500); // Produce an event every 2.5 seconds
}

produceEvents().catch(console.error);
Run this in a separate terminal: node producer.js. You should see it start logging events to Redis.
server.js - The Resilient SSE Endpoint
This is the core of our system. It handles connections, backfills historical data, and streams live events.
// server.js
import express from 'express';
import { createClient } from 'redis';
import { randomUUID } from 'node:crypto';

const app = express();
const PORT = 3000;

// We need two Redis clients: one for the blocking XREAD loop and one for other commands.
const redisClient = createClient();
redisClient.on('error', (err) => console.error('Redis client error:', err));
const subscriber = redisClient.duplicate();

// Global list of connected clients.
// In a real multi-instance setup, this state should not live here
// (see "Load Balancing and State" below), but for a single instance
// it's a simple way to manage connections.
let clients = [];
app.get('/events', async (req, res) => {
  console.log('Client connected.');
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });

  // Store the client response object. Date.now() can collide when two
  // clients connect in the same millisecond, so use a UUID instead.
  const clientId = randomUUID();
  const newClient = { id: clientId, res };
  clients.push(newClient);

  // Immediately send a connection confirmation
  res.write('event: connected\ndata: {"message":"Connection established"}\n\n');

  // Check for the Last-Event-ID header
  const lastEventId = req.headers['last-event-id'];
  if (lastEventId) {
    console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`);
    try {
      // Use XRANGE to get all events AFTER the last received ID.
      // The '(' prefix makes the start of the range exclusive (requires Redis 6.2+).
      const missedEvents = await redisClient.xRange('event_stream', `(${lastEventId}`, '+');
      if (missedEvents.length > 0) {
        console.log(`Found ${missedEvents.length} missed events. Sending now...`);
        for (const event of missedEvents) {
          const formattedMessage = `id: ${event.id}\ndata: ${event.message.data}\n\n`;
          res.write(formattedMessage);
        }
      }
    } catch (err) {
      console.error('Error querying Redis for missed events:', err);
    }
  }

  req.on('close', () => {
    console.log(`Client ${clientId} disconnected.`);
    clients = clients.filter(client => client.id !== clientId);
    res.end();
  });
});
// Function to broadcast new events to all connected clients
async function broadcastNewEvents() {
  // Using XREAD with BLOCK to wait for new messages is highly efficient.
  // We start reading from '$', which means only new messages arriving after we connect.
  let currentId = '$';
  while (true) {
    try {
      const response = await subscriber.xRead(
        { key: 'event_stream', id: currentId },
        { BLOCK: 0 } // Block indefinitely until a message arrives
      );
      if (response) {
        const [stream] = response;
        // A single read can return a batch; broadcast every message, not just the first.
        for (const message of stream.messages) {
          currentId = message.id;
          console.log(`Broadcasting new event ${message.id}`);
          const formattedMessage = `id: ${message.id}\ndata: ${message.message.data}\n\n`;
          // Send to all connected clients
          clients.forEach(client => client.res.write(formattedMessage));
        }
      }
    } catch (err) {
      console.error('Error in XREAD loop:', err);
      // Implement backoff/retry logic here for production
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  }
}
async function startServer() {
  await redisClient.connect();
  await subscriber.connect();
  app.listen(PORT, () => {
    console.log(`SSE server listening on http://localhost:${PORT}`);
    // Start the blocking XREAD broadcast loop
    broadcastNewEvents().catch(err => {
      console.error('Broadcast function crashed:', err);
      process.exit(1);
    });
  });
}

startServer();
index.html - The Client
Create a simple HTML file to test the connection.
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <title>Resilient SSE Client</title>
  <style>
    body { font-family: monospace; }
    #events { border: 1px solid #ccc; padding: 10px; height: 500px; overflow-y: scroll; }
    .event { padding: 5px; border-bottom: 1px solid #eee; }
    .missed { background-color: #fffbe6; }
    .live { background-color: #e6f7ff; }
  </style>
</head>
<body>
  <h1>SSE Events</h1>
  <div id="status">Connecting...</div>
  <div id="events"></div>
  <script>
    const eventsDiv = document.getElementById('events');
    const statusDiv = document.getElementById('status');
    let eventSource;

    function connect() {
      eventSource = new EventSource('/events');

      eventSource.onopen = () => {
        statusDiv.textContent = 'Connected.';
        statusDiv.style.color = 'green';
      };

      eventSource.onerror = () => {
        statusDiv.textContent = 'Disconnected. Retrying...';
        statusDiv.style.color = 'red';
        // EventSource auto-reconnects, no manual intervention needed
      };

      // Custom event for connection confirmation
      eventSource.addEventListener('connected', (e) => {
        const data = JSON.parse(e.data);
        logEvent(data.message, 'system');
      });

      // Default message handler
      eventSource.onmessage = (e) => {
        const data = JSON.parse(e.data);
        const eventId = e.lastEventId;
        logEvent(`ID: ${eventId} | ${data.message}`, 'live');
      };
    }

    function logEvent(text, type) {
      const p = document.createElement('p');
      p.className = 'event ' + type;
      p.textContent = text;
      eventsDiv.prepend(p); // Prepend to see new events at the top
    }

    connect();
  </script>
</body>
</html>
Testing the Resilience
- Start Redis.
- Run node producer.js in one terminal.
- Run node server.js in another terminal.
- Open index.html in your browser. It must be served from the same origin for the relative /events URL to resolve — for example, add app.use(express.static('.')) to server.js and browse to http://localhost:3000/index.html.

You will see live events streaming in. Now, let's test the failure modes:
* Network Disconnect: In your browser's developer tools, go to the Network tab and set the connection to "Offline." Wait for about 10 seconds (enough for 3-4 events to be produced). Then, set the network back to "Online." You will see the client reconnect, and the server will immediately send the 3-4 missed events before the live stream resumes. The server logs will show Client reconnecting with Last-Event-ID: ... followed by Found X missed events. Success!
* Server Restart: Keep the client and producer running. Stop the server.js process (Ctrl+C). Wait 10 seconds. Restart it (node server.js). The browser will automatically reconnect to the new server instance. Because the new server can read the Last-Event-ID header and query the persistent Redis Stream, it will backfill all missed events exactly as before. Our system is resilient to server failure.
Advanced Edge Cases and Production Hardening
The core pattern is solid, but production systems require attention to detail.
1. Load Balancing and State
In our example, we used a global clients array. This is a critical flaw in a multi-instance, load-balanced environment. If a client disconnects and reconnects, the load balancer might send them to a different server instance. The original server would have a dangling connection, and the new server wouldn't know about it.
The solution is to make the SSE server stateless. Our Redis-backed architecture already achieves the most important part of this: the event history is centralized. The clients array, used for broadcasting live events, is the problem.
The correct pattern is to use Redis's Pub/Sub mechanism for live broadcasts.
- Each server instance subscribes to a dedicated Pub/Sub channel (e.g., live_events_channel).
- When the producer adds an event to the stream with XADD, it also PUBLISHes the event ID to this channel.
- All server instances receive the notification via their Pub/Sub subscription and write the event to their own connected clients.
This way, it doesn't matter which server the producer notifies or which server the client is connected to. All instances are synchronized via Redis Pub/Sub, and the client's state (Last-Event-ID) is managed entirely on the client and in the persistent stream.
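A minimal sketch of that fan-out, assuming a channel named live_events_channel (the channel name and the per-event xRange lookup are illustrative choices, not part of the pattern itself):

// In producer.js, right after xAdd: tell every server instance about the new event.
await client.publish('live_events_channel', eventId);

// In server.js, replacing the XREAD loop: each instance holds its own subscription.
// A connection in subscriber mode cannot run other commands, hence the duplicate().
const fanout = redisClient.duplicate();
await fanout.connect();
await fanout.subscribe('live_events_channel', async (eventId) => {
  // Fetch the full event from the stream so every instance serves identical data.
  const [event] = await redisClient.xRange('event_stream', eventId, eventId);
  if (!event) return; // Already trimmed away; reconnecting clients will backfill instead.
  const formattedMessage = `id: ${event.id}\ndata: ${event.message.data}\n\n`;
  clients.forEach(client => client.res.write(formattedMessage));
});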
2. Event Buffer Management (`MAXLEN`)
A Redis Stream can grow indefinitely, which could exhaust memory. We must cap its size. The XADD command has a MAXLEN option for this.
// In producer.js
await client.xAdd(
  'event_stream',
  '*',
  { data: JSON.stringify(eventData) },
  // node-redis v4 option shape for XADD ... MAXLEN 10000
  { TRIM: { strategy: 'MAXLEN', threshold: 10000 } } // Keep only the latest 10,000 events
);
Choosing the MAXLEN value is a critical business decision. It represents the trade-off between memory usage and the maximum duration a client can be disconnected before losing data. If you produce 10 events/second, a MAXLEN of 10,000 provides a buffer of 1,000 seconds (~16 minutes). A client disconnected for longer than that would experience data loss upon reconnection.
3. Heartbeats and Dead Connection Detection
Proxies, load balancers, and firewalls are notorious for silently dropping idle TCP connections. An SSE connection can appear idle if no events are sent for a minute. To prevent this, the server should periodically send a comment line (which is ignored by the EventSource API):
: heartbeat
This generates network traffic without triggering a message event on the client.
// In server.js, within the app.get('/events', ...) handler
const heartbeatInterval = setInterval(() => {
  res.write(': heartbeat\n\n');
}, 20000); // Send a heartbeat every 20 seconds

// Don't forget to clear it on close
req.on('close', () => {
  clearInterval(heartbeatInterval);
  // ... rest of close logic
});
Note that comment lines never reach your JavaScript: the EventSource parser silently discards them. If you also want the client to detect a truly dead connection, send the heartbeat as a named event (e.g., event: heartbeat) and reset a timer whenever any traffic arrives, as sketched below.
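A client-side watchdog sketch under that assumption — the heartbeat event name, the 45-second threshold, and the forced reconnect are all illustrative choices. Register the listeners inside connect() so they attach to each new EventSource:

// In index.html <script> — assumes the server sends 'event: heartbeat' frames,
// since bare ': heartbeat' comments are invisible to client-side JavaScript.
let watchdogTimer;
function resetWatchdog() {
  clearTimeout(watchdogTimer);
  // Allow roughly two missed 20-second heartbeats before giving up.
  watchdogTimer = setTimeout(() => {
    console.warn('No traffic for 45s; connection presumed dead. Reconnecting.');
    eventSource.close(); // close() disables the built-in auto-reconnect...
    connect();           // ...so we open a fresh EventSource ourselves.
  }, 45000);
}
eventSource.addEventListener('heartbeat', resetWatchdog);
eventSource.addEventListener('message', resetWatchdog);
resetWatchdog();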
4. Idempotency vs. Resumability
Our system provides resumability (recovering missed messages), which is a form of "at-least-once" delivery. This implies that a client might, in rare race conditions, receive the same message twice. For example:
- The server sends event 123-0.
- The client receives it and processes it.
- The connection drops before the browser's internal lastEventId is updated or before the TCP ACK is fully processed.
- On reconnect, the browser sends Last-Event-ID: 122-0.
- The server dutifully re-sends event 123-0.

To achieve true idempotency, the client application must be responsible for de-duplication. It should maintain a small, in-memory cache of the most recent event IDs it has processed and simply ignore any duplicates.
// In index.html <script>
const processedEventIds = new Set();

eventSource.onmessage = (e) => {
  const eventId = e.lastEventId;
  if (processedEventIds.has(eventId)) {
    console.warn(`Duplicate event received and ignored: ${eventId}`);
    return;
  }
  processedEventIds.add(eventId);
  // To prevent this set from growing forever, implement a sliding window/LRU cache.

  const data = JSON.parse(e.data);
  logEvent(`ID: ${eventId} | ${data.message}`, 'live');
};
Performance and Scaling Considerations
* Redis Throughput: A single Redis instance can handle tens of thousands of XADD/XRANGE operations per second. For most applications, Redis will not be the bottleneck. For extreme scale, you can shard the event stream.
* Connection Limits: A single Node.js process can handle thousands of concurrent SSE connections, but this is memory and CPU-intensive. The primary scaling vector is horizontal: add more stateless server instances behind a load balancer.
* Backfill Storms: If many clients reconnect simultaneously (e.g., after a network partition resolves), they will all request historical data from Redis. This can cause a CPU and network spike. Redis is very efficient at range queries, but the Node.js server must be able to handle streaming large amounts of data to many clients at once. Monitor your event loop latency and consider implementing rate-limiting or backpressure if this becomes an issue.
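One simple mitigation is to make the backfill loop in server.js respect socket backpressure. A sketch using standard Node.js stream semantics — res.write() returns false once the socket buffer is full, and the 'drain' event fires when it empties:

// Pause the backfill whenever this client's socket buffer is full.
for (const event of missedEvents) {
  const ok = res.write(`id: ${event.id}\ndata: ${event.message.data}\n\n`);
  if (!ok) {
    await new Promise((resolve) => res.once('drain', resolve));
  }
}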
Conclusion
By embracing the full SSE specification and integrating a durable message broker like Redis Streams, we have elevated Server-Sent Events from a simple notification mechanism to a truly resilient, fault-tolerant streaming protocol. The Last-Event-ID pattern provides a robust foundation for building real-time applications where data integrity is non-negotiable.
The key takeaways for building such a system are:
* Persist every event in a durable, queryable log such as a Redis Stream.
* Send a unique, monotonic id with every event.
* On reconnect, honor Last-Event-ID and backfill missed messages from the persistent log.
* De-duplicate by event ID on the client to turn at-least-once delivery into effectively exactly-once processing.

This architecture provides the reliability of more complex queuing systems with the simplicity and low-latency benefits of Server-Sent Events, making it a powerful pattern for the modern real-time web.