本文为您介绍如何使用StarRocks连接器。
背景信息
StarRocks是新一代极速全场景MPP(Massively Parallel Processing)数据仓库,致力于构建极速和统一分析体验。StarRocks具有以下优势:
- StarRocks兼容MySQL协议,可以使用MySQL客户端和常用BI工具对接StarRocks来分析数据。 
- StarRocks采用分布式架构: - 对数据表进行水平划分并以多副本存储。 
- 集群规模可以灵活伸缩,支持10 PB级别的数据分析。 
- 支持MPP框架,并行加速计算。 
- 支持多副本,具有弹性容错能力。 
 
Flink连接器内部的结果表是通过缓存并批量由Stream Load导入实现,源表是通过批量读取数据实现。StarRocks连接器支持的信息如下。
| 类别 | 详情 | 
| 支持类型 | 源表、维表和结果表、数据摄入目标端 | 
| 运行模式 | 流模式和批模式 | 
| 数据格式 | CSV | 
| 特有监控指标 | 暂无 | 
| API种类 | Datastream、SQL和数据摄入YAML | 
| 是否支持更新或删除结果表数据 | 是 | 
前提条件
已创建StarRocks集群,包括EMR的StarRocks或基于ECS的云上自建StarRocks。
使用限制
- StarRocks连接器仅支持at-least-once和exactly-once语义。 
- 仅实时计算引擎VVR 11.1及以上版本支持维表JOIN。 
SQL
特色功能
EMR的StarRocks支持CTAS&CDAS功能,CTAS可以实现单表的结构和数据同步,CDAS可以实现整库同步或者同一库中的多表结构和数据同步,详情请参见基于实时计算Flink使用CTAS&CDAS功能同步MySQL数据至StarRocks。
语法结构
CREATE TABLE USER_RESULT(
 name VARCHAR,
 score BIGINT
 ) WITH (
 'connector' = 'starrocks',
 'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
 'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
 'database-name' = 'xxx',
 'table-name' = 'xxx',
 'username' = 'xxx',
 'password' = 'xxx'
 );WITH参数
| 类型 | 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| 通用 | connector | 表类型。 | String | 是 | 无 | 固定值为starrocks。 | 
| jdbc-url | JDBC连接的URL。 | String | 是 | 无 | 指定FE(Front End)的IP和JDBC端口,格式为 | |
| database-name | StarRocks数据库名称。 | String | 是 | 无 | 无。 | |
| table-name | StarRocks表名称。 | String | 是 | 无 | 无。 | |
| username | StarRocks连接用户名。 | String | 是 | 无 | 无。 | |
| password | StarRocks连接密码。 | String | 是 | 无 | 无。 | |
| starrocks.create.table.properties | StarRocks表属性。 | String | 否 | 无 | 设置数据表初始属性,如引擎、副本数等。例如,'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1'。 | |
| 源表独有 | scan-url | 数据扫描的url。 | String | 否 | 无 | 指定FE(Front End)的IP和HTTP端口,格式为 说明  填写多个IP和端口号时,请使用半角分号(;)进行分隔。 | 
| scan.connect.timeout-ms | flink-connector-starrocks连接StarRocks的时间上限。 超过该时间上限,将报错。 | String | 否 | 1000 | 单位为毫秒。 | |
| scan.params.keep-alive-min | 查询任务的保活时间。 | String | 否 | 10 | 无。 | |
| scan.params.query-timeout-s | 查询任务的超时时间。 如果超过该时间,仍未返回查询结果,则停止查询任务。 | String | 否 | 600 | 单位为秒。 | |
| scan.params.mem-limit-byte | BE节点中单个查询的内存上限。 | String | 否 | 1073741824(1 GB) | 单位为字节。 | |
| scan.max-retries | 查询失败时的最大重试次数。 超过该数量上限,则将报错。 | String | 否 | 1 | 无。 | |
| 结果表独有 | load-url | 数据导入的URL。 | String | 是 | 无 | 指定FE(Front End)的IP和HTTP端口,格式为 说明  填写多个IP和端口号时,请使用半角分号(;)进行分隔。 | 
| sink.semantic | 数据写入语义。 | String | 否 | at-least-once | 取值如下: 
 | |
| sink.buffer-flush.max-bytes | Buffer可容纳的最大数据量。 | String | 否 | 94371840(90 MB) | 取值范围为64 MB~10 GB。 | |
| sink.buffer-flush.max-rows | Buffer可容纳的最大数据行数。 | String | 否 | 500000 | 取值范围为64,000~5000,000。 | |
| sink.buffer-flush.interval-ms | Buffer刷新时间间隔。 | String | 否 | 300000 | 取值范围为1000毫秒~3600000毫秒。 | |
| sink.max-retries | 最大重试次数。 | String | 否 | 3 | 取值范围为0~10。 | |
| sink.connect.timeout-ms | 连接到starrocks的超时时间。 | String | 否 | 1000 | 取值范围为100~60000。单位为毫秒。 | |
| sink.properties.* | 结果表属性。 | String | 否 | 无 | Stream Load的参数控制Stream Load导入行为。例如,参数 sink.properties.format表示Stream Load所导入的数据格式,如CSV。更多参数和解释,请参见Stream Load。 | |
| 维表独有 | lookup.cache.enabled | 是否启用维表缓存机制。 | Boolean | 否 | true | 取值如下: 
 重要  
 | 
