文档

大数据计算服务MaxCompute

更新时间:

本文为您介绍大数据计算服务MaxCompute连接器的语法结构、WITH参数和使用示例等。

背景信息

大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute的详情请参见什么是MaxCompute

MaxCompute连接器支持的信息如下。

类别

详情

支持类型

源表、维表和结果表

运行模式

流模式和批模式

数据格式

暂不支持

特有监控指标

  • 源表

    numRecordsIn:源表当前读取到的数据总条数。

    numRecordsInPerSecond:源表当前每秒读取的数据条数。

    numBytesIn:源表当前读取到的数据总字节数(解压缩后)。

    numBytesInPerSecond:源表当前每秒读取的数据字节数(解压缩后)。

  • 结果表

    numRecordsOut:结果表当前写出的数据总条数。

    numRecordsOutPerSecond:结果表当前每秒写出的数据条数。

    numBytesOut:结果表当前写出的数据总字节数(压缩前)。

    numBytesOutPerSecond:结果表当前每秒写出的数据字节数(压缩前)。

  • 维表

    dim.odps.cacheSize:维表缓存的数据条数。

说明

指标含义详情,请参见监控指标说明

API种类

Datastream和SQL

是否支持更新或删除结果表数据

Batch Tunnel和Stream Tunnel模式仅支持插入数据,Upsert Tunnel模式支持插入、更新和删除数据。

前提条件

已创建MaxCompute表,详情请参见创建表

使用限制

  • 仅实时计算引擎VVR 2.0.0及以上版本支持MaxCompute连接器。

  • MaxCompute连接器仅支持At Least Once语义。

    说明

    At Least Once语义会保证数据不缺失,但在少部分情况下,可能会将重复数据写入MaxCompute。不同的MaxCompute Tunnel出现重复数据的情况不同,MaxCompute Tunnel详情请参见如何选择数据通道?

  • 默认情况下源表为全量模式,仅会读取partition参数中指定的分区,在读完所有数据后结束运行,状态转换为finished,不会监控是否有新分区产生。

    如果您需要持续监控新分区,请通过WITH参数中指定startPartition使用增量源表模式。

    说明
    • 维表每次更新时都会检查最新分区,不受这一限制。

    • 在源表开始运行后,向分区里添加的新数据不会被读取,请在分区数据完整的情况下运行作业。

语法结构

