全部产品
云市场
云游戏

创建同步MaxCompute

更新时间:2020-06-30 14:44:58

同步数据到MaxCompute

准备工作


1.创建MaxCompute表
DataHub支持将数据同步到MaxCompute对应的数据表中,同时支持分区表和非分区表,一般情况下推荐用户使用分区表进行数据同步以方便MaxCompute数据处理
目前DataHub支持将TUPLE和BLOB的数据同步到MaxCompute数据表中
1)针对TUPLE类型topic,MaxCompute目标表数据类型需要和DataHub数据类型相匹配,具体的数据类型映射关系如下:

MaxCompute DataHub
TINYINT TINYINT
SMALLINT SMALLINT
INT INTEGER
BIGINT BIGINT
STRING / CHAR / VARCHR STRING (若ODPS类型有长度限制则直接截断)
BOOLEAN BOOLEAN
FLOAT FLOAT
DOUBLE DOUBLE
DATETIME / TIMESTAMP TIMESTAMP (默认按微秒换算,可配置)
DECIMAL / DECIMAL2.0 DECIMAL
MAP 不支持
ARRAY 不支持


由于目前DataHub并不能完全支持MaxCompute所有的数据类型,请用户尽量根据DataHub数据类型创建MaxCompute表结构。
DataHub 中的 TINYINT , SMALLINT , INTEGER , FLOAT 类型从 java sdk 2.16.1-public 开始支持。
2)针对BLOB数据类型,需要要求MaxCompute表结构仅需要包含一列STRING类型的column即可,DataHub默认会将数据同步到该column中

DataHub MaxCompute
BLOB STRING


3)同时为了方便数据追踪和问题排查,建议用户在创建MaxCompute表结构时,增加一列__rowkey__ STRING字段,DataHub会自动将DataHub对应数据的trace信息同步到该列中,以方便后续数据排查。
2.准备同步任务账号并授权
1)新建同步MaxCompute任务时,需要用户手动填写访问MaxCompute表的账号信息,请用户确保填入有效的账号信息(一般情况下采用MaxCompute子账号即可)。
2)需要给该账号授予访问MaxCompute表的响应权限,具体权限包括CreateInstanceDescribeAlter以及Update权限。
用户可以使用DataWorks管控台进行MaxCompute对应表的权限管理,参考MaxCompute高级配置,也可以选择使用MaxCompute的命令行工具进行授权,参考MaxCompute用户及授权管理
注意事项

  1. 由于MaxCompute目前的写入标准原因,分区数越多就会导致DataHub同步数据越慢。因此,在创建MaxCompute同步任务时,请尽可能的控制分区数,尤其是USER_DEFINE同步模式。
    • 同一分区的数据越连续越好,不要频繁的分区跳变
    • 同步模式控制创建分区时,请不要创建过多的分区数

创建同步任务

  1. 依次进入项目列表/Project详情/Topic详情页面
  2. 点击右上角的 + 同步按钮进行同步任务创建
    CreateConnector.jpg
  3. 选择MaxCompute类型作业,如下图所示:
    1)TUPLE类型同步1.png
    2)BLOB类型同步
    2.png


部分配置说明:
下面罗列了部分管控台创建同步任务的配置说明,更多更灵活的操作请参考SDK使用。

  1. 导入字段
    DataHub可以根据用户设置将部分column内容同步到MaxCompute表中
  2. 分区模式
    分区模式决定了将数据写入到MaxCompute哪个分区中,目前DataHub支持以下分区方式:
分区模式 分区依据 支持Topic类型 说明
USER_DEFINE Record中的分区列(和MaxCompute的分区字段同名)的value值 TUPLE (1). DataHub schema中必须包含MaxCompute分区字段 (2). 该列值必须为非空UTF8字符串
SYSTEM_TIME Record写入DataHub的时间 TUPLE / BLOB (1). 分区配置中设置MaxCompute分区的时间转换Format格式 (2). 设置时区信息
EVENT_TIME Record中的event_time(TIMESTAMP)列的value值 TUPLE (1). 分区配置中设置MaxCompute分区的时间转换Format格式 (2). 设置时区信息
META_TIME Record的属性字段__dh_meta_time__的value值 TUPLE / BLOB (1). 分区配置中设置MaxCompute分区的时间转换Format格式 (2). 设置时区信息


