使用Flink写入数据到Delta Table

当前MaxCompute为您提供了新版的Flink Connector插件,新版插件支持将Flink数据写入至MaxCompute的普通表和Delta Table类型表,提高了Flink数据写入MaxCompute的便捷性。本文为您介绍新版Flink Connector写入MaxCompute的能力支持情况与主要操作流程。

背景信息

  • 支持的写入模式:

    使用新版Flink Connector将数据写入MaxCompute时,支持通过Upsert和Insert写入方式。其中使用Upsert时支持以下两种数据写入流:

    • 按照Primary Key进行分组

    • 按照分区字段进行分组

      若分区数量过多,您可以按照分区字段进行分组,但使用该流程可能导致数据倾斜。

  • Upsert模式下,通过Flink Connector进行数据写入流程和参数配置建议,详情请参见数据实时入仓实践

  • 您可在配置Flink数据写入MaxCompute时,通过设置Flink Connector参数指定使用哪种写入方式,全量Connector参数介绍请参见下文的附录:新版Flink Connector全量参数

  • Flink Upsert写入任务的Checkpoint间隔建议设置3分钟以上,设置太小的话,写入效率得不到保障,并且可能引入大量小文件。

  • MaxCompute与实时计算Flink版的字段类型对照关系如下:

    Flink 数据类型

    MaxCompute 数据类型

    CHAR(p)

    CHAR(p)

    VARCHAR(p)

    VARCHAR(p)

    STRING

    STRING

    BOOLEAN

    BOOLEAN

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    INT

    INT

    BIGINT

    LONG

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DECIMAL(p, s)

    DECIMAL(p, s)

    DATE

    DATE

    TIMESTAMP(9) WITHOUT TIME ZONE、TIMESTAMP_LTZ(9)

    TIMESTAMP

    TIMESTAMP(3) WITHOUT TIME ZONE、TIMESTAMP_LTZ(3)

    DATETIME

    BYTES

    BINARY

    ARRAY<T>

    LIST<T>

    MAP<K, V>

    MAP<K, V>

    ROW

    STRUCT

    说明

    Flink的TIMESTAMP数据类型不含时区,MaxCompute TIMESTAMP数据类型含时区。此差异会导致8小时的时间差。其通过使用TIMESTAMP_LTZ(9)来对齐时间戳。

    --FlinkSQL
    CREATE TEMPORARY TABLE odps_source(
      id BIGINT NOT NULL COMMENT 'id',
      created_time TIMESTAMP NOT NULL COMMENT '创建时间',
      updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT '更新时间',
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'maxcompute',
    ...
    );

