当前MaxCompute为您提供了新版的Flink Connector插件,新版插件支持将Flink数据写入至MaxCompute的普通表和Delta Table类型表,提高了Flink数据写入MaxCompute的便捷性。本文为您介绍新版Flink Connector写入MaxCompute的能力支持情况与主要操作流程。
背景信息
支持的写入模式:
使用新版Flink Connector将数据写入MaxCompute时,支持通过Upsert和Insert写入方式。其中使用Upsert时支持以下两种数据写入流:
按照Primary Key进行分组
按照分区字段进行分组
若分区数量过多,您可以按照分区字段进行分组,但使用该流程可能导致数据倾斜。
Upsert模式下,通过Flink Connector进行数据写入流程和参数配置建议,详情请参见数据实时入仓实践。
您可在配置Flink数据写入MaxCompute时,通过设置Flink Connector参数指定使用哪种写入方式,全量Connector参数介绍请参见下文的附录:新版Flink Connector全量参数。
Flink Upsert写入任务的Checkpoint间隔建议设置3分钟以上,设置太小的话,写入效率得不到保障,并且可能引入大量小文件。
MaxCompute与实时计算Flink版的字段类型对照关系如下:
Flink 数据类型
MaxCompute 数据类型
CHAR(p)
CHAR(p)
VARCHAR(p)
VARCHAR(p)
STRING
STRING
BOOLEAN
BOOLEAN
TINYINT
TINYINT
SMALLINT
SMALLINT
INT
INT
BIGINT
LONG
FLOAT
FLOAT
DOUBLE
DOUBLE
DECIMAL(p, s)
DECIMAL(p, s)
DATE
DATE
TIMESTAMP(9) WITHOUT TIMEZONE、TIMESTAMP_LTZ(9)
TIMESTAMP
TIMESTAMP(3) WITHOUT TIMEZONE、TIMESTAMP_LTZ(3)
DATETIME
BYTES
BINARY
ARRAY<T>
LIST<T>
MAP<K, V>
MAP<K, V>
ROW
STRUCT
说明Flink的TIMESTAMP数据类型不含时区,MaxCompute TIMESTAMP数据类型含时区。此差异会导致8小时的时间差。其通过使用TIMESTAMP_LTZ(9)来对齐时间戳。
--FlinkSQL CREATE TEMPORARY TABLE odps_source( id BIGINT NOT NULL COMMENT 'id', created_time TIMESTAMP NOT NULL COMMENT '创建时间', updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT '更新时间', PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', ... );
Flink数据写入MaxCompute流程:自建开源Flink
准备工作:创建MaxCompute表。
您需先创建好MaxCompute表,用于后续Flink数据写入。以下以创建两张表(Delta Table非分区表和分区表)作为示例,为您演示Flink数据写入MaxCompute的主要流程,其中表属性设置请参考Delta Table表参数。
--创建Delta Table非分区表 CREATE TABLE mf_flink_tt ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ; --创建Delta Table分区表 CREATE TABLE mf_flink_tt_part ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) partitioned by (dd string, hh string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;
搭建开源Flink集群。当前支持1.13、1.15、1.16和1.17版本的开源Flink,您可以选择对应版本的Flink:
说明Flink 1.17版本可复用1.16版本。
本文以Flink Connector 1.13为例,将包下载至本地环境,下载完成后进行解压。
下载Flink Connector并添加至Flink集群包中。
将Flink Connector Jar包下载至本地环境。
将Flink Connector Jar包添加至解压后的Flink安装包的lib目录中。
mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
启动Flink实例服务。
cd $FLINK_HOME/bin ./start-cluster.sh
启动Flink客户端。
cd $FLINK_HOME/bin ./sql-client.sh
创建Flink表,并配置Flink Connector参数。
当前支持直接使用Flink SQL创建Flink表并配置参数,也支持使用Flink的DataStream API进行相关操作。两种操作的核心示例如下。
使用Flink SQL
进入Flink SQL的编辑界面,执行以下命令完成建表与参数配置。
-- 在 Flink SQL中注册一张对应的非分区表 CREATE TABLE mf_flink ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI5tRzd4W8cTyLZKT****', 'odps.access.key'='gJwKaF3hK9MDAQgbO0zs****', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' ); -- 在 Flink SQL 中注册一张对应的分区表 CREATE TABLE mf_flink_part ( id BIGINT, name STRING, age INT, status BOOLEAN, dd STRING, hh STRING, PRIMARY KEY(id) NOT ENFORCED ) PARTITIONED BY (`dd`,`hh`) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt_part', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI5tRzd4W8cTyLZKT****', 'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' );
向Flink表中写入数据,并在MaxCompute表中查询,验证Flink数据写入MaxCompute的结果。
--在flink Sql客户端中往非分区表里插入数据 Insert into mf_flink values (1,'Danny',27, false); --在Maxcompute中查询返回 select * from mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 27 | false | +------------+------+------+--------+ --在flink Sql客户端中往非分区表里插入数据 Insert into mf_flink values (1,'Danny',28, false); --在Maxcompute中查询返回 select * from mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 28 | false | +------------+------+------+--------+ --在flink Sql客户端中往分区表里插入数据 Insert into mf_flink_part values (1,'Danny',27, false, '01','01'); --在Maxcompute中查询返回 select * from mf_flink_tt_part where dd=01 and hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 27 | false | 01 | 01 | +------------+------+------+--------+----+----+ --在flink Sql客户端中分区表里插入数据 Insert into mf_flink_part values (1,'Danny',30, false, '01','01'); --在Maxcompute中查询返回 select * from mf_flink_tt_part where dd=01 and hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 30 | false | 01 | 01 | +------------+------+------+--------+----+----+
使用DataStream API
使用DataStream接口时,需先添加以下依赖。
<dependency> <groupId>com.aliyun.odps</groupId> <artifactId>flink-connector-maxcompute</artifactId> <version>xxx</version> <scope>system</scope> <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath> </dependency>
说明使用时请将“xxx”改为相应的版本号。
建表与参数配置的示例代码如下。
package com.aliyun.odps.flink.examples; import org.apache.flink.configuration.Configuration; import org.apache.flink.odps.table.OdpsOptions; import org.apache.flink.odps.util.OdpsConf; import org.apache.flink.odps.util.OdpsPipeline; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; public class Examples { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(120 * 1000); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env); Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table"); DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class); Configuration config = new Configuration(); config.set(OdpsOptions.SINK_OPERATION, "upsert"); config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8); config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100); OdpsConf odpsConfig = new OdpsConf("accessid", "accesskey", "endpoint", "project", "tunnel endpoint"); OdpsPipeline.Builder builder = OdpsPipeline.builder(); builder.projectName("sql2_isolation_2a") .tableName("user_ledger_portfolio") .partition("") .configuration(config) .odpsConf(odpsConfig) .sink(input, false); env.execute(); } }
Flink数据写入MaxCompute流程:阿里云全托管Flink
准备工作:创建MaxCompute表。
您需先创建好MaxCompute表,用于后续Flink数据写入。以下以创建一张Delta Table表为例。
set odps.sql.type.system.odps2=true; drop table mf_flink_upsert; create table mf_flink_upsert ( c1 int not null, c2 string, gt timestamp, primary key (c1) ) partitioned by (ds string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;
登录实时计算控制台,查看Flink Connector信息,Flink Connector已经加载到阿里云全托管Flink VVP上。
通过Flink SQL作业创建Flink表,并构造Flink实时数据,完成作业开发后进行作业部署。
在Flink的作业开发页面,创建并编辑Flink SQL作业,以下示例为新建一张Flink数据源表、一张Flink临时结果表,并自动构建实时数据生成逻辑写入源表,通过计算逻辑将源表数据写入临时结果表。SQL作业开发详细操作请参见SQL作业开发。
--创建flink数据源表, CREATE TEMPORARY TABLE fake_src_table ( c1 int, c2 VARCHAR, gt as CURRENT_TIMESTAMP ) with ( 'connector' = 'faker', 'fields.c2.expression' = '#{superhero.name}', 'rows-per-second' = '100', 'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}' ); --flink创建临时结果表 CREATE TEMPORARY TABLE test_c_d_g ( c1 int, c2 VARCHAR, gt TIMESTAMP, ds varchar, PRIMARY KEY(c1) NOT ENFORCED ) PARTITIONED BY(ds) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_upsert', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI5tRzd4W8cTyL****', 'odps.access.key'='gJwKaF3hK9MDAQgb**********', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj', 'upsert.write.bucket.num'='64' ); --flink 计算逻辑 insert into test_c_d_g select c1 as c1, c2 as c2, gt as gt, date_format(gt, 'yyyyMMddHH') as ds from fake_src_table;
其中:
odps.end.point
:使用对应Region的云产品互联网络Endpoint。upsert.write.bucket.num
:与MaxCompute中创建的Delta Table表的write.bucket.num属性值保持一致。在MaxCompute中查询数据,验证Flink数据写入MaxCompute的结果。
select * from mf_flink_upsert where ds=2023061517; --返回,由于Flink中的数据为随机生成,实际MaxCompute查询结果与本示例不一定完全一致 +------+----+------+----+ | c1 | c2 | gt | ds | +------+----+------+----+ | 0 | Skaar | 2023-06-16 01:59:41.116 | 2023061517 | | 21 | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 | | 104 | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 | | 126 | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
附录:新版Flink Connector全量参数
基础参数
参数
是否必填
默认值
说明
connector
是
无默认值
Connector类型,需设置为
MaxCompute
。odps.project.name
是
无默认值
MaxCompute的Project名称。
odps.access.id
是
无默认值
您的阿里云账号AccessKey ID。您可以在访问凭证页面查看对应信息。
odps.access.key
是
无默认值
您的阿里云账号AccessKey Secret。您可以在访问凭证页面查看对应信息。
odps.end.point
是
无默认值
MaxCompute的Endpoint信息。各地域的MaxCompute Endpoint请参见Endpoint。
odps.tunnel.end.point
否
Tunnel服务的公网访问链接。如果您未配置Tunnel Endpoint,Tunnel会自动路由到MaxCompute服务所在网络对应的Tunnel Endpoint。如果您配置了Tunnel Endpoint,则以配置为准,不进行自动路由。
各地域及网络对应的Tunnel Endpoint值,请参见Endpoint。
odps.tunnel.quota.name
否
无默认值
访问MaxCompute使用的Tunnel Quota名称。
table.name
是
无默认值
MaxCompute表名称,格式为
[project.][schema.]table
。odps.namespace.schema
否
false
是否使用三层模型。关于三层模型介绍,请参见Schema操作。
sink.operation
是
insert
写入类型,取值为
insert
或upsert
说明仅MaxCompute Delta Table支持Upsert写入。
sink.parallelism
否
无默认值
写入的并行度,如果不设置,则默认使用上游数据并行度。
说明请务必确保表属性 write.bucket.num 是该配置值的整数倍,这样可以获得最佳的写入性能,并且能够最有效地节省 Sink 节点内存。
sink.meta.cache.time
否
400
元数据缓存大小。
sink.meta.cache.expire.time
否
1200
元数据缓存超时时间,单位:秒(s)。
sink.coordinator.enable
否
是
是否开启Coordinator模式。
分区参数
参数
是否必填
默认值
说明
sink.partition
否
无默认值
待写入的分区名称。
若您使用的是动态分区,则为动态分区的上级分区名称。
sink.partition.default-value
否
__DEFAULT_PARTITION__
使用动态分区时的默认分区名称。
sink.dynamic-partition.limit
否
100
动态分区写入时,单个Checkpoint可同时导入的最大分区数量。
说明建议不要大幅提升该参数值,因为当同时写入的分区数量过多时,容易导致Sink节点内存溢出(OOM),且当并发写入分区数超过阈值,写入任务会报错。
sink.group-partition.enable
否
false
动态分区写入时,是否按照分区进行分组。
sink.partition.assigner.class
否
无默认值
PartitionAssigner实现类。
FileCached模式写入参数
当动态分区数量过多时,可以使用文件缓存模式,您可以通过以下参数配置数据写入时缓存文件信息。
参数
是否必配
默认值
说明
sink.file-cached.enable
否
false
是否开启FileCached写入。取值说明:
false:不开启。
true:开启。
说明当动态分区数量过多时,可以使用文件缓存模式。
sink.file-cached.tmp.dirs
否
./local
文件缓存模式下,默认文件缓存目录。
sink.file-cached.writer.num
否
16
文件缓存模式下,单个Task上传数据的并发数。
说明建议不要大幅提升该参数值,因为当同时写入的分区数量过多时,容易导致内存溢出(OOM)。
sink.bucket.check-interval
否
60000
文件缓存模式下,检查文件大小的周期,单位:毫秒(ms)。
sink.file-cached.rolling.max-size
否
16 M
文件缓存模式下,单个缓存文件的最大值。
若文件大小超过该值,会将该文件数据上传到服务端。
sink.file-cached.memory
否
64 M
文件缓存模式下,写入文件使用的最大堆外内存大小。
sink.file-cached.memory.segment-size
否
128 KB
文件缓存模式下,写入文件的使用的buffer大小。
sink.file-cached.flush.always
否
true
文件缓存模式下,写入文件是否使用缓存。
sink.file-cached.write.max-retries
否
3
文件缓存模式下,上传数据的重试次数。
Insert
或Upsert
写入参数Upsert写入参数
参数
是否必填
默认值
说明
upsert.writer.max-retries
否
3
Upsert Writer写入Bucket失败后,重试次数。
upsert.writer.buffer-size
否
64 m
单个Upsert Writer数据在Flink中的缓存大小。
说明当所有Bucket的缓冲区大小总和达到预设阈值时,系统将自动触发刷新操作,将数据更新到服务器端。
一个upsert writer里会同时写入多个Bucket,建议提高该值,以提升写入效率。
若写入分区较多时,会存在引发内存OOM风险,可考虑降低该参数值。
upsert.writer.bucket.buffer-size
否
1 m
单个Bucket数据在Flink中的缓存大小,当Flink服务器使用内存资源紧张时,可以减小该参数值。
upsert.write.bucket.num
是
无
写入表的bucket数量,必须与写入表
write.bucket.num
值一致。upsert.write.slot-num
否
1
单个Session使用Tunnel slot数量。
upsert.commit.max-retries
否
3
Upsert Session Commit重试次数。
upsert.commit.thread-num
否
16
Upsert Session Commit的并行度。
不建议将此参数值调整得过大,因为当同时进行的提交并发数越多时,会导致资源消耗增加,可能导致性能问题或资源过度消耗。
upsert.major-compact.min-commits
否
100
发起Major Compact的最小Commit次数。
upsert.commit.timeout
否
600
Upsert Session Commit等待超时时间,单位:秒(s)。
upsert.major-compact.enable
否
false
是否开启Major Compact。
upsert.flush.concurrent
否
2
限制单个分区允许同时写入的最大Bucket数。
说明每当一个bucket的数据刷新时,将会占用一个Tunnel Slot资源。
说明Upsert写入时,参数配置建议详情,请参见Upsert写入参数配置建议。
Insert写入参数
参数
是否必配
默认值
说明
insert.commit.thread-num
否
16
Commit Session的并行度。
insert.arrow-writer.enable
否
false
是否使用Arrow格式。
insert.arrow-writer.batch-size
否
512
Arrow Batch的最大行数。
insert.arrow-writer.flush-interval
否
100000
Writer Flush间隔,单位毫秒(ms)。
insert.writer.buffer-size
否
64 M
使用Buffered Writer的缓存大小。