本文为您介绍大数据计算服务MaxCompute连接器的语法结构、WITH参数和使用示例等。
背景信息
大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute的详情请参见什么是MaxCompute。
MaxCompute连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表、维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不支持 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API种类 | Datastream和SQL |
是否支持更新或删除结果表数据 | Batch Tunnel和Stream Tunnel模式仅支持插入数据,Upsert Tunnel模式支持插入、更新和删除数据。 |
前提条件
已创建MaxCompute表,详情请参见创建表。
使用限制
仅实时计算引擎VVR 2.0.0及以上版本支持MaxCompute连接器。
MaxCompute连接器仅支持At Least Once语义。
说明At Least Once语义会保证数据不缺失,但在少部分情况下,可能会将重复数据写入MaxCompute。不同的MaxCompute Tunnel出现重复数据的情况不同,MaxCompute Tunnel详情请参见如何选择数据通道?。
默认情况下源表为全量模式,仅会读取partition参数中指定的分区,在读完所有数据后结束运行,状态转换为finished,不会监控是否有新分区产生。
如果您需要持续监控新分区,请通过WITH参数中指定startPartition使用增量源表模式。
说明维表每次更新时都会检查最新分区,不受这一限制。
在源表开始运行后,向分区里添加的新数据不会被读取,请在分区数据完整的情况下运行作业。
语法结构
CREATE TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'schemaName' = '<yourSchemaName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=2018****'
);
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为odps。
endpoint
MaxCompute服务地址。
String
是
无
请参见Endpoint。
tunnelEndpoint
MaxCompute Tunnel服务的连接地址。
String
否
无
请参见Endpoint。
说明VPC环境下为必填。
如果未填写,MaxCompute会根据内部的负载均衡服务分配Tunnel的连接。
project
MaxCompute项目名称。
String
是
无
无。
schemaName
MaxCompute Schema名称。
String
否
无
仅当MaxCompute项目开启Schema功能时,需填写该值为MaxCompute表所属Schema名,详情请参见 Schema操作。
说明仅实时计算引擎VVR 8.0.6及以上版本支持该参数。
tableName
MaxCompute表名。
String
是
无
无。
accessId
MaxCompute AccessKey ID。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量和密钥管理。
accessKey
MaxCompute AccessKey Secret。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量和密钥管理。
partition
MaxCompute分区名。
String
否
无
对于非分区表和增量源表无需填写。
说明分区表详情请参见在读取或写入分区时,如何填写Partition参数?。
compressAlgorithm
MaxCompute Tunnel使用的压缩算法。
String
否
VVR 4.0.13及以上版本:ZLIB
VVR 6.0.1及以上版本:SNAPPY
参数取值如下:
RAW(无压缩)
ZLIB
SNAPPY
SNAPPY相比ZLIB能带来明显的吞吐提升。在测试场景下,吞吐提升约50%。
说明仅实时计算引擎VVR 4.0.13及以上版本支持该参数。
quotaName
MaxCompute独享数据传输服务的quota名称。
String
否
无
设置该值来使用独享的MaxCompute数据传输服务。
重要仅实时计算引擎VVR 8.0.3及以上版本支持该参数。
设置该值时,必须删除tunnelEndpoint参数,否则仍将使用tunnelEndpoint中指定的数据通道。
说明MaxCompute独享数据传输参见购买与使用独享数据传输服务资源组。
源表独有
参数
说明
数据类型
是否必填
默认值
备注
maxPartitionCount
可以读取的最大分区数量。
Integer
否
100
如果读取的分区数量超过了该参数,则会出现报错
The number of matched partitions exceeds the default limit
。重要由于一次性读取大量分区会给MaxCompute服务带来一定压力,同时也会让作业启动速度变慢,因此您需要确认是否需要读取这么多分区(而不是误填partition参数)。如果确实需要,需要手动调大maxPartitionCount参数。
useArrow
是否使用Arrow格式读取数据。
Boolean
否
false
使用Arrow格式能够调用MaxCompute的Storage API,详情请参见什么是MaxCompute中用户接口与开放性一节。
重要仅在批作业中生效。
仅实时计算引擎VVR 8.0.8及以上版本支持该参数。
splitSize
在使用Arrow格式读取数据时,一次拉取的数据大小。
MemorySize
否
256 MB
仅实时计算引擎VVR 8.0.8及以上版本支持该参数。
重要仅在批作业中生效。
compressCodec
在使用Arrow格式读取数据时,采用的压缩算法。
String
否
""
参数取值如下:
"" (无压缩)
ZSTD
LZ4_FRAME
指定压缩算法相比无压缩能带来一定的吞吐提升。
重要仅在批作业中生效。
仅实时计算引擎VVR 8.0.8及以上版本支持该参数。
dynamicLoadBalance
是否允许动态分配分片。
Boolean
否
false
参数取值如下:
true:允许
false:不允许
允许动态分配分片能够发挥Flink不同节点的处理性能,减少源表整体读取时间,但也会导致不同节点读取总数据量不一致,出现数据倾斜情况。
重要仅在批作业中生效。
仅实时计算引擎VVR 8.0.8及以上版本支持该参数。
增量源表独有
增量源表通过间歇轮询MaxCompute服务器获取所有的分区信息来发现新增的分区,读取新分区时要求分区内数据已写入完毕,详情参见增量MaxCompute源表监听到新分区时,如果该分区还有数据没有写完,如何处理?。通过startPartition可以指定起始点位,但注意只读取字典序大于等于起始点位的分区,例如分区
year=2023,month=10
字典序小于分区year=2023,month=9
,对于这种类型的分区声明可以通过加0补齐的方式来保证字典序正确,例如year=2023,month=09
。参数
说明
数据类型
是否必填
默认值
备注
startPartition
增量读取的起始MaxCompute分区点位(包含)。
String
是
无
使用该参数后启用增量源表模式,将忽略partition参数。
多级分区必须按分区级别从大到小声明每个分区列的值。
说明startPartition参数详情,请参见如何填写增量MaxCompute的startPartition参数?。
subscribeIntervalInSec
轮询MaxCompute获取分区列表的时间间隔。
Integer
否
30
单位为秒。
modifiedTableOperation
读取分区过程中遇到分区数据被修改时的处理。
Enum (NONE, SKIP)
否
NONE
由于下载session被保存在检查点中,每次从检查点恢复时尝试从该session恢复读取进度,而该session由于分区数据被修改不可用,Flink任务会陷入不断重启。此时您可以设置该参数,参数取值如下:
NONE:需要您修改startPartition参数使其大于不可用分区,并从无状态启动作业。
SKIP:若不希望无状态启动,可将模式修改为SKIP,Flink尝试从检查点恢复session时将跳过不可用的分区。
重要NONE和SKIP模式下,被修改分区中已读取的数据不会被撤回,未读取的数据将不会被读取。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
useStreamTunnel
是否使用MaxCompute Stream Tunnel上传数据。
Boolean
否
false
参数取值如下:
true:使用MaxCompute Stream Tunnel上传数据。
false:使用MaxCompute Batch Tunnel上传数据。
说明仅实时计算引擎VVR 4.0.13及以上版本支持该参数。
数据通道选择详情请参见如何选择数据通道?。
flushIntervalMs
MaxCompute Tunnel Writer缓冲区flush间隔。
Long
否
30000(30秒)
MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区溢出或者每隔一段时间(flushIntervalMs),再把缓冲区里的数据写到目标MaxCompute表。
对于Stream Tunnel,flush的数据立即可见;对于Batch Tunnel,数据flush后仍需要等待checkpoint完成后才可见,建议设置该参数为0来关闭定时flush。
单位为毫秒。
说明本参数可以与batchSize一同使用,满足任一条件即会Flush数据。
batchSize
MaxCompute Tunnel Writer缓冲区flush的大小。
Long
否
67108864(64 MB)
MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区达到一定大小(batchSize),再把缓冲区里的数据写到目标MaxCompute表。
单位为字节。
说明仅实时计算引擎VVR 4.0.14及以上版本支持该参数。
本参数可以与flushIntervalMs一同使用,满足任一条件即会Flush数据。
numFlushThreads
MaxCompute Tunnel Writer缓冲区flush的线程数。
Integer
否
1
每个MaxCompute Sink并发将创建numFlushThreads个线程用于flush数据。当该值大于1时,将允许不同分区的数据并发Flush,提升Flush的效率。
说明仅实时计算引擎VVR 4.0.14及以上版本支持该参数。
dynamicPartitionLimit
写入动态分区的最大数量。
Integer
否
100
当结果表在两次Checkpoint之间写入的动态分区数量超过了dynamicPartitionLimit,则会出现报错
Too many dynamic partitions
。重要由于一次性写入大量分区会给MaxCompute服务带来一定压力,同时也会导致结果表flush和作业Checkpoint变慢。因此当报错出现时,您需要确认是否需要写入这么多分区。如果确实需要,需要手动调大dynamicPartitionLimit参数。
retryTimes
向MaxCompute服务器请求最大重试次数。
Integer
否
3
创建session、提交session、flush数据时可能存在短暂的MaxCompute服务不可用的情况,会根据该配置进行重试。
sleepMillis
重试间隔时间。
Integer
否
1000
单位为毫秒。
enableUpsert
是否使用MaxCompute Upsert Tunnel上传数据。
Boolean
否
false
参数取值如下:
true:使用Upsert Tunnel,处理Flink中的INSERT、UPDATE_AFTER和DELETE数据。
false:根据useStreamTunnel参数使用Batch Tunnel或Stream Tunnel,处理Flink中的INSERT、UPDATE_AFTER数据。
重要若Upsert模式下MaxCompute sink提交时出现报错、失败、耗时长等情况,建议限制sink节点的并发数在10以内。
仅实时计算引擎VVR 8.0.6及以上版本支持该参数。
upsertAsyncCommit
Upsert模式下在提交session时是否使用异步模式。
Boolean
否
false
参数取值如下:
true:使用异步模式,提交耗时更短,但提交完成时写入的数据非立即可读。
false:默认为同步模式,提交时将等待服务侧处理完session。
说明仅实时计算引擎VVR 8.0.6及以上版本支持该参数。
upsertCommitTimeoutMs
Upsert模式下提交session超时时间。
Integer
否
120000
(120秒)
仅实时计算引擎VVR 8.0.6及以上版本支持该参数。
维表独有
MaxCompute维表在作业启动时从指定的分区拉取全量数据,partition参数支持使用max_pt()等函数。当缓存过期重新加载时会重新解析partition参数拉取最新的分区,使用max_two_pt()时维表可拉取两个分区,其他情况下只支持指定单个分区。
参数
说明
数据类型
是否必填
默认值
备注
cache
缓存策略。
String
是
无
目前MaxCompute维表仅支持
ALL
策略,必须显式声明。 适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。ALL策略:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
说明因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的至少4倍,具体值与MaxCompute存储压缩算法有关。
如果MaxCompute维表数据量较大,可以考虑使用SHUFFLE_HASH注解将维表数据均匀分散到各个并发中。详情请参见如何使用维表SHUFFLE_HASH注解?。
在使用超大MaxCompute维表时,如果JVM频繁GC导致作业异常,且在增加维表JOIN节点的内存仍无改善的情况下,建议改为支持LRU Cache策略的KV型维表,例如云数据库Hbase版维表。
cacheSize
最多缓存的数据条数。
Long
否
100000
如果维表数据量超过了cacheSize,则会出现报错
Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit
。重要由于维表数据量太大会占用大量JVM堆内存,同时也会让作业启动和维表更新变慢,因此您需要确认是否需要缓存这么多数据,如果确实需要,需要手动调大该参数。
cacheTTLMs
缓存超时时间,也就是缓存更新的间隔时间。
Long
否
Long.MAX_VALUE(相当于永不更新)
单位为毫秒。
cacheReloadTimeBlackList
更新时间黑名单。在该参数规定的时间段内不会更新缓存。
String
否
无
用于防止缓存在关键时间段(例如活动流量峰值期间)更新导致作业不稳定。填写方式详情请参见如何填写CacheReloadTimeBlackList参数?。
maxLoadRetries
缓存更新时(包含作业启动时初次拉取数据)最多尝试次数,超过该次数后作业运行失败。
Integer
否
10
无。
类型映射
MaxCompute支持的类型参见2.0数据类型版本。
MaxCompute类型 | Flink类型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
当MaxCompute物理表中同时存在嵌套的复合类型字段(ARRAY、MAP或STRUCT)和JSON类型字段时,需要在创建MaxCompute物理表时指定tblproperties('columnar.nested.type'='true')
,才能被Flink正确读写。
使用示例
SQL
源表示例
全量读取
CREATE TEMPORARY TABLE odps_source ( cid VARCHAR, rt DOUBLE ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpointName>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds=201809*' ); CREATE TEMPORARY TABLE blackhole_sink ( cid VARCHAR, invoke_count BIGINT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT cid, COUNT(*) AS invoke_count FROM odps_source GROUP BY cid;
增量读取
CREATE TEMPORARY TABLE odps_source ( cid VARCHAR, rt DOUBLE ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpointName>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 从20180905对应分区开始读取 ); CREATE TEMPORARY TABLE blackhole_sink ( cid VARCHAR, invoke_count BIGINT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT cid, COUNT(*) AS invoke_count FROM odps_source GROUP BY cid;
结果表示例
写入固定分区
CREATE TEMPORARY TABLE datagen_source ( id INT, len INT, content VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_sink ( id INT, len INT, content VARCHAR ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds=20180905' -- 写入固定分区ds=20180905。 ); INSERT INTO odps_sink SELECT id, len, content FROM datagen_source;
写入动态分区
CREATE TEMPORARY TABLE datagen_source ( id INT, len INT, content VARCHAR, c TIMESTAMP ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_sink ( id INT, len INT, content VARCHAR, ds VARCHAR --需要显式声明动态分区列。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds' --不写分区的值,表示根据ds字段的值写入不同分区。 ); INSERT INTO odps_sink SELECT id, len, content, DATE_FORMAT(c, 'yyMMdd') as ds FROM datagen_source;
维表示例
一对一维表
CREATE TEMPORARY TABLE datagen_source ( k INT, v VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_dim ( k INT, v VARCHAR, PRIMARY KEY (k) NOT ENFORCED -- 一对一维表需要声明主键。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds=20180905', 'cache' = 'ALL' ); CREATE TEMPORARY TABLE blackhole_sink ( k VARCHAR, v1 VARCHAR, v2 VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT k, s.v, d.v FROM datagen_source AS s INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
一对多维表
CREATE TEMPORARY TABLE datagen_source ( k INT, v VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_dim ( k INT, v VARCHAR -- 一对多维表无需声明主键。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds=20180905', 'cache' = 'ALL' ); CREATE TEMPORARY TABLE blackhole_sink ( k VARCHAR, v1 VARCHAR, v2 VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT k, s.v, d.v FROM datagen_source AS s INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
DataStream
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法。Maven中央库中已经放置了MaxCompute DataStream连接器。
为了保护知识产权,从实时计算引擎VVR6.0.6版本起,此连接器在本地调试单次运行作业的时间为30分钟,30分钟后作业会报错并退出。本地运行和调试包含MaxCompute连接器的作业参见本地运行和调试包含连接器的作业。
若您在Flink开发控制台提交作业后,出现本地运行和调试包含连接器的作业中类似的MaxCompute相关类ClassNotFound问题,请下载Maven中央库中对应版本中后缀为uber.jar的文件,添加为作业的附加依赖。以1.15-vvr-6.0.6版本为例,需下载的文件为该仓库目录下的verveica-connector-odps-1.15-vvr-6.0.6-uber.jar。
MaxCompute连接器的Maven依赖包含了构建全量源表、增量源表、结果表和维表的所需要的类。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${connector.version}</version>
</dependency>
在DataStream中使用MaxCompute连接器推荐使用SQL声明MaxCompute表,通过Table/DataStream相互转换来连接MaxCompute表和数据流。
连接源表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.executeSql(String.join( "\n", "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (", " cid VARCHAR,", " rt DOUBLE", ") WITH (", " 'connector' = 'odps',", " 'endpoint' = '<yourEndpointName>',", " 'project' = '<yourProjectName>',", " 'tableName' = '<yourTableName>',", " 'accessId' = '<yourAccessId>',", " 'accessKey' = '<yourAccessPassword>',", " 'partition' = 'ds=201809*'", ")"); DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source")); source.print(); env.execute("odps source");
连接结果表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.executeSql(String.join( "\n", "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (", " cid VARCHAR,", " rt DOUBLE", ") WITH (", " 'connector' = 'odps',", " 'endpoint' = '<yourEndpointName>',", " 'project' = '<yourProjectName>',", " 'tableName' = '<yourTableName>',", " 'accessId' = '<yourAccessId>',", " 'accessKey' = '<yourAccessPassword>',", " 'partition' = 'ds=20180905'", ")"); DataStream<Row> data = env.fromElements( Row.of("id0", 3.), Row.of("id1", 4.)); tEnv.fromDataStream(data).insertInto("odps_sink").execute();