本文为您介绍MaxCompute结果表的DDL定义、WITH参数、类型映射和代码示例。
实现原理
MaxCompute Sink可以分为以下两个阶段:
- 写入数据。调用MaxCompute SDK中的接口将数据写入缓冲区,在缓冲区大小超过64 MB或者每隔指定的时间间隔时,上传数据到MaxCompute的临时文件中。
- 提交会话。在任务进行Checkpoint时, MaxCompute Sink会调用Tunnel的Commit方法,提交会话,移动临时文件到MaxCompute表的数据目录,并修改元数据。
说明 Commit方法不能提供原子性。因此,MaxCompute Sink提供的是At least Once方式,而不是Exactly Once方式。
DDL定义
Flink支持将MaxCompute作为结果输出,示例代码如下。
create table odps_sink(
id INT,
user_name VARCHAR,
content VARCHAR
) with (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'tablename' = '<yourTableName>',
'accessid' = '<yourAccessKeyId>',
'accesskey' = '<yourAccessKeySecret>',
'partition' = 'ds=2018****'
);
说明
- WITH参数需要全部小写。
- DDL定义中的字段名称、顺序和类型必须与MaxCompute物理表保持一致,否则可能导致MaxCompute物理表中查询的数据为
/n
。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
connector | 数据结果表类型 | 是 | 固定值为odps 。
|
endPoint | MaxCompute服务地址 | 是 | 参见开通MaxCompute服务的Region和服务连接对照表。 |
tunnelEndpoint | MaxCompute Tunnel服务的连接地址 | 是 | 参见开通MaxCompute服务的Region和服务连接对照表。 |
project | MaxCompute项目名称 | 是 | 无 |
tableName | 表名 | 是 | 无 |
accessId | AccessKey ID | 是 | 无 |
accessKey | AccessKey Secret | 是 | 无 |
partition | 分区名 | 否 | 如果存在分区表,则必填partition。填写partition需要注意以下两点:
|
flushIntervalMs | Odps tunnel writer缓冲区Flush间隔。
MaxCompute Sink写入记录时,先放入数据到MaxCompute的缓冲区中,等缓冲区溢出或者每隔一段时间(flushIntervalMs)时,再把缓冲区里的数据写到目标 MaxCompute表。 |
否 | 单位为毫秒,默认值为30000(30秒)。 |
partitionBy | 写入Sink节点前,系统会根据该值做Hash Shuffle,数据就会流向对应的Sink节点。 | 否 | 系统按照多个列进行Hash Shuffle,各个列名之间使用逗号(,)分割。默认为空。 |
dynamicPartitionLimit | 分区数目最大值 | 否 | 默认值为100,系统会把已写入的分区和TunnelBufferedWriter的映射关系维护到一个Map里,如果该Map大小超过了dynamicPartitionLimit设定值,则会出现Too many dynamic partitions: 100, which exceeds the size limit: 100 报错。
|
类型映射
MaxCompute字段类型 | Flink字段类型 |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
STRING | VARCHAR |
DECIMAL | DECIMAL |
BINARY | VARBINARY |
代码示例
- 写入固定分区
CREATE TEMPORARY TABLE datagen_source ( id INT, len INT, content VARCHAR ) with ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_sink ( id INT, len INT, content VARCHAR ) with ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'tunnelEndpoint' = '<yourTunnelEndpoint>', 'project' = '<yourProjectName>', 'tablename' = '<yourTableName>', 'accessid' = '<yourAccessKeyId>', 'accesskey' = '<yourAccessKeySecret>', 'partition' = 'ds=20180905' ); INSERT INTO odps_sink SELECT id, len, content FROM datagen_source;
- 写入动态分区
CREATE TEMPORARY TABLE datagen_source ( id INT, len INT, content VARCHAR, c TIMESTAMP ) with ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_sink ( id INT, len INT, content VARCHAR, ds VARCHAR --动态分区列需要显式声明。 ) with ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'tunnelEndpoint' = '<yourTunnelEndpoint>', 'project' = '<yourProjectName>', 'tablename' = '<yourTableName>', 'accessid' = '<yourAccessKeyId>', 'accesskey' = '<yourAccessKeySecret>', 'partition' ='ds' --不写分区的值,表示根据ds字段的值写入不同分区。 ); INSERT INTO odps_sink SELECT id, len, content, DATE_FORMAT(c, 'yyMMdd') as ds FROM datagen_source;
在文档使用中是否遇到以下问题
更多建议
匿名提交