LogHub(SLS)数据源

LogHub(SLS)数据源为您提供读取和写入LogHub(SLS)双向通道的功能,本文为您介绍DataWorksLogHub(SLS)数据同步的能力支持情况。

使用限制

数据集成离线写LogHub(SLS)时,由于LogHub(SLS)无法实现幂等,FailOver重跑任务时会引起数据重复。

支持的字段类型

数据集成支持读写的LogHub(SLS)字段类型如下。

字段类型

离线读(LogHub(SLS) Reader)

离线写(LogHub(SLS) Writer)

实时读

STRING

支持

支持

支持

其中:

  • 离线写LogHub(SLS)时

    会将支持同步的各类型数据均转换成STRING类型后写入LogHub(SLS)。LogHub(SLS) Writer针对LogHub(SLS)类型的转换列表,如下所示。

    支持的数据集成内部类型

    写入LogHub(SLS)时的数据类型

    LONG

    STRING

    DOUBLE

    STRING

    STRING

    STRING

    DATE

    STRING

    BOOLEAN

    STRING

    BYTES

    STRING

  • 实时读LogHub(SLS)时

    会自带以下元数据字段。

    LogHub(SLS)实时同步字段

    数据类型

    说明

    __time__

    STRING

    SLS保留字段:__time__写入日志数据时指定的日志时间,unix时间戳,单位为秒。

    __source__

    STRING

    SLS保留字段:__source__日志来源设备。

    __topic__

    STRING

    SLS保留字段:__topic__topic名称。

    __tag__:__receive_time__

    STRING

    日志到达服务端的时间。开启记录外网IP功能后,服务端接收日志时为原始日志追加该字段。unix时间戳,单位为秒。

    __tag__:__client_ip__

    STRING

    日志来源设备的公网IP。开启记录外网IP功能后,服务端接收日志时为原始日志追加该字段。

    __tag__:__path__

    STRING

    Logtail采集的日志文件路径,Logtail会自动为日志追加该字段。

    __tag__:__hostname__

    STRING

    Logtail采集数据的来源机器主机名,Logtail会自动为日志追加该字段。

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

说明

LogHub数据源作为数据来源端,在进行任务配置同步时支持通过LogHub的查询语法、SPL语句(SLS Processing LanguageSLS处理日志的语法)对LogHub内的数据进行过滤,具体语法说明请参见附录二:LogHub SPL语法过滤说明

单表离线同步任务配置指导

单表实时同步任务配置指导

操作流程请参见配置单表增量数据实时同步DataStudio侧实时同步任务配置

整库离线、整库(实时)全增量、整库(实时)分库分表等整库级别同步配置指导

操作流程请参见数据集成侧同步任务配置

常见问题

更多其他数据集成常见问题请参见数据集成常见问题

附录一:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

{
 "type":"job",
 "version":"2.0",//版本号。
 "steps":[
     {
         "stepType":"LogHub",//插件名。
         "parameter":{
             "datasource":"",//数据源。
             "column":[//字段。
                 "col0",
                 "col1",
                 "col2",
                 "col3",
                 "col4",
                 "C_Category",
                 "C_Source",
                 "C_Topic",
                 "C_MachineUUID", //日志主题。
                 "C_HostName", //主机名。
                 "C_Path", //路径。
                 "C_LogTime" //事件时间。
             ],
             "beginDateTime":"",//数据消费的开始时间位点。
             "batchSize":"",//一次从日志服务查询的数据条数。
             "endDateTime":"",//数据消费的结束时间位点。
             "fieldDelimiter":",",//列分隔符。
             "logstore":""//:目标日志库的名字。
         },
         "name":"Reader",
         "category":"reader"
     },
     { 
         "stepType":"stream",
         "parameter":{},
         "name":"Writer",
         "category":"writer"
     }
 ],
 "setting":{
     "errorLimit":{
         "record":"0"//错误记录数。
     },
     "speed":{
         "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1 //作业并发数。
            "mbps":"12",//限流,此处1mbps = 1MB/s。
     }
 },
 "order":{
     "hops":[
         {
             "from":"Reader",
             "to":"Writer"
         }
     ]
 }
}

Reader脚本参数

参数

描述

是否必选

默认值

endPoint

日志服务入口endPoint是访问一个项目(Project)及其内部日志数据的URL。它和Project所在的阿里云地域(Region)及Project名称相关。各地域的服务入口请参见服务入口

accessId

访问日志服务的访问密钥,用于标识用户。

accessKey

访问日志服务的访问密钥,用来验证用户的密钥。

project

目标日志服务的项目名称,是日志服务中的资源管理单元,用于隔离和控制资源。

logstore

目标日志库的名称,logstore是日志服务中日志数据的采集、存储和查询单元。

batchSize

一次从日志服务查询的数据条数。

