600s transaction timeout during stream load of Flink snapshot

I am setting up a Flink CDC pipeline to replicate a database from MySQL into StarRocks. During the stream load of the initial snapshot, a transaction always times out 600 seconds after it first appears in the log (see BE log below). This then causes the whole Flink snapshot to restart from the beginning…

Unfortunately, I don’t fully understand what causes this. I have already raised stream_load_default_timeout_second from 600s to 3600s, but the transaction still times out at 600s. Is there any other parameter I need to change? Or any hint as to where I should look to debug this further?

(In desperation, I also tried increasing the following variables, since they also had default values of 600s, without success: streaming_load_rpc_max_alive_time_sec, check_consistency_default_timeout_second and max_create_table_timeout_second.)
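For completeness, here is how the FE-side value can be checked and changed at runtime over the MySQL port, which rules out a stale fe.conf (standard StarRocks admin statements; note the runtime change does not persist across FE restarts):

-- What the FE is actually using right now:
ADMIN SHOW FRONTEND CONFIG LIKE 'stream_load_default_timeout_second';

-- Change it on the fly (fe.conf still needs the value to survive a restart):
ADMIN SET FRONTEND CONFIG ("stream_load_default_timeout_second" = "3600");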

Flink log:

Caused by: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Stream load failed because of error, db: target_db, table: table, label: flink-4115d407-6d62-439e-bb09-3571d0c08d8b, 
responseBody: {
    "Status": "TXN_NOT_EXISTS",
    "Message": "Transaction with label flink-4115d407-6d62-439e-bb09-3571d0c08d8b not exists"
}

StarRocks BE log (be.INFO):

I20250808 21:13:32.997993 139988574824000 transaction_mgr.cpp:190] new transaction manage request. id=2041d26b956b0005-d415cc0ae0daa9af, job_id=-1, txn_id: 11198, label=flink-4115d407-6d62-439e-bb09-3571d0c08d8b, db=target_db, tbl=table op=begin

I20250808 21:13:33.002236 139988574824000 transaction_stream_load.cpp:282] new transaction load request.id=2041d26b956b0005-d415cc0ae0daa9af, job_id=-1, txn_id: 11198, label=flink-4115d407-6d62-439e-bb09-3571d0c08d8b, db=target_db, tbl=table

...

I20250808 21:23:38.491448 139991314196032 transaction_mgr.cpp:367] Rollback transaction id=2041d26b956b0005-d415cc0ae0daa9af, job_id=-1, txn_id: 11198, label=flink-4115d407-6d62-439e-bb09-3571d0c08d8b, db=target_db

I20250808 21:23:38.493424 139991314196032 transaction_mgr.cpp:408] Abort transaction id=2041d26b956b0005-d415cc0ae0daa9af, job_id=-1, txn_id: 11198, label=flink-4115d407-6d62-439e-bb09-3571d0c08d8b, db=target_db, reason: transaction is aborted by timeout 600 seconds., abort status: OK

...

W20250808 21:23:38.498808 139999267395136 stream_load_executor.cpp:118] fragment execute failed, query_id=2041d26b956b0005-d415cc0ae0daa9af, err_msg=Cancelled because of runtime state is cancelled, id=2041d26b956b0005-d415cc0ae0daa9af, job_id=-1, txn_id: 11198, label=flink-4115d407-6d62-439e-bb09-3571d0c08d8b, db=target_db

I20250808 21:23:38.499256 139989176276544 local_tablets_channel.cpp:798] cancel LocalTabletsChannel txn_id: 11198 load_id: 2041d26b956b0005-d415cc0ae0daa9af index_id: 21571 #tablet:16 tablet_ids:21603,21601,21599,21573,21575,21577,21579,21581,21583,21585,21587,21589,21591,21593,21595,21597

For context:

I am using StarRocks version 3.5.1, Flink CDC version 3.4.0 and Flink version 1.20.1. The FE and BE each run in their own Podman container, on the same server.

The table has ~600M rows, and the table schema (MySQL column types; see the sketch after the list) is:

  • Int(11)
  • DateTime
  • Decimal(28,6)
  • Decimal(28,6)
  • Decimal(28,6)
  • Decimal(28,6)
  • Decimal(28,6)
  • Decimal(28,6)
  • Decimal(28,6)
  • Decimal(28,6)
  • Decimal(19,10)
  • Bigint(20)
  • Int(11)
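The pipeline connector can create the target table automatically as a primary key table, so purely for illustration, a hand-written StarRocks equivalent would look roughly like this. The column names are made-up placeholders (only the types are from the list above), and I am assuming the first INT column is the key:

CREATE TABLE target_db.`table` (
    id       INT NOT NULL,
    event_ts DATETIME,
    amount_1 DECIMAL(28,6),
    amount_2 DECIMAL(28,6),
    amount_3 DECIMAL(28,6),
    amount_4 DECIMAL(28,6),
    amount_5 DECIMAL(28,6),
    amount_6 DECIMAL(28,6),
    amount_7 DECIMAL(28,6),
    amount_8 DECIMAL(28,6),
    rate     DECIMAL(19,10),
    ref_id   BIGINT,
    flag     INT
) PRIMARY KEY (id)
DISTRIBUTED BY HASH (id);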

be.conf:

...

# Set default replication number to 1
default_replication_num = 1

# Compaction
compact_threads = 24
compact_thread_pool_queue_size = 500
max_cumulative_compaction_num_singleton_deltas = 1000

# Avoid "too many table version" error
cumulative_compaction_num_threads_per_disk = 24
base_compaction_num_threads_per_disk = 24
update_compaction_num_threads_per_disk = 24
update_compaction_per_tablet_min_interval_seconds = 60

# Increase maximum number of versions per tablet
tablet_max_versions = 10000

# Increase batch size
streaming_load_max_batch_size_mb = 512

# Trying to fix timeout:
streaming_load_rpc_max_alive_time_sec = 3600

fe.conf:

...

# Increase maximum number of concurrent transactions (default 1000)
max_running_txn_num_per_db = 1500

# Prevent transactions from getting aborted after 600 seconds
stream_load_default_timeout_second = 3600

# Trying to fix timeout:
check_consistency_default_timeout_second = 3600
max_create_table_timeout_second = 3600

In case anybody else runs into the same issue, I finally figured this one out. Didn’t take long at all…

Somehow, the FE parameter stream_load_default_timeout_second has no effect when using a Flink CDC pipeline. Either it’s a bug, or (more likely) the connector sends its own timeout with each stream load request, which overrides the FE default. The answer is to set the stream load timeout parameter directly in the YAML pipeline file. Specifically, add this to the sink specification:

sink:
  sink.properties.timeout: 1200   # stream load timeout in seconds
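One more debugging tip in the same vein: while a load is running, the transaction can be looked up on the FE (the txn_id appears in the BE log), and the output should include a TimeoutMs column, so you can verify which timeout was actually applied. A sketch using the txn_id from the log above:

-- Only works while the FE still tracks the transaction:
SHOW TRANSACTION FROM target_db WHERE id = 11198;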