KafkaTimeoutError - Batch for TopicPartition expired

Hi Everyone… I am new to the Community… I am challenged by Kafka and was looking out for somehelp. I am pretty sure many of you might have faced this issue… would be great if someone could help me out …(you can also point me to a previous post :slightly_smiling_face: )
This is my issue :
I have created a simple setup with Kafka and Zookeeper using docker compose. The services are working fine. Then I created a python program to act as a producer and send json messages to Kafka topic.
code snippet from producer.py

                         value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                         acks='all',
                         retries=3)


if __name__ == "__main__":
    i = 0
    while 1 == 1:
        data = get_parking_data()
        print(data)
        producer.send(topic='test', value=data).get()
        producer.flush()       
        time.sleep(3)```
snippet from docker compose:
```  zookeeper:
    image: confluentinc/cp-zookeeper:7.1.1
    container_name: zookeeper
    hostname: zookeeper
    restart: always
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    image: confluentinc/cp-kafka:7.1.1
    container_name: kafka
    hostname: kafka
    restart: always
    ports:
      - "9092:9092"
      - "9093:9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: [INTERNAL://kafka:9092](INTERNAL://kafka:9092),[EXTERNAL://kafka:9093](EXTERNAL://kafka:9093)
      KAFKA_ADVERTISED_LISTENERS: [INTERNAL://kafka:9092](INTERNAL://kafka:9092) ,EXTERNAL://:9093    # external clients can connect to 9093 port
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_HOSTNAME: "kafka"
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_OPTS: "-Djava.rmi.server.hostname=kafka
            -Dcom.sun.management.jmxremote.local.only=false
            -Dcom.sun.management.jmxremote.rmi.port=9999
            -Dcom.sun.management.jmxremote.port=9999
            -Dcom.sun.management.jmxremote.authenticate=false
            -Dcom.sun.management.jmxremote.ssl=false"```
This is the error what I get:
```kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Batch for TopicPartition(topic='test', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time```
      KAFKA_ADVERTISED_LISTENERS: [INTERNAL://kafka:9092](INTERNAL://kafka:9092) ,EXTERNAL://:9093    # external clients can connect to 9093 port```
this will not work to allow for a program running on your host machine to communicate with a broker running within docker

One of your advertiser listeners needs to be “localhost” as it needs to tell the client how to get back to that specific broker, your host machine cannot communicate with the broker with the broker’s hostname, here is my example

https://github.com/nbuesing/kafka-local/blob/main/kafka/docker-compose.yml#L135

This is a great article from Robin Moffat https://www.confluent.io/blog/kafka-listeners-explained/

that explains it and explains it rather well.

What I do is I set up external for broker-1 on 19092, on broker-2 29092 etc and then use those from my mac laptop to talk to the cluster

localhost:19092 will make it to broker-1, etc.

since clients have to talk directly to each broker kafka tells the client how to “get back to me” you have to break down the 4th wall (not sure I can find the right metaphore) and let kafka broker know how outside clients will be talking to it. You cannot do port mapping, load balancing, etc of ports w/out the broker ultimately knowing how to tell the client how to get back to itself.

Thanks for some direction… I am going to try the steps and will get back to you… again appreciate your time to reply to my query.

Thanks a ton… its working! I am able to see my messages in the my topic. I made changes to my docker compose file looking at the one you shared. My Docker compose looks like this now( may be it will help someone in future )

      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: [PLAINTEXT://kafka:9092](PLAINTEXT://kafka:9092),PLAINTEXT_[HOST://localhost:29092](HOST://localhost:29092)```