LogHub(SLS)数据源为您提供读取和写入LogHub(SLS)双向通道的功能,本文为您介绍DataWorks的LogHub(SLS)数据同步的能力支持情况。
使用限制
数据集成离线写LogHub(SLS)时,由于LogHub(SLS)无法实现幂等,Failover重跑任务时会引起数据重复。
支持的字段类型
数据集成支持读写的LogHub(SLS)字段类型如下。
| 字段类型 | 离线读(LogHub(SLS) Reader) | 离线写(LogHub(SLS) Writer) | 实时读 | 
| STRING | 支持 | 支持 | 支持 | 
其中:
- 离线写LogHub(SLS)时 - 会将支持同步的各类型数据均转换成STRING类型后写入LogHub(SLS)。LogHub(SLS) Writer针对LogHub(SLS)类型的转换列表,如下所示。 - 支持的数据集成内部类型 - 写入LogHub(SLS)时的数据类型 - LONG - STRING - DOUBLE - STRING - STRING - STRING - DATE - STRING - BOOLEAN - STRING - BYTES - STRING 
- 实时读LogHub(SLS)时 - 会自带以下元数据字段。 - LogHub(SLS)实时同步字段 - 数据类型 - 说明 - __time__ - STRING - SLS保留字段:__time__写入日志数据时指定的日志时间,unix时间戳,单位为秒。 - __source__ - STRING - SLS保留字段:__source__日志来源设备。 - __topic__ - STRING - SLS保留字段:__topic__topic名称。 - __tag__:__receive_time__ - STRING - 日志到达服务端的时间。开启记录外网IP功能后,服务端接收日志时为原始日志追加该字段。unix时间戳,单位为秒。 - __tag__:__client_ip__ - STRING - 日志来源设备的公网IP。开启记录外网IP功能后,服务端接收日志时为原始日志追加该字段。 - __tag__:__path__ - STRING - Logtail采集的日志文件路径,Logtail会自动为日志追加该字段。 - __tag__:__hostname__ - STRING - Logtail采集数据的来源机器主机名,Logtail会自动为日志追加该字段。 
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见数据源管理,详细的配置参数解释可在配置界面查看对应参数的文案提示。
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
LogHub数据源作为数据来源端,在进行任务配置同步时支持通过LogHub的查询语法、SPL语句(SLS Processing Language是SLS处理日志的语法)对LogHub内的数据进行过滤,具体语法说明请参见附录二:LogHub SPL语法过滤说明。
单表离线同步任务配置指导
- 说明通过向导模式配置同步任务时,参数格式需与附录一:脚本Demo与参数说明的参数配置格式保持一致。 
- 脚本模式配置的全量参数和脚本Demo请参见下文的附录一:脚本Demo与参数说明。 
单表实时同步任务配置指导
操作流程请参见DataStudio侧实时同步任务配置、数据集成侧实时同步任务配置。
整库实时等整库级别同步配置指导
操作流程请参见整库实时同步任务配置。
常见问题
更多其他数据集成常见问题请参见数据集成常见问题。
附录一:脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见脚本模式配置,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo
{
 "type":"job",
 "version":"2.0",//版本号。
 "steps":[
     {
         "stepType":"LogHub",//插件名。
         "parameter":{
             "datasource":"",//数据源。
             "column":[//字段。
                 "col0",
                 "col1",
                 "col2",
                 "col3",
                 "col4",
                 "C_Category",
                 "C_Source",
                 "C_Topic",
                 "C_MachineUUID", //日志主题。
                 "C_HostName", //主机名。
                 "C_Path", //路径。
                 "C_LogTime" //事件时间。
             ],
             "beginDateTime":"",//数据消费的开始时间位点。
             "batchSize":"",//一次从日志服务查询的数据条数。
             "endDateTime":"",//数据消费的结束时间位点。
             "fieldDelimiter":",",//列分隔符。
             "logstore":""//:目标日志库的名字。
         },
         "name":"Reader",
         "category":"reader"
     },
     { 
         "stepType":"stream",
         "parameter":{},
         "name":"Writer",
         "category":"writer"
     }
 ],
 "setting":{
     "errorLimit":{
         "record":"0"//错误记录数。
     },
     "speed":{
         "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1 //作业并发数。
            "mbps":"12",//限流,此处1mbps = 1MB/s。
     }
 },
 "order":{
     "hops":[
         {
             "from":"Reader",
             "to":"Writer"
         }
     ]
 }
}Reader脚本参数
| 参数 | 描述 | 是否必选 | 默认值 | 
| endPoint | 日志服务入口endPoint是访问一个项目(Project)及其内部日志数据的URL。它和Project所在的阿里云地域(Region)及Project名称相关。各地域的服务入口请参见服务入口。 | 是 | 无 | 
| accessId | 访问日志服务的访问密钥,用于标识用户。 | 是 | 无 | 
| accessKey | 访问日志服务的访问密钥,用来验证用户的密钥。 | 是 | 无 | 
| project | 目标日志服务的项目名称,是日志服务中的资源管理单元,用于隔离和控制资源。 | 是 | 无 | 
| logstore | 目标日志库的名称,logstore是日志服务中日志数据的采集、存储和查询单元。 | 是 | 无 | 
| batchSize | 一次从日志服务查询的数据条数。 | 否 | 128 | 
| column | 每条数据中的列名,此处可以配置日志服务中的元数据作为同步列。日志服务支持日志主题、采集机器唯一标识、主机名、路径和日志时间等元数据。 说明  列名区分大小写。元数据的写法请参见日志服务机器组。 | 是 | 无 | 
| beginDateTime | 数据消费的开始时间位点,即日志数据到达LogHub(SLS)的时间。该参数为时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013000),可以和DataWorks的调度时间参数配合使用。 例如,您在节点编辑页面右侧的调度配置,在参数中配置 说明  
 | 是 | 无 | 
