使用阿里云Flink(流式数据传输)

实时计算Flink版内置插件支持通过批量数据通道写入MaxCompute,受到批量数据通道并发数及存储文件数影响,内置版本插件会有性能瓶颈。MaxCompute提供了使用流式数据通道的Flink插件,支持使用Flink在高并发、高QPS场景下写入MaxCompute。

前提条件

  • 已开通实时计算Flink版的Blink服务并创建Blink项目。

    更多开通Blink及创建Blink项目的信息。

  • 已安装使用流式数据通道的Flink插件

背景信息

实时计算Flink版可以调用MaxCompute SDK中的接口将数据写入缓冲区,当缓冲区的大小超过指定的大小(默认为1 MB)或每隔指定的时间间隔时,将数据上传至MaxCompute结果表中。

说明

建议Flink同步MaxCompute并发数大于32或Flush间隔小于60秒的场景下,使用MaxCompute自定义插件。其他场景可以随意选择Flink内置插件和MaxCompute自定义插件。

MaxCompute与实时计算Flink版的字段类型对照关系如下。

MaxCompute字段类型

实时计算Flink版字段类型

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

VARCHAR

VARCHAR

STRING

VARCHAR

DECIMAL

DECIMAL

BINARY

VARBINARY

使用限制

该功能的使用限制如下:

  • 本插件仅支持Blink 3.2.1及以上版本。

  • MaxCompute中的聚簇表不支持作为MaxCompute结果表。

语法示例

您需要在Flink控制台新建作业,创建MaxCompute结果表。

说明

DDL语句中定义的字段需要与MaxCompute物理表中的字段名称、顺序以及类型保持一致,否则可能导致在MaxCompute物理表中查询的数据为/n

命令示例如下:

create table odps_output(
    id INT,
    user_name VARCHAR,
    content VARCHAR
) with (
    type ='custom',
    class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
    endpoint = '<YourEndPoint>',
    project = '<YourProjectName>',
    `table` = '<YourtableName>',
    access_id = '<yourAccessKeyId>',
    access_key = '<yourAccessKeySecret>',
    `partition` = 'ds=2018****'
);

WITH参数

参数

说明

是否必填

备注

type

结果表的类型。

固定值为custom

class

插件入口类。

固定值为com.alibaba.blink.customersink.MaxComputeStreamTunnelSink

endpoint

MaxCompute服务地址。

参见各地域Endpoint对照表(外网连接方式)

tunnel_endpoint

MaxCompute Tunnel服务的连接地址。

参见各地域Endpoint对照表(外网连接方式)

说明

VPC环境下必填。

project

MaxCompute项目名称。

table

MaxCompute物理表名称。

access_id

可以访问MaxCompute项目的AccessKey ID。

access_key

AccessKey ID对应的AccessKey Secret。

partition

分区表的分区名称。

如果表为分区表则必填:

  • 固定分区

    例如`partition` = 'ds=20180905'表示将数据写入分区ds= 20180905

  • 动态分区

    如果不明文显示分区的值,则会根据写入数据中的分区列具体的值,写入到不同的分区中。例如`partition`='ds'表示根据ds字段的值写入分区。

    如果要创建多级动态分区,With参数中Partition的字段顺序和结果表的DDL中的分区字段顺序,必须与物理表一致,各个分区字段之间使用英文逗号(,)分隔。

    说明
    • 动态分区列需要显式写在建表语句中。

    • 对于动态分区字段为空的情况,如果数据源中ds=nullds='',则会创建ds=NULL的分区。

enable_dynamic_partition

设置是否开启动态分区机制。

默认值为False。

dynamic_partition_limit

设置最大并发分区数。动态分区模式会为每个分区分配一个缓冲区,缓冲区大小通过flush_batch_size参数控制,所以动态分区模式最大会占用分区数量×缓冲区大小的内存。例如100个分区,每个分区1 MB,则最大占用内存为100 MB。

