Hi everyone, I am new to the community. I am struggling with Kafka and am looking for some help. I am pretty sure many of you have faced this issue, so it would be great if someone could help me out (you can also point me to a previous post).
This is my issue:
I have created a simple setup with Kafka and ZooKeeper using Docker Compose. The services are working fine. Then I created a Python program to act as a producer and send JSON messages to a Kafka topic.
Code snippet from producer.py:
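```
import json
import time

from kafka import KafkaProducer  # kafka-python

producer = KafkaProducer(
    # bootstrap_servers and any earlier arguments are not shown in the original snippet
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',
    retries=3)

if __name__ == "__main__":
    while True:
        data = get_parking_data()  # defined elsewhere in producer.py
        print(data)
        # .get() blocks until the broker acknowledges the record or the send fails
        producer.send(topic='test', value=data).get()
        producer.flush()
        time.sleep(3)
```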
Snippet from the Docker Compose file:
```
  zookeeper:
    image: confluentinc/cp-zookeeper:7.1.1
    container_name: zookeeper
    hostname: zookeeper
    restart: always
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    image: confluentinc/cp-kafka:7.1.1
    container_name: kafka
    hostname: kafka
    restart: always
    ports:
      - "9092:9092"
      - "9093:9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://kafka:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://:9093 # external clients can connect to 9093 port
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_HOSTNAME: "kafka"
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_OPTS: "-Djava.rmi.server.hostname=kafka
        -Dcom.sun.management.jmxremote.local.only=false
        -Dcom.sun.management.jmxremote.rmi.port=9999
        -Dcom.sun.management.jmxremote.port=9999
        -Dcom.sun.management.jmxremote.authenticate=false
        -Dcom.sun.management.jmxremote.ssl=false"
```
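To make the listener settings easier to read, here is a minimal sketch that splits a listener string into name, host, and port. `parse_listeners` is a hypothetical helper written only for illustration (it is not part of Kafka or kafka-python), and the input is the `KAFKA_ADVERTISED_LISTENERS` value from the compose snippet.

```python
def parse_listeners(value):
    """Split 'NAME://host:port,NAME://host:port' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        entry = entry.strip()  # tolerate spaces around the commas
        if not entry:
            continue
        name, _, address = entry.partition("://")
        host, _, port = address.rpartition(":")  # host is '' when omitted
        listeners[name] = (host, int(port))
    return listeners


# Value copied from the compose snippet (hypothetical helper, illustration only)
advertised = parse_listeners("INTERNAL://kafka:9092,EXTERNAL://:9093")
for name, (host, port) in advertised.items():
    print(f"{name}: host={host or '<empty>'} port={port}")
```

This makes it visible that the `EXTERNAL` listener is advertised without a host, while `INTERNAL` is advertised as `kafka`, which is the container hostname inside the Docker network. Since a client is redirected to the advertised address after bootstrapping, that address has to be one the producer's machine can actually resolve and reach.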
This is the error I get:
```
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Batch for TopicPartition(topic='test', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time
```
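The error only says that the batch expired before it was delivered, so one thing that can be checked from the machine running `producer.py` is whether the broker addresses accept TCP connections at all. Below is a minimal sketch using only the standard library. `can_connect` and `report` are hypothetical helpers, and any host/port pairs passed to them are assumptions based on the compose snippet, not values confirmed to be the ones the producer uses.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and hostnames that do not resolve
        return False


def report(addresses):
    """Map each (host, port) pair to whether it is reachable."""
    return {(host, port): can_connect(host, port) for host, port in addresses}
```

For example, `report([("localhost", 9093), ("kafka", 9092)])` run on the host would show whether the published port is open and whether the name `kafka` is reachable from outside the Docker network. A successful TCP connection does not prove the producer will work, but a failure narrows the problem down to networking or listener configuration.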