本文为您介绍如何使用大数据计算服务MaxCompute连接器。

背景信息

大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute详情请参见什么是MaxCompute

MaxCompute连接器支持的信息如下。
类别详情
支持类型源表、维表和结果表
运行模式流模式和批模式
数据格式暂不支持
特有监控指标
  • 源表

    numRecordsIn:源表当前读取到的数据总条数。

    numRecordsInPerSecond:源表当前每秒读取的数据条数。

    numBytesIn:源表当前读取到的数据总字节数(解压缩后)。

    numBytesInPerSecond:源表当前每秒读取的数据字节数(解压缩后)。

  • 结果表

    numRecordsOut:结果表当前写出的数据总条数。

    numRecordsOutPerSecond:结果表当前每秒写出的数据条数。

    numBytesOut:结果表当前写出的数据总字节数(压缩前)。

    numBytesOutPerSecond:结果表当前每秒写出的数据字节数(压缩前)。

  • 维表

    dim.odps.cacheSize:维表缓存的数据条数。

说明 指标的含义及如何查看监控指标,请参见查看监控指标
API种类Datastream和SQL
是否支持更新或删除结果表数据不支持更新和删除结果表数据,只支持插入数据。

前提条件

已创建MaxCompute表,详情请参见创建表

使用限制

  • 仅实时计算引擎VVR 2.0.0及以上版本支持MaxCompute连接器。
  • MaxCompute连接器仅支持At Least Once语义。
    说明 At Least Once语义会保证数据不缺失,但在少部分情况下,可能会将重复数据写入MaxCompute。不同的MaxCompute Tunnel出现重复数据的情况不同,MaxCompute Tunnel详情请参见如何选择数据通道?
  • 默认情况下源表为全量模式,仅会读取partition参数中指定的分区,在读完所有数据后结束运行,状态转换为finished,不会监控是否有新分区产生。
    如果您需要持续监控新分区,请通过WITH参数中指定startPartition使用增量源表模式。
    说明
    • 维表每次更新时都会检查最新分区,不受这一限制。
    • 在源表开始运行后,向分区里添加的新数据不会被读取,请在分区数据完整的情况下运行作业。

语法结构

CREATE TBALE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '<yourAccessKeyId>',
  'accessKey' = '<yourAccessKeySecret>',
  'partition' = 'ds=2018****'
);

