Real-time change data capture (CDC) using Apache Kafka and Aiven's JDBC Sink Connector for Apache Kafka® to insert data into StarRocks

One of the most common use cases for our users is the ability to perform real-time CDC from a source database into StarRocks. One way to do this is with Apache Kafka: once the data is in a Kafka topic, a Kafka connector can sink it into your target database. For this tutorial we will use Aiven's open source JDBC Sink Connector for Apache Kafka (GitHub - Aiven-Open/jdbc-connector-for-apache-kafka: Aiven's JDBC Sink and Source Connectors for Apache Kafka®) to sink the data into StarRocks.

Note

Sept 2023 update: StarRocks has released its own StarRocks Kafka Connector. See the StarRocks documentation for details.

Follow the steps in the Apache Kafka quickstart to set up the environment, and start ZooKeeper and the Kafka server. Stop when you reach the Kafka Connect step, then follow the steps below.

Create the file config/connect-jdbc-starrocks-sink.properties:

name=example-jdbc-sink

# These are defaults, but they're here for clarity:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

connector.class=io.aiven.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:mysql://6dihxzedg.cloud-app.celerdata.com:9030/demo?user=adminuser&password=adminuser

topics=message
table.name.format=visits

# This is default, but it's here for clarity:
insert.mode=insert
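If you later move from standalone to distributed mode, the same connector settings are submitted as a JSON body to Kafka Connect's REST API (POST to /connectors, default port 8083) instead of a properties file. A minimal sketch of converting the properties above into that JSON shape (the properties are pasted inline; nothing here talks to a live cluster):

```python
import json

# Contents of config/connect-jdbc-starrocks-sink.properties (comments removed)
props_text = """
name=example-jdbc-sink
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
connector.class=io.aiven.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:mysql://6dihxzedg.cloud-app.celerdata.com:9030/demo?user=adminuser&password=adminuser
topics=message
table.name.format=visits
insert.mode=insert
"""

# Parse key=value lines; maxsplit=1 keeps the '=' inside the JDBC URL intact
props = dict(
    line.split("=", 1)
    for line in props_text.strip().splitlines()
    if line and not line.startswith("#")
)

# The REST API wants the connector name at the top level and everything else under "config"
body = {"name": props.pop("name"), "config": props}
print(json.dumps(body, indent=2))
# POST this body to http://localhost:8083/connectors on a distributed Connect worker
```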

Edit the file config/connect-standalone.properties so that plugin.path includes /opt/connectors:

[root@ip-172-31-29-226 kafka_2.13-3.4.0]# cat config/connect-standalone.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=

Compile or download the JDBC Connector jar (Release v6.8.0 · Aiven-Open/jdbc-connector-for-apache-kafka · GitHub) and the MySQL Connector/J driver jar (https://downloads.mysql.com/archives/c-j/). Here is the listing of the plugin directory:

[root@ip-172-31-29-226 kafka_2.13-3.4.0]# ls /opt/connectors/
jdbc-connector-for-apache-kafka-6.9.0-SNAPSHOT.jar  mysql-connector-java-5.1.49.jar
mysql-connector-java-5.1.49-bin.jar

Run Kafka Connect in standalone mode, with the MySQL driver on the classpath:

CLASSPATH=/opt/connectors/mysql-connector-java-5.1.49.jar bin/connect-standalone.sh config/connect-standalone.properties config/connect-jdbc-starrocks-sink.properties

This is the JSON we will be using. It's the same JSON example as in the "Loading JSON and AVRO data from Confluent Cloud Kafka into StarRocks" tutorial at #22791.

{
  "ip": "122.152.45.245",
  "userid": 9,
  "remote_user": "-",
  "time": "5631",
  "_time": 5631,
  "request": "GET /site/user_status.html HTTP/1.1",
  "status": "407",
  "bytes": "278",
  "referrer": "-",
  "agent": "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
}

Database and table creation commands for StarRocks:

create database demo;
use demo;
DROP TABLE IF EXISTS `visits`;
CREATE TABLE `visits` (
`userid` int NULL COMMENT "",
`time` varchar(20) NULL COMMENT "",
`_time` DATETIME REPLACE NULL COMMENT "", 
`remote_user` varchar(50) REPLACE NULL COMMENT "",
`ip` varchar(50) REPLACE NULL COMMENT "", 
`request` varchar(50) REPLACE NULL COMMENT "", 
`status` varchar(50) REPLACE NULL COMMENT "", 
`bytes` int SUM NULL COMMENT "", 
`referrer` varchar(50) REPLACE NULL COMMENT "", 
`agent` varchar(200) REPLACE NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`userid`, `time` )
DISTRIBUTED BY HASH(`userid`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
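To see what the AGGREGATE KEY model does here: rows loaded with the same (`userid`, `time`) key are merged, with `REPLACE` columns keeping the most recently loaded value and the `SUM` column (`bytes`) accumulating. A rough Python sketch of that merge semantics (illustration only, not StarRocks code; only a few of the REPLACE columns are shown):

```python
# Simulate StarRocks AGGREGATE KEY merging for the `visits` table:
# key = (userid, time); `bytes` is SUM, the remaining columns are REPLACE.
def merge_rows(rows):
    table = {}
    for row in rows:
        key = (row["userid"], row["time"])
        if key in table:
            existing = table[key]
            existing["bytes"] += row["bytes"]      # SUM column accumulates
            for col in ("ip", "status", "agent"):  # REPLACE columns keep the newest value
                existing[col] = row[col]
        else:
            table[key] = dict(row)
    return list(table.values())

rows = [
    {"userid": 9, "time": "5631", "bytes": 278, "ip": "122.152.45.245",
     "status": "407", "agent": "Googlebot"},
    {"userid": 9, "time": "5631", "bytes": 100, "ip": "10.0.0.1",
     "status": "200", "agent": "Googlebot"},
]
merged = merge_rows(rows)
print(merged)  # one row: bytes == 378, ip/status taken from the second row
```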

Here's how the JSON looks expanded. When creating the schema, cross-check the object types against Schema.Type (kafka 0.10.0.1 API). There is also good documentation at jdbc-connector-for-apache-kafka/docs/sink-connector.md at master · Aiven-Open/jdbc-connector-for-apache-kafka · GitHub.

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "string",
                "optional": true,
                "field": "ip"
            },
            {
                "type": "int16",
                "optional": false,
                "field": "userid"
            },
            {
                "type": "string",
                "optional": false,
                "field": "time"
            },
            {
                "type": "int64",
                "optional": true,
                "name": "org.apache.kafka.connect.data.Timestamp",
                "version": 1,
                "field": "_time"
            },
            {
                "type": "string",
                "optional": true,
                "field": "request"
            },
            {
                "type": "string",
                "optional": true,
                "field": "status"
            },
            {
                "type": "int16",
                "optional": true,
                "field": "bytes"
            },
            {
                "type": "string",
                "optional": true,
                "field": "referrer"
            },
            {
                "type": "string",
                "optional": true,
                "field": "agent"
            }
        ],
        "optional": false,
        "name": "somerecord"
    },
    "payload": {
        "ip": "122.152.45.245",
        "userid": 9,
        "time": "5631",
        "_time": 5631,
        "request": "GET /site/user_status.html HTTP/1.1",
        "status": "407",
        "bytes": "278",
        "referrer": "-",
        "agent": "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
    }
}

Submit the JSON via the Kafka console producer.
Note: you have to submit the JSON as a single line, or else you'll get a parse error from Kafka Connect (see Aiven-Open/jdbc-connector-for-apache-kafka#240).
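Rather than hand-minifying the envelope, you can build it programmatically and let `json.dumps` emit the single line for you. A sketch (only a few fields shown; extend the field list to match the full schema above):

```python
import json

payload = {
    "ip": "122.152.45.245",
    "userid": 9,
    "time": "5631",
    "_time": 5631,
}

# Connect JsonConverter envelope: schema + payload in one record
schema = {
    "type": "struct",
    "optional": False,
    "name": "somerecord",
    "fields": [
        {"type": "string", "optional": True, "field": "ip"},
        {"type": "int16", "optional": False, "field": "userid"},
        {"type": "string", "optional": False, "field": "time"},
        {"type": "int64", "optional": True,
         "name": "org.apache.kafka.connect.data.Timestamp",
         "version": 1, "field": "_time"},
    ],
}

line = json.dumps({"schema": schema, "payload": payload})
assert "\n" not in line  # safe to paste into kafka-console-producer.sh
print(line)
```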

[root@ip-172-31-29-226 kafka_2.13-3.4.0]# bin/kafka-console-producer.sh --topic message --bootstrap-server localhost:9092
{ "schema": { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "ip" }, { "type": "int16", "optional": false, "field": "userid" }, { "type": "string", "optional": false, "field": "time" }, { "type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "_time" }, { "type": "string", "optional": true, "field": "request" }, { "type": "string", "optional": true, "field": "status" }, { "type": "int16", "optional": true, "field": "bytes" }, { "type": "string", "optional": true, "field": "referrer" }, { "type": "string", "optional": true, "field": "agent" } ], "optional": false, "name": "somerecord" }, "payload": { "ip": "122.152.45.245", "userid": 9, "time": "5631", "_time": 5631, "request": "GET /site/user_status.html HTTP/1.1", "status": "407", "bytes": "278", "referrer": "-", "agent": "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)" } }

The Kafka console consumer should show this:

[root@ip-172-31-29-226 kafka_2.13-3.4.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic message --from-beginning
{ "schema": { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "ip" }, { "type": "int16", "optional": false, "field": "userid" }, { "type": "string", "optional": false, "field": "time" }, { "type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "_time" }, { "type": "string", "optional": true, "field": "request" }, { "type": "string", "optional": true, "field": "status" }, { "type": "int16", "optional": true, "field": "bytes" }, { "type": "string", "optional": true, "field": "referrer" }, { "type": "string", "optional": true, "field": "agent" } ], "optional": false, "name": "somerecord" }, "payload": { "ip": "122.152.45.245", "userid": 9, "time": "5631", "_time": 5631, "request": "GET /site/user_status.html HTTP/1.1", "status": "407", "bytes": "278", "referrer": "-", "agent": "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)" } }

You should see this in the Kafka Connect console output.

[2023-06-15 04:38:07,747] INFO [example-jdbc-sink|task-0] Attempting to open connection #1 to MySql (io.aiven.connect.jdbc.util.CachedConnectionProvider:90)
[2023-06-15 04:38:07,998] INFO [example-jdbc-sink|task-0] JdbcDbWriter Connected (io.aiven.connect.jdbc.sink.JdbcDbWriter:60)
[2023-06-15 04:38:08,030] INFO [example-jdbc-sink|task-0] Checking MySql dialect for existence of table "visits" (io.aiven.connect.jdbc.dialect.MySqlDatabaseDialect:504)
[2023-06-15 04:38:08,033] INFO [example-jdbc-sink|task-0] Using MySql dialect table "visits" present (io.aiven.connect.jdbc.dialect.MySqlDatabaseDialect:512)
[2023-06-15 04:38:08,059] INFO [example-jdbc-sink|task-0] Setting metadata for table "visits" to Table{name='"visits"', columns=[Column{'status', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'time', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'ip', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'userid', isPrimaryKey=false, allowsNull=true, sqlType=INT}, Column{'remote_user', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'bytes', isPrimaryKey=false, allowsNull=true, sqlType=INT}, Column{'request', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'agent', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'_time', isPrimaryKey=false, allowsNull=true, sqlType=DATETIME}, Column{'referrer', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}]} (io.aiven.connect.jdbc.util.TableDefinitions:66)
[2023-06-15 04:38:08,061] INFO [example-jdbc-sink|task-0] Closing BufferedRecords with preparedStatement: null (io.aiven.connect.jdbc.sink.BufferedRecords:235)
[2023-06-15 04:38:08,233] INFO [example-jdbc-sink|task-0] Closing BufferedRecords with preparedStatement: com.mysql.jdbc.JDBC42PreparedStatement@3c86e: INSERT INTO `visits`(`ip`,`userid`,`time`,`_time`,`request`,`status`,`bytes`,`referrer`,`agent`) VALUES('122.152.45.245',9,'5631','1970-01-01 00:00:05','GET /site/user_status.html HTTP/1.1','407',0,'-','Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)') (io.aiven.connect.jdbc.sink.BufferedRecords:235)

Now check that the data has been inserted. (Note that `bytes` shows 0 rather than 278, likely because the payload supplies it as the string "278" while the schema declares `int16`.)

StarRocks > use demo;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
StarRocks > select * from visits;
+--------+------+---------------------+-------------+----------------+-------------------------------------+--------+-------+----------+--------------------------------------------------------------------------+
| userid | time | _time               | remote_user | ip             | request                             | status | bytes | referrer | agent                                                                    |
+--------+------+---------------------+-------------+----------------+-------------------------------------+--------+-------+----------+--------------------------------------------------------------------------+
|      9 | 5631 | 1970-01-01 00:00:05 | NULL        | 122.152.45.245 | GET /site/user_status.html HTTP/1.1 | 407    |     0 | -        | Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html) |
+--------+------+---------------------+-------------+----------------+-------------------------------------+--------+-------+----------+--------------------------------------------------------------------------+
1 row in set (0.06 sec)

StarRocks now has its own Kafka connector that uses Stream Load under the covers for better performance; see the StarRocks documentation.
