Learn how to use the StarRocks connector.
Background
StarRocks is a next-generation Massively Parallel Processing (MPP) data warehouse designed for fast queries and a unified analytics experience across a wide range of scenarios. StarRocks offers the following advantages:
StarRocks is compatible with the MySQL protocol, enabling you to use MySQL clients and common Business Intelligence (BI) tools to connect and analyze data.
StarRocks uses a distributed architecture:
It horizontally partitions data tables and stores them with multiple replicas.
The cluster can be scaled flexibly and can analyze up to 10 petabytes (PB) of data.
It uses an MPP framework to accelerate parallel computations.
It supports multiple replicas to provide fault tolerance.
When writing, the Flink connector buffers data and uses Stream Load to write it to sink tables in batches. When reading, it fetches data from source tables in batches. The following table lists the capabilities of the StarRocks connector.
Category | Description |
Supported types | source tables, dimension tables, sink tables, and data ingestion targets |
Execution mode | streaming mode and batch mode |
Data format | CSV |
Connector-specific metrics | None |
API types | DataStream, SQL, and YAML for data ingestion |
Support for updates/deletions in sink tables | Yes |
Prerequisites
You have a StarRocks cluster, either deployed on E-MapReduce (EMR) or self-managed on Elastic Compute Service (ECS).
Limitations
Only Ververica Runtime (VVR) 11.1 or later supports joins with dimension tables.
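To avoid network access restrictions, add the following StarRocks cluster ports to a security group or firewall whitelist: 9030 (FE query port), 8030 (FE HTTP port), 8040 (BE HTTP port), 9060 (BE port), 8060 (BE bRPC port), and 9020 (FE RPC port). These are the StarRocks default ports; if your cluster uses custom ports, whitelist those instead.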
SQL
Features
StarRocks on E-MapReduce supports CREATE TABLE AS SELECT (CTAS) and CREATE DATABASE AS SELECT (CDAS) statements. CTAS synchronizes the schema and data of a single table, whereas CDAS synchronizes an entire database or multiple tables within the same database. For more information, see Use CTAS and CDAS statements in Realtime Compute for Apache Flink to synchronize data from a MySQL database to StarRocks.
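The following sketch shows the general shape of a CTAS statement and a CDAS statement that write to StarRocks. It assumes that a MySQL catalog named mysql_catalog and a StarRocks catalog named starrocks_catalog already exist in your workspace; these catalog names and the database and table names are hypothetical. Treat it as an outline and confirm the exact syntax and supported options in the CTAS and CDAS topic referenced above.

```sql
-- Assumption: mysql_catalog and starrocks_catalog are hypothetical catalogs
-- that have already been registered in the Flink workspace.

-- CTAS: synchronize the schema and data of a single table.
CREATE TABLE IF NOT EXISTS `starrocks_catalog`.`target_db`.`orders`
AS TABLE `mysql_catalog`.`source_db`.`orders`;

-- CDAS: synchronize every table in one source database.
CREATE DATABASE IF NOT EXISTS `starrocks_catalog`.`target_db`
AS DATABASE `mysql_catalog`.`source_db` INCLUDING ALL TABLES;
```

If you submit several such statements in one job, you may need to wrap them in a statement set (BEGIN STATEMENT SET; ... END;).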
Syntax
CREATE TABLE USER_RESULT(
name VARCHAR,
score BIGINT
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
'database-name' = 'xxx',
'table-name' = 'xxx',
'username' = 'xxx',
'password' = 'xxx'
);
Parameters
Category | Parameter | Description | Data type | Required | Default | Remarks |
General | connector | Specifies the connector to use. | String | Yes | — | The value must be starrocks. |
General | jdbc-url | The Java Database Connectivity (JDBC) URL. | String | Yes | — | Specify the IP address and JDBC port of the FE in the format jdbc:mysql://ip:port. |
General | database-name | The name of the StarRocks database. | String | Yes | — | — |
General | table-name | The name of the StarRocks table. | String | Yes | — | — |
General | username | The username used to connect to StarRocks. | String | Yes | — | — |
General | password | The password used to connect to StarRocks. | String | Yes | — | — |
General | starrocks.create.table.properties | Properties applied when a table is created automatically. | String | No | — | Specifies initial table properties, such as the engine and replica count. For example: 'starrocks.create.table.properties' = 'buckets 8' or 'starrocks.create.table.properties' = 'replication_num=1'. |
Source-specific | scan-url | The URL used to scan data. | String | No | — | Specify the IP address and HTTP port of the FE in the format ip:port. To specify multiple IP addresses and ports, separate them with semicolons (;). |
Source-specific | scan.connect.timeout-ms | The timeout for connecting to StarRocks. The connector reports an error if a connection is not established within this period. | String | No | 1000 | Unit: milliseconds. |
Source-specific | scan.params.keep-alive-min | The keep-alive duration of a query task. | String | No | 10 | Unit: minutes. |
Source-specific | scan.params.query-timeout-s | The timeout for a query task. If no result is returned within this period, the query task is stopped. | String | No | 600 | Unit: seconds. |
Source-specific | scan.params.mem-limit-byte | The memory limit for a single query on a BE node. | String | No | 1073741824 (1 GB) | Unit: bytes. |
Source-specific | scan.max-retries | The maximum number of retries for a failed query. The connector reports an error if this limit is exceeded. | String | No | 1 | — |
Sink-specific | load-url | The URL used to import data. | String | Yes | — | Specify the IP addresses and HTTP ports of the FEs in the format ip:port. To specify multiple IP addresses and ports, separate them with semicolons (;). |
Sink-specific | sink.semantic | The delivery semantics for writes. | String | No | at-least-once | Valid values: at-least-once and exactly-once. |
Sink-specific | sink.buffer-flush.max-bytes | The maximum amount of data to buffer before a flush. | String | No | 94371840 (90 MB) | Valid range: 64 MB to 10 GB. Unit: bytes. |
Sink-specific | sink.buffer-flush.max-rows | The maximum number of rows to buffer before a flush. | String | No | 500000 | Valid range: 64,000 to 5,000,000. |
Sink-specific | sink.buffer-flush.interval-ms | The buffer flush interval. | String | No | 300000 | Valid range: 1,000 to 3,600,000. Unit: milliseconds. |
Sink-specific | sink.max-retries | The maximum number of retries for failed writes. | String | No | 3 | Valid range: 0 to 10. |
Sink-specific | sink.connect.timeout-ms | The timeout for connecting to StarRocks. | String | No | 1000 | Valid range: 100 to 60,000. Unit: milliseconds. |
Sink-specific | sink.properties.* | Additional Stream Load properties for the sink. | String | No | — | These parameters control the behavior of Stream Load. For example, |
Dimension-specific | lookup.cache.enabled | Specifies whether to enable caching for the dimension table. | Boolean | No | true | Valid values: true and false. |
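The following sketch combines several of the options above in one job: a StarRocks source table with adjusted scan settings, and a sink table that requests exactly-once delivery and flushes more often than the defaults. The host fe_ip, port 8030, credentials, and all database, table, and column names are placeholders. The sketch assumes that exactly-once is an accepted value of sink.semantic; in the open-source StarRocks connector, exactly-once writes are committed when Flink checkpoints complete, so checkpointing should be enabled for the job.

```sql
-- Source: read db_name.orders from StarRocks with non-default scan settings.
CREATE TEMPORARY TABLE orders_source (
  order_id BIGINT NOT NULL,
  amount   DECIMAL(10, 2)
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe_ip:9030',
  'scan-url' = 'fe_ip:8030',
  'database-name' = 'db_name',
  'table-name' = 'orders',
  'username' = 'xxx',
  'password' = 'xxx',
  'scan.connect.timeout-ms' = '5000',      -- report an error after 5 s without a connection
  'scan.params.query-timeout-s' = '1200',  -- stop a query task after 20 minutes
  'scan.max-retries' = '3'                 -- retry a failed query up to 3 times
);

-- Sink: write to db_name.orders_copy with tighter flush thresholds.
-- Assumption: a flush happens when any one of the thresholds is reached.
CREATE TEMPORARY TABLE orders_sink (
  order_id BIGINT NOT NULL,
  amount   DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe_ip:9030',
  'load-url' = 'fe_ip:8030',
  'database-name' = 'db_name',
  'table-name' = 'orders_copy',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.semantic' = 'exactly-once',
  'sink.buffer-flush.max-bytes' = '134217728',  -- 128 MB, inside the 64 MB to 10 GB range
  'sink.buffer-flush.max-rows' = '1000000',     -- inside the 64,000 to 5,000,000 range
  'sink.buffer-flush.interval-ms' = '10000',    -- 10 s instead of the 300,000 ms default
  'sink.max-retries' = '5'
);

INSERT INTO orders_sink SELECT * FROM orders_source;
```

Lowering sink.buffer-flush.interval-ms shortens the delay before data becomes visible in StarRocks at the cost of more, smaller Stream Load jobs.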
Data type mapping
StarRocks data type | Flink data type |
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
BIGINT UNSIGNED Note Requires Realtime Compute for Apache Flink engine VVR 8.0.10 and later. | DECIMAL(20,0) |
LARGEINT | DECIMAL(20,0) |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR(m) | CHAR(n) |
VARCHAR(m) | CHAR(n) |
VARCHAR | STRING |
VARBINARY Note Requires Realtime Compute for Apache Flink engine VVR 8.0.10 and later. | VARBINARY |
Code example
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://ip:9030',
'scan-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL,
PRIMARY KEY (`runoob_id`) NOT ENFORCED
) WITH (
'jdbc-url' = 'jdbc:mysql://ip:9030',
'connector' = 'starrocks',
'load-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxx',
'sink.buffer-flush.interval-ms' = '5000'
);
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;
StarRocks allows primary key columns to be nullable, but Flink does not: Flink's data consistency model requires a primary key to be unique and non-nullable. If a primary key column is nullable, Flink throws the error Invalid primary key. Column 'xxx' is nullable. For more information, see "Invalid primary key. Column 'xxx' is nullable." error.
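Dimension table joins require VVR 11.1 or later. The following sketch enriches a generated order stream with user names from a StarRocks dimension table through a processing-time lookup join. The datagen and print tables, the host fe_ip, credentials, and all database, table, and column names are placeholders for illustration. It also assumes the dimension table needs only the general options plus lookup.cache.enabled; add any further options that your VVR version requires.

```sql
-- Test stream: datagen produces random orders with user_id between 1 and 100.
CREATE TEMPORARY TABLE orders (
  order_id  BIGINT,
  user_id   BIGINT,
  proc_time AS PROCTIME()  -- processing-time attribute used by the lookup join
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100'
);

-- StarRocks dimension table; with caching enabled, repeated keys may be served from the cache.
CREATE TEMPORARY TABLE users_dim (
  user_id   BIGINT,
  user_name STRING
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe_ip:9030',
  'database-name' = 'db_name',
  'table-name' = 'users',
  'username' = 'xxx',
  'password' = 'xxx',
  'lookup.cache.enabled' = 'true'
);

-- Print the enriched rows to the TaskManager logs.
CREATE TEMPORARY TABLE enriched_orders (
  order_id  BIGINT,
  user_id   BIGINT,
  user_name STRING
) WITH ('connector' = 'print');

INSERT INTO enriched_orders
SELECT o.order_id, o.user_id, u.user_name
FROM orders AS o
JOIN users_dim FOR SYSTEM_TIME AS OF o.proc_time AS u
  ON o.user_id = u.user_id;
```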
Data ingestion
Use the StarRocks Pipeline connector to write data records and schema changes from upstream data sources to an external StarRocks database. The StarRocks connector supports both the community edition and the fully managed EMR Serverless StarRocks from Alibaba Cloud.
Features
Automatic database and table creation.
If an upstream database or table does not exist in the downstream StarRocks instance, the connector creates it automatically. You can use the table.create.properties.* parameter to configure options for automatically created tables.
Schema change synchronization.
The StarRocks connector automatically applies CreateTableEvent, AddColumnEvent, and DropColumnEvent events to the downstream database.
VVR 11.1 and later supports compatible column type changes. For more information, see ALTER TABLE | StarRocks.
Usage notes
Each synchronized table must have a primary key. For a table without a primary key, you must specify one in the transform block before its data can be written downstream. For example:
transform:
  - source-table: ...
    primary-keys: id, ...
For automatically created tables, the bucket key is the same as the primary key, and the table cannot have a partition key.
When schema changes are synchronized, new columns can only be appended after the existing columns. In the default Lenient schema change mode, columns added at other positions are automatically moved to the end.
If you use a StarRocks version earlier than 2.5.7, you must explicitly specify the number of buckets with the table.create.num-buckets parameter. StarRocks 2.5.7 and later can automatically determine an appropriate number of buckets.
If you use StarRocks 3.2 or later, we recommend that you enable the table.create.properties.fast_schema_evolution option to accelerate schema changes.
Streaming issues may occur when you use CDC YAML for data ingestion into EMR Serverless StarRocks. You can use one of the following workarounds:
Use the Flink SQL StarRocks connector and set the sink.version=V1 parameter.
Enable the FE parameter emr_internal_redirect.
Use a StarRocks Private Zone domain name instead of an SLB.
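Because the connector does not recreate a downstream table that already exists, you can also create the table in StarRocks yourself. The following StarRocks SQL sketch shows a table that satisfies the constraints above: a primary key table bucketed by its primary key, with no partition key, an explicit bucket count, and fast schema evolution enabled. The database, table, and column names are hypothetical, and the DDL that the connector generates for automatically created tables may differ.

```sql
-- StarRocks SQL, run from a MySQL client connected to the FE query port (default 9030).
-- Hypothetical names: db_name, orders, id, status, amount.
CREATE TABLE IF NOT EXISTS db_name.orders (
    id     BIGINT NOT NULL,      -- primary key column listed first
    status VARCHAR(60),          -- an upstream VARCHAR(20), sized at 20 * 3 bytes
    amount DECIMAL(10, 2)
)
PRIMARY KEY (id)                     -- every synchronized table needs a primary key
DISTRIBUTED BY HASH (id) BUCKETS 8   -- bucket key equals the primary key; no PARTITION BY clause
PROPERTIES (
    "fast_schema_evolution" = "true" -- StarRocks 3.2 or later
);
```

The explicit BUCKETS clause is what versions earlier than 2.5.7 require; on later versions you can omit it and let StarRocks choose the bucket count.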
Syntax
source:
  ...
sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass
  sink.buffer-flush.interval-ms: 5000  # Set the data flush interval.
Configuration
Parameter | Description | Type | Required | Default | Remarks |
| Specifies the sink connector type. | String | Yes | — | Set to starrocks. |
| The display name of the sink. | String | No | — | — |
| The JDBC URL for the database connection. | String | Yes | — | Supports multiple addresses separated by commas (,). |
| The HTTP URL of an FE node for Stream Load. | String | Yes | — | Supports multiple addresses separated by semicolons (;). |
| The username for the StarRocks connection. | String | Yes | — | This user must have at least SELECT and INSERT permissions on the target table. You can grant the required permissions with the StarRocks GRANT command. |
| The password for the StarRocks connection. | String | Yes | — | — |
| The delivery semantics for data writes. | String | No | at-least-once | Valid values: |
| The label prefix for Stream Load jobs. | String | No | — | — |
| The timeout for establishing an HTTP connection. | Integer | No | 30000 | Unit: milliseconds. The value must be between 100 and 60000. |
| The timeout for waiting for a 100 Continue response from the server. | Integer | No | 30000 | Unit: milliseconds. The value must be between 3000 and 600000. |
| The maximum size of the in-memory cache, in bytes, before a flush is triggered. | Long | No | 157286400 | Unit: bytes. The value must be between 64 MB and 10 GB. |
| The maximum number of rows in the in-memory cache before a flush is triggered. | Long | No | 500000 | The value must be between 64,000 and 5,000,000. |
| The time interval between flushes for each table's buffer. | Long | No | 300000 | Unit: milliseconds. Note For jobs that synchronize small amounts of data, reduce this value to avoid long delays before data is persisted. |
| The maximum number of retries. | Long | No | 3 | The value must be between 0 and 1000. |
| The frequency at which the connector checks whether to flush the buffer. | Long | No | 50 | Unit: milliseconds. |
| The number of threads used for Stream Load. | Integer | No | 2 | — |
| Specifies whether to use the Stream Load transaction interface for data ingestion. | Boolean | No | true | This option takes effect only if the database supports it. |
| Additional properties for the sink. | String | No | — | For supported properties, see STREAM LOAD. |
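| The number of buckets for automatically created tables. | Integer | No | — | Required for StarRocks versions earlier than 2.5.7. StarRocks 2.5.7 and later can determine the number of buckets automatically. |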
| Additional properties for automatic table creation. | String | No | — | For example, you can pass |
| The timeout for schema change operations. | Duration | No | 30 min | Must be an integer number of seconds. Note If a schema change operation exceeds this limit, the job fails. |
| The number of bytes to allocate for each Unicode character. | Integer | No | 3 | In CDC, the length of a VARCHAR type is measured in characters, whereas in StarRocks, the length of a VARCHAR type is measured in bytes. In most cases, a Unicode character does not exceed 3 bytes after UTF-8 encoding. However, some rare characters and emoji symbols may occupy 4 or more bytes. |
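The StarRocks user configured for the sink needs at least SELECT and INSERT permissions on the target table. The following minimal sketch in StarRocks SQL assumes the privilege syntax of StarRocks 3.0 or later and uses a hypothetical user flink_writer, database db_name, and table orders. Versions earlier than 3.0 use a different privilege model (for example, SELECT_PRIV and LOAD_PRIV), so check the GRANT reference for your version.

```sql
-- Hypothetical account used only by the Flink job; replace the password.
CREATE USER 'flink_writer'@'%' IDENTIFIED BY 'your_password';

-- Grant the minimum permissions on a single target table.
GRANT SELECT, INSERT ON TABLE db_name.orders TO USER 'flink_writer'@'%';

-- Alternatively, grant them on every table in the database.
GRANT SELECT, INSERT ON ALL TABLES IN DATABASE db_name TO USER 'flink_writer'@'%';
```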
Reuse a built-in catalog
VVR 11.5 and later lets you reference a built-in StarRocks catalog created on the Data Management page directly in a Flink CDC data ingestion job. This simplifies configuration by reducing the number of properties you need to set manually.
sink:
  type: starrocks
  using.built-in-catalog: starrocks_catalog
Data ingestion jobs can automatically reuse the following StarRocks catalog options:
jdbc-url
http-url
username
password
table.num-buckets
To override these values, you can explicitly set the corresponding YAML options, which take precedence.
Type mapping
StarRocks does not support all CDC YAML types. Writing an unsupported type to the sink causes the job to fail. You can use the CAST built-in function in a transform to convert unsupported data, or use a projection statement to remove it from the result table. For more information, see Develop a Flink CDC data ingestion job.
CDC type | StarRocks type | Remarks |
TINYINT | TINYINT | — |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | DATETIME | |
TIMESTAMP_LTZ | DATETIME | |
DECIMAL(p, s) | DECIMAL(p, s) | Because StarRocks does not support DECIMAL for a primary key, the connector automatically converts an upstream DECIMAL primary key column to VARCHAR in the synchronized StarRocks schema. |
CHAR(n) (n <= 85) | CHAR(n × 3) | CDC measures length in characters, while StarRocks uses bytes. The connector multiplies the length by 3 to account for multi-byte UTF-8 characters. Note The maximum length of the StarRocks CHAR type is 255. Therefore, only CDC CHAR types with a length up to 85 are mapped to the StarRocks CHAR type. Note You can set the |
CHAR(n) (n > 85) | VARCHAR(n × 3) | CDC measures length in characters, while StarRocks uses bytes. The connector multiplies the length by 3 to account for multi-byte UTF-8 characters. Because the result exceeds the 255-byte limit of the StarRocks CHAR type, the column is mapped to VARCHAR. Note You can set the |
VARCHAR(n) | VARCHAR(n × 3) | CDC measures length in characters, while StarRocks uses bytes. The connector multiplies the length by 3 to account for multi-byte UTF-8 characters. |
BINARY(n) | BINARY(n+2) | Two bytes of padding are added to ensure data integrity. |
VARBINARY(n) | VARBINARY(n+1) | One byte of padding is added to ensure data integrity. |
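To make the length rules concrete, the following sketch assumes a hypothetical upstream table and shows, in StarRocks SQL, the column types that the mapping above produces with the default factor of 3 bytes per character. The names are illustrative, and the DDL that the connector actually generates may differ in details such as bucketing.

```sql
-- Hypothetical upstream (CDC) columns:
--   id BIGINT (primary key), code CHAR(10), title CHAR(100),
--   note VARCHAR(50), updated_at TIMESTAMP
CREATE TABLE IF NOT EXISTS db_name.articles (
    id         BIGINT NOT NULL,
    code       CHAR(30),       -- CHAR(10): 10 <= 85, so CHAR(10 * 3)
    title      VARCHAR(300),   -- CHAR(100): 100 > 85 and 300 > 255, so VARCHAR(100 * 3)
    note       VARCHAR(150),   -- VARCHAR(50): VARCHAR(50 * 3)
    updated_at DATETIME        -- TIMESTAMP maps to DATETIME
)
PRIMARY KEY (id)
DISTRIBUTED BY HASH (id);
```

An upstream CHAR(85) column still maps to CHAR(255), because 85 × 3 = 255 is exactly the StarRocks CHAR limit.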
Schema change
As a data ingestion sink, StarRocks supports the following schema change events:
CREATE TABLE EVENT
Note: If the downstream StarRocks table already exists, the connector does not attempt to create it again. Ensure that the downstream table schema is compatible with the upstream schema.
ADD COLUMN EVENT
Note: StarRocks requires primary key columns to appear first in a table, so new columns must be added after them.
ALTER COLUMN TYPE EVENT
Note: For supported schema change paths, see the official StarRocks documentation.
DROP COLUMN EVENT
TRUNCATE TABLE EVENT
DROP TABLE EVENT