文档

使用Flink写入数据(Transaction Table2.0)

更新时间:

当前MaxCompute为您提供了新版的Flink Connector插件,新版插件支持将Flink数据写入至MaxCompute的普通表和Transaction Table2.0类型表,提高了Flink数据写入MaxCompute的便捷性。本文为您介绍新版Flink Connector写入MaxCompute的能力支持情况与主要操作流程。

背景信息

  • 支持的写入模式:

    使用新版Flink Connector将数据写入MaxCompute时,支持通过upsert和insert写入方式。其中使用upsert时支持以下两种数据写入流程:

    • 按照Primary Key进行分组

    • 按照分区字段进行分组

      说明

      若分区数量过多,您可以按照分区字段进行分组,但使用该流程可能导致数据倾斜。

您可在配置Flink数据写入MaxCompute时,通过设置Flink Connector参数指定使用哪种写入方式,全量Connector参数介绍请参见下文的附录:新版Flink Connector全量参数

  • Flink Upsert写入任务的Checkpoint间隔建议设置3分钟以上,设置太小的话,写入效率得不到保障,并且可能引入大量小文件。

  • MaxCompute与实时计算Flink版的字段类型对照关系如下。

    Flink 数据类型

    MaxCompute 数据类型

    CHAR(p)

    CHAR(p)

    VARCHAR(p)

    VARCHAR(p)

    STRING

    STRING

    BOOLEAN

    BOOLEAN

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    INT

    INT

    BIGINT

    LONG

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DECIMAL(p, s)

    DECIMAL(p, s)

    DATE

    DATE

    TIMESTAMP(9) WITHOUT TIMEZONE、TIMESTAMP_LTZ(9)

    TIMESTAMP

    TIMESTAMP(3) WITHOUT TIMEZONE、TIMESTAMP_LTZ(3)

    DATETIME

    BYTES

    BINARY

    ARRAY<T>

    LIST<T>

    MAP<K, V>

    MAP<K, V>

    ROW

    STRUCT

