实时计算Flink版内置插件支持通过批量数据通道写入MaxCompute,受到批量数据通道并发数及存储文件数影响,内置版本插件会有性能瓶颈。MaxCompute提供了使用流式数据通道的Flink插件,支持使用Flink在高并发、高QPS场景下写入MaxCompute。
前提条件
已开通实时计算Flink版的Blink服务并创建Blink项目。
更多开通Blink及创建Blink项目的信息。
已安装使用流式数据通道的Flink插件。
背景信息
实时计算Flink版可以调用MaxCompute SDK中的接口将数据写入缓冲区,当缓冲区的大小超过指定的大小(默认为1 MB)或每隔指定的时间间隔时,将数据上传至MaxCompute结果表中。
建议Flink同步MaxCompute并发数大于32或Flush间隔小于60秒的场景下,使用MaxCompute自定义插件。其他场景可以随意选择Flink内置插件和MaxCompute自定义插件。
MaxCompute与实时计算Flink版的字段类型对照关系如下。
MaxCompute字段类型 | 实时计算Flink版字段类型 |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
STRING | VARCHAR |
DECIMAL | DECIMAL |
BINARY | VARBINARY |
使用限制
该功能的使用限制如下:
本插件仅支持Blink 3.2.1及以上版本。
MaxCompute中的聚簇表不支持作为MaxCompute结果表。
语法示例
您需要在Flink控制台新建作业,创建MaxCompute结果表。
DDL语句中定义的字段需要与MaxCompute物理表中的字段名称、顺序以及类型保持一致,否则可能导致在MaxCompute物理表中查询的数据为/n
。
命令示例如下:
create table odps_output(
id INT,
user_name VARCHAR,
content VARCHAR
) with (
type ='custom',
class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
endpoint = '<YourEndPoint>',
project = '<YourProjectName>',
`table` = '<YourtableName>',
access_id = '<yourAccessKeyId>',
access_key = '<yourAccessKeySecret>',
`partition` = 'ds=2018****'
);
WITH参数
参数 | 说明 | 是否必填 | 备注 |
type | 结果表的类型。 | 是 | 固定值为 |
class | 插件入口类。 | 是 | 固定值为 |
endpoint | MaxCompute服务地址。 | 是 | |
tunnel_endpoint | MaxCompute Tunnel服务的连接地址。 | 否 | 说明 VPC环境下必填。 |
project | MaxCompute项目名称。 | 是 | 无 |
table | MaxCompute物理表名称。 | 是 | 无 |
access_id | 可以访问MaxCompute项目的AccessKey ID。 | 是 | 无 |
access_key | AccessKey ID对应的AccessKey Secret。 | 是 | 无 |
partition | 分区表的分区名称。 | 否 | 如果表为分区表则必填:
|
enable_dynamic_partition | 设置是否开启动态分区机制。 | 否 | 默认值为False。 |
dynamic_partition_limit | 设置最大并发分区数。动态分区模式会为每个分区分配一个缓冲区,缓冲区大小通过flush_batch_size参数控制,所以动态分区模式最大会占用分区数量×缓冲区大小的内存。例如100个分区,每个分区1 MB,则最大占用内存为100 MB。 | 否 | 默认值为100。系统内存中会维护一个分区到Writer的Map,如果这个Map的大小超过了dynamicPartitionLimit的值,系统会通过LRU(Least Recently Used)的规则尝试淘汰没有数据写入的分区。如果所有分区都有数据写入,则会出现 |
flush_batch_size | 数据缓冲区大小,单位字节。缓冲区数据写满后会触发Flush操作,将数据发送到MaxCompute。 | 否 | 默认值为1048576,即1 MB。 |
flush_interval_ms | 缓冲区Flush间隔,单位毫秒。 MaxCompute Sink写入数据时,先将数据放到MaxCompute的缓冲区中,等缓冲区溢出或每隔一段时间(flush_interval_ms)时,再把缓冲区中的数据写到目标 MaxCompute表。 | 否 | 默认值为-1,即不设置主动Flush间隔。 |
flush_retry_count | 数据Flush失败重试次数,在缓冲区Flush失败的场景下自动重试。 | 否 | 默认值为10,即重试10次。 |
flush_retry_interval_sec | Flush失败重试的时间间隔,单位秒。 | 否 | 默认值为1,即1秒。 |
flush_retry_strategy | Flush失败重试策略,多次重试的时间间隔增长策略,配合flush_retry_interval_sec使用。包含如下三种策略:
| 否 | 默认值为 |
类型映射
MaxCompute字段类型 | 实时计算Flink版字段类型 |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
STRING | VARCHAR |
DECIMAL | DECIMAL |
代码示例
包含MaxCompute结果表的实时计算Flink版作业代码示例如下:
写入固定分区
create table source ( id INT, len INT, content VARCHAR ) with ( type = 'random' ); create table odps_sink ( id INT, len INT, content VARCHAR ) with ( type='custom', class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink', endpoint = '<yourEndpoint>', project = '<yourProjectName>', `table` = '<yourTableName>', accessId = '<yourAccessId>', accessKey = '<yourAccessPassword>', `partition` = 'ds=20180418' ); insert into odps_sink select id, len, content from source;
写入动态分区
create table source ( id INT, len INT, content VARCHAR, c TIMESTAMP ) with ( type = 'random' ); create table odps_sink ( id INT, len INT, content VARCHAR, ds VARCHAR --动态分区列需要显式写在建表语句中。 ) with ( type = 'odps', endpoint = '<yourEndpoint>', project = '<yourProjectName>', `table` = '<yourTableName>', accessId = '<yourAccessId>', accessKey = '<yourAccessPassword>', `partition`='ds' --不写分区的值,表示根据ds字段的值写入不同分区。 ,enable_dynamic_partition = 'true' --启用动态分区。 ,dynamic_partition_limit='50' --最大并发分区数50。 ,flush_batch_size = '524288' --缓冲区512 KB。 ,flush_interval_ms = '60000' --Flush间隔60秒。 ,flush_retry_count = '5' --Flush失败重试5次。 ,flush_retry_interval_sec = '2' --失败重试间隔单位2秒。 ,flush_retry_strategy = 'linear' --连续失败重试时间间隔线性增长。 ); insert into odps_sink select id, len, content, date_dormat(c, 'yyMMdd') as ds from source;