本文档向您介绍如何通过DataWorks采集MaxCompute数据。

前提条件

背景信息

DataWorks是阿里云提供的数据中转服务,日志服务采集到的日志数据可以通过DataWorks投递至MaxCompute存储与分析。MaxCompute主要用于离线计算,如果您需要OLAP实时分析,您可以将投递到MaxCompute的日志数据及后续的计算结果通过DataWorks导出到日志服务,并对导出的数据进行实时查询与分析。

LogHub Writer通过DataWorks框架获取Reader生成的数据,然后将DataWorks支持的类型通过逐一判断并转换成String类型。当数据量达到用户指定的batchSize时,会使用日志服务Java SDK一次性推送至日志服务。 默认情况下,一次推送1024条数据。batchSize的值最大为4096。

操作步骤

  1. 登录DataWorks管理控制台,创建Loghub数据源。
    创建步骤请参见步骤1 新增数据源
  2. 创建脚本模式的同步任务。
    1. 在当前工作空间选择数据集成 > 数据同步,新建数据同步节点。
      图 1. 数据同步
      数据同步
    2. 在数据同步节点中单击转换脚本图标,进入脚本模式。
    3. 在脚本模式中单击导入模板图标,填写导入模板。
      图 2. 导入模板
      导入模板
      配置项 说明
      来源类型 您的数据源的类型,此处请选择ODPS
      数据源 您的来源数据源名称。您也可以单击新增数据源,新建一个数据源。
      目标类型 您的投递目标类型,此处请选择LogHub
      数据源 您的目标数据源名称。请选择以上步骤中创建的LogHub数据源,也可以单击新增数据源,新建一个目标数据源。
    4. 填写完毕后单击确认,开始配置同步任务。
    5. 在脚本编辑区域输入您的配置。
      示例如下:
      {
        "type": "job",
        "version": "1.0",
        "configuration": {
          "setting": {
            "errorLimit": {
              "record": "0"
            },
            "speed": {
              "mbps": "1",
              "concurrent": 1,
              "dmu": 1,
              "throttle": false
            }
          },
          "reader": {
            "plugin": "odps",
            "parameter": {
                     "accessKey":"*****",
                 "accessId":"*****",
                 "column":["*"],
                 "isCompress":"false",
                 "partition":["pt=20161226"],
                 "project":"aliyun_account",
                 "table":"ak_biz_log_detail"
            }
          },
          "writer": {
            "plugin": "loghub",
            "parameter": {
              "endpoint": "",
              "accessId": "",
              "accessKey": "",
              "project": "",
              "logstore": "",
              "batchSize": "1024",
              "topic": "",
              "time" :"time_str",
              "timeFormat":"%Y_%m_%d %H:%i:%S",
              "column": [
                "col0",
                "col1",
                "col2",
                "col3",
                "col4",
                "col5"
              ],
              "datasource": "sls"
            }
          }
        }
      }
      参数 是否必选 说明
      endpoint 必选 日志服务的Endpoint,请参见服务入口
      accessKeyId 必选 主账号或子账号的AccessKeyId。如果是子账号,则子账号必须具有当前项目的读写权限,具体请参见授权RAM用户
      accessKeySecret 必选 主账号或子账号的AccessKeySecret。
      project 必选 目标日志服务Project的名称。
      logstore 必选 目标日志服务Logstore的名称。
      topic 可选 指定MaxCompute的某个字段为日志服务中的topic字段,默认值为空字符串。
      batchSize 可选 一次推送发送的数据条数,默认为1024。
      column 必选 每条数据中column的名字。
      说明column的个数与数据源不匹配时,会认为是脏数据。
      time 可选 time字段的名称。
      说明 如果没有time字段,默认采用系统时间作为日志时间。
      timeFormat 如果有time配置,则timeFormat必选。 time字段的格式,可以配置为:
      • bigint格式,表示unix时间戳。
      • 时间戳格式,即从字符串类型中提取时间。例如%Y_%m_%d %H:%M:%S

      如果time字段是bigint类型的1529382552,那么timeFormatbigint。如果time字段是字符串类型的2018_06_19 12:30:25,那么timeFormat%Y_%m_%d %H:%M:%S

      datasource 必选 DataWorks中定义的数据类型。
  3. 保存并执行任务。
    单击保存图标。您可以直接执行同步任务,或者提交到调度系统。
    图 3. 运行同步任务
    运行同步任务
    • 直接运行。

      单击运行图标,直接开始一次数据同步任务。

    • 调度运行。
      单击提交图标,将同步任务提交到调度系统,调度系统会按照配置属性自动定时执行。
      说明 推荐根据分区(Partition)生成周期进行调度。 例如一个小时的数据生成一个分区,则推荐调度周期为一小时。

      关于调度运行的更多信息请参见配置调度

数据类型

MaxCompute数据通过DataWorks成功导入到日志服务后,所有数据类型都会被转换为String类型。如下表所示。

MaxCompute数据类型 导入LogHub后的数据类型
Long String
Double String
String String
Data String
Boolean String
Bytes String