Is there a way to skip messages when using pollAndProduce of the parallel consumer?
This is what I did:
```java
@SneakyThrows
public void pollAndProduce(
        Function<ConsumerRecord<K, V>, ProducerRecord<K, V>> userFunction,
        Consumer<ConsumeProduceResult<K, V, K, V>> callback) {
    // Delegate to pollAndProduceMany: a null result from the user function
    // becomes an empty list, so nothing is produced for that record.
    pollAndProduceMany(
            rec -> {
                ProducerRecord<K, V> result = userFunction.apply(rec);
                return result == null ? UniLists.of() : UniLists.of(result);
            },
            callback);
}
```
Yes, just return from your function without doing anything, or return an empty list if you're using the produce methods.
Keep an eye on this: https://github.com/confluentinc/parallel-consumer/issues/242
Let user function throw retry exception, instead of raw errors which pollute log #242
I’ll add a skip exception too:
throw new SkipRecordsException("reason")
so that it’s better in the logs
Also, FYI: I mostly lurk in the room
Should be no need to extend the PC
Had you tried returning an empty list?
An empty list would work, but it doesn't conform to the function signature (which expects a ProducerRecord, not a list):
Function<ConsumerRecord<K, V>, ProducerRecord<K, V>> userFunction
That's why I had to override pollAndProduce. It seems to be working fine so far! Are there any gotchas with this approach that I should be aware of? I can definitely switch to the new version once #242 is addressed.
Use pollAndProduceMany and return an empty list.
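For anyone finding this later, here is a rough sketch of that suggestion without extending the PC. It wraps a "return null to skip" function into one that returns a list, which is the shape pollAndProduceMany takes in the snippet earlier in this thread. The names `SkipAdapter` and `skipOnNull` are made up for illustration and are not part of the library, and the sketch only uses the JDK, so check the exact pollAndProduceMany signature in the version you are on.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, not part of parallel-consumer.
final class SkipAdapter {

    private SkipAdapter() {
    }

    // Adapts a function that may return null ("skip this record") into a
    // function that returns an empty list for null and a one-element list otherwise.
    static <I, O> Function<I, List<O>> skipOnNull(Function<I, O> userFunction) {
        return input -> {
            O result = userFunction.apply(input);
            if (result == null) {
                return Collections.<O>emptyList();
            }
            return Collections.singletonList(result);
        };
    }
}

// Assumed usage, based on the pollAndProduceMany call shown earlier in this thread:
//   pc.pollAndProduceMany(SkipAdapter.skipOnNull(myFunction), callback);
```

This keeps the null check in your own code, so nothing in the PC has to be overridden and a later upgrade should not be affected by it.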