Hello everyone,
I have a Kafka connector for StarRocks that is consuming from topics created by a Kafka Oracle producer.
In this connector, I am consuming data from several tables (around 80). Below is the configuration.
Currently, I am consuming an average of 997,577 records per minute. In my system, I have tables with more than 90M records.
I would like to see if I can improve this throughput by tuning the connector configuration or the Kafka configuration itself.
{
"name":"starrocks-connector",
"config":{
"connector.class":"com.starrocks.connector.kafka.StarRocksSinkConnector",
"topics":"server1.TABLE 1....",
"starrocks.topic2table.map":"server1.TABLE:table",
"starrocks.http.url":"starrocks.sagaciresearch.com:8030",
"starrocks.username":"XXXXXX",
"starrocks.password":"XXXXXX",
"starrocks.database.name":"XXXXX",
"sink.properties.format":"json",
"tasks.max": "5",
"key.converter":"io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url":"http://apicurio:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register":"true",
"key.converter.apicurio.registry.find-latest":"true",
"value.converter":"io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url":"http://apicurio:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register":"true",
"value.converter.apicurio.registry.find-latest":"true",
"sink.properties.strip_outer_array":"true",
"connect.timeoutms":"10000",
"transforms":"addfield,unwrap,timestamp, timestamp1, timestamp2, timestamp3, timestamp4, timestamp5, timestamp6, timestamp7, timestamp8, timestamp9, timestamp10, timestamp11",
"transforms.addfield.type":"com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"true",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.timestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp.field": "TIMESTAMP",
"transforms.timestamp.format": "yyyy-MM-dd HH:mm:ss",
"transforms.timestamp.target.type": "string",
"transforms.timestamp1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp1.field": "UPDATED",
"transforms.timestamp1.format": "yyyy-MM-dd HH:mm:ss",
"transforms.timestamp1.target.type": "string",
"transforms.timestamp2.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp2.field": "CREATED",
"transforms.timestamp2.format": "yyyy-MM-dd HH:mm:ss",
"transforms.timestamp2.target.type": "string",
"transforms.timestamp3.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp3.field": "START_DATE",
"transforms.timestamp3.format": "yyyy-MM-dd HH:mm:ss",
"transforms.timestamp3.target.type": "string",
"transforms.timestamp4.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp4.field": "END_DATE",
"transforms.timestamp4.format": "yyyy-MM-dd HH:mm:ss",
"transforms.timestamp4.target.type": "string",
"transforms.timestamp5.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp5.field": "COMPLETED",
"transforms.timestamp5.format": "yyyy-MM-dd HH:mm:ss",
"transforms.timestamp5.target.type": "string",
"transforms.timestamp6.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp6.field": "MODIFIED",
"transforms.timestamp6.format": "yyyy-MM-dd HH:mm:ss",
"transforms.timestamp6.target.type": "string",
"transforms.timestamp7.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp7.field": "BIRTHDATE",
"transforms.timestamp7.format": "yyyy-MM-dd HH:mm:ss",
"transforms.timestamp7.target.type": "string",
"transforms.timestamp8.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp8.field": "STATUS_CHANGED ",
"transforms.timestamp8.format": "yyyy-MM-dd HH:mm:ss",
"transforms.timestamp8.target.type": "string",
"transforms.timestamp9.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp9.field": "STORE_RATING",
"transforms.timestamp9.format": "yyyy-MM-dd HH:mm:ss",
"transforms.timestamp9.target.type": "string",
"transforms.timestamp10.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp10.field": "TIMESTAMP_CHECKIN",
"transforms.timestamp10.format": "yyyy-MM-dd HH:mm:ss",
"transforms.timestamp10.target.type": "string",
"transforms.timestamp11.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp11.field": "TIMESTAMP_SHELF",
"transforms.timestamp11.format": "yyyy-MM-dd HH:mm:ss",
"transforms.timestamp11.target.type": "string"
}
}
Thanks in advance.
Xavi.