创建同步MaxCompute

本文为您介绍如何在DataHub中创建同步至MaxComputeConnector,以及在创建时的注意事项以及问题。

准备工作

准备MaxCompute

DataHub支持将数据同步到MaxCompute对应的数据表中,同时支持分区表和非分区表,推荐用户使用分区表进行数据同步,以方便MaxCompute数据处理。

数据映射关系

DataHub支持将TUPLEBLOB的数据同步到MaxCompute数据表中

TUPLE类型下,DataHubMaxCompute数据类型的映射。

MaxCompute

DataHub

BIGINT

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

DATETIME

TIMESTAMP

DECIMAL

DECIMAL

TINYINT

TINIINT

SMALLINT

SMALLINT

INT

INTEGER

FLOAT

FLOAT

MAP

不支持

ARRAY

不支持

说明

由于DataHub并不能完全支持MaxCompute所有的数据类型,请用户尽量根据DataHub数据类型创建MaxCompute表结构。

BLOB类型下,DataHubMaxCompute数据类型的映射。

针对BLOB数据类型,MaxCompute表结构仅需包含一列STRING类型的column即可,DataHub默认会将数据同步到该column中。

DataHub

MaxCompute

BLOB

STRING

说明

为了方便数据追踪和问题排查,建议您在创建MaxCompute表结构时,增加一列__rowkey__ STRING字段,DataHub会自动将DataHub对应数据的trace信息同步到该列中,以方便后续数据排查。

准备同步任务账号并授权

  • 新建同步MaxCompute任务时,需手动填写访问MaxCompute表的账号信息,请用户确保填写有效的账号信息(一般情况下采用MaxCompute子账号即可)。

  • 需要给该账号授予访问MaxCompute表的相应权限,具体权限包括CreateInstanceDescribeAlter以及Update权限。

    说明

    用户可以使用DataWorks管控台进行MaxCompute对应表的权限管理,参考配置MaxCompute引擎权限,也可以选择使用MaxCompute的命令行工具进行授权,参考MaxCompute权限

确认TimestampUnit单位

ConnectorTimestampUnit的作用,就是将数据中TIMESTAMP类型的数据(如果有),以TimestampUnit为单位进行转换后写入到下游系统的日期类型(如datetime类型)。

  • 如果TIMESTAMP列写入的是以为单位的值,那新建Connector的时候TimestampUnit就选择SECOND

  • 如果TIMESTAMP列写入的是以毫秒为单位的值,那新建Connector的时候TimestampUnit就选择MILLISECOND

  • 如果TIMESTAMP列写入的是以微秒为单位的值,那新建Connector的时候TimestampUnit就选择MICROSECOND

说明

由于MaxCompute目前的写入标准原因,分区数越多就会导致DataHub同步数据越慢。因此,在创建MaxCompute同步任务时,请尽可能的控制分区数,尤其是USER_DEFINE同步模式。

  • 同一分区的数据越连续越好,不要频繁地分区跳变。

  • 同步模式控制创建分区时,请不要创建过多的分区。

确认是否需要添加服务白名单

目前MaxCompute支持配置project级别的ip白名单,如果设置ip白名单之后DataHub将无法正常同步MaxCompute。为了解决该问题,MaxCompute添加了服务白名单,可以将DataHub的服务名称添加到MaxCompute的服务白名单中,此时DataHub便可以正常将数据同步至MaxCompute。

DataHub的服务名称当前定义为DataHub,可以通过SDK或者odpscmd进行设置。

SDK设置方式参考以下示例:

public static void setServiceName() throws OdpsException {
    String endpoint = "";
    String project = "";
    String accessId = "";
    String accessKey = "";

    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(endpoint);
    odps.setDefaultProject(project);

    Map<String, String> prop = new HashMap<>();
    //将DataHub的服务名称添加到MaxCompute的服务白名单,如果添加多个需要以逗号隔开,每次设置会覆盖之前的设置
    prop.put("odps.security.ip.whitelist.services", "DataHub");
    Group group = new Groups(odps).get("groupName");
    group.updateProject(project, null, null, prop);

    //检查当前MaxCompute服务白名单是否服务预期
    prop = odps.projects().get(project).getAllProperties();
    System.out.println("current service list: " + prop.get("odps.security.ip.whitelist.services"));
}

odpscmd设置参考以下示例:

启动odpscmd之后,输入 setproject odps.security.ip.whitelist.services=DataHub;进行设置。

设置完成之后,可以通过 setproject;查看下是否设置成功。如果看到odps.security.ip.whitelist.services=DataHub则表示设置成功。

创建同步任务

  1. 单击DataHub中已创建的Topic,进入Topic详情页。

  2. 单击Topic详情页右上角的image

  3. 在新建Connector界面单击MaxCompute,配置新建Connector弹框的参数,单击创建。

    以下为新建Connector的关键参数介绍:

    参数

    参数说明

    导入字段

    DataHub可以根据用户设置将部分column内容同步到MaxCompute表中。

    分区模式

    分区模式决定了将数据写入到MaxCompute哪个分区中,分别支持以下四种参数:

    • USER_DEFINE: 用户自定义Field

    • SYSTEM_TIME: 写入DataHub时间

    • EVENT_TIME: Schemaevent_time字段,单位为微秒

    • META_TIME: Attributes中的__dh_meta_time__字段,单位毫秒

    分区配置

    分区配置可选以下三种参数:

    %Y%m%d对应ds分区,获取的是日期。

    %H对应hh分区,获取的是小时。

    %M对应分钟分区,获取的是分钟。

    Timestamp Unit

    支持配置以下参数:

    • MICROSECOND

    • MILLISCOND

    • SECOND

    说明
    • 分区间隔决定了根据时间戳转换MaxCompute分区时所采用的时间间隔。时间范围是15分钟 ~ 1440分钟(1天),跳变间隔是15分钟

    • 时区信息(TimeZone)决定了根据时间戳转换MaxCompute分区时所采用的转换时区。

    • 分隔符BLOB数据同步时,可以指定16进制分隔符来决定是否对BLOB数据分割后再同步MaxCompute,比如 0A表示\n(换行符)。

    • Base64编码DataHub BLOB默认存储二进制数据,而MaxCompute对应的同步列为STRING类型,因此管控台创建同步任务时,默认采用base64编码后进行同步,更多定制化需求请参考SDK实现。

