Indexing issues with MySQL to Elasticsearch with Debezium connector

Hi all, I have a MySQL-to-Elasticsearch indexing setup: a Debezium MySQL connector with 4 consumer threads, a Java application that transforms the messages read by the Debezium connector, and an ES sink connector with 5 topics that has the following configuration:
"tasks.max": "4",
"flush.timeout.ms": "200000",
"read.timeout.ms": "20000",
"linger.ms": "20",
"batch.size": "1000"

I have set the Kafka producer batch size to 100000 bytes (just trying out different sizes to see the difference in performance) and linger.ms to 20.

For the Kafka consumer, I have set MAX_POLL_RECORDS_CONFIG to 100 and MAX_POLL_INTERVAL_MS_CONFIG to maxValue.

After the pipeline runs for a few days, the indexing speed slows down. I am not completely sure what the reason is, as the logs seem fine for Kafka and Kafka Connect. Any help or suggestions are appreciated. Thanks in advance.

Are you doing updates (merges)?

Elasticsearch cannot handle the same document appearing more than once within a batch, so if it gets an error from doing so, I believe (at least that was the case a year ago) it would split the 1000 into individual messages.

Furthermore, there was an issue with a given release that was very non-performant when it came to duplicate documents in the same batch — what version are you using?
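To make the duplicate-document concern above concrete, here is a minimal Java sketch of one possible mitigation in the transforming application: collapsing a polled batch so each document ID appears at most once before it is produced. This is not the connector's own logic. The `Change` class and `lastWriteWins` method are hypothetical names, and the sketch assumes every message carries the full document, so a later message for the same ID fully supersedes an earlier one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BatchDedup {
    /** Hypothetical change event: target document ID plus the full document body. */
    static final class Change {
        final String docId;
        final String body;

        Change(String docId, String body) {
            this.docId = docId;
            this.body = body;
        }
    }

    /** Keeps only the latest change per document ID, ordered by last occurrence. */
    static List<Change> lastWriteWins(List<Change> batch) {
        Map<String, Change> latest = new LinkedHashMap<>();
        for (Change c : batch) {
            // Remove first so the re-inserted entry moves to the end of the map.
            latest.remove(c.docId);
            latest.put(c.docId, c);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Change> batch = List.of(
            new Change("42", "{\"status\":\"new\"}"),
            new Change("7", "{\"status\":\"new\"}"),
            new Change("42", "{\"status\":\"paid\"}"));
        for (Change c : lastWriteWins(batch)) {
            System.out.println(c.docId + " -> " + c.body);
        }
    }
}
```

This only helps when intermediate states do not need to reach Elasticsearch. If the messages are partial updates, collapsing them this way would lose data.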

Thanks for the reply. I am using Confluent version 7 containers, Elasticsearch 7.5.1, and Debezium 1.6. Initially we do a first-time indexing, where all records from MySQL are read by the connector, processed in the Java application, and then sent to the Elasticsearch indices through the ES sink connector. The pipeline is always running; we call it the live pipeline, and it listens for MySQL DB changes. So after the first-time indexing is done, it picks up whatever updates are made to the DB and indexes them. But we are observing that sometimes the updates made in the DB are not reflected in Elasticsearch quickly, sometimes taking 5 to 10 minutes or more.

So do you mean that if there is 1 record updated, then it's an issue with the batch size config?

Are you using Kafka Connect and the Elasticsearch sink connector? That is what I am asking about.

Yes, as I mentioned: Kafka Connect version 7.0.0, Debezium MySqlConnector (1.6.0), and Elasticsearch Sink Connector 11.0.0.

https://github.com/confluentinc/kafka-connect-elasticsearch/pull/490

From trying to track things down, the fix for this issue (similar to the one I reported against 10.x 11 months ago) may not be in 11.0.0. I would see if 11.0.1 works better, as it appears 11.0.1 has the fix.

```
| * | | | | | | | | | | | | | | | | | | | | | 4b38e2b - (tag: v11.0.1) [maven-release-plugin] prepare release v11.0.1 (10 months ago) <Lev Zemlyanov>
* | | | | | | | | | | | | | | | | | | | | | | abaee31 - Merge branch '11.0.x' (10 months ago) <Lev Zemlyanov>
|\| | | | | | | | | | | | | | | | | | | | | |
| * | | | | | | | | | | | | | | | | | | | | | 7a2f9ed - CCMSG-796: fix poor performance for lots of update requests (#490) (10 months ago) <Lev Zemlyanov>
* | | | | | | | | | | | | | | | | | | | | | | 9381272 - Merge branch '11.0.x' (10 months ago) <Confluent Jenkins Bot>
|\| | | | | | | | | | | | | | | | | | | | | |
| * | | | | | | | | | | | | | | | | | | | | | 730531c - CCMSG-449: fix jackson databind CVE (#492) (10 months ago) <Lev Zemlyanov>
* | | | | | | | | | | | | | | | | | | | | | | 22ae635 - Merge branch '11.0.x' (10 months ago) <Confluent Jenkins Bot>
```

git diff v11.0.0 v11.0.1 -- ./src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java

```
-      // make sure that only unique records are being sent in a batch
-      // every request that is flushed and succeeds triggers a callback that removes it from the map
-      while (docIdToRecord.containsKey(request.id())) {
-        flush();
-        clock.sleep(TimeUnit.SECONDS.toMillis(1));
-      }
-    }
-
```

The code above was fixed on 10.x at some point (that is when I reported it, 12/2020), but I guess 11.0.x was already in development, so the fix didn't make it into 11.0.0 but did make it into 11.0.1, at least by my checks this morning.
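To show why the removed loop above can hurt so much, here is a rough Java model of it. It is a sketch under stated assumptions, not the connector's code: `DuplicateStallModel` and `minStallSeconds` are hypothetical names, a single flush is assumed to clear everything in flight (the best case), and regular full-batch flushes are treated as instantaneous. The only behavior taken from the diff is that a record whose document ID is still in flight triggers a flush followed by a one-second sleep.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DuplicateStallModel {
    /** Lower bound, in seconds, on time spent sleeping for a sequence of document IDs. */
    static int minStallSeconds(List<String> docIds, int batchSize) {
        Set<String> inFlight = new HashSet<>();
        int stallSeconds = 0;
        for (String id : docIds) {
            if (inFlight.contains(id)) {
                // Removed v11.0.0 path: flush(), then sleep for 1 second.
                // Best case assumed: one flush completes every in-flight request.
                inFlight.clear();
                stallSeconds++;
            }
            inFlight.add(id);
            if (inFlight.size() >= batchSize) {
                // Regular full-batch flush, modeled as instantaneous.
                inFlight.clear();
            }
        }
        return stallSeconds;
    }

    public static void main(String[] args) {
        // Three updates to document 42 in a row, then one to document 7.
        System.out.println(minStallSeconds(List.of("42", "42", "42", "7"), 1000)); // prints 2
        // No repeated IDs, so no stalls.
        System.out.println(minStallSeconds(List.of("1", "2", "3", "4"), 1000)); // prints 0
    }
}
```

Under this model, N consecutive updates to one document cost at least N - 1 seconds of sleeping, regardless of how fast Elasticsearch itself responds. That is only an illustration of the mechanism; whether it explains a particular delay depends on the actual update pattern.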

Hi, do you mean that if I use this config

```
  "linger.ms": 1000,
  "batch.size": 2000,
  "max.buffered.records": 20000,
  "max.in.flight.requests": 5,
```
it should show some difference in the 11.0.1 version?

Also, I came across a lot of metrics that Kafka Connect exposes via JMX, but I could not find any documentation on how to use JMX with the Confluent Docker containers (Kafka, Kafka Connect, ZooKeeper, etc.). The Connect documentation page gives a list of MBeans and metrics, but nothing else. I have never worked with JMX. Can you share any resources on setting up JMX, if you know of any? Thanks.

I last used the Elasticsearch connector about 1 year ago, and they were switching it over to use the Elasticsearch client library, I believe. So I do not know the nuances of those settings with the current version of the connector, but from my experience, when I had lots of duplicate updates to the same document happening in the same batch, that fix was a lifesaver for me. So I think it is worth a try to see if 11.0.1 can indeed help.

I have done tons with JMX and monitoring, but not specifically around connectors.

Here is my project where I try to showcase JMX metrics with the core cluster and Kafka Streams:

https://github.com/nbuesing/kafka-streams-dashboards

It shows how I expose those metrics with the JMX Prometheus exporter along with Docker.

Here is an excellent resource by Confluent:

https://github.com/confluentinc/jmx-monitoring-stacks/

Along with a great blog post about it: https://www.confluent.io/blog/monitor-kafka-clusters-with-prometheus-grafana-and-confluent/

Oh, so we need an additional container like Prometheus to see the JMX metrics?

You need some JMX client, yes. Inspection tools can be used to see a value, but they are not really much help for monitoring.

JMX Exporter, Prometheus, and Grafana really do the job well.

I actually need a way to just get the time/timestamp of the last message processed by the Debezium MySQL source connector and the ES sink connector.
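For reading a single value like that without a full Prometheus stack, a small JMX client may be enough. The following Java sketch uses only the standard `javax.management` API. It assumes remote JMX is already enabled on the Connect worker and reachable at a host and port you know; the class name `JmxRead` and its method names are hypothetical. Run without arguments, it reads an attribute from the local JVM as a smoke test.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

final class JmxRead {
    /** Reads one attribute of one MBean over an existing connection. */
    static Object readAttribute(MBeanServerConnection conn, String mbean, String attribute)
            throws Exception {
        return conn.getAttribute(new ObjectName(mbean), attribute);
    }

    /** Connects to a remote JVM over the standard RMI connector and reads one attribute. */
    static Object readRemote(String host, int port, String mbean, String attribute)
            throws Exception {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            return readAttribute(connector.getMBeanServerConnection(), mbean, attribute);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 4) {
            // Arguments: host, port, MBean object name, attribute name.
            System.out.println(readRemote(args[0], Integer.parseInt(args[1]), args[2], args[3]));
        } else {
            // Local smoke test against this JVM's own platform MBean server.
            System.out.println(readAttribute(
                ManagementFactory.getPlatformMBeanServer(), "java.lang:type=Runtime", "StartTime"));
        }
    }
}
```

As a starting point for the source side, Debezium documents a `MilliSecondsSinceLastEvent` attribute on its streaming metrics MBean. The exact object name (something of the form `debezium.mysql:type=connector-metrics,context=streaming,server=<your server name>`) is an assumption here and should be checked against the MBean list for your Debezium version, for example with an inspection tool. This sketch does not cover an equivalent value for the ES sink connector.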