本文为您介绍如何使用StarRocks连接器。
背景信息
StarRocks是新一代极速全场景MPP(Massively Parallel Processing)数据仓库,致力于构建极速和统一分析体验。StarRocks具有以下优势:
StarRocks兼容MySQL协议,可以使用MySQL客户端和常用BI工具对接StarRocks来分析数据。
StarRocks采用分布式架构:
对数据表进行水平划分并以多副本存储。
集群规模可以灵活伸缩,支持10 PB级别的数据分析。
支持MPP框架,并行加速计算。
支持多副本,具有弹性容错能力。
Flink连接器内部的结果表是通过缓存并批量由Stream Load导入实现,源表是通过批量读取数据实现。StarRocks连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表和结果表、数据摄入目标端 |
运行模式 | 流模式和批模式 |
数据格式 | CSV |
特有监控指标 | 暂无 |
API种类 | Datastream、SQL和数据摄入YAML |
是否支持更新或删除结果表数据 | 是 |
前提条件
已创建StarRocks集群,包括EMR的StarRocks或基于ECS的云上自建StarRocks。
使用限制
仅实时计算引擎VVR 6.0.5及以上版本支持StarRocks连接器。
StarRocks连接器仅支持at-least-once和exactly-once语义。
SQL
特色功能
EMR的StarRocks支持CTAS&CDAS功能,CTAS可以实现单表的结构和数据同步,CDAS可以实现整库同步或者同一库中的多表结构和数据同步,详情请参见基于实时计算Flink使用CTAS&CDAS功能同步MySQL数据至StarRocks。
语法结构
CREATE TABLE USER_RESULT(
name VARCHAR,
score BIGINT
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
'database-name' = 'xxx',
'table-name' = 'xxx',
'username' = 'xxx',
'password' = 'xxx'
);
WITH参数
类型 | 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
通用 | connector | 表类型。 | String | 是 | 无 | 固定值为starrocks。 |
jdbc-url | JDBC连接的URL。 | String | 是 | 无 | 指定FE(Front End)的IP和JDBC端口,格式为 | |
database-name | StarRocks数据库名称。 | String | 是 | 无 | 无。 | |
table-name | StarRocks表名称。 | String | 是 | 无 | 无。 | |
username | StarRocks连接用户名。 | String | 是 | 无 | 无。 | |
password | StarRocks连接密码。 | String | 是 | 无 | 无。 | |
starrocks.create.table.properties | StarRocks表属性。 | String | 否 | 无 | 设置数据表初始属性,如引擎、副本数等。例如,'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1'。 | |
源表独有 | scan-url | 数据扫描的url。 | String | 否 | 无 | 指定FE(Front End)的IP和HTTP端口,格式为 说明 填写多个IP和端口号时,请使用半角分号(;)进行分隔。 |
scan.connect.timeout-ms | flink-connector-starrocks连接StarRocks的时间上限。 超过该时间上限,将报错。 | String | 否 | 1000 | 单位为毫秒。 | |
scan.params.keep-alive-min | 查询任务的保活时间。 | String | 否 | 10 | 无。 | |
scan.params.query-timeout-s | 查询任务的超时时间。 如果超过该时间,仍未返回查询结果,则停止查询任务。 | String | 否 | 600 | 单位为秒。 | |
scan.params.mem-limit-byte | BE节点中单个查询的内存上限。 | String | 否 | 1073741824(1 GB) | 单位为字节。 | |
scan.max-retries | 查询失败时的最大重试次数。 超过该数量上限,则将报错。 | String | 否 | 1 | 无。 | |
结果表独有 | load-url | 数据导入的URL。 | String | 是 | 无 | 指定FE(Front End)的IP和HTTP端口,格式为 说明 填写多个IP和端口号时,请使用半角分号(;)进行分隔。 |
sink.semantic | 数据写入语义。 | String | 否 | at-least-once | 取值如下:
| |
sink.buffer-flush.max-bytes | Buffer可容纳的最大数据量。 | String | 否 | 94371840(90 MB) | 取值范围为64 MB~10 GB。 | |
sink.buffer-flush.max-rows | Buffer可容纳的最大数据行数。 | String | 否 | 500000 | 取值范围为64,000~5000,000。 | |
sink.buffer-flush.interval-ms | Buffer刷新时间间隔。 | String | 否 | 300000 | 取值范围为1000毫秒~3600000毫秒。 | |
sink.max-retries | 最大重试次数。 | String | 否 | 3 | 取值范围为0~10。 | |
sink.connect.timeout-ms | 连接到starrocks的超时时间。 | String | 否 | 1000 | 取值范围为100~60000。单位为毫秒。 | |
sink.properties.* | 结果表属性。 | String | 否 | 无 | Stream Load的参数控制Stream Load导入行为。例如,参数 sink.properties.format表示Stream Load所导入的数据格式,如CSV。更多参数和解释,请参见Stream Load。 |
类型映射
StarRocks字段类型 | Flink字段类型 |
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
BIGINT UNSIGNED 说明 仅实时计算引擎VVR 8.0.10及以上版本支持。 | DECIMAL(20,0) |
LARGEINT | DECIMAL(20,0) |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR(n × 3) 说明
| CHAR(n) (n <= 85 时) |
VARCHAR(n × 3) 说明
| CHAR(n) (n > 85 时) |
VARCHAR | STRING |
VARBINARY 说明 仅实时计算引擎VVR 8.0.10及以上版本支持。 | VARBINARY |
代码示例
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://ip:9030',
'scan-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
PRIMARY KEY(`runoob_id`)
NOT ENFORCED
) WITH (
'jdbc-url' = 'jdbc:mysql://ip:9030',
'connector' = 'starrocks',
'load-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxx',
'sink.buffer-flush.interval-ms' = '5000'
);
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;
数据摄入
使用StarRocks Pipeline连接器,您可以轻松地将来自上游数据源的数据记录和表结构变更写入外部StarRocks数据库。StarRocks连接器同时支持社区版与阿里云E-MapReduce Serverless StarRocks全托管版本。
特色功能
自动建库建表
如果来自上游的数据库及数据表不存在于下游StarRocks实例中,则对应的数据库及数据表会被自动创建。您可以通过
table.create.properties.*
参数设定自动创建表时的选项。表结构变更同步
目前,StarRocks连接器支持自动将建表事件(CreateTableEvent)、增加列事件(AddColumnEvent)和删除列(DropColumnEvent)事件自动应用到下游数据库中。
注意事项
目前StarRocks连接器只支持At-least Once语义,并通过主键表来保证幂等写入。
目前,同步的表必须包含主键。不含主键的表必须通过
transform
语句块指定主键方可正常写入下游。例如:transform: - source-table: ... primary-keys: id, ...
自动创建的表分桶键与主键相同,且不可有分区键。
进行表结构变更同步时,新增列只能追加到已有列的尾部。在默认的表结构演化模式Lenient下,会自动将其他位置的插入转换到尾部。
如果您使用的StarRocks版本低于2.5.7,则必须显式地通过
table.create.num-buckets
参数指定分桶数量。更高版本的StarRocks可以自动设定合适的分桶数。如果您使用的是StarRocks 3.2或更高版本,建议开启
table.create.properties.fast_schema_evolution
选项来加快表结构变更的速度。
语法结构
source:
...
sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://127.0.0.1:9030
load-url: 127.0.0.1:8030
username: root
password: pass
配置项
参数名称 | 描述 | 类型 | 是否必填 | 默认值 | 备注 |
| 连接器的名称。 | String | 是 | 无 | 固定值为 |
| Sink的显示名称。 | String | 否 | 无 | 无。 |
| JDBC连接的URL。 | String | 是 | 无 | 支持传入多个地址,使用英文逗号 ( |
| 连接到FE节点的HTTP服务URL。 | String | 是 | 无 | 支持传入多个地址,使用英文分号 ( |
| 连接到 StarRocks时使用的用户名。 | String | 是 | 无 | 该用户至少需要具备对目标表的SELECT和INSERT权限。您可以使用StarRocks的GRANT命令赋予相应的权限。 |
| 连接到 StarRocks时使用的密码。 | String | 是 | 无 | 无。 |
| 在进行Stream Load导入时使用的标签前缀。 | String | 否 | 无 | 无。 |
| 建立HTTP连接时的超时时间。 | Integer | 否 | 30000 | 单位为毫秒,取值需要介于100 ~ 60000。 |
| 从服务器得到100 Continue请求前的超时时间)。 | Integer | 否 | 30000 | 单位为毫秒。取值需要介于3000 ~ 600000。 |
| 在将数据写入StarRocks前,最多可以在内存中缓存多大量的数据。 | Long | 否 | 157286400 | 单位为字节,取值需要介于64 MB ~ 10 GB。 说明
|
| 每张表连续两次Flush之间的间隔时间。 | Long | 否 | 300000 | 单位为毫秒。 |
| 连续两次检查是否应该进行Flush之间的间隔时间。 | Long | 否 | 50 | 单位为毫秒。 |
| 在进行 Stream Load导入时的线程数量。 | Integer | 否 | 2 | 无。 |
| 是否使用Stream Load事务接口进行导入。 | Boolean | 否 | true | 仅在数据库支持的情况下生效。 |
| 提供给Sink的额外参数。 | String | 否 | 无 | 可以在STREAM LOAD查看支持的参数。 |
| 自动建表时的Bucket数量。 | Integer | 否 | 无 |
|
| 在自动建表时需要传递的额外参数。 | String | 否 | 无 | 例如,可以传递 |
| 执行表结构变更的超时时间。 | Duration | 否 | 30 min | 必须设定为整数秒。 说明 如果某个表结构变更操作耗时超过此限制,作业将运行失败。 |
类型映射
StarRocks并不支持所有的CDC YAML类型,尝试将不支持的类型写入下游会导致作业失败。您可以使用Transform CAST内置函数对不支持的数据进行转换,或是使用Projection语句将其从结果表中移除。详情请参考数据摄入开发参考。
CDC类型 | StarRocks类型 | 附注 |
TINYINT | TINYINT | 无 |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DECIMAL(p, s) | DECIMAL(p, s) | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | DATETIME | |
TIMESTAMP_LTZ | DATETIME | |
CHAR(n) (n <= 85 时) | CHAR(n × 3) | CDC中的CHAR类型长度表示字符数,而StarRocks中的CHAR类型长度表示UTF-8编码后的字节数。通常情况下,一个中文字符经过UTF-8编码后不会超过3字节,因此映射到的StarRocks CHAR类型长度为原来的3倍。 说明 StarRocks的CHAR类型长度最长不可超过255,因此只有长度不超过85的CDC CHAR类型才会被映射到StarRocks CHAR类型。 |
CHAR(n) (n > 85 时) | VARCHAR(n × 3) | CDC中的CHAR类型长度表示字符数,而StarRocks中的CHAR类型长度表示UTF-8编码后的字节数。通常情况下,一个中文字符经过UTF-8编码后不会超过3字节,因此映射到的 StarRocks VARCHAR类型长度为原来的3倍。 说明 StarRocks的CHAR类型长度最长不可超过255,因此长度大于85的CDC CHAR类型会被映射到StarRocks VARCHAR类型。 |
VARCHAR(n) | VARCHAR(n × 3) | CDC中的VARCHAR类型长度表示字符数,而StarRocks中的VARCHAR类型长度表示UTF-8编码后的字节数。通常情况下,一个中文字符经过UTF-8编码后不会超过3字节,因此映射到的StarRocks VARCHAR类型长度为原来的3倍。 |