Flink数据写入MaxCompute流程:自建开源Flink

  1. 准备工作:创建MaxCompute表。

    您需先创建好MaxCompute表,用于后续Flink数据写入。以下以创建两张表(Transaction Table2.0非分区表和分区表)作为示例,为您演示Flink数据写入MaxCompute的主要流程,其中表属性设置请参考Transaction Table2.0表参数

    --创建Transaction Table2.0非分区表
    CREATE TABLE mf_flink_tt (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, primary key (id)
    )
    tblproperties ("transactional"="true", 
                   "write.bucket.num" = "64", 
                   "acid.data.retain.hours"="12") ;
    
    --创建Transaction Table2.0分区表
    CREATE TABLE mf_flink_tt_part (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, 
      primary key (id)
    )
      partitioned by (dd string, hh string) 
      tblproperties ("transactional"="true", 
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
    
  2. 搭建开源Flink集群。当前支持1.13、1.15、1.16和1.17版本的开源Flink,您可以选择对应版本的Flink:

    说明
    • Flink 1.17版本可复用1.16版本。

    • 本文以Flink Connector 1.13为例,将包下载至本地环境,下载完成后进行解压。

  3. 下载Flink Connector并添加至Flink集群包中。

    1. 将Flink Connector Jar包下载至本地环境。

    2. 将Flink Connector Jar包添加至解压后的Flink安装包的lib目录中。

      mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
  4. 启动Flink实例服务。

    cd $FLINK_HOME/bin
    ./start-cluster.sh
  5. 启动Flink客户端。

    cd $FLINK_HOME/bin
    ./sql-client.sh
  6. 创建Flink表,并配置Flink Connector参数。

    当前支持直接使用Flink SQL创建Flink表并配置参数,也支持使用Flink的DataStream API进行相关操作。两种操作的核心示例如下。

    使用Flink SQL

    1. 进入Flink SQL的编辑界面,执行以下命令完成建表与参数配置。

      -- 在 Flink SQL中注册一张对应的非分区表
      CREATE TABLE mf_flink (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt',
        'sink.operation' = 'upsert',
      	'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs****',
      	'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
      	'odps.project.name'='mf_mc_bj'
      );
      
      -- 在 Flink SQL 中注册一张对应的分区表
      CREATE TABLE mf_flink_part (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        dd STRING,
        hh STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) PARTITIONED BY (`dd`,`hh`)
      WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt_part',
        'sink.operation' = 'upsert',
      	'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******',
      	'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
      	'odps.project.name'='mf_mc_bj'
      );
    2. 向Flink表中写入数据,并在MaxCompute表中查询,验证Flink数据写入MaxCompute的结果。

      --在flink Sql客户端中往非分区表里插入数据
      Insert into mf_flink values (1,'Danny',27, false);
      --在Maxcompute中查询返回
      select * from mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 27   | false  |
      +------------+------+------+--------+
      
      --在flink Sql客户端中往非分区表里插入数据
      Insert into mf_flink values (1,'Danny',28, false);
      --在Maxcompute中查询返回
      select * from mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 28   | false  |
      +------------+------+------+--------+
      
      --在flink Sql客户端中往分区表里插入数据
      Insert into mf_flink_part values (1,'Danny',27, false, '01','01');
      --在Maxcompute中查询返回
      select * from mf_flink_tt_part where dd=01 and hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 27   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+
      
      --在flink Sql客户端中分区表里插入数据
      Insert into mf_flink_part values (1,'Danny',30, false, '01','01');
      --在Maxcompute中查询返回
      select * from mf_flink_tt_part where dd=01 and hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 30   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+

    使用DataStream API

    1. 使用DataStream接口时,需先添加以下依赖。

      <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>flink-connector-maxcompute</artifactId>
                  <version>xxx</version>
                  <scope>system</scope>
                  <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
      </dependency>
      说明

      使用时请将“xxx”改为相应的版本号。

    2. 建表与参数配置的示例代码如下。

      package com.aliyun.odps.flink.examples;
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.odps.table.OdpsOptions;
      import org.apache.flink.odps.util.OdpsConf;
      import org.apache.flink.odps.util.OdpsPipeline;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.data.RowData;
      
      public class Examples {
      
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(120 * 1000);
      
              StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
      
              Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
              DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);
      
              Configuration config = new Configuration();
              config.set(OdpsOptions.SINK_OPERATION, "upsert");
              config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
              config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);
      
              OdpsConf odpsConfig = new OdpsConf("accessid",
                      "accesskey",
                      "endpoint",
                      "project",
                      "tunnel endpoint");
      
              OdpsPipeline.Builder builder = OdpsPipeline.builder();
              builder.projectName("sql2_isolation_2a")
                      .tableName("user_ledger_portfolio")
                      .partition("")
                      .configuration(config)
                      .odpsConf(odpsConfig)
                      .sink(input, false);
              env.execute();
          }
      }

