本文为您介绍LogHub Reader支持的数据类型、字段映射和数据源等参数及配置示例。

背景信息

日志服务(Log Service)是针对实时数据的一站式服务,为您提供日志类数据采集、消费、投递及查询分析功能,全面提升海量日志的处理、分析能力。LogHub Reader是使用日志服务的Java SDK消费LogHub中的实时日志数据,并转换日志数据为数据集成传输协议传递给Writer。

实现原理

LogHub Reader通过日志服务Java SDK消费LogHub中的实时日志数据,具体使用的日志服务Java SDK版本,如下所示。
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.7</version>
</dependency>
日志库(Logstore)是日志服务中日志数据的采集、存储和查询单元,Logstore读写日志必定保存在某一个分区(Shard)上。每个日志库分为若干个分区,每个分区由MD5左闭右开区间组成,每个区间范围不会相互覆盖,并且所有的区间的范围是MD5整个取值范围,每个分区可以提供一定的服务能力:
  • 写入:5 MB/s,2000 次/s。
  • 读取:10 MB/s,100 次/s。
LogHub Reader消费Shard中的日志,具体消费过程(GetCursor、BatchGetLog相关API)如下所示:
  • 根据时间区间范围获得游标。
  • 通过游标、步长参数读取日志,同时返回下一个位置游标。
  • 不断移动游标进行日志消费。
  • 根据Shard进行任务的切分并发执行。

类型转换列表

LogHub Reader针对LogHub类型的转换列表,如下所示。
数据集成内部类型 LogHub数据类型
STRING STRING

参数说明

参数 描述 是否必选 默认值
endPoint 日志服务入口endPoint是访问一个项目(Project)及其内部日志数据的URL。它和Project所在的阿里云地域(Region)及Project名称相关。各地域的服务入口请参见服务入口
accessId 访问日志服务的访问密钥,用于标识用户。
accessKey 访问日志服务的访问密钥,用来验证用户的密钥。
project 目标日志服务的项目名称,是日志服务中的资源管理单元,用于隔离和控制资源。
logstore 目标日志库的名称,logstore是日志服务中日志数据的采集、存储和查询单元。
batchSize 一次从日志服务查询的数据条数。 128
column 每条数据中的列名,此处可以配置日志服务中的元数据作为同步列。日志服务支持日志主题、采集机器唯一标识、主机名、路径和日志时间等元数据。
说明 列名区分大小写。元数据的写法请参见日志服务机器组
beginDateTime 数据消费的开始时间位点,即日志数据到达Loghub的时间。该参数为时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013000),可以和DataWorks的调度时间参数配合使用。
例如,您在节点编辑页面右侧的调度参数中配置beginDateTime=${yyyymmdd-1},则在日志开始时间处配置为${beginDateTime}000000,表示获取的日志开始时间为业务日期的0点0分0秒。
说明 beginDateTimeendDateTime需要互相组合配套使用。
endDateTime 数据消费的结束时间位点,为时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013010),可以和DataWorks的调度时间参数配合使用。
例如,您在节点编辑页面右侧的调度参数中配置endDateTime=${yyyymmdd},则在日志结束时间处配置为${endDateTime}000000,表示获取的日志结束时间为业务日期后一天的0点0分0秒。
说明 上一周期的endDateTime需要和下一周期的beginDateTime保持一致,或晚于下一周期的beginDateTime。否则,可能无法拉取部分区域的数据。

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向选择数据源
    参数 描述
    数据源 即上述参数说明中的datasource,通常输入您配置的数据源名称。
    Logstore 目标日志库的名称。
    日志开始时间 数据消费的开始时间位点,即日志数据到达Loghub的时间。时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013000),可以和DataWorks的调度时间参数配合使用。
    日志结束时间 数据消费的结束时间位点,时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013010),可以和DataWorks的调度时间参数配合使用。
    批量条数 一次从日志服务查询的数据条数。
  2. 字段映射,即上述参数说明中的column。 beginTimestampMillis
    左侧的源头表字段和右侧的目标表字段为一一对应关系。单击添加一行可以增加单个字段,鼠标放至需要删除的字段上,即可单击删除图标进行删除。删除
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
    手动编辑源表字段 请手动编辑字段,一行表示一个字段,首尾空行会被采用,其他空行会被忽略。
  3. 通道控制。通道配置
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

使用脚本模式开发的详情请参见通过脚本模式配置任务

脚本配置样例如下所示,具体参数填写请参见参数说明。
{
 "type":"job",
 "version":"2.0",//版本号。
 "steps":[
     {
         "stepType":"loghub",//插件名。
         "parameter":{
             "datasource":"",//数据源。
             "column":[//字段。
                 "col0",
                 "col1",
                 "col2",
                 "col3",
                 "col4",
                 "C_Category",
                 "C_Source",
                 "C_Topic",
                 "C_MachineUUID", //日志主题。
                 "C_HostName", //主机名。
                 "C_Path", //路径。
                 "C_LogTime" //事件时间。
             ],
             "beginDateTime":"",//数据消费的开始时间位点。
             "batchSize":"",//一次从日志服务查询的数据条数。
             "endDateTime":"",//数据消费的结束时间位点。
             "fieldDelimiter":",",//列分隔符。
             "logstore":""//:目标日志库的名字。
         },
         "name":"Reader",
         "category":"reader"
     },
     { 
         "stepType":"stream",
         "parameter":{},
         "name":"Writer",
         "category":"writer"
     }
 ],
 "setting":{
     "errorLimit":{
         "record":"0"//错误记录数。
     },
     "speed":{
         "throttle":true,//当throttle值为flase时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1 //作业并发数。
            "mbps":"12",//限流
     }
 },
 "order":{
     "hops":[
         {
             "from":"Reader",
             "to":"Writer"
         }
     ]
 }
}
说明 如果元数据配置JSON中有tag前缀,需要删除tag前缀。例如,__tag__:__client_ip__需要修改为__client_ip__