Idempotent gRPC BiDi Streams with Client-Side Buffering & DLQs


The Fragility of Stateful Streaming in Unreliable Networks

In distributed systems, gRPC bidirectional streams are a powerful tool for low-latency, full-duplex communication. They are ideal for use cases like real-time IoT data ingestion, financial market data feeds, or collaborative application state synchronization. However, the standard implementation of a bidirectional stream is fundamentally a transient, stateful connection. This creates a critical vulnerability: network partitions, client restarts, or server deployments will sever the connection, leading to an indeterminate state. Messages in flight may be lost, and upon reconnection, there's no built-in mechanism to prevent the client from resending already-processed data.

This article addresses this challenge head-on. We will architect and implement a production-ready system that layers durability and exactly-once processing semantics on top of a standard gRPC bidirectional stream. Our goal is to create a communication channel that can withstand interruptions and guarantee that every message is processed exactly once (within a configurable deduplication window), even in the face of chaos.

We will not cover the basics of gRPC or protocol buffers. The assumption is that you are a senior engineer who has built gRPC services before and is now facing the complex reality of making them resilient in production.

The core of our solution is a three-part strategy:

  • Client-Side Persistent Buffering: The client will not send messages directly over the wire. Instead, it will write them to a persistent, on-disk buffer. A separate process will read from this buffer and attempt to send the messages. This ensures that even if the client application crashes or the network is down, no data is lost.
  • Idempotency and Acknowledgements: Each message sent from the client will be tagged with a unique identifier (an idempotency key). The server will maintain a short-term cache (e.g., Redis) of recently processed message IDs. Upon receiving a message, the server first checks if the ID has been seen. If so, it acknowledges the message without re-processing it. If not, it processes the message, stores the ID, and then sends an acknowledgement back to the client. The client will only remove a message from its persistent buffer upon receiving this specific acknowledgement.
  • Server-Side Dead-Letter Queue (DLQ) Integration: What if a message is structurally valid but cannot be processed due to a transient downstream dependency or a data validation error? Instead of returning an error that might block the entire stream or cause the client to retry indefinitely, the server will shunt this "poison pill" message to a Dead-Letter Queue (e.g., RabbitMQ, AWS SQS) for offline analysis and reprocessing, while still acknowledging it to the client to unblock the stream.
This architecture transforms the gRPC stream from a transient connection into a durable, resilient data transport layer.


System Architecture and Protocol Definition

Before diving into the code, let's formalize our protocol. We need an IngestDataRequest that carries our business payload along with the idempotency key, and an IngestDataResponse that serves as a specific acknowledgement.

Here is our ingestion.proto file:

    syntax = "proto3";
    
    package ingestion;
    
    option go_package = ".;ingestion";
    
    import "google/protobuf/timestamp.proto";
    
    // The service definition for our data ingestion stream.
    service IngestionService {
      // IngestData is a bidirectional stream for high-throughput data.
      rpc IngestData(stream IngestDataRequest) returns (stream IngestDataResponse);
    }
    
    // IngestDataRequest wraps the payload with metadata for idempotency.
    message IngestDataRequest {
      // A unique identifier for this specific message (e.g., UUIDv4).
      // This is the core of our idempotency mechanism.
      string idempotency_key = 1;
    
      // The actual business data being sent.
      DataPayload payload = 2;
    }
    
    // DataPayload represents the actual data point we care about.
    message DataPayload {
      string device_id = 1;
      google.protobuf.Timestamp timestamp = 2;
      double temperature_celsius = 3;
      double humidity_percent = 4;
    }
    
    // IngestDataResponse is an acknowledgement for a specific request.
    message IngestDataResponse {
      // The idempotency_key of the message that was successfully processed.
      string ack_key = 1;
    
      // Status of the processing.
      enum Status {
        UNKNOWN = 0;
        ACKNOWLEDGED = 1; // Successfully processed and committed.
        REJECTED_DLQ = 2; // Valid request, but failed processing and was sent to DLQ.
      }
      Status status = 2;
    }

This protocol is explicit: the client sends a request carrying an idempotency_key, and the server must reply with a response whose ack_key matches it. This direct link is what makes the client's buffer management possible.
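For reference, the stream interfaces that protoc-gen-go-grpc generates from this service definition look like the following (abridged here to the methods we use):

    // Generated by protoc-gen-go-grpc (abridged).
    type IngestionService_IngestDataClient interface {
    	Send(*IngestDataRequest) error
    	Recv() (*IngestDataResponse, error)
    	grpc.ClientStream
    }
    
    type IngestionService_IngestDataServer interface {
    	Send(*IngestDataResponse) error
    	Recv() (*IngestDataRequest, error)
    	grpc.ServerStream
    }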


Deep Dive: The Resilient Go Client

The client's complexity lies in managing state across connections. It must:

  • Generate and assign a unique idempotency key to each outgoing message.
  • Write messages to a persistent buffer before attempting to send them.
  • Manage the gRPC stream, including robust reconnection logic with exponential backoff.
  • Run two concurrent goroutines per stream: one for sending from the buffer, one for receiving acks.
  • Atomically remove messages from the buffer only after their corresponding ack is received.

For the persistent buffer, a simple file-based write-ahead log (WAL) is a good choice for performance and simplicity. In this example we simulate it with an in-memory map holding the "unacknowledged" messages, but in a real system it would be backed by an embedded store such as bbolt or LevelDB.
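For illustration, here is a minimal sketch of what a bbolt-backed buffer could look like, exposing the same Add/Ack/GetUnacked shape as the in-memory version below. The bucket name, file mode, and error-handling strategy are illustrative assumptions, not part of the reference implementation:

    package buffer
    
    import (
    	bolt "go.etcd.io/bbolt"
    	"google.golang.org/protobuf/proto"
    
    	pb "path/to/your/ingestion"
    )
    
    // bucketUnacked holds one entry per in-flight message, keyed by idempotency key.
    var bucketUnacked = []byte("unacked")
    
    type PersistentBuffer struct {
    	db *bolt.DB
    }
    
    func Open(path string) (*PersistentBuffer, error) {
    	db, err := bolt.Open(path, 0600, nil)
    	if err != nil {
    		return nil, err
    	}
    	err = db.Update(func(tx *bolt.Tx) error {
    		_, err := tx.CreateBucketIfNotExists(bucketUnacked)
    		return err
    	})
    	return &PersistentBuffer{db: db}, err
    }
    
    // Add persists the message before any send attempt, so it survives crashes.
    func (b *PersistentBuffer) Add(msg *pb.IngestDataRequest) error {
    	data, err := proto.Marshal(msg)
    	if err != nil {
    		return err
    	}
    	return b.db.Update(func(tx *bolt.Tx) error {
    		return tx.Bucket(bucketUnacked).Put([]byte(msg.IdempotencyKey), data)
    	})
    }
    
    // Ack deletes the entry once the server has acknowledged the key.
    func (b *PersistentBuffer) Ack(key string) error {
    	return b.db.Update(func(tx *bolt.Tx) error {
    		return tx.Bucket(bucketUnacked).Delete([]byte(key))
    	})
    }
    
    // GetUnacked returns every message still awaiting an acknowledgement.
    func (b *PersistentBuffer) GetUnacked() ([]*pb.IngestDataRequest, error) {
    	var msgs []*pb.IngestDataRequest
    	err := b.db.View(func(tx *bolt.Tx) error {
    		return tx.Bucket(bucketUnacked).ForEach(func(_, v []byte) error {
    			msg := &pb.IngestDataRequest{}
    			if err := proto.Unmarshal(v, msg); err != nil {
    				return err
    			}
    			msgs = append(msgs, msg)
    			return nil
    		})
    	})
    	return msgs, err
    }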

Client Implementation (`client/main.go`)

    package main
    
    import (
    	"context"
    	"io"
    	"log"
    	"sync"
    	"time"
    
    	"github.com/google/uuid"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    	"google.golang.org/protobuf/types/known/timestamppb"
    
    	pb "path/to/your/ingestion"
    )
    
    // In a real implementation, this would be a persistent store (e.g., BoltDB, LevelDB, or a WAL file).
    // Using a concurrent map to simulate for this example.
    type UnackedMessageBuffer struct {
    	mu       sync.Mutex
    	messages map[string]*pb.IngestDataRequest
    }
    
    func NewUnackedMessageBuffer() *UnackedMessageBuffer {
    	return &UnackedMessageBuffer{
    		messages: make(map[string]*pb.IngestDataRequest),
    	}
    }
    
    func (b *UnackedMessageBuffer) Add(msg *pb.IngestDataRequest) {
    	b.mu.Lock()
    	defer b.mu.Unlock()
    	b.messages[msg.IdempotencyKey] = msg
    }
    
    func (b *UnackedMessageBuffer) Ack(key string) {
    	b.mu.Lock()
    	defer b.mu.Unlock()
    	delete(b.messages, key)
    	log.Printf("CLIENT: Acked and removed message %s from buffer", key)
    }
    
    func (b *UnackedMessageBuffer) GetUnacked() []*pb.IngestDataRequest {
    	b.mu.Lock()
    	defer b.mu.Unlock()
    	msgs := make([]*pb.IngestDataRequest, 0, len(b.messages))
    	for _, msg := range b.messages {
    		msgs = append(msgs, msg)
    	}
    	return msgs
    }
    
    type ResilientClient struct {
    	conn         *grpc.ClientConn
    	client       pb.IngestionServiceClient
    	buffer       *UnackedMessageBuffer
    	stream       pb.IngestionService_IngestDataClient
    	streamCancel context.CancelFunc
    	mu           sync.Mutex
    	serverAddr   string
    }
    
    func NewResilientClient(serverAddr string) *ResilientClient {
    	return &ResilientClient{
    		serverAddr: serverAddr,
    		buffer:     NewUnackedMessageBuffer(),
    	}
    }
    
    func (c *ResilientClient) Connect() error {
    	c.mu.Lock()
    	defer c.mu.Unlock()
    
    	// Close any connection left over from a previous attempt so repeated
    	// reconnects do not leak client connections.
    	if c.conn != nil {
    		c.conn.Close()
    	}
    
    	var err error
    	c.conn, err = grpc.Dial(c.serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		return err
    	}
    	c.client = pb.NewIngestionServiceClient(c.conn)
    	return c.establishStream()
    }
    
    func (c *ResilientClient) establishStream() error {
    	log.Println("CLIENT: Establishing new stream...")
    	streamCtx, cancel := context.WithCancel(context.Background())
    	c.streamCancel = cancel
    
    	var err error
    	c.stream, err = c.client.IngestData(streamCtx)
    	if err != nil {
    		log.Printf("CLIENT: Failed to establish stream: %v", err)
    		return err
    	}
    
    	log.Println("CLIENT: Stream established. Starting receiver and resending unacked messages.")
    	go c.receiveAcks()
    	go c.resendUnacked()
    
    	return nil
    }
    
    func (c *ResilientClient) resendUnacked(stream pb.IngestionService_IngestDataClient) {
    	unacked := c.buffer.GetUnacked()
    	if len(unacked) == 0 {
    		return
    	}
    	// Hold c.mu so these resends do not interleave with Send(): gRPC allows
    	// only one goroutine to call stream.Send at a time.
    	c.mu.Lock()
    	defer c.mu.Unlock()
    	log.Printf("CLIENT: Resending %d unacknowledged messages...", len(unacked))
    	for _, msg := range unacked {
    		if err := stream.Send(msg); err != nil {
    			log.Printf("CLIENT: Error resending message %s: %v", msg.IdempotencyKey, err)
    			// The stream is likely broken; the receiver goroutine will
    			// detect this and trigger reconnection.
    			return
    		}
    	}
    }
    
    func (c *ResilientClient) receiveAcks(stream pb.IngestionService_IngestDataClient) {
    	for {
    		ack, err := stream.Recv()
    		if err != nil {
    			if err == io.EOF {
    				log.Println("CLIENT: Stream closed by server (EOF).")
    			} else {
    				log.Printf("CLIENT: Error receiving ack: %v. Stream is likely broken.", err)
    			}
    			// Signal that this stream is dead so the reconnection loop takes
    			// over, without clobbering a newer replacement stream.
    			c.closeIfCurrent(stream)
    			return
    		}
    		c.buffer.Ack(ack.AckKey)
    	}
    }
    
    func (c *ResilientClient) Send(payload *pb.DataPayload) error {
    	c.mu.Lock()
    	defer c.mu.Unlock()
    
    	msg := &pb.IngestDataRequest{
    		IdempotencyKey: uuid.New().String(),
    		Payload:        payload,
    	}
    
    	// Crucial: add to the buffer *before* any send attempt. From this point
    	// the message survives disconnects and will be resent on reconnection.
    	c.buffer.Add(msg)
    
    	if c.stream == nil {
    		// Currently disconnected: the message stays safely buffered.
    		return io.ErrClosedPipe
    	}
    
    	if err := c.stream.Send(msg); err != nil {
    		log.Printf("CLIENT: Failed to send message %s: %v", msg.IdempotencyKey, err)
    		// The message is safe in our buffer; the reconnection logic will
    		// resend it once the stream is re-established.
    		c.closeStreamLocked()
    		return err
    	}
    
    	log.Printf("CLIENT: Sent message %s", msg.IdempotencyKey)
    	return nil
    }
    
    // closeIfCurrent tears the stream down only if it is still the active one,
    // so a receiver for an already-replaced stream cannot cancel its successor.
    func (c *ResilientClient) closeIfCurrent(stream pb.IngestionService_IngestDataClient) {
    	c.mu.Lock()
    	defer c.mu.Unlock()
    	if c.stream == stream {
    		c.closeStreamLocked()
    	}
    }
    
    // closeStreamLocked must only be called while holding c.mu.
    func (c *ResilientClient) closeStreamLocked() {
    	if c.streamCancel != nil {
    		c.streamCancel()
    		c.streamCancel = nil
    		c.stream = nil
    	}
    }
    
    func (c *ResilientClient) Run() {
    	// Initial connection attempt
    	for {
    		if err := c.Connect(); err == nil {
    			break
    		}
    		log.Printf("CLIENT: Initial connection failed, retrying in 5 seconds...")
    		time.Sleep(5 * time.Second)
    	}
    
    	// Data generation loop
    	go func() {
    		ticker := time.NewTicker(2 * time.Second)
    		defer ticker.Stop()
    		for range ticker.C {
    			payload := &pb.DataPayload{
    				DeviceId:           "device-007",
    				Timestamp:          timestamppb.Now(),
    				TemperatureCelsius: 23.5,
    				HumidityPercent:    45.1,
    			}
    			if err := c.Send(payload); err != nil {
    				log.Printf("CLIENT: Send failed (currently disconnected): %v", err)
    			}
    		}
    	}()
    
    	// Reconnection loop
    	backoff := 1.0
    	for {
    		c.mu.Lock()
    		streamIsNil := c.stream == nil
    		c.mu.Unlock()
    
    		if streamIsNil {
    			log.Printf("CLIENT: Detected disconnected state. Attempting to reconnect in %.1f seconds...", backoff)
    			time.Sleep(time.Duration(backoff) * time.Second)
    			backoff *= 2 // Exponential backoff
    			if backoff > 60 {
    				backoff = 60
    			}
    
    			if err := c.Connect(); err == nil {
    				log.Println("CLIENT: Reconnection successful!")
    				backoff = 1.0 // Reset backoff on success
    			} else {
    				log.Printf("CLIENT: Reconnection failed: %v", err)
    			}
    		} else {
    			time.Sleep(1 * time.Second)
    		}
    	}
    }
    
    func main() {
    	client := NewResilientClient("localhost:50051")
    	client.Run()
    }

Deep Dive: The Idempotent Go Server

The server is stateless with respect to any individual connection, but stateful regarding the data it has processed. It will:

  • Accept incoming stream connections.
  • For each message on the stream, check whether the idempotency_key already exists in Redis.
  • If the key exists, it's a duplicate: immediately send an ACKNOWLEDGED response.
  • If the key does not exist, attempt the core business logic.
  • If processing succeeds, add the key to Redis (with a TTL) and send an ACKNOWLEDGED response.
  • If processing fails due to a recoverable error, push the raw message to a DLQ and send a REJECTED_DLQ response. This still counts as an acknowledgement to the client.

Redis provides the atomic "check-and-set" operation the idempotency check needs via SET with the NX flag (or SADD, if you track keys in a set). A TTL is crucial to prevent the Redis cache from growing indefinitely. The duration of this TTL defines your "idempotency window": the period during which the server can guarantee deduplication.

Server Implementation (`server/main.go`)

    package main
    
    import (
    	"context"
    	"encoding/json"
    	"fmt"
    	"io"
    	"log"
    	"net"
    	"time"
    
    	"github.com/go-redis/redis/v8"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    
    	pb "path/to/your/ingestion"
    )
    
    // MockDLQ simulates a connection to RabbitMQ, SQS, etc.
    type MockDLQ struct{}
    
    func (d *MockDLQ) Publish(message []byte) error {
    	log.Printf("DLQ: Publishing message: %s", string(message))
    	// In a real system, this would publish to a message broker.
    	return nil
    }
    
    type ingestionServer struct {
    	pb.UnimplementedIngestionServiceServer
    	redisClient *redis.Client
    	dlq         *MockDLQ
    	// How long to remember idempotency keys to prevent duplicates
    	idempotencyWindow time.Duration
    }
    
    func newServer() *ingestionServer {
    	rdb := redis.NewClient(&redis.Options{
    		Addr: "localhost:6379",
    	})
    	return &ingestionServer{
    		redisClient:       rdb,
    		dlq:               &MockDLQ{},
    		idempotencyWindow: 24 * time.Hour,
    	}
    }
    
    func (s *ingestionServer) IngestData(stream pb.IngestionService_IngestDataServer) error {
    	log.Println("SERVER: New client stream connected.")
    	for {
    		req, err := stream.Recv()
    		if err == io.EOF {
    			log.Println("SERVER: Client closed the stream (EOF).")
    			return nil
    		}
    		if err != nil {
    			log.Printf("SERVER: Error receiving from stream: %v", err)
    			return err
    		}
    
    		key := req.IdempotencyKey
    		if key == "" {
    			// Without a key we can neither deduplicate nor ack this message.
    			// Skipping it is a policy choice; a stricter server could end
    			// the stream with codes.InvalidArgument instead.
    			log.Println("SERVER: Received message without idempotency key. Skipping.")
    			continue
    		}
    
    		// --- Idempotency Check ---
    		// SET key "processed" NX EX duration
    		// NX -- Only set the key if it does not already exist.
    		// This is an atomic check-and-set operation.
    		wasSet, err := s.redisClient.SetNX(stream.Context(), key, "processed", s.idempotencyWindow).Result()
    		if err != nil {
    			log.Printf("SERVER: Redis error checking key %s: %v", key, err)
    			// If Redis is down, we can't guarantee idempotency. Fail the stream.
    			return status.Errorf(codes.Internal, "cannot check idempotency")
    		}
    
    		if !wasSet {
    			// This is a duplicate message.
    			log.Printf("SERVER: Duplicate message detected: %s. Acknowledging without processing.", key)
    			if err := stream.Send(&pb.IngestDataResponse{AckKey: key, Status: pb.IngestDataResponse_ACKNOWLEDGED}); err != nil {
    				log.Printf("SERVER: Error sending duplicate ack for %s: %v", key, err)
    				return err
    			}
    			continue
    		}
    
    		// --- Business Logic --- 
    		log.Printf("SERVER: Processing new message: %s, Device: %s", key, req.Payload.DeviceId)
    		ackStatus := pb.IngestDataResponse_ACKNOWLEDGED
    
    		// Simulate a processing failure that should go to the DLQ.
    		if req.Payload.TemperatureCelsius > 100.0 {
    			log.Printf("SERVER: Unprocessable message %s (temp > 100). Sending to DLQ.", key)
    			jsonData, _ := json.Marshal(req)
    			if err := s.dlq.Publish(jsonData); err != nil {
    				log.Printf("SERVER: FAILED to publish to DLQ for key %s: %v", key, err)
    				// This is a critical failure. We might need to rollback the idempotency key.
    				s.redisClient.Del(stream.Context(), key) // Attempt to allow reprocessing
    				return status.Errorf(codes.Internal, "failed to publish to DLQ")
    			}
    			ackStatus = pb.IngestDataResponse_REJECTED_DLQ
    		} else {
    			// Simulate successful processing.
    			time.Sleep(50 * time.Millisecond)
    		}
    
    		// --- Send Acknowledgement ---
    		if err := stream.Send(&pb.IngestDataResponse{AckKey: key, Status: ackStatus}); err != nil {
    			log.Printf("SERVER: Error sending ack for %s: %v", key, err)
    			// If we can't send the ack, the client will resend. Our idempotency check will handle it.
    			return err
    		}
    	}
    }
    
    func main() {
    	lis, err := net.Listen("tcp", ":50051")
    	if err != nil {
    		log.Fatalf("failed to listen: %v", err)
    	}
    
    	s := grpc.NewServer()
    	pb.RegisterIngestionServiceServer(s, newServer())
    
    	log.Println("SERVER: Starting gRPC server on :50051")
    	if err := s.Serve(lis); err != nil {
    		log.Fatalf("failed to serve: %v", err)
    	}
    }

Advanced Edge Cases and Performance Considerations

This architecture is robust, but production environments will surface numerous edge cases.

1. Idempotency Window and State Management

The idempotencyWindow (24 hours in our example) is a critical trade-off. A longer window provides stronger guarantees against very old duplicates but consumes more memory in Redis; a shorter window is more memory-efficient but increases the risk of duplicates if a client is offline for an extended period. As a rough sizing example, at a sustained 1,000 messages per second a 24-hour window retains about 86 million keys, and at roughly 100 bytes of Redis overhead per key that is on the order of 8-9 GB. The correct value depends entirely on your business requirements and the expected client offline duration.

2. Client Buffer Bloat and Backpressure

What if a client is offline for days and generates millions of messages? Its local persistent buffer could grow to an unmanageable size. A production-grade client must implement a strategy for this (a capped-buffer sketch follows the list):

  • Capped Buffer: The buffer should have a maximum size (in bytes or message count). Once full, new messages should either be dropped or the application should apply backpressure to the source of the data.

  • Buffer Compaction: For some use cases (e.g., stock tickers), only the latest value for a given key matters. The client could implement compaction logic to discard older, unsent messages for the same entity.
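As a sketch of the capped-buffer idea, the in-memory UnackedMessageBuffer from the client above could gain a size-aware variant of Add. The cap value and the choice to reject new messages (rather than, say, dropping the oldest) are illustrative assumptions:

    import "errors"
    
    // ErrBufferFull tells the caller to slow down, drop, or alert upstream.
    var ErrBufferFull = errors.New("unacked buffer full: apply backpressure upstream")
    
    // AddCapped behaves like Add but refuses new messages once the buffer
    // holds maxMessages entries, pushing the drop-or-block decision back to
    // the data source.
    func (b *UnackedMessageBuffer) AddCapped(msg *pb.IngestDataRequest, maxMessages int) error {
    	b.mu.Lock()
    	defer b.mu.Unlock()
    	if len(b.messages) >= maxMessages {
    		return ErrBufferFull
    	}
    	b.messages[msg.IdempotencyKey] = msg
    	return nil
    }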

    3. "Thundering Herd" on Reconnect

    If a network partition is resolved and hundreds of clients reconnect simultaneously, they will all try to dump their unacknowledged message buffers onto the server at once. The exponential backoff in our client helps, but the server also needs protection.

    * Rate Limiting: Implement a server-side rate limiter (e.g., a token bucket algorithm) on a per-stream or per-IP basis to prevent any single client from overwhelming the service.

    * Load Shedding: If the server is under extreme load, it can strategically drop streams, forcing clients to reconnect and back off, thus smoothing out the load.
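A per-stream token bucket is straightforward with golang.org/x/time/rate. This sketch assumes the limiter is created when a stream connects and consulted before the idempotency check for each message; the rate and burst values are illustrative, not recommendations:

    import (
    	"context"
    
    	"golang.org/x/time/rate"
    )
    
    // newStreamLimiter is created once per connected stream: a steady 100
    // messages/second with bursts of up to 200.
    func newStreamLimiter() *rate.Limiter {
    	return rate.NewLimiter(rate.Limit(100), 200)
    }
    
    // throttle blocks until a token is available or the stream's context is
    // cancelled. In IngestData, call throttle(stream.Context(), lim) right
    // after stream.Recv() succeeds and return any error to end the stream.
    func throttle(ctx context.Context, lim *rate.Limiter) error {
    	return lim.Wait(ctx)
    }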

4. Atomic Commit: Processing and Acknowledgement

In our server, we first perform the business logic and then send the acknowledgement. What happens if the server crashes between these two steps? The idempotency key is already in Redis, but the client never received the ack. The client will reconnect and resend; the server will see the key, treat the message as a duplicate, and send an ack without re-processing. This is the desired behavior, and it demonstrates the power of the SETNX pattern.

The more dangerous scenario is a failure to write to the DLQ. Our code attempts to roll back the idempotency key by deleting it from Redis, but this is only a best-effort recovery. A more robust solution might use a two-phase commit pattern, at the cost of significant complexity. For most systems, logging the DLQ publish failure and relying on monitoring and alerting is a pragmatic compromise.

5. Performance Benchmarking

When evaluating this system, look beyond simple messages-per-second throughput. Key metrics include the following (a latency-sampling sketch follows the list):

  • P99 Latency: The round-trip time for a message, from Send() on the client to the ack being processed.

  • Reconnection Latency: How long it takes, from a simulated network drop, for the client to re-establish the stream and receive its first new ack.

  • Buffer Drain Rate: After a prolonged disconnection, how quickly the client can drain its backlog of unacknowledged messages. This tests the server's ability to absorb burst traffic.

  • Redis Memory Footprint: The memory usage of your Redis instance as a function of message volume and idempotency window size.
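To collect the raw samples behind a P99 figure, the client can record a timestamp when each message is sent and compute the delta when its ack arrives. A minimal sketch follows; the type and method names are illustrative, and computing percentiles over the samples is left to your metrics library:

    import (
    	"sync"
    	"time"
    )
    
    // LatencyTracker correlates sends and acks by idempotency key.
    type LatencyTracker struct {
    	mu      sync.Mutex
    	sentAt  map[string]time.Time
    	samples []time.Duration
    }
    
    func NewLatencyTracker() *LatencyTracker {
    	return &LatencyTracker{sentAt: make(map[string]time.Time)}
    }
    
    // MarkSent is called immediately after stream.Send succeeds.
    func (t *LatencyTracker) MarkSent(key string) {
    	t.mu.Lock()
    	defer t.mu.Unlock()
    	t.sentAt[key] = time.Now()
    }
    
    // MarkAcked is called when the ack for key arrives; it records the
    // round-trip duration for later percentile analysis.
    func (t *LatencyTracker) MarkAcked(key string) {
    	t.mu.Lock()
    	defer t.mu.Unlock()
    	if start, ok := t.sentAt[key]; ok {
    		t.samples = append(t.samples, time.Since(start))
    		delete(t.sentAt, key)
    	}
    }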

Conclusion: Beyond RPC

By layering a persistent client-side buffer, a server-side idempotency cache, and a DLQ for fault tolerance, we have elevated a simple gRPC stream into a durable, resilient transport for critical data. This pattern demonstrates a fundamental principle of advanced software engineering: the protocol itself is often just the foundation. The real work lies in building robust state management, error handling, and recovery mechanisms around it to handle the inherent chaos of distributed systems.

While streaming platforms like Kafka or Pulsar solve similar problems at a larger scale, this gRPC pattern provides a powerful alternative for scenarios that require extremely low latency and direct client-server communication without the operational overhead of a full message broker. It is a complex but highly effective solution for building stateful, production-grade streaming applications.