CREATE TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为odps。

    endpoint

    MaxCompute服务地址。

    String

    请参见Endpoint

    tunnelEndpoint

    MaxCompute Tunnel服务的连接地址。

    String

    请参见Endpoint

    说明
    • VPC环境下为必填。

    • 如果未填写,MaxCompute会根据内部的负载均衡服务分配Tunnel的连接。

    project

    MaxCompute项目名称。

    String

    无。

    schemaName

    MaxCompute Schema名称。

    String

    仅当MaxCompute项目开启Schema功能时,需填写该值为MaxCompute表所属Schema名,详情请参见 Schema操作

    说明

    仅实时计算引擎VVR 8.0.6及以上版本支持该参数。

    tableName

    MaxCompute表名。

    String

    无。

    accessId

    MaxCompute AccessKey ID。

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见密钥管理

    accessKey

    MaxCompute AccessKey Secret。

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见密钥管理

    partition

    MaxCompute分区名。

    String

    对于非分区表和增量源表无需填写。

    compressAlgorithm

    MaxCompute Tunnel使用的压缩算法。

    String

    • VVR 4.0.13及以上版本:ZLIB

    • VVR 6.0.1及以上版本:SNAPPY

    参数取值如下:

    • RAW(无压缩)

    • ZLIB

    • SNAPPY

      SNAPPY相比ZLIB能带来明显的吞吐提升。在测试场景下,吞吐提升约50%。

    说明

    仅实时计算引擎VVR 4.0.13及以上版本支持该参数。

    quotaName

    MaxCompute独享数据传输服务的quota名称。

    String

    设置该值来使用独享的MaxCompute数据传输服务。

    重要
    • 仅实时计算引擎VVR 8.0.3及以上版本支持该参数。

    • 设置该值时,必须删除tunnelEndpoint参数,否则仍将使用tunnelEndpoint中指定的数据通道。

    说明

    MaxCompute独享数据传输参见购买与使用独享数据传输服务资源组

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    maxPartitionCount

    可以读取的最大分区数量。

    Integer

    100

    如果读取的分区数量超过了该参数,则会出现报错The number of matched partitions exceeds the default limit

    重要

    由于一次性读取大量分区会给MaxCompute服务带来一定压力,同时也会让作业启动速度变慢,因此您需要确认是否需要读取这么多分区(而不是误填partition参数)。如果确实需要,需要手动调大maxPartitionCount参数。

  • 增量源表独有

    增量源表通过间歇轮询MaxCompute服务器获取所有的分区信息来发现新增的分区,读取新分区时要求分区内数据已写入完毕,详情参见增量MaxCompute源表监听到新分区时,如果该分区还有数据没有写完,如何处理?。通过startPartition可以指定起始点位,但注意只读取字典序大于等于起始点位的分区,例如分区year=2023,month=10字典序小于分区year=2023,month=9,对于这种类型的分区声明可以通过加0补齐的方式来保证字典序正确,例如year=2023,month=09

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    startPartition

    增量读取的起始MaxCompute分区点位(包含)。

    String

    • 使用该参数后启用增量源表模式,将忽略partition参数。

    • 多级分区必须按分区级别从大到小声明每个分区列的值。

    说明

    startPartition参数详情,请参见如何填写增量MaxCompute的startPartition参数?

    subscribeIntervalInSec

    轮询MaxCompute获取分区列表的时间间隔。

    Integer

    30

    单位为秒。

    modifiedTableOperation

    读取分区过程中遇到分区数据被修改时的处理。

    Enum (NONE, SKIP)

    NONE

    由于下载session被保存在检查点中,每次从检查点恢复时尝试从该session恢复读取进度,而该session由于分区数据被修改不可用,Flink任务会陷入不断重启。此时您可以设置该参数,参数取值如下:

    • NONE:需要您修改startPartition参数使其大于不可用分区,并从无状态启动作业。

    • SKIP:若不希望无状态启动,可将模式修改为SKIP,Flink尝试从检查点恢复session时将跳过不可用的分区。

    重要

    NONE和SKIP模式下,被修改分区中已读取的数据不会被撤回,未读取的数据将不会被读取。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    useStreamTunnel

    是否使用MaxCompute Stream Tunnel上传数据。

    Boolean

    false

    参数取值如下:

    • true:使用MaxCompute Stream Tunnel上传数据。

    • false:使用MaxCompute Batch Tunnel上传数据。

    说明

    flushIntervalMs

    MaxCompute Tunnel Writer缓冲区flush间隔。

    Long

    30000(30秒)

    MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区溢出或者每隔一段时间(flushIntervalMs),再把缓冲区里的数据写到目标MaxCompute表。

    对于Stream Tunnel,flush的数据立即可见;对于Batch Tunnel,数据flush后仍需要等待checkpoint完成后才可见,建议设置该参数为0来关闭定时flush。

    单位为毫秒。

    说明

    本参数可以与batchSize一同使用,满足任一条件即会Flush数据。

    batchSize

    MaxCompute Tunnel Writer缓冲区flush的大小。

    Long

    67108864(64 MB)

    MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区达到一定大小(batchSize),再把缓冲区里的数据写到目标MaxCompute表。

    单位为字节。

    说明
    • 仅实时计算引擎VVR 4.0.14及以上版本支持该参数。

    • 本参数可以与flushIntervalMs一同使用,满足任一条件即会Flush数据。

    numFlushThreads

    MaxCompute Tunnel Writer缓冲区flush的线程数。

    Integer

    1

    每个MaxCompute Sink并发将创建numFlushThreads个线程用于flush数据。当该值大于1时,将允许不同分区的数据并发Flush,提升Flush的效率。

    说明

    仅实时计算引擎VVR 4.0.14及以上版本支持该参数。

    dynamicPartitionLimit

    写入动态分区的最大数量。

    Integer

    100

    当结果表在两次Checkpoint之间写入的动态分区数量超过了dynamicPartitionLimit,则会出现报错Too many dynamic partitions

    重要

    由于一次性写入大量分区会给MaxCompute服务带来一定压力,同时也会导致结果表flush和作业Checkpoint变慢。因此当报错出现时,您需要确认是否需要写入这么多分区。如果确实需要,需要手动调大dynamicPartitionLimit参数。

    retryTimes

    向MaxCompute服务器请求最大重试次数。

    Integer

    3

    创建session、提交session、flush数据时可能存在短暂的MaxCompute服务不可用时,会根据该配置进行重试。

    sleepMillis

    重试间隔时间。

    Integer

    1000

    单位为毫秒。

    enableUpsert

    是否使用MaxCompute Upsert Tunnel上传数据。

    Boolean

    false

    参数取值如下:

    • true:使用Upsert Tunnel,处理Flink中的INSERT、UPDATE_AFTER和DELETE数据。

    • false:根据useStreamTunnel参数使用Batch Tunnel或Stream Tunnel,处理Flink中的INSERT、UPDATE_AFTER数据。

    重要
    • 若Upsert模式下MaxCompute sink提交时出现报错、失败、耗时长等情况,建议限制sink节点的并发数在10以内。

    • 仅实时计算引擎VVR 8.0.6及以上版本支持该参数。

    upsertAsyncCommit

    Upsert模式下在提交session时是否使用异步模式。

    Boolean

    false

    参数取值如下:

    • true:使用异步模式,提交耗时更短,但提交完成时写入的数据非立即可读。

    • false:默认为同步模式,提交时将等待服务侧处理完session。

    说明

    仅实时计算引擎VVR 8.0.6及以上版本支持该参数。

    upsertCommitTimeoutMs

    Upsert模式下提交session超时时间。

    Integer

    120000

    (120秒)

    仅实时计算引擎VVR 8.0.6及以上版本支持该参数。

  • 维表独有

    MaxCompute维表在作业启动时从指定的分区拉取全量数据,partition参数支持使用max_pt()等函数。当缓存过期重新加载时会重新解析partition参数拉取最新的分区,使用max_two_pt()时维表可拉取两个分区,其他情况下只支持指定单个分区。

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    cache

    缓存策略。

    String

    目前MaxCompute维表仅支持ALL策略,必须显式声明。 适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。

    ALL策略:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

    说明
    • 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的至少4倍,具体值与MaxCompute存储压缩算法有关。

    • 如果MaxCompute维表数据量较大,可以考虑使用SHUFFLE_HASH注解将维表数据均匀分散到各个并发中。详情请参见如何使用维表SHUFFLE_HASH注解?

    • 在使用超大MaxCompute维表时,如果JVM频繁GC导致作业异常,且在增加维表JOIN节点的内存仍无改善的情况下,建议改为支持LRU Cache策略的KV型维表,例如云数据库Hbase版维表。

    cacheSize

    最多缓存的数据条数。

    Long

    100000

    如果维表数据量超过了cacheSize,则会出现报错Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit

    重要

    由于维表数据量太大会占用大量JVM堆内存,同时也会让作业启动和维表更新变慢,因此您需要确认是否需要缓存这么多数据,如果确实需要,需要手动调大该参数。

    cacheTTLMs

    缓存超时时间,也就是缓存更新的间隔时间。

    Long

    Long.MAX_VALUE(相当于永不更新)

    单位为毫秒。

    cacheReloadTimeBlackList

    更新时间黑名单。在该参数规定的时间段内不会更新缓存。

    String

    用于防止缓存在关键时间段(例如活动流量峰值期间)更新导致作业不稳定。填写方式详情请参见如何填写CacheReloadTimeBlackList参数?

    maxLoadRetries

    缓存更新时(包含作业启动时初次拉取数据)最多尝试次数,超过该次数后作业运行失败。

    Integer

    10

    无。