类型映射
| StarRocks字段类型 | Flink字段类型 | 
| NULL | NULL | 
| BOOLEAN | BOOLEAN | 
| TINYINT | TINYINT | 
| SMALLINT | SMALLINT | 
| INT | INT | 
| BIGINT | BIGINT | 
| BIGINT UNSIGNED 说明  仅实时计算引擎VVR 8.0.10及以上版本支持。 | DECIMAL(20,0) | 
| LARGEINT | DECIMAL(20,0) | 
| FLOAT | FLOAT | 
| DOUBLE | DOUBLE | 
| DATE | DATE | 
| DATETIME | TIMESTAMP | 
| DECIMAL | DECIMAL | 
| DECIMALV2 | DECIMAL | 
| DECIMAL32 | DECIMAL | 
| DECIMAL64 | DECIMAL | 
| DECIMAL128 | DECIMAL | 
| CHAR(m) 说明  
 | CHAR(n) | 
| VARCHAR(m) 说明  
 | CHAR(n) | 
| VARCHAR | STRING | 
| VARBINARY 说明  仅实时计算引擎VVR 8.0.10及以上版本支持。 | VARBINARY | 
代码示例
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
  PRIMARY KEY(`runoob_id`)
  NOT ENFORCED
) WITH (
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'connector' = 'starrocks',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxx',
  'sink.buffer-flush.interval-ms' = '5000'
);
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;数据摄入
使用StarRocks Pipeline连接器,您可以轻松地将来自上游数据源的数据记录和表结构变更写入外部StarRocks数据库。StarRocks连接器同时支持社区版与阿里云E-MapReduce Serverless StarRocks全托管版本。
特色功能
- 自动建库建表。 - 如果来自上游的数据库及数据表不存在于下游StarRocks实例中,则对应的数据库及数据表会被自动创建。您可以通过 - table.create.properties.*参数设定自动创建表时的选项。
- 表结构变更同步。 - 目前,StarRocks连接器支持自动将建表事件(CreateTableEvent)、增加列事件(AddColumnEvent)和删除列(DropColumnEvent)事件自动应用到下游数据库中。 
- 实时计算引擎VVR 11.1及以上版本支持兼容的列类型变更,详情请参见ALTER TABLE | StarRocks。 
注意事项
- 目前StarRocks连接器只支持At-least Once语义,并通过主键表来保证幂等写入。 
- 目前,同步的表必须包含主键。不含主键的表必须通过 - transform语句块指定主键方可正常写入下游。例如:- transform: - source-table: ... primary-keys: id, ...
- 自动创建的表分桶键与主键相同,且不可有分区键。 
- 进行表结构变更同步时,新增列只能追加到已有列的尾部。在默认的表结构演化模式Lenient下,会自动将其他位置的插入转换到尾部。 
- 如果您使用的StarRocks版本低于2.5.7,则必须显式地通过 - table.create.num-buckets参数指定分桶数量。更高版本的StarRocks可以自动设定合适的分桶数。
- 如果您使用的是StarRocks 3.2或更高版本,建议开启 - table.create.properties.fast_schema_evolution选项来加快表结构变更的速度。
- 使用CDC YAML数据摄入写入 EMR Serverless StarRocks 时可能出现串流问题,您可以采用以下选项之一来规避: - 使用 Flink SQL StarRocks 连接器,并使用 - sink.version=V1参数;
- 开启 FE emr_internal_redirect 参数; 
- 使用 StarRocks Private Zone 域名而不是 SLB。 
 
语法结构
source:
  ...
sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass配置项
| 参数名称 | 描述 | 类型 | 是否必填 | 默认值 | 备注 | 
| 
 | 连接器的名称。 | String | 是 | 无 | 固定值为 | 
| 
 | Sink的显示名称。 | String | 否 | 无 | 无。 | 
