StarRocks

Learn how to use the StarRocks connector.

Background

StarRocks is a next-generation Massively Parallel Processing (MPP) data warehouse that delivers extremely fast performance across all scenarios and a unified analytics experience. StarRocks offers the following advantages:

  • StarRocks is compatible with the MySQL protocol, enabling you to use MySQL clients and common Business Intelligence (BI) tools to connect and analyze data.

  • StarRocks uses a distributed architecture:

    • It horizontally partitions data tables and stores them with multiple replicas.

    • The cluster can be scaled flexibly and can analyze up to 10 petabytes (PB) of data.

    • It uses an MPP framework to accelerate parallel computations.

    • It supports multiple replicas to provide fault tolerance.

The Flink connector caches data and uses Stream Load to write it in batches to sink tables. It reads from source tables by fetching data in batches. The following table lists the capabilities of the StarRocks connector.

| Category | Description |
| --- | --- |
| Supported types | Source tables, dimension tables, sink tables, and data ingestion targets |
| Execution mode | Streaming mode and batch mode |
| Data format | CSV |
| Connector-specific metrics | None |
| API types | DataStream, SQL, and YAML for data ingestion |
| Support for updates/deletions in sink tables | Yes |

Prerequisites

You must have a StarRocks cluster deployed on EMR or a self-managed one on ECS.

Limitations

  • Only Ververica Runtime (VVR) 11.1 or later supports joins with dimension tables.

  • To avoid network access restrictions, add the following StarRocks cluster ports to a security group or firewall whitelist: 9030, 8030, 8040, 9060, 8060, 9020.

SQL

Features

StarRocks on E-MapReduce supports the CREATE TABLE AS (CTAS) and CREATE DATABASE AS (CDAS) statements. CTAS synchronizes the schema and data of a single table, whereas CDAS synchronizes an entire database or multiple tables within the same database. For more information, see Use CTAS and CDAS statements in Realtime Compute for Apache Flink to synchronize data from a MySQL database to StarRocks.
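The difference between the two statements can be sketched as follows. This is a minimal sketch, not a verified job: it assumes that a source catalog and a StarRocks catalog are already registered, and the names `mysql_catalog`, `sr_catalog`, `mysql_db`, `sr_db`, and `orders` are hypothetical. Check the linked topic for the options your VVR version accepts.

```sql
-- Assumption: `mysql_catalog` (source) and `sr_catalog` (StarRocks) already exist.
-- The two statements are alternatives; submit one or the other.

-- CTAS: synchronize the schema and data of a single table.
CREATE TABLE IF NOT EXISTS `sr_catalog`.`sr_db`.`orders`
AS TABLE `mysql_catalog`.`mysql_db`.`orders`;

-- CDAS: synchronize all tables of a database.
CREATE DATABASE IF NOT EXISTS `sr_catalog`.`sr_db`
AS DATABASE `mysql_catalog`.`mysql_db` INCLUDING ALL TABLES;
```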

Syntax

CREATE TABLE USER_RESULT(
 name VARCHAR,
 score BIGINT
 ) WITH (
 'connector' = 'starrocks',
 'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
 'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
 'database-name' = 'xxx',
 'table-name' = 'xxx',
 'username' = 'xxx',
 'password' = 'xxx'
 );

Parameters

Category

Parameter

Description

Type

Required

Default

Remarks

General

connector

Specifies the connector to use.

String

Yes

The value must be starrocks.

jdbc-url

The Java Database Connectivity (JDBC) URL.

String

Yes

Specify the IP address and JDBC port of the FE in the format jdbc:mysql://ip:port.

database-name

The name of the StarRocks database.

String

Yes

table-name

The name of the StarRocks table.

String

Yes

username

The username to connect to StarRocks.

String

Yes

password

The password to connect to StarRocks.

String

Yes

starrocks.create.table.properties

Specifies properties for automatic table creation.

String

No

Specifies initial table properties, such as the engine and replica count. For example: 'starrocks.create.table.properties' = 'buckets 8' or 'starrocks.create.table.properties' = 'replication_num=1'.

Source-specific

scan-url

The data scanning URL.

String

No

Specifies the IP address and HTTP port of the FE. Format: fe_ip:http_port;fe_ip:http_port.

Note

To specify multiple IP addresses and ports, separate them with semicolons (;).

scan.connect.timeout-ms

The timeout for flink-connector-starrocks to connect to StarRocks.

The connector reports an error if a connection is not established within this timeout.

String

No

1000

Unit: milliseconds.

scan.params.keep-alive-min

The keep-alive duration for the query task.

String

No

10

scan.params.query-timeout-s

The timeout for a query task.

If no result is returned within this period, the system stops the query task.

String

No

600

Unit: seconds.

scan.params.mem-limit-byte

The memory limit for a single query on a BE node.

String

No

1073741824 (1 GB)

Unit: bytes.

scan.max-retries

The maximum number of retries for a failed query.

The connector reports an error if this limit is exceeded.

String

No

1

Sink-specific

load-url

The data import URL.

String

Yes

Specify the IP addresses and HTTP ports of the FEs in the format fe_ip:http_port;fe_ip:http_port.

Note

To specify multiple IP addresses and ports, separate them with semicolons (;).

sink.semantic

The delivery semantics for writes.

String

No

at-least-once

Valid values:

  • at-least-once (default): Guarantees that data is delivered at least once.

  • exactly-once: Guarantees that data is delivered exactly once.

sink.buffer-flush.max-bytes

The maximum amount of data to buffer before flushing.

String

No

94371840 (90 MB)

Valid range: 64 MB to 10 GB.

sink.buffer-flush.max-rows

The maximum number of rows to buffer before flushing.

String

No

500000

Valid range: 64,000 to 5,000,000.

sink.buffer-flush.interval-ms

The buffer flush interval.

String

No

300000

Valid range: 1,000 ms to 3,600,000 ms.

sink.max-retries

The maximum number of retries for failed writes.

String

No

3

Valid range: 0 to 10.

sink.connect.timeout-ms

The timeout for connecting to StarRocks.

String

No

1000

Valid range: 100 to 60,000. Unit: milliseconds.

sink.properties.*

Additional Stream Load properties for the sink.

String

No

These parameters control the behavior of Stream Load. For example, sink.properties.format specifies the format of the imported data, such as CSV. For more parameters, see Stream Load.

Dimension-specific

lookup.cache.enabled

Specifies whether to enable caching for the dimension table.

Boolean

No

true

Valid values:

  • true: Enables caching. After the table data is read for the first time, the system caches it in memory. Subsequent requests use the cached data within its validity period to reduce I/O overhead.

  • false: Disables caching. Each query directly accesses the data source.

Important
  • This feature requires Realtime Compute for Apache Flink engine VVR 11.1 and later.

  • We recommend disabling this feature in the following scenarios:

    • The dimension table data is frequently updated, and real-time data is required.

    • The table contains a large amount of data, posing a risk of memory overflow.
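A join against a StarRocks dimension table can be sketched with the standard Flink SQL lookup join syntax. This is a minimal sketch that assumes VVR 11.1 or later. The tables `orders_stream`, `dim_user`, and `enriched_orders`, their columns, and the placeholder connection values are hypothetical, and the `datagen` and `print` connectors only stand in for a real source and sink. Caching is disabled here to match the frequently updated scenario; whether a dimension table needs options beyond the ones shown is not stated in the parameter list.

```sql
-- Hypothetical fact stream; datagen stands in for a real source.
CREATE TEMPORARY TABLE orders_stream (
  order_id BIGINT,
  user_id BIGINT,
  proc_time AS PROCTIME()  -- processing-time attribute used by the lookup join
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- StarRocks dimension table with caching turned off.
CREATE TEMPORARY TABLE dim_user (
  user_id BIGINT NOT NULL,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxx',
  'password' = 'xxx',
  'lookup.cache.enabled' = 'false'
);

-- Hypothetical sink that prints the joined rows.
CREATE TEMPORARY TABLE enriched_orders (
  order_id BIGINT,
  user_id BIGINT,
  user_name STRING
) WITH (
  'connector' = 'print'
);

-- Each incoming order looks up the current row in StarRocks.
INSERT INTO enriched_orders
SELECT o.order_id, o.user_id, d.user_name
FROM orders_stream AS o
JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.user_id = d.user_id;
```

Leaving `lookup.cache.enabled` at its default of `true` trades freshness for fewer queries against StarRocks.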

Data type mapping

StarRocks data type

Flink data type

NULL

NULL

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

BIGINT UNSIGNED

Note

Requires Realtime Compute for Apache Flink engine VVR 8.0.10 and later.

DECIMAL(20,0)

LARGEINT

DECIMAL(20,0)

FLOAT

FLOAT

DOUBLE

DOUBLE

DATE

DATE

DATETIME

TIMESTAMP

DECIMAL

DECIMAL

DECIMALV2

DECIMAL

DECIMAL32

DECIMAL

DECIMAL64

DECIMAL

DECIMAL128

DECIMAL

CHAR(m)

Note
  • VVR 8.0.10 automatically extends the CHAR length threefold (m=n*3, where n<=85) to accommodate encoding differences between MySQL and StarRocks.

  • VVR 8.0.11 and later automatically extend the CHAR length fourfold (m=n*4, where n<=63) to accommodate encoding differences between MySQL and StarRocks.

  • The maximum length for the StarRocks CHAR type is 255. Therefore, Flink maps a CHAR type to a StarRocks CHAR type only if its automatically extended length does not exceed 255.

CHAR(n)

VARCHAR(m)

Note
  • VVR 8.0.10 automatically extends the VARCHAR length threefold (m=n*3, where n>85) to accommodate encoding differences between MySQL and StarRocks.

  • VVR 8.0.11 and later automatically extend the VARCHAR length fourfold (m=n*4, where n>63) to accommodate encoding differences between MySQL and StarRocks.

  • The maximum length for the StarRocks CHAR type is 255. Therefore, if the automatically extended length of a Flink CHAR type exceeds 255, Flink maps the type to the StarRocks VARCHAR type.

CHAR(n)

VARCHAR

STRING

VARBINARY

Note

Requires Realtime Compute for Apache Flink engine VVR 8.0.10 and later.

VARBINARY
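To make the mapping concrete, the following sketch declares a Flink source table for a hypothetical StarRocks table. The table name `sr_accounts`, its columns, and the connection placeholders are assumptions chosen for illustration; only the type pairs come from the mapping above.

```sql
-- Hypothetical StarRocks table, shown as comments for reference:
--   id        BIGINT
--   balance   LARGEINT
--   price     DECIMAL(18, 4)
--   created   DATETIME
--   birthday  DATE
--   name      VARCHAR(255)
CREATE TEMPORARY TABLE sr_accounts (
  id BIGINT NOT NULL,
  balance DECIMAL(20, 0),  -- LARGEINT maps to DECIMAL(20,0)
  price DECIMAL(18, 4),    -- DECIMAL maps to DECIMAL
  created TIMESTAMP,       -- DATETIME maps to TIMESTAMP
  birthday DATE,           -- DATE maps to DATE
  name STRING              -- VARCHAR maps to STRING
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxx',
  'password' = 'xxx'
);
```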

Code example

CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL,
  PRIMARY KEY(`runoob_id`) NOT ENFORCED
) WITH (
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'connector' = 'starrocks',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxx',
  'sink.buffer-flush.interval-ms' = '5000'
);

INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;
Note

StarRocks allows a primary key column to be nullable, but Flink does not. Flink requires every primary key column to be unique and declared NOT NULL. If a primary key column is nullable, Flink throws the error Invalid primary key. Column 'xxx' is nullable. For more information, see "Invalid primary key. Column 'xxx' is nullable." error.
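The sink-specific options from the Parameters section can be combined in a single table definition. The following is a minimal sketch, not a tuning recommendation: the table name `sr_sink_tuned`, its columns, and every value are assumptions picked only to fall inside the documented valid ranges.

```sql
CREATE TEMPORARY TABLE sr_sink_tuned (
  id BIGINT NOT NULL,
  title STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.semantic' = 'at-least-once',
  'sink.buffer-flush.max-bytes' = '134217728',  -- 128 MB; valid range is 64 MB to 10 GB
  'sink.buffer-flush.max-rows' = '100000',      -- valid range is 64,000 to 5,000,000
  'sink.buffer-flush.interval-ms' = '10000',    -- flush at least every 10 seconds
  'sink.max-retries' = '5',                     -- valid range is 0 to 10
  'sink.connect.timeout-ms' = '5000',           -- valid range is 100 to 60,000
  'sink.properties.format' = 'csv'              -- passed through to Stream Load
);
```

A flush is triggered by whichever buffer limit is reached first, so smaller values lower latency at the cost of more frequent Stream Load requests.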

Data ingestion

Use the StarRocks Pipeline connector to write data records and schema changes from upstream data sources to an external StarRocks database. The StarRocks connector supports both the community edition and the fully managed EMR Serverless StarRocks from Alibaba Cloud.

Features

  • Automatic database and table creation.

    If an upstream database or table does not exist in the downstream StarRocks instance, the connector creates it automatically. You can use the table.create.properties.* parameter to configure options for automatic table creation.

  • Schema change synchronization.

    The StarRocks connector automatically applies CreateTableEvent, AddColumnEvent, and DropColumnEvent events to the downstream database.

  • Compatible column type changes. VVR 11.1 and later versions support them. For more information, see ALTER TABLE | StarRocks.

Usage notes

  • Each synchronized table must have a primary key. For tables without a primary key, you must specify one in the transform block to write data downstream. For example:

    transform:
      - source-table: ...
        primary-keys: id, ...
  • For automatically created tables, the bucket key is the same as the primary key, and the table cannot have a partition key.

  • When synchronizing schema changes, new columns can only be appended to the end of existing columns. In the default Lenient schema change mode, insertions at other positions are automatically moved to the end.

  • If you use a StarRocks version earlier than 2.5.7, you must explicitly specify the number of buckets with the table.create.num-buckets parameter. StarRocks 2.5.7 and later can automatically determine an appropriate number of buckets.

  • If you use StarRocks 3.2 or later, we recommend that you enable the table.create.properties.fast_schema_evolution option to accelerate schema changes.

  • Streaming issues may occur when you use CDC YAML for data ingestion into EMR Serverless StarRocks. You can use one of the following workarounds:

    • Use the Flink SQL StarRocks connector and set the sink.version=V1 parameter.

    • Enable the FE parameter emr_internal_redirect.

    • Use a StarRocks Private Zone domain name instead of an SLB.
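The first workaround can be sketched as a Flink SQL sink table. This is a sketch under assumptions: the table name `sr_sink_v1`, its columns, and the connection placeholders are hypothetical, and `sink.version` is taken from the workaround above rather than from the SQL parameter list, so confirm that your VVR version accepts it.

```sql
CREATE TEMPORARY TABLE sr_sink_v1 (
  id BIGINT NOT NULL,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.version' = 'V1'  -- workaround named in the usage notes
);
```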

Syntax

source:
  ...

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass
  sink.buffer-flush.interval-ms: 5000   # Set the data flush interval.

Configuration

Parameter

Description

Type

Required

Default

Remarks

type

Specifies the sink connector type.

String

Yes

Set to starrocks.

name

The display name of the sink.

String

No

jdbc-url

The JDBC URL for the database connection.

String

Yes

Supports multiple addresses separated by commas (,). Example: jdbc:mysql://fe_host1:fe_query_port1,fe_host2:fe_query_port2,fe_host3:fe_query_port3.

load-url

The HTTP URL of an FE node for Stream Load.

String

Yes

Supports multiple addresses separated by semicolons (;). Example: fe_host1:fe_http_port1;fe_host2:fe_http_port2.

username

The username for the StarRocks connection.

String

Yes

This user must have at least SELECT and INSERT permissions on the target table. You can grant the required permissions with the StarRocks GRANT command.

password

The password for the StarRocks connection.

String

Yes

sink.semantic

The delivery semantics for data writes.

String

No

at-least-once

Valid values:

  • at-least-once (default): Guarantees that data is delivered at least once.

  • exactly-once: Guarantees that data is delivered exactly once.

sink.label-prefix

The label prefix for Stream Load jobs.

String

No

sink.connect.timeout-ms

The timeout for establishing an HTTP connection.

Integer

No

30000

Unit: milliseconds. The value must be between 100 and 60000.

sink.wait-for-continue.timeout-ms

The timeout for waiting for a 100 Continue response from the server.

Integer

No

30000

Unit: milliseconds. The value must be between 3000 and 600000.

sink.buffer-flush.max-bytes

The maximum size of the in-memory cache, in bytes, before a flush is triggered.

Long

No

157286400

Unit: bytes. The value must be between 64 MB and 10 GB.

Note
  • This cache size is shared by all tables. When the buffer is full, the connector selects several tables to flush.

  • Setting a larger value can improve throughput but may increase ingestion latency.

sink.buffer-flush.max-rows

The maximum number of rows in the in-memory cache before a flush is triggered.

Long

No

500000

The value must be between 64,000 and 5,000,000.

sink.buffer-flush.interval-ms

The time interval between flushes for each table's buffer.

Long

No

300000

Unit: milliseconds.

Note

For jobs that synchronize small amounts of data, reduce this value to avoid long delays before data is persisted.

sink.max-retries

The maximum number of retries.

Long

No

3

The value must be between 0 and 1000.

sink.scan-frequency.ms

The frequency at which the connector checks whether to flush the buffer.

Long

No

50

Unit: milliseconds.

sink.io.thread-count

The number of threads used for Stream Load.

Integer

No

2

sink.at-least-once.use-transaction-stream-load

Specifies whether to use the Stream Load transaction interface for data ingestion.

Boolean

No

true

This option takes effect only if the database supports it.

sink.properties.*

Additional properties for the sink.

String

No

For supported properties, see STREAM LOAD.

table.create.num-buckets

The number of buckets for automatically created tables.

Integer

No

table.create.properties.*

Additional properties for automatic table creation.

String

No

For example, set table.create.properties.fast_schema_evolution to true to enable fast schema evolution. For details, see the StarRocks documentation.

table.schema-change.timeout

The timeout for schema change operations.

Duration

No

30 min

Must be an integer number of seconds.

Note

If a schema change operation exceeds this limit, the job fails.

unicode-char.max-bytes

The number of bytes to allocate for each Unicode character.

Integer

No

3

In CDC, the length of a VARCHAR type is measured in characters, whereas in StarRocks, the length of a VARCHAR type is measured in bytes.

In most cases, a Unicode character occupies at most 3 bytes in UTF-8. However, some rare characters and emoji occupy 4 bytes. If your data contains such characters, consider setting this parameter to 4.

Reuse a built-in catalog

VVR 11.5 and later versions let you reference a built-in StarRocks catalog, created on the Data Management page, directly in a Flink CDC data ingestion job. This reduces the number of properties that you must set manually.

sink:
  type: starrocks
  using.built-in-catalog: starrocks_catalog

Data ingestion jobs can automatically reuse the following StarRocks catalog options:

  • jdbc-url

  • http-url

  • username

  • password

  • table.num-buckets

To override these values, you can explicitly set the corresponding YAML options, which take precedence.

Type mapping

Note

StarRocks does not support all CDC YAML types. Writing an unsupported type to the sink causes the job to fail. You can use the CAST built-in function in a transform to convert unsupported data, or use a projection statement to remove it from the result table. For more information, see Develop a Flink CDC data ingestion job.

CDC type

StarRocks type

Remarks

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATE

DATE

TIMESTAMP

DATETIME

TIMESTAMP_LTZ

DATETIME

DECIMAL(p, s)

DECIMAL(p, s)

Because StarRocks does not support DECIMAL for a primary key, the connector automatically converts an upstream DECIMAL primary key column to VARCHAR in the synchronized StarRocks schema.

CHAR(n)

(n <= 85)

CHAR(n × 3)

CDC measures length in characters, while StarRocks uses bytes. The connector multiplies the length by 3 to account for multi-byte UTF-8 characters.

Note

The maximum length of the StarRocks CHAR type is 255. Therefore, only CDC CHAR types with a length up to 85 are mapped to the StarRocks CHAR type.

Note

You can set the unicode-char.max-bytes parameter to allocate more bytes for each Unicode character.

CHAR(n)

(n > 85)

VARCHAR(n × 3)

CDC measures length in characters, while StarRocks uses bytes. The connector multiplies the length by 3 to account for multi-byte UTF-8 characters.

Note

Because the multiplied length exceeds the 255-byte limit of the StarRocks CHAR type, the column is mapped to VARCHAR.

Note

You can set the unicode-char.max-bytes parameter to allocate more bytes for each Unicode character.

VARCHAR(n)

VARCHAR(n × 3)

CDC measures length in characters, while StarRocks uses bytes. The connector multiplies the length by 3 to account for multi-byte UTF-8 characters.

BINARY(n)

BINARY(n+2)

Two bytes of padding are added to ensure data integrity.

VARBINARY(n)

VARBINARY(n+1)

One byte of padding is added to ensure data integrity.
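The length arithmetic in the mapping above can be worked through on a hypothetical upstream table. The table name `orders`, its columns, and the StarRocks statement below are illustrative assumptions: they show the column types that the rules produce with the default unicode-char.max-bytes of 3, not the literal DDL that the connector emits.

```sql
-- Upstream (CDC) column          Resulting StarRocks column
--   id       BIGINT (PK)           id       BIGINT
--   code     CHAR(20)              code     CHAR(60)       -- 20 x 3 = 60, fits within 255
--   note     CHAR(100)             note     VARCHAR(300)   -- 100 x 3 = 300, exceeds 255
--   name     VARCHAR(50)           name     VARCHAR(150)   -- 50 x 3 = 150
--   created  TIMESTAMP             created  DATETIME
--   digest   VARBINARY(16)         digest   VARBINARY(17)  -- 16 + 1 byte of padding
CREATE TABLE orders (
  id BIGINT NOT NULL,
  code CHAR(60),
  note VARCHAR(300),
  name VARCHAR(150),
  created DATETIME,
  digest VARBINARY(17)
)
PRIMARY KEY (id)
DISTRIBUTED BY HASH (id);  -- bucket key equals the primary key, no partition key
```

With these rules, CHAR(85) is the longest upstream CHAR that still maps to CHAR, because 85 x 3 = 255.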

Schema change

As a data ingestion sink, StarRocks supports the following schema change events:

  • CREATE TABLE EVENT

    Note

    If the downstream StarRocks table already exists, the connector does not attempt to create it again. Ensure that the downstream table schema is compatible with the upstream schema.

  • ADD COLUMN EVENT

    Note

    StarRocks requires primary key columns to appear first in a table. Any new columns must be added after them.

  • DROP COLUMN EVENT

  • TRUNCATE TABLE EVENT

  • DROP TABLE EVENT