默认值为100。系统内存中会维护一个分区到Writer的Map,如果这个Map的大小超过了dynamicPartitionLimit的值,系统会通过LRU(Least Recently Used)的规则尝试淘汰没有数据写入的分区。如果所有分区都有数据写入,则会出现dynamic partition limit exceeded: 100报错。

flush_batch_size

数据缓冲区大小,单位字节。缓冲区数据写满后会触发Flush操作,将数据发送到MaxCompute。

默认值为1048576,即1 MB。

flush_interval_ms

缓冲区Flush间隔,单位毫秒。

MaxCompute Sink写入数据时,先将数据放到MaxCompute的缓冲区中,等缓冲区溢出或每隔一段时间(flush_interval_ms)时,再把缓冲区中的数据写到目标 MaxCompute表。

默认值为-1,即不设置主动Flush间隔。

flush_retry_count

数据Flush失败重试次数,在缓冲区Flush失败的场景下自动重试。

默认值为10,即重试10次。

flush_retry_interval_sec

Flush失败重试的时间间隔,单位秒。

默认值为1,即1秒。

flush_retry_strategy

Flush失败重试策略,多次重试的时间间隔增长策略,配合flush_retry_interval_sec使用。包含如下三种策略:

  • constant:常数时间,即每次重试间隔使用固定时间间隔。

  • linear:线性增长,即每次重试间隔时间线性增长,例如flush_retry_interval_sec设置为1,flush_retry_count设置为5,多次重试时间间隔为1、2、3、4、5秒。

  • exponential:指数增长。例如flush_retry_interval_sec设置为1,flush_retry_count设置为5,多次重试中间间隔为1、2、4、8、16秒。

默认值为constant,即常数时间间隔。

类型映射

MaxCompute字段类型

实时计算Flink版字段类型

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

VARCHAR

VARCHAR

STRING

VARCHAR

DECIMAL

DECIMAL

代码示例

包含MaxCompute结果表的实时计算Flink版作业代码示例如下:

  • 写入固定分区

    create table source (
       id INT,
       len INT,
       content VARCHAR
    ) with (
       type = 'random'
    );
    create table odps_sink (
       id INT,
       len INT,
       content VARCHAR
    ) with (
       type='custom',
       class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
       endpoint = '<yourEndpoint>', 
       project = '<yourProjectName>',
       `table` = '<yourTableName>',
       accessId = '<yourAccessId>',
       accessKey = '<yourAccessPassword>',
       `partition` = 'ds=20180418'
    );
    insert into odps_sink 
    select 
       id, len, content 
    from source;
  • 写入动态分区

    create table source (
       id INT,
       len INT,
       content VARCHAR,
       c TIMESTAMP 
    ) with (
       type = 'random'
    );
    create table odps_sink (
       id INT,
       len INT,
       content VARCHAR,
       ds VARCHAR                        --动态分区列需要显式写在建表语句中。
    ) with (
       type = 'odps',
       endpoint = '<yourEndpoint>', 
       project = '<yourProjectName>',
       `table` = '<yourTableName>',
       accessId = '<yourAccessId>',
       accessKey = '<yourAccessPassword>',
       `partition`='ds'                 --不写分区的值,表示根据ds字段的值写入不同分区。
       ,enable_dynamic_partition = 'true' --启用动态分区。
       ,dynamic_partition_limit='50' --最大并发分区数50。
       ,flush_batch_size = '524288' --缓冲区512 KB。
       ,flush_interval_ms = '60000' --Flush间隔60秒。
       ,flush_retry_count = '5' --Flush失败重试5次。
       ,flush_retry_interval_sec = '2' --失败重试间隔单位2秒。
       ,flush_retry_strategy = 'linear' --连续失败重试时间间隔线性增长。
    );
    insert into odps_sink 
    select 
       id, 
       len, 
       content,
       date_dormat(c, 'yyMMdd') as ds
    from source;