Error recovery in Kafka

How does everyone handle unrecoverable processing errors, such as those caused by corrupt records or a bug in your consumer? My colleagues are pushing for a “dead-letter queue”, where we write “bad” records and then continue processing. This practice is very common in ephemeral messaging systems (e.g. RabbitMQ, SQS, etc.), but I feel like that might not be the best approach in Kafka, where the messages are persisted. That said, halting processing and fiddling with offsets to skip a bunch of corrupt records also doesn’t seem like a great solution.
What approach does everyone here take?

https://www.confluent.io/blog/spring-kafka-can-your-kafka-consumers-handle-a-poison-pill/

https://www.confluent.io/blog/error-handling-patterns-in-kafka/

These could help guide you

This is a great question, and the answer can vary based on everyone’s needs.

For example, when it comes to Avro deserialization issues, I break down the exception to understand whether it is an issue with the deserialization itself or an issue with connecting to Schema Registry. In the former case I log to a DLQ and move on; in the latter, I do a full stop.
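A rough sketch of that branching, written as a Kafka Streams DeserializationExceptionHandler, might look like the following (the class name and the isSchemaRegistryIssue helper are placeholders, the handle(ProcessorContext, ...) signature that older Kafka Streams releases use is assumed, and the DLQ write is reduced to a log line):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class AvroAwareDeserializationHandler implements DeserializationExceptionHandler {

    @Override
    public void configure(final Map<String, ?> configs) {
        // wiring for a real DLQ producer would go here
    }

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        if (isSchemaRegistryIssue(exception.getCause())) {
            // cannot talk to Schema Registry: infrastructure problem, stop the world
            return DeserializationHandlerResponse.FAIL;
        }
        // genuinely bad data: log it (or produce the raw bytes to a DLQ topic) and keep going
        System.err.printf("DLQ: bad record at %s-%d offset %d%n",
                record.topic(), record.partition(), record.offset());
        return DeserializationHandlerResponse.CONTINUE;
    }

    private boolean isSchemaRegistryIssue(final Throwable cause) {
        // cause inspection is discussed further down the thread
        return false; // placeholder
    }
}
```

A handler like this is registered via the `default.deserialization.exception.handler` setting in the Streams config.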

In addition to that, I do the following when it comes to Kafka Streams.

Anything on an input topic should be validated and sent to a DLQ on failure, but if there are any errors with internal topics, I do a full stop. The reasoning is that any failure at that point is an issue with the Streams application itself: I should have flagged invalid data on entry, and if I failed to do that, it is a coding issue with this application that needs to be fixed and redeployed. Bad data coming in, on the other hand, is out of my control.
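A sketch of just the handle() decision for that input-vs-internal split, assuming it slots into a handler class like the one above and that the source topic names (placeholders here) are known up front:

```java
// sourceTopics would normally be populated in configure(); topics not in the set
// are internal (repartition/changelog) topics owned by this Streams application
private final Set<String> sourceTopics = Set.of("orders", "customers"); // placeholder names

@Override
public DeserializationHandlerResponse handle(final ProcessorContext context,
                                             final ConsumerRecord<byte[], byte[]> record,
                                             final Exception exception) {
    if (sourceTopics.contains(record.topic())) {
        // bad data from the outside world: DLQ it and continue
        return DeserializationHandlerResponse.CONTINUE;
    }
    // an internal topic only ever contains data this application wrote,
    // so a failure here is a bug in the application: full stop
    return DeserializationHandlerResponse.FAIL;
}
```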

I have also created a “threshold” error handler: if I get more than 5% (a configurable setting) of errors within a period of time, I do a shutdown.

But again, it all depends on whether you can handle skipping an event and/or reprocessing it later.

When it comes to DLQ processing, my approach is to use the DLQ to tell the source system to correct the bad data and send it again, rather than to resume from where the error occurred.

Interesting, those are some great ideas. Our app is a Kafka Streams one, and I hadn’t given any thought to the distinction between errors on internal vs. source topics, but that makes total sense. I love the idea of the “threshold” error handler too.

If you write a custom DeserializationExceptionHandler and you are interested in doing a threshold, some pieces of advice from memory…

You should be able to get to the metric Streams keeps for skipped records via the following:

This is internal naming, so it could change between releases; at the time I investigated this, the tag needed to pull the metric from the context was the client-id, which is the same as the thread name.

```java
// the client-id tag is the current stream thread's name
Map<String, String> tags = new HashMap<>();
tags.put("client-id", Thread.currentThread().getName());
MetricName name = new MetricName("skipped-records-rate", "stream-metrics", "", tags);
Metric metric = context.metrics().metrics().get(name);
```
Then, in the handle method, return `DeserializationHandlerResponse.FAIL` if you are above that threshold; the handle method provides the processor context you need to look the metric up.

This way you do not have to calculate the rate yourself, provided the 30-second average from Streams is OK for you (or you change the metric sampling and such, which is a lot of fun, but not really where you want to spend your time :wink:).
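Putting those pieces together, a threshold handler along these lines might look like the following sketch. The metric and tag names are the internal ones mentioned above and may differ by version, the config key is a placeholder, and the threshold is expressed as a skipped-records-per-second rate rather than the 5% ratio mentioned earlier (a true ratio would also need the processing rate):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class ThresholdDeserializationHandler implements DeserializationExceptionHandler {

    private double maxSkippedPerSecond = 5.0; // placeholder default

    @Override
    public void configure(final Map<String, ?> configs) {
        final Object o = configs.get("custom.max.skipped.per.second"); // placeholder key
        if (o != null) {
            maxSkippedPerSecond = Double.parseDouble(o.toString());
        }
    }

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // look up the built-in skipped-records rate for this stream thread
        final Map<String, String> tags = new HashMap<>();
        tags.put("client-id", Thread.currentThread().getName());
        final MetricName name = new MetricName("skipped-records-rate", "stream-metrics", "", tags);
        final Metric metric = context.metrics().metrics().get(name);

        if (metric != null && ((Number) metric.metricValue()).doubleValue() > maxSkippedPerSecond) {
            // too many skips in the current window: shut the application down
            return DeserializationHandlerResponse.FAIL;
        }
        // below the threshold: skip this record and keep processing
        return DeserializationHandlerResponse.CONTINUE;
    }
}
```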

Now, if you are working with Schema Registry and want to track down what is a hard failure vs. a data failure, that is a whole other rabbit hole to explore.

You will want to check the cause of the SerializationException to know whether or not it is a connection issue.

A RestClientException cause will be at the heart of most of them, but not all. For example, if the certificate expired or the credentials changed (if you were using basic auth), you could possibly get a certificate exception as the cause.

If DNS changes, you will get an unknown host exception.

I think I covered most of them here, but I believe there are other scenarios, and they are not all easy to understand or test for. So you do want to log and monitor for any new scenario you uncover, and classify it as a data issue vs. a Schema Registry issue. Of course, with Docker you can easily spin Schema Registry up and down, change the hostname, deploy with an incorrect cert, etc., to try to uncover them all.
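A starting-point sketch of that cause inspection, covering only the scenarios mentioned above (which exception actually surfaces depends on your client and JVM versions, so treat the list as incomplete); it could serve as the body of the isSchemaRegistryIssue placeholder in the earlier sketch:

```java
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import javax.net.ssl.SSLException;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

final class SchemaRegistryFailureClassifier {

    // Walk the cause chain and decide whether this looks like a Schema Registry /
    // infrastructure failure (full stop) rather than plain bad data (DLQ and continue).
    static boolean isSchemaRegistryIssue(Throwable t) {
        while (t != null) {
            if (t instanceof UnknownHostException          // DNS changed / host gone
                    || t instanceof ConnectException       // registry down or unreachable
                    || t instanceof SocketTimeoutException // registry not answering
                    || t instanceof SSLException) {        // expired or replaced certificate
                return true;
            }
            if (t instanceof RestClientException) {
                // The registry answered but rejected the call, e.g. changed basic-auth
                // credentials. Other statuses (such as schema-not-found) are closer to
                // data problems and may deserve their own handling.
                final int status = ((RestClientException) t).getStatus();
                return status == 401 || status == 403;
            }
            t = t.getCause();
        }
        return false;
    }
}
```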

This is all really great advice and very interesting, thanks a lot.