Hey everyone,
I am having a problem with a Kafka consumer: it connects, but then it leaves the group and does not consume any data, and I am not sure why.
Any help would be much appreciated.
Thanks
Hi - Consumer Group Details: The logs don’t show details about the consumer group like its current state or other members.
Here are some suggestions for further troubleshooting:
• Check topic existence: Verify if the topic “OpenNMS.London.rpc-request” exists in your Kafka cluster.
• Review consumer configuration: Double-check your consumer configuration, including groupId, topic names, and any authorization settings.
• Enable additional consumer logs: Increase the consumer logging level to get more detailed information about its actions and potential errors.
• Monitor consumer group: Tools like Kafka Manager can help you monitor the consumer group and its members, including their states.
Hi
First of all, thanks for your feedback.
allow.auto.create.topics = true
auto.commit.interval.ms = 1000
auto.include.jmx.reporter = true
auto.offset.reset = latest
bootstrap.servers = [stream-scram.runtime.staging.xxx.net:9094]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-London-2
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = London
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 50
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = SCRAM-SHA-512
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 30000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
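For anyone reading along, a quick way to sanity-check the timing settings in a dump like this is to parse the `key = value` lines and compare them. This is only a rough sketch: the function names are made up for illustration, and the one rule it checks (heartbeat interval no higher than one third of the session timeout) is the general guidance from the Kafka consumer configuration docs, not something specific to this setup.

```python
def parse_consumer_config(dump: str) -> dict:
    """Parse 'key = value' lines from a Kafka consumer config log dump."""
    config = {}
    for line in dump.splitlines():
        key, sep, value = line.partition("=")
        if sep:  # ignore lines that contain no '='
            config[key.strip()] = value.strip()
    return config


def check_timeouts(config: dict) -> list:
    """Return warnings for timing settings that look suspicious."""
    warnings = []
    heartbeat = int(config["heartbeat.interval.ms"])
    session = int(config["session.timeout.ms"])
    # General guidance: heartbeat.interval.ms <= session.timeout.ms / 3
    if heartbeat * 3 > session:
        warnings.append(
            f"heartbeat.interval.ms={heartbeat} is more than 1/3 of "
            f"session.timeout.ms={session}"
        )
    return warnings


# Values copied from the configuration posted in this thread.
sample = """\
heartbeat.interval.ms = 3000
session.timeout.ms = 30000
max.poll.interval.ms = 300000
"""
print(check_timeouts(parse_consumer_config(sample)))
```

With the values posted here the list of warnings comes back empty, so the heartbeat and session timeouts do not look like the cause.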
3. The client is already in debug mode, and I do not find any more logs. There are only the ones from the Kafka cluster, but they do not show much more info either.
In the meantime, I got some feedback from the team that owns the Kafka service:
`13:53:33.731 INFO [rpc-server-kafka-consumer-0] [Consumer clientId=consumer-London-2, groupId=London] Node 2 sent an invalid full fetch response with extraIds=(0C6gtEtFTXiBkzQVOxrwXw), response=()`
The consumer `consumer-London-2` left the group due to receiving an invalid full fetch response from Node 2. This action was likely part of its error handling and recovery strategy to ensure data consistency and reset its state. The presence of unexpected topic IDs and an empty response suggests potential issues with the broker or communication, prompting the consumer to leave the group and rejoin to establish a fresh session.
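To see how often this happens and whether it is always the same node, the client log can be scanned for that message. Below is a minimal sketch, assuming the log lines look like the one quoted above; the regular expression and the function name are my own and would need adjusting if your log layout differs.

```python
import re

# Matches the consumer message quoted in this thread (layout is an assumption).
FETCH_ERROR = re.compile(
    r"\[Consumer clientId=(?P<client_id>[^,\]]+), groupId=(?P<group_id>[^,\]]+)\] "
    r"Node (?P<node>\d+) sent an invalid full fetch response "
    r"with extraIds=\((?P<extra_ids>[^)]*)\)"
)


def find_invalid_fetch_responses(lines):
    """Collect client, group, node and extra topic IDs from matching log lines."""
    events = []
    for line in lines:
        match = FETCH_ERROR.search(line)
        if match is None:
            continue
        extra_ids = [i.strip() for i in match.group("extra_ids").split(",") if i.strip()]
        events.append({
            "client_id": match.group("client_id"),
            "group_id": match.group("group_id"),
            "node": int(match.group("node")),
            "extra_ids": extra_ids,
        })
    return events


log_line = (
    "13:53:33.731 INFO [rpc-server-kafka-consumer-0] "
    "[Consumer clientId=consumer-London-2, groupId=London] "
    "Node 2 sent an invalid full fetch response with "
    "extraIds=(0C6gtEtFTXiBkzQVOxrwXw), response=()"
)
for event in find_invalid_fetch_responses([log_line]):
    print(event)
```

If every hit points at the same node, that would be a reason to look at that broker first.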
It is also worth checking whether the consumer version is compatible with the broker version. Some APIs, such as Envelope (58) and ConsumerGroupHeartbeat (68), are unsupported by this broker, which could lead to the invalid fetch response.
Does this make sense to you?
Hi - That makes sense; you are heading in the right direction with your troubleshooting.
Do you know how exactly I can check whether the consumer version is compatible with the broker version?
The Kafka client library you’re using might have its own compatibility rules and recommendations. Check the library’s documentation for specific guidelines.
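One practical way to compare the two sides is to look at the API version ranges the broker reports. The `kafka-broker-api-versions.sh` tool that ships with Kafka prints them when pointed at the cluster with `--bootstrap-server` (you would also need to pass your SASL_SSL client settings), and the client logs a similar list at debug level. The sketch below parses such a listing. Treat it as an illustration: the exact text format is an assumption on my part, the function names are made up, and the version numbers in the sample are invented; only the two unsupported APIs come from this thread.

```python
import re

# One entry per API, e.g. "Fetch(1): 0 to 13 [usable: 13]" or "Envelope(58): UNSUPPORTED".
API_LINE = re.compile(
    r"(?P<name>\w+)\((?P<key>\d+)\): "
    r"(?:UNSUPPORTED|(?P<min>\d+)(?: to (?P<max>\d+))?)"
)


def parse_api_versions(text):
    """Map API name to (min, max) version, or None if the broker lacks it."""
    apis = {}
    for match in API_LINE.finditer(text):
        if match.group("min") is None:
            apis[match.group("name")] = None
        else:
            low = int(match.group("min"))
            # A single supported version is assumed to be printed without "to".
            high = int(match.group("max")) if match.group("max") else low
            apis[match.group("name")] = (low, high)
    return apis


def unsupported_apis(apis):
    """Return the names of APIs the broker reported as unsupported."""
    return sorted(name for name, versions in apis.items() if versions is None)


# Hypothetical listing; the version ranges are made up for the example.
listing = """\
Produce(0): 0 to 9 [usable: 9],
Fetch(1): 0 to 13 [usable: 13],
Envelope(58): UNSUPPORTED,
ConsumerGroupHeartbeat(68): UNSUPPORTED
"""
apis = parse_api_versions(listing)
print("Fetch versions:", apis.get("Fetch"))
print("Unsupported:", unsupported_apis(apis))
```

The entry I would look at first is Fetch, since that is the request that failed here; the client library's release notes should say which broker versions it has been tested against.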