创建同步任务

  1. 依次进入项目列表/Project详情/Topic详情页面。

  2. 点击右上角的 + 同步按钮进行同步任务创建。

c

  1. 选择MaxCompute类型作业,如下图所示:

    1)TUPLE类型同步5-22)BLOB类型同步

5-3部分配置说明

下面罗列了部分管控台创建同步任务的配置说明,更多更灵活的操作请参考SDK使用。

  1. 导入字段

    DataHub可以根据用户设置将部分column内容同步到MaxCompute表中。

  2. 分区模式

    分区模式决定了将数据写入到MaxCompute哪个分区中,目前DataHub支持以下分区方式:

分区模式

分区依据

支持Topic类型

说明

USER_DEFINE

Record中的分区列(和MaxCompute的分区字段同名)的value

TUPLE

(1). DataHub schema中必须包含MaxCompute分区字段 (2). 该列值必须为UTF8字符串 字段值可以为空,表示不分区

SYSTEM_TIME

Record写入DataHub的时间

TUPLE / BLOB

(1). 分区配置中设置MaxCompute分区的时间转换Format格式 (2). 设置时区信息

EVENT_TIME

Record中的event_time(TIMESTAMP)列的value

TUPLE

(1). 分区配置中设置MaxCompute分区的时间转换Format格式 (2). 设置时区信息

META_TIME

Record的属性字段__dh_meta_time__value

TUPLE / BLOB

(1). 分区配置中设置MaxCompute分区的时间转换Format格式 (2). 设置时区信息

其中SYSTEM_TIMEEVENT_TIMEMETA_TIME均是根据时间Timestamp和时区配置来进行MaxCompute分区的转换过程,单位默认为微秒。

  1. 分区配置决定了根据时间戳转换MaxCompute分区时的相关配置。目前管控台默认固定的MaxCompute分区格式,分区配置对应为

分区

时间Format

说明

ds

%Y%m%d

day

hh

%H

hour

mm

%M

minute

  1. 分区间隔决定了根据时间戳转换MaxCompute分区时所采用的时间间隔。时间范围是15分钟 ~ 1440分钟(1天),跳变间隔15分钟

  2. 时区信息(TimeZone)决定了根据时间戳转换MaxCompute分区时所采用的转换时区。

  3. 分隔符BLOB数据同步时,可以指定16进制分隔符来决定是否对BLOB数据分割后再同步MaxCompute,比如 0A表示\n(换行符)。

  4. Base64编码DataHub BLOB默认存储二进制数据,而MaxCompute对应的同步列为STRING类型,因此管控台创建同步任务时,默认采用base64编码后进行同步,更多定制化需求请参考SDK实现。

查看同步任务

可以点击对应connector的详情页面查看同步任务的运行状态和点位等信息, 包含同步点位、同步状态以及重启和停止等操作,如下图所示:5-4

同步示例

1. USER_DEFINE同步模式

  1. 建立DataHub Topic

    备注: topic schema中必须需要包含MaxCompute分区字段,类型为STRING,如下图所示:5-5

  2. DataHub Topic写入数据,可以使用datahub-sdk进行数据写入

    测试过程中使用SDK写入几条数据,其中[ds,hh,mm]分别为:[20210304,01,15]和[20210304,02,15],数据内容如下所示:

5-6

3 建立同步任务

USER_DEFINE分区模式可以通过在同步中设置分区配置字段,如果MaxCompute没有对应的表,可自动创建。5-7 这里导入字段中设置导入f1、f2字段,不同步f3字段。

4 确认同步数据

可以从DataHub管控台查看对应同步任务的同步信息。5-8 查询MaxCompute数据结果,结果如下:5-9 可以看到在USER_DEFINE模式下,DataHub会根据MaxCompute分组字段所对应的valueDataHub中的数据同步到对应的分区中。

2. SYSTEM_TIME同步模式

  1. 建立DataHub Topic

    备注:由于分区是根据写入DataHub时间来计算的,因此topic schema只需包含数据字段,不需要包含分区字段,如下图所示:

a

  1. DataHub Topic写入数据,可以使用datahub-sdk进行数据写入。

    测试过程中使用SDK写入几条数据,DataHub目前对应的写入时间为2021-03-04 14:02:45,数据内容如下所示:5-11

  2. 建立同步任务

    5-12

    • 请注意分区配置需要和MaxCompute表分区一致。

4 确认同步数据

可以从DataHub管控台查看对应同步任务的同步信息,如DoneTime。 5-13 查询MaxCompute数据结果,结果如下: 5-14 可以看到在SYSTEM_TIME模式下,DataHub会根据数据写入DataHub的时间DataHub中的数据同步到对应的分区中。

常见问题

  • 同步到MaxCompute timestamp字段时间变为1970-01-19

    原因:DataHub同步MaxCompute默认时间戳单位为微秒,用户写入时间戳为毫秒解决方案:写入DataHub时间戳以微秒方式写入。