MaxCompute提供了新版Flink Connector插件,支持将Flink数据写入至MaxCompute的普通表和Delta Table类型表,以提高Flink数据写入MaxCompute的便捷性。本文介绍新版Flink Connector写入MaxCompute的能力支持情况与主要操作流程。
背景信息
- 支持的写入模式: - 使用新版Flink Connector将数据写入MaxCompute时,支持通过Upsert和Insert写入方式。其中使用Upsert时支持以下两种数据写入流: - 按照Primary Key进行分组 
- 按照分区字段进行分组 - 若分区数量过多,您可以按照分区字段进行分组,但该方式可能导致数据倾斜。 
 
- Upsert模式下,通过Flink Connector进行数据写入流程和参数配置建议,详情请参见数据实时入仓实践。 
- 您可在配置Flink数据写入MaxCompute时,通过设置Flink Connector参数指定使用哪种写入方式,全量Connector参数介绍请参见下文的附录:新版Flink Connector全量参数。 
- Flink Upsert写入任务的Checkpoint间隔建议设置为3分钟以上,否则可能导致写入效率低下,并引入大量小文件。 
- MaxCompute与实时计算Flink版的字段类型对照关系如下: - Flink 数据类型 - MaxCompute 数据类型 - CHAR(p) - CHAR(p) - VARCHAR(p) - VARCHAR(p) - STRING - STRING - BOOLEAN - BOOLEAN - TINYINT - TINYINT - SMALLINT - SMALLINT - INT - INT - BIGINT - LONG - FLOAT - FLOAT - DOUBLE - DOUBLE - DECIMAL(p, s) - DECIMAL(p, s) - DATE - DATE - TIMESTAMP(9) WITHOUT TIME ZONE、TIMESTAMP_LTZ(9) - TIMESTAMP - TIMESTAMP(3) WITHOUT TIME ZONE、TIMESTAMP_LTZ(3) - DATETIME - BYTES - BINARY - ARRAY<T> - LIST<T> - MAP<K, V> - MAP<K, V> - ROW - STRUCT 说明- Flink的TIMESTAMP数据类型不含时区,MaxCompute TIMESTAMP数据类型含时区。此差异会导致8小时的时间差。其通过使用TIMESTAMP_LTZ(9)来对齐时间戳。 - -- FlinkSQL CREATE TEMPORARY TABLE odps_source( id BIGINT NOT NULL COMMENT 'id', created_time TIMESTAMP NOT NULL COMMENT '创建时间', updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT '更新时间', PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', ... );
Flink数据写入MaxCompute流程:自建开源Flink
- 准备工作:创建MaxCompute表。 - 您需先创建好MaxCompute表,用于后续Flink数据写入。以下以创建两张表(Delta Table非分区表和分区表)作为示例,为您演示Flink数据写入MaxCompute的主要流程,其中表属性设置请参考Delta Table表参数。 - -- 创建Delta Table非分区表 CREATE TABLE mf_flink_tt ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ; --创建Delta Table分区表 CREATE TABLE mf_flink_tt_part ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) partitioned by (dd string, hh string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;
- 搭建开源Flink集群。当前支持1.13、1.15、1.16和1.17版本的开源Flink,您可以选择对应版本的Flink: 说明- Flink 1.17版本可复用1.16版本。 
- 本文以Flink Connector 1.13为例,将包下载至本地环境,下载完成后进行解压。 
 
- 下载Flink Connector并添加至Flink集群包中。 - 将Flink Connector Jar包下载至本地环境。 
- 将Flink Connector Jar包添加至解压后的Flink安装包的lib目录中。 - mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
 
- 启动Flink实例服务。 - cd $FLINK_HOME/bin ./start-cluster.sh
- 启动Flink客户端。 - cd $FLINK_HOME/bin ./sql-client.sh
- 创建Flink表,并配置Flink Connector参数。 - 当前支持直接使用Flink SQL创建Flink表并配置参数,也支持使用Flink的DataStream API进行相关操作。两种操作的核心示例如下。 - 使用Flink SQL- 进入Flink SQL的编辑界面,执行以下命令完成建表与参数配置。 - -- 在 Flink SQL中注册一张对应的非分区表 CREATE TABLE mf_flink ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI****************', 'odps.access.key'='********************', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' ); -- 在 Flink SQL 中注册一张对应的分区表 CREATE TABLE mf_flink_part ( id BIGINT, name STRING, age INT, status BOOLEAN, dd STRING, hh STRING, PRIMARY KEY(id) NOT ENFORCED ) PARTITIONED BY (`dd`,`hh`) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt_part', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI****************', 'odps.access.key'='********************', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' );
- 向Flink表中写入数据,并在MaxCompute表中查询,验证Flink数据写入MaxCompute的结果。 - -- 在flink Sql客户端中往非分区表里插入数据 INSERT INTO mf_flink VALUES (1,'Danny',27, false); -- 在Maxcompute中查询返回 SELECT * FROM mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 27 | false | +------------+------+------+--------+ -- 在flink Sql客户端中往非分区表里插入数据 INSERT INTO mf_flink VALUES (1,'Danny',28, false); -- 在Maxcompute中查询返回 SELECT * FROM mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 28 | false | +------------+------+------+--------+ -- 在flink Sql客户端中往分区表里插入数据 INSERT INTO mf_flink_part VALUES (1,'Danny',27, false, '01','01'); -- 在Maxcompute中查询返回 SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 27 | false | 01 | 01 | +------------+------+------+--------+----+----+ -- 在flink Sql客户端中分区表里插入数据 INSERT INTO mf_flink_part VALUES (1,'Danny',30, false, '01','01'); -- 在Maxcompute中查询返回 SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 30 | false | 01 | 01 | +------------+------+------+--------+----+----+
 - 使用DataStream API- 使用DataStream接口时,需先添加以下依赖。 - <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>flink-connector-maxcompute</artifactId> <version>xxx</version> <scope>system</scope> <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath> </dependency>说明- 使用时请将“xxx”改为相应的版本号。 