WITH参数

  • 通用
    参数说明数据类型是否必填默认值备注
    connector表类型。String固定值为odps。
    endpointMaxCompute服务地址。String请参见Endpoint
    tunnelEndpointMaxCompute Tunnel服务的连接地址。String请参见Endpoint
    说明
    • VPC环境下为必填。
    • 如果未填写,MaxCompute会根据内部的负载均衡服务分配Tunnel的连接。
    projectMaxCompute项目名称。String无。
    tableNameMaxCompute表名。String无。
    accessIdMaxCompute AccessKey ID。String无。
    accessKeyMaxCompute AccessKey Secret。String无。
    partitionMaxCompute分区名。String对于非分区表和增量源表无需填写。
    compressAlgorithmMaxCompute Tunnel使用的压缩算法。String
    • VVR 4.0.13及以上版本:ZLIB
    • VVR 6.0.1及以上版本:SNAPPY
    参数取值如下:
    • RAW(无压缩)
    • ZLIB
    • SNAPPY

      SNAPPY相比ZLIB能带来明显的吞吐提升。在测试场景下,吞吐提升约50%。

    说明 仅实时计算引擎VVR 4.0.13及以上版本支持该参数。
  • 源表独有
    参数说明数据类型是否必填默认值备注
    maxPartitionCount可以读取的最大分区数量。Integer100如果读取的分区数量超过了该参数,则会出现报错The number of matched partitions exceeds the default limit
    重要 由于一次性读取大量分区会给MaxCompute服务带来一定压力,同时也会让作业启动速度变慢,因此您需要确认是否需要读取这么多分区(而不是误填partition参数)。如果确实需要,需要手动调大maxPartitionCount参数。
  • 增量源表独有

    增量源表通过间歇轮询MaxCompute服务器获取所有的分区信息来发现新增的分区,读取新分区时要求分区内数据已写入完毕,详情参见增量MaxCompute源表监听到新分区时,如果该分区还有数据没有写完,如何处理?。通过startPartition可以指定起始点位,但注意只读取字典序大于等于起始点位的分区,例如分区year=2023,month=10字典序小于分区year=2023,month=9,对于这种类型的分区声明可以通过加0补齐的方式来保证字典序正确,例如year=2023,month=09

    参数说明数据类型是否必填默认值备注
    startPartition增量读取的起始MaxCompute分区点位(包含)。String
    • 使用该参数后启用增量源表模式,将忽略partition参数。
    • 多级分区必须按分区级别从大到小声明每个分区列的值。
    说明 startPartition参数详情,请参见如何填写增量MaxCompute的startPartition参数?
    subscribeIntervalInSec轮询MaxCompute获取分区列表的时间间隔。Integer30单位为秒。
  • 结果表独有
    参数说明数据类型是否必填默认值备注
    useStreamTunnel是否使用MaxCompute Stream Tunnel上传数据。Booleanfalse
    参数取值如下:
    • true:使用MaxCompute Stream Tunnel上传数据。
    • false:使用MaxCompute Batch Tunnel上传数据。
    说明
    flushIntervalMsMaxCompute Tunnel Writer缓冲区flush间隔。Long30000(30秒)

    MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区溢出或者每隔一段时间(flushIntervalMs),再把缓冲区里的数据写到目标MaxCompute表。

    对于Stream Tunnel,flush的数据立即可见;对于Batch Tunnel,数据flush后仍需要等待checkpoint完成后才可见,建议设置该参数为0来关闭定时flush。

    单位为毫秒。

    说明 本参数可以与batchSize一同使用,满足任一条件即会Flush数据。
    batchSizeMaxCompute Tunnel Writer缓冲区flush的大小。Long67108864(64 MB)

    MaxCompute Sink写入记录时,先将数据存储到MaxCompute的缓冲区中,等缓冲区达到一定大小(batchSize),再把缓冲区里的数据写到目标MaxCompute表。

    单位为字节。

    说明
    • 仅实时计算引擎VVR 4.0.14及以上版本支持该参数。
    • 本参数可以与flushIntervalMs一同使用,满足任一条件即会Flush数据。
    numFlushThreadsMaxCompute Tunnel Writer缓冲区flush的线程数。Integer1
    每个MaxCompute Sink并发将创建numFlushThreads个线程用于flush数据。当该值大于1时,将允许不同分区的数据并发Flush,提升Flush的效率。
    说明 仅实时计算引擎VVR 4.0.14及以上版本支持该参数。
    dynamicPartitionLimit写入动态分区的最大数量。Integer100当结果表在两次Checkpoint之间写入的动态分区数量超过了dynamicPartitionLimit,则会出现报错Too many dynamic partitions
    重要 由于一次性写入大量分区会给MaxCompute服务带来一定压力,同时也会导致结果表flush和作业Checkpoint变慢。因此当报错出现时,您需要确认是否需要写入这么多分区。如果确实需要,需要手动调大dynamicPartitionLimit参数。
    retryTimes向MaxCompute服务器请求最大重试次数。Integer3创建session、提交session、flush数据时可能存在短暂的MaxCompute服务不可用时,会根据该配置进行重试。
    sleepMillis重试间隔时间。Integer1000单位为毫秒。
  • 维表独有

    MaxCompute维表在作业启动时从指定的分区拉取全量数据,partition参数支持使用max_pt()等函数。当缓存过期重新加载时会重新解析partition参数拉取最新的分区,使用max_two_pt()时维表可拉取两个分区,其他情况下只支持指定单个分区。

    参数说明数据类型是否必填默认值备注
    cache缓存策略。String目前MaxCompute维表仅支持ALL策略,必须显式声明。 适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。

    ALL策略:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

    说明
    • 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的至少4倍,具体值与MaxCompute存储压缩算法有关。
    • 如果MaxCompute维表数据量较大,可以考虑使用SHUFFLE_HASH注解将维表数据均匀分散到各个并发中。详情请参见如何使用维表SHUFFLE_HASH注解?
    • 在使用超大MaxCompute维表时,如果JVM频繁GC导致作业异常,且在增加维表JOIN节点的内存仍无改善的情况下,建议改为支持LRU Cache策略的KV型维表,例如云数据库Hbase版维表。
    cacheSize最多缓存的数据条数。Long100000如果维表数据量超过了cacheSize,则会出现报错Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit
    重要 由于维表数据量太大会占用大量JVM堆内存,同时也会让作业启动和维表更新变慢,因此您需要确认是否需要缓存这么多数据,如果确实需要,需要手动调大该参数。
    cacheTTLMs缓存超时时间,也就是缓存更新的间隔时间。LongLong.MAX_VALUE(相当于永不更新)单位为毫秒。
    cacheReloadTimeBlackList更新时间黑名单。在该参数规定的时间段内不会更新缓存。String用于防止缓存在关键时间段(例如活动流量峰值期间)更新导致作业不稳定。填写方式详情请参见如何填写CacheReloadTimeBlackList参数?
    maxLoadRetries缓存更新时(包含作业启动时初次拉取数据)最多尝试次数,超过该次数后作业运行失败。Integer10无。