| endDateTime | 数据消费的结束时间位点,为时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013010),可以和DataWorks的调度时间参数配合使用。 例如,您在节点编辑页面右侧的调度配置,在参数中配置endDateTime=${yyyymmdd},则在日志结束时间处配置为${endDateTime}000000,表示获取的日志结束时间为业务日期后一天的0点0分0秒。详情请参见调度参数支持的格式。 重要  
 | 是 | 无 | 
| query | 通过LogHub的查询语法或SPL语句(SLS Processing Language是SLS处理日志的语法)对LogHub内的数据进行过滤。 | 是 | 无 | 
若读取LogHub同步缺少数据,请在LogHub控制台检查数据元数据字段receive_time是否在任务配置的时间区间内。
Writer脚本Demo
{
    "type": "job",
    "version": "2.0",//版本号。
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "LogHub",//插件名。
            "parameter": {
                "datasource": "",//数据源。
                "column": [//字段。
                    "col0",
                    "col1",
                    "col2",
                    "col3",
                    "col4",
                    "col5"
                ],
                "topic": "",//选取topic。
                "batchSize": "1024",//一次性批量提交的记录数大小。
                "logstore": ""//目标LogService LogStore的名称。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""//错误记录数。
        },
        "speed": {
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":3, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}Writer脚本参数
LogHub(SLS) Writer通过数据集成框架获取Reader生成的数据,然后将数据集成支持的类型通过逐一判断转换成STRING类型。当达到您指定的batchSize时,会使用LogService Java SDK一次性推送至LogHub(SLS)。
| 参数 | 描述 | 是否必选 | 默认值 | 
| endpoint | 日志服务入口endPoint是访问一个项目(Project)及其内部日志数据的URL。它和Project所在的阿里云地域(Region)及Project名称相关。各地域的服务入口请参见:服务入口。 | 是 | 无 | 
| accessKeyId | 访问日志服务的AccessKeyId。 | 是 | 无 | 
| accessKeySecret | 访问日志服务的AccessKeySecret。 | 是 | 无 | 
| project | 目标日志服务的项目名称。 | 是 | 无 | 
| logstore | 目标日志库的名称,logstore是日志服务中日志数据的采集、存储和查询单元。 | 是 | 无 | 
| topic | 目标日志服务的topic名称。 | 否 | 空字符串 | 
| batchSize | LogHub(SLS)一次同步的数据条数,默认1,024条,最大值为4,096。 说明  一次性同步至LogHub(SLS)的数据大小不要超过5M,请根据您的单条数据量大小调整一次性推送的条数。 | 否 | 1,024 | 
| column | 每条数据中的column名称。 | 是 | 无 | 
附录二:LogHub SPL语法过滤说明
LogHub数据源作为数据来源端,在进行任务配置同步时支持通过LogHub的查询语法、SPL语句(SLS Processing Language是SLS处理日志的语法)对LogHub内的数据进行过滤,具体语法说明如下:
SPL的更多详细信息,请参见SPL语法。
| 场景 | SQL语句 | SPL语句 | 
| 数据过滤 |  | 
 | 
| 字段处理与筛选 | 精确选择字段,并将其重命名:  | 
 | 
| 数据规整 (调用SQL函数) | 转换数据类型、时间解析等:  | 转换数据类型、时间解析等:  | 
| 字段提取 | 正则提取: JSON提取:  | 
 |