How to consume messages from a Kafka topic within a specified time range
```python
from kafka import KafkaConsumer, TopicPartition

# Create a KafkaConsumer instance
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         group_id='my-group',
                         enable_auto_commit=True,
                         auto_offset_reset='earliest',
                         value_deserializer=lambda x: x.decode('utf-8'))

# Assign the partition to read (partition 0 of 'my-topic' here). seek() only
# works on partitions that are currently assigned, and subscribe() assigns
# partitions lazily during poll(), so assign() is used instead.
tp = TopicPartition('my-topic', 0)
consumer.assign([tp])

# Time range to consume, in milliseconds since the epoch (end is exclusive)
start_timestamp = 1500000000000
end_timestamp = 1500000001000

# offsets_for_times() maps each partition to the earliest offset whose
# timestamp is >= the given timestamp (an OffsetAndTimestamp), or to None
# if no such message exists
start = consumer.offsets_for_times({tp: start_timestamp})[tp]
end = consumer.offsets_for_times({tp: end_timestamp})[tp]

if start is not None:  # otherwise no message falls inside the range
    start_offset = start.offset
    # If nothing is at or after end_timestamp, read up to the end of the log
    end_offset = end.offset if end is not None else consumer.end_offsets([tp])[tp]

    # Seek to the starting offset for the time range
    consumer.seek(tp, start_offset)

    # Consume messages until we reach the end of the time range
    while consumer.position(tp) < end_offset:
        # poll() returns a dict mapping TopicPartition -> list of ConsumerRecord
        records = consumer.poll(timeout_ms=1000)
        for message in records.get(tp, []):
            if message.offset >= end_offset:
                break
            # Process the message as desired
            print(message.value)

# Close the consumer
consumer.close()
```
Use the KafkaConsumer’s seek() method: it lets you set the offset within a topic partition from which consumption starts. Because Kafka positions are offsets rather than times, first call offsets_for_times() to translate the start and end timestamps of the range into offsets. Then seek() to the start offset and call poll() repeatedly until you reach the end offset.
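To make the boundary rules concrete, here is a broker-free sketch that mimics, on an in-memory list, what offsets_for_times() is documented to do for one partition: return the earliest offset whose timestamp is greater than or equal to the given timestamp, or nothing if no such message exists. The Record type, the sample data, and the helper names offset_for_time and read_time_range are hypothetical and exist only for illustration. No Kafka client is involved.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Record:
    """Hypothetical stand-in for one message in a single partition."""
    offset: int
    timestamp: int  # milliseconds since the epoch, like Kafka timestamps
    value: str


def offset_for_time(log: List[Record], timestamp: int) -> Optional[int]:
    """Return the earliest offset whose timestamp is >= `timestamp`,
    or None if no such record exists (simulates offsets_for_times)."""
    for record in log:
        if record.timestamp >= timestamp:
            return record.offset
    return None


def read_time_range(log: List[Record], start_ts: int, end_ts: int) -> List[str]:
    """Collect values from the start offset up to (not including) the
    end offset, i.e. "seek to start, read until end"."""
    start_offset = offset_for_time(log, start_ts)
    if start_offset is None:
        return []  # nothing at or after the start of the range
    end_offset = offset_for_time(log, end_ts)
    if end_offset is None:
        # Nothing at or after end_ts: read to the end of the log,
        # i.e. up to the next offset that would be written.
        end_offset = log[-1].offset + 1
    return [r.value for r in log if start_offset <= r.offset < end_offset]


log = [
    Record(0, 1_499_999_999_000, "a"),
    Record(1, 1_500_000_000_000, "b"),
    Record(2, 1_500_000_000_500, "c"),
    Record(3, 1_500_000_001_000, "d"),
]
print(read_time_range(log, 1_500_000_000_000, 1_500_000_001_000))  # ['b', 'c']
```

The start boundary is inclusive and the end boundary exclusive, which is why "d" (timestamped exactly at the end of the range) is not returned. The sketch reads every offset between the two boundaries, so if producer-assigned timestamps are out of order within a partition, a record whose timestamp lies outside the window can still appear in the result.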