Real-Time Feature Ingestion with Kafka & Flink for Online ML Models

Goh Ling Yong

The High Cost of Stale Features in Online Inference

In online machine learning systems—be it for fraud detection, real-time bidding, or personalized recommendations—the predictive power of a model is inextricably linked to the freshness of its input features. A model making a decision based on user activity from five minutes ago is operating with a significant handicap compared to one using data from the last five seconds. This problem, known as feature staleness, directly impacts model accuracy and, consequently, business outcomes. While batch-based feature engineering pipelines running every few hours are sufficient for many analytics use cases, they are fundamentally inadequate for online inference where millisecond-level latency is paramount.

The challenge is to compute complex, stateful features from high-throughput event streams and make them available to a serving layer with minimal delay. This requires an architecture that moves beyond request-response or simple batch processing. We need a system that can continuously process unbounded streams of data, maintain state over time (e.g., a user's activity in the last 30 minutes), handle the inherent disorder of real-world event streams, and do so in a fault-tolerant, scalable manner.

This is where a modern streaming stack excels. Our target architecture for this deep dive is as follows:

  • Event Source (Apache Kafka): A durable, high-throughput message bus that ingests raw events (e.g., user clicks, transactions, IoT sensor readings).
  • Stream Processor (Apache Flink): The computational engine that consumes events from Kafka, performs stateful transformations and aggregations, and computes the feature vectors.
  • Feature Store (Redis): A low-latency key-value store where the computed features are written. Online models query this store at inference time.
  • Online Model Service: The microservice that receives a prediction request, fetches the latest features from the Feature Store, and executes the model.

This article will focus exclusively on the core of this pipeline: building and operationalizing the Flink application that transforms raw Kafka events into high-value features ready for inference.


    Core Architecture: Why Flink and Kafka?

    Before diving into the implementation, it's crucial to understand the specific technical reasons for choosing these components for this demanding use case.

    Apache Kafka: The Distributed Commit Log

    Kafka serves as more than just a message queue; it's a distributed, partitioned, and replicated commit log. For our pipeline, its key properties are:

    * Durability and Replayability: Events are persisted. If our Flink job fails and needs to be restarted, we can resume processing from the exact same point without data loss. We can also replay historical data to backfill features or train new models.

    * High Throughput & Low Latency: Kafka is engineered to handle millions of messages per second, making it suitable for ingesting raw event streams from numerous sources.

    * Partitioning for Parallelism: Kafka topics are divided into partitions. This is the fundamental unit of parallelism that Flink will leverage. By keying our events correctly (e.g., by userId), we ensure that all events for a given entity land on the same partition, allowing for stateful processing in Flink.

    * Schema Enforcement with Schema Registry: In a production environment with multiple microservices producing events, maintaining a consistent data contract is non-negotiable. A Schema Registry (like Confluent's) paired with a format like Avro ensures that producers and consumers adhere to a defined schema, preventing data quality issues and enabling safe schema evolution.
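
    To make the partitioning point concrete, here is a minimal producer sketch (the hostnames and sample payload are assumptions). It sends JSON strings keyed by userId, which is what the Flink job later in this article consumes; with Avro in place, you would swap the value serializer for Confluent's KafkaAvroSerializer and point it at the Schema Registry.

    java
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class UserInteractionProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Plain JSON strings to match the Flink job below; with Avro + Schema Registry you
            // would use io.confluent.kafka.serializers.KafkaAvroSerializer and set schema.registry.url.
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            String userId = "user-42";
            String eventJson = "{\"userId\":\"user-42\",\"eventType\":\"purchase\","
                    + "\"eventValue\":59.90,\"eventTimestamp\":" + System.currentTimeMillis() + "}";

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // The record key (userId) determines the partition, so every event for a given
                // user lands on the same partition and arrives in order for Flink's keyed state.
                producer.send(new ProducerRecord<>("user-interactions", userId, eventJson));
            }
        }
    }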

    Apache Flink: The Stateful Stream Processor

    While other stream processors exist (Spark Streaming, Kafka Streams), Flink offers a unique combination of features that make it exceptionally well-suited for complex, low-latency feature engineering:

    * True Event-Time Processing: Flink has first-class support for processing events based on the timestamp embedded within the event itself, not when the processor observes it. This is critical for correctly handling out-of-order data, a common reality in distributed systems. Its watermarking mechanism is a sophisticated yet robust way to track the progress of event time.

    * Advanced State Management: Flink provides fine-grained control over state. It can manage terabytes of state reliably using its integration with the RocksDB state backend. This allows us to perform complex computations like building user profiles or detecting patterns over long periods without relying on an external database for every intermediate calculation.

    * Exactly-Once Semantics (EOS): Through its checkpointing mechanism, Flink can provide end-to-end exactly-once processing guarantees from a Kafka source to a transactional sink. This means that even in the face of failures, each event will be processed and its effect on our feature store will be reflected exactly once, preventing corrupted feature values.

    * Flexible Windowing and ProcessFunctions: Beyond standard tumbling and sliding windows, Flink’s low-level ProcessFunction gives us direct access to state, timers, and watermarks. This enables the implementation of complex custom logic, such as session windows with inactivity gaps or pattern detection (CEP - Complex Event Processing).


    Production Implementation: A Multi-Feature Flink Job

    Let's build a Flink job that computes several features for a hypothetical e-commerce platform. We'll process a stream of user interaction events and calculate:

  • Tumbling Window Features: The total value of transactions per user in the last 5 minutes.
  • Session-based Features: The number of clicks and total time spent in a user's current session, where a session is defined by a 15-minute inactivity gap.

    We'll use Java and Maven for this example.

    1. Project Setup

    Your pom.xml will need the following core dependencies:

    xml
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.15.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.15.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.15.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>1.15.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.15.2</version>
        </dependency>
        <!-- For Redis Sink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    2. Data Models

    We define our input event and the output feature vector as simple POJOs.

    java
    // Input Event from Kafka
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class UserInteractionEvent {
        private String userId;
        private String eventType; // e.g., "click", "purchase"
        private double eventValue; // e.g., transaction amount
        private long eventTimestamp; // epoch milliseconds
    }
    
    // Output Feature Vector to be written to Redis
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class UserFeatures {
        private String userId;
        private double transactionTotalLast5Min;
        private long clicksInSession;
        private long sessionDurationSeconds;
        private long lastUpdatedTimestamp;
    }

    3. The Main Flink Application

    This is the skeleton of our Flink job. We'll set up the environment, configure the Kafka source, and define the processing logic.

    java
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.time.Duration;
    
    public class RealTimeFeatureEngineeringJob {
    
        public static void main(String[] args) throws Exception {
            // 1. Set up the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // --- Production-grade configurations ---
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
            env.enableCheckpointing(60000); // Checkpoint every 60 seconds
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 30 seconds pause
            env.getCheckpointConfig().setCheckpointTimeout(120000); // 2 minutes timeout
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    
            // 2. Create a Kafka source
            KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("user-interactions")
                .setGroupId("feature-engineering-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    
            // 3. Define the data stream and processing logic
            DataStream<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    
            // Assume a simple JSON parsing function (in reality, use Flink's JSON format or a robust library)
            DataStream<UserInteractionEvent> eventStream = kafkaStream
                .map(jsonString -> parseJson(jsonString)) // Implement parseJson with Gson/Jackson
                .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<UserInteractionEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((event, timestamp) -> event.getEventTimestamp())
                );
    
            // Key the stream by userId for stateful operations
            KeyedStream<UserInteractionEvent, String> keyedStream = eventStream.keyBy(UserInteractionEvent::getUserId);
    
            // --- Here we will plug in our feature calculation logic ---
    
            // 4. Execute the job
            env.execute("Real-Time Feature Engineering");
        }
        
        // Dummy parse function
        private static UserInteractionEvent parseJson(String json) {
            // In production, use Jackson or Gson ObjectMapper
            // This is a simplified placeholder
            // ... parsing logic ...
            return new UserInteractionEvent(); 
        }
    }
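
    For completeness, here is one way the parsing step could look with Jackson. This is a sketch, not the article's canonical implementation: it uses a flatMap so that malformed records are simply dropped (or routed elsewhere) instead of failing the job.

    java
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class JsonToEventParser extends RichFlatMapFunction<String, UserInteractionEvent> {

        private transient ObjectMapper mapper;

        @Override
        public void open(Configuration parameters) {
            mapper = new ObjectMapper();
        }

        @Override
        public void flatMap(String json, Collector<UserInteractionEvent> out) {
            try {
                out.collect(mapper.readValue(json, UserInteractionEvent.class));
            } catch (Exception e) {
                // Malformed record: drop it, or send it to a side output / dead-letter topic
            }
        }
    }

    // Usage in main(), replacing the map(parseJson) call:
    // DataStream<UserInteractionEvent> eventStream = kafkaStream
    //     .flatMap(new JsonToEventParser())
    //     .assignTimestampsAndWatermarks(...);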

    4. Implementing Feature Logic

    A. Tumbling Window Aggregation

    This is a straightforward use of Flink's Window API.

    java
    // Inside the main method, after creating keyedStream
    
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.api.common.functions.AggregateFunction;
    
    // Calculate transaction total over a 5-minute tumbling window
    DataStream<UserFeatures> transactionFeatures = eventStream
        .filter(event -> "purchase".equals(event.getEventType()))
        // Re-key after the filter: filtering a KeyedStream returns a plain DataStream,
        // and only a KeyedStream exposes the window() API
        .keyBy(UserInteractionEvent::getUserId)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .aggregate(new TransactionAggregator());
    
    // Custom AggregateFunction for efficiency
    public static class TransactionAggregator implements AggregateFunction<UserInteractionEvent, Double, UserFeatures> {
        @Override
        public Double createAccumulator() {
            return 0.0;
        }
    
        @Override
        public Double add(UserInteractionEvent value, Double accumulator) {
            return accumulator + value.getEventValue();
        }
    
        @Override
        public UserFeatures getResult(Double accumulator) {
            // This is incomplete. We need the userId. 
            // A ProcessWindowFunction would be needed to get window metadata and key.
            // We'll see a more robust pattern next.
            UserFeatures features = new UserFeatures();
            features.setTransactionTotalLast5Min(accumulator);
            return features;
        }
    
        @Override
        public Double merge(Double a, Double b) {
            return a + b;
        }
    }

    The above AggregateFunction is efficient but has a flaw: it loses context like the userId. A better approach is to combine it with a ProcessWindowFunction to enrich the result.
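
    Here is a minimal sketch of that combined pattern: the aggregator is reduced to emitting its running Double total, and a ProcessWindowFunction attaches the window's key (the userId) to the final result. The class names are illustrative.

    java
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    // Incremental part: same summing logic, but getResult() just hands back the Double total
    public static class TransactionSumAggregator implements AggregateFunction<UserInteractionEvent, Double, Double> {
        @Override public Double createAccumulator() { return 0.0; }
        @Override public Double add(UserInteractionEvent value, Double acc) { return acc + value.getEventValue(); }
        @Override public Double getResult(Double acc) { return acc; }
        @Override public Double merge(Double a, Double b) { return a + b; }
    }

    // Enrichment part: receives the single pre-aggregated total plus the key and window metadata
    public static class AttachUserId extends ProcessWindowFunction<Double, UserFeatures, String, TimeWindow> {
        @Override
        public void process(String userId, Context ctx, Iterable<Double> totals, Collector<UserFeatures> out) {
            UserFeatures features = new UserFeatures();
            features.setUserId(userId);
            features.setTransactionTotalLast5Min(totals.iterator().next());
            features.setLastUpdatedTimestamp(System.currentTimeMillis());
            out.collect(features);
        }
    }

    // Wiring: the two-argument aggregate() keeps the incremental efficiency while exposing the key
    DataStream<UserFeatures> transactionFeatures = eventStream
        .filter(event -> "purchase".equals(event.getEventType()))
        .keyBy(UserInteractionEvent::getUserId)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .aggregate(new TransactionSumAggregator(), new AttachUserId());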

    B. Advanced Session Features with `ProcessFunction`

    Calculating session features that update on every event (not only when the session finally closes) goes beyond what Flink's built-in session windows provide out of the box. We need to manage state and timers directly, and this is a perfect use case for a KeyedProcessFunction.

    java
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    
    public class UserSessionProcessor extends KeyedProcessFunction<String, UserInteractionEvent, UserFeatures> {
    
        private transient ValueState<Long> sessionClickCount;
        private transient ValueState<Long> sessionStartTime;
        private transient ValueState<Long> lastInteractionTime;
    
        private static final long SESSION_TIMEOUT_MS = 15 * 60 * 1000; // 15 minutes
    
        @Override
        public void open(Configuration parameters) throws Exception {
            sessionClickCount = getRuntimeContext().getState(new ValueStateDescriptor<>("sessionClickCount", Long.class));
            sessionStartTime = getRuntimeContext().getState(new ValueStateDescriptor<>("sessionStartTime", Long.class));
            lastInteractionTime = getRuntimeContext().getState(new ValueStateDescriptor<>("lastInteractionTime", Long.class));
        }
    
        @Override
        public void processElement(UserInteractionEvent event, Context ctx, Collector<UserFeatures> out) throws Exception {
            long currentTimestamp = event.getEventTimestamp();
    
            // Initialize state for a new session
            if (sessionStartTime.value() == null) {
                sessionStartTime.update(currentTimestamp);
                sessionClickCount.update(0L);
            }
    
            // Update session state
            if ("click".equals(event.getEventType())) {
                sessionClickCount.update(sessionClickCount.value() + 1);
            }
            lastInteractionTime.update(currentTimestamp);
    
            // Register a timer to fire when the session should time out
            ctx.timerService().registerEventTimeTimer(currentTimestamp + SESSION_TIMEOUT_MS);
            
            // We can emit features on every event, or only when the session ends.
            // For real-time inference, emitting on every event is often preferred.
            UserFeatures currentFeatures = new UserFeatures();
            currentFeatures.setUserId(ctx.getCurrentKey());
            currentFeatures.setClicksInSession(sessionClickCount.value());
            currentFeatures.setSessionDurationSeconds((currentTimestamp - sessionStartTime.value()) / 1000);
            currentFeatures.setLastUpdatedTimestamp(System.currentTimeMillis());
            out.collect(currentFeatures);
        }
    
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<UserFeatures> out) throws Exception {
            // Check if the timer is for a session timeout
            Long lastTime = lastInteractionTime.value();
            if (lastTime != null && timestamp >= lastTime + SESSION_TIMEOUT_MS) {
                // Session has officially ended. Emit final state and clear.
                UserFeatures finalFeatures = new UserFeatures();
                finalFeatures.setUserId(ctx.getCurrentKey());
                finalFeatures.setClicksInSession(sessionClickCount.value());
                finalFeatures.setSessionDurationSeconds((lastTime - sessionStartTime.value()) / 1000);
                finalFeatures.setLastUpdatedTimestamp(System.currentTimeMillis());
                out.collect(finalFeatures);
    
                // Clear state for the next session
                sessionClickCount.clear();
                sessionStartTime.clear();
                lastInteractionTime.clear();
            }
        }
    }

    Now, you would apply this processor to the keyedStream:

    java
    // In main method
    DataStream<UserFeatures> sessionFeatures = keyedStream.process(new UserSessionProcessor());

    5. Combining Feature Streams and Sinking to Redis

    We now have two streams of features (transactionFeatures and sessionFeatures). They need to be combined and written to Redis. We can use union if they have the same type, or a connect with a CoProcessFunction to merge them into a single, unified UserFeatures object.
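
    Since both operators in our example emit the same UserFeatures type, a minimal sketch of the union path looks like this:

    java
    // Both streams carry UserFeatures, so union() gives us the single `allFeatures` stream used
    // by the sink below. Note that each side only populates its own fields; producing one fully
    // merged object per user would require connect() plus a keyed CoProcessFunction, or a
    // field-level merge in the feature store itself.
    DataStream<UserFeatures> allFeatures = transactionFeatures.union(sessionFeatures);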

    For the sink, all we need is a single stream of UserFeatures; whether it comes from the union above or from a UserSessionProcessor extended to handle all feature logic makes no difference. The final step is sinking it to Redis.

    java
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

    import com.fasterxml.jackson.databind.ObjectMapper;
    
    // In main method, assuming a single `allFeatures` stream
    
    FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("redis").setPort(6379).build();
    
    allFeatures.addSink(new RedisSink<>(conf, new UserFeaturesRedisMapper()));
    
    // Mapper class to define how UserFeatures objects are written to Redis
    public static class UserFeaturesRedisMapper implements RedisMapper<UserFeatures> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // We will store features in a HASH, keyed by userId
            return new RedisCommandDescription(RedisCommand.HSET, "user_features");
        }
    
        @Override
        public String getKeyFromData(UserFeatures data) {
            // The key of the HASH in Redis
            return data.getUserId();
        }
    
        @Override
        public String getValueFromData(UserFeatures data) {
            // The value to be stored; we serialize the whole POJO to a JSON string
            return convertToJsonString(data);
        }

        private static final ObjectMapper MAPPER = new ObjectMapper();

        private static String convertToJsonString(UserFeatures data) {
            try {
                return MAPPER.writeValueAsString(data);
            } catch (Exception e) {
                throw new RuntimeException("Failed to serialize UserFeatures to JSON", e);
            }
        }
    }

    This Redis sink will perform an HSET user_features command for each feature update, effectively upserting the latest computed features for each user.


    Advanced Concepts & Edge Case Management

    Building the happy-path pipeline is only half the battle. Production systems must gracefully handle the complexities of distributed data processing.

    Handling Late-Arriving Data

    Our WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)) tells Flink to expect events to arrive up to 10 seconds out of order. A watermark for time T asserts that no more events with a timestamp at or before T are expected; with this strategy, the watermark trails the maximum event timestamp observed so far by 10 seconds. Any event that arrives after the watermark has already passed the end of its window is considered "late".

    By default, Flink drops late events. This is often unacceptable. We can use two mechanisms to handle them:

  • Allowed Lateness: We can configure our window to accept late data for a specific period after the window has fired. The window will then re-fire with an updated result.

    java
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))
    // ... aggregator

  • Side Outputs: For ultimate control, we can redirect late events to a separate stream for logging, manual inspection, or a separate reprocessing job.

    java
    // Define the tag as an anonymous subclass so the element type survives type erasure
    final OutputTag<UserInteractionEvent> lateEventsTag = new OutputTag<UserInteractionEvent>("late-events") {};

    // In the window definition
    .sideOutputLateData(lateEventsTag)

    // After the window operation
    SingleOutputStreamOperator<UserFeatures> mainStream = ...;
    DataStream<UserInteractionEvent> lateStream = mainStream.getSideOutput(lateEventsTag);

    // Now we can process the lateStream, e.g., sink it to a separate Kafka topic
    lateStream.sinkTo(...);

    State Management and Fault Tolerance

    Our UserSessionProcessor relies heavily on Flink's managed state. The RocksDBStateBackend is critical here because:

    * It stores state on local disk, not in JVM heap. This allows us to manage state that is much larger than available memory, enabling long-running aggregations over millions of keys.

    * It supports incremental checkpoints. When Flink checkpoints, it only needs to upload the changes since the last checkpoint to durable storage (like HDFS or S3), not the entire state snapshot. This significantly reduces checkpointing time for large-state jobs.

    Our checkpointing configuration ensures that every 60 seconds, Flink saves a consistent snapshot of the entire application's state (including Kafka offsets and operator state) to a remote, durable filesystem. If a TaskManager fails, Flink deploys a new one, restores the state from the latest successful checkpoint, and resumes processing from the exact point of failure, guaranteeing exactly-once semantics.
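
    As a sketch on the Flink 1.15 API, the same setup can be expressed with the newer EmbeddedRocksDBStateBackend, which also lets us switch on incremental checkpoints (the HDFS path is the one assumed earlier):

    java
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

    // Replaces the deprecated RocksDBStateBackend constructor used in the job skeleton above
    env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = enable incremental checkpoints
    env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");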

    Schema Evolution

    Imagine a new field deviceType is added to the UserInteractionEvent. If we deploy a new producer without updating our Flink job, the job will fail during deserialization. This is where a Schema Registry becomes indispensable.

    By using Avro and the ConfluentRegistryAvroDeserializationSchema, our Flink source can automatically handle schema changes:

    • The producer serializes the event, embedding a schema ID.
    • The Flink consumer receives the event, extracts the schema ID, and queries the Schema Registry to fetch the exact schema used by the producer (the "writer's schema").
    • It also fetches the schema it was compiled with (the "reader's schema").
    • The Avro library resolves the two schemas. If the changes are compatible (e.g., adding an optional field, which is backward compatible), deserialization succeeds. The new field might be ignored, or if the reader's schema has a default value, it will be populated.
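
    As a sketch, wiring this into our Kafka source could look like the following. It assumes UserInteractionEvent has been regenerated from an Avro schema (an Avro specific record rather than the plain POJO above), that the flink-avro-confluent-registry dependency is on the classpath, and that the registry URL is illustrative.

    java
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

    KafkaSource<UserInteractionEvent> source = KafkaSource.<UserInteractionEvent>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("user-interactions")
        .setGroupId("feature-engineering-group")
        .setStartingOffsets(OffsetsInitializer.latest())
        // Resolves the writer's schema via the registry and the reader's schema from the
        // Avro-generated class, applying Avro schema resolution on every record
        .setValueOnlyDeserializer(
            ConfluentRegistryAvroDeserializationSchema.forSpecific(
                UserInteractionEvent.class, "http://schema-registry:8081"))
        .build();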

    This decouples producers and consumers, allowing for independent deployment and evolution of microservices—a necessity in any large-scale system.


    Performance Tuning and Optimization

    To meet strict latency SLAs, tuning the Flink job is essential.

    * Parallelism: The parallelism of each operator in the Flink job should be carefully configured. A good starting point is to match the source parallelism to the number of Kafka partitions. Downstream operators can be scaled up or down based on their computational intensity. env.setParallelism(32); sets a global default, but it can be overridden per-operator: .map(...).setParallelism(64). Chaining should also be considered; Flink chains operators together into single tasks to avoid network shuffles, but sometimes disabling chaining (.disableChaining()) can help break up long, complex task chains.

    * State Serialization: Flink needs to serialize data for network transfer and for storing in state. For types its built-in serializers cannot analyze, Flink falls back to Kryo, which is flexible but slower and offers no support for state schema evolution. For state, using Avro is highly recommended as it provides robust schema evolution capabilities. You can create an AvroSerializer for your state descriptors:

    java
        AvroSerializer<SessionState> serializer = new AvroSerializer<>(SessionState.class);
        ValueStateDescriptor<SessionState> descriptor = new ValueStateDescriptor<>("sessionState", serializer);

    * Tuning RocksDB: The RocksDB state backend can be tuned via Flink's configuration. Key parameters include state.backend.rocksdb.block.cache-size (how much memory to use for caching data blocks) and state.backend.rocksdb.writebuffer.size (controls the size of the memtable). Increasing these can improve performance at the cost of higher memory consumption on TaskManagers.

    * Backpressure Monitoring: The Flink Web UI provides critical metrics on backpressure. If a sink (like our Redis sink) cannot keep up with the data rate from upstream operators, it will exert backpressure. The UI will show operators as having "High" backpressure. This is a signal to either scale up the sink's parallelism, optimize the sink's performance (e.g., using batch writes to Redis), or provision a more powerful Redis instance.
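
    Where the Redis sink itself is the bottleneck, batching writes through a pipelined client is one option. The sketch below is illustrative only: it buffers updates and flushes them in a single round trip, but it does not participate in checkpointing, so records still sitting in the buffer can be lost on failure.

    java
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.Pipeline;

    public class BatchedRedisFeatureSink extends RichSinkFunction<UserFeatures> {

        private static final int BATCH_SIZE = 100;

        private transient Jedis jedis;
        private transient ObjectMapper mapper;
        private transient List<UserFeatures> buffer;

        @Override
        public void open(Configuration parameters) {
            jedis = new Jedis("redis", 6379);
            mapper = new ObjectMapper();
            buffer = new ArrayList<>(BATCH_SIZE);
        }

        @Override
        public void invoke(UserFeatures value, Context context) throws Exception {
            buffer.add(value);
            if (buffer.size() >= BATCH_SIZE) {
                flush();
            }
        }

        private void flush() throws Exception {
            // One pipelined round trip for the whole batch instead of one HSET per record
            Pipeline pipeline = jedis.pipelined();
            for (UserFeatures features : buffer) {
                pipeline.hset("user_features", features.getUserId(), mapper.writeValueAsString(features));
            }
            pipeline.sync();
            buffer.clear();
        }

        @Override
        public void close() throws Exception {
            if (buffer != null && !buffer.isEmpty()) {
                flush();
            }
            if (jedis != null) {
                jedis.close();
            }
        }
    }

    In practice you would also flush on a timer and on checkpoints (via CheckpointedFunction) to bound both feature staleness and the amount of data at risk.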


    Conclusion: Closing the Loop with the Online Model

    We have constructed a robust, scalable, and fault-tolerant pipeline that transforms a raw stream of events into rich, timely features. The final step in the architecture is the online inference service. When a request comes in, for example, to predict the likelihood of a user clicking an ad, the service performs a simple, low-latency lookup:

    python
    import redis
    import json
    
    # This service receives a request for a prediction for a given user
    def predict_for_user(user_id):
        r = redis.Redis(host='redis', port=6379, db=0)
        
        # HGET user_features <userId>
        # This is a sub-millisecond operation
        feature_json = r.hget('user_features', user_id)
        
        if not feature_json:
            # Handle cold start: user has no features yet
            features = get_default_features()
        else:
            features = json.loads(feature_json)
    
        # The features dict now contains keys like:
        # 'transactionTotalLast5Min', 'clicksInSession', etc.
        
        # Prepare feature vector for the model
        model_input = preprocess(features)
        
        # Make prediction
        prediction = ml_model.predict(model_input)
        
        return prediction

    This architecture successfully decouples the complex, stateful computation of features from the simple, stateless act of inference. The Flink pipeline handles the heavy lifting of stream processing, while the online service benefits from readily available, fresh features, enabling it to make more accurate predictions with minimal latency. By mastering the advanced patterns of stateful stream processing, we can build online ML systems that are not just reactive, but truly real-time.