其中SYSTEM_TIMEEVENT_TIMEMETA_TIME均是根据时间Timestamp和时区配置来进行MaxCompute分区的转换过程,单位均为毫秒。

  1. 分区配置
    分区配置决定了根据时间戳转换MaxCompute分区时的相关配置。目前管控台默认固定的MaxCompute分区格式,分区配置对应为
分区 时间Format 说明
ds %Y%m%d day
hh %H hour
mm %M minute
  1. 分区间隔
    分区间隔决定了根据时间戳转换MaxCompute分区时所采用的时间间隔。时间范围是15分钟 ~ 1440分钟(1天),跳变间隔15分钟
  2. 时区信息(TimeZone)
    时区信息决定了根据时间戳转换MaxCompute分区时所采用的转换时区。
  3. 分隔符
    BLOB数据同步时,可以指定16进制分隔符来决定是否对BLOB数据分割后再同步MaxCompute,比如 0A表示\n(换行符)
  4. Base64编码
    DataHub BLOB默认存储二进制数据,而MaxCompute对应的同步列为STRING类型,因此管控台创建同步任务时,默认采用base64编码后进行同步,更多定制化需求请参考SDK实现。

查看同步任务


可以点击对应connector的详情页面查看同步任务的运行状态和点位等信息, 包含同步点位、同步状态以及重启和停止等操作,如下图所示:
SinkOdpsDetail.jpg

同步示例

1. USER_DEFINE同步模式

  1. 首先建立MaxCompute分区表,分区key分别是ds、hh、mm(用户可以根据需要设置)

    1. CREATE TABLE IF NOT EXISTS datahub_dev.test_sink_odps_user_define(f1 STRING,f2 BIGINT) PARTITIONED BY (ds STRING,hh STRING,mm STRING) STORED AS ALIORC;
  2. 建立DataHub Topic
    备注: topic schema中必须需要包含MaxCompute分区字段,类型为STRING,如下图所示:
    topic_user_define.jpg

  3. 向DataHub Topic写入数据,可以使用datahub-sdk进行数据写入
    测试过程中使用SDK写入2条数据,其中[ds,hh,mm]分别为: [20200101,01,01]和[20200202,02,02],数据内容如下所示:
    topic_user_define_data.jpg
  4. 建立同步任务
    topic_user_define_connector.jpg
    这里导入字段中设置导入f1、f2字段,不同步f3字段。
  5. 确认同步数据
    可以从DataHub管控台查看对应同步任务的同步信息,当同步完成后,可以使用MaxCompute工具查询数据。
    topic_user_define_status.jpg
    查询MaxCompute数据结果,结果如下:
    topic_user_define_result.jpg
    可以看到在USER_DEFINE模式下,DataHub会根据MaxCompute分组字段所对应的value将DataHub中的数据同步到对应的分区中。

2. SYSTEM_TIME同步模式

  1. 首先建立MaxCompute分区表,这里分区key分别是ds、hh(管控台目前默认采用ds/hh/mm的固定分区字段)

    1. CREATE TABLE IF NOT EXISTS datahub_dev.test_sink_odps_user_define(f1 STRING,f2 BIGINT) PARTITIONED BY (ds STRING,hh STRING) STORED AS ALIORC;
  2. 建立DataHub Topic
    备注: 由于分区是根据写入DataHub时间来计算的,因此topic schema只需包含数据字段,不需要包含分区字段,如下图所示:
    topic_system_time.jpg

  3. 向DataHub Topic写入数据,可以使用datahub-sdk进行数据写入
    测试过程中使用SDK写入2条数据,DataHub目前对应的写入时间为2020-03-04 18:40:56,数据内容如下所示:
    topic_system_time_data.jpg
  4. 建立同步任务
    3.png
    • 这里导入字段中设置仅导入f1字段,不同步f2字段。
    • 由于MaxCompute仅有ds和hh分区,这里的分区配置也仅配置这两项
    • 分区间隔设置为15分钟
  5. 确认同步数据
    可以从DataHub管控台查看对应同步任务的同步信息,当同步完成后,可以使用MaxCompute工具查询数据。
    topic_system_time_status.jpg
    查询MaxCompute数据结果,结果如下:
    topic_system_time_result.jpg
    可以看到在SYSTEM_TIME模式下,DataHub会根据数据写入DataHub的时间将DataHub中的数据同步到对应的分区中。