本文以LogHub数据同步至MaxCompute为例,为您介绍如何通过DataWorks数据集成同步LogHub数据至目的端数据源,如MaxCompute、OSS、OTS、RDBMS、DataHub等。

背景信息

数据集成是阿里集团对外提供的稳定高效、弹性伸缩的数据同步平台,为阿里云大数据计算引擎(包括MaxCompute、AnalyticDB)提供离线、批量的数据进出通道。

应用场景

  • 跨Region的LogHub与MaxCompute等数据源的数据同步。
  • 不同阿里云账号下的LogHub与MaxCompute等数据源间的数据同步。
  • 同一阿里云账号下的LogHub与MaxCompute等数据源间的数据同步。
  • 公共云与金融云账号下的LogHub与MaxCompute等数据源间的数据同步。

跨阿里云账号说明

以B账号进入数据集成配置同步任务,将A账号的LogHub数据同步至B账号的MaxCompute为例。

  • 用A账号的AccessKey ID和AccessKey创建LogHub数据源。

    此时B账号可以同步A账号下日志服务所有Project的数据。

  • 用A账号的子账号A1的AccessKey ID和AccessKey创建LogHub数据源。
    • A给授予A1日志服务通用权限,即AliyunLogFullAccessAliyunLogReadOnlyAccess,详情请参见访问日志服务资源
    • A给授予A1日志服务自定义权限。

      主账号A进入RAM访问控制台 > 权限管理 > 权限策略管理页面,选择新建授权策略

      相关授权操作请参见访问控制RAMRAM子用户访问

      根据下述策略进行授权后,B账号通过子账号A1只能同步日志服务project_name1以及project_name2的数据。
      {
      "Version": "1",
      "Statement": [
      {
      "Action": [
      "log:Get*",
      "log:List*",
      "log:CreateConsumerGroup",
      "log:UpdateConsumerGroup",
      "log:DeleteConsumerGroup",
      "log:ListConsumerGroup",
      "log:ConsumerGroupUpdateCheckPoint",
      "log:ConsumerGroupHeartBeat",
      "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": [
      "acs:log:*:*:project/project_name1",
      "acs:log:*:*:project/project_name1/*",
      "acs:log:*:*:project/project_name2",
      "acs:log:*:*:project/project_name2/*"
      ],
      "Effect": "Allow"
      }
      ]
      }

通过向导模式配置同步任务

  1. 新建数据源。
    具体操作步骤请参见配置LogHub数据源
  2. 新建业务流程。
    1. 以开发者身份进入DataWorks管理控制台,单击对应工作空间操作栏中的进入数据开发
    2. 数据开发页面,鼠标悬停至业务流程图标,单击业务流程
    3. 新建业务流程对话框中,输入业务流程名称描述
    4. 单击新建
  3. 新建离线同步节点。
    1. 展开业务流程,右键单击数据集成
    2. 单击新建 > 离线同步
    3. 新建节点对话框中,输入节点名称,并选择目标文件夹
    4. 单击提交
  4. 选择数据源。
    新建离线同步节点后,首先需要配置离线同步节点的读取端数据源。选择数据源
    配置 说明
    数据源 填写LogHub数据源的名称。
    Logstore 目标日志库的名称。
    日志开始时间 数据消费的开始时间位点,即日志数据到达Loghub的时间。时间范围(左闭右开)的左边界,为yyyyMMddHHmmss格式的时间字符串(例如20180111013000),可以和DataWorks的调度时间参数配合使用。
    日志结束时间 数据消费的结束时间位点,时间范围(左闭右开)的右边界,为yyyyMMddHHmmss格式的时间字符串(例如20180111013010),可以和DataWorks的调度时间参数配合使用。
    批量条数 一次读取的数据条数,默认为256。
    数据预览默认收起,您可单击进行预览。
    说明 预览数据可能会和您同步的数据不太一样,因为数据预览是选择LogHub中的几条数据展现在预览框。
  5. 选择数据去向。
    选择MaxCompute数据源及目标表ok选择数据去向
    配置 说明
    数据源 填写配置的数据源名称。
    选择需要同步的表。
    分区信息 此处需同步的表是非分区表,所以无分区信息。
    清理规则
    • 写入前清理已有数据:导数据之前,清空表或者分区的所有数据,相当于insert overwrite。
    • 写入前保留已有数据:导数据之前不清理任何数据,每次运行数据都是追加进去的,相当于insert into。
    压缩 默认选择不压缩。
    空字符串作为null 默认选择否。
  6. 配置字段的映射关系。
    选择字段的映射关系。需对字段映射关系进行配置,左侧源头表字段和右侧目标表字段为一一对应的关系。
    说明 投递日志字段时,__time__ 映射为C_LogTime,__source__ 映射为C_Source。Tag字段使用冒号后面的字段进行映射。
    字段映射
  7. 配置通道控制。
    配置作业速率上限和脏数据检查规则。通道控制
    配置 说明
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    任务资源组 任务运行的机器,如果任务数比较多,使用默认资源组出现等待资源的情况,建议购买独享数据集成资源或添加自定义资源组,详情请参见DataWorks独享资源新增任务资源
  8. 运行任务。
    您可通过以下两种方式运行任务。
    • 直接运行(一次性运行)

      单击任务上方的运行按钮,将直接在数据集成页面运行任务,运行之前需要配置自定义参数的具体数值。

      如上图所示,代表同步10:10到17:30这段时间的LogHub记录到MaxCompute。

    • 调度运行

      单击提交按钮,将同步任务提交到调度系统中。

通过脚本模式配置同步任务

成功创建离线同步节点后,单击工具栏中的转换脚本进入脚本模式。转换脚本
请根据您的实际情况进行参数配置,示例脚本如下。
{
"type": "job",
"version": "1.0",
"configuration": {
"reader": {
"plugin": "loghub",
"parameter": {
"datasource": "loghub_lzz",//数据源名,保持跟您添加的数据源名一致。
"logstore": "logstore-ut2",//目标日志库的名字。
"beginDateTime": "${startTime}",//数据消费的开始时间位点,为时间范围(左闭右开)的左边界。
"endDateTime": "${endTime}",//数据消费的结束时间位点,为时间范围(左闭右开)的右边界。
"batchSize": 256,//一次读取的数据条数,默认为256。
"splitPk": "",
"column": [
"key1",
"key2",
"key3"
]
}
},
"writer": {
"plugin": "odps",
"parameter": {
"datasource": "odps_first",//数据源名,保持跟您添加的数据源名一致。
"table": "ok",//目标表名。
"truncate": true,
"partition": "",//分区信息。
"column": [//目标列名。
"key1",
"key2",
"key3"
]
}
},
"setting": {
"speed": {
"mbps": 8,/作业速率上限。
"concurrent": 7//同步并发数设置。
}
}
}
}