全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
流计算

创建日志服务源表

更新时间:2018-04-26 16:09:34

什么是日志服务

日志服务(Log Service,简称LOG/原SLS)是针对实时数据一站式服务,在阿里集团经历大量大数据场景锤炼而成。无需开发就能快捷完成数据采集、消费、投递以及查询分析等功能,帮助提升运维、运营效率,建立DT时代海量日志处理能力.

日志服务本身是流数据存储,流计算能将其作为流式数据输入。对于日志服务而言,每条数据类似一个JSON格式,举例如下:

  1. {
  2. "a": 1000,
  3. "b": 1234,
  4. "c": "li"
  5. }

那么对于流计算而言,我们需要定义的DDL如下(sls即Log Service):

  1. create table sls_stream(
  2. a int,
  3. b int,
  4. c varchar
  5. ) with (
  6. type ='sls',
  7. endPoint ='http://cXXXXXXXXyuncs.com',
  8. accessId ='0iXXXXXXXAs',
  9. accessKey ='yF60EXXXXXXXPJ2zhCfHU',
  10. startTime = '2017-07-05 00:00:00',
  11. project ='ali-XXXXX-streamtest',
  12. logStore ='strXXXtest',
  13. consumerGroup ='consXXXXroupTest1'
  14. );

属性字段

目前默认支持三个属性字段的获取,也支持其他自定义写入的字段:

字段名 注释说明
__source__ 消息源
__topic__ 消息主题
__timestamp__ 日志时间

例子

通过 HEADER 关键字获取属性字段的例子:

测试数据

  1. __topic__: ens_altar_flow
  2. result: {"MsgID":"ems0a","Version":"0.0.1"}

测试案例

  1. CREATE TABLE sls_log (
  2. __topic__ varchar HEADER,
  3. result varchar
  4. )
  5. WITH
  6. (
  7. type ='sls'
  8. );
  9. CREATE TABLE sls_out (
  10. name varchar,
  11. MsgID varchar,
  12. Version varchar
  13. )
  14. WITH
  15. (
  16. type ='RDS'
  17. );
  18. INSERT INTO sls_out
  19. SELECT
  20. __topic__,
  21. JSON_VALUE(result,'$.MsgID'),
  22. JSON_VALUE(result,'$.Version')
  23. FROM
  24. sls_log

测试结果

name(VARCHAT) MsgID(VARCHAT) Version(VARCHAT)
ens_altar_flow ems0a 0.0.1

WITH参数

参数 注释说明 备注
endPoint 消费端点信息 日志服务的ENDPOINT地址
accessId sls读取的accessKey
accessKey sls读取的密钥
project 读取的sls项目
logStore project下的具体的logStore
consumerGroup 消费组名 用户可以自定义消费组名(没有固定的格式)
startTime 消费日志开始的时间点
heartBeatIntervalMills 可选,消费客户端心跳间隔时间 默认为10s
maxRetryTimes 读取最大尝试次数 可选,默认为5
batchGetSize 单次读取logGroup条数 可选,默认为10
lengthCheck 单行字段条数检查策略 可选,默认为SKIP,其它可选值为EXCEPTION,PAD,“SKIP” 字段数目不符合时跳过 ,“EXCEPTION”:字段数目不符合时抛出异常,“PAD”:按顺序填充,不存在的置为null
columnErrorDebug 是否打开调试开关,如果打开,会把解析异常的log打印出来 可选,默认为false

注意:

  • SLS暂不支持MAP类型的数据。
  • 字段顺序支持无序(建议字段顺序和表中定义一致)。
  • 输入数据源为Json形式时,注意定义分隔符,并且需要采用内置函数分析JSON_VALUE,否则就会解析失败.

报错如下:

  1. 2017-12-25 15:24:43,467 WARN [Topology-0 (1/1)] com.alibaba.blink.streaming.connectors.common.source.parse.DefaultSourceCollector - Field missing error, table column number: 3, data column number: 3, data filed number: 1, data: [{"lg_order_code":"LP00000005","activity_code":"TEST_CODE1","occur_time":"2017-12-10 00:00:01"}]
  • batchGetSize设置不能超过1000,否则会报错
  • batchGetSize指明的是logGroup获取的数量,如果单条logItem的大小和 batchGetSize都很大,很有可能会导致频繁的GC,这种情况下该参数应注意调小

类型映射

日志服务和流计算字段类型对应关系,强烈建议用户使用该对应关系进行DDL声明:

日志服务字段类型 流计算字段类型
STRING VARCHAR
本文导读目录