Kafka - re-consuming or reprocessing failed messages

And if a message in the middle of the batch fails (your message 2), messages 3 and 4 will not have been processed and will come back in the next poll (with auto-commit).

But my business logic is in my Go code; I don't use Kafka Streams.


```go
package main

import (
   "context"
   "fmt"
   "log"
   "strings"

   kafka "[github.com/segmentio/kafka-go](http://github.com/segmentio/kafka-go)"
)

func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
   brokers := strings.Split(kafkaURL, ",")
   return kafka.NewReader(kafka.ReaderConfig{
      Brokers:  brokers,
      GroupID:  groupID,
      Topic:    topic,
      MinBytes: 10e3, // 10KB
      MaxBytes: 10e6, // 10MB
   })
}

func main() {
   // create a kafka reader for the hardcoded broker, topic, and group.
   kafkaURL := "localhost:9093"
   topic := "url"
   groupID := "group-id"

   reader := getKafkaReader(kafkaURL, topic, groupID)

   defer reader.Close()

   fmt.Println("start consuming ... !!")
   for {
      m, err := reader.ReadMessage(context.Background())
      if err != nil {
         log.Fatalln(err)
      }
      fmt.Printf("message at topic:%v partition:%v offset:%v %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
      // Note: this explicit commit is redundant here, because ReadMessage
      // already commits offsets when a GroupID is configured (see below).
      reader.CommitMessages(context.Background(), m)
   }
}
```

It all happens in this code; this is where I decide whether to commit or not.

Is it because you are committing the messages yourself, when this is taken care of for you?

> ReadMessage automatically commits offsets when using consumer groups.
> kafka-go also supports explicit commits. Instead of calling ReadMessage, call FetchMessage followed by CommitMessages.
ref: https://github.com/segmentio/kafka-go#consumer-groups
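
To illustrate, a minimal sketch of that explicit-commit pattern with kafka-go (imports as in the code above; the `process` callback is a hypothetical stand-in for the business logic):

```go
// Sketch: explicit commits with kafka-go. FetchMessage does not advance the
// committed offset; that only happens when CommitMessages succeeds.
func consumeWithManualCommit(reader *kafka.Reader, process func(kafka.Message) error) {
   for {
      m, err := reader.FetchMessage(context.Background())
      if err != nil {
         return // reader closed or unrecoverable error
      }
      if err := process(m); err != nil {
         // Skip the commit: the offset stays put, so the message is
         // redelivered after a restart or rebalance.
         log.Printf("processing offset %d failed, not committing: %v", m.Offset, err)
         continue
      }
      if err := reader.CommitMessages(context.Background(), m); err != nil {
         log.Printf("commit failed: %v", err)
      }
   }
}
```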

Is there any better way to handle this kind of situation with Kafka? As far as I understand, offsets are used by Kafka to keep track of its position in case of an error; they are not for my business logic.

Maybe a dead-letter queue?

What would you suggest?

If it is a transient error that you can recover from by re-consuming off the topic, then that is OK. If you don't want to re-consume it, then dead-letter the message, which will still mean it is marked as consumed and therefore not re-consumed.
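
A rough sketch of that dead-letter idea with kafka-go (the `mail.dlq` topic name is an assumption; in real code you would reuse the writer rather than build one per message):

```go
// Sketch: on a permanent failure, copy the message to a dead-letter topic,
// then commit the original offset so it is marked consumed and not re-consumed.
func deadLetter(ctx context.Context, reader *kafka.Reader, m kafka.Message) error {
   dlq := &kafka.Writer{
      Addr:  kafka.TCP("localhost:9093"),
      Topic: "mail.dlq", // assumed dead-letter topic name
   }
   defer dlq.Close()

   if err := dlq.WriteMessages(ctx, kafka.Message{Key: m.Key, Value: m.Value}); err != nil {
      return err // leave the offset uncommitted so the message is retried
   }
   return reader.CommitMessages(ctx, m)
}
```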

> as far as I understand offsets are being used to keep track in case of any error for Kafka not my business logic.
Writing to the `__consumer_offsets` topic is done to track which messages have been consumed by which consumer groups.

A consumer cannot consume messages concurrently from the same topic-partition. So here are the scenarios:

  1. If your messages with offsets 40 and 50 are on the same partition, the message at offset 50 won't be consumed by your consumer until you commit offset 40. This means the consumer will keep re-consuming the message at offset 40 until you commit its offset.
  2. If your messages with offsets 40 and 50 are on different partitions, then the message at offset 50 will be consumed and processed by another consumer in the same consumer group that is assigned to that topic-partition.

So technically you can just use manual commits and decide in your logic whether to commit the offset. This solution is simple but not optimal, because as described above, once a message fails to process, your consumer won't consume the next "good" messages on the same topic-partition.
An alternative is to divert failed messages to another topic and reprocess them from there (much like the dead-letter sketch above). You would see this supported in other messaging systems like MSMQ and RabbitMQ.

This is quite confusing, since the offset is specific to a partition. So given enough data, every partition will have an offset 40 and an offset 50.

You can consume messages concurrently from the same topic-partition, as long as the consumers are not in the same consumer group, for example by assigning partitions. Every consumer with an assigned partition gets all of that partition's messages; it's not possible to split those.

Only if you use a consumer group and start consuming from a partition does the stored offset come into play. You could read thousands of messages without committing any offset just fine, if you want. But that does mean that on a rebalance some other consumer will also get those thousands of messages again.

You are right about messages being consumed by multiple consumers from different consumer groups.
My reply assumed all consumers are in the same consumer group, as it doesn't seem right to process an order several times by consumers from different consumer groups.

Consuming a message does not necessarily mean processing the order. You need some bookkeeping, but it could make things faster. Assuming too many things can lead to errors.

You don’t need a consumer group to read messages.

Guys, I think you misunderstood me. I have a user app, and I send mails to new users. The auth service produces a new message for every new user and sends it to the mail topic in Kafka. Then the notification service consumes messages from the mail topic and sends mail to the users. But when I can't send mail due to a transient error in Gmail, I do want this message to be redelivered by Kafka. In NATS Streaming, when you don't acknowledge a message, the server redelivers it from the topic. How can I achieve something like this in Kafka?

You likely use Kafka with Spring Boot, right? So you can use Spring Retry. This works for all-or-nothing kinds of errors, which seems like your use case.

I am using Go, with no framework, only the client.

Then you need something similar: just keep retrying with backoff. As long as you don't poll again (or however it works with the Go client), the new offset is not committed, so this effectively gets you at-least-once delivery. If you need to be close to exactly-once, you can limit the number of messages in one poll.
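
Something like this, as a sketch (the `sendMail` function and the attempt limit are hypothetical):

```go
// Sketch: retry the mail send with exponential backoff; only commit the
// offset once this returns nil, so failures stay uncommitted.
func processWithRetry(ctx context.Context, m kafka.Message, maxAttempts int) error {
   backoff := time.Second
   var err error
   for attempt := 1; attempt <= maxAttempts; attempt++ {
      if err = sendMail(m.Value); err == nil { // hypothetical mail call
         return nil
      }
      log.Printf("attempt %d failed: %v; retrying in %s", attempt, err, backoff)
      select {
      case <-time.After(backoff):
         backoff *= 2 // double the wait between attempts
      case <-ctx.Done():
         return ctx.Err()
      }
   }
   return err
}
```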

Another solution would be to use Kafka to put them all in a database, and use a separate process to try to mail them and then delete them from the database.
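
Sketched with database/sql, that staging step might look like this (the `pending_mail` table and its columns are assumptions):

```go
// Sketch: the consumer only stages mails in a table; a separate worker
// sends them and deletes the rows. Commit only after a successful insert.
func stageMail(ctx context.Context, reader *kafka.Reader, db *sql.DB) error {
   m, err := reader.FetchMessage(ctx)
   if err != nil {
      return err
   }
   if _, err := db.ExecContext(ctx,
      `INSERT INTO pending_mail (msg_key, payload) VALUES ($1, $2)`,
      m.Key, m.Value); err != nil {
      return err // leave the offset uncommitted; the insert will be retried
   }
   return reader.CommitMessages(ctx, m)
}
```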