- 建表与参数配置的示例代码如下。 - package com.aliyun.odps.flink.examples; import org.apache.flink.configuration.Configuration; import org.apache.flink.odps.table.OdpsOptions; import org.apache.flink.odps.util.OdpsConf; import org.apache.flink.odps.util.OdpsPipeline; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; public class Examples { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(120 * 1000); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env); Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table"); DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class); Configuration config = new Configuration(); config.set(OdpsOptions.SINK_OPERATION, "upsert"); config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8); config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100); OdpsConf odpsConfig = new OdpsConf("accessid", "accesskey", "endpoint", "project", "tunnel endpoint"); OdpsPipeline.Builder builder = OdpsPipeline.builder(); builder.projectName("sql2_isolation_2a") .tableName("user_ledger_portfolio") .partition("") .configuration(config) .odpsConf(odpsConfig) .sink(input, false); env.execute(); } }
 
Flink数据写入MaxCompute流程:阿里云全托管Flink
- 准备工作:创建MaxCompute表。 - 先创建好MaxCompute表,用于后续Flink数据写入。以下以创建一张Delta Table表为例。 - SET odps.sql.type.system.odps2=true; DROP TABLE mf_flink_upsert; CREATE TABLE mf_flink_upsert ( c1 int not null, c2 string, gt timestamp, primary key (c1) ) PARTITIONED BY (ds string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;
- 登录实时计算控制台,查看Flink Connector信息,Flink Connector已经加载到阿里云全托管Flink VVP上。 
- 通过Flink SQL作业创建Flink表,并构造Flink实时数据,完成作业开发后进行作业部署。 - 在Flink的作业开发页面,创建并编辑Flink SQL作业,以下示例为新建一张Flink数据源表、一张Flink临时结果表,并自动构建实时数据生成逻辑写入源表,通过计算逻辑将源表数据写入临时结果表。SQL作业开发详细操作请参见作业开发地图。 - --创建flink数据源表, CREATE TEMPORARY TABLE fake_src_table ( c1 int, c2 VARCHAR, gt AS CURRENT_TIMESTAMP ) WITH ( 'connector' = 'faker', 'fields.c2.expression' = '#{superhero.name}', 'rows-per-second' = '100', 'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}' ); --flink创建临时结果表 CREATE TEMPORARY TABLE test_c_d_g ( c1 int, c2 VARCHAR, gt TIMESTAMP, ds varchar, PRIMARY KEY(c1) NOT ENFORCED ) PARTITIONED BY(ds) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_upsert', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI****************', 'odps.access.key'='********************', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj', 'upsert.write.bucket.num'='64' ); --flink 计算逻辑 INSERT INTO test_c_d_g SELECT c1 AS c1, c2 AS c2, gt AS gt, date_format(gt, 'yyyyMMddHH') AS ds FROM fake_src_table;- 其中: - odps.end.point:使用对应Region的云产品互联网络Endpoint。- upsert.write.bucket.num:与MaxCompute中创建的Delta Table表的write.bucket.num属性值保持一致。
- 在MaxCompute中查询数据,验证Flink数据写入MaxCompute的结果。 - SELECT * FROM mf_flink_upsert WHERE ds=2023061517; -- 返回,由于Flink中的数据为随机生成,实际MaxCompute查询结果与本示例不一定完全一致 +------+----+------+----+ | c1 | c2 | gt | ds | +------+----+------+----+ | 0 | Skaar | 2023-06-16 01:59:41.116 | 2023061517 | | 21 | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 | | 104 | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 | | 126 | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
附录:新版Flink Connector全量参数
- 基础参数 - 参数 - 是否必填 - 默认值 - 说明 - connector - 是 - 无默认值 - Connector类型,需设置为 - MaxCompute。- odps.project.name - 是 - 无默认值 - MaxCompute的Project名称。 - odps.access.id - 是 - 无默认值 - 您的阿里云账号AccessKey ID。您可以在访问凭证页面查看对应信息。 - odps.access.key - 是 - 无默认值 - 您的阿里云账号AccessKey Secret。您可以在访问凭证页面查看对应信息。 - odps.end.point - 是 - 无默认值 - MaxCompute的Endpoint信息。各地域的MaxCompute Endpoint请参见Endpoint。 - odps.tunnel.end.point - 否 - 无默认值 - Tunnel服务的公网访问链接。如果您未配置Tunnel Endpoint,Tunnel会自动路由到MaxCompute服务所在网络对应的Tunnel Endpoint。如果您配置了Tunnel Endpoint,则以配置为准,不进行自动路由。 - 各地域及网络对应的Tunnel Endpoint值,请参见Endpoint。 - odps.tunnel.quota.name - 否 - 无默认值 - 访问MaxCompute使用的Tunnel Quota名称。 - table.name - 是 - 无默认值 - MaxCompute表名称,格式为 - [project.][schema.]table。- odps.namespace.schema - 否 - false - 是否使用三层模型。关于三层模型介绍,请参见Schema操作。 - sink.operation - 是 - insert - 写入类型,取值为 - insert或- upsert说明- 仅MaxCompute Delta Table支持Upsert写入。 - sink.parallelism - 否 - 无默认值 - 写入的并行度,如果不设置,则默认使用上游数据并行度。 说明- 请务必确保表属性 - write.bucket.num是该配置值的整数倍,这样可以获得最佳的写入性能,并且能够最有效地节省 Sink 节点内存。- sink.meta.cache.time - 否 - 400 - 元数据缓存大小。 - sink.meta.cache.expire.time - 否 - 1200 - 元数据缓存超时时间,单位:秒(s)。 - sink.coordinator.enable - 否 - 是 - 是否开启Coordinator模式。 
- 分区参数 - 参数 - 是否必填 - 默认值 - 说明 - sink.partition - 否 - 无默认值 - 待写入的分区名称。 - 若您使用的是动态分区,则为动态分区的上级分区名称。 - sink.partition.default-value - 否 - __DEFAULT_PARTITION__ - 使用动态分区时的默认分区名称。 - sink.dynamic-partition.limit - 否 - 100 - 动态分区写入时,单个Checkpoint可同时导入的最大分区数量。 说明- 建议不要大幅提升该参数值,因为当同时写入的分区数量过多时,容易导致Sink节点内存溢出(OOM),且当并发写入分区数超过阈值,写入任务会报错。 - sink.group-partition.enable - 否 - false - 动态分区写入时,是否按照分区进行分组。 - sink.partition.assigner.class - 否 - 无默认值 - PartitionAssigner实现类。 
- FileCached模式写入参数 - 当动态分区数量过多时,可以使用文件缓存模式,您可以通过以下参数配置数据写入时缓存文件信息。 - 参数 - 是否必填 - 默认值 - 说明 - sink.file-cached.enable - 否 - false - 是否开启FileCached写入。取值说明: - false:不开启。 
- true:开启。 说明- 当动态分区数量过多时,可以使用文件缓存模式。 
 - sink.file-cached.tmp.dirs - 否 - ./local - 文件缓存模式下,默认文件缓存目录。 - sink.file-cached.writer.num - 否 - 16 - 文件缓存模式下,单个Task上传数据的并发数。 说明- 建议不要大幅提升该参数值,因为当同时写入的分区数量过多时,容易导致内存溢出(OOM)。 - sink.bucket.check-interval - 否 - 60000 - 文件缓存模式下,检查文件大小的周期,单位:毫秒(ms)。 - sink.file-cached.rolling.max-size - 否 - 16 M - 文件缓存模式下,单个缓存文件的最大值。 - 若文件大小超过该值,会将该文件数据上传到服务端。 - sink.file-cached.memory - 否 - 64 M - 文件缓存模式下,写入文件使用的最大堆外内存大小。 - sink.file-cached.memory.segment-size - 否 - 128 KB - 文件缓存模式下,写入文件使用的buffer大小。 - sink.file-cached.flush.always - 否 - true - 文件缓存模式下,写入文件是否使用缓存。 - sink.file-cached.write.max-retries - 否 - 3 - 文件缓存模式下,上传数据的重试次数。 
- Insert或- Upsert写入参数- Upsert写入参数- 参数 - 是否必填 - 默认值 - 说明 - upsert.writer.max-retries - 否 - 3 - Upsert Writer写入Bucket失败后,重试次数。 - upsert.writer.buffer-size - 否 - 64 m - 单个Upsert Writer数据在Flink中的缓存大小。 说明- 当所有Bucket的缓冲区大小总和达到预设阈值时,系统将自动触发刷新操作,将数据更新到服务器端。 
- 一个Upsert Writer里会同时写入多个Bucket,建议提高该值,以提升写入效率。 
- 若写入分区较多时,会存在引发内存OOM风险,可考虑降低该参数值。 
 - upsert.writer.bucket.buffer-size - 否 - 1 m - 单个Bucket数据在Flink中的缓存大小,当Flink服务器使用内存资源紧张时,可以减小该参数值。 - upsert.write.bucket.num - 是 - 无 - 写入表的bucket数量,必须与写入表 - write.bucket.num值一致。- upsert.write.slot-num - 否 - 1 - 单个Session使用Tunnel slot数量。 - upsert.commit.max-retries - 否 - 3 - Upsert Session Commit重试次数。 - upsert.commit.thread-num - 否 - 16 - Upsert Session Commit的并行度。 - 不建议将此参数值调整得过大,因为当同时进行的提交并发数越多时,会导致资源消耗增加,可能导致性能问题或资源过度消耗。 - upsert.major-compact.min-commits - 否 - 100 - 发起Major Compact的最小Commit次数。 - upsert.commit.timeout - 否 - 600 - Upsert Session Commit等待超时时间,单位:秒(s)。 - upsert.major-compact.enable - 否 - false - 是否开启Major Compact。 - upsert.flush.concurrent - 否 - 2 - 限制单个分区允许同时写入的最大Bucket数。 说明- 每当一个Bucket的数据刷新时,将会占用一个Tunnel Slot资源。 说明- Upsert写入时,参数配置建议详情,请参见Upsert写入参数配置建议。 - Insert写入参数- 参数 - 是否必配 - 默认值 - 说明 - insert.commit.thread-num - 否 - 16 - Commit Session的并行度。 - insert.arrow-writer.enable - 否 - false - 是否使用Arrow格式。 - insert.arrow-writer.batch-size - 否 - 512 - Arrow Batch的最大行数。 - insert.arrow-writer.flush-interval - 否 - 100000 - Writer Flush间隔,单位毫秒(ms)。 - insert.writer.buffer-size - 否 - 64 M - 使用Buffered Writer的缓存大小。