类型映射

MaxCompute支持的类型参见2.0数据类型版本

MaxCompute类型

Flink类型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(precision, scale)

DECIMAL(precision, scale)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

STRING

STRING

BINARY

BYTES

DATE

DATE

DATETIME

TIMESTAMP(3)

TIMESTAMP

TIMESTAMP(9)

ARRAY

ARRAY

MAP

MAP

STRUCT

ROW

JSON

STRING

重要

当MaxCompute物理表中同时存在嵌套的复合类型字段(ARRAY、MAP或STRUCT)和JSON类型字段时,需要在创建MaxCompute物理表时指定tblproperties('columnar.nested.type'='true'),才能被Flink正确读写。

使用示例

SQL

  • 源表示例

    • 全量读取

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=201809*'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT
         cid,
         COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
    • 增量读取

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 从20180905对应分区开始读取
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT cid, COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
  • 结果表示例

    • 写入固定分区

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905' -- 写入固定分区ds=20180905。
      );
      
      INSERT INTO odps_sink
      SELECT
        id, len, content
      FROM datagen_source;
    • 写入动态分区

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR,
        c TIMESTAMP
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id  INT,
        len INT,
        content VARCHAR,
        ds VARCHAR --需要显式声明动态分区列。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds' --不写分区的值,表示根据ds字段的值写入不同分区。
      );
      
      INSERT INTO odps_sink
      SELECT
         id,
         len,
         content,
         DATE_FORMAT(c, 'yyMMdd') as ds
      FROM datagen_source;
  • 维表示例

    • 一对一维表

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR,
        PRIMARY KEY (k) NOT ENFORCED  -- 一对一维表需要声明主键。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k VARCHAR,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
    • 一对多维表

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR
        -- 一对多维表无需声明主键。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k VARCHAR,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