类型映射

MaxCompute字段类型Flink字段类型
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
BOOLEANBOOLEAN
DATETIMETIMESTAMP
TIMESTAMPTIMESTAMP
VARCHARVARCHAR
DECIMALDECIMAL
BINARYVARBINARY
STRINGVARCHAR

使用示例

SQL

  • 源表示例
    • 全量读取
      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '<yourAccessId>',
        'accessKey' = '<yourAccessPassword>',
        'partition' = 'ds=201809*'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT
         cid,
         COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
    • 增量读取
      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '<yourAccessId>',
        'accessKey' = '<yourAccessPassword>',
        'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 从20180905对应分区开始读取
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT cid, COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
  • 结果表示例
    • 写入固定分区
      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '<yourAccessKeyId>',
        'accessKey' = '<yourAccessKeySecret>',
        'partition' = 'ds=20180905' -- 写入固定分区ds=20180905。
      );
      
      INSERT INTO odps_sink
      SELECT
        id, len, content
      FROM datagen_source;
    • 写入动态分区
      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR,
        c TIMESTAMP
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id  INT,
        len INT,
        content VARCHAR,
        ds VARCHAR --需要显式声明动态分区列。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '<yourAccessKeyId>',
        'accessKey' = '<yourAccessKeySecret>',
        'partition' = 'ds' --不写分区的值,表示根据ds字段的值写入不同分区。
      );
      
      INSERT INTO odps_sink
      SELECT
         id,
         len,
         content,
         DATE_FORMAT(c, 'yyMMdd') as ds
      FROM datagen_source;
  • 维表示例
    • 一对一维表
      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR,
        PRIMARY KEY (k) NOT ENFORCED  -- 一对一维表需要声明主键。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '<yourAccessKeyId>',
        'accessKey' = '<yourAccessKeySecret>',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k VARCHAR,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
    • 一对多维表
      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR
        -- 一对多维表无需声明主键。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '<yourAccessKeyId>',
        'accessKey' = '<yourAccessKeySecret>',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k VARCHAR,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

DataStream

重要 通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法
MaxCompute连接器的Maven依赖包含了构建全量源表、增量源表、结果表和维表的所需要的类。
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${connector.version}</version>
</dependency>
在DataStream中使用MaxCompute连接器推荐使用SQL声明MaxCompute表,通过Table/DataStream相互转换来连接MaxCompute表和数据流。
  • 连接源表
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    tEnv.executeSql(String.join(
        "\n",
        "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
        "  cid VARCHAR,",
        "  rt DOUBLE",
        ") WITH (",
        "  'connector' = 'odps',",
        "  'endpoint' = '<yourEndpointName>',",
        "  'project' = '<yourProjectName>',",
        "  'accessId' = '<yourAccessId>',",
        "  'accessKey' = '<yourAccessPassword>',",
        "  'partition' = 'ds=201809*'",
        ")");
    DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
    source.print();
    env.execute("odps source"); 
  • 连接结果表
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    tEnv.executeSql(String.join(
        "\n",
        "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
        "  cid VARCHAR,",
        "  rt DOUBLE",
        ") WITH (",
        "  'connector' = 'odps',",
        "  'endpoint' = '<yourEndpointName>',",
        "  'project' = '<yourProjectName>',",
        "  'accessId' = '<yourAccessId>',",
        "  'accessKey' = '<yourAccessPassword>',",
        "  'partition' = 'ds=20180905'",
        ")");
    DataStream<Row> data = env.fromElements(
        Row.of("id0", 3.),
        Row.of("id1", 4.));
    tEnv.fromDataStream(data).insertInto("odps_sink");
    env.execute("odps sink");

常见问题