Flink数据写入MaxCompute流程:自建开源Flink

  1. 准备工作:创建MaxCompute表。

    您需先创建好MaxCompute表,用于后续Flink数据写入。以下以创建两张表(Delta Table非分区表和分区表)作为示例,为您演示Flink数据写入MaxCompute的主要流程,其中表属性设置请参考Delta Table表参数

    --创建Delta Table非分区表
    CREATE TABLE mf_flink_tt (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, primary key (id)
    )
    tblproperties ("transactional"="true", 
                   "write.bucket.num" = "64", 
                   "acid.data.retain.hours"="12") ;
    
    --创建Delta Table分区表
    CREATE TABLE mf_flink_tt_part (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, 
      primary key (id)
    )
      partitioned by (dd string, hh string) 
      tblproperties ("transactional"="true", 
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
    
  2. 搭建开源Flink集群。当前支持1.13、1.15、1.16和1.17版本的开源Flink,您可以选择对应版本的Flink:

    说明
    • Flink 1.17版本可复用1.16版本。

    • 本文以Flink Connector 1.13为例,将包下载至本地环境,下载完成后进行解压。

  3. 下载Flink Connector并添加至Flink集群包中。

    1. 将Flink Connector Jar包下载至本地环境。

    2. 将Flink Connector Jar包添加至解压后的Flink安装包的lib目录中。

      mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
  4. 启动Flink实例服务。

    cd $FLINK_HOME/bin
    ./start-cluster.sh
  5. 启动Flink客户端。

    cd $FLINK_HOME/bin
    ./sql-client.sh
  6. 创建Flink表,并配置Flink Connector参数。

    当前支持直接使用Flink SQL创建Flink表并配置参数,也支持使用Flink的DataStream API进行相关操作。两种操作的核心示例如下。

    使用Flink SQL

    1. 进入Flink SQL的编辑界面,执行以下命令完成建表与参数配置。

      -- 在 Flink SQL中注册一张对应的非分区表
      CREATE TABLE mf_flink (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt',
        'sink.operation' = 'upsert',
      	'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs****',
      	'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
      	'odps.project.name'='mf_mc_bj'
      );
      
      -- 在 Flink SQL 中注册一张对应的分区表
      CREATE TABLE mf_flink_part (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        dd STRING,
        hh STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) PARTITIONED BY (`dd`,`hh`)
      WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt_part',
        'sink.operation' = 'upsert',
      	'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******',
      	'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
      	'odps.project.name'='mf_mc_bj'
      );
    2. 向Flink表中写入数据,并在MaxCompute表中查询,验证Flink数据写入MaxCompute的结果。

      --在flink Sql客户端中往非分区表里插入数据
      INSERT INTO mf_flink VALUES (1,'Danny',27, false);
      
      --在Maxcompute中查询返回
      SELECT * FROM mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 27   | false  |
      +------------+------+------+--------+
      
      --在flink Sql客户端中往非分区表里插入数据
      INSERT INTO mf_flink VALUES (1,'Danny',28, false);
      --在Maxcompute中查询返回
      SELECT * FROM mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 28   | false  |
      +------------+------+------+--------+
      
      --在flink Sql客户端中往分区表里插入数据
      INSERT INTO mf_flink_part VALUES (1,'Danny',27, false, '01','01');
      --在Maxcompute中查询返回
      SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 27   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+
      
      --在flink Sql客户端中分区表里插入数据
      INSERT INTO mf_flink_part VALUES (1,'Danny',30, false, '01','01');
      --在Maxcompute中查询返回
      SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 30   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+

    使用DataStream API

    1. 使用DataStream接口时,需先添加以下依赖。

      <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>flink-connector-maxcompute</artifactId>
                  <version>xxx</version>
                  <scope>system</scope>
                  <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
      </dependency>
      说明

      使用时请将“xxx”改为相应的版本号。

    2. 建表与参数配置的示例代码如下。

      package com.aliyun.odps.flink.examples;
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.odps.table.OdpsOptions;
      import org.apache.flink.odps.util.OdpsConf;
      import org.apache.flink.odps.util.OdpsPipeline;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.data.RowData;
      
      public class Examples {
      
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(120 * 1000);
      
              StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
      
              Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
              DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);
      
              Configuration config = new Configuration();
              config.set(OdpsOptions.SINK_OPERATION, "upsert");
              config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
              config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);
      
              OdpsConf odpsConfig = new OdpsConf("accessid",
                      "accesskey",
                      "endpoint",
                      "project",
                      "tunnel endpoint");
      
              OdpsPipeline.Builder builder = OdpsPipeline.builder();
              builder.projectName("sql2_isolation_2a")
                      .tableName("user_ledger_portfolio")
                      .partition("")
                      .configuration(config)
                      .odpsConf(odpsConfig)
                      .sink(input, false);
              env.execute();
          }
      }