128

column

每条数据中的列名,此处可以配置日志服务中的元数据作为同步列。日志服务支持日志主题、采集机器唯一标识、主机名、路径和日志时间等元数据。

说明

列名区分大小写。元数据的写法请参见日志服务机器组

beginDateTime

数据消费的开始时间位点,即日志数据到达LogHub(SLS)的时间。该参数为时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013000),可以和DataWorks的调度时间参数配合使用。

例如,您在节点编辑页面右侧的调度配置,在参数中配置beginDateTime=${yyyymmdd-1},则在日志开始时间处配置为${beginDateTime}000000,表示获取的日志开始时间为业务日期的000秒。详情请参见调度参数支持的格式

说明

beginDateTimeendDateTime需要互相组合配套使用。

endDateTime

数据消费的结束时间位点,为时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013010),可以和DataWorks的调度时间参数配合使用。

例如,您在节点编辑页面右侧的调度配置,在参数中配置endDateTime=${yyyymmdd},则在日志结束时间处配置为${endDateTime}000000,表示获取的日志结束时间为业务日期后一天的000秒。详情请参见调度参数支持的格式

说明

上一周期的endDateTime需要和下一周期的beginDateTime保持一致,或晚于下一周期的beginDateTime。否则,可能无法拉取部分区域的数据。

Writer脚本Demo

{
    "type": "job",
    "version": "2.0",//版本号。
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "LogHub",//插件名。
            "parameter": {
                "datasource": "",//数据源。
                "column": [//字段。
                    "col0",
                    "col1",
                    "col2",
                    "col3",
                    "col4",
                    "col5"
                ],
                "topic": "",//选取topic。
                "batchSize": "1024",//一次性批量提交的记录数大小。
                "logstore": ""//目标LogService LogStore的名称。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""//错误记录数。
        },
        "speed": {
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":3, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Writer脚本参数

说明

LogHub(SLS) Writer通过数据集成框架获取Reader生成的数据,然后将数据集成支持的类型通过逐一判断转换成STRING类型。当达到您指定的batchSize时,会使用LogService Java SDK一次性推送至LogHub(SLS)。

参数

描述

是否必选

默认值

endpoint

日志服务入口endPoint是访问一个项目(Project)及其内部日志数据的URL。它和Project所在的阿里云地域(Region)及Project名称相关。各地域的服务入口请参见:服务入口

accessKeyId

访问日志服务的AccessKeyId

accessKeySecret

访问日志服务的AccessKeySecret

project

目标日志服务的项目名称。

logstore

目标日志库的名称,logstore是日志服务中日志数据的采集、存储和查询单元。

topic

目标日志服务的topic名称。

空字符串

batchSize

LogHub(SLS)一次同步的数据条数,默认1,024条,最大值为4,096

说明

一次性同步至LogHub(SLS)的数据大小不要超过5M,请根据您的单条数据量大小调整一次性推送的条数。

1,024

column

每条数据中的column名称。

附录二:LogHub SPL语法过滤说明

LogHub数据源作为数据来源端,在进行任务配置同步时支持通过LogHub的查询语法、SPL语句(SLS Processing LanguageSLS处理日志的语法)对LogHub内的数据进行过滤,具体语法说明如下:

说明

SPL的更多详细信息,请参见SPL概述

场景

SQL语句

SPL语句

数据过滤

SELECT * WHERE Type='write'
| where Type='write'

字段处理与筛选

精确选择字段,并将其重命名:

SELECT "__tag__:node" AS node, path
  • 精确选择字段,并重命名。

    | project node="__tag__:node", path
  • 按模式选择字段。

    | project -wildcard "__tag__:*"
  • 重命名部分字段,不影响其他字段。

    | project-rename node="__tag__:node"
  • 按模式排除字段。

    | project-away -wildcard "__tag__:*"

数据规整

(调用SQL函数)

转换数据类型、时间解析等:

SELECT 
  CAST(Status AS BIGINT) AS Status, 
  date_parse(Time, '%Y-%m-%d %H:%i') AS Time

转换数据类型、时间解析等:

| extend Status=cast(Status as BIGINT), extend Time=date_parse(Time, '%Y-%m-%d %H:%i')

字段提取

正则提取:

SELECT 
  CAST(Status AS BIGINT) AS Status, 
  date_parse(Time, '%Y-%m-%d %H:%i') AS Time

JSON提取:

SELECT 
  CAST(Status AS BIGINT) AS Status, 
  date_parse(Time, '%Y-%m-%d %H:%i') AS Time
  • 正则提取:一次性匹配。

    | parse-regexp protocol, '(\w+)/(\d+)' as scheme, version
  • JSON提取:全部展开。

    | parse-json -path='$.0' content
  • CSV提取。

    | parse-csv -delim='^_^' content as ip, time, host