KStream: Versioning the KTable State Store

Published on October 7, 2025 at 10:30

In event-driven architectures with Kafka Streams, it’s common to track multiple versions of the same entity over time.
This can be achieved by aggregating updates in a KTable into a list that maintains all versions of a record by its key.
This approach allows you to know the history of changes and select the appropriate version later for further processing.

Versioning a KTable with a KStream

The goal here is to convert updates from a KTable into a versioned list, so that every change to an object is captured and stored for future use.


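import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.ArrayList;
import java.util.Objects;

private KTable<String, EntityVersionList> aggregateEntityVersions(
        SpecificAvroSerde<Entity> entitySerde,
        SpecificAvroSerde<EntityVersionList> entityListSerde,
        KTable<String, Entity> entityTable) {

    log.info("Aggregating entity versions by entityId");

    return entityTable
            // Turn the table's changelog into a stream of individual updates.
            .toStream()
            .groupByKey(Grouped.with(Serdes.String(), entitySerde))
            .aggregate(
                    // Initializer: each key starts with an empty version list.
                    () -> new EntityVersionList(new ArrayList<>()),
                    // Aggregator: append the incoming version to the key's list.
                    (key, newEntity, versionList) -> {
                        // Defensive check: aggregate() already ignores null values.
                        if (Objects.nonNull(newEntity)) {
                            versionList.getVersions().add(newEntity);
                        }
                        return versionList;
                    },
                    Named.as("EntityVersionListStore"),
                    // Persist the lists in a named key-value store.
                    Materialized
                            .<String, EntityVersionList, KeyValueStore<Bytes, byte[]>>as("entity-version-store")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(entityListSerde)
            );
}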

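In this snippet, every update to an Entity triggers the aggregator, which appends the new version to that key's EntityVersionList.
The list therefore holds the history of all changes in arrival order. Deletions arrive as null values (tombstones) and are ignored by the aggregation, so they leave the list unchanged.
Later, you can inspect the timestamp or version number of each entity in the list (assuming the Entity schema carries such fields) to determine which version was used to produce a derived object, or for auditing purposes.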
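
To make the append-per-update behaviour concrete, here is a minimal in-memory model of the aggregation in plain Java, with no Kafka dependency. It is only a sketch: EntityUpdate is a hypothetical stand-in for a keyed Entity record, and a map of lists stands in for the state store.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the aggregation; not the Kafka Streams implementation.
final class VersionFoldSketch {

    // Hypothetical stand-in for a (key, Entity) record; payload may be null (tombstone).
    record EntityUpdate(String key, String payload) {}

    // Mirrors the aggregator: an empty list per key, one append per non-null update.
    static Map<String, List<String>> fold(List<EntityUpdate> updates) {
        Map<String, List<String>> store = new LinkedHashMap<>();
        for (EntityUpdate update : updates) {
            if (update.payload() == null) {
                continue; // null values never reach the list
            }
            store.computeIfAbsent(update.key(), k -> new ArrayList<>())
                 .add(update.payload());
        }
        return store;
    }

    public static void main(String[] args) {
        List<EntityUpdate> updates = List.of(
                new EntityUpdate("order-1", "v1"),
                new EntityUpdate("order-2", "v1"),
                new EntityUpdate("order-1", "v2"));
        System.out.println(fold(updates)); // {order-1=[v1, v2], order-2=[v1]}
    }
}
```

Each key ends up with its own ordered list, which is the shape the EntityVersionList values take in the real store.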

Explanation

This approach provides a lightweight versioning mechanism without requiring a separate event store.
By combining a KTable with aggregate, Kafka Streams processes updates in real time and persists the full history of each entity in a state store backed by a compacted changelog topic.
Because the whole list is stored as a single value per key, it grows with every update to that key.
This is especially useful when you need to reference previous versions for reconciliation, auditing, or creating derived entities.
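
For reconciliation or auditing, one possible way to pick the version that was current at a given moment is an "as of" lookup over the history. The sketch below assumes each version carries a version number and an update timestamp in epoch milliseconds. EntityVersion and versionAsOf are hypothetical names; the article does not show the Entity schema.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class VersionLookupSketch {

    // Hypothetical shape of one stored version; field names are assumptions.
    record EntityVersion(int version, long updatedAtMillis, String payload) {}

    // Returns the most recent version whose timestamp is not after asOfMillis,
    // or an empty Optional if the entity did not exist yet at that time.
    static Optional<EntityVersion> versionAsOf(List<EntityVersion> history, long asOfMillis) {
        return history.stream()
                .filter(v -> v.updatedAtMillis() <= asOfMillis)
                .max(Comparator.comparingLong(EntityVersion::updatedAtMillis));
    }

    public static void main(String[] args) {
        List<EntityVersion> history = List.of(
                new EntityVersion(1, 1_000L, "created"),
                new EntityVersion(2, 2_000L, "price changed"),
                new EntityVersion(3, 3_000L, "shipped"));

        // A derived object produced at t=2500 would have been built from version 2.
        System.out.println(versionAsOf(history, 2_500L).map(EntityVersion::version)); // Optional[2]
        System.out.println(versionAsOf(history, 500L).isPresent()); // false
    }
}
```

Filtering by timestamp rather than taking the last element keeps the lookup correct even when newer versions have been appended since the derived object was produced.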

Utility: Creating an Avro Serde

Here is a helper method to create a Specific Avro Serde for any type of entity:

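import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.avro.specific.SpecificRecord;

import java.util.Map;

public static <T extends SpecificRecord> SpecificAvroSerde<T> createAvroSerde(
        Map<String, String> serdeConfig, boolean isKey) {
    SpecificAvroSerde<T> serde = new SpecificAvroSerde<>();
    // isKey tells the serde whether it handles record keys or record values.
    serde.configure(serdeConfig, isKey);
    return serde;
}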
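
As a rough illustration of what the serdeConfig argument might contain, the sketch below builds the configuration map in plain Java. "schema.registry.url" is the standard Confluent serde setting; the URL is a placeholder and the serdeConfig helper name is hypothetical. The calls to createAvroSerde are shown only as comments because they need the Confluent libraries on the classpath.

```java
import java.util.Map;

final class SerdeConfigSketch {

    // Builds the configuration map passed to createAvroSerde.
    // The registry URL is supplied by the caller; nothing here is taken from the article.
    static Map<String, String> serdeConfig(String schemaRegistryUrl) {
        return Map.of("schema.registry.url", schemaRegistryUrl);
    }

    public static void main(String[] args) {
        Map<String, String> config = serdeConfig("http://localhost:8081");

        // With the Confluent serde on the classpath, the helper would be used as:
        //   SpecificAvroSerde<Entity> entitySerde = createAvroSerde(config, false);
        //   SpecificAvroSerde<EntityVersionList> entityListSerde = createAvroSerde(config, false);
        // isKey is false because both serdes handle record values, not keys.
        System.out.println(config);
    }
}
```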
