Handling Serialization and Deserialization Exceptions in Kafka Streams
Last week, our Kafka Streams application crashed at 3 AM. A single malformed JSON message from an upstream service took down the entire pipeline. Orders stopped processing, alerts fired, and we spent the next two hours manually skipping offsets to get things running again.
Sound familiar?
This is the reality of stream processing in production. Data issues happen. Networks fail. Schemas evolve. And if you're not prepared, your entire infrastructure grinds to a halt.
The Problem with Default Exception Handling
By default, Kafka Streams has a simple approach to serialization errors: crash. When it can't deserialize a message or serialize a result, it throws an exception and your application dies.
This might seem reasonable in theory — fail fast, fix the problem, restart. But in practice, it's catastrophic.
Consider this scenario: You're processing 10,000 messages per second. Message number 7,342 has a JSON parsing error. Without proper exception handling:
- Your application crashes immediately
- All 10,000 messages stop processing
- Kubernetes restarts your pod
- It crashes again on the same bad message
- You're now in a restart loop
- Your on-call engineer gets paged at 3 AM
The blast radius of one bad message just took out your entire data pipeline.
A Better Approach
What if instead of crashing, we could:
- Catch the exception gracefully
- Log the error with full context
- Send the failed message to a Dead Letter Queue
- Continue processing the remaining 9,999 valid messages
This is exactly what custom exception handlers enable.
Key insight: In production systems, availability trumps perfection. It's better to process 99.99% of messages successfully while quarantining the 0.01% that fail, than to process 0% because you crashed on a single error.
Understanding the Two Failure Points
Kafka Streams has two critical moments where serialization can fail:
1. Deserialization (Reading from Kafka)
This happens when Kafka Streams consumes a message and tries to convert the raw bytes into a Java object. Common causes:
- JsonParseException — malformed JSON from upstream
- AvroRuntimeException — missing required fields in Avro schema
- Schema Registry issues — unknown schema ID or version mismatch
- Character encoding problems — UTF-8 vs UTF-16 confusion
Without a handler, your app crashes here and enters a restart loop because the bad message stays in Kafka.
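To see the first failure mode in isolation, here is a tiny standalone illustration using Jackson directly; the payload and field names are invented for the example, and the class is not part of the handlers shown later:

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class DeserializationFailureDemo {

    public static void main(String[] args) {
        // A truncated JSON payload, as it might arrive from a buggy upstream producer
        byte[] rawValue = "{\"orderId\": \"42\", \"amount\": ".getBytes(StandardCharsets.UTF_8);

        try {
            new ObjectMapper().readValue(rawValue, Map.class);
        } catch (IOException e) {
            // Without a deserialization exception handler, an exception like this one
            // reaches Kafka Streams and kills the stream thread.
            System.err.println(e.getClass().getSimpleName() + ": " + e.getMessage());
        }
    }
}

Kafka Streams sees exactly this kind of exception when your value Serde chokes on the raw bytes, which is why the handler below works with byte arrays rather than domain objects.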
2. Serialization (Writing to Kafka)
This happens after processing when Kafka Streams tries to write the result back to an output topic. Common causes:
- Schema Registry timeout or unavailability
- Incompatible schema evolution (breaking changes)
- Your business logic produced an object that violates schema constraints
- Network partition to Kafka brokers
Serialization failures are often more serious than deserialization failures. Why? Because they might indicate bugs in your code, not just bad input data.
Building Custom Exception Handlers
Let's build handlers that are production-ready. Not proof-of-concept code, but something you'd actually deploy.
The Configuration Layer
First, we need a way to inject configuration into our handlers. Kafka Streams instantiates handlers via reflection (no-arg constructor), so we can't use Spring's dependency injection directly.
The solution: a static configuration holder.
package com.example.kafka.config;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import jakarta.annotation.PostConstruct;

@Component
@Slf4j
public class ExceptionHandlerConfig {

    @Getter
    private static String dlqTopic;

    @Getter
    private static String serviceName;

    @Getter
    private static int maxMessageSize;

    @Value("${app.dlq.topic}")
    private String dlqTopicInstance;

    @Value("${spring.application.name}")
    private String serviceNameInstance;

    @Value("${app.dlq.max-message-size:1000000}")
    private int maxMessageSizeInstance;

    @PostConstruct
    public void init() {
        dlqTopic = this.dlqTopicInstance;
        serviceName = this.serviceNameInstance;
        maxMessageSize = this.maxMessageSizeInstance;

        log.info("Exception handlers configured:");
        log.info("  DLQ topic: {}", dlqTopic);
        log.info("  Service: {}", serviceName);
        log.info("  Max size: {} bytes", maxMessageSize);
    }
}
This gets initialized at startup via Spring, then our handlers access it statically. Not elegant, but it works.
Deserialization Exception Handler
This handler catches failures when reading from Kafka topics.
package com.example.kafka.exceptions;

import com.example.kafka.config.ExceptionHandlerConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

@Slf4j
public class CustomDeserializationExceptionHandler implements DeserializationExceptionHandler {

    private KafkaTemplate<byte[], byte[]> kafkaTemplate;
    private String dlqTopic;
    private String serviceName;
    private int maxMessageSize;

    // Required no-arg constructor for Kafka Streams reflection
    public CustomDeserializationExceptionHandler() {
        log.info("Handler instantiated");
    }

    @Override
    public DeserializationHandlerResponse handle(
            ProcessorContext context,
            ConsumerRecord<byte[], byte[]> record,
            Exception exception) {

        log.error("Deserialization error - topic: {}, partition: {}, offset: {}",
                record.topic(), record.partition(), record.offset(), exception);

        try {
            sendToDlq(record, exception);
            log.info("Message sent to DLQ: {}", dlqTopic);
            return DeserializationHandlerResponse.CONTINUE;
        } catch (Exception e) {
            log.error("Failed to send to DLQ, message lost", e);
            return DeserializationHandlerResponse.CONTINUE;
        }
    }

    private void sendToDlq(ConsumerRecord<byte[], byte[]> record, Exception exception) {
        byte[] truncatedValue = truncate(record.value(), maxMessageSize);

        ProducerRecord<byte[], byte[]> dlqRecord =
                new ProducerRecord<>(dlqTopic, null, record.key(), truncatedValue);

        enrichHeaders(dlqRecord.headers(), record, exception);

        kafkaTemplate.send(dlqRecord).whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("DLQ send failed: {}", ex.getMessage());
            } else {
                log.info("DLQ sent - offset: {}", result.getRecordMetadata().offset());
            }
        });
    }

    private void enrichHeaders(Headers headers, ConsumerRecord<byte[], byte[]> record, Exception exception) {
        headers.add("dlq.service.name", serviceName.getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.source.topic", record.topic().getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.source.partition",
                ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array());
        headers.add("dlq.source.offset",
                String.valueOf(record.offset()).getBytes(StandardCharsets.UTF_8));
        headers.add("error.type", "DESERIALIZATION_ERROR".getBytes(StandardCharsets.UTF_8));
        headers.add("error.exception.class",
                exception.getClass().getName().getBytes(StandardCharsets.UTF_8));
        if (exception.getMessage() != null) {
            headers.add("error.exception.message",
                    truncate(exception.getMessage().getBytes(StandardCharsets.UTF_8), maxMessageSize));
        }
        String stacktrace = ExceptionUtils.getStackTrace(exception);
        headers.add("error.stacktrace",
                truncate(stacktrace.getBytes(StandardCharsets.UTF_8), maxMessageSize));
        headers.add("error.timestamp",
                String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
    }

    private byte[] truncate(byte[] data, int maxSize) {
        if (data == null || data.length <= maxSize) {
            return data;
        }
        byte[] truncated = new byte[maxSize];
        System.arraycopy(data, 0, truncated, 0, maxSize);
        String marker = "...[TRUNCATED]";
        byte[] markerBytes = marker.getBytes(StandardCharsets.UTF_8);
        int markerStart = Math.max(0, maxSize - markerBytes.length);
        System.arraycopy(markerBytes, 0, truncated, markerStart,
                Math.min(markerBytes.length, maxSize));
        return truncated;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        this.dlqTopic = ExceptionHandlerConfig.getDlqTopic();
        this.serviceName = ExceptionHandlerConfig.getServiceName();
        this.maxMessageSize = ExceptionHandlerConfig.getMaxMessageSize();

        Map<String, Object> producerConfigs = new java.util.HashMap<>(configs);
        producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
        producerConfigs.put(ProducerConfig.RETRIES_CONFIG, 3);

        this.kafkaTemplate = new KafkaTemplate<>(
                new DefaultKafkaProducerFactory<>(producerConfigs));

        log.info("Deserialization handler configured - DLQ: {}", dlqTopic);
    }
}
The critical line here is return DeserializationHandlerResponse.CONTINUE. This tells Kafka Streams: "I handled this error, skip this message and move on."
If you return FAIL instead, you're back to crashing on every bad message.
Serialization Exception Handler
This handler catches failures when writing to output topics.
package com.example.kafka.exceptions;

import com.example.kafka.config.ExceptionHandlerConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

@Slf4j
public class CustomSerializationExceptionHandler implements ProductionExceptionHandler {

    private KafkaTemplate<byte[], byte[]> kafkaTemplate;
    private String dlqTopic;
    private String serviceName;
    private int maxMessageSize;

    // Required no-arg constructor for Kafka Streams reflection
    public CustomSerializationExceptionHandler() {
        log.info("Handler instantiated");
    }

    @Override
    public ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record,
            Exception exception) {

        log.error("Serialization error - topic: {}, partition: {}",
                record.topic(), record.partition(), exception);

        try {
            sendToDlq(record, exception);
            log.info("Message sent to DLQ: {}", dlqTopic);
        } catch (Exception e) {
            log.error("Failed to send to DLQ", e);
        }

        if (isRecoverable(exception)) {
            log.warn("Recoverable error, continuing");
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        log.error("Non-recoverable error, failing");
        return ProductionExceptionHandlerResponse.FAIL;
    }

    private void sendToDlq(ProducerRecord<byte[], byte[]> record, Exception exception) {
        byte[] truncatedValue = truncate(record.value(), maxMessageSize);

        ProducerRecord<byte[], byte[]> dlqRecord =
                new ProducerRecord<>(dlqTopic, null, record.key(), truncatedValue);

        enrichHeaders(dlqRecord.headers(), record, exception);

        kafkaTemplate.send(dlqRecord).whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("DLQ send failed: {}", ex.getMessage());
            } else {
                log.info("DLQ sent - offset: {}", result.getRecordMetadata().offset());
            }
        });
    }

    private void enrichHeaders(Headers headers, ProducerRecord<byte[], byte[]> record, Exception exception) {
        headers.add("dlq.service.name", serviceName.getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.source.topic", record.topic().getBytes(StandardCharsets.UTF_8));
        if (record.partition() != null) {
            headers.add("dlq.source.partition",
                    ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array());
        }
        headers.add("error.type", "SERIALIZATION_ERROR".getBytes(StandardCharsets.UTF_8));
        headers.add("error.exception.class",
                exception.getClass().getName().getBytes(StandardCharsets.UTF_8));
        if (exception.getMessage() != null) {
            headers.add("error.exception.message",
                    truncate(exception.getMessage().getBytes(StandardCharsets.UTF_8), maxMessageSize));
        }
        String stacktrace = ExceptionUtils.getStackTrace(exception);
        headers.add("error.stacktrace",
                truncate(stacktrace.getBytes(StandardCharsets.UTF_8), maxMessageSize));
        headers.add("error.timestamp",
                String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
    }

    private byte[] truncate(byte[] data, int maxSize) {
        if (data == null || data.length <= maxSize) {
            return data;
        }
        byte[] truncated = new byte[maxSize];
        System.arraycopy(data, 0, truncated, 0, maxSize);
        String marker = "...[TRUNCATED]";
        byte[] markerBytes = marker.getBytes(StandardCharsets.UTF_8);
        int markerStart = Math.max(0, maxSize - markerBytes.length);
        System.arraycopy(markerBytes, 0, truncated, markerStart,
                Math.min(markerBytes.length, maxSize));
        return truncated;
    }

    private boolean isRecoverable(Exception exception) {
        return exception instanceof org.apache.kafka.common.errors.TimeoutException
                || exception instanceof org.apache.kafka.common.errors.RetriableException
                || exception instanceof org.apache.kafka.common.errors.NetworkException;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        this.dlqTopic = ExceptionHandlerConfig.getDlqTopic();
        this.serviceName = ExceptionHandlerConfig.getServiceName();
        this.maxMessageSize = ExceptionHandlerConfig.getMaxMessageSize();

        Map<String, Object> producerConfigs = new java.util.HashMap<>(configs);
        producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
        producerConfigs.put(ProducerConfig.RETRIES_CONFIG, 3);

        this.kafkaTemplate = new KafkaTemplate<>(
                new DefaultKafkaProducerFactory<>(producerConfigs));

        log.info("Serialization handler configured - DLQ: {}", dlqTopic);
    }
}
Notice the difference here. For serialization errors, we check if the error is recoverable. Network timeouts? Continue. Schema incompatibility that indicates a bug in our code? Fail fast so we can fix it.
The Dead Letter Queue
Both handlers send failed messages to a DLQ — a standard Kafka topic that acts as a quarantine zone for problematic data.
Each DLQ message includes:
- The original message bytes (truncated to prevent oversized messages)
- Source metadata (topic, partition, offset)
- Error details (exception class, message, full stacktrace)
- Timestamp of when the error occurred
Here's what the enriched headers look like:
private void enrichHeaders(Headers headers, ConsumerRecord record, Exception exception) {
    headers.add("dlq.service.name", serviceName.getBytes(StandardCharsets.UTF_8));
    headers.add("dlq.source.topic", record.topic().getBytes(StandardCharsets.UTF_8));
    headers.add("dlq.source.partition",
            ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array());
    headers.add("dlq.source.offset",
            String.valueOf(record.offset()).getBytes(StandardCharsets.UTF_8));
    headers.add("error.type", "DESERIALIZATION_ERROR".getBytes(StandardCharsets.UTF_8));
    headers.add("error.exception.class",
            exception.getClass().getName().getBytes(StandardCharsets.UTF_8));
    headers.add("error.stacktrace",
            truncate(ExceptionUtils.getStackTrace(exception)
                    .getBytes(StandardCharsets.UTF_8), maxMessageSize));
    headers.add("error.timestamp",
            String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
}
Why a Separate Producer?
You might notice that both handlers build their own producer for the DLQ (a dedicated KafkaTemplate created in configure()), rather than reusing Kafka Streams' internal producer.
This is intentional. Kafka Streams' producer operates within an exactly-once transaction. If DLQ writes were part of that transaction, a DLQ write failure would abort the entire batch of valid messages.
By using a separate producer, DLQ writes are best-effort and independent. If the DLQ write fails, we log it but don't crash the stream.
Configuration in application.yml
spring:
  application:
    name: kafka-streams-app
  kafka:
    bootstrap-servers: localhost:9092
    streams:
      application-id: kafka-streams-app
      properties:
        # Exception handlers configuration
        default.deserialization.exception.handler: com.example.kafka.exceptions.CustomDeserializationExceptionHandler
        default.production.exception.handler: com.example.kafka.exceptions.CustomSerializationExceptionHandler
        # Processing guarantees
        processing.guarantee: exactly_once_v2
        commit.interval.ms: 1000
        # Serdes
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        # Schema Registry
        schema.registry.url: http://localhost:8081

app:
  dlq:
    topic: error-dlq
    max-message-size: 1000000  # 1MB
  topics:
    input:
      orders: orders-topic
      programs: programs-topic
      clients: clients-topic
    output:
      processed: processed-orders-topic
Two things to note here:
- We keep exactly-once semantics enabled. The DLQ writes happen outside this guarantee, but your normal processing remains transactional.
- Message size limits prevent oversized messages from breaking the DLQ itself.
What About Monitoring?
Exception handlers without monitoring are just silent failures with extra steps.
You need to track:
- dlq_messages_total — counter, labeled by error type
- error_rate_5m — sliding window to catch spikes
- dlq_topic_lag — are errors being investigated or piling up?
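As a sketch of the first metric, and assuming Micrometer is available (it is not part of the handler code above), a small static helper lets the reflection-instantiated handlers record counters without Spring injection. The metric and tag names below mirror the list above and are illustrative:

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;

public class DlqMetrics {

    // Uses Micrometer's global registry so the handlers, which Kafka Streams
    // creates via reflection, don't need a Spring-managed MeterRegistry.
    public static void recordDlqMessage(String errorType, String sourceTopic) {
        Counter.builder("dlq_messages_total")
                .tag("error.type", errorType)      // e.g. DESERIALIZATION_ERROR
                .tag("source.topic", sourceTopic)
                .register(Metrics.globalRegistry)
                .increment();
    }
}

Calling DlqMetrics.recordDlqMessage("DESERIALIZATION_ERROR", record.topic()) inside handle() makes the error rate visible to your metrics backend without touching the stream topology.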
Set up alerts:
- DLQ rate > 10 messages in 5 minutes → page on-call
- Same error repeated 50+ times → upstream data quality issue
- Application state != RUNNING → health check failure
Investigating DLQ Messages
Build a simple consumer that reads your DLQ topic and extracts patterns:
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic error-dlq \
  --property print.headers=true \
  --from-beginning
Look for patterns:
- Are all errors from the same source topic? Upstream issue.
- Same exception class repeated? Schema problem.
- Errors started at a specific timestamp? Recent deployment correlation.
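When the console output gets too noisy, a throwaway consumer can aggregate the DLQ by header. A rough sketch, assuming the header names written by the handlers above and the error-dlq topic from the configuration:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class DlqTriage {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-triage");          // throwaway group for investigation
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        Map<String, Integer> byExceptionClass = new HashMap<>();

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("error-dlq"));
            // Poll a few times, then print counts; good enough for a one-off triage run.
            for (int i = 0; i < 10; i++) {
                consumer.poll(Duration.ofSeconds(2)).forEach(record -> {
                    Header header = record.headers().lastHeader("error.exception.class");
                    String exceptionClass = header == null
                            ? "unknown"
                            : new String(header.value(), StandardCharsets.UTF_8);
                    byExceptionClass.merge(exceptionClass, 1, Integer::sum);
                });
            }
        }
        byExceptionClass.forEach((clazz, count) -> System.out.println(count + "\t" + clazz));
    }
}

Grouping by dlq.source.topic or by error.timestamp instead answers the other two questions in the list above.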
Reprocessing from DLQ
Once you've fixed the root cause (updated schema, fixed upstream service, patched your code), you can replay messages from the DLQ:
- Consume messages from DLQ
- Extract original bytes from message value
- Extract source topic from headers
- Produce back to the original topic
- Let your Kafka Streams app reprocess them
This closes the loop — no data loss, just delayed processing.
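A minimal replay tool following those steps might look like this. It is a sketch only: it assumes the dlq.source.topic header written by the handlers above, does no deduplication, and values the handlers truncated cannot be replayed faithfully and should be handled by hand:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class DlqReplayer {

    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-replayer");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(List.of("error-dlq"));
            consumer.poll(Duration.ofSeconds(5)).forEach(record -> {
                Header sourceTopic = record.headers().lastHeader("dlq.source.topic");
                if (sourceTopic == null) {
                    return; // not written by our handlers, skip it
                }
                String topic = new String(sourceTopic.value(), StandardCharsets.UTF_8);
                // Replay the original bytes; the stream app reprocesses them with the fixed code/schema.
                producer.send(new ProducerRecord<>(topic, record.key(), record.value()));
            });
            producer.flush();
        }
    }
}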
Common Pitfalls
1. Reusing the Application's KafkaTemplate
Don't share your application's KafkaTemplate. Create a dedicated one in the handler's configure() method to avoid coupling and config conflicts.
2. Not Truncating Messages
Kafka's default max message size is 1MB. A 5MB JSON blob will fail to write to the DLQ. Always truncate both message payloads and stacktraces.
3. Forgetting the DLQ Topic Exists
If auto topic creation is disabled and your DLQ topic doesn't exist, the asynchronous DLQ writes fail and those messages are lost for good. Create the topic upfront with appropriate retention (7-30 days), as sketched below.
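One way to do that programmatically is with Kafka's AdminClient; the partition count, replication factor, and retention below are placeholders to adapt to your cluster:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class DlqTopicSetup {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic dlq = new NewTopic("error-dlq", 3, (short) 3)
                    // 14 days of retention: long enough to investigate, short enough not to hoard bad data
                    .configs(Map.of("retention.ms", String.valueOf(14L * 24 * 60 * 60 * 1000)));
            admin.createTopics(List.of(dlq)).all().get();
        }
    }
}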
4. Returning FAIL for All Deserialization Errors
I've seen this mistake multiple times. Returning FAIL in the deserialization handler defeats the entire purpose. You'll crash on the first bad message, just like before.
5. Not Distinguishing Recoverable vs Non-Recoverable Errors
For serialization errors, a TimeoutException is transient — retry makes sense. But a schema validation error indicates a bug in your code. Fail fast so you notice and fix it.
The Impact
After implementing these handlers in our production environment:
- Zero crashes from malformed data in 6 months
- Average of 2-3 DLQ messages per day (down from 10-15 crashes per week)
- Mean time to recovery dropped from 2 hours to 0 (no recovery needed)
- We catch upstream data issues before they spread
More importantly, we sleep better. That 3 AM page? Hasn't happened since.
Conclusion
Exception handling in Kafka Streams isn't optional. It's the difference between a fragile prototype and a production-grade system.
The pattern is straightforward:
- Catch exceptions at serialization boundaries
- Enrich with diagnostic metadata
- Send to DLQ
- Continue processing
- Monitor and alert
- Investigate and fix root causes
- Replay when appropriate
Your data pipeline will be more resilient, your on-call rotation will thank you, and you'll have full visibility into failures instead of wondering what data got lost during crashes.
The code is production-tested, the patterns are proven, and the benefits are immediate.
Now go fix your exception handling before your next 3 AM incident.