Using Airbyte to load data (CSV, JSON, Excel, Feather, Parquet) into StarRocks

Airbyte has a large catalog of connectors. One of them is the source file connector, which can read CSV, JSON, Excel, Feather, and Parquet files. You can then use the StarRocks Airbyte destination connector to load that data into StarRocks. This tutorial covers the EL part of ELT; at the end of the tutorial you should do your own T ("transform") to restructure the data into the shape you need.

Prerequisites

For this tutorial you need:

  • Docker Desktop or the podman container runtime installed.
  • A StarRocks or CelerData database cluster.
  • An Airbyte instance.
  • Airbyte configured with the StarRocks destination connector (covered below).

Have Docker Desktop or podman container runtime installed

This is out of scope for the tutorial.

A StarRocks or CelerData database cluster

This is out of scope for the tutorial. We used the approach described in #24771 to expose a local StarRocks cluster to the internet.

An Airbyte instance

Deploying Airbyte is out of scope for the tutorial, but we used the Airbyte Docker Compose environment described at https://docs.airbyte.com/quickstart/deploy-airbyte/.

Configure Airbyte

First you need to add the StarRocks destination connector. Navigate to settings → destinations and click "add a new connector".

The container image is available as starrocks/destination-starrocks:latest, and you can see all published versions on Docker Hub.

Next, you’ll see “StarRocks” in the list of destination connectors. Select it.

Fill out the information for your StarRocks/CelerData instance.
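
The connection settings include a target database name. If that database does not already exist on your cluster, you may want to create it up front; this is a minimal sketch using the airbyte database name that appears in the sync log later in this tutorial (whether the connector creates the database automatically can depend on the connector version):

-- optional setup step: create the target database Airbyte will write to
CREATE DATABASE IF NOT EXISTS airbyte;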

Screenshot of the connection info for the StarRocks allin1 container image exposed with serveo.net (#24771).
Note: HTTP port for StarRocks allin1 container is port 8040.
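
If you are unsure which ports your own StarRocks deployment exposes, you can check from a SQL session; a quick sketch (SHOW FRONTENDS lists each FE node along with its HttpPort and QueryPort columns):

-- inspect FE nodes and the ports they listen on
SHOW FRONTENDS;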

Screenshot of the connection info for CelerData Cloud
Note: HTTP port for cloud.celerdata.com is port 443.

For this tutorial, we’ll load a Parquet file from the NYC Taxi dataset:

https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet

Note: There is an upper file size limit, so larger Parquet files such as yellow_tripdata are too big and will crash the Airbyte worker threads. See airbytehq/airbyte#27174 and airbytehq/airbyte#27169.

Now we need to set up a connection. Pick the source we created.

Then pick the destination that we created.

Do a check before finalizing the connection.

Here is the log output after a successful sync.

2023-06-26 04:51:13 destination > INFO i.a.i.d.s.StarRocksDestination(getConsumer):65 JsonNode config: 
{
  "fe_host" : "abc.serveo.net",
  "database" : "airbyte",
  "password":"**********",
  "username" : "root",
  "http_port" : 12345,
  "query_port" : 9030
}
2023-06-26 04:51:13 source > Reading nyc (https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet)...
2023-06-26 04:51:13 source > TransportParams: None
2023-06-26 04:51:16 destination > INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):144 class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
2023-06-26 04:51:16 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onStartFunction$0):73 Preparing tmp tables in destination started for 1 streams
2023-06-26 04:51:16 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onStartFunction$0):79 Preparing tmp table in destination started for stream nyc. tmp table name: _airbyte_tmp_swp_nyc
2023-06-26 04:51:17 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onStartFunction$0):86 Preparing tmp tables in destination completed.
2023-06-26 04:51:17 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 5000 (1 MB)
2023-06-26 04:51:17 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 10000 (2 MB)
2023-06-26 04:51:17 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 15000 (4 MB)
2023-06-26 04:51:17 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 20000 (5 MB)
2023-06-26 04:51:18 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 25000 (7 MB)
2023-06-26 04:51:18 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 30000 (8 MB)
2023-06-26 04:51:18 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 35000 (10 MB)
2023-06-26 04:51:19 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 40000 (11 MB)
2023-06-26 04:51:19 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 45000 (13 MB)
2023-06-26 04:51:19 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 50000 (14 MB)
2023-06-26 04:51:20 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 55000 (16 MB)
2023-06-26 04:51:20 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 60000 (17 MB)
2023-06-26 04:51:20 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromSource):87 - Records read: 65000 (19 MB)
2023-06-26 04:51:21 INFO i.a.w.g.DefaultReplicationWorker(lambda$readFromSrcAndWriteToDstRunnable$5):361 - Source has no more messages, closing connection.
2023-06-26 04:51:21 INFO i.a.w.g.ReplicationWorkerHelper(endOfSource):62 - Total records read: 68211 (20 MB)
2023-06-26 04:51:21 INFO i.a.w.i.FieldSelector(reportMetrics):122 - Schema validation was performed to a max of 10 records with errors per stream.
2023-06-26 04:51:21 INFO i.a.w.i.HeartbeatTimeoutChaperone(runWithHeartbeatThread):111 - thread status... heartbeat thread: false , replication thread: true
2023-06-26 04:51:21 INFO i.a.w.g.DefaultReplicationWorker(replicate):248 - Waiting for source and destination threads to complete.
2023-06-26 04:51:21 INFO i.a.w.g.DefaultReplicationWorker(replicate):253 - One of source or destination thread complete. Waiting on the other.
2023-06-26 04:51:21 destination > INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):80 Airbyte message consumer: succeeded.
2023-06-26 04:51:21 destination > INFO i.a.i.d.b.BufferedStreamConsumer(close):255 executing on success close procedure.
2023-06-26 04:51:21 destination > INFO i.a.i.d.r.InMemoryRecordBufferingStrategy(flushAllBuffers):85 Flushing nyc: 68211 records (81 MB)
2023-06-26 04:51:21 destination > INFO i.a.i.d.s.DefaultStreamLoader(send):109 Stream loading, label : airbyte__airbyte_tmp_swp_nyc_100bf56c-4397-4113-8397-5c792aedcf071687755081325, database : airbyte, table : _airbyte_tmp_swp_nyc, request : PUT http://abc.serveo.net:12345/api/airbyte/_airbyte_tmp_swp_nyc/_stream_load HTTP/1.1
2023-06-26 04:58:51 destination > INFO i.a.i.d.s.h.StreamLoadEntity(writeTo):100 Entity write end, contentLength : -1, total : 24815616
2023-06-26 04:59:51 destination > INFO i.a.i.d.s.DefaultStreamLoader(send):129 Stream load completed, label : airbyte__airbyte_tmp_swp_nyc_100bf56c-4397-4113-8397-5c792aedcf071687755081325, database : airbyte, table : _airbyte_tmp_swp_nyc, body : {
    "TxnId": 2,
    "Label": "airbyte__airbyte_tmp_swp_nyc_100bf56c-4397-4113-8397-5c792aedcf071687755081325",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 68211,
    "NumberLoadedRows": 68211,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 24815616,
    "LoadTimeMs": 509027,
    "BeginTxnTimeMs": 143,
    "StreamLoadPlanTimeMs": 122,
    "ReadDataTimeMs": 178,
    "WriteDataTimeMs": 508687,
    "CommitAndPublishTimeMs": 73
}
2023-06-26 04:59:51 destination > INFO i.a.i.d.r.InMemoryRecordBufferingStrategy(flushAllBuffers):91 Flushing completed for nyc
2023-06-26 04:59:51 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onCloseFunction$3):116 Finalizing stream nyc. tmp table _airbyte_tmp_swp_nyc, final table nyc
2023-06-26 04:59:51 destination > INFO i.a.i.d.s.DefaultStreamLoader(close):88 Finished stream load, database : airbyte, tmp table : _airbyte_tmp_swp_nyc
2023-06-26 04:59:51 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onCloseFunction$3):133 Finalizing tables in destination completed.
2023-06-26 04:59:51 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onCloseFunction$3):137 Cleaning tmp tables in destination started for 1 streams
2023-06-26 04:59:51 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onCloseFunction$3):140 Clean tmp table in destination started for stream nyc.tmp table name: _airbyte_tmp_swp_nyc
2023-06-26 04:59:52 destination > INFO i.a.i.d.s.StarRocksBufferedConsumerFactory(lambda$onCloseFunction$3):145 Cleaning tmp tables in destination completed.
2023-06-26 04:59:52 destination > INFO i.a.i.b.IntegrationRunner(runInternal):195 Completed integration: io.airbyte.integrations.destination.starrocks.StarRocksDestination
2023-06-26 04:59:52 INFO i.a.w.g.DefaultReplicationWorker(replicate):255 - Source and destination threads complete.
2023-06-26 04:59:52 INFO i.a.w.g.DefaultReplicationWorker(getReplicationOutput):449 - sync summary: {
  "status" : "completed",
  "recordsSynced" : 68211,
  "bytesSynced" : 21267210,
  "startTime" : 1687755072694,
  "endTime" : 1687755592380,
  "totalStats" : {
    "bytesCommitted" : 21267210,
    "bytesEmitted" : 21267210,
    "destinationStateMessagesEmitted" : 0,
    "destinationWriteEndTime" : 1687755592377,
    "destinationWriteStartTime" : 1687755072770,
    "meanSecondsBeforeSourceStateMessageEmitted" : 0,
    "maxSecondsBeforeSourceStateMessageEmitted" : 0,
    "maxSecondsBetweenStateMessageEmittedandCommitted" : 0,
    "meanSecondsBetweenStateMessageEmittedandCommitted" : 0,
    "recordsEmitted" : 68211,
    "recordsCommitted" : 68211,
    "replicationEndTime" : 1687755592378,
    "replicationStartTime" : 1687755072694,
    "sourceReadEndTime" : 1687755081138,
    "sourceReadStartTime" : 1687755072727,
    "sourceStateMessagesEmitted" : 0
  },
  "streamStats" : [ {
    "streamName" : "nyc",
    "stats" : {
      "bytesCommitted" : 21267210,
      "bytesEmitted" : 21267210,
      "recordsEmitted" : 68211,
      "recordsCommitted" : 68211
    }
  } ]
}
2023-06-26 04:59:52 INFO i.a.w.g.DefaultReplicationWorker(getReplicationOutput):450 - failures: [ ]
2023-06-26 04:59:52 INFO i.a.w.t.TemporalAttemptExecution(get):163 - Stopping cancellation check scheduling...
2023-06-26 04:59:52 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-06-26 04:59:52 INFO i.a.c.i.LineGobbler(voidCall):149 - ----- END REPLICATION -----
2023-06-26 04:59:52 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-06-26 04:59:52 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):159 - sync summary: io.airbyte.config.StandardSyncOutput@6404f47f[standardSyncSummary=io.airbyte.config.StandardSyncSummary@18da331[status=completed,recordsSynced=68211,bytesSynced=21267210,startTime=1687755072694,endTime=1687755592380,totalStats=io.airbyte.config.SyncStats@5f8a6bf4[bytesCommitted=21267210,bytesEmitted=21267210,destinationStateMessagesEmitted=0,destinationWriteEndTime=1687755592377,destinationWriteStartTime=1687755072770,estimatedBytes=<null>,estimatedRecords=<null>,meanSecondsBeforeSourceStateMessageEmitted=0,maxSecondsBeforeSourceStateMessageEmitted=0,maxSecondsBetweenStateMessageEmittedandCommitted=0,meanSecondsBetweenStateMessageEmittedandCommitted=0,recordsEmitted=68211,recordsCommitted=68211,replicationEndTime=1687755592378,replicationStartTime=1687755072694,sourceReadEndTime=1687755081138,sourceReadStartTime=1687755072727,sourceStateMessagesEmitted=0,additionalProperties={}],streamStats=[io.airbyte.config.StreamSyncStats@14288075[streamName=nyc,streamNamespace=<null>,stats=io.airbyte.config.SyncStats@57cc3983[bytesCommitted=21267210,bytesEmitted=21267210,destinationStateMessagesEmitted=<null>,destinationWriteEndTime=<null>,destinationWriteStartTime=<null>,estimatedBytes=<null>,estimatedRecords=<null>,meanSecondsBeforeSourceStateMessageEmitted=<null>,maxSecondsBeforeSourceStateMessageEmitted=<null>,maxSecondsBetweenStateMessageEmittedandCommitted=<null>,meanSecondsBetweenStateMessageEmittedandCommitted=<null>,recordsEmitted=68211,recordsCommitted=68211,replicationEndTime=<null>,replicationStartTime=<null>,sourceReadEndTime=<null>,sourceReadStartTime=<null>,sourceStateMessagesEmitted=<null>,additionalProperties={}],additionalProperties={}]],additionalProperties={}],normalizationSummary=<null>,webhookOperationSummary=<null>,state=<null>,outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@3b349c60[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@37ed9539[stream=io.airbyte.protocol.models.AirbyteStream@67168099[name=nyc,jsonSchema={"$schema":"http://json-schema.org/draft-07/schema#","type":"object","properties":{"DOLocationID":{"type":["number","null"]},"RatecodeID":{"type":["number","null"]},"fare_amount":{"type":["number","null"]},"congestion_surcharge":{"type":["number","null"]},"tpep_dropoff_datetime":{"format":"date-time","type":["string","null"]},"VendorID":{"type":["number","null"]},"passenger_count":{"type":["number","null"]},"tolls_amount":{"type":["number","null"]},"improvement_surcharge":{"type":["number","null"]},"trip_distance":{"type":["number","null"]},"payment_type":{"type":["number","null"]},"store_and_fwd_flag":{"type":["string","null"]},"total_amount":{"type":["number","null"]},"extra":{"type":["number","null"]},"tip_amount":{"type":["number","null"]},"mta_tax":{"type":["number","null"]},"airport_fee":{"type":["number","null"]},"PULocationID":{"type":["number","null"]},"tpep_pickup_datetime":{"format":"date-time","type":["string","null"]}}},supportedSyncModes=[full_refresh],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=overwrite,primaryKey=[],additionalProperties={}]],additionalProperties={}],failures=[],commitStateAsap=true,additionalProperties={}]
2023-06-26 04:59:52 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):164 - Sync summary length: 3277
2023-06-26 04:59:52 INFO i.a.c.t.TemporalUtils(withBackgroundHeartbeat):307 - Stopping temporal heartbeating...

And we can query the table after all the data has been inserted.

StarRocks > show tables;
+-------------------+
| Tables_in_airbyte |
+-------------------+
| nyc               |
+-------------------+
1 row in set (0.00 sec)
StarRocks > select count(*) from nyc;
+----------+
| count(*) |
+----------+
|    68211 |
+----------+
1 row in set (0.02 sec)
StarRocks > show create table nyc;
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| nyc   | CREATE TABLE `nyc` (
  `_airbyte_ab_id` varchar(40) NULL COMMENT "",
  `_airbyte_emitted_at` bigint(20) NULL COMMENT "",
  `_airbyte_data` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`_airbyte_ab_id`, `_airbyte_emitted_at`)
DISTRIBUTED BY HASH(`_airbyte_ab_id`) BUCKETS 16
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"compression" = "LZ4"
); |
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.02 sec)

After the data has been inserted, the next step is the "transform": restructuring the data from the loaded schema into the target schema you want. One option is an `INSERT INTO ... SELECT` statement. Another idea is to add columns to the table with ALTER TABLE and derive their values from the JSON. See the StarRocks documentation for an example.
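
Before running the full transform, it can help to spot-check a raw record and confirm that the JSON paths and date format parse the way you expect. This is a minimal sketch using the same json_query paths and STR_TO_DATE format as the transform below:

select
	  _airbyte_data
	, json_query(_airbyte_data, "$.VendorID")
	, STR_TO_DATE(json_query(_airbyte_data, "$.lpep_pickup_datetime"), '%Y-%m-%dT%H:%i:%s')
	, json_query(_airbyte_data, "$.total_amount")
from nyc
limit 5;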

Here is some SQL to transform the JSON into a separate table.

drop table if exists taxi_green;
create table taxi_green (
    lpep_pickup_datetime DATETIME
  , VendorID int
  , lpep_dropoff_datetime DATETIME
  , passenger_count int
  , trip_distance float
  , PULocationID string
  , DOLocationID string
  , RatecodeID int
  , store_and_fwd_flag string
  , payment_type int
  , fare_amount float
  , extra float
  , mta_tax float
  , improvement_surcharge float
  , tip_amount float
  , tolls_amount float
  , total_amount float
  , congestion_surcharge float
  , trip_type int
)
ENGINE=OLAP
DUPLICATE KEY(`lpep_pickup_datetime`)
DISTRIBUTED BY HASH(`lpep_pickup_datetime`) BUCKETS 9;

INSERT INTO taxi_green (
	lpep_pickup_datetime
	, VendorID
	, lpep_dropoff_datetime
	, passenger_count
	, trip_distance
	, PULocationID
	, DOLocationID
	, RatecodeID
	, store_and_fwd_flag
	, payment_type
	, fare_amount
	, extra
	, mta_tax
	, improvement_surcharge
	, tip_amount
	, tolls_amount
	, total_amount
	, congestion_surcharge
	, trip_type
)
SELECT
	STR_TO_DATE(json_query(_airbyte_data, "$.lpep_pickup_datetime"), '%Y-%m-%dT%H:%i:%s')
	, json_query(_airbyte_data, "$.VendorID")
	, STR_TO_DATE(json_query(_airbyte_data, "$.lpep_dropoff_datetime"), '%Y-%m-%dT%H:%i:%s')
	, json_query(_airbyte_data, "$.passenger_count")
	, json_query(_airbyte_data, "$.trip_distance")
	, json_query(_airbyte_data, "$.PULocationID")
	, json_query(_airbyte_data, "$.DOLocationID")
	, json_query(_airbyte_data, "$.RatecodeID")
	, json_query(_airbyte_data, "$.store_and_fwd_flag")
	, json_query(_airbyte_data, "$.payment_type")
	, json_query(_airbyte_data, "$.fare_amount")
	, json_query(_airbyte_data, "$.extra")
	, json_query(_airbyte_data, "$.mta_tax")
	, json_query(_airbyte_data, "$.improvement_surcharge")
	, json_query(_airbyte_data, "$.tip_amount")
	, json_query(_airbyte_data, "$.tolls_amount")
	, json_query(_airbyte_data, "$.total_amount")
	, json_query(_airbyte_data, "$.congestion_surcharge")
	, json_query(_airbyte_data, "$.trip_type")
from nyc;
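
After the insert finishes, a quick sanity check is to compare row counts between the raw and transformed tables; both should report the 68211 rows seen in the sync log above if every record was transformed:

select count(*) from nyc;
select count(*) from taxi_green;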