Write data to MaxCompute with Flink

You can use the MaxCompute Flink connector to write Flink data to standard and delta tables in MaxCompute, which simplifies data ingestion. This topic describes the connector's capabilities and outlines the procedures for writing data.

Background information

  • Supported write modes

    The Flink connector supports two write modes: upsert and insert. In upsert mode, you can group data streams in one of the following ways:

    • Group by primary key

    • Group by partition field

      While suitable for a large number of partitions, grouping by partition field may cause data skew.

  • For the upsert write procedure and recommended parameters, see Real-time data ingestion into a data warehouse.

  • Specify the write mode with the sink.operation connector parameter (insert or upsert). For a complete list of connector parameters, see Appendix: Flink connector parameters.

  • Set the checkpoint interval for Flink upsert write jobs to at least 3 minutes. Shorter intervals can reduce write efficiency and create a large number of small files.

  • The following table maps data types between Realtime Compute for Apache Flink and MaxCompute.

    | Flink data type | MaxCompute data type |
    | --- | --- |
    | CHAR(p) | CHAR(p) |
    | VARCHAR(p) | VARCHAR(p) |
    | STRING | STRING |
    | BOOLEAN | BOOLEAN |
    | TINYINT | TINYINT |
    | SMALLINT | SMALLINT |
    | INT | INT |
    | BIGINT | BIGINT |
    | FLOAT | FLOAT |
    | DOUBLE | DOUBLE |
    | DECIMAL(p, s) | DECIMAL(p, s) |
    | DATE | DATE |
    | TIMESTAMP(9) WITHOUT TIME ZONE, TIMESTAMP_LTZ(9) | TIMESTAMP |
    | TIMESTAMP(3) WITHOUT TIME ZONE, TIMESTAMP_LTZ(3) | DATETIME |
    | BYTES | BINARY |
    | ARRAY<T> | ARRAY<T> |
    | MAP<K, V> | MAP<K, V> |
    | ROW | STRUCT |

    Note

    The Flink TIMESTAMP data type does not include time zone information, whereas the MaxCompute TIMESTAMP data type does. This difference can cause an 8-hour discrepancy between the values. To keep the timestamps aligned, declare the column as TIMESTAMP_LTZ(9) in the Flink table.

    -- Flink SQL
    CREATE TEMPORARY TABLE odps_source (
      id BIGINT NOT NULL COMMENT 'ID',
      -- TIMESTAMP has no time zone: values can appear shifted by 8 hours in MaxCompute.
      created_time TIMESTAMP NOT NULL COMMENT 'Creation time',
      -- TIMESTAMP_LTZ(9) keeps values aligned with the MaxCompute TIMESTAMP type.
      updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT 'Update time',
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'maxcompute',
      ...
    );
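The 8-hour discrepancy described in the note above can be reproduced outside Flink. The following minimal Java sketch uses only java.time and does not call the connector. For illustration it assumes that the zone-less value is interpreted as UTC and then displayed in the UTC+8 Asia/Shanghai zone. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Reproduces an 8-hour shift with java.time only.
// Assumption (not connector behavior): the zone-less value is treated as UTC,
// and the reader displays it in Asia/Shanghai (UTC+8).
class TimestampZoneDemo {

    // Treat a zone-less wall-clock value as a UTC instant.
    static Instant asUtc(LocalDateTime wallClock) {
        return wallClock.toInstant(ZoneOffset.UTC);
    }

    // Show an instant as wall-clock time in the given zone.
    static LocalDateTime render(Instant instant, ZoneId zone) {
        return LocalDateTime.ofInstant(instant, zone);
    }

    // Hours between the value that was written and the value that is displayed.
    static long shiftHours(LocalDateTime written, ZoneId displayZone) {
        return Duration.between(written, render(asUtc(written), displayZone)).toHours();
    }

    public static void main(String[] args) {
        LocalDateTime written = LocalDateTime.of(2023, 6, 15, 17, 59, 41);
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        System.out.println("written:   " + written);                         // 2023-06-15T17:59:41
        System.out.println("displayed: " + render(asUtc(written), shanghai)); // 2023-06-16T01:59:41
        System.out.println("shift (h): " + shiftHours(written, shanghai));    // 8
    }
}
```

Declaring the column as TIMESTAMP_LTZ(9) avoids this shift because the value then carries an instant rather than a bare wall-clock reading.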

Write data from a self-managed Flink cluster

  1. Prerequisites: Create a MaxCompute table.

    You must first create the MaxCompute tables that Flink writes to. The following example creates two tables: a non-partitioned delta table and a partitioned delta table. For information about table property settings, see Delta table parameters.

    -- Create a non-partitioned delta table.
    CREATE TABLE mf_flink_tt (
      id BIGINT NOT NULL,
      name STRING,
      age INT,
      status BOOLEAN,
      PRIMARY KEY (id)
    )
    TBLPROPERTIES ("transactional"="true",
                   "write.bucket.num"="64",
                   "acid.data.retain.hours"="12");
    
    -- Create a partitioned delta table.
    CREATE TABLE mf_flink_tt_part (
      id BIGINT NOT NULL,
      name STRING,
      age INT,
      status BOOLEAN,
      PRIMARY KEY (id)
    )
    PARTITIONED BY (dd STRING, hh STRING)
    TBLPROPERTIES ("transactional"="true",
                   "write.bucket.num"="64",
                   "acid.data.retain.hours"="12");
    
    
  2. Set up an open source Flink cluster. The connector supports Flink 1.13, 1.15, 1.16, and 1.17. Select the Flink connector that matches your Flink version.

    Note
    • The Flink connector for Flink 1.16 is compatible with Flink 1.17.

    • This topic uses Flink 1.13 and its matching connector as an example. Download and decompress the Flink 1.13 installation package.

  3. Download the Flink connector and add it to your Flink cluster package.

    1. Download the Flink connector JAR package to your local environment.

    2. Add the Flink connector JAR package to the lib directory of the decompressed Flink installation package.

      # FLINK_HOME is the directory where the Flink installation package was decompressed.
      mv flink-connector-odps-1.13-shaded.jar "$FLINK_HOME/lib/"

  4. Start the Flink cluster.

    cd $FLINK_HOME/bin
    ./start-cluster.sh

  5. Start the Flink SQL client.

    cd $FLINK_HOME/bin
    ./sql-client.sh

  6. Create a Flink table and configure the Flink connector parameters.

    You can create a Flink table and configure its parameters using either Flink SQL or the DataStream API. The following sections provide core examples for both approaches.

    Flink SQL

    1. In the Flink SQL client, run the following statements to register the tables and configure the connector parameters.

      -- Register a non-partitioned table in Flink SQL.
      CREATE TABLE mf_flink (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt',
        'sink.operation' = 'upsert',
        'odps.access.id'='LTAI****************',
        'odps.access.key'='********************',
        'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        'odps.project.name'='mf_mc_bj'
      );
      
      -- Register a partitioned table in Flink SQL.
      CREATE TABLE mf_flink_part (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        dd STRING,
        hh STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) PARTITIONED BY (`dd`,`hh`)
      WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt_part',
        'sink.operation' = 'upsert',
        'odps.access.id'='LTAI****************',
        'odps.access.key'='********************',
        'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        'odps.project.name'='mf_mc_bj'
      );
    2. Write data to the Flink table and query the MaxCompute table to verify the result.

      -- Insert a row into the non-partitioned table in the Flink SQL client.
      INSERT INTO mf_flink VALUES (1, 'Danny', 27, false);
      
      -- Query the result in MaxCompute.
      SELECT * FROM mf_flink_tt;
      +------------+-------+------+--------+
      | id         | name  | age  | status |
      +------------+-------+------+--------+
      | 1          | Danny | 27   | false  |
      +------------+-------+------+--------+
      
      -- Insert a row with the same primary key to update the record.
      INSERT INTO mf_flink VALUES (1, 'Danny', 28, false);
      -- Query the result in MaxCompute.
      SELECT * FROM mf_flink_tt;
      +------------+-------+------+--------+
      | id         | name  | age  | status |
      +------------+-------+------+--------+
      | 1          | Danny | 28   | false  |
      +------------+-------+------+--------+
      
      -- Insert a row into the partitioned table in the Flink SQL client.
      INSERT INTO mf_flink_part VALUES (1, 'Danny', 27, false, '01', '01');
      -- Query the result in MaxCompute. The partition columns are strings, so quote the values.
      SELECT * FROM mf_flink_tt_part WHERE dd='01' AND hh='01';
      +------------+-------+------+--------+----+----+
      | id         | name  | age  | status | dd | hh |
      +------------+-------+------+--------+----+----+
      | 1          | Danny | 27   | false  | 01 | 01 |
      +------------+-------+------+--------+----+----+
      
      -- Insert a row with the same primary key into the same partition to update the record.
      INSERT INTO mf_flink_part VALUES (1, 'Danny', 30, false, '01', '01');
      -- Query the result in MaxCompute.
      SELECT * FROM mf_flink_tt_part WHERE dd='01' AND hh='01';
      +------------+-------+------+--------+----+----+
      | id         | name  | age  | status | dd | hh |
      +------------+-------+------+--------+----+----+
      | 1          | Danny | 30   | false  | 01 | 01 |
      +------------+-------+------+--------+----+----+

    DataStream API

    1. To use the DataStream API, add the following dependency to your pom.xml file.

      <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>flink-connector-maxcompute</artifactId>
        <version>xxx</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
      </dependency>
      Note

      Replace "xxx" with the actual version number.

    2. The following sample code reads data from a Flink table, configures the upsert write parameters, and writes the data to a MaxCompute table.

      package com.aliyun.odps.flink.examples;
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.odps.table.OdpsOptions;
      import org.apache.flink.odps.util.OdpsConf;
      import org.apache.flink.odps.util.OdpsPipeline;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.data.RowData;
      
      public class Examples {
      
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // Use a checkpoint interval of at least 3 minutes for upsert writes;
              // shorter intervals reduce write efficiency and create many small files.
              env.enableCheckpointing(180 * 1000);
      
              StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
      
              // Read the input rows. source_table must already be registered in this environment.
              Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
              DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);
      
              // Connector write parameters: upsert mode, 8 commit threads,
              // and at least 100 commits before a major compaction is triggered.
              Configuration config = new Configuration();
              config.set(OdpsOptions.SINK_OPERATION, "upsert");
              config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
              config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);
      
              // Replace the placeholders with your AccessKey ID, AccessKey secret,
              // MaxCompute endpoint, project name, and Tunnel endpoint.
              OdpsConf odpsConfig = new OdpsConf("accessid",
                      "accesskey",
                      "endpoint",
                      "project",
                      "tunnel endpoint");
      
              // Write the stream to the target project and table (replace with your own names).
              // An empty partition string is passed in this example.
              OdpsPipeline.Builder builder = OdpsPipeline.builder();
              builder.projectName("sql2_isolation_2a")
                      .tableName("user_ledger_portfolio")
                      .partition("")
                      .configuration(config)
                      .odpsConf(odpsConfig)
                      .sink(input, false);
              env.execute();
          }
      }
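In the Flink SQL example, the second INSERT for id 1 (age 28) replaced the first row (age 27) instead of adding a second row. This is the defining property of upsert mode. The following Java sketch models only that visible outcome, using an in-memory map keyed by the primary key. It is a simplification and not the connector's implementation, which writes through Tunnel sessions and buckets. The class names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Models only the visible result of upsert writes in the mf_flink example:
// a row whose primary key already exists replaces the earlier row.
// Simplification: an in-memory map stands in for the delta table.
class UpsertSemanticsDemo {

    // One row of mf_flink_tt; id is the primary key.
    static final class Row {
        final long id;
        final String name;
        final int age;
        final boolean status;

        Row(long id, String name, int age, boolean status) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.status = status;
        }

        @Override
        public String toString() {
            return "(" + id + ", " + name + ", " + age + ", " + status + ")";
        }
    }

    // Apply writes in order; the latest row for each primary key wins.
    static Map<Long, Row> upsert(List<Row> writes) {
        Map<Long, Row> table = new LinkedHashMap<>();
        for (Row row : writes) {
            table.put(row.id, row);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Row> writes = List.of(
                new Row(1, "Danny", 27, false),
                new Row(1, "Danny", 28, false));
        System.out.println(upsert(writes).values()); // [(1, Danny, 28, false)]
    }
}
```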

Write data from fully managed Flink

  1. Prerequisites: Create a MaxCompute table.

    You must first create the MaxCompute table that Flink writes to. The following example creates a partitioned delta table.

    SET odps.sql.type.system.odps2=true;
    -- Drop the table if it already exists.
    DROP TABLE IF EXISTS mf_flink_upsert;
    CREATE TABLE mf_flink_upsert (
      c1 INT NOT NULL,
      c2 STRING,
      gt TIMESTAMP,
      PRIMARY KEY (c1)
    )
    PARTITIONED BY (ds STRING)
    TBLPROPERTIES ("transactional"="true",
                   "write.bucket.num"="64",
                   "acid.data.retain.hours"="12");
  2. The Flink connector is pre-loaded on fully managed Flink, so no manual installation is required. You can view connector details in the Realtime Compute for Apache Flink console.

  3. Create the Flink tables, generate real-time data with a Flink SQL job, and deploy the job after development.

    On the Flink job development page, create and edit a SQL job. The following example defines a source table that generates random data, a result table that connects to MaxCompute, and an INSERT statement to transfer the data. For more information about how to develop a SQL job, see Job development map.

    -- Create a Flink source table that generates random data.
    CREATE TEMPORARY TABLE fake_src_table (
        c1 INT,
        c2 VARCHAR,
        gt AS CURRENT_TIMESTAMP
    ) WITH (
        'connector' = 'faker',
        'fields.c2.expression' = '#{superhero.name}',
        'rows-per-second' = '100',
        'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
    );
    
    -- Create a temporary result table in Flink that writes to MaxCompute.
    CREATE TEMPORARY TABLE test_c_d_g (
        c1 INT,
        c2 VARCHAR,
        gt TIMESTAMP,
        ds VARCHAR,
        PRIMARY KEY (c1) NOT ENFORCED
    ) PARTITIONED BY (ds)
    WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_upsert',
        'sink.operation' = 'upsert',
        'odps.access.id' = 'LTAI****************',
        'odps.access.key' = '********************',
        'odps.end.point' = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
        'odps.project.name' = 'mf_mc_bj',
        'upsert.write.bucket.num' = '64'
    );
    
    -- Flink computation logic: derive the hourly partition value ds from gt.
    INSERT INTO test_c_d_g
    SELECT  c1 AS c1,
            c2 AS c2,
            gt AS gt,
            date_format(gt, 'yyyyMMddHH') AS ds
    FROM    fake_src_table;

    Parameters:

    • odps.end.point: Use the internal network endpoint of the region where your MaxCompute project resides.

    • upsert.write.bucket.num: This value must equal the write.bucket.num property of the MaxCompute delta table (64 in this example).

  4. Query the MaxCompute table to verify that the data was written.

    -- ds is a STRING partition column, so quote the value.
    SELECT * FROM mf_flink_upsert WHERE ds='2023061517';
    
    -- Your results may differ because the source data is randomly generated.
    +-----+--------------------+-------------------------+------------+
    | c1  | c2                 | gt                      | ds         |
    +-----+--------------------+-------------------------+------------+
    | 0   | Skaar              | 2023-06-16 01:59:41.116 | 2023061517 |
    | 21  | Supah Century      | 2023-06-16 01:59:59.117 | 2023061517 |
    | 104 | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
    | 126 | Leader             | 2023-06-16 01:59:39.116 | 2023061517 |
    +-----+--------------------+-------------------------+------------+
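The INSERT statement derives the partition value ds from gt with date_format(gt, 'yyyyMMddHH'), so each hour of data goes to its own partition. To predict which partition a record lands in, and therefore which value to use in the WHERE clause, the following Java sketch applies the same pattern letters with java.time. The class name is hypothetical, and the time zone is an input you must choose yourself. In the sample output, gt is displayed as 2023-06-16 01:59 while ds is 2023061517. That 8-hour gap is consistent with ds being computed in UTC and gt being displayed in UTC+8, as described in the TIMESTAMP note under Background information. The sample instant below is assumed on that basis.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: computes the hourly partition value that
// date_format(gt, 'yyyyMMddHH') produces, for an explicitly chosen time zone.
class HourlyPartition {

    // Same pattern letters as the ds expression in the INSERT statement.
    private static final DateTimeFormatter HOURLY = DateTimeFormatter.ofPattern("yyyyMMddHH");

    // Format the event time as yyyyMMddHH in the given zone.
    static String partitionFor(Instant eventTime, ZoneId zone) {
        return HOURLY.withZone(zone).format(eventTime);
    }

    public static void main(String[] args) {
        Instant gt = Instant.parse("2023-06-15T17:59:41.116Z");
        System.out.println(partitionFor(gt, ZoneId.of("UTC")));           // 2023061517
        System.out.println(partitionFor(gt, ZoneId.of("Asia/Shanghai"))); // 2023061601
    }
}
```

Pass the zone that your Flink job actually uses to evaluate gt. If you pass a different zone, the computed partition is off by the difference between the two zone offsets.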
    

Appendix: Flink connector parameters

  • Basic parameters

    | Parameter | Required | Default value | Description |
    | --- | --- | --- | --- |
    | connector | Yes | - | The connector type. Set the value to maxcompute. |
    | odps.project.name | Yes | - | The name of the MaxCompute project. |
    | odps.access.id | Yes | - | The AccessKey ID of your account. You can view it on the AccessKey Pair page. |
    | odps.access.key | Yes | - | The AccessKey secret of your account. You can view it on the AccessKey Pair page. |
    | odps.end.point | Yes | - | The MaxCompute endpoint. For the endpoints of each region, see Endpoints. |
    | odps.tunnel.end.point | No | - | The public endpoint of the Tunnel service. By default, requests are automatically routed to the appropriate Tunnel endpoint. If you set this parameter, the specified endpoint is used and automatic routing is disabled. For the Tunnel endpoints of each region and network, see Endpoints. |
    | odps.tunnel.quota.name | No | - | The name of the Tunnel quota used to access MaxCompute. |
    | table.name | Yes | - | The name of the MaxCompute table, in the format [project.][schema.]table. |
    | odps.namespace.schema | No | false | Specifies whether to use the three-layer model (project.schema.table). For more information, see Schema operations. |
    | sink.operation | Yes | insert | The write type. Valid values: insert and upsert. Note: Upsert mode is supported only for MaxCompute delta tables. |
    | sink.parallelism | No | - | The write parallelism. If this parameter is not set, the parallelism of the upstream operator is used. Note: For optimal write performance and minimal memory usage on the sink node, set the table property write.bucket.num to an integer multiple of this value. |
    | sink.meta.cache.time | No | 400 | The metadata cache size. |
    | sink.meta.cache.expire.time | No | 1200 | The expiration time of cached metadata. Unit: seconds. |
    | sink.coordinator.enable | No | true | Specifies whether to enable coordinator mode. |

  • Partition parameters

    | Parameter | Required | Default value | Description |
    | --- | --- | --- | --- |
    | sink.partition | No | - | The name of the partition to write to. If you use dynamic partitioning, this is the name of the parent partition of the dynamic partitions. |
    | sink.partition.default-value | No | __DEFAULT_PARTITION__ | The default partition name used with dynamic partitioning. |
    | sink.dynamic-partition.limit | No | 100 | The maximum number of partitions that can be written to concurrently within a single checkpoint when dynamic partitioning is used. Note: If the number of concurrently written partitions exceeds this limit, the job fails. Setting a much larger value can cause out-of-memory (OOM) errors on the sink node. |
    | sink.group-partition.enable | No | false | Specifies whether to group data by partition when dynamic partitioning is used. |
    | sink.partition.assigner.class | No | - | The implementation class of PartitionAssigner. |

  • FileCached mode write parameters

    Use FileCached mode for jobs that write to a large number of dynamic partitions. The following parameters configure this mode.

    | Parameter | Required | Default value | Description |
    | --- | --- | --- | --- |
    | sink.file-cached.enable | No | false | Specifies whether to enable FileCached mode. Valid values: true (enabled) and false (disabled). Enable this mode when the job writes to a large number of dynamic partitions. |
    | sink.file-cached.tmp.dirs | No | ./local | The default file cache directory in FileCached mode. |
    | sink.file-cached.writer.num | No | 16 | The number of concurrent data upload threads for a single task in FileCached mode. Note: Do not increase this value significantly. Writing to too many partitions concurrently is likely to cause OOM errors. |
    | sink.bucket.check-interval | No | 60000 | The interval at which file sizes are checked in FileCached mode. Unit: milliseconds. |
    | sink.file-cached.rolling.max-size | No | 16 MB | The maximum size of a single cache file. A file that exceeds this size is uploaded. |
    | sink.file-cached.memory | No | 64 MB | The maximum off-heap memory used for file writes in FileCached mode. |
    | sink.file-cached.memory.segment-size | No | 128 KB | The buffer size used for file writes in FileCached mode. |
    | sink.file-cached.flush.always | No | true | Specifies whether to use a cache for file writes in FileCached mode. |
    | sink.file-cached.write.max-retries | No | 3 | The number of retries for data uploads in FileCached mode. |

  • Insert and upsert write parameters

    Upsert write parameters

    | Parameter | Required | Default value | Description |
    | --- | --- | --- | --- |
    | upsert.writer.max-retries | No | 3 | The number of retries after an upsert writer fails to write data to a bucket. |
    | upsert.writer.buffer-size | No | 64 MB | The cache size of a single upsert writer in Flink. When the total buffered size of all buckets reaches this threshold, the writer automatically flushes the data to the server. Because one upsert writer writes to multiple buckets at the same time, a larger value can improve write efficiency. If data is written to a large number of partitions, OOM errors may occur; in that case, reduce this value. |
    | upsert.writer.bucket.buffer-size | No | 1 MB | The cache size of a single bucket in Flink. If memory on the Flink server is insufficient, reduce this value. |
    | upsert.write.bucket.num | Yes | - | The number of buckets of the destination table. The value must equal the table property write.bucket.num. |
    | upsert.write.slot-num | No | 1 | The number of Tunnel slots used by a single session. |
    | upsert.commit.max-retries | No | 3 | The number of retries for an upsert session commit. |
    | upsert.commit.thread-num | No | 16 | The parallelism of upsert session commits. Do not set this value too high: more concurrent commits consume more resources and can degrade performance. |
    | upsert.major-compact.min-commits | No | 100 | The minimum number of commits required to trigger a major compaction. |
    | upsert.commit.timeout | No | 600 | The timeout period for an upsert session commit. Unit: seconds. |
    | upsert.major-compact.enable | No | false | Specifies whether to enable major compaction. |
    | upsert.flush.concurrent | No | 2 | The maximum number of buckets in a single partition to which data can be written concurrently. Note: Each bucket that is being flushed occupies one Tunnel slot. |

    Note

    For recommended upsert parameter settings, see Recommended parameter configurations for upsert writes.

    Insert write parameters

    | Parameter | Required | Default value | Description |
    | --- | --- | --- | --- |
    | insert.commit.thread-num | No | 16 | The parallelism of commit sessions. |
    | insert.arrow-writer.enable | No | false | Specifies whether to use the Arrow format. |
    | insert.arrow-writer.batch-size | No | 512 | The maximum number of rows in an Arrow batch. |
    | insert.arrow-writer.flush-interval | No | 100000 | The writer flush interval. Unit: milliseconds. |
    | insert.writer.buffer-size | No | 64 MB | The cache size of the buffered writer. |
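The parameters above include several sizing rules that are easy to break when a table is recreated or a job is rescaled. Upsert mode is supported only for delta tables. upsert.write.bucket.num must equal the table's write.bucket.num. write.bucket.num should be an integer multiple of sink.parallelism. The following Java sketch checks these rules before you submit a job. It is a hypothetical helper, not part of the connector, and it assumes you read the values from your table DDL and WITH clause yourself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check for the sizing rules stated in this topic.
// The class and method are illustrative and are not part of the connector.
class SinkConfigCheck {

    // Returns a list of rule violations; an empty list means all checks passed.
    static List<String> check(String sinkOperation, boolean deltaTable,
                              int writeBucketNum, int upsertWriteBucketNum,
                              int sinkParallelism) {
        List<String> problems = new ArrayList<>();
        boolean upsert = "upsert".equals(sinkOperation);

        // Upsert mode is supported only for delta tables.
        if (upsert && !deltaTable) {
            problems.add("sink.operation=upsert requires a delta table");
        }
        // upsert.write.bucket.num must equal the table property write.bucket.num.
        if (upsert && upsertWriteBucketNum != writeBucketNum) {
            problems.add("upsert.write.bucket.num " + upsertWriteBucketNum
                    + " does not equal write.bucket.num " + writeBucketNum);
        }
        // For delta tables, write.bucket.num should be an integer multiple of sink.parallelism.
        if (deltaTable && (sinkParallelism <= 0 || writeBucketNum % sinkParallelism != 0)) {
            problems.add("write.bucket.num " + writeBucketNum
                    + " is not an integer multiple of sink.parallelism " + sinkParallelism);
        }
        return problems;
    }

    public static void main(String[] args) {
        // 64 buckets as in the examples; a parallelism of 8 is an assumed value.
        System.out.println(check("upsert", true, 64, 64, 8));        // []
        System.out.println(check("upsert", true, 64, 32, 6).size()); // 2
    }
}
```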