Kafka: Handle Serialization/Deserialization Exceptions

Published October 15, 2025 at 18:19
[Architecture diagram: orders, programs, and clients topics (Avro) flow through Kafka Streams deserialization, processing, and serialization. Deserialization and serialization failures are caught by custom handlers that log the error and route the message to an error-dlq topic enriched with headers (error.type, stacktrace, source.topic, timestamp); valid messages continue to the processed-orders topic with exactly-once semantics. Metrics, alerts, and a DLQ consumer provide observability and reprocessing.]
Handling Serialization and Deserialization Exceptions in Kafka Streams

Last week, our Kafka Streams application crashed at 3 AM. A single malformed JSON message from an upstream service took down the entire pipeline. Orders stopped processing, alerts fired, and we spent the next two hours manually skipping offsets to get things running again.

Sound familiar?

This is the reality of stream processing in production. Data issues happen. Networks fail. Schemas evolve. And if you're not prepared, your entire infrastructure grinds to a halt.

The Problem with Default Exception Handling

By default, Kafka Streams has a simple approach to serialization errors: crash. When it can't deserialize a message or serialize a result, it throws an exception and your application dies.

This might seem reasonable in theory — fail fast, fix the problem, restart. But in practice, it's catastrophic.

Consider this scenario: You're processing 10,000 messages per second. Message number 7,342 has a JSON parsing error. Without proper exception handling:

  • Your application crashes immediately
  • All 10,000 messages stop processing
  • Kubernetes restarts your pod
  • It crashes again on the same bad message
  • You're now in a restart loop
  • Your on-call engineer gets paged at 3 AM

The blast radius of one bad message just took out your entire data pipeline.

A Better Approach

What if instead of crashing, we could:

  • Catch the exception gracefully
  • Log the error with full context
  • Send the failed message to a Dead Letter Queue
  • Continue processing the remaining 9,999 valid messages

This is exactly what custom exception handlers enable.

Key insight: In production systems, availability trumps perfection. It's better to process 99.99% of messages successfully while quarantining the 0.01% that fail, than to process 0% because you crashed on a single error.
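
It's worth noting that Kafka Streams already ships two built-in handlers: LogAndFailExceptionHandler (the default, which crashes) and LogAndContinueExceptionHandler (which logs and skips). A minimal sketch of switching to the built-in "continue" behavior is below — enough to stop the crash loop, but with no DLQ and no error context, which is why the rest of this post builds custom handlers.

BuiltInHandlerSketch.java
package com.example.kafka.config;

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class BuiltInHandlerSketch {

    // Builds Streams properties that swap the default LogAndFailExceptionHandler
    // for the built-in LogAndContinueExceptionHandler. Bad records are logged and
    // skipped -- no crash loop, but nothing is kept for later analysis.
    public static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndContinueExceptionHandler.class);
        return props;
    }
}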

Understanding the Two Failure Points

Kafka Streams has two critical moments where serialization can fail:

1. Deserialization (Reading from Kafka)

This happens when Kafka Streams consumes a message and tries to convert the raw bytes into a Java object. Common causes:

  • JsonParseException — malformed JSON from upstream
  • AvroRuntimeException — missing required fields in Avro schema
  • Schema Registry issues — unknown schema ID or version mismatch
  • Character encoding problems — UTF-8 vs UTF-16 confusion

Without a handler, your app crashes here and enters a restart loop because the bad message stays in Kafka.
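
For intuition, here is roughly what the failure looks like in isolation — a small standalone sketch using Jackson on a truncated payload (the payload and class names are illustrative, not from the actual pipeline):

DeserializationFailureSketch.java
package com.example.kafka.demo;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class DeserializationFailureSketch {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Truncated payload, e.g. an upstream producer bug or a partial write.
        byte[] badPayload = "{\"orderId\":\"A-42\",\"amount\":"
                .getBytes(StandardCharsets.UTF_8);

        // Throws a JsonParseException. Inside a Kafka Streams Serde, this is
        // exactly the point where the deserialization exception handler fires.
        mapper.readValue(badPayload, Map.class);
    }
}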

2. Serialization (Writing to Kafka)

This happens after processing when Kafka Streams tries to write the result back to an output topic. Common causes:

  • Schema Registry timeout or unavailability
  • Incompatible schema evolution (breaking changes)
  • Your business logic produced an object that violates schema constraints
  • Network partition to Kafka brokers

Serialization failures are often more serious than deserialization failures. Why? Because they might indicate bugs in your code, not just bad input data.


Building Custom Exception Handlers

Let's build handlers that are production-ready. Not proof-of-concept code, but something you'd actually deploy.

The Configuration Layer

First, we need a way to inject configuration into our handlers. Kafka Streams instantiates handlers via reflection (no-arg constructor), so we can't use Spring's dependency injection directly.

The solution: a static configuration holder.

ExceptionHandlerConfig.java
package com.example.kafka.config;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;

@Component
@Slf4j
public class ExceptionHandlerConfig {

    @Getter
    private static String dlqTopic;

    @Getter
    private static String serviceName;

    @Getter
    private static int maxMessageSize;

    @Value("${app.dlq.topic}")
    private String dlqTopicInstance;

    @Value("${spring.application.name}")
    private String serviceNameInstance;

    @Value("${app.dlq.max-message-size:1000000}")
    private int maxMessageSizeInstance;

    @PostConstruct
    public void init() {
        dlqTopic = this.dlqTopicInstance;
        serviceName = this.serviceNameInstance;
        maxMessageSize = this.maxMessageSizeInstance;

        log.info("Exception handlers configured:");
        log.info("  DLQ topic: {}", dlqTopic);
        log.info("  Service: {}", serviceName);
        log.info("  Max size: {} bytes", maxMessageSize);
    }
}

This gets initialized at startup via Spring, then our handlers access it statically. Not elegant, but it works.

Deserialization Exception Handler

This handler catches failures when reading from Kafka topics.

CustomDeserializationExceptionHandler.java
package com.example.kafka.exceptions;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

@Slf4j
public class CustomDeserializationExceptionHandler 
        implements DeserializationExceptionHandler {

    private KafkaTemplate<byte[], byte[]> kafkaTemplate;
    private String dlqTopic;
    private String serviceName;
    private int maxMessageSize;

    // Required no-arg constructor for Kafka Streams reflection
    public CustomDeserializationExceptionHandler() {
        log.info("Handler instantiated");
    }

    @Override
    public DeserializationHandlerResponse handle(
            ProcessorContext context,
            ConsumerRecord<byte[], byte[]> record,
            Exception exception) {

        log.error("Deserialization error - topic: {}, partition: {}, offset: {}",
                record.topic(), record.partition(), record.offset(), exception);

        try {
            sendToDlq(record, exception);
            log.info("Message sent to DLQ: {}", dlqTopic);
            return DeserializationHandlerResponse.CONTINUE;
        } catch (Exception e) {
            log.error("Failed to send to DLQ, message lost", e);
            return DeserializationHandlerResponse.CONTINUE;
        }
    }

    private void sendToDlq(ConsumerRecord<byte[], byte[]> record, 
                          Exception exception) {
        byte[] truncatedValue = truncate(record.value(), maxMessageSize);

        ProducerRecord<byte[], byte[]> dlqRecord = 
                new ProducerRecord<>(dlqTopic, null, record.key(), truncatedValue);

        enrichHeaders(dlqRecord.headers(), record, exception);

        kafkaTemplate.send(dlqRecord).whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("DLQ send failed: {}", ex.getMessage());
            } else {
                log.info("DLQ sent - offset: {}", 
                        result.getRecordMetadata().offset());
            }
        });
    }

    private void enrichHeaders(Headers headers, 
                               ConsumerRecord<byte[], byte[]> record, 
                               Exception exception) {
        headers.add("dlq.service.name", 
                serviceName.getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.source.topic", 
                record.topic().getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.source.partition",
                ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array());
        headers.add("dlq.source.offset",
                String.valueOf(record.offset()).getBytes(StandardCharsets.UTF_8));
        headers.add("error.type", 
                "DESERIALIZATION_ERROR".getBytes(StandardCharsets.UTF_8));
        headers.add("error.exception.class",
                exception.getClass().getName().getBytes(StandardCharsets.UTF_8));

        if (exception.getMessage() != null) {
            headers.add("error.exception.message",
                    truncate(exception.getMessage()
                            .getBytes(StandardCharsets.UTF_8), maxMessageSize));
        }

        String stacktrace = ExceptionUtils.getStackTrace(exception);
        headers.add("error.stacktrace",
                truncate(stacktrace.getBytes(StandardCharsets.UTF_8), 
                        maxMessageSize));
        headers.add("error.timestamp",
                String.valueOf(System.currentTimeMillis())
                        .getBytes(StandardCharsets.UTF_8));
    }

    private byte[] truncate(byte[] data, int maxSize) {
        if (data == null || data.length <= maxSize) {
            return data;
        }

        byte[] truncated = new byte[maxSize];
        System.arraycopy(data, 0, truncated, 0, maxSize);

        String marker = "...[TRUNCATED]";
        byte[] markerBytes = marker.getBytes(StandardCharsets.UTF_8);
        int markerStart = Math.max(0, maxSize - markerBytes.length);
        System.arraycopy(markerBytes, 0, truncated, markerStart,
                Math.min(markerBytes.length, maxSize));

        return truncated;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        this.dlqTopic = ExceptionHandlerConfig.getDlqTopic();
        this.serviceName = ExceptionHandlerConfig.getServiceName();
        this.maxMessageSize = ExceptionHandlerConfig.getMaxMessageSize();

        Map<String, Object> producerConfigs = 
                new java.util.HashMap<>(configs);
        producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                ByteArraySerializer.class);
        producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                ByteArraySerializer.class);
        producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
        producerConfigs.put(ProducerConfig.RETRIES_CONFIG, 3);

        this.kafkaTemplate = new KafkaTemplate<>(
                new DefaultKafkaProducerFactory<>(producerConfigs));

        log.info("Deserialization handler configured - DLQ: {}", dlqTopic);
    }
}

The critical line here is return DeserializationHandlerResponse.CONTINUE. This tells Kafka Streams: "I handled this error, skip this message and move on."

If you return FAIL instead, you're back to crashing on every bad message.

Serialization Exception Handler

This handler catches failures when writing to output topics.

CustomSerializationExceptionHandler.java
package com.example.kafka.exceptions;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

@Slf4j
public class CustomSerializationExceptionHandler 
        implements ProductionExceptionHandler {

    private KafkaTemplate<byte[], byte[]> kafkaTemplate;
    private String dlqTopic;
    private String serviceName;
    private int maxMessageSize;

    public CustomSerializationExceptionHandler() {
        log.info("Handler instantiated");
    }

    @Override
    public ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record,
            Exception exception) {

        log.error("Serialization error - topic: {}, partition: {}",
                record.topic(), record.partition(), exception);

        try {
            sendToDlq(record, exception);
            log.info("Message sent to DLQ: {}", dlqTopic);
        } catch (Exception e) {
            log.error("Failed to send to DLQ", e);
        }

        if (isRecoverable(exception)) {
            log.warn("Recoverable error, continuing");
            return ProductionExceptionHandlerResponse.CONTINUE;
        }

        log.error("Non-recoverable error, failing");
        return ProductionExceptionHandlerResponse.FAIL;
    }

    private void sendToDlq(ProducerRecord<byte[], byte[]> record, 
                          Exception exception) {
        byte[] truncatedValue = truncate(record.value(), maxMessageSize);

        ProducerRecord<byte[], byte[]> dlqRecord = 
                new ProducerRecord<>(dlqTopic, null, record.key(), truncatedValue);

        enrichHeaders(dlqRecord.headers(), record, exception);

        kafkaTemplate.send(dlqRecord).whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("DLQ send failed: {}", ex.getMessage());
            } else {
                log.info("DLQ sent - offset: {}", 
                        result.getRecordMetadata().offset());
            }
        });
    }

    private void enrichHeaders(Headers headers, 
                               ProducerRecord<byte[], byte[]> record, 
                               Exception exception) {
        headers.add("dlq.service.name", 
                serviceName.getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.source.topic", 
                record.topic().getBytes(StandardCharsets.UTF_8));

        if (record.partition() != null) {
            headers.add("dlq.source.partition",
                    ByteBuffer.allocate(Integer.BYTES)
                            .putInt(record.partition()).array());
        }

        headers.add("error.type", 
                "SERIALIZATION_ERROR".getBytes(StandardCharsets.UTF_8));
        headers.add("error.exception.class",
                exception.getClass().getName().getBytes(StandardCharsets.UTF_8));

        if (exception.getMessage() != null) {
            headers.add("error.exception.message",
                    truncate(exception.getMessage()
                            .getBytes(StandardCharsets.UTF_8), maxMessageSize));
        }

        String stacktrace = ExceptionUtils.getStackTrace(exception);
        headers.add("error.stacktrace",
                truncate(stacktrace.getBytes(StandardCharsets.UTF_8), 
                        maxMessageSize));
        headers.add("error.timestamp",
                String.valueOf(System.currentTimeMillis())
                        .getBytes(StandardCharsets.UTF_8));
    }

    private byte[] truncate(byte[] data, int maxSize) {
        if (data == null || data.length <= maxSize) {
            return data;
        }

        byte[] truncated = new byte[maxSize];
        System.arraycopy(data, 0, truncated, 0, maxSize);

        String marker = "...[TRUNCATED]";
        byte[] markerBytes = marker.getBytes(StandardCharsets.UTF_8);
        int markerStart = Math.max(0, maxSize - markerBytes.length);
        System.arraycopy(markerBytes, 0, truncated, markerStart,
                Math.min(markerBytes.length, maxSize));

        return truncated;
    }

    private boolean isRecoverable(Exception exception) {
        return exception instanceof org.apache.kafka.common.errors.TimeoutException
                || exception instanceof org.apache.kafka.common.errors.RetriableException
                || exception instanceof org.apache.kafka.common.errors.NetworkException;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        this.dlqTopic = ExceptionHandlerConfig.getDlqTopic();
        this.serviceName = ExceptionHandlerConfig.getServiceName();
        this.maxMessageSize = ExceptionHandlerConfig.getMaxMessageSize();

        Map<String, Object> producerConfigs = 
                new java.util.HashMap<>(configs);
        producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                ByteArraySerializer.class);
        producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                ByteArraySerializer.class);
        producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
        producerConfigs.put(ProducerConfig.RETRIES_CONFIG, 3);

        this.kafkaTemplate = new KafkaTemplate<>(
                new DefaultKafkaProducerFactory<>(producerConfigs));

        log.info("Serialization handler configured - DLQ: {}", dlqTopic);
    }
}

Notice the difference here. For serialization errors, we check if the error is recoverable. Network timeouts? Continue. Schema incompatibility that indicates a bug in our code? Fail fast so we can fix it.

The Dead Letter Queue

Both handlers send failed messages to a DLQ — a standard Kafka topic that acts as a quarantine zone for problematic data.

Each DLQ message includes:

  • The original message bytes (truncated to prevent oversized messages)
  • Source metadata (topic, partition, offset)
  • Error details (exception class, message, full stacktrace)
  • Timestamp of when the error occurred

Here's what the enriched headers look like:

enrichHeaders() method
private void enrichHeaders(Headers headers, 
                           ConsumerRecord<byte[], byte[]> record, 
                           Exception exception) {
    headers.add("dlq.service.name", 
            serviceName.getBytes(StandardCharsets.UTF_8));
    headers.add("dlq.source.topic", 
            record.topic().getBytes(StandardCharsets.UTF_8));
    headers.add("dlq.source.partition", 
            ByteBuffer.allocate(Integer.BYTES)
                    .putInt(record.partition()).array());
    headers.add("dlq.source.offset", 
            String.valueOf(record.offset()).getBytes(StandardCharsets.UTF_8));
    headers.add("error.type", 
            "DESERIALIZATION_ERROR".getBytes(StandardCharsets.UTF_8));
    headers.add("error.exception.class", 
            exception.getClass().getName().getBytes(StandardCharsets.UTF_8));
    headers.add("error.stacktrace", 
            truncate(ExceptionUtils.getStackTrace(exception)
                    .getBytes(StandardCharsets.UTF_8), maxMessageSize));
    headers.add("error.timestamp", 
            String.valueOf(System.currentTimeMillis())
                    .getBytes(StandardCharsets.UTF_8));
}

Why a Separate Producer?

You might notice we use a dedicated producer for the DLQ — a KafkaTemplate backed by its own producer factory — rather than Kafka Streams' internal producer.

This is intentional. Kafka Streams' producer operates within an exactly-once transaction. If DLQ writes were part of that transaction, a DLQ write failure would abort the entire batch of valid messages.

By using a separate producer, DLQ writes are best-effort and independent. If the DLQ write fails, we log it but don't crash the stream.

Configuration in application.yml

application.yml
spring:
  application:
    name: kafka-streams-app
  
  kafka:
    bootstrap-servers: localhost:9092
    
    streams:
      application-id: kafka-streams-app
      properties:
        # Exception handlers configuration
        default.deserialization.exception.handler: 
          com.example.kafka.exceptions.CustomDeserializationExceptionHandler
        default.production.exception.handler: 
          com.example.kafka.exceptions.CustomSerializationExceptionHandler
        
        # Processing guarantees
        processing.guarantee: exactly_once_v2
        commit.interval.ms: 1000
        
        # Serdes
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        
        # Schema Registry
        schema.registry.url: http://localhost:8081

app:
  dlq:
    topic: error-dlq
    max-message-size: 1000000  # 1MB
  
  topics:
    input:
      orders: orders-topic
      programs: programs-topic
      clients: clients-topic
    output:
      processed: processed-orders-topic

Two things to note here:

  1. We keep exactly-once semantics enabled. The DLQ writes happen outside this guarantee, but your normal processing remains transactional.
  2. Message size limits prevent oversized messages from breaking the DLQ itself.

What About Monitoring?

Exception handlers without monitoring are just silent failures with extra steps.

You need to track:

  • dlq_messages_total — counter, labeled by error type
  • error_rate_5m — sliding window to catch spikes
  • dlq_topic_lag — are errors being investigated or piling up?
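
A lightweight way to get dlq_messages_total is a Micrometer counter incremented wherever a message is sent to the DLQ — a sketch, assuming Micrometer is on the classpath and reachable through its global registry (metric and tag names here are illustrative):

DlqMetrics.java
package com.example.kafka.monitoring;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;

public class DlqMetrics {

    // Uses Micrometer's global registry so the reflectively-instantiated
    // handlers can reach it without Spring injection.
    public static void recordDlqMessage(String errorType, String sourceTopic) {
        MeterRegistry registry = Metrics.globalRegistry;
        Counter.builder("dlq_messages_total")
                .tag("error_type", errorType)     // e.g. DESERIALIZATION_ERROR
                .tag("source_topic", sourceTopic)
                .register(registry)
                .increment();
    }
}

Called from sendToDlq() in both handlers, this gives you the counter the alerts below key on.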

Set up alerts:

  • DLQ rate > 10 messages in 5 minutes → page on-call
  • Same error repeated 50+ times → upstream data quality issue
  • Application state != RUNNING → health check failure

Investigating DLQ Messages

Build a simple consumer that reads your DLQ topic and extracts patterns:

bash
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic error-dlq \
  --property print.headers=true \
  --from-beginning
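
If the console consumer isn't enough, a small programmatic consumer can group DLQ records by exception class — a sketch reusing the header names written by the handlers above:

DlqInspector.java
package com.example.kafka.tools;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class DlqInspector {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-inspector");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        Map<String, Integer> countsByException = new HashMap<>();

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("error-dlq"));
            // Single poll for brevity; a real tool would loop until caught up.
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
                Header header = record.headers().lastHeader("error.exception.class");
                String exceptionClass = header == null
                        ? "unknown"
                        : new String(header.value(), StandardCharsets.UTF_8);
                countsByException.merge(exceptionClass, 1, Integer::sum);
            }
        }

        countsByException.forEach((cls, count) ->
                System.out.printf("%s -> %d message(s)%n", cls, count));
    }
}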

Look for patterns:

  • Are all errors from the same source topic? Upstream issue.
  • Same exception class repeated? Schema problem.
  • Errors started at a specific timestamp? Recent deployment correlation.

Reprocessing from DLQ

Once you've fixed the root cause (updated schema, fixed upstream service, patched your code), you can replay messages from the DLQ:

  1. Consume messages from DLQ
  2. Extract original bytes from message value
  3. Extract source topic from headers
  4. Produce back to the original topic
  5. Let your Kafka Streams app reprocess them

This closes the loop — no data loss, just delayed processing.
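
A replay tool can follow those five steps almost literally — a sketch, assuming the dlq.source.topic header written by the handlers above (batching, retries, and error handling omitted for brevity):

DlqReplayer.java
package com.example.kafka.tools;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class DlqReplayer {

    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-replayer");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {

            consumer.subscribe(List.of("error-dlq"));

            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
                Header sourceTopic = record.headers().lastHeader("dlq.source.topic");
                if (sourceTopic == null) {
                    continue; // cannot replay without knowing where it came from
                }
                String originalTopic = new String(sourceTopic.value(), StandardCharsets.UTF_8);

                // Produce the original bytes back to the source topic; the Streams
                // app will reprocess them once the root cause is fixed.
                producer.send(new ProducerRecord<>(originalTopic, record.key(), record.value()));
            }
            producer.flush();
        }
    }
}

One caveat: messages whose payload was truncated on the way into the DLQ cannot be replayed as-is; for those you need to recover the original data from the upstream source.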

Common Pitfalls

1. Reusing Your Application's KafkaTemplate

Don't share your application's KafkaTemplate. Create a dedicated one in the handler's configure() method to avoid coupling and config conflicts.

2. Not Truncating Messages

Kafka's default max message size is 1MB. A 5MB JSON blob will fail to write to the DLQ. Always truncate both message payloads and stacktraces.

3. Forgetting the DLQ Topic Exists

If auto-topic-creation is disabled and your DLQ topic doesn't exist, writes fail silently. Create it upfront with appropriate retention (7-30 days).
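
For example, something along these lines (partitions, replication, and retention adjusted to your environment):

bash
kafka-topics \
  --bootstrap-server localhost:9092 \
  --create \
  --topic error-dlq \
  --partitions 3 \
  --replication-factor 3 \
  --config retention.ms=1209600000   # 14 days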

4. Returning FAIL for All Deserialization Errors

I've seen this mistake multiple times. Returning FAIL in the deserialization handler defeats the entire purpose. You'll crash on the first bad message, just like before.

5. Not Distinguishing Recoverable vs Non-Recoverable Errors

For serialization errors, a TimeoutException is transient — retry makes sense. But a schema validation error indicates a bug in your code. Fail fast so you notice and fix it.


The Impact

After implementing these handlers in our production environment:

  • Zero crashes from malformed data in 6 months
  • Average of 2-3 DLQ messages per day (down from 10-15 crashes per week)
  • Mean time to recovery dropped from 2 hours to 0 (no recovery needed)
  • We catch upstream data issues before they spread

More importantly, we sleep better. That 3 AM page? Hasn't happened since.

Conclusion

Exception handling in Kafka Streams isn't optional. It's the difference between a fragile prototype and a production-grade system.

The pattern is straightforward:

  1. Catch exceptions at serialization boundaries
  2. Enrich with diagnostic metadata
  3. Send to DLQ
  4. Continue processing
  5. Monitor and alert
  6. Investigate and fix root causes
  7. Replay when appropriate

Your data pipeline will be more resilient, your on-call rotation will thank you, and you'll have full visibility into failures instead of wondering what data got lost during crashes.

The code is production-tested, the patterns are proven, and the benefits are immediate.

Now go fix your exception handling before your next 3 AM incident.
