In event-driven systems, it is common to receive partial updates for an entity over time.
For example, different services may emit complementary events about the same entity.
With Kafka Streams, we can consolidate these partial updates into a global state, ensuring we always have the latest view of each entity.
Setting Up the Project
You need a Kafka cluster (Confluent Cloud or a local broker) and a simple Spring Boot or plain Java application with the kafka-streams dependency.
We will create a Kafka Streams topology that consumes partial updates and builds a consolidated state.
The Problem
Imagine receiving multiple events for the same entity, but each event only updates a subset of fields.
We want to aggregate these events into a single entity state in a compacted topic or state store.
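Before involving Kafka Streams, the consolidation idea itself can be sketched in plain Java. The entity ID "42" and the field names below are illustrative: each event carries only the fields it knows about, and later values win.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsolidationSketch {
    // Merge a sequence of partial events into one state map:
    // each event only carries the fields it knows, later values overwrite earlier ones.
    static Map<String, String> consolidate(List<Map<String, String>> events) {
        Map<String, String> state = new HashMap<>();
        for (Map<String, String> event : events) {
            state.putAll(event);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Map<String, String>> events = List.of(
                Map.of("entityId", "42", "flag", "true"),
                Map.of("entityId", "42", "detail", "shipped"));
        // After both events, the state holds entityId, flag, and detail together
        System.out.println(consolidate(events));
    }
}
```

Kafka Streams applies exactly this pattern, except the "state map" is a fault-tolerant state store keyed by entity ID.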
Kafka Streams Solution
We use a KStream to consume partial events, repartition them by entity ID, and then aggregate into a KTable that holds the consolidated state.
import java.util.HashMap;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class KafkaStreamApp {
    public static void main(final String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        var serdeConfig = new HashMap<String, String>();
        serdeConfig.put("schema.registry.url", "http://localhost:8081");
        var partialUpdateSerde = KafkaStreamUtils.<PartialUpdate>createAvroSerde(serdeConfig, false);
        var entityStateSerde = KafkaStreamUtils.<EntityState>createAvroSerde(serdeConfig, false);

        KStream<String, PartialUpdate> updates = builder.stream(
                "entity-updates",
                Consumed.with(Serdes.String(), partialUpdateSerde));

        // Re-key by entity ID so every update for an entity lands on the same partition
        KStream<String, PartialUpdate> repartitioned = updates
                .selectKey((key, event) -> event.getEntityId().toString());

        // Aggregate the partial updates into the consolidated state
        KTable<String, EntityState> entityStateTable = repartitioned
                .groupByKey(Grouped.with(Serdes.String(), partialUpdateSerde))
                .aggregate(
                        EntityState::new,
                        (entityId, newEvent, currentState) -> currentState.merge(newEvent),
                        Materialized.<String, EntityState, KeyValueStore<Bytes, byte[]>>as("entity-state-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(entityStateSerde));

        entityStateTable
                .toStream()
                .to("entity-state", Produced.with(Serdes.String(), entityStateSerde));

        // Build and start the topology
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "entity-consolidation-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
import java.util.Map;
import org.apache.avro.specific.SpecificRecord;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

public class KafkaStreamUtils {
    public static <T extends SpecificRecord> SpecificAvroSerde<T> createAvroSerde(
            Map<String, String> serdeConfig, boolean isKey) {
        SpecificAvroSerde<T> serde = new SpecificAvroSerde<>();
        serde.configure(serdeConfig, isKey);
        return serde;
    }
}
Supporting Classes
The following simple classes model a partial update and the consolidated state. In a real deployment these would be Avro-generated classes (the serde helper above requires SpecificRecord); the plain versions below just show the fields and the merge logic.
public class PartialUpdate {
    private String entityId;
    private Boolean flag;
    private String detail;
    // getters/setters
}

public class EntityState {
    private String entityId;
    private Boolean flag = false;
    private String detail = "";

    public EntityState merge(PartialUpdate update) {
        this.entityId = update.getEntityId();
        if (update.getFlag() != null) this.flag = update.getFlag();
        if (update.getDetail() != null && !update.getDetail().isEmpty()) {
            this.detail = update.getDetail();
        }
        return this;
    }
}
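To see the merge semantics in isolation, here is a self-contained demo with simplified inline copies of the two classes (constructors and getters added for brevity). The key behavior: null fields in an incoming event never erase previously consolidated values.

```java
public class MergeDemo {
    // Simplified, self-contained copies of PartialUpdate and EntityState
    static class PartialUpdate {
        final String entityId; final Boolean flag; final String detail;
        PartialUpdate(String entityId, Boolean flag, String detail) {
            this.entityId = entityId; this.flag = flag; this.detail = detail;
        }
        String getEntityId() { return entityId; }
        Boolean getFlag() { return flag; }
        String getDetail() { return detail; }
    }

    static class EntityState {
        String entityId; Boolean flag = false; String detail = "";
        EntityState merge(PartialUpdate u) {
            this.entityId = u.getEntityId();
            if (u.getFlag() != null) this.flag = u.getFlag();
            if (u.getDetail() != null && !u.getDetail().isEmpty()) this.detail = u.getDetail();
            return this;
        }
    }

    public static void main(String[] args) {
        EntityState state = new EntityState();
        state.merge(new PartialUpdate("42", true, null));      // first event: only the flag
        state.merge(new PartialUpdate("42", null, "shipped")); // second event: only the detail
        // flag=true survives the second event because null fields are ignored
        System.out.println(state.entityId + " " + state.flag + " " + state.detail);
        // → 42 true shipped
    }
}
```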
Explanation
- Repartitioning (selectKey marks the stream for repartitioning before groupByKey) ensures all events for the same entity go to the same partition.
- The aggregate operator merges partial updates into the current state.
- The result is stored in a KTable and can be published downstream.
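Since the output topic holds the latest state per entity, it is typically configured with log compaction so Kafka retains only the most recent record for each key. A sketch with the kafka-topics CLI (partition count and replication factor here are illustrative):

```shell
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic entity-state \
  --partitions 6 \
  --replication-factor 3 \
  --config cleanup.policy=compact
```

New consumers can then rebuild the full consolidated view by reading the compacted topic from the beginning.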
Conclusion
By consolidating partial updates into a unified state, Kafka Streams simplifies building real-time views of entities.
This pattern is generic and can be applied to orders, user profiles, IoT devices, or any domain where data arrives incrementally.
It ensures a reliable, always-up-to-date representation of your system’s entities.