Airbyte offers many connectors. One of them is the File source connector, which can read CSV, JSON, Excel, Feather, and Parquet files. You can then use the StarRocks Airbyte destination connector to load that data into StarRocks. This tutorial covers the EL part of ELT. At the end of the tutorial, you do your own T (“transform”) to restructure the data into the shape you need.
Prerequisites
For this tutorial you need:
- Docker Desktop or the Podman container runtime installed.
- A StarRocks or CelerData database.
- An Airbyte instance.
- Airbyte configured with the StarRocks destination connector.
Have Docker Desktop or podman container runtime installed
This is out of scope for the tutorial.
A StarRocks or CelerData database cluster
This is out of scope for the tutorial. We used the serveo.net approach from #24771 to expose a local StarRocks cluster to the internet.
An Airbyte instance
This is out of scope for the tutorial, but we used the Airbyte Docker Compose environment shown in https://docs.airbyte.com/quickstart/deploy-airbyte/.
Configure Airbyte
First you need to add a new connector. Navigate to settings → destinations and click on “add a new connector”.
The connector image is available at starrocks/destination-starrocks:latest, and you can see all versions on Docker Hub.
Next, you’ll see “StarRocks” in the list of destination connectors. Select it.
Fill out the information for your StarRocks/CelerData instance.
Screenshot of the connection info for the StarRocks allin1 container image, exposed through serveo.net (#24771)
Note: The HTTP port for the StarRocks allin1 container is 8040.
Screenshot of the connection info for CelerData Cloud
Note: The HTTP port for cloud.celerdata.com is 443.
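The HTTP port matters because the destination connector pushes data through the StarRocks Stream Load HTTP endpoint, as the PUT request in the sync log further down shows. As a rough sketch, the URL can be assembled from the connection fields as follows. The helper name stream_load_url is hypothetical, the values mirror the ones in this tutorial's log, and the scheme is hard-coded to http only because that is what the log shows; an endpoint on port 443 presumably needs https, which this sketch does not handle.

```python
def stream_load_url(fe_host: str, http_port: int, database: str, table: str) -> str:
    """Build a Stream Load URL in the shape seen in the connector's log output.

    Hypothetical helper for illustration; it is not part of the connector.
    """
    return f"http://{fe_host}:{http_port}/api/{database}/{table}/_stream_load"


# Connection fields as they appear in the "JsonNode config" log entry.
config = {
    "fe_host": "abc.serveo.net",
    "http_port": 12345,
    "database": "airbyte",
}

url = stream_load_url(
    config["fe_host"],
    config["http_port"],
    config["database"],
    "_airbyte_tmp_swp_nyc",  # temporary table name taken from the log
)
print(url)
```

If the sync fails at the Stream Load step, comparing the URL in your own log against the host and HTTP port you entered is a quick first check.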
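For this tutorial, we’ll load a Parquet file from the NYC Taxi trip data. Create a source with the File connector (the dataset is named nyc in the log output further down) that reads this URL: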
https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet
Note: There is an upper file size limit, so Parquet files such as yellow_tripdata are too large and will crash the Airbyte worker threads. See airbytehq/airbyte#27174 and airbytehq/airbyte#27169.
Now we need to set up a connection. Pick the source we created.
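Because of that limit, it can help to check how large a remote file is before pointing Airbyte at it. The following is a minimal sketch using only the Python standard library. The threshold ASSUMED_LIMIT_BYTES is a made-up placeholder, since the actual limit is not stated here, and both function names are hypothetical. Note also that Content-Length reports the compressed Parquet size, which is smaller than what the worker buffers in memory.

```python
import urllib.request
from typing import Optional

# Hypothetical threshold: the real limit is not documented in this tutorial.
ASSUMED_LIMIT_BYTES = 50 * 1024 * 1024


def remote_file_size(url: str) -> Optional[int]:
    """Return the Content-Length reported for a HEAD request, or None if absent."""
    request = urllib.request.Request(url, method="HEAD")
    with urllib.request.urlopen(request, timeout=30) as response:
        length = response.headers.get("Content-Length")
    return int(length) if length is not None else None


def within_limit(size_bytes: Optional[int], limit_bytes: int = ASSUMED_LIMIT_BYTES) -> bool:
    """True only when the size is known and does not exceed the assumed limit."""
    return size_bytes is not None and size_bytes <= limit_bytes


# Usage (performs a network request, so it is left commented out):
# size = remote_file_size(
#     "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet"
# )
# print(size, within_limit(size))
```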
Then pick the destination that we created.
Do a check before finalizing the connection.
Here is the log output after a successful sync.
2023-06-26 04:51:13 destination > INFO i.a.i.d.s.StarRocksDestination(getConsumer):65 JsonNode config:
{
"fe_host" : "abc.serveo.net",
"database" : "airbyte",
"password":"**********",
"username" : "root",
"http_port" : 12345,
"query_port" : 9030
}
2023-06-26 04:51:13 source > Reading nyc (https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet)...
2023-06-26 04:51:13 source > TransportParams: None
2023-06-26 04:51:16 destination > INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):144 class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
2023-06-26 04:51:16 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onStartFunction$0):73 Preparing tmp tables in destination started for 1 streams
2023-06-26 04:51:16 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onStartFunction$0):79 Preparing tmp table in destination started for stream nyc. tmp table name: _airbyte_tmp_swp_nyc
2023-06-26 04:51:17 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onStartFunction$0):86 Preparing tmp tables in destination completed.
2023-06-26 04:51:17 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 5000 (1 MB)
2023-06-26 04:51:17 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 10000 (2 MB)
2023-06-26 04:51:17 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 15000 (4 MB)
2023-06-26 04:51:17 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 20000 (5 MB)
2023-06-26 04:51:18 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 25000 (7 MB)
2023-06-26 04:51:18 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 30000 (8 MB)
2023-06-26 04:51:18 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 35000 (10 MB)
2023-06-26 04:51:19 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 40000 (11 MB)
2023-06-26 04:51:19 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 45000 (13 MB)
2023-06-26 04:51:19 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 50000 (14 MB)
2023-06-26 04:51:20 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 55000 (16 MB)
2023-06-26 04:51:20 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 60000 (17 MB)
2023-06-26 04:51:20 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 65000 (19 MB)
2023-06-26 04:51:21 INFO i.a.w.g.DefaultReplicationWorker(lambda$readFromSrcAndWriteToDstRunnable$5):361 - Source has no more messages, closing connection.
2023-06-26 04:51:21 INFO i.a.w.g.ReplicationWorkerHelper(endOfSource):62 - Total records read: 68211 (20 MB)
2023-06-26 04:51:21 INFO i.a.w.i.FieldSelector(reportMetrics):122 - Schema validation was performed to a max of 10 records with errors per stream.
2023-06-26 04:51:21 INFO i.a.w.i.HeartbeatTimeoutChaperone(runWithHeartbeatThread):111 - thread status... heartbeat thread: false , replication thread: true
2023-06-26 04:51:21 INFO i.a.w.g.DefaultReplicationWorker(replicate):248 - Waiting for source and destination threads to complete.
2023-06-26 04:51:21 INFO i.a.w.g.DefaultReplicationWorker(replicate):253 - One of source or destination thread complete. Waiting on the other.
2023-06-26 04:51:21 destination > INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):80 Airbyte message consumer: succeeded.
2023-06-26 04:51:21 destination > INFO i.a.i.d.b.BufferedStreamConsumer(close):255 executing on success close procedure.
2023-06-26 04:51:21 destination > INFO i.a.i.d.r.InMemoryRecordBufferingStrategy(flushAllBuffers):85 Flushing nyc: 68211 records (81 MB)
2023-06-26 04:51:21 destination > INFO i.a.i.d.s.DefaultStreamLoader(send):109 Stream loading, label : airbyte__airbyte_tmp_swp_nyc_100bf56c-4397-4113-8397-5c792aedcf071687755081325, database : airbyte, table : _airbyte_tmp_swp_nyc, request : PUT http://abc.serveo.net:12345/api/airbyte/_airbyte_tmp_swp_nyc/_stream_load HTTP/1.1
2023-06-26 04:58:51 destination > INFO i.a.i.d.s.h.StreamLoadEntity(writeTo):100 Entity write end, contentLength : -1, total : 24815616
2023-06-26 04:59:51 destination > INFO i.a.i.d.s.DefaultStreamLoader(send):129 Stream load completed, label : airbyte__airbyte_tmp_swp_nyc_100bf56c-4397-4113-8397-5c792aedcf071687755081325, database : airbyte, table : _airbyte_tmp_swp_nyc, body : {
"TxnId": 2,
"Label": "airbyte__airbyte_tmp_swp_nyc_100bf56c-4397-4113-8397-5c792aedcf071687755081325",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 68211,
"NumberLoadedRows": 68211,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 24815616,
"LoadTimeMs": 509027,
"BeginTxnTimeMs": 143,
"StreamLoadPlanTimeMs": 122,
"ReadDataTimeMs": 178,
"WriteDataTimeMs": 508687,
"CommitAndPublishTimeMs": 73
}
2023-06-26 04:59:51 destination > INFO i.a.i.d.r.InMemoryRecordBufferingStrategy(flushAllBuffers):91 Flushing completed for nyc
2023-06-26 04:59:51 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onCloseFunction$3):116 Finalizing stream nyc. tmp table _airbyte_tmp_swp_nyc, final table nyc
2023-06-26 04:59:51 destination > INFO i.a.i.d.s.DefaultStreamLoader(close):88 Finished stream load, database : airbyte, tmp table : _airbyte_tmp_swp_nyc
2023-06-26 04:59:51 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onCloseFunction$3):133 Finalizing tables in destination completed.
2023-06-26 04:59:51 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onCloseFunction$3):137 Cleaning tmp tables in destination started for 1 streams
2023-06-26 04:59:51 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onCloseFunction$3):140 Clean tmp table in destination started for stream nyc.tmp table name: _airbyte_tmp_swp_nyc
2023-06-26 04:59:52 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onCloseFunction$3):145 Cleaning tmp tables in destination completed.
2023-06-26 04:59:52 destination > INFO i.a.i.b.IntegrationRunner(runInternal):195 Completed integration: io.airbyte.integrations.destination.starrocks.StarRocksDestination
2023-06-26 04:59:52 INFO i.a.w.g.DefaultReplicationWorker(replicate):255 - Source and destination threads complete.
2023-06-26 04:59:52 INFO i.a.w.g.DefaultReplicationWorker(getReplicationOutput):449 - sync summary: {
"status" : "completed",
"recordsSynced" : 68211,
"bytesSynced" : 21267210,
"startTime" : 1687755072694,
"endTime" : 1687755592380,
"totalStats" : {
"bytesCommitted" : 21267210,
"bytesEmitted" : 21267210,
"destinationStateMessagesEmitted" : 0,
"destinationWriteEndTime" : 1687755592377,
"destinationWriteStartTime" : 1687755072770,
"meanSecondsBeforeSourceStateMessageEmitted" : 0,
"maxSecondsBeforeSourceStateMessageEmitted" : 0,
"maxSecondsBetweenStateMessageEmittedandCommitted" : 0,
"meanSecondsBetweenStateMessageEmittedandCommitted" : 0,
"recordsEmitted" : 68211,
"recordsCommitted" : 68211,
"replicationEndTime" : 1687755592378,
"replicationStartTime" : 1687755072694,
"sourceReadEndTime" : 1687755081138,
"sourceReadStartTime" : 1687755072727,
"sourceStateMessagesEmitted" : 0
},
"streamStats" : [ {
"streamName" : "nyc",
"stats" : {
"bytesCommitted" : 21267210,
"bytesEmitted" : 21267210,
"recordsEmitted" : 68211,
"recordsCommitted" : 68211
}
} ]
}
2023-06-26 04:59:52 INFO i.a.w.g.DefaultReplicationWorker(getReplicationOutput):450 - failures: [ ]
2023-06-26 04:59:52 INFO i.a.w.t.TemporalAttemptExecution(get):163 - Stopping cancellation check scheduling...
2023-06-26 04:59:52 INFO i.a.c.i.LineGobbler(voidCall):149 -
2023-06-26 04:59:52 INFO i.a.c.i.LineGobbler(voidCall):149 - ----- END REPLICATION -----
2023-06-26 04:59:52 INFO i.a.c.i.LineGobbler(voidCall):149 -
2023-06-26 04:59:52 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):159 - sync summary: io.airbyte.config.StandardSyncOutput@6404f47f[standardSyncSummary=io.airbyte.config.StandardSyncSummary@18da331[status=completed,recordsSynced=68211,bytesSynced=21267210,startTime=1687755072694,endTime=1687755592380,totalStats=io.airbyte.config.SyncStats@5f8a6bf4[bytesCommitted=21267210,bytesEmitted=21267210,destinationStateMessagesEmitted=0,destinationWriteEndTime=1687755592377,destinationWriteStartTime=1687755072770,estimatedBytes=<null>,estimatedRecords=<null>,meanSecondsBeforeSourceStateMessageEmitted=0,maxSecondsBeforeSourceStateMessageEmitted=0,maxSecondsBetweenStateMessageEmittedandCommitted=0,meanSecondsBetweenStateMessageEmittedandCommitted=0,recordsEmitted=68211,recordsCommitted=68211,replicationEndTime=1687755592378,replicationStartTime=1687755072694,sourceReadEndTime=1687755081138,sourceReadStartTime=1687755072727,sourceStateMessagesEmitted=0,additionalProperties={}],streamStats=[io.airbyte.config.StreamSyncStats@14288075[streamName=nyc,streamNamespace=<null>,stats=io.airbyte.config.SyncStats@57cc3983[bytesCommitted=21267210,bytesEmitted=21267210,destinationStateMessagesEmitted=<null>,destinationWriteEndTime=<null>,destinationWriteStartTime=<null>,estimatedBytes=<null>,estimatedRecords=<null>,meanSecondsBeforeSourceStateMessageEmitted=<null>,maxSecondsBeforeSourceStateMessageEmitted=<null>,maxSecondsBetweenStateMessageEmittedandCommitted=<null>,meanSecondsBetweenStateMessageEmittedandCommitted=<null>,recordsEmitted=68211,recordsCommitted=68211,replicationEndTime=<null>,replicationStartTime=<null>,sourceReadEndTime=<null>,sourceReadStartTime=<null>,sourceStateMessagesEmitted=<null>,additionalProperties={}],additionalProperties={}]],additionalProperties={}],normalizationSummary=<null>,webhookOperationSummary=<null>,state=<null>,outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@3b349c60[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@37ed9539[stream=io.airbyte.protocol.models.AirbyteStream@67168099[name=nyc,jsonSchema={"$schema":"http://json-schema.org/draft-07/schema#","type":"object","properties":{"DOLocationID":{"type":["number","null"]},"RatecodeID":{"type":["number","null"]},"fare_amount":{"type":["number","null"]},"congestion_surcharge":{"type":["number","null"]},"tpep_dropoff_datetime":{"format":"date-time","type":["string","null"]},"VendorID":{"type":["number","null"]},"passenger_count":{"type":["number","null"]},"tolls_amount":{"type":["number","null"]},"improvement_surcharge":{"type":["number","null"]},"trip_distance":{"type":["number","null"]},"payment_type":{"type":["number","null"]},"store_and_fwd_flag":{"type":["string","null"]},"total_amount":{"type":["number","null"]},"extra":{"type":["number","null"]},"tip_amount":{"type":["number","null"]},"mta_tax":{"type":["number","null"]},"airport_fee":{"type":["number","null"]},"PULocationID":{"type":["number","null"]},"tpep_pickup_datetime":{"format":"date-time","type":["string","null"]}}},supportedSyncModes=[full_refresh],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=overwrite,primaryKey=[],additionalProperties={}]],additionalProperties={}],failures=[],commitStateAsap=true,additionalProperties={}]
2023-06-26 04:59:52 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):164 - Sync summary length: 3277
2023-06-26 04:59:52 INFO i.a.c.t.TemporalUtils(withBackgroundHeartbeat):307 - Stopping temporal heartbeating...
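As a side calculation, the timestamps in the sync summary and the LoadTimeMs value in the Stream Load response show where the time went in this run. The sketch below only redoes that arithmetic in Python with numbers copied from the log above; the variable names are my own and nothing here is an Airbyte or StarRocks API.

```python
# Values copied from the sync summary and the Stream Load response in the log.
start_time_ms = 1687755072694         # "startTime"
end_time_ms = 1687755592380           # "endTime"
source_read_start_ms = 1687755072727  # "sourceReadStartTime"
source_read_end_ms = 1687755081138    # "sourceReadEndTime"
load_time_ms = 509027                 # "LoadTimeMs"

total_ms = end_time_ms - start_time_ms
source_ms = source_read_end_ms - source_read_start_ms

print(f"total sync:  {total_ms / 1000:.1f} s")
print(f"source read: {source_ms / 1000:.1f} s")
print(f"stream load: {load_time_ms / 1000:.1f} s ({load_time_ms / total_ms:.0%} of total)")
```

For this run, reading the Parquet file took under ten seconds, and nearly all of the remaining time was spent in the Stream Load request to StarRocks.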
You can query the table after all the data has been inserted.
StarRocks > show tables;
+-------------------+
| Tables_in_airbyte |
+-------------------+
| nyc               |
+-------------------+
1 row in set (0.00 sec)
StarRocks > select count(*) from nyc;
+----------+
| count(*) |
+----------+
|    68211 |
+----------+
1 row in set (0.02 sec)
StarRocks > show create table nyc;
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| nyc | CREATE TABLE `nyc` (
`_airbyte_ab_id` varchar(40) NULL COMMENT "",
`_airbyte_emitted_at` bigint(20) NULL COMMENT "",
`_airbyte_data` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`_airbyte_ab_id`, `_airbyte_emitted_at`)
DISTRIBUTED BY HASH(`_airbyte_ab_id`) BUCKETS 16
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"compression" = "LZ4"
); |
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.02 sec)
After the data has been inserted, the next step is the “transform”: converting the current schema, in which each record is stored as a JSON string in the _airbyte_data column, into the target schema that you want. I would use an INSERT INTO ... SELECT statement. Another idea is to generate the columns on the existing table with ALTER TABLE. See the StarRocks documentation for an example.
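To make the INSERT INTO ... SELECT idea concrete, here is a minimal sketch that generates such a statement from a list of fields. It assumes a target table named nyc_trips already exists with matching columns; that table name, the helper build_transform_sql, and the choice of fields are all hypothetical. The field names come from the JSON schema in the sync log, and get_json_double and get_json_string are StarRocks functions that extract a value from a JSON string by path. Treat the generated SQL as a starting point to review, not a finished transform.

```python
# Field name -> StarRocks JSON extraction function (illustrative subset).
FIELDS = {
    "VendorID": "get_json_double",
    "passenger_count": "get_json_double",
    "trip_distance": "get_json_double",
    "fare_amount": "get_json_double",
    "total_amount": "get_json_double",
    "store_and_fwd_flag": "get_json_string",
}


def build_transform_sql(source_table: str, target_table: str, fields: dict) -> str:
    """Generate an INSERT INTO ... SELECT that unpacks _airbyte_data into columns."""
    columns = ", ".join(f"`{name}`" for name in fields)
    selects = ",\n".join(
        f"    {func}(`_airbyte_data`, '$.{name}') AS `{name}`"
        for name, func in fields.items()
    )
    return (
        f"INSERT INTO `{target_table}` ({columns})\n"
        f"SELECT\n{selects}\n"
        f"FROM `{source_table}`;"
    )


# "nyc" is the table Airbyte created; "nyc_trips" is a hypothetical target table.
sql = build_transform_sql("nyc", "nyc_trips", FIELDS)
print(sql)
```

Generating the statement from a mapping keeps the column list and the JSON paths in one place, which is handy when the source file has many fields.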