Flink数据写入MaxCompute流程:阿里云全托管Flink

  1. 准备工作:创建MaxCompute表。

    您需先创建好MaxCompute表,用于后续Flink数据写入。以下以创建一张Transaction Table2.0表为例。

    set odps.sql.type.system.odps2=true;
    drop table mf_flink_upsert;
    create table mf_flink_upsert (
      c1 int not null, 
      c2 string, 
      gt timestamp,
      primary key (c1)
    ) 
      partitioned by (ds string)
      tblproperties ("transactional"="true",
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
  2. 登录实时计算控制台,添加Flink Connector,需要上传开源版Flink Connector。其中关键参数如下,详细操作流程请参见管理自定义连接器

    开源版Flink Connector下载链接如下:

    'connector' = 'maxcompute',
  3. 通过Flink SQL作业创建Flink表,并构造Flink实时数据,完成作业开发后进行作业部署。

    在Flink的作业开发页面,创建并编辑Flink SQL作业,以下示例为新建一张Flink数据源表、一张Flink临时结果表,并自动构建实时数据生成逻辑写入源表,通过计算逻辑将源表数据写入临时结果表。SQL作业开发详细操作请参见SQL作业开发

    --创建flink数据源表,
    CREATE TEMPORARY TABLE fake_src_table
    (
        c1 int,
        c2 VARCHAR,
        gt as CURRENT_TIMESTAMP
    ) with (
      'connector' = 'faker',
      'fields.c2.expression' = '#{superhero.name}',
      'rows-per-second' = '100',
      'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
    );
    
    --flink创建临时结果表
    CREATE TEMPORARY TABLE test_c_d_g 
    (
        c1 int,
        c2 VARCHAR,
        gt TIMESTAMP,
        ds varchar,
        PRIMARY KEY(c1) NOT ENFORCED
     ) PARTITIONED BY(ds)
     WITH (
        		'connector' = 'maxcompute',
        		'table.name' = 'mf_flink_upsert',
        		'sink.operation' = 'upsert',
        		'odps.access.id'='LTAI5tRzd4W8cTyL****',
        		'odps.access.key'='gJwKaF3hK9MDAQgb**********',
        		'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        		'odps.project.name'='mf_mc_bj',
        		'upsert.write.bucket.num'='64'
    );
    
    --flink 计算逻辑
    insert into test_c_d_g
    select  c1 as c1,
            c2 as c2,
            gt as gt,
            date_format(gt, 'yyyyMMddHH') as ds
    from    fake_src_table;

    其中:

    odps.end.point:使用对应Region的经典网络Endpoint。

    upsert.write.bucket.num:与MaxCompute中创建的Transaction Table2.0表的write.bucket.num属性值保持一致。

  4. 在MaxCompute中查询数据,验证Flink数据写入MaxCompute的结果。

    select * from mf_flink_upsert where ds=2023061517;
    
    --返回,由于Flink中的数据为随机生成,实际MaxCompute查询结果与本示例不一定完全一致
    +------+----+------+----+
    | c1   | c2 | gt   | ds |
    +------+----+------+----+
    | 0    | Skaar | 2023-06-16 01:59:41.116 | 2023061517 |
    | 21   | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 |
    | 104  | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
    | 126  | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
    

附录:新版Flink Connector全量参数

  • 基础参数

    参数

    是否必填

    默认值

    说明

    connector

    无默认值

    connector类型,需设置为MaxCompute

    odps.project.name

    无默认值

    MaxCompute的Project名称。

    odps.access.id

    无默认值

    您的阿里云账号AccessKey ID。您可以在访问凭证页面查看对应信息。

    odps.access.key

    无默认值

    您的阿里云账号AccessKey Secret。您可以在访问凭证页面查看对应信息。

    odps.end.point

    无默认值

    MaxCompute的Endpoint信息。各地域的MaxCompute Endpoint请参见Endpoint

    odps.tunnel.end.point

    Tunnel服务的外网访问链接。如果您未配置Tunnel Endpoint,Tunnel会自动路由到MaxCompute服务所在网络对应的Tunnel Endpoint。如果您配置了Tunnel Endpoint,则以配置为准,不进行自动路由。

    各地域及网络对应的Tunnel Endpoint值,请参见Endpoint

    odps.tunnel.quota.name

    无默认值

    访问MaxCompute使用的Tunnel Quota名称。

    table.name

    无默认值

    MaxCompute表名称,格式为[project.][schema.]table

    odps.namespace.schema

    false

    是否使用三层模型。关于三层模型介绍,请参见Schema操作

    sink.operation

    insert

    写入类型,取值为insertupsert

    说明

    只有Transaction Table2.0支持upsert写入。

    sink.parallelism

    无默认值

    写入的并行度,如果不设置,则默认使用上游数据并行度。

    sink.meta.cache.time

    400

    元数据缓存大小。

    sink.meta.cache.expire.time

    1200

    元数据缓存超时时间,单位:秒(s)。

    sink.coordinator.enable

    是否开启Coordinator模式。

  • 分区参数

    参数

    是否必填

    默认值

    说明

    sink.partition

    无默认值

    分区名称,若您使用的是动态分区,则为动态分区的上级分区名称。

    sink.partition.default-value

    __DEFAULT_PARTITION__

    使用动态分区时的默认分区名称。

    sink.dynamic-partition.limit

    100

    动态分区写入时,单个Checkpoint可同时导入的最大分区数量。

    sink.group-partition.enable

    false

    动态分区写入时,是否按照分区进行分组。

    sink.partition.assigner.class

    无默认值

    PartitionAssigner实现类。

  • FileCached模式写入参数

    当动态分区数量过多时,可以使用文件缓存模式,您可以通过以下参数配置数据写入时缓存文件信息。

    参数

    是否必配

    默认值

    说明

    sink.file-cached.enable

    false

    是否开启FileCached写入。

    sink.file-cached.tmp.dirs

    ./local

    文件缓存模式下,默认文件缓存目录。

    sink.file-cached.writer.num

    16

    文件缓存模式下,并行上传数据的线程数。

    sink.bucket.check-interval

    60000

    文件缓存模式下,检查文件大小的默认间隔,单位:毫秒(ms)。

    sink.file-cached.rolling.max-size

    16 M

    文件缓存模式下,单个缓存文件的最大值,若文件大小超过该值,会将该文件数据上传到服务端。

    sink.file-cached.memory

    64 M

    文件缓存模式下,写入文件使用的最大堆外内存大小。

    sink.file-cached.memory.segment-size

    128 KB

    文件缓存模式下,写入文件的使用的buffer大小。

    sink.file-cached.flush.always

    true

    文件缓存模式下,写入文件是否使用缓存。

    sink.file-cached.write.max-retries

    3

    文件缓存模式下,上传数据的重试次数。

  • insertupsert写入参数

    upsert写入参数

    参数

    是否必填

    默认值

    说明

    upsert.writer.max-retries

    3

    upsert写入时,writer flush重试次数。

    upsert.writer.buffer-size

    64m

    upsert写入时,单个writer的buffer size。

    upsert.writer.bucket.buffer-size

    1m

    upsert写入时,单个bucket的buffer size。

    upsert.write.bucket.num

    写入表的bucket数量,必须与写入表write.bucket.num值一致。

    upsert.write.slot-num

    1

    upsert写入时,单个session使用Tunnel slot数量。

    upsert.commit.max-retries

    3

    upsert写入时,commit session重试次数。

    upsert.commit.thread-num

    16

    upsert写入时,commit session的并行度。

    upsert.major-compact.min-commits

    100

    upsert写入时,发起major compact的最小commit次数。

    upsert.commit.timeout

    600

    upsert写入时,commit session等待超时时间,单位:秒(s)。

    upsert.major-compact.enable

    false

    upsert写入时,是否开启major compact。

    upsert.flush.concurrent

    2

    upsert写入时,每次Flush能够同时写入的Bucket数量。

    insert写入参数

    参数

    是否必配

    默认值

    说明

    insert.commit.thread-num

    16

    insert写入时,commit session的并行度。

    insert.arrow-writer.enable

    false

    insert写入时,是否使用arrow格式。

    insert.arrow-writer.batch-size

    512

    insert写入时,arrow batch最大行数。

    insert.arrow-writer.flush-interval

    100000

    insert写入时,writer flush间隔,单位毫秒(ms)。

    insert.writer.buffer-size

    64 M

    insert写入时,使用buffered writer的缓存大小。

  • 本页导读 (1)
文档反馈