本文为您介绍大数据计算服务MaxCompute连接器的语法结构、WITH参数和使用示例等。
背景信息
大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。
MaxCompute连接器支持的信息如下。
|
类别 |
详情 |
|
支持类型 |
源表、维表、结果表,数据摄入目标端 |
|
运行模式 |
流模式和批模式 |
|
数据格式 |
暂不支持 |
|
特有监控指标 |
|
|
API种类 |
Datastream、SQL,数据摄入YAML作业 |
|
是否支持更新或删除结果表数据 |
Batch Tunnel和Stream Tunnel模式仅支持插入数据,Upsert Tunnel模式支持插入、更新和删除数据。 |
前提条件
已创建MaxCompute表,详情请参见创建表。
使用限制
-
MaxCompute连接器仅支持At Least Once语义。
说明At Least Once语义会保证数据不缺失,但在少部分情况下,可能会将重复数据写入MaxCompute。不同的MaxCompute Tunnel出现重复数据的情况不同,MaxCompute Tunnel详情请参见如何选择数据通道?。
-
默认情况下源表为全量模式,仅会读取partition参数中指定的分区,在读完所有数据后结束运行,状态转换为finished,不会监控是否有新分区产生。
如果您需要持续监控新分区,请通过WITH参数中指定startPartition使用增量源表模式。
说明-
维表每次更新时都会检查最新分区,不受这一限制。
-
在源表开始运行后,向分区里添加的新数据不会被读取,请在分区数据完整的情况下运行作业。
-
SQL
MaxCompute连接器可以在SQL作业中使用,作为源表,维表或者结果表。
语法结构
CREATE TEMPORARY TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'schemaName' = '<yourSchemaName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=2018****'
);
WITH参数
通用
|
参数 |
说明 |
数据类型 |
是否必填 |
默认值 |
备注 |
|
connector |
表类型。 |
String |
是 |
无 |
固定值为odps。 |
|
endpoint |
MaxCompute服务地址。 |
String |
是 |
无 |
请参见Endpoint。 |
|
tunnelEndpoint |
MaxCompute Tunnel服务的连接地址。 |
String |
否 |
无 |
请参见Endpoint。 说明
如果未填写,MaxCompute会根据内部的负载均衡服务分配Tunnel的连接。 |
|
project |
MaxCompute项目名称。 |
String |
是 |
无 |
无。 |
|
schemaName |
MaxCompute Schema名称。 |
String |
否 |
无 |
仅当MaxCompute项目开启Schema功能时,需填写该值为MaxCompute表所属Schema名,详情请参见 Schema操作。 说明
仅实时计算引擎VVR 8.0.6及以上版本支持该参数。 |
|
tableName |
MaxCompute表名。 |
String |
是 |
无 |
无。 |
|
accessId |
MaxCompute AccessKey ID。 |
String |
是 |
无 |
详情请参见如何查看AccessKey ID和AccessKey Secret信息? 重要
为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。 |
|
accessKey |
MaxCompute AccessKey Secret。 |
String |
是 |
无 |
|
|
partition |
MaxCompute分区名。 |
String |
否 |
无 |
对于非分区表和增量源表无需填写。 说明
分区表详情请参见在读取或写入分区时,如何填写Partition参数?。 |
|
compressAlgorithm |
MaxCompute Tunnel使用的压缩算法。 |
String |
否 |
SNAPPY |
参数取值如下:
|
|
quotaName |
MaxCompute独享数据传输服务的quota名称。 |
String |
否 |
无 |
设置该值来使用独享的MaxCompute数据传输服务。 重要
说明
MaxCompute独享数据传输参见购买与使用独享数据传输服务资源组。 |
源表独有
|
参数 |
说明 |
数据类型 |
是否必填 |
默认值 |
备注 |
|
maxPartitionCount |
可以读取的最大分区数量。 |
Integer |
否 |
100 |
如果读取的分区数量超过了该参数,则会出现报错 重要
一次性读取过多分区会增加 MaxCompute 负载并拖慢作业启动,建议确认partition参数是否误配。如确需读取大量分区,请手动调大maxPartitionCount |
|
useArrow |
是否使用Arrow格式读取数据。 |
Boolean |
否 |
false |
使用Arrow格式能够调用MaxCompute的Storage API。 重要
|
|
splitSize |
在使用Arrow格式读取数据时,一次拉取的数据大小。 |
MemorySize |
否 |
256 MB |
仅实时计算引擎VVR 8.0.8及以上版本支持该参数。 重要
仅在批作业中生效。 |
|
compressCodec |
在使用Arrow格式读取数据时,采用的压缩算法。 |
String |
否 |
"" |
参数取值如下:
指定压缩算法相比无压缩能带来一定的吞吐提升。 重要
|
|
dynamicLoadBalance |
是否允许动态分配分片。 |
Boolean |
否 |
false |
参数取值如下:
允许动态分配分片能够发挥Flink不同节点的处理性能,减少源表整体读取时间,但也会导致不同节点读取总数据量不一致,出现数据倾斜情况。 重要
|
增量源表独有
增量源表通过间歇轮询MaxCompute服务器获取所有的分区信息来发现新增的分区,读取新分区时要求分区内数据已写入完毕,详情参见增量MaxCompute源表监听到新分区时,如果该分区还有数据没有写完,如何处理?。通过startPartition可以指定起始点位,但注意只读取字典序大于等于起始点位的分区,例如分区year=2023,month=10字典序小于分区year=2023,month=9,对于这种类型的分区声明可以通过加0补齐的方式来保证字典序正确,例如year=2023,month=09。
|
参数 |
说明 |
数据类型 |
是否必填 |
默认值 |
备注 |
|
startPartition |
增量读取的起始MaxCompute分区点位(包含)。 |
String |
是 |
无 |
说明
startPartition参数详情,请参见如何填写增量MaxCompute的startPartition参数?。 |
|
subscribeIntervalInSec |
轮询MaxCompute获取分区列表的时间间隔。 |
Integer |
否 |
30 |
单位为秒。 |
|
modifiedTableOperation |
读取分区过程中遇到分区数据被修改时的处理。 |
Enum (NONE, SKIP) |
否 |
NONE |
由于下载session被保存在检查点中,每次从检查点恢复时尝试从该session恢复读取进度,而该session由于分区数据被修改不可用,Flink任务会陷入不断重启。此时您可以设置该参数,参数取值如下:
重要
|
结果表独有
|
参数 |
说明 |
数据类型 |
是否必填 |
默认值 |
备注 |
|
useStreamTunnel |
是否使用MaxCompute Stream Tunnel上传数据。 |
Boolean |
否 |
false |
参数取值如下:
说明
数据通道选择详情请参见如何选择数据通道?。 |
|
flushIntervalMs |
MaxCompute Tunnel Writer缓冲区flush间隔。 |
Long |
否 |
30000(30秒) |
先将数据写入缓冲区,待缓冲区满或达到 flushIntervalMs 间隔后,批量写入目标表。
单位为毫秒。 说明
本参数可以与batchSize一同使用,满足任一条件即会Flush数据。 |
|
batchSize |
MaxCompute Tunnel Writer缓冲区flush的大小。 |
Long |
否 |
67108864(64 MB) |
单位为字节。 写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区达到一定大小(batchSize),再把缓冲区里的数据写到目标MaxCompute表。 说明
本参数可以与flushIntervalMs一同使用,满足任一条件即会Flush数据。 |
|
numFlushThreads |
MaxCompute Tunnel Writer缓冲区flush的线程数。 |
Integer |
否 |
1 |
每个MaxCompute Sink并发将创建numFlushThreads个线程用于flush数据。当该值大于1时,将允许不同分区的数据并发Flush,提升Flush的效率。 |
|
slotNum |
MaxCompute Tunnel Writer使用的slot数。 |
Integer |
否 |
0 |
slot数的限制请参见数据传输服务概述。 |
|
dynamicPartitionLimit |
写入动态分区的最大数量。 |
Integer |
否 |
100 |
当结果表在两次Checkpoint之间写入的动态分区数量超过了dynamicPartitionLimit,则会出现报错 重要
由于一次性写入大量分区会给MaxCompute服务带来一定压力,同时也会导致结果表flush和作业Checkpoint变慢。因此当报错出现时,您需要确认是否需要写入这么多分区。如果确实需要,需要手动调大dynamicPartitionLimit参数。 |
|
retryTimes |
向MaxCompute服务器请求最大重试次数。 |
Integer |
否 |
3 |
创建session、提交session、flush数据时可能存在短暂的MaxCompute服务不可用的情况,会根据该配置进行重试。 |
|
sleepMillis |
重试间隔时间。 |
Integer |
否 |
1000 |
单位为毫秒。 |
|
enableUpsert |
是否使用MaxCompute Upsert Tunnel上传数据。 |
Boolean |
否 |
false |
参数取值如下:
重要
|
|
upsertAsyncCommit |
Upsert模式下在提交session时是否使用异步模式。 |
Boolean |
否 |
false |
参数取值如下:
说明
仅实时计算引擎VVR 8.0.6及以上版本支持该参数。 |
|
upsertCommitTimeoutMs |
Upsert模式下提交session超时时间。 |
Integer |
否 |
120000 (120秒) |
单位毫秒。 说明
仅实时计算引擎VVR 8.0.6及以上版本支持该参数。 |
|
sink.operation |
写入Delta Table时的写入模式。 |
String |
否 |
insert |
参数取值如下:
说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
sink.parallelism |
写入Delta Table时的并行度 |
Integer |
否 |
None |
重要
确保Delta Table表属性 write.bucket.num 是该配置值的整数倍,这样可以获得最佳的写入性能,并且能够最有效地节省 Sink 节点内存。 |
|
sink.file-cached.enable |
写入Delta table动态分区时,是否使用文件缓存模式。 |
Boolean |
否 |
false |
参数取值如下:
使用文件缓存模式能够减少写入服务端的小文件数量,但是写出数据的延迟更高。在结果表并行度较高时建议使用文件缓存模式。 说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
sink.file-cached.writer.num |
文件缓存模式下,单个Task上传数据的并发数。 |
Integer |
否 |
16 |
说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
sink.bucket.check-interval |
文件缓存模式下,检查文件大小的周期,单位:毫秒(ms)。 |
Integer |
否 |
60000 |
仅在设置了 说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
sink.file-cached.rolling.max-size |
文件缓存模式下,单个缓存文件的最大值。 |
MemorySize |
否 |
16 M |
说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
sink.file-cached.memory |
文件缓存模式下,写入文件使用的最大堆外内存大小。 |
MemorySize |
否 |
64 M |
仅在设置了 说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
sink.file-cached.memory.segment-size |
文件缓存模式下,写入文件的使用的buffer大小。 |
MemorySize |
否 |
128 KB |
仅在设置了 说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
sink.file-cached.flush.always |
文件缓存模式下,写入文件是否使用缓存。 |
Boolean |
否 |
true |
仅在设置了 说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
sink.file-cached.write.max-retries |
文件缓存模式下,上传数据的重试次数。 |
Integer |
否 |
3 |
仅在设置了 说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
upsert.writer.max-retries |
Upsert Writer写入Bucket失败后的重试次数。 |
Integer |
否 |
3 |
说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
upsert.writer.buffer-size |
单个Upsert Writer数据在Flink中的缓存大小。 |
MemorySize |
否 |
64 m |
说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
upsert.writer.bucket.buffer-size |
单个Bucket数据在Flink中的缓存大小。 |
MemorySize |
否 |
1 m |
当集群内存资源紧张时,可以减小该参数值。 说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
upsert.write.bucket.num |
写入表的bucket数量。 |
Integer |
是 |
None |
必须与写入Delta Table表的 说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
upsert.write.slot-num |
单个Session使用Tunnel slot数量。 |
Integer |
否 |
1 |
说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
upsert.commit.max-retries |
Upsert Session Commit重试次数。 |
Integer |
否 |
3 |
说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
upsert.commit.thread-num |
Upsert Session Commit的并行度。 |
Integer |
否 |
16 |
不建议将此参数值调整得过大,因为当同时进行的提交并发数越多时,会导致资源消耗增加,可能导致性能问题或资源过度消耗。 说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
upsert.commit.timeout |
Upsert Session Commit等待超时时间,单位:秒(s)。 |
Integer |
否 |
600 |
说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
upsert.flush.concurrent |
限制单个分区允许同时写入的最大Bucket数。 |
Integer |
否 |
2 |
每当一个bucket的数据刷新时,将会占用一个Tunnel Slot资源。 说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
insert.commit.thread-num |
Commit Session的并行度。 |
Integer |
否 |
16 |
说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
insert.arrow-writer.enable |
是否使用Arrow格式。 |
Boolean |
否 |
false |
参数取值如下:
说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
insert.arrow-writer.batch-size |
Arrow Batch的最大行数。 |
Integer |
否 |
512 |
说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
insert.arrow-writer.flush-interval |
Writer Flush间隔,单位毫秒(ms)。 |
Integer |
否 |
100000 |
说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
insert.writer.buffer-size |
使用Buffered Writer的缓存大小。 |
MemorySize |
否 |
64 M |
说明
仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
|
upsert.partial-column.enable |
是否仅更新部分列。 |
Boolean |
否 |
false |
只在结果表类型为Delta Table时生效,详情请参见部分列更新。 参数取值如下:
根据结果表是否存在更新数据的主键,数据写入分以下几种情况:
说明
仅实时计算引擎VVR 8.0.11及以上版本支持该参数。 |
维表独有
MaxCompute维表在作业启动时从指定的分区拉取全量数据,partition参数支持使用max_pt()等函数。当缓存过期重新加载时会重新解析partition参数拉取最新的分区,使用max_two_pt()时维表可拉取两个分区,其他情况下只支持指定单个分区。
|
参数 |
说明 |
数据类型 |
是否必填 |
默认值 |
备注 |
|
cache |
缓存策略。 |
String |
是 |
无 |
目前MaxCompute维表仅支持 ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。 说明
|
|
cacheSize |
最多缓存的数据条数。 |
Long |
否 |
100000 |
如果维表数据量超过了cacheSize,则会出现报错 重要
由于维表数据量太大会占用大量JVM堆内存,同时也会让作业启动和维表更新变慢,因此您需要确认是否需要缓存这么多数据,如果确实需要,需要手动调大该参数。 |
|
cacheTTLMs |
缓存超时时间,也就是缓存更新的间隔时间。 |
Long |
否 |
Long.MAX_VALUE(相当于永不更新) |
单位为毫秒。 |
|
cacheReloadTimeBlackList |
更新时间黑名单。在该参数规定的时间段内不会更新缓存。 |
String |
否 |
无 |
用于防止缓存在关键时间段(例如活动流量峰值期间)更新导致作业不稳定。填写方式详情请参见如何填写CacheReloadTimeBlackList参数?。 |
|
maxLoadRetries |
缓存更新时(包含作业启动时初次拉取数据)最多尝试次数,超过该次数后作业运行失败。 |
Integer |
否 |
10 |
无。 |
类型映射
MaxCompute支持的类型参见2.0数据类型版本。
|
MaxCompute类型 |
Flink类型 |
|
BOOLEAN |
BOOLEAN |
|
TINYINT |
TINYINT |
|
SMALLINT |
SMALLINT |
|
INT |
INTEGER |
|
BIGINT |
BIGINT |
|
FLOAT |
FLOAT |
|
DOUBLE |
DOUBLE |
|
DECIMAL(precision, scale) |
DECIMAL(precision, scale) |
|
CHAR(n) |
CHAR(n) |
|
VARCHAR(n) |
VARCHAR(n) |
|
STRING |
STRING |
|
BINARY |
BYTES |
|
DATE |
DATE |
|
DATETIME |
TIMESTAMP(3) |
|
TIMESTAMP |
TIMESTAMP(9) |
|
TIMESTAMP_NTZ |
TIMESTAMP(9) |
|
ARRAY |
ARRAY |
|
MAP |
MAP |
|
STRUCT |
ROW |
|
JSON |
STRING |
当MaxCompute物理表中同时存在嵌套的复合类型字段(ARRAY、MAP或STRUCT)和JSON类型字段时,需要在创建MaxCompute物理表时指定tblproperties('columnar.nested.type'='true'),才能被Flink正确读写。
数据摄入(公测中)
MaxCompute连接器可以用于数据摄入YAML作业开发,作为目标端写入。
使用限制
仅实时计算引擎VVR 11.1及以上版本支持。
语法结构
source:
type: xxx
sink:
type: maxcompute
name: MaxComputeSink
access-id: ${your_accessId}
access-key: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}
buckets-num: 8
配置项
|
配置项 |
是否必填 |
默认值 |
类型 |
描述 |
|
type |
是 |
无 |
String |
指定要使用的连接器,这里需要设置成 |
|
name |
否 |
无 |
String |
Sink的名称。 |
|
access-id |
是 |
无 |
String |
阿里云账号或RAM用户的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。 |
|
access-key |
是 |
无 |
String |
AccessKey ID对应的AccessKey Secret。 |
|
endpoint |
是 |
无 |
String |
MaxCompute服务的连接地址。您需要根据创建MaxCompute项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见 Endpoint。 |
|
project |
是 |
无 |
String |
MaxCompute项目名称。您可以登录MaxCompute控制台,在工作区>项目管理页面获取MaxCompute项目名称。 |
|
tunnel.endpoint |
否 |
无 |
String |
MaxCompute Tunnel服务的连接地址,通常这项配置可以根据指定的项目所在的地域进行自动路由。仅在使用代理等特殊网络环境下使用该配置。 |
|
quota.name |
否 |
无 |
String |
MaxCompute数据传输使用的独享资源组名称,如不指定该配置,则使用共享资源组。详情可以参见购买与使用独享数据传输服务资源组。 |
|
sts-token |
否 |
无 |
String |
当使用RAM角色颁发的短时有效的访问令牌(STS Token)进行鉴权时,需要指定该参数。 |
|
buckets-num |
否 |
16 |
Integer |
自动创建MaxCompute Delta表时使用的桶数。使用方式请参见近实时数仓概述。 |
|
compress.algorithm |
否 |
zlib |
String |
写入MaxCompute时使用的数据压缩算法,当前支持 |
|
total.buffer-size |
否 |
64MB |
String |
内存中缓冲的数据量大小,单位为分区级(非分区表单位为表级),不同分区(表)的缓冲区相互独立,达到阈值后数据写入到MaxCompute。 |
|
bucket.buffer-size |
否 |
4MB |
String |
内存中缓冲的数据量大小,单位为桶级,仅写入Delta表时生效。不同数据桶的缓冲区相互独立,达到阈值后将该桶数据写入到MaxCompute。 |
|
commit.thread-num |
否 |
16 |
Integer |
Checkpoint阶段,能够同时处理的分区(表)数量。 |
|
flush.concurrent-num |
否 |
4 |
Integer |
写入数据到MaxCompute时,能够同时写入的桶数量。仅写入Delta表时生效。 |
表位置映射
连接器自动建表时,使用如下映射关系,将源表的位置信息映射到MaxCompute表中。
当MaxCompute项目不支持Schema模型时,以上游MySQL为例,每个同步任务仅能同步一个MySQL Database。(其他数据源同理,连接器Connector会忽略tableId.namespace信息)。
|
数据摄入作业中对象 |
MaxCompute位置 |
MySQL位置 |
|
配置中的Project参数 |
Project |
none |
|
TableId.namespace |
Schema(仅当MaxCompute项目支持Schema模型时,如不支持,将忽略该配置) |
Database |
|
TableId.tableName |
Table |
Table |
类型映射
|
CDC类型 |
MaxCompute类型 |
|
CHAR |
STRING |
|
VARCHAR |
STRING |
|
BOOLEAN |
BOOLEAN |
|
BINARY/VARBINARY |
BINARY |
|
DECIMAL |
DECIMAL |
|
TINYINT |
TINYINT |
|
SMALLINT |
SMALLINT |
|
INTEGER |
INTEGER |
|
BIGINT |
BIGINT |
|
FLOAT |
FLOAT |
|
DOUBLE |
DOUBLE |
|
TIME_WITHOUT_TIME_ZONE |
STRING |
|
DATE |
DATE |
|
TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_NTZ |
|
TIMESTAMP_WITH_LOCAL_TIME_ZONE(Precision>3) |
TIMESTAMP |
|
TIMESTAMP_WITH_LOCAL_TIME_ZONE(Precision<=3) |
DATETIME |
|
TIMESTAMP_WITH_TIME_ZONE(Precision>3) |
TIMESTAMP |
|
TIMESTAMP_WITH_TIME_ZONE(Precision<=3) |
DATETIME |
|
ARRAY |
ARRAY |
|
MAP |
MAP |
|
ROW |
STRUCT |
使用示例
SQL
源表示例
全量读取
默认情况下源表为全量模式,读取partition参数中指定的分区。
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=201809*'
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
cid,
COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
增量读取
从指定的startPartition开始增量读取。
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 从20180905对应分区开始读取
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
结果表示例
写入固定分区
指定partition固定分区值。
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905' -- 写入固定分区ds=20180905。
);
INSERT INTO odps_sink
SELECT
id, len, content
FROM datagen_source;
写入动态分区
根据表分区字段指定partition。
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR,
c TIMESTAMP
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR,
ds VARCHAR --需要显式声明动态分区列。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds' --不写分区的值,表示根据ds字段的值写入不同分区。
);
INSERT INTO odps_sink
SELECT
id,
len,
content,
DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;
维表示例
一对一维表
一对一维表需要声明主键。
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR,
PRIMARY KEY (k) NOT ENFORCED -- 一对一维表需要声明主键。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
一对多维表
一对多维表无需声明主键。
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR
-- 一对多维表无需声明主键。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
DataStream
-
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法。
-
为了保护知识产权,从实时计算引擎VVR6.0.6版本起,此连接器在本地调试单次运行作业的时间为30分钟,30分钟后作业会报错并退出。本地运行和调试包含MaxCompute连接器的作业,请参见本地运行和调试包含连接器的作业。
-
暂不支持读取Delta Table,即建表时指定了
primary key和transactional=true的表,详情请参见基本概念。
在DataStream中使用MaxCompute连接器推荐使用SQL声明MaxCompute表,通过Table/DataStream相互转换来连接MaxCompute表和数据流
连接源表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=201809*'",
")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source");
连接结果表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=20180905'",
")");
DataStream<Row> data = env.fromElements(
Row.of("id0", 3.),
Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();
XML
MaxCompute连接器的Maven依赖包含了构建全量源表、增量源表、结果表和维表的所需要的类。Maven中央库中已经放置了MaxCompute DataStream连接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${vvr-version}</version>
</dependency>