Flink Connector内部实现是通过缓存并批量由Stream Load导入。本文为您介绍Flink Connector的使用方式及示例。
背景信息
因为Flink官方只提供了flink-connector-jdbc,不足以满足导入性能要求,所以新增了flink-connector-starrocks,其内部实现是通过缓存并批量由Stream Load导入。
使用方式
您可以下载源码进行测试:下载flink-connector-starrocks源码。
将以下内容添加pom.xml中。
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<!-- for flink-1.11, flink-1.12 -->
<version>x.x.x_flink-1.11</version>
<!-- for flink-1.13 -->
<version>x.x.x_flink-1.13</version>
</dependency>
说明 您可以在版本信息页面,查看Latest Version信息,替换代码中的
x.x.x
。代码示例如下:
- 方式一
// -------- sink with raw json string stream -------- fromElements(new String[]{ "{\"score\": \"99\", \"name\": \"stephen\"}", "{\"score\": \"100\", \"name\": \"lebron\"}" }).addSink( StarRocksSink.sink( // the sink options StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port") .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port") .withProperty("username", "admin") .withProperty("password", "xxx") .withProperty("table-name", "xxx") .withProperty("database-name", "xxx") .withProperty("sink.properties.format", "json") .withProperty("sink.properties.strip_outer_array", "true") .build() ) ); // -------- sink with stream transformation -------- class RowData { public int score; public String name; public RowData(int score, String name) { ...... } } fromElements( new RowData[]{ new RowData(99, "stephen"), new RowData(100, "lebron") } ).addSink( StarRocksSink.sink( // the table structure TableSchema.builder() .field("score", DataTypes.INT()) .field("name", DataTypes.VARCHAR(20)) .build(), // the sink options StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port") .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port") .withProperty("username", "xxx") .withProperty("password", "xxx") .withProperty("table-name", "xxx") .withProperty("database-name", "xxx") .withProperty("sink.properties.column_separator", "\\x01") .withProperty("sink.properties.row_delimiter", "\\x02") .build(), // set the slots with streamRowData (slots, streamRowData) -> { slots[0] = streamRowData.score; slots[1] = streamRowData.name; } ) );
- 方式二
// create a table with `structure` and `properties` // Needed: Add `com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory` to: `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory` tEnv.executeSql( "CREATE TABLE USER_RESULT(" + "name VARCHAR," + "score BIGINT" + ") WITH ( " + "'connector' = 'starrocks'," + "'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port'," + "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," + "'database-name' = 'xxx'," + "'table-name' = 'xxx'," + "'username' = 'xxx'," + "'password' = 'xxx'," + "'sink.buffer-flush.max-rows' = '1000000'," + "'sink.buffer-flush.max-bytes' = '300000000'," + "'sink.buffer-flush.interval-ms' = '5000'," + "'sink.properties.column_separator' = '\\x01'," + "'sink.properties.row_delimiter' = '\\x02'," + "'sink.max-retries' = '3'" + "'sink.properties.*' = 'xxx'" + // stream load properties like `'sink.properties.columns' = 'k1, v1'` ")" );
其中,Sink选项如下表所示。
参数 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
connector | 是 | 无 | String | 类型,固定为starrocks。 |
jdbc-url | 是 | 无 | String | 用于在StarRocks中执行查询操作。 |
load-url | 是 | 无 | String | 指定FE的IP和HTTP端口,格式为fe_ip:http_port;fe_ip:http_port ,多个时使用半角分号(;)分隔。 |
database-name | 是 | 无 | String | StarRocks数据库名称。 |
table-name | 是 | 无 | String | StarRocks表名称。 |
username | 是 | 无 | String | StarRocks连接用户名。 |
password | 是 | 无 | String | StarRocks连接密码。 |
sink.semantic | 否 | at-least-once | String | 取值为at-least-once或exactly-once。 |
sink.buffer-flush.max-bytes | 否 | 94371840(90M) | String | Buffer可容纳的最大数据量,取值范围为64 MB~10 GB。 |
sink.buffer-flush.max-rows | 否 | 500000 | String | Buffer可容纳的最大数据行数,取值范围为64,000~5000,000。 |
sink.buffer-flush.interval-ms | 否 | 300000 | String | Buffer刷新时间间隔,取值范围为 1000 ms~3600000 ms。 |
sink.max-retries | 否 | 1 | String | 最大重试次数,取值范围为0~10。 |
sink.connect.timeout-ms | 否 | 1000 | String | 连接到load-url的超时时间,取值范围为100~60000。 |
sink.properties.* | 否 | 无 | String | Sink属性。 |
重要
- 为了保证Sink数据的Exactly-once语义,需要外部系统的Two-phase Commit机制。由于StarRocks无此机制,所以需要依赖Flink的checkpoint-interval,在每次Checkpoint时保存批数据以及其Label,在Checkpoint完成后的第一次invoke中阻塞flush所有缓存在state中的数据,以此达到Exactly-once。但如果StarRocks终止了,会导致您的Flink Sink Stream算子长时间阻塞,并引起Flink的监控报警或强制退出。
- 默认使用CSV格式进行导入,您可以通过指定sink.properties.row_delimiter(该参数自StarRocks 1.15.0版本开始支持)为\\x02,sink.properties.column_separator为\\x01,来自定义行分隔符与列分隔符。
- 如果遇到导入停止的情况,请尝试增加Flink任务的内存。
- 如果代码运行正常且能接收到数据,但是写入不成功时,请确认当前机器是否能访问BE的http_port端口,即能ping通BE服务使用的IP地址。
例如,如果一台机器有外网和内网IP,且FE或BE的http_port均可通过外网
ip:port
访问,集群里绑定的IP为内网IP,任务里loadurl写的FE外网ip:http_port
,FE会将写入任务转发给BE内网ip:port
,此时如果Client机器ping不通BE的内网IP就会写入失败。