Flink数据写入MaxCompute流程:阿里云全托管Flink

  1. 准备工作:创建MaxCompute表。

    您需先创建好MaxCompute表,用于后续Flink数据写入。以下以创建一张Delta Table表为例。

    SET odps.sql.type.system.odps2=true;
    DROP TABLE mf_flink_upsert;
    CREATE TABLE mf_flink_upsert (
      c1 int not null, 
      c2 string, 
      gt timestamp,
      primary key (c1)
    ) 
      PARTITIONED BY (ds string)
      tblproperties ("transactional"="true",
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
  2. 登录实时计算控制台,查看Flink Connector信息,Flink Connector已经加载到阿里云全托管Flink VVP上。

  3. 通过Flink SQL作业创建Flink表,并构造Flink实时数据,完成作业开发后进行作业部署。

    在Flink的作业开发页面,创建并编辑Flink SQL作业,以下示例为新建一张Flink数据源表、一张Flink临时结果表,并自动构建实时数据生成逻辑写入源表,通过计算逻辑将源表数据写入临时结果表。SQL作业开发详细操作请参见SQL作业开发

    --创建flink数据源表,
    CREATE TEMPORARY TABLE fake_src_table
    (
        c1 int,
        c2 VARCHAR,
        gt AS CURRENT_TIMESTAMP
    ) WITH (
      'connector' = 'faker',
      'fields.c2.expression' = '#{superhero.name}',
      'rows-per-second' = '100',
      'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
    );
    
    --flink创建临时结果表
    CREATE TEMPORARY TABLE test_c_d_g 
    (
        c1 int,
        c2 VARCHAR,
        gt TIMESTAMP,
        ds varchar,
        PRIMARY KEY(c1) NOT ENFORCED
     ) PARTITIONED BY(ds)
     WITH (
        		'connector' = 'maxcompute',
        		'table.name' = 'mf_flink_upsert',
        		'sink.operation' = 'upsert',
        		'odps.access.id'='LTAI5tRzd4W8cTyL****',
        		'odps.access.key'='gJwKaF3hK9MDAQgb**********',
        		'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        		'odps.project.name'='mf_mc_bj',
        		'upsert.write.bucket.num'='64'
    );
    
    --flink 计算逻辑
    INSERT INTO test_c_d_g
    SELECT  c1 AS c1,
            c2 AS c2,
            gt AS gt,
            date_format(gt, 'yyyyMMddHH') AS ds
    FROM    fake_src_table;

    其中:

    odps.end.point:使用对应Region的云产品互联网络Endpoint。

    upsert.write.bucket.num:与MaxCompute中创建的Delta Table表的write.bucket.num属性值保持一致。

  4. 在MaxCompute中查询数据,验证Flink数据写入MaxCompute的结果。

    SELECT * FROM mf_flink_upsert WHERE ds=2023061517;
    
    --返回,由于Flink中的数据为随机生成,实际MaxCompute查询结果与本示例不一定完全一致
    +------+----+------+----+
    | c1   | c2 | gt   | ds |
    +------+----+------+----+
    | 0    | Skaar | 2023-06-16 01:59:41.116 | 2023061517 |
    | 21   | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 |
    | 104  | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
    | 126  | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
    

附录:新版Flink Connector全量参数

  • 基础参数

    参数

    是否必填

    默认值

    说明

    connector

    无默认值

    Connector类型,需设置为MaxCompute

    odps.project.name

    无默认值

    MaxCompute的Project名称。

    odps.access.id

    无默认值

    您的阿里云账号AccessKey ID。您可以在访问凭证页面查看对应信息。

    odps.access.key

    无默认值

    您的阿里云账号AccessKey Secret。您可以在访问凭证页面查看对应信息。

    odps.end.point

    无默认值

    MaxCompute的Endpoint信息。各地域的MaxCompute Endpoint请参见Endpoint

    odps.tunnel.end.point

    Tunnel服务的公网访问链接。如果您未配置Tunnel Endpoint,Tunnel会自动路由到MaxCompute服务所在网络对应的Tunnel Endpoint。如果您配置了Tunnel Endpoint,则以配置为准,不进行自动路由。

    各地域及网络对应的Tunnel Endpoint值,请参见Endpoint

    odps.tunnel.quota.name

    无默认值

    访问MaxCompute使用的Tunnel Quota名称。

    table.name

    无默认值

    MaxCompute表名称,格式为[project.][schema.]table

    odps.namespace.schema

    false

    是否使用三层模型。关于三层模型介绍,请参见Schema操作

    sink.operation

    insert

    写入类型,取值为insertupsert

    说明

    仅MaxCompute Delta Table支持Upsert写入。

    sink.parallelism

    无默认值

    写入的并行度,如果不设置,则默认使用上游数据并行度。

    说明

    请务必确保表属性 write.bucket.num 是该配置值的整数倍,这样可以获得最佳的写入性能,并且能够最有效地节省 Sink 节点内存。

    sink.meta.cache.time

    400

    元数据缓存大小。

    sink.meta.cache.expire.time

    1200

    元数据缓存超时时间,单位:秒(s)。

    sink.coordinator.enable

    是否开启Coordinator模式。

  • 分区参数

    参数

    是否必填

    默认值

    说明

    sink.partition

    无默认值

    待写入的分区名称。

    若您使用的是动态分区,则为动态分区的上级分区名称。

    sink.partition.default-value

    __DEFAULT_PARTITION__

    使用动态分区时的默认分区名称。

    sink.dynamic-partition.limit

    100

    动态分区写入时,单个Checkpoint可同时导入的最大分区数量。

    说明

    建议不要大幅提升该参数值,因为当同时写入的分区数量过多时,容易导致Sink节点内存溢出(OOM),且当并发写入分区数超过阈值,写入任务会报错。

    sink.group-partition.enable

    false

    动态分区写入时,是否按照分区进行分组。

    sink.partition.assigner.class

    无默认值

    PartitionAssigner实现类。

  • FileCached模式写入参数

    当动态分区数量过多时,可以使用文件缓存模式,您可以通过以下参数配置数据写入时缓存文件信息。

    参数

    是否必配

    默认值

    说明

    sink.file-cached.enable

    false

    是否开启FileCached写入。取值说明:

    • false:不开启。

    • true:开启。

      说明

      当动态分区数量过多时,可以使用文件缓存模式。

    sink.file-cached.tmp.dirs

    ./local

    文件缓存模式下,默认文件缓存目录。

    sink.file-cached.writer.num

    16

    文件缓存模式下,单个Task上传数据的并发数。

    说明

    建议不要大幅提升该参数值,因为当同时写入的分区数量过多时,容易导致内存溢出(OOM)。

    sink.bucket.check-interval

    60000

    文件缓存模式下,检查文件大小的周期,单位:毫秒(ms)。

    sink.file-cached.rolling.max-size

    16 M

    文件缓存模式下,单个缓存文件的最大值。

    若文件大小超过该值,会将该文件数据上传到服务端。

    sink.file-cached.memory

    64 M

    文件缓存模式下,写入文件使用的最大堆外内存大小。

    sink.file-cached.memory.segment-size

    128 KB

    文件缓存模式下,写入文件的使用的buffer大小。

    sink.file-cached.flush.always

    true

    文件缓存模式下,写入文件是否使用缓存。

    sink.file-cached.write.max-retries

    3

    文件缓存模式下,上传数据的重试次数。

  • InsertUpsert写入参数

    Upsert写入参数

    参数

    是否必填

    默认值

    说明

    upsert.writer.max-retries

    3

    Upsert Writer写入Bucket失败后,重试次数。

    upsert.writer.buffer-size

    64 m

    单个Upsert Writer数据在Flink中的缓存大小。

    说明
    • 当所有Bucket的缓冲区大小总和达到预设阈值时,系统将自动触发刷新操作,将数据更新到服务器端。

    • 一个upsert writer里会同时写入多个Bucket,建议提高该值,以提升写入效率。

    • 若写入分区较多时,会存在引发内存OOM风险,可考虑降低该参数值。

    upsert.writer.bucket.buffer-size

    1 m

    单个Bucket数据在Flink中的缓存大小,当Flink服务器使用内存资源紧张时,可以减小该参数值。

    upsert.write.bucket.num

    写入表的bucket数量,必须与写入表write.bucket.num值一致。

    upsert.write.slot-num

    1

    单个Session使用Tunnel slot数量。

    upsert.commit.max-retries

    3

    Upsert Session Commit重试次数。

    upsert.commit.thread-num

    16

    Upsert Session Commit的并行度。

    不建议将此参数值调整得过大,因为当同时进行的提交并发数越多时,会导致资源消耗增加,可能导致性能问题或资源过度消耗。

    upsert.major-compact.min-commits

    100

    发起Major Compact的最小Commit次数。

    upsert.commit.timeout

    600

    Upsert Session Commit等待超时时间,单位:秒(s)。

    upsert.major-compact.enable

    false

    是否开启Major Compact。

    upsert.flush.concurrent

    2

    限制单个分区允许同时写入的最大Bucket数。

    说明

    每当一个bucket的数据刷新时,将会占用一个Tunnel Slot资源。

    说明

    Upsert写入时,参数配置建议详情,请参见Upsert写入参数配置建议

    Insert写入参数

    参数

    是否必配

    默认值

    说明

    insert.commit.thread-num

    16

    Commit Session的并行度。

    insert.arrow-writer.enable

    false

    是否使用Arrow格式。

    insert.arrow-writer.batch-size

    512

    Arrow Batch的最大行数。

    insert.arrow-writer.flush-interval

    100000

    Writer Flush间隔,单位毫秒(ms)。

    insert.writer.buffer-size

    64 M

    使用Buffered Writer的缓存大小。