Timed out while committing offsets via consumer

Hi. For EOS I added these configurations to my Kafka Streams application. I am running the application against 2 Kafka brokers. When both brokers go down, the application goes down with this exception:
Timed out while committing offsets via consumer
When I restart it, a few messages get reprocessed. Any idea how to fix this?

```
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ADTProducerExceptionHandler.class);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
streamsConfiguration.setProperty(ProducerConfig.ACKS_CONFIG, "all");
streamsConfiguration.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
```

EOS requires a cluster of at least three brokers by default. For development you can change this by adjusting the broker settings `transaction.state.log.replication.factor` and `transaction.state.log.min.isr`.
See https://kafka.apache.org/documentation/#streamsconfigs_processing.guarantee
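To make the three-broker point concrete, here is a rough sketch of the constraint. It is a simplified model, not broker code: it assumes the transaction state log can only be created when there are at least `transaction.state.log.replication.factor` live brokers, and that writes can only be acknowledged when `transaction.state.log.min.isr` does not exceed the number of available replicas. The class and method names are made up for illustration. The broker defaults are 3 and 2.

```java
public class TxnLogCheckSketch {

    // Simplified model (an assumption, not actual broker logic):
    // returns true when the transaction state log could be created and written to.
    static boolean transactionsUsable(int liveBrokers, int replicationFactor, int minIsr) {
        // The internal topic needs one broker per replica.
        boolean topicCanBeCreated = liveBrokers >= replicationFactor;
        // acks=all writes need at least min.isr in-sync replicas.
        boolean writesCanBeAcked = minIsr <= Math.min(liveBrokers, replicationFactor);
        return topicCanBeCreated && writesCanBeAcked;
    }

    public static void main(String[] args) {
        // Two brokers with the broker defaults (replication factor 3, min.isr 2).
        System.out.println(transactionsUsable(2, 3, 2)); // false
        // Two brokers with both settings lowered to 1 for development.
        System.out.println(transactionsUsable(2, 1, 1)); // true
    }
}
```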

Thanks for the pointers. I have a 2-broker setup for local testing only, and the values for the settings you pointed out are below. But I am wondering: can the Kafka Streams application not wait for some duration for the Kafka cluster to come back? If it can't, achieving EOS would be difficult.
transaction.state.log.replication.factor = 1 and transaction.state.log.min.isr = 1

> when both my brokers are going down the application goes down getting this exception
> Timed out while committing offsets via consumer
This log line sounds weird. If you have EOS enabled, offsets should be committed via the producer, not the consumer… (or the log line is incorrect…) What version of Kafka Streams are you using?

Apologies, my bad: I get the above error when the EOS configurations are not set.
I am using Confluent version 6.1.0, and with the EOS configurations below it logs WARN messages saying the brokers are not available.

```
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
streamsConfiguration.setProperty(ProducerConfig.ACKS_CONFIG, "all");
streamsConfiguration.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
```
Log messages:
`{"@version":1,"source_host":"abcd","message":"[Consumer clientId=com.group.dev-9e2aef17-930b-452e-9056-52b9916a12a0-StreamThread-1-consumer, groupId=com.group.dev] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.","thread_name":"com.group.dev-9e2aef17-930b-452e-9056-52b9916a12a0-StreamThread-1","@timestamp":"2021-06-08T14:31:24.725+05:30","level":"WARN","logger_name":"org.apache.kafka.clients.NetworkClient"}`
`{"@version":1,"source_host":"abcd","message":"[AdminClient clientId=com.group.dev-9e2aef17-930b-452e-9056-52b9916a12a0-admin] Connection to node 2 (localhost/127.0.0.1:9093) could not be established. Broker may not be available.","thread_name":"kafka-admin-client-thread | com.group.dev-9e2aef17-930b-452e-9056-52b9916a12a0-admin","@timestamp":"2021-06-08T14:31:26.211+05:30","level":"WARN","logger_name":"org.apache.kafka.clients.NetworkClient"}`

When I start the brokers again, I see these log messages:
{"@version":1,"source_host":"abcd","message":"[Consumer clientId=com.group.dev-9e2aef17-930b-452e-9056-52b9916a12a0-StreamThread-1-consumer, groupId=com.group.dev] Error while fetching metadata with correlation id 1978 : {inbound=UNKNOWN_TOPIC_OR_PARTITION}","thread_name":"com.dev-9e2aef17-930b-452e-9056-52b9916a12a0-StreamThread-1","@timestamp":"2021-06-08T14:31:49.785+05:30","level":"WARN","logger_name":"org.apache.kafka.clients.NetworkClient"}
{"@version":1,"source_host":"abcd","message":"[Consumer clientId=com.group.dev-9e2aef17-930b-452e-9056-52b9916a12a0-StreamThread-1-consumer, groupId=com.group.dev] Received unknown topic or partition error in fetch for partition inbound-0","thread_name":"com.group.dev-9e2aef17-930b-452e-9056-52b9916a12a0-StreamThread-1","@timestamp":"2021-06-08T14:31:49.844+05:30","level":"WARN","logger_name":"org.apache.kafka.clients.consumer.internals.Fetcher"}

All messages are processed only once :slightly_smiling_face:
thank you!

Maybe you want to upgrade Kafka Streams to 2.8.0 – it ships with improved timeout handling: https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams

Otherwise, you may increase the corresponding consumer timeout, to let it retry committing offsets for a longer period of time.
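A minimal sketch of both options, with some assumptions: that the relevant consumer timeout is `default.api.timeout.ms` (which bounds the consumer's synchronous commit), set through Kafka Streams' `consumer.` prefix, and that on 2.8.0+ the KIP-572 setting `task.timeout.ms` is the one to tune. The keys are written as string literals so the snippet runs without the Kafka jars. In a real application you would more likely use `StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)` and `StreamsConfig.TASK_TIMEOUT_MS_CONFIG`. The class name, method name, and timeout values are illustrative only.

```java
import java.util.Properties;

public class CommitTimeoutSketch {

    // Returns a copy of the given config with longer timeouts applied.
    static Properties withLongerTimeouts(Properties base, long commitTimeoutMs, long taskTimeoutMs) {
        Properties props = new Properties();
        props.putAll(base);
        // Assumed to be the timeout behind the commit error in this thread:
        // how long blocking consumer calls such as a synchronous commit may take.
        props.setProperty("consumer.default.api.timeout.ms", Long.toString(commitTimeoutMs));
        // KIP-572 (Kafka Streams 2.8.0+): how long a task may keep retrying
        // after timeouts before the error is raised. Older versions do not use it.
        props.setProperty("task.timeout.ms", Long.toString(taskTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("processing.guarantee", "exactly_once_beta");

        // Illustrative values: 5 minutes for commits, 10 minutes per task.
        Properties tuned = withLongerTimeouts(base, 300_000L, 600_000L);
        tuned.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Longer timeouts only extend how long the application waits for the brokers to return. They do not by themselves prevent reprocessing when EOS is disabled.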