| 
 | JDBC连接的URL。 | String | 是 | 无 | 支持传入多个地址,使用英文逗号 ( | 
| 
 | 连接到FE节点的HTTP服务URL。 | String | 是 | 无 | 支持传入多个地址,使用英文分号 ( | 
| 
 | 连接到 StarRocks时使用的用户名。 | String | 是 | 无 | 该用户至少需要具备对目标表的SELECT和INSERT权限。您可以使用StarRocks的GRANT命令赋予相应的权限。 | 
| 
 | 连接到 StarRocks时使用的密码。 | String | 是 | 无 | 无。 | 
| 
 | 数据写入语义。 | String | 否 | at-least-once | 取值如下: 
 | 
| 
 | 在进行Stream Load导入时使用的标签前缀。 | String | 否 | 无 | 无。 | 
| 
 | 建立HTTP连接时的超时时间。 | Integer | 否 | 30000 | 单位为毫秒,取值需要介于100 ~ 60000。 | 
| 
 | 从服务器得到100 Continue请求前的超时时间)。 | Integer | 否 | 30000 | 单位为毫秒。取值需要介于3000 ~ 600000。 | 
| 
 | 在将数据写入StarRocks前,最多可以在内存中缓存多少字节的数据。 | Long | 否 | 157286400 | 单位为字节,取值需要介于64 MB ~ 10 GB。 说明  
 | 
| 
 | 在将数据写入StarRocks前,最多可以在内存中缓存多少行数据。 | Long | 否 | 500000 | 取值范围需要介于 64,000 和 5,000,000 之间。 | 
| 
 | 每张表连续两次Flush之间的间隔时间。 | Long | 否 | 300000 | 单位为毫秒。 | 
| 
 | 最大重试次数。 | Long | 否 | 3 | 取值范围需要介于 0 和 1000 之间。 | 
| 
 | 连续两次检查是否应该进行Flush之间的间隔时间。 | Long | 否 | 50 | 单位为毫秒。 | 
| 
 | 在进行 Stream Load导入时的线程数量。 | Integer | 否 | 2 | 无。 | 
| 
 | 是否使用Stream Load事务接口进行导入。 | Boolean | 否 | true | 仅在数据库支持的情况下生效。 | 
| 
 | 提供给Sink的额外参数。 | String | 否 | 无 | 可以在STREAM LOAD查看支持的参数。 | 
| 
 | 自动建表时的Bucket数量。 | Integer | 否 | 无 | 
 | 
| 
 | 在自动建表时需要传递的额外参数。 | String | 否 | 无 | 例如,可以传递 | 
| 
 | 执行表结构变更的超时时间。 | Duration | 否 | 30 min | 必须设定为整数秒。 说明  如果某个表结构变更操作耗时超过此限制,作业将运行失败。 | 
类型映射
StarRocks并不支持所有的CDC YAML类型,尝试将不支持的类型写入下游会导致作业失败。您可以使用Transform CAST内置函数对不支持的数据进行转换,或是使用Projection语句将其从结果表中移除。详情请参考数据摄入开发参考。
| CDC类型 | StarRocks类型 | 附注 | 
| TINYINT | TINYINT | 无 | 
| SMALLINT | SMALLINT | |
| INT | INT | |
| BIGINT | BIGINT | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| BOOLEAN | BOOLEAN | |
| DATE | DATE | |
| TIMESTAMP | DATETIME | |
| TIMESTAMP_LTZ | DATETIME | |
| DECIMAL(p, s) | DECIMAL(p, s) | StarRocks不支持DECIMAL作为主键。因此当上游数据表的字段类型为DECIMAL且该字段作为主键时,同步至StarRocks的表结构会自动将主键字段类型从DECIMAL变更为VARCHAR。 | 
| CHAR(n) (n <= 85 时) | CHAR(n × 3) | CDC中的CHAR类型长度表示字符数,而StarRocks中的CHAR类型长度表示UTF-8编码后的字节数。通常情况下,一个中文字符经过UTF-8编码后不会超过3字节,因此映射到的StarRocks CHAR类型长度为原来的3倍。 说明  StarRocks的CHAR类型长度最长不可超过255,因此只有长度不超过85的CDC CHAR类型才会被映射到StarRocks CHAR类型。 | 
| CHAR(n) (n > 85 时) | VARCHAR(n × 3) | CDC中的CHAR类型长度表示字符数,而StarRocks中的CHAR类型长度表示UTF-8编码后的字节数。通常情况下,一个中文字符经过UTF-8编码后不会超过3字节,因此映射到的 StarRocks VARCHAR类型长度为原来的3倍。 说明  StarRocks的CHAR类型长度最长不可超过255,因此长度大于85的CDC CHAR类型会被映射到StarRocks VARCHAR类型。 | 
| VARCHAR(n) | VARCHAR(n × 3) | CDC中的VARCHAR类型长度表示字符数,而StarRocks中的VARCHAR类型长度表示UTF-8编码后的字节数。通常情况下,一个中文字符经过UTF-8编码后不会超过3字节,因此映射到的StarRocks VARCHAR类型长度为原来的3倍。 | 
| BINARY(n) | BINARY(n+2) | 增加长度为2的padding,防止数据问题。 | 
| VARBINARY(n) | VARBINARY(n+1) | 增加长度为1的padding,防止数据问题。 |