Troubleshooting Kafka consumer disconnecting from group and not consuming data

Hey everyone,
I am having a problem with a Kafka consumer: it connects, but then it leaves the group for a reason I can't figure out and does not consume any data.

Logs available at: https://chat.opennms.com/files/up7zxuwydfddfjwka3aex7ae4w/public?h=8IPuv0sgMunP7LRayLNjQqBtalgUSoWbSRQ2xyeB8Ms

Any help would be much appreciated.

Thanks

Hi - the logs don't show any details about the consumer group, such as its current state or its other members.

Here are some suggestions for further troubleshooting:
Check topic existence: Verify whether the topic “OpenNMS.London.rpc-request” exists in your Kafka cluster (see the AdminClient sketch after this list).
Review consumer configuration: Double-check your consumer configuration, including groupId, topic names, and any authorization settings.
Enable additional consumer logs: Increase the consumer logging level to get more detailed information about its actions and potential errors.
Monitor consumer group: Tools like Kafka Manager can help you monitor the consumer group and its members, including their states.
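
If it helps, here is a minimal sketch of the first and last checks using the Java AdminClient. The `admin.properties` file name is just a placeholder for whatever properties file holds your bootstrap servers and SASL settings; the topic and group names are the ones from your setup:

```java
import java.io.FileInputStream;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.TopicDescription;

public class CheckTopicAndGroup {
    public static void main(String[] args) throws Exception {
        // admin.properties (placeholder name) is assumed to carry bootstrap.servers
        // plus the same SASL_SSL / SCRAM settings the consumer uses.
        Properties props = new Properties();
        props.load(new FileInputStream("admin.properties"));

        try (Admin admin = Admin.create(props)) {
            // Does the topic exist, and how many partitions does it have?
            Map<String, TopicDescription> topics = admin
                    .describeTopics(List.of("OpenNMS.London.rpc-request"))
                    .allTopicNames().get();
            topics.forEach((name, desc) ->
                    System.out.printf("topic %s: %d partitions%n", name, desc.partitions().size()));

            // What state is the consumer group in, and who are its members?
            Map<String, ConsumerGroupDescription> groups = admin
                    .describeConsumerGroups(List.of("London"))
                    .all().get();
            groups.forEach((id, desc) ->
                    System.out.printf("group %s: state=%s, members=%d%n",
                            id, desc.state(), desc.members().size()));
        }
    }
}
```

The equivalent CLI checks are `kafka-topics.sh --describe --topic OpenNMS.London.rpc-request` and `kafka-consumer-groups.sh --describe --group London`, both pointed at your bootstrap server.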

Hi
First of all, thanks for your feedback.

  1. Topic exists. User has read/write access to the topic and to the London consumer group.
  2. Consumer configuration (a standalone test consumer built from these settings is sketched after this list):
    allow.auto.create.topics = true
    auto.commit.interval.ms = 1000
    auto.include.jmx.reporter = true
    auto.offset.reset = latest
    bootstrap.servers = [stream-scram.runtime.staging.xxx.net:9094]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-London-2
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = London
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 50
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = SCRAM-SHA-512
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = SASL_SSL
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 30000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer


  3. The client is already in debug mode, and I don't find any more logs. There are only the ones from the Kafka cluster, but they also don't show much more info.
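
For reference, wiring the same settings into a standalone consumer looks roughly like this (credentials are placeholders; this is only a sketch, not the actual application code), which can help rule out the configuration itself:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RpcRequestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "stream-scram.runtime.staging.xxx.net:9094");
        props.put("group.id", "London");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "latest");
        props.put("max.poll.records", "50");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // SASL_SSL + SCRAM-SHA-512, matching the config dump above; credentials are placeholders.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"<user>\" password=\"<password>\";");

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("OpenNMS.London.rpc-request"));
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, byte[]> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s%n",
                            record.partition(), record.offset(), record.key());
                }
            }
        }
    }
}
```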

In the meantime I got some feedback from the team that owns the Kafka service:
`13:53:33.731 INFO [rpc-server-kafka-consumer-0] [Consumer clientId=consumer-London-2, groupId=London] Node 2 sent an invalid full fetch response with extraIds=(0C6gtEtFTXiBkzQVOxrwXw), response=()`

The consumer `consumer-London-2` left the group due to receiving an invalid full fetch response from Node 2. This action was likely part of its error handling and recovery strategy to ensure data consistency and reset its state. The presence of unexpected topic IDs and an empty response suggests potential issues with the broker or communication, prompting the consumer to leave the group and rejoin to establish a fresh session.
It is also worth checking whether the consumer version is compatible with the broker version. Some APIs, such as `Envelope` (58) and `ConsumerGroupHeartbeat` (68), are unsupported by this broker, which can lead to the invalid fetch response.

Does this make sense to you?

Hi - that makes sense; you are heading in the right direction with the troubleshooting.

Do you know how exactly I can check whether the consumer version is compatible with the broker version?

The Kafka Java client negotiates API versions with the broker at connection time, so a newer client can generally talk to an older broker, but any API introduced after the broker's version simply isn't available to it. The client library's documentation and release notes state the supported broker range, so check those against your exact versions.
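
In practice, a quick way (assuming the standard Java client) is to confirm which kafka-clients version is actually on your classpath and compare it with what the brokers advertise. The client version can be printed from the jar itself; on the broker side, the `kafka-broker-api-versions.sh` tool that ships with Kafka lists the API keys and version ranges each broker supports, which is where an unsupported `Envelope` or `ConsumerGroupHeartbeat` would show up. A minimal sketch for the client side:

```java
import org.apache.kafka.common.utils.AppInfoParser;

public class PrintClientVersion {
    public static void main(String[] args) {
        // Version and commit id of the kafka-clients jar found on the classpath.
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
        System.out.println("commit id: " + AppInfoParser.getCommitId());
    }
}
```

If an API the client relies on is missing from the broker's list, the client is newer than the broker; the team that runs the Kafka service can confirm the exact broker version.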