Improve the speed of consuming records with the Kafka connector

Hello everyone,

I have a StarRocks Kafka sink connector that consumes from topics produced by a Debezium Oracle source connector.

In this connector, I am consuming data from several tables (around 80). Below is the configuration.

Currently, I am consuming an average of 997,577 records per minute. In my system, I have tables with more than 90M records.

I would like to know whether I can improve this throughput by tuning the connector or the Kafka consumer configuration (see the example settings I am considering after the configuration below).



{
    "name":"starrocks-connector",
    "config":{
        "connector.class":"com.starrocks.connector.kafka.StarRocksSinkConnector",
        "topics":"server1.TABLE 1....",
        "starrocks.topic2table.map":"server1.TABLE:table",
	    "starrocks.http.url":"starrocks.sagaciresearch.com:8030",
        "starrocks.username":"XXXXXX",
        "starrocks.password":"XXXXXX",
        "starrocks.database.name":"XXXXX",
        "sink.properties.format":"json",
        "tasks.max": "5",
        "key.converter":"io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url":"http://apicurio:8080/apis/registry/v2",
        "key.converter.apicurio.registry.auto-register":"true",
        "key.converter.apicurio.registry.find-latest":"true",
        "value.converter":"io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url":"http://apicurio:8080/apis/registry/v2",
        "value.converter.apicurio.registry.auto-register":"true",
        "value.converter.apicurio.registry.find-latest":"true",
        "sink.properties.strip_outer_array":"true",
        "connect.timeoutms":"10000",
        "transforms":"addfield,unwrap,timestamp, timestamp1, timestamp2, timestamp3, timestamp4, timestamp5, timestamp6, timestamp7, timestamp8, timestamp9, timestamp10, timestamp11",
        "transforms.addfield.type":"com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
        "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones":"true",
        "transforms.unwrap.delete.handling.mode":"rewrite",
        "transforms.timestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp.field": "TIMESTAMP",
        "transforms.timestamp.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.timestamp.target.type": "string",
        "transforms.timestamp1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp1.field": "UPDATED",
        "transforms.timestamp1.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.timestamp1.target.type": "string",
        "transforms.timestamp2.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp2.field": "CREATED",
        "transforms.timestamp2.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.timestamp2.target.type": "string",
        "transforms.timestamp3.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp3.field": "START_DATE",
        "transforms.timestamp3.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.timestamp3.target.type": "string",
        "transforms.timestamp4.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp4.field": "END_DATE",
        "transforms.timestamp4.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.timestamp4.target.type": "string",
        "transforms.timestamp5.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp5.field": "COMPLETED",
        "transforms.timestamp5.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.timestamp5.target.type": "string",
        "transforms.timestamp6.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp6.field": "MODIFIED",
        "transforms.timestamp6.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.timestamp6.target.type": "string",
        "transforms.timestamp7.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp7.field": "BIRTHDATE",
        "transforms.timestamp7.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.timestamp7.target.type": "string",
        "transforms.timestamp8.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp8.field": "STATUS_CHANGED ",
        "transforms.timestamp8.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.timestamp8.target.type": "string",
        "transforms.timestamp9.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp9.field": "STORE_RATING",
        "transforms.timestamp9.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.timestamp9.target.type": "string",
        "transforms.timestamp10.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp10.field": "TIMESTAMP_CHECKIN",
        "transforms.timestamp10.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.timestamp10.target.type": "string",
        "transforms.timestamp11.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp11.field": "TIMESTAMP_SHELF",
        "transforms.timestamp11.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.timestamp11.target.type": "string"
    }
}
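
For reference, these are the kinds of Kafka-side changes I am thinking of trying. The consumer.override.* keys use the standard Kafka Connect per-connector override mechanism (which requires connector.client.config.override.policy=All on the Connect worker), and the values are only starting points I would still need to test, not recommendations:

{
    "tasks.max": "10",
    "consumer.override.max.poll.records": "2000",
    "consumer.override.fetch.min.bytes": "1048576",
    "consumer.override.fetch.max.wait.ms": "500",
    "consumer.override.max.partition.fetch.bytes": "4194304"
}

As far as I understand, sink tasks beyond the total number of topic partitions would sit idle, so with around 80 topics there should be room to raise tasks.max above 5.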

Thanks in advance.
Xavi.

997,577 records per minute: is this throughput for a single table, or across all 80 tables?

  1. First, search for the keywords “Cache full” in the Kafka Connect log to see whether the bottleneck is the sink.
  2. If it is not the sink, capture a CPU profile to see where the bottleneck is.
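
If the log does show the sink falling behind, this is a sketch of the sink-side settings I would look at first. I am assuming the buffer-flush property names (bufferflush.maxbytes, bufferflush.intervalms) from the StarRocks Kafka connector documentation, so please verify them against your connector version and treat the values as illustrative only:

{
    "bufferflush.maxbytes": "157286400",
    "bufferflush.intervalms": "10000"
}

Splitting the ~80 topics across two or three connector instances is another way to get more concurrent load jobs into StarRocks without changing these settings.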