本文为您介绍大数据计算服务MaxCompute连接器的语法结构、WITH参数和使用示例等。
背景信息
大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute的详情请参见什么是MaxCompute。
MaxCompute连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表、维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不支持 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API种类 | Datastream和SQL |
是否支持更新或删除结果表数据 | Batch Tunnel和Stream Tunnel模式仅支持插入数据,Upsert Tunnel模式支持插入、更新和删除数据。 |
前提条件
已创建MaxCompute表,详情请参见创建表。
使用限制
MaxCompute连接器仅支持At Least Once语义。
说明At Least Once语义会保证数据不缺失,但在少部分情况下,可能会将重复数据写入MaxCompute。不同的MaxCompute Tunnel出现重复数据的情况不同,MaxCompute Tunnel详情请参见如何选择数据通道?。
默认情况下源表为全量模式,仅会读取partition参数中指定的分区,在读完所有数据后结束运行,状态转换为finished,不会监控是否有新分区产生。
如果您需要持续监控新分区,请通过WITH参数中指定startPartition使用增量源表模式。
说明维表每次更新时都会检查最新分区,不受这一限制。
在源表开始运行后,向分区里添加的新数据不会被读取,请在分区数据完整的情况下运行作业。
SQL
MaxCompute连接器可以在SQL作业中使用,作为源表,维表或者结果表。
语法结构
CREATE TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'schemaName' = '<yourSchemaName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=2018****'
);
WITH参数
通用
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 表类型。 | String | 是 | 无 | 固定值为odps。 |
endpoint | MaxCompute服务地址。 | String | 是 | 无 | 请参见Endpoint。 |
tunnelEndpoint | MaxCompute Tunnel服务的连接地址。 | String | 否 | 无 | 请参见Endpoint。 说明 如果未填写,MaxCompute会根据内部的负载均衡服务分配Tunnel的连接。 |
project | MaxCompute项目名称。 | String | 是 | 无 | 无。 |
schemaName | MaxCompute Schema名称。 | String | 否 | 无 | 仅当MaxCompute项目开启Schema功能时,需填写该值为MaxCompute表所属Schema名,详情请参见 Schema操作。 说明 仅实时计算引擎VVR 8.0.6及以上版本支持该参数。 |
tableName | MaxCompute表名。 | String | 是 | 无 | 无。 |
accessId | MaxCompute AccessKey ID。 | String | 是 | 无 | 详情请参见如何查看AccessKey ID和AccessKey Secret信息? 重要 为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。 |
accessKey | MaxCompute AccessKey Secret。 | String | 是 | 无 | |
partition | MaxCompute分区名。 | String | 否 | 无 | 对于非分区表和增量源表无需填写。 说明 分区表详情请参见在读取或写入分区时,如何填写Partition参数?。 |
compressAlgorithm | MaxCompute Tunnel使用的压缩算法。 | String | 否 |
| 参数取值如下:
说明 仅实时计算引擎VVR 4.0.13及以上版本支持该参数。 |
quotaName | MaxCompute独享数据传输服务的quota名称。 | String | 否 | 无 | 设置该值来使用独享的MaxCompute数据传输服务。 重要
说明 MaxCompute独享数据传输参见购买与使用独享数据传输服务资源组。 |
源表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
maxPartitionCount | 可以读取的最大分区数量。 | Integer | 否 | 100 | 如果读取的分区数量超过了该参数,则会出现报错 重要 由于一次性读取大量分区会给MaxCompute服务带来一定压力,同时也会让作业启动速度变慢,因此您需要确认是否需要读取这么多分区(而不是误填partition参数)。如果确实需要,需要手动调大maxPartitionCount参数。 |
useArrow | 是否使用Arrow格式读取数据。 | Boolean | 否 | false | 使用Arrow格式能够调用MaxCompute的Storage API,详情请参见什么是MaxCompute中用户接口与开放性一节。 重要
|
splitSize | 在使用Arrow格式读取数据时,一次拉取的数据大小。 | MemorySize | 否 | 256 MB | 仅实时计算引擎VVR 8.0.8及以上版本支持该参数。 重要 仅在批作业中生效。 |
compressCodec | 在使用Arrow格式读取数据时,采用的压缩算法。 | String | 否 | "" | 参数取值如下:
指定压缩算法相比无压缩能带来一定的吞吐提升。 重要
|
dynamicLoadBalance | 是否允许动态分配分片。 | Boolean | 否 | false | 参数取值如下:
允许动态分配分片能够发挥Flink不同节点的处理性能,减少源表整体读取时间,但也会导致不同节点读取总数据量不一致,出现数据倾斜情况。 重要
|
增量源表独有
增量源表通过间歇轮询MaxCompute服务器获取所有的分区信息来发现新增的分区,读取新分区时要求分区内数据已写入完毕,详情参见增量MaxCompute源表监听到新分区时,如果该分区还有数据没有写完,如何处理?。通过startPartition可以指定起始点位,但注意只读取字典序大于等于起始点位的分区,例如分区year=2023,month=10
字典序小于分区year=2023,month=9
,对于这种类型的分区声明可以通过加0补齐的方式来保证字典序正确,例如year=2023,month=09
。
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
startPartition | 增量读取的起始MaxCompute分区点位(包含)。 | String | 是 | 无 |
说明 startPartition参数详情,请参见如何填写增量MaxCompute的startPartition参数?。 |
subscribeIntervalInSec | 轮询MaxCompute获取分区列表的时间间隔。 | Integer | 否 | 30 | 单位为秒。 |
modifiedTableOperation | 读取分区过程中遇到分区数据被修改时的处理。 | Enum (NONE, SKIP) | 否 | NONE | 由于下载session被保存在检查点中,每次从检查点恢复时尝试从该session恢复读取进度,而该session由于分区数据被修改不可用,Flink任务会陷入不断重启。此时您可以设置该参数,参数取值如下:
重要
|
结果表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
useStreamTunnel | 是否使用MaxCompute Stream Tunnel上传数据。 | Boolean | 否 | false | 参数取值如下:
说明
|
flushIntervalMs | MaxCompute Tunnel Writer缓冲区flush间隔。 | Long | 否 | 30000(30秒) | MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区溢出或者每隔一段时间(flushIntervalMs),再把缓冲区里的数据写到目标MaxCompute表。 对于Stream Tunnel,flush的数据立即可见;对于Batch Tunnel,数据flush后仍需要等待checkpoint完成后才可见,建议设置该参数为0来关闭定时flush。 单位为毫秒。 说明 本参数可以与batchSize一同使用,满足任一条件即会Flush数据。 |
batchSize | MaxCompute Tunnel Writer缓冲区flush的大小。 | Long | 否 | 67108864(64 MB) | MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区达到一定大小(batchSize),再把缓冲区里的数据写到目标MaxCompute表。 单位为字节。 说明
|
numFlushThreads | MaxCompute Tunnel Writer缓冲区flush的线程数。 | Integer | 否 | 1 | 每个MaxCompute Sink并发将创建numFlushThreads个线程用于flush数据。当该值大于1时,将允许不同分区的数据并发Flush,提升Flush的效率。 说明 仅实时计算引擎VVR 4.0.14及以上版本支持该参数。 |
slotNum | MaxCompute Tunnel Writer使用的slot数。 | Integer | 否 | 0 | slot数的限制请参见数据传输服务概述。 |
dynamicPartitionLimit | 写入动态分区的最大数量。 | Integer | 否 | 100 | 当结果表在两次Checkpoint之间写入的动态分区数量超过了dynamicPartitionLimit,则会出现报错 重要 由于一次性写入大量分区会给MaxCompute服务带来一定压力,同时也会导致结果表flush和作业Checkpoint变慢。因此当报错出现时,您需要确认是否需要写入这么多分区。如果确实需要,需要手动调大dynamicPartitionLimit参数。 |
retryTimes | 向MaxCompute服务器请求最大重试次数。 | Integer | 否 | 3 | 创建session、提交session、flush数据时可能存在短暂的MaxCompute服务不可用的情况,会根据该配置进行重试。 |
sleepMillis | 重试间隔时间。 | Integer | 否 | 1000 | 单位为毫秒。 |
enableUpsert | 是否使用MaxCompute Upsert Tunnel上传数据。 | Boolean | 否 | false | 参数取值如下:
重要
|
upsertAsyncCommit | Upsert模式下在提交session时是否使用异步模式。 | Boolean | 否 | false | 参数取值如下:
说明 仅实时计算引擎VVR 8.0.6及以上版本支持该参数。 |
upsertCommitTimeoutMs | Upsert模式下提交session超时时间。 | Integer | 否 | 120000 (120秒) | 仅实时计算引擎VVR 8.0.6及以上版本支持该参数。 |
sink.operation | 写入Delta Table时的写入模式。 | String | 否 | insert | 参数取值如下:
说明 仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
sink.parallelism | 写入Delta Table时的并行度 | Integer | 否 | None |
重要 确保Delta Table表属性 write.bucket.num 是该配置值的整数倍,这样可以获得最佳的写入性能,并且能够最有效地节省 Sink 节点内存。 |
sink.file-cached.enable | 写入Delta table动态分区时,是否使用文件缓存模式。 | Boolean | 否 | false | 参数取值如下:
使用文件缓存模式能够减少写入服务端的小文件数量,但是写出数据的延迟更高。在结果表并行度较高时建议使用文件缓存模式。 说明 仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
sink.file-cached.writer.num | 文件缓存模式下,单个Task上传数据的并发数。 | Integer | 否 | 16 |
|
sink.bucket.check-interval | 文件缓存模式下,检查文件大小的周期,单位:毫秒(ms)。 | Integer | 否 | 60000 |
|
sink.file-cached.rolling.max-size | 文件缓存模式下,单个缓存文件的最大值。 | MemorySize | 否 | 16 M |
|
sink.file-cached.memory | 文件缓存模式下,写入文件使用的最大堆外内存大小。 | MemorySize | 否 | 64 M |
|
sink.file-cached.memory.segment-size | 文件缓存模式下,写入文件的使用的buffer大小。 | MemorySize | 否 | 128 KB |
|
sink.file-cached.flush.always | 文件缓存模式下,写入文件是否使用缓存。 | Boolean | 否 | true |
|
sink.file-cached.write.max-retries | 文件缓存模式下,上传数据的重试次数。 | Integer | 否 | 3 |
|
upsert.writer.max-retries | Upsert Writer写入Bucket失败后的重试次数。 | Integer | 否 | 3 | 仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
upsert.writer.buffer-size | 单个Upsert Writer数据在Flink中的缓存大小。 | MemorySize | 否 | 64 m |
说明 一个upsert writer里会同时写入多个Bucket,建议提高该值,以提升写入效率。 若写入分区较多时,会存在引发内存OOM风险,可考虑降低该参数值。
|
upsert.writer.bucket.buffer-size | 单个Bucket数据在Flink中的缓存大小。 | MemorySize | 否 | 1 m |
|
upsert.write.bucket.num | 写入表的bucket数量。 | Integer | 是 | None |
|
upsert.write.slot-num | 单个Session使用Tunnel slot数量。 | Integer | 否 | 1 | 仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
upsert.commit.max-retries | Upsert Session Commit重试次数。 | Integer | 否 | 3 | 仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
upsert.commit.thread-num | Upsert Session Commit的并行度。 | Integer | 否 | 16 |
|
upsert.commit.timeout | Upsert Session Commit等待超时时间,单位:秒(s)。 | Integer | 否 | 600 | 仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
upsert.flush.concurrent | 限制单个分区允许同时写入的最大Bucket数。 | Integer | 否 | 2 |
|
insert.commit.thread-num | Commit Session的并行度。 | Integer | 否 | 16 | 仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
insert.arrow-writer.enable | 是否使用Arrow格式。 | Boolean | 否 | false | 参数取值如下:
说明 仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
insert.arrow-writer.batch-size | Arrow Batch的最大行数。 | Integer | 否 | 512 | 仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
insert.arrow-writer.flush-interval | Writer Flush间隔,单位毫秒(ms)。 | Integer | 否 | 100000 | 仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
insert.writer.buffer-size | 使用Buffered Writer的缓存大小。 | MemorySize | 否 | 64 M | 仅实时计算引擎VVR 8.0.10及以上版本支持该参数。 |
upsert.partial-column.enable | 是否仅更新部分列。 | Boolean | 否 | false | 只在结果表类型为Delta Table时生效,详情请参见部分列更新。 参数取值如下:
根据结果表是否存在更新数据的主键,数据写入分以下几种情况:
说明 仅实时计算引擎VVR 8.0.11及以上版本支持该参数。 |
维表独有
MaxCompute维表在作业启动时从指定的分区拉取全量数据,partition参数支持使用max_pt()等函数。当缓存过期重新加载时会重新解析partition参数拉取最新的分区,使用max_two_pt()时维表可拉取两个分区,其他情况下只支持指定单个分区。
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
cache | 缓存策略。 | String | 是 | 无 | 目前MaxCompute维表仅支持 ALL策略:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。 说明
|
cacheSize | 最多缓存的数据条数。 | Long | 否 | 100000 | 如果维表数据量超过了cacheSize,则会出现报错 重要 由于维表数据量太大会占用大量JVM堆内存,同时也会让作业启动和维表更新变慢,因此您需要确认是否需要缓存这么多数据,如果确实需要,需要手动调大该参数。 |
cacheTTLMs | 缓存超时时间,也就是缓存更新的间隔时间。 | Long | 否 | Long.MAX_VALUE(相当于永不更新) | 单位为毫秒。 |
cacheReloadTimeBlackList | 更新时间黑名单。在该参数规定的时间段内不会更新缓存。 | String | 否 | 无 | 用于防止缓存在关键时间段(例如活动流量峰值期间)更新导致作业不稳定。填写方式详情请参见如何填写CacheReloadTimeBlackList参数?。 |
maxLoadRetries | 缓存更新时(包含作业启动时初次拉取数据)最多尝试次数,超过该次数后作业运行失败。 | Integer | 否 | 10 | 无。 |
类型映射
MaxCompute支持的类型参见2.0数据类型版本。
MaxCompute类型 | Flink类型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
当MaxCompute物理表中同时存在嵌套的复合类型字段(ARRAY、MAP或STRUCT)和JSON类型字段时,需要在创建MaxCompute物理表时指定tblproperties('columnar.nested.type'='true')
,才能被Flink正确读写。
数据摄入
MaxCompute连接器可以用于数据摄入YAML作业开发,作为目标端写入。
使用限制
仅实时计算引擎VVR 11.1及以上版本支持。
语法结构
source:
type: xxx
sink:
type: maxcompute
name: MaxComputeSink
access-id: ${your_accessId}
access-key: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}
buckets-num: 8
配置项
配置项 | 是否必填 | 默认值 | 类型 | 描述 |
type | 是 | none | String | 指定要使用的连接器,这里需要设置成 |
name | 否 | none | String | Sink的名称。 |
access-id | 是 | none | String | 阿里云账号或RAM用户的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。 |
access-key | 是 | none | String | AccessKey ID对应的AccessKey Secret。 |
endpoint | 是 | none | String | MaxCompute服务的连接地址。您需要根据创建MaxCompute项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见 Endpoint。 |
project | 是 | none | String | MaxCompute项目名称。您可以登录MaxCompute控制台,在工作区>项目管理页面获取MaxCompute项目名称。 |
tunnel.endpoint | 否 | none | String | MaxCompute Tunnel服务的连接地址,通常这项配置可以根据指定的项目所在的地域进行自动路由。仅在使用代理等特殊网络环境下使用该配置。 |
quota.name | 否 | none | String | MaxCompute数据传输使用的独享资源组名称,如不指定该配置,则使用共享资源组。详情可以参见购买与使用独享数据传输服务资源组。 |
sts-token | 否 | none | String | 当使用RAM角色颁发的短时有效的访问令牌(STS Token)进行鉴权时,需要指定该参数。 |
buckets-num | 否 | 16 | Integer | 自动创建MaxCompute Delta表时使用的桶数。使用方式请参见近实时数仓概述。 |
compress.algorithm | 否 | zlib | String | 写入MaxCompute时使用的数据压缩算法,当前支持 |
total.buffer-size | 否 | 64MB | String | 内存中缓冲的数据量大小,单位为分区级(非分区表单位为表级),不同分区(表)的缓冲区相互独立,达到阈值后数据写入到MaxCompute。 |
bucket.buffer-size | 否 | 4MB | String | 内存中缓冲的数据量大小,单位为桶级,仅写入Delta表时生效。不同数据桶的缓冲区相互独立,达到阈值后将该桶数据写入到MaxCompute。 |
commit.thread-num | 否 | 16 | Integer | Checkpoint阶段,能够同时处理的分区(表)数量。 |
flush.concurrent-num | 否 | 4 | Integer | 写入数据到MaxCompute时,能够同时写入的桶数量。仅写入Delta表时生效。 |
表位置映射
连接器自动建表时,使用如下映射关系,将源表的位置信息映射到MaxCompute表中。
当MaxCompute项目不支持Schema模型时,以上游MySQL为例,每个同步任务仅能同步一个MySQL Database。(其他数据源同理,连接器Connector会忽略tableId.namespace信息)。
数据摄入作业中对象 | MaxCompute位置 | MySQL位置 |
配置中的Project参数 | Project | none |
TableId.namespace | Schema(仅当MaxCompute项目支持Schema模型时,如不支持,将忽略该配置) | Database |
TableId.tableName | Table | Table |
类型映射
CDC类型 | MaxCompute类型 |
CHAR | STRING |
VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |
使用示例
SQL
源表示例
全量读取
默认情况下源表为全量模式,读取partition
参数中指定的分区。
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=201809*'
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
cid,
COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
增量读取
从指定的startPartition
开始增量读取。
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 从20180905对应分区开始读取
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
结果表示例
写入固定分区
指定partition
固定分区值。
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905' -- 写入固定分区ds=20180905。
);
INSERT INTO odps_sink
SELECT
id, len, content
FROM datagen_source;
写入动态分区
根据表分区字段指定partition
。
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR,
c TIMESTAMP
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR,
ds VARCHAR --需要显式声明动态分区列。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds' --不写分区的值,表示根据ds字段的值写入不同分区。
);
INSERT INTO odps_sink
SELECT
id,
len,
content,
DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;
维表示例
一对一维表
一对一维表需要声明主键。
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR,
PRIMARY KEY (k) NOT ENFORCED -- 一对一维表需要声明主键。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
一对多维表
一对多维表无需声明主键。
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR
-- 一对多维表无需声明主键。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
DataStream
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法。
为了保护知识产权,从实时计算引擎VVR6.0.6版本起,此连接器在本地调试单次运行作业的时间为30分钟,30分钟后作业会报错并退出。本地运行和调试包含MaxCompute连接器的作业,请参见本地运行和调试包含连接器的作业。
暂不支持读取Delta Table,即建表时指定了
primary key
和transactional=true
的表,详情请参见基本概念。
在DataStream中使用MaxCompute连接器推荐使用SQL声明MaxCompute表,通过Table/DataStream相互转换来连接MaxCompute表和数据流
连接源表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=201809*'",
")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source");
连接结果表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=20180905'",
")");
DataStream<Row> data = env.fromElements(
Row.of("id0", 3.),
Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();
XML
MaxCompute连接器的Maven依赖包含了构建全量源表、增量源表、结果表和维表的所需要的类。Maven中央库中已经放置了MaxCompute DataStream连接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${vvr-version}</version>
</dependency>