DataStream

重要
  • 通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法。Maven中央库中已经放置了MaxCompute DataStream连接器

  • 为了保护知识产权,从实时计算引擎VVR6.0.6版本起,此连接器在本地调试单次运行作业的时间为30分钟,30分钟后作业会报错并退出。本地运行和调试包含MaxCompute连接器的作业参见本地运行和调试包含连接器的作业

  • 若您在Flink开发控制台提交作业后,出现本地运行和调试包含连接器的作业中类似的MaxCompute相关类ClassNotFound问题,请下载Maven中央库中对应版本中后缀为uber.jar的文件,添加为作业的附加依赖。以1.15-vvr-6.0.6版本为例,需下载的文件为该仓库目录下的verveica-connector-odps-1.15-vvr-6.0.6-uber.jar。

MaxCompute连接器的Maven依赖包含了构建全量源表、增量源表、结果表和维表的所需要的类。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${connector.version}</version>
</dependency>

在DataStream中使用MaxCompute连接器推荐使用SQL声明MaxCompute表,通过Table/DataStream相互转换来连接MaxCompute表和数据流。

  • 连接源表

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    tEnv.executeSql(String.join(
        "\n",
        "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
        "  cid VARCHAR,",
        "  rt DOUBLE",
        ") WITH (",
        "  'connector' = 'odps',",
        "  'endpoint' = '<yourEndpointName>',",
        "  'project' = '<yourProjectName>',",
        "  'tableName' = '<yourTableName>',",
        "  'accessId' = '<yourAccessId>',",
        "  'accessKey' = '<yourAccessPassword>',",
        "  'partition' = 'ds=201809*'",
        ")");
    DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
    source.print();
    env.execute("odps source"); 
  • 连接结果表

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    tEnv.executeSql(String.join(
        "\n",
        "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
        "  cid VARCHAR,",
        "  rt DOUBLE",
        ") WITH (",
        "  'connector' = 'odps',",
        "  'endpoint' = '<yourEndpointName>',",
        "  'project' = '<yourProjectName>',",
        "  'tableName' = '<yourTableName>',",
        "  'accessId' = '<yourAccessId>',",
        "  'accessKey' = '<yourAccessPassword>',",
        "  'partition' = 'ds=20180905'",
        ")");
    DataStream<Row> data = env.fromElements(
        Row.of("id0", 3.),
        Row.of("id1", 4.));
    tEnv.fromDataStream(data).insertInto("odps_sink").execute();

常见问题