阿里云首页 DataHub

创建同步MaxCompute

同步数据到MaxCompute

准备工作

1.创建MaxCompute表

DataHub支持将数据同步到MaxCompute对应的数据表中,同时支持分区表和非分区表,一般情况下推荐用户使用分区表进行数据同步以方便MaxCompute数据处理

目前DataHub支持将TUPLE和BLOB的数据同步到MaxCompute数据表中

1)针对TUPLE类型topic,MaxCompute目标表数据类型需要和DataHub数据类型相匹配,具体的数据类型映射关系如下:

MaxCompute DataHub
BIGINT BIGINT
STRING STRING
BOOLEAN BOOLEAN
DOUBLE DOUBLE
DATETIME TIMESTAMP
DECIMAL DECIMAL
TINYINT TINIINT
SMALLINT SMALLINT
INT INTEGER
FLOAT FLOAT
MAP 不支持
ARRAY 不支持

由于目前DataHub并不能完全支持MaxCompute所有的数据类型,请用户尽量根据DataHub数据类型创建MaxCompute表结构。

2)针对BLOB数据类型,需要要求MaxCompute表结构仅需要包含一列STRING类型的column即可,DataHub默认会将数据同步到该column中

DataHub MaxCompute
BLOB STRING

3)同时为了方便数据追踪和问题排查,建议用户在创建MaxCompute表结构时,增加一列__rowkey__ STRING字段,DataHub会自动将DataHub对应数据的trace信息同步到该列中,以方便后续数据排查。

2.准备同步任务账号并授权

1)新建同步MaxCompute任务时,需要用户手动填写访问MaxCompute表的账号信息,请用户确保填入有效的账号信息(一般情况下采用MaxCompute子账号即可)。

2)需要给该账号授予访问MaxCompute表的响应权限,具体权限包括CreateInstanceDescribeAlter以及Update权限。

用户可以使用DataWorks管控台进行MaxCompute对应表的权限管理,参考MaxCompute高级配置,也可以选择使用MaxCompute的命令行工具进行授权,参考MaxCompute用户及授权管理

3.确认TimestampUnit单位

1) Connector中TimestampUnit的作用,就是将数据中TIMESTAMP类型的数据(如果有),以TimestampUnit为单位进行转换后写入到下游系统的日期类型(如datatime类型)

2)如果TIMESTAMP列写入的是以秒为单位的值,那新建Connector的时候TimestampUnit就选择“SECOND”;如果写入的是以毫秒为单位的值,那就选择“MILLISECOND”;如果写入的是以微妙为单位的值,那就选择“MICROSECOND”

注意事项

  1. 由于MaxCompute目前的写入标准原因,分区数越多就会导致DataHub同步数据越慢。因此,在创建MaxCompute同步任务时,请尽可能的控制分区数,尤其是USER_DEFINE同步模式。
    • 同一分区的数据越连续越好,不要频繁的分区跳变
    • 同步模式控制创建分区时,请不要创建过多的分区数

创建同步任务

  1. 依次进入项目列表/Project详情/Topic详情页面
  2. 点击右上角的 + 同步按钮进行同步任务创建5-1
  3. 选择MaxCompute类型作业,如下图所示:

    1)TUPLE类型同步5-22)BLOB类型同步

5-3部分配置说明:

下面罗列了部分管控台创建同步任务的配置说明,更多更灵活的操作请参考SDK使用。

  1. 导入字段

    DataHub可以根据用户设置将部分column内容同步到MaxCompute表中

  2. 分区模式

    分区模式决定了将数据写入到MaxCompute哪个分区中,目前DataHub支持以下分区方式:

分区模式 分区依据 支持Topic类型 说明
USER_DEFINE Record中的分区列(和MaxCompute的分区字段同名)的value值 TUPLE (1). DataHub schema中必须包含MaxCompute分区字段 (2). 该列值必须为UTF8字符串 字段值可以为空,表示不分区
SYSTEM_TIME Record写入DataHub的时间 TUPLE / BLOB (1). 分区配置中设置MaxCompute分区的时间转换Format格式 (2). 设置时区信息
EVENT_TIME Record中的event_time(TIMESTAMP)列的value值 TUPLE (1). 分区配置中设置MaxCompute分区的时间转换Format格式 (2). 设置时区信息
META_TIME Record的属性字段__dh_meta_time__的value值 TUPLE / BLOB (1). 分区配置中设置MaxCompute分区的时间转换Format格式 (2). 设置时区信息 (3).单位是ms,不存在以systemtime填充

其中SYSTEM_TIMEEVENT_TIMEMETA_TIME均是根据时间Timestamp和时区配置来进行MaxCompute分区的转换过程,单位默认为微秒。

  1. 分区配置分区配置决定了根据时间戳转换MaxCompute分区时的相关配置。目前管控台默认固定的MaxCompute分区格式,分区配置对应为
分区 时间Format 说明
ds %Y%m%d day
hh %H hour
mm %M minute
  1. 分区间隔分区间隔决定了根据时间戳转换MaxCompute分区时所采用的时间间隔。时间范围是15分钟 ~ 1440分钟(1天),跳变间隔15分钟
  2. 时区信息(TimeZone)时区信息决定了根据时间戳转换MaxCompute分区时所采用的转换时区。
  3. 分隔符BLOB数据同步时,可以指定16进制分隔符来决定是否对BLOB数据分割后再同步MaxCompute,比如 0A表示\n(换行符)
  4. Base64编码DataHub BLOB默认存储二进制数据,而MaxCompute对应的同步列为STRING类型,因此管控台创建同步任务时,默认采用base64编码后进行同步,更多定制化需求请参考SDK实现。

查看同步任务

可以点击对应connector的详情页面查看同步任务的运行状态和点位等信息, 包含同步点位、同步状态以及重启和停止等操作,如下图所示:5-4

同步示例

1. USER_DEFINE同步模式

  1. 建立DataHub Topic

    备注: topic schema中必须需要包含MaxCompute分区字段,类型为STRING,如下图所示:5-5

  2. 向DataHub Topic写入数据,可以使用datahub-sdk进行数据写入

    测试过程中使用SDK写入几条条数据,其中[ds,hh,mm]分别为: [20210304,01,15]和[20210304,02,15],数据内容如下所示:

5-6

3 建立同步任务

USER_DEFINE分区模式可以通过在同步中设置分区配置字段,如果MaxCompute没有对应的表,可自动创建。3 这里导入字段中设置导入f1、f2字段,不同步f3字段。

4 确认同步数据

可以从DataHub管控台查看对应同步任务的同步信息5-8 查询MaxCompute数据结果,结果如下:5-9 可以看到在USER_DEFINE模式下,DataHub会根据MaxCompute分组字段所对应的value将DataHub中的数据同步到对应的分区中。

2. SYSTEM_TIME同步模式

  1. 建立DataHub Topic

    备注: 由于分区是根据写入DataHub时间来计算的,因此topic schema只需包含数据字段,不需要包含分区字段,如下图所示:5-10

  2. 向DataHub Topic写入数据,可以使用datahub-sdk进行数据写入

    测试过程中使用SDK写入几条数据,DataHub目前对应的写入时间为2021-03-04 14:02:45,数据内容如下所示:5-11

  3. 建立同步任务5-12
    • 请注意分区配置需要和MaxCompute表分区一致

4 确认同步数据

可以从DataHub管控台查看对应同步任务的同步信息,如DoneTime。 5-13 查询MaxCompute数据结果,结果如下: 5-14 可以看到在SYSTEM_TIME模式下,DataHub会根据数据写入DataHub的时间将DataHub中的数据同步到对应的分区中。

常见问题

  • 同步到Maxcompute timestamp字段时间变为1970-01-19

    原因:DataHub同步Maxcompute默认时间戳单位为微妙,用户写入时间戳为毫秒解决方案: 写入DataHub时间戳以微妙方式写入

首页 DataHub 用户指南 数据同步 创建同步MaxCompute