KStream: Consolidating Partial Events into a Global State

Published on October 3, 2025 at 15:17

In event-driven systems, it is common to receive partial updates for an entity over time.
For example, different services may emit complementary events about the same entity.
With Kafka Streams, we can consolidate these partial updates into a global state, ensuring we always have the latest view of each entity.

Setting Up the Project

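You need a Kafka cluster (Confluent Cloud or local), a Schema Registry (the example uses Avro serdes), and a simple Spring Boot or plain Java application
with the kafka-streams and io.confluent:kafka-streams-avro-serde dependencies.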
We will create a Kafka Streams topology that consumes partial updates and builds a consolidated state.

The Problem

Imagine receiving multiple events for the same entity, but each event only updates a subset of fields.
We want to aggregate these events into a single entity state in a compacted topic or state store.
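To make the problem concrete, here is a small plain-Java sketch with no Kafka involved. The PartialUpdateProblem class and its Update and State records are hypothetical (Java 16+ for records), and the sketch assumes that a null field means "not present in this event". It compares a naive approach, where the latest event replaces the stored value, with a field-wise merge.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only (no Kafka): partial events for one entity, each carrying a subset
// of fields. ASSUMPTION: null means "field not present in this event".
public class PartialUpdateProblem {

    // Hypothetical event and state shapes mirroring the article's fields.
    record Update(String entityId, Boolean flag, String detail) {}
    record State(String entityId, Boolean flag, String detail) {}

    // Naive approach: the latest event replaces the whole stored state.
    static Map<String, State> overwrite(List<Update> events) {
        Map<String, State> table = new HashMap<>();
        for (Update u : events) {
            table.put(u.entityId(), new State(u.entityId(), u.flag(), u.detail()));
        }
        return table;
    }

    // Field-wise merge: only the fields present in an event replace stored values.
    static Map<String, State> merge(List<Update> events) {
        Map<String, State> table = new HashMap<>();
        for (Update u : events) {
            State cur = table.getOrDefault(u.entityId(), new State(u.entityId(), false, ""));
            table.put(u.entityId(), new State(
                u.entityId(),
                u.flag() != null ? u.flag() : cur.flag(),
                u.detail() != null && !u.detail().isEmpty() ? u.detail() : cur.detail()));
        }
        return table;
    }

    public static void main(String[] args) {
        List<Update> events = List.of(
            new Update("42", true, null),        // e.g. sent by one service
            new Update("42", null, "shipped"));  // e.g. sent by another service
        System.out.println("overwrite: " + overwrite(events).get("42")); // flag is lost (null)
        System.out.println("merge:     " + merge(events).get("42"));     // flag=true, detail=shipped
    }
}
```

With the naive approach the flag sent by the first event is lost; the field-wise merge keeps both contributions, which is the behaviour the Kafka Streams topology has to reproduce.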

Kafka Streams Solution

We use a KStream to consume partial events, re-key them by entity ID (Kafka Streams then repartitions them through an internal topic),
and then aggregate them into a KTable that holds the consolidated state.
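Why re-keying matters can be shown without Kafka. In this sketch the hypothetical Event record arrives keyed by the emitting service, and partitionFor uses String.hashCode() as a stand-in for Kafka's default partitioner, which actually applies murmur2 to the serialized key bytes. The only property that matters here is the one Kafka relies on as well: equal keys always map to the same partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: models why selectKey + groupByKey leads to a repartition.
// ASSUMPTION: String.hashCode() stands in for Kafka's murmur2-based partitioner.
public class RekeySketch {

    // Hypothetical event: arrives keyed by the emitting service, not by entity ID.
    record Event(String sourceKey, String entityId, String payload) {}

    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Distributes events over partitions using either the source key or the entity ID.
    static Map<Integer, List<Event>> partition(List<Event> events, boolean byEntityId, int numPartitions) {
        Map<Integer, List<Event>> partitions = new TreeMap<>();
        for (Event e : events) {
            String key = byEntityId ? e.entityId() : e.sourceKey();
            partitions.computeIfAbsent(partitionFor(key, numPartitions), p -> new ArrayList<>()).add(e);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("billing-svc", "order-1", "flag=true"),
            new Event("shipping-svc", "order-1", "detail=shipped"));
        // Keyed by source, the two halves of order-1 may land in different partitions;
        // re-keyed by entity ID, they always share one.
        System.out.println("by source:    " + partition(events, false, 6).keySet());
        System.out.println("by entity ID: " + partition(events, true, 6).keySet());
    }
}
```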


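import java.util.HashMap;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

public class KafkaStreamApp {

    public static void main(final String[] args) {

        StreamsBuilder builder = new StreamsBuilder();

        // Schema Registry used by the Avro serdes (adjust the URL to your environment)
        var serdeConfig = new HashMap<String, String>();
        serdeConfig.put("schema.registry.url", "http://localhost:8081");

        // PartialUpdate and EntityState must be SpecificRecord (e.g. Avro-generated) classes
        var partialUpdateSerde = KafkaStreamUtils.<PartialUpdate>createAvroSerde(serdeConfig, false);
        var entityStateSerde   = KafkaStreamUtils.<EntityState>createAvroSerde(serdeConfig, false);

        // read raw partial updates; their original key is not necessarily the entity ID
        KStream<String, PartialUpdate> updates = builder.stream(
            "entity-updates",
            Consumed.with(Serdes.String(), partialUpdateSerde)
        );

        // re-key by entity ID; the groupByKey() below then sends the events through an
        // internal repartition topic so that all events of one entity reach the same task
        KStream<String, PartialUpdate> repartitioned = updates
            .selectKey((key, event) -> event.getEntityId().toString());

        // aggregate partial updates into consolidated state
        KTable<String, EntityState> entityStateTable = repartitioned
            .groupByKey(Grouped.with(Serdes.String(), partialUpdateSerde))
            .aggregate(
                EntityState::new, // initial (empty) state for an entity ID seen for the first time
                (entityId, newEvent, currentState) -> currentState.merge(newEvent),
                Materialized.<String, EntityState, KeyValueStore<Bytes, byte[]>>as("entity-state-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(entityStateSerde)
            );

        // publish state changes to the output topic
        entityStateTable
            .toStream()
            .to("entity-state", Produced.with(Serdes.String(), entityStateSerde));

        // configure and start the application (placeholder values, adjust to your cluster)
        var props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "entity-state-consolidator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}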

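import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.util.Map;
import org.apache.avro.specific.SpecificRecord;

public class KafkaStreamUtils {

    // Creates a configured SpecificAvroSerde; isKey tells it whether it (de)serializes keys or values
    public static <T extends SpecificRecord> SpecificAvroSerde<T>
        createAvroSerde(Map<String, String> serdeConfig, boolean isKey) {

        SpecificAvroSerde<T> serde = new SpecificAvroSerde<>();
        serde.configure(serdeConfig, isKey);
        return serde;
    }
}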

Supporting Classes

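The following simplified classes show the fields of a partial update and of the consolidated state, along with the merge rule.
Note that the Avro serde above requires SpecificRecord types, so in a real project these would be Avro-generated classes, and merge would live in a separate helper because generated sources are overwritten whenever the schema is recompiled.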


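public class PartialUpdate {
    private String entityId;
    private Boolean flag;    // null means "not set by this event"
    private String detail;   // null or empty means "not set by this event"

    public String getEntityId() { return entityId; }
    public Boolean getFlag() { return flag; }
    public String getDetail() { return detail; }
    // setters omitted for brevity
}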

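public class EntityState {
    private String entityId;
    private Boolean flag = false;   // default until an event sets it
    private String detail = "";     // default until an event sets it

    // Copies only the fields present in the update; absent fields keep their current value.
    public EntityState merge(PartialUpdate update) {
        this.entityId = update.getEntityId();
        if (update.getFlag() != null) this.flag = update.getFlag();
        if (update.getDetail() != null && !update.getDetail().isEmpty()) {
            this.detail = update.getDetail();
        }
        return this;
    }

    public String getEntityId() { return entityId; }
    public Boolean getFlag() { return flag; }
    public String getDetail() { return detail; }
}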
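The merge method above mutates the current state in place and returns it. As an alternative, not the article's code, here is a sketch of the same merge rule on immutable Java records (hypothetical ImmutableMerge wrapper, Java 16+), where each merge returns a new state and leaves the previous one untouched:

```java
// Alternative sketch: immutable version of the article's merge rule.
// ASSUMPTION: Java 16+ records; all names here are hypothetical.
public class ImmutableMerge {

    record PartialUpdate(String entityId, Boolean flag, String detail) {}

    record EntityState(String entityId, boolean flag, String detail) {

        // Same defaults as the mutable EntityState: flag=false, detail="".
        static EntityState empty() {
            return new EntityState(null, false, "");
        }

        // Returns a new state; fields absent from the update keep their current values.
        EntityState merge(PartialUpdate u) {
            return new EntityState(
                u.entityId(),
                u.flag() != null ? u.flag() : flag,
                u.detail() != null && !u.detail().isEmpty() ? u.detail() : detail);
        }
    }

    public static void main(String[] args) {
        EntityState s = EntityState.empty()
            .merge(new PartialUpdate("42", true, null))
            .merge(new PartialUpdate("42", null, ""))         // empty detail is ignored
            .merge(new PartialUpdate("42", null, "shipped"));
        System.out.println(s); // flag stays true, detail becomes "shipped"
    }
}
```

Either style fits an aggregator of the form (entityId, newEvent, currentState) -> currentState.merge(newEvent). The immutable version avoids surprises if a state object is ever shared or reused, at the cost of one allocation per event.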

Explanation

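- Re-keying with selectKey followed by groupByKey makes Kafka Streams route the events through an internal repartition topic, so all events for the same entity end up in the same partition and are processed by the same task.
- The aggregate operator calls the initializer (EntityState::new) the first time an entity ID is seen, then merges each subsequent partial update into the current state.
- The result is a KTable backed by the local state store entity-state-store (itself backed by a changelog topic), and state changes are published downstream to the entity-state topic, which you can create with cleanup.policy=compact to keep only the latest state per entity.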
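The three steps above can be modelled in plain Java to make the aggregate semantics explicit. This is a simplified sketch, not Kafka Streams code: AggregateModel and Aggregator3 are hypothetical names, everything runs on a single partition, and record caching is ignored. The aggregator takes (key, value, aggregate) in the same order as Kafka Streams' Aggregator, the initializer runs only for the first event of a key, and records with a null key or value are skipped, as in a KStream aggregation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model (no Kafka classes) of groupByKey().aggregate(...).toStream().to(...).
// ASSUMPTIONS: single partition, no record caching, hypothetical names.
public class AggregateModel<K, V, S> {

    // Same argument order as Kafka Streams' Aggregator: (key, value, aggregate).
    @FunctionalInterface
    interface Aggregator3<KEY, VAL, AGG> {
        AGG apply(KEY key, VAL value, AGG aggregate);
    }

    private final Supplier<S> initializer;
    private final Aggregator3<K, V, S> aggregator;
    private final Map<K, S> store = new HashMap<>();                 // plays "entity-state-store"
    private final List<Map.Entry<K, S>> emitted = new ArrayList<>(); // plays the "entity-state" topic

    AggregateModel(Supplier<S> initializer, Aggregator3<K, V, S> aggregator) {
        this.initializer = initializer;
        this.aggregator = aggregator;
    }

    void process(K key, V value) {
        if (key == null || value == null) {
            return; // KStream aggregations skip records with a null key or value
        }
        // The initializer only runs the first time a key is seen.
        S current = store.containsKey(key) ? store.get(key) : initializer.get();
        S updated = aggregator.apply(key, value, current);
        store.put(key, updated);
        emitted.add(Map.entry(key, updated)); // each update is forwarded downstream
    }

    Map<K, S> store() { return store; }

    List<Map.Entry<K, S>> emitted() { return emitted; }

    public static void main(String[] args) {
        AggregateModel<String, Map<String, String>, Map<String, String>> model =
            new AggregateModel<>(
                HashMap::new,
                (key, update, state) -> {
                    Map<String, String> next = new HashMap<>(state);
                    next.putAll(update); // fields present in the update win
                    return next;
                });
        model.process("42", Map.of("flag", "true"));
        model.process("42", Map.of("detail", "shipped"));
        model.process("7", Map.of("flag", "false"));
        model.process("42", null); // ignored
        System.out.println(model.store());
        System.out.println(model.emitted().size() + " updates emitted");
    }
}
```

In a real application, Kafka Streams' record cache may collapse several consecutive updates for the same key into one downstream record, so the entity-state topic is not guaranteed to receive every intermediate state; the latest state per key is the same either way.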

Conclusion

By consolidating partial updates into a unified state, Kafka Streams simplifies building real-time views of entities.
This pattern is generic and can be applied to orders, user profiles, IoT devices, or any domain where data arrives incrementally.
Because the state store is backed by a changelog topic, the consolidated view survives restarts and is updated as each new partial event is processed.
