本文为您介绍如何使用大数据计算服务MaxCompute连接器。
背景信息
大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute详情请参见什么是MaxCompute。
类别 | 详情 |
---|---|
支持类型 | 源表、维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不支持 |
特有监控指标 |
说明 指标的含义及如何查看监控指标,请参见查看监控指标。 |
API种类 | Datastream和SQL |
是否支持更新或删除结果表数据 | 不支持更新和删除结果表数据,只支持插入数据。 |
前提条件
已创建MaxCompute表,详情请参见创建表。
使用限制
- 仅实时计算引擎VVR 2.0.0及以上版本支持MaxCompute连接器。
- MaxCompute连接器仅支持At Least Once语义。说明 At Least Once语义会保证数据不缺失,但在少部分情况下,可能会将重复数据写入MaxCompute。不同的MaxCompute Tunnel出现重复数据的情况不同,MaxCompute Tunnel详情请参见如何选择数据通道?。
- 默认情况下源表为全量模式,仅会读取partition参数中指定的分区,在读完所有数据后结束运行,状态转换为finished,不会监控是否有新分区产生。如果您需要持续监控新分区,请通过WITH参数中指定startPartition使用增量源表模式。说明
- 维表每次更新时都会检查最新分区,不受这一限制。
- 在源表开始运行后,向分区里添加的新数据不会被读取,请在分区数据完整的情况下运行作业。
语法结构
CREATE TBALE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '<yourAccessKeyId>',
'accessKey' = '<yourAccessKeySecret>',
'partition' = 'ds=2018****'
);
WITH参数
- 通用
参数 说明 数据类型 是否必填 默认值 备注 connector 表类型。 String 是 无 固定值为odps。 endpoint MaxCompute服务地址。 String 是 无 请参见Endpoint。 tunnelEndpoint MaxCompute Tunnel服务的连接地址。 String 否 无 请参见Endpoint。 说明- VPC环境下为必填。
- 如果未填写,MaxCompute会根据内部的负载均衡服务分配Tunnel的连接。
project MaxCompute项目名称。 String 是 无 无。 tableName MaxCompute表名。 String 是 无 无。 accessId MaxCompute AccessKey ID。 String 是 无 无。 accessKey MaxCompute AccessKey Secret。 String 是 无 无。 partition MaxCompute分区名。 String 否 无 对于非分区表和增量源表无需填写。 说明 分区表详情请参见在读取或写入分区时,如何填写Partition参数?。compressAlgorithm MaxCompute Tunnel使用的压缩算法。 String 否 - VVR 4.0.13及以上版本:ZLIB
- VVR 6.0.1及以上版本:SNAPPY
参数取值如下: - RAW(无压缩)
- ZLIB
- SNAPPY
SNAPPY相比ZLIB能带来明显的吞吐提升。在测试场景下,吞吐提升约50%。
说明 仅实时计算引擎VVR 4.0.13及以上版本支持该参数。 - 源表独有
参数 说明 数据类型 是否必填 默认值 备注 maxPartitionCount 可以读取的最大分区数量。 Integer 否 100 如果读取的分区数量超过了该参数,则会出现报错 The number of matched partitions exceeds the default limit
。重要 由于一次性读取大量分区会给MaxCompute服务带来一定压力,同时也会让作业启动速度变慢,因此您需要确认是否需要读取这么多分区(而不是误填partition参数)。如果确实需要,需要手动调大maxPartitionCount参数。 - 增量源表独有
增量源表通过间歇轮询MaxCompute服务器获取所有的分区信息来发现新增的分区,读取新分区时要求分区内数据已写入完毕,详情参见增量MaxCompute源表监听到新分区时,如果该分区还有数据没有写完,如何处理?。通过startPartition可以指定起始点位,但注意只读取字典序大于等于起始点位的分区,例如分区
year=2023,month=10
字典序小于分区year=2023,month=9
,对于这种类型的分区声明可以通过加0补齐的方式来保证字典序正确,例如year=2023,month=09
。参数 说明 数据类型 是否必填 默认值 备注 startPartition 增量读取的起始MaxCompute分区点位(包含)。 String 是 无 - 使用该参数后启用增量源表模式,将忽略partition参数。
- 多级分区必须按分区级别从大到小声明每个分区列的值。
说明 startPartition参数详情,请参见如何填写增量MaxCompute的startPartition参数?。subscribeIntervalInSec 轮询MaxCompute获取分区列表的时间间隔。 Integer 否 30 单位为秒。 - 结果表独有
参数 说明 数据类型 是否必填 默认值 备注 useStreamTunnel 是否使用MaxCompute Stream Tunnel上传数据。 Boolean 否 false 参数取值如下:- true:使用MaxCompute Stream Tunnel上传数据。
- false:使用MaxCompute Batch Tunnel上传数据。
说明- 仅实时计算引擎VVR 4.0.13及以上版本支持该参数。
- 数据通道选择详情请参见如何选择数据通道?。
flushIntervalMs MaxCompute Tunnel Writer缓冲区flush间隔。 Long 否 30000(30秒) MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区溢出或者每隔一段时间(flushIntervalMs),再把缓冲区里的数据写到目标MaxCompute表。
对于Stream Tunnel,flush的数据立即可见;对于Batch Tunnel,数据flush后仍需要等待checkpoint完成后才可见,建议设置该参数为0来关闭定时flush。
单位为毫秒。
说明 本参数可以与batchSize一同使用,满足任一条件即会Flush数据。batchSize MaxCompute Tunnel Writer缓冲区flush的大小。 Long 否 67108864(64 MB) MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区达到一定大小(batchSize),再把缓冲区里的数据写到目标MaxCompute表。
单位为字节。
说明- 仅实时计算引擎VVR 4.0.14及以上版本支持该参数。
- 本参数可以与flushIntervalMs一同使用,满足任一条件即会Flush数据。
numFlushThreads MaxCompute Tunnel Writer缓冲区flush的线程数。 Integer 否 1 每个MaxCompute Sink并发将创建numFlushThreads个线程用于flush数据。当该值大于1时,将允许不同分区的数据并发Flush,提升Flush的效率。说明 仅实时计算引擎VVR 4.0.14及以上版本支持该参数。dynamicPartitionLimit 写入动态分区的最大数量。 Integer 否 100 当结果表在两次Checkpoint之间写入的动态分区数量超过了dynamicPartitionLimit,则会出现报错 Too many dynamic partitions
。重要 由于一次性写入大量分区会给MaxCompute服务带来一定压力,同时也会导致结果表flush和作业Checkpoint变慢。因此当报错出现时,您需要确认是否需要写入这么多分区。如果确实需要,需要手动调大dynamicPartitionLimit参数。retryTimes 向MaxCompute服务器请求最大重试次数。 Integer 否 3 创建session、提交session、flush数据时可能存在短暂的MaxCompute服务不可用时,会根据该配置进行重试。 sleepMillis 重试间隔时间。 Integer 否 1000 单位为毫秒。 - 维表独有
MaxCompute维表在作业启动时从指定的分区拉取全量数据,partition参数支持使用max_pt()等函数。当缓存过期重新加载时会重新解析partition参数拉取最新的分区,使用max_two_pt()时维表可拉取两个分区,其他情况下只支持指定单个分区。
参数 说明 数据类型 是否必填 默认值 备注 cache 缓存策略。 String 是 无 目前MaxCompute维表仅支持 ALL
策略,必须显式声明。 适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。ALL策略:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
说明- 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的至少4倍,具体值与MaxCompute存储压缩算法有关。
- 如果MaxCompute维表数据量较大,可以考虑使用SHUFFLE_HASH注解将维表数据均匀分散到各个并发中。详情请参见如何使用维表SHUFFLE_HASH注解?。
- 在使用超大MaxCompute维表时,如果JVM频繁GC导致作业异常,且在增加维表JOIN节点的内存仍无改善的情况下,建议改为支持LRU Cache策略的KV型维表,例如云数据库Hbase版维表。
cacheSize 最多缓存的数据条数。 Long 否 100000 如果维表数据量超过了cacheSize,则会出现报错 Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit
。重要 由于维表数据量太大会占用大量JVM堆内存,同时也会让作业启动和维表更新变慢,因此您需要确认是否需要缓存这么多数据,如果确实需要,需要手动调大该参数。cacheTTLMs 缓存超时时间,也就是缓存更新的间隔时间。 Long 否 Long.MAX_VALUE(相当于永不更新) 单位为毫秒。 cacheReloadTimeBlackList 更新时间黑名单。在该参数规定的时间段内不会更新缓存。 String 否 无 用于防止缓存在关键时间段(例如活动流量峰值期间)更新导致作业不稳定。填写方式详情请参见如何填写CacheReloadTimeBlackList参数?。 maxLoadRetries 缓存更新时(包含作业启动时初次拉取数据)最多尝试次数,超过该次数后作业运行失败。 Integer 否 10 无。
类型映射
MaxCompute字段类型 | Flink字段类型 |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
DECIMAL | DECIMAL |
BINARY | VARBINARY |
STRING | VARCHAR |
使用示例
SQL
- 源表示例
- 全量读取
CREATE TEMPORARY TABLE odps_source ( cid VARCHAR, rt DOUBLE ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpointName>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '<yourAccessId>', 'accessKey' = '<yourAccessPassword>', 'partition' = 'ds=201809*' ); CREATE TEMPORARY TABLE blackhole_sink ( cid VARCHAR, invoke_count BIGINT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT cid, COUNT(*) AS invoke_count FROM odps_source GROUP BY cid;
- 增量读取
CREATE TEMPORARY TABLE odps_source ( cid VARCHAR, rt DOUBLE ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpointName>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '<yourAccessId>', 'accessKey' = '<yourAccessPassword>', 'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 从20180905对应分区开始读取 ); CREATE TEMPORARY TABLE blackhole_sink ( cid VARCHAR, invoke_count BIGINT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT cid, COUNT(*) AS invoke_count FROM odps_source GROUP BY cid;
- 全量读取
- 结果表示例
- 写入固定分区
CREATE TEMPORARY TABLE datagen_source ( id INT, len INT, content VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_sink ( id INT, len INT, content VARCHAR ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '<yourAccessKeyId>', 'accessKey' = '<yourAccessKeySecret>', 'partition' = 'ds=20180905' -- 写入固定分区ds=20180905。 ); INSERT INTO odps_sink SELECT id, len, content FROM datagen_source;
- 写入动态分区
CREATE TEMPORARY TABLE datagen_source ( id INT, len INT, content VARCHAR, c TIMESTAMP ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_sink ( id INT, len INT, content VARCHAR, ds VARCHAR --需要显式声明动态分区列。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '<yourAccessKeyId>', 'accessKey' = '<yourAccessKeySecret>', 'partition' = 'ds' --不写分区的值,表示根据ds字段的值写入不同分区。 ); INSERT INTO odps_sink SELECT id, len, content, DATE_FORMAT(c, 'yyMMdd') as ds FROM datagen_source;
- 写入固定分区
- 维表示例
- 一对一维表
CREATE TEMPORARY TABLE datagen_source ( k INT, v VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_dim ( k INT, v VARCHAR, PRIMARY KEY (k) NOT ENFORCED -- 一对一维表需要声明主键。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '<yourAccessKeyId>', 'accessKey' = '<yourAccessKeySecret>', 'partition' = 'ds=20180905', 'cache' = 'ALL' ); CREATE TEMPORARY TABLE blackhole_sink ( k VARCHAR, v1 VARCHAR, v2 VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT k, s.v, d.v FROM datagen_source AS s INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
- 一对多维表
CREATE TEMPORARY TABLE datagen_source ( k INT, v VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_dim ( k INT, v VARCHAR -- 一对多维表无需声明主键。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '<yourAccessKeyId>', 'accessKey' = '<yourAccessKeySecret>', 'partition' = 'ds=20180905', 'cache' = 'ALL' ); CREATE TEMPORARY TABLE blackhole_sink ( k VARCHAR, v1 VARCHAR, v2 VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT k, s.v, d.v FROM datagen_source AS s INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
- 一对一维表
DataStream
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${connector.version}</version>
</dependency>
在DataStream中使用MaxCompute连接器推荐使用SQL声明MaxCompute表,通过Table/DataStream相互转换来连接MaxCompute表和数据流。- 连接源表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.executeSql(String.join( "\n", "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (", " cid VARCHAR,", " rt DOUBLE", ") WITH (", " 'connector' = 'odps',", " 'endpoint' = '<yourEndpointName>',", " 'project' = '<yourProjectName>',", " 'accessId' = '<yourAccessId>',", " 'accessKey' = '<yourAccessPassword>',", " 'partition' = 'ds=201809*'", ")"); DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source")); source.print(); env.execute("odps source");
- 连接结果表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.executeSql(String.join( "\n", "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (", " cid VARCHAR,", " rt DOUBLE", ") WITH (", " 'connector' = 'odps',", " 'endpoint' = '<yourEndpointName>',", " 'project' = '<yourProjectName>',", " 'accessId' = '<yourAccessId>',", " 'accessKey' = '<yourAccessPassword>',", " 'partition' = 'ds=20180905'", ")"); DataStream<Row> data = env.fromElements( Row.of("id0", 3.), Row.of("id1", 4.)); tEnv.fromDataStream(data).insertInto("odps_sink"); env.execute("odps sink");
常见问题
- endPoint和tunnelEndpoint是指什么?如果配置错误会产生什么结果?
- 启动作业时出现Akka超时报错,但是全量MaxCompute源表和增量MaxCompute获取Metadata速率正常,应该如何处理?
- 全量MaxCompute和增量MaxCompute是如何读取MaxCompute数据的?
- 引用MaxCompute作为数据源,在作业启动后,向已有的分区或者表里追加数据,这些新数据是否能被全量MaxCompute或增量MaxCompute源表读取?
- 全量MaxCompute和增量MaxCompute源表作业是否支持暂停作业后修改并发数,再恢复作业?
- 作业启动位点设置了
2019-10-11 00:00:00
, 为什么启动位点前的分区也会被全量MaxCompute源表读取? - 增量MaxCompute源表监听到新分区时,如果该分区还有数据没有写完,如何处理?
- MaxCompute连接器运行报错:ErrorMessage=Authorization Failed [4019], You have NO privilege
- 如何填写增量MaxCompute的startPartition参数?
- 为什么带有增量MaxCompute源表的作业启动后,迟迟不开始读取数据?
- 在读取或写入分区时,如何填写Partition参数?
- 为什么含有MaxCompute源表的作业一直在启动中,或作业启动成功后过了很久才产生数据?
- 如何选择数据通道?
- 含有MaxCompute结果表的作业运行过程中报错Invalid partition spec
- 含有MaxCompute结果表的作业运行过程中报错No more available blockId
- max_pt()和max_pt_with_done()的区别是什么?
- 如何使用维表SHUFFLE_HASH注解?
- 如何填写CacheReloadTimeBlackList参数?