本文为您介绍如何在DataHub中创建同步至MaxCompute的Connector,以及在创建时的注意事项以及问题。
准备工作
准备MaxCompute表
DataHub支持将数据同步到MaxCompute对应的数据表中,同时支持分区表和非分区表,推荐用户使用分区表进行数据同步,以方便MaxCompute数据处理。
数据映射关系
DataHub支持将TUPLE和BLOB的数据同步到MaxCompute数据表中
为了方便数据追踪和问题排查,建议您在创建MaxCompute表结构时,增加一列__rowkey__ STRING
字段,DataHub会自动将DataHub对应数据的trace信息同步到该列中,以方便后续数据排查。
准备同步任务账号并授权
新建同步MaxCompute任务时,需手动填写访问MaxCompute表的账号信息,请用户确保填写有效的账号信息(一般情况下采用MaxCompute子账号即可)。
需要给该账号授予访问MaxCompute表的相应权限,具体权限包括
CreateInstance
、Describe
、Alter
以及Update
权限。说明用户可以使用DataWorks管控台进行MaxCompute对应表的权限管理,参考配置MaxCompute引擎权限,也可以选择使用MaxCompute的命令行工具进行授权,参考MaxCompute权限。
确认TimestampUnit单位
Connector中TimestampUnit
的作用,就是将数据中TIMESTAMP
类型的数据(如果有),以TimestampUnit
为单位进行转换后写入到下游系统的日期类型(如datetime
类型)。
如果
TIMESTAMP
列写入的是以秒为单位的值,那新建Connector的时候TimestampUnit就选择SECOND
。如果
TIMESTAMP
列写入的是以毫秒为单位的值,那新建Connector的时候TimestampUnit就选择MILLISECOND
。如果
TIMESTAMP
列写入的是以微秒为单位的值,那新建Connector的时候TimestampUnit就选择MICROSECOND
。
由于MaxCompute目前的写入标准原因,分区数越多就会导致DataHub同步数据越慢。因此,在创建MaxCompute同步任务时,请尽可能的控制分区数,尤其是USER_DEFINE
同步模式。
同一分区的数据越连续越好,不要频繁地分区跳变。
同步模式控制创建分区时,请不要创建过多的分区。
确认是否需要添加服务白名单
目前MaxCompute支持配置project级别的ip白名单,如果设置ip白名单之后DataHub将无法正常同步MaxCompute。为了解决该问题,MaxCompute添加了服务白名单,可以将DataHub的服务名称添加到MaxCompute的服务白名单中,此时DataHub便可以正常将数据同步至MaxCompute。
DataHub的服务名称当前定义为DataHub,可以通过SDK或者odpscmd进行设置。
SDK设置方式参考以下示例:
public static void setServiceName() throws OdpsException {
String endpoint = "";
String project = "";
String accessId = "";
String accessKey = "";
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(endpoint);
odps.setDefaultProject(project);
Map<String, String> prop = new HashMap<>();
//将DataHub的服务名称添加到MaxCompute的服务白名单,如果添加多个需要以逗号隔开,每次设置会覆盖之前的设置
prop.put("odps.security.ip.whitelist.services", "DataHub");
Group group = new Groups(odps).get("groupName");
group.updateProject(project, null, null, prop);
//检查当前MaxCompute服务白名单是否服务预期
prop = odps.projects().get(project).getAllProperties();
System.out.println("current service list: " + prop.get("odps.security.ip.whitelist.services"));
}
odpscmd设置参考以下示例:
启动odpscmd之后,输入 setproject odps.security.ip.whitelist.services=DataHub;
进行设置。
设置完成之后,可以通过 setproject;
查看下是否设置成功。如果看到odps.security.ip.whitelist.services=DataHub
则表示设置成功。
创建同步任务
单击DataHub中已创建的Topic,进入Topic详情页。
单击Topic详情页右上角的
。
在新建Connector界面单击MaxCompute,配置新建Connector弹框的参数,单击创建。
以下为新建Connector的关键参数介绍:
参数
参数说明
导入字段
DataHub可以根据用户设置将部分column内容同步到MaxCompute表中。
分区模式
分区模式决定了将数据写入到MaxCompute哪个分区中,分别支持以下四种参数:
USER_DEFINE: 用户自定义Field
SYSTEM_TIME: 写入DataHub时间
EVENT_TIME: Schema中event_time字段,单位为微秒
META_TIME: Attributes中的__dh_meta_time__字段,单位毫秒
分区配置
分区配置可选以下三种参数:
%Y%m%d
:对应ds分区,获取的是日期。%H
:对应hh分区,获取的是小时。%M
:对应分钟分区,获取的是分钟。Timestamp Unit
支持配置以下参数:
MICROSECOND
MILLISCOND
SECOND
说明分区间隔决定了根据时间戳转换MaxCompute分区时所采用的时间间隔。时间范围是
15分钟 ~ 1440分钟(1天)
,跳变间隔是15分钟
。时区信息(TimeZone)决定了根据时间戳转换MaxCompute分区时所采用的转换时区。
分隔符BLOB数据同步时,可以指定16进制分隔符来决定是否对BLOB数据分割后再同步MaxCompute,比如
0A
表示\n
(换行符)。Base64编码DataHub BLOB默认存储二进制数据,而MaxCompute对应的同步列为STRING类型,因此管控台创建同步任务时,默认采用base64编码后进行同步,更多定制化需求请参考SDK实现。
创建同步任务
依次进入
项目列表/Project详情/Topic详情
页面。点击右上角的
+ 同步
按钮进行同步任务创建。
选择MaxCompute类型作业,如下图所示:
1)TUPLE类型同步
2)BLOB类型同步
部分配置说明:
下面罗列了部分管控台创建同步任务的配置说明,更多更灵活的操作请参考SDK使用。
导入字段
DataHub可以根据用户设置将部分column内容同步到MaxCompute表中。
分区模式
分区模式决定了将数据写入到MaxCompute哪个分区中,目前DataHub支持以下分区方式:
分区模式 | 分区依据 | 支持Topic类型 | 说明 |
USER_DEFINE | Record中的分区列(和MaxCompute的分区字段同名)的value值 | TUPLE | (1). DataHub schema中必须包含MaxCompute分区字段 (2). 该列值必须为 |
SYSTEM_TIME | Record写入DataHub的时间 | TUPLE / BLOB | (1). 分区配置中设置MaxCompute分区的时间转换Format格式 (2). 设置时区信息 |
EVENT_TIME | Record中的 | TUPLE | (1). 分区配置中设置MaxCompute分区的时间转换Format格式 (2). 设置时区信息 |
META_TIME | Record的属性字段 | TUPLE / BLOB | (1). 分区配置中设置MaxCompute分区的时间转换Format格式 (2). 设置时区信息 |
其中SYSTEM_TIME
、EVENT_TIME
和META_TIME
均是根据时间Timestamp和时区配置来进行MaxCompute分区的转换过程,单位默认为微秒。
分区配置决定了根据时间戳转换MaxCompute分区时的相关配置。目前管控台默认固定的MaxCompute分区格式,分区配置对应为
分区 | 时间Format | 说明 |
ds | %Y%m%d | day |
hh | %H | hour |
mm | %M | minute |
分区间隔决定了根据时间戳转换MaxCompute分区时所采用的时间间隔。时间范围是
15分钟 ~ 1440分钟(1天)
,跳变间隔15分钟
。时区信息(TimeZone)决定了根据时间戳转换MaxCompute分区时所采用的转换时区。
分隔符BLOB数据同步时,可以指定16进制分隔符来决定是否对BLOB数据分割后再同步MaxCompute,比如
0A
表示\n
(换行符)。Base64编码DataHub BLOB默认存储二进制数据,而MaxCompute对应的同步列为STRING类型,因此管控台创建同步任务时,默认采用base64编码后进行同步,更多定制化需求请参考SDK实现。
查看同步任务
可以点击对应connector的详情页面查看同步任务的运行状态和点位等信息, 包含同步点位、同步状态以及重启和停止等操作,如下图所示:
同步示例
1. USER_DEFINE同步模式
建立DataHub Topic
备注: topic schema中必须需要包含MaxCompute分区字段,类型为STRING,如下图所示:
向DataHub Topic写入数据,可以使用datahub-sdk进行数据写入
测试过程中使用SDK写入几条数据,其中[ds,hh,mm]分别为:[20210304,01,15]和[20210304,02,15],数据内容如下所示:
3 建立同步任务
USER_DEFINE分区模式可以通过在同步中设置分区配置字段,如果MaxCompute没有对应的表,可自动创建。 这里导入字段中设置导入f1、f2字段,不同步f3字段。
4 确认同步数据
可以从DataHub管控台查看对应同步任务的同步信息。 查询MaxCompute数据结果,结果如下:
可以看到在USER_DEFINE模式下,DataHub会根据
MaxCompute分组字段所对应的value
将DataHub中的数据同步到对应的分区中。
2. SYSTEM_TIME同步模式
建立DataHub Topic
备注:由于分区是根据写入DataHub时间来计算的,因此topic schema只需包含数据字段,不需要包含分区字段,如下图所示:
向DataHub Topic写入数据,可以使用datahub-sdk进行数据写入。
测试过程中使用SDK写入几条数据,DataHub目前对应的写入时间为
2021-03-04 14:02:45
,数据内容如下所示:建立同步任务
请注意分区配置需要和MaxCompute表分区一致。
4 确认同步数据
可以从DataHub管控台查看对应同步任务的同步信息,如DoneTime。 查询MaxCompute数据结果,结果如下:
可以看到在SYSTEM_TIME模式下,DataHub会根据
数据写入DataHub的时间
将DataHub中的数据同步到对应的分区中。
常见问题
同步到MaxCompute timestamp字段时间变为1970-01-19
原因:DataHub同步MaxCompute默认时间戳单位为微秒,用户写入时间戳为毫秒解决方案:写入DataHub时间戳以微秒方式写入。