本文为您介绍LogHub(SLS) Reader支持的数据类型、字段映射和数据源等参数及配置示例。

背景信息

日志服务(Log Service)是针对实时数据的一站式服务,为您提供日志类数据采集、消费、投递及查询分析功能,全面提升海量日志的处理、分析能力。LogHub(SLS) Reader是使用日志服务的Java SDK消费LogHub(SLS)中的实时日志数据,并转换日志数据为数据集成传输协议传递给Writer。

实现原理

LogHub(SLS) Reader通过日志服务Java SDK消费LogHub(SLS)中的实时日志数据,具体使用的日志服务Java SDK版本,如下所示。
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.7</version>
</dependency>
日志库(Logstore)是日志服务中日志数据的采集、存储和查询单元,Logstore读写日志必定保存在某一个分区(Shard)上。每个日志库分为若干个分区,每个分区由MD5左闭右开区间组成,每个区间范围不会相互覆盖,并且所有的区间的范围是MD5整个取值范围,每个分区可以提供一定的服务能力:
  • 写入:5 MB/s,2000 次/s。
  • 读取:10 MB/s,100 次/s。
LogHub(SLS) Reader消费Shard中的日志,具体消费过程(GetCursor、BatchGetLog相关API)如下所示:
  • 根据时间区间范围获得游标。
  • 通过游标、步长参数读取日志,同时返回下一个位置游标。
  • 不断移动游标进行日志消费。
  • 根据Shard进行任务的切分并发执行。

类型转换列表

LogHub(SLS) Reader针对LogHub(SLS)类型的转换列表,如下所示。
数据集成内部类型LogHub(SLS)数据类型
STRINGSTRING

参数说明

参数描述是否必选默认值
endPoint日志服务入口endPoint是访问一个项目(Project)及其内部日志数据的URL。它和Project所在的阿里云地域(Region)及Project名称相关。各地域的服务入口请参见服务入口
accessId访问日志服务的访问密钥,用于标识用户。
accessKey访问日志服务的访问密钥,用来验证用户的密钥。
project目标日志服务的项目名称,是日志服务中的资源管理单元,用于隔离和控制资源。
logstore目标日志库的名称,logstore是日志服务中日志数据的采集、存储和查询单元。
batchSize一次从日志服务查询的数据条数。128
column每条数据中的列名,此处可以配置日志服务中的元数据作为同步列。日志服务支持日志主题、采集机器唯一标识、主机名、路径和日志时间等元数据。
说明 列名区分大小写。元数据的写法请参见日志服务机器组
beginDateTime数据消费的开始时间位点,即日志数据到达LogHub(SLS)的时间。该参数为时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013000),可以和DataWorks的调度时间参数配合使用。
例如,您在节点编辑页面右侧的调度配置,在参数中配置beginDateTime=${yyyymmdd-1},则在日志开始时间处配置为${beginDateTime}000000,表示获取的日志开始时间为业务日期的0点0分0秒。详情请参见调度参数支持的格式
说明 beginDateTimeendDateTime需要互相组合配套使用。
endDateTime数据消费的结束时间位点,为时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013010),可以和DataWorks的调度时间参数配合使用。
例如,您在节点编辑页面右侧的调度配置,在参数中配置endDateTime=${yyyymmdd},则在日志结束时间处配置为${endDateTime}000000,表示获取的日志结束时间为业务日期后一天的0点0分0秒。详情请参见调度参数支持的格式
说明 上一周期的endDateTime需要和下一周期的beginDateTime保持一致,或晚于下一周期的beginDateTime。否则,可能无法拉取部分区域的数据。

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向选择数据源
    参数描述
    数据源即上述参数说明中的datasource,通常输入您配置的数据源名称。
    Logstore目标日志库的名称。
    日志开始时间数据消费的开始时间位点,即日志数据到达LogHub(SLS)的时间。时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013000),可以和DataWorks的调度参数配合使用实现每次同步增量数据。详情请参见:调度参数支持的格式
    日志结束时间数据消费的结束时间位点,时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013010),可以和DataWorks的调度参数配合使用。
    批量条数一次从日志服务查询的数据条数。

    数据集成离线同步任务中,可以使用调度参数来指定同步源表的数据范围,当同步任务运行时,任务中配置的参数都会被替换为调度参数表达式所表达的实际值,然后再执行数据同步。Loghub增量同步配置示例请参见同步增量数据,离线同步增量同步实现方式请参见数据集成使用调度参数的相关说明

  2. 字段映射,即上述参数说明中的column
    左侧的源头表字段和右侧的目标表字段为一一对应关系。单击添加一行可以增加单个字段,鼠标放至需要删除的字段上,即可单击删除图标进行删除。删除
    参数描述
    同名映射单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射单击取消映射,可以取消建立的映射关系。
    自动排版可以根据相应的规律自动排版。
    手动编辑源表字段请手动编辑字段,一行表示一个字段,首尾空行会被采用,其他空行会被忽略。
  3. 通道控制。通道配置
    参数描述
    任务期望最大并发数数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

使用脚本模式开发的详情请参见通过脚本模式配置离线同步任务

脚本配置样例如下所示,具体参数填写请参见参数说明。
{
 "type":"job",
 "version":"2.0",//版本号。
 "steps":[
     {
         "stepType":"LogHub",//插件名。
         "parameter":{
             "datasource":"",//数据源。
             "column":[//字段。
                 "col0",
                 "col1",
                 "col2",
                 "col3",
                 "col4",
                 "C_Category",
                 "C_Source",
                 "C_Topic",
                 "C_MachineUUID", //日志主题。
                 "C_HostName", //主机名。
                 "C_Path", //路径。
                 "C_LogTime" //事件时间。
             ],
             "beginDateTime":"",//数据消费的开始时间位点。
             "batchSize":"",//一次从日志服务查询的数据条数。
             "endDateTime":"",//数据消费的结束时间位点。
             "fieldDelimiter":",",//列分隔符。
             "logstore":""//:目标日志库的名字。
         },
         "name":"Reader",
         "category":"reader"
     },
     { 
         "stepType":"stream",
         "parameter":{},
         "name":"Writer",
         "category":"writer"
     }
 ],
 "setting":{
     "errorLimit":{
         "record":"0"//错误记录数。
     },
     "speed":{
         "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1 //作业并发数。
            "mbps":"12",//限流
     }
 },
 "order":{
     "hops":[
         {
             "from":"Reader",
             "to":"Writer"
         }
     ]
 }
}
说明 如果元数据配置JSON中有tag前缀,需要删除tag前缀。例如,__tag__:__client_ip__需要修改为__client_ip__

常见问题