Flink SQL基于SPL实现弱结构化分析

更新时间:2025-02-25 09:12:11

本文介绍Flink SQL基于SPL实现弱结构化分析的操作步骤。

背景

日志服务SLS是云原生观测与分析平台,为Log、Metric、Trace等数据提供大规模、低成本、实时的平台化服务。基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入SLS进行存储、分析。阿里云Flink是阿里云基于Apache Flink构建的大数据分析平台,在实时数据分析、风控监测等场景应用广泛。阿里云Flink原生支持阿里云日志服务SLSConnector,用户可以在阿里云Flink平台将SLS作为源表或者结果表使用。

阿里云Flink SLS Connector对于结构化的日志非常直接,通过配置,SLS的日志字段可以与Flink SQLTable字段列一一映射。然而仍有大量的业务日志并非完全的结构化,例如会将所有日志内容写入一个字段中,需要用正则表达式、分隔符拆分等手段才可以提取出结构化的字段,基于这个场景,本文介绍一种使用SLS SPL配置SLS Connector完成数据结构化的方案,覆盖日志清洗与格式规整场景。

弱结构化日志数据

下面是一条日志示例,日志格式较为复杂,既有JSON字符串,又有字符串与JSON混合的场景。其中:

  • PayloadJSON字符串,其中schedule字段的内容也是一段JSON结构。

  • requestURL为一段标准的URL Path路径。

  • error字段的前半部分包含CouldNotExecuteQuery字符串,后半部分是一段JSON结构。

  • __tag__:__path__包含日志文件的路径,其中service_a可能是业务名称。

  • caller中包含文件名与文件行数。

{
  "Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
  "TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
  "TaskType": "ALERT",
  "__source__": "11.199.XXX.XXX",
  "__tag__:__hostname__": "iabcde12345.cloud.abc121",
  "__tag__:__path__": "/var/log/service_a.LOG",
  "caller": "executor/pool.go:64",
  "error": "CouldNotExecuteQuery : {\n    \"httpCode\": 404,\n    \"errorCode\": \"LogStoreNotExist\",\n    \"errorMessage\": \"logstore k8s-event does not exist\",\n    \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
  "requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
  "ts": "2024-01-29 22:57:13"
}

结构化数据处理需求

对于这样的日志提取出更有价值的信息需要进行数据清洗,首先需要提取重要的字段,然后对这些字段进行数据分析。本文关注重要字段的提取,分析仍然可以在Flink中进行。假设提取字段具体需求如下:

  • 提取error中的httpCodeerrorCodeerrorMessagerequestID

  • 提取 __tag__:__path_中的_service_a作为serviceName

  • 提取caller中的pool.go作为fileName,64作为fileNo

  • 提取Payload中的project,提取Payload下面的schedule中的typescheduleType

  • 重命名 __source__ serviceIP

其余字段舍弃,最终需要的字段列表如下。

image

解决方案

实现这样的数据清洗,有很多种方法,这里列举几种基于SLSFlink的方案,不同方案之间没有绝对的优劣,需要根据不同的场景选择不同的方案。

  • 数据加工方案:在SLS控制台创建目标Logstore,通过创建数据加工任务,完成对数据的清洗。

  • Flink方案:将errorpayload指定为源表字段,通过SQL正则函数、JSON函数对字段进行解析,将解析后的字段写入临时表,然后对临时表进行分析。

  • SPL方案:在Flink SLS Connector中配置SPL语句,对数据进行清洗,Flink中源表字段定义为清洗后的数据结构。

从上述三种方案的原理不难看出,在需要数据清洗的场景中,在SLS Connector 中配置SPL,是一种更轻量化的方案。在日志数据弱结构化的场景中,SPL方案既避免了方案一中创建临时中间Logstore,也避免了方案二中在Flink中创建临时表,在离数据源更近的位置进行数据清洗,在计算平台关注业务逻辑,职责分离更加清晰。

Flink中使用SPL

1. SLS准备数据

  1. 开通日志服务,已创建ProjectLogstore

  2. 将上文日志片段通过SDK方式写入到目标Logstore,用于样例模拟数据。

    image

  3. Logstore,编写SLS SPL管道式语法,预览SPL效果。

    image

    查询分析语句如下,SLS SPL管道式语法使用|分隔符分割不同的指令,每次输入一个指令可以即时查看结果,然后再增加管道数,渐进式、探索式获取最终结果。更多信息,请参考扫描(Scan)查询语法

    * | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller 
     | parse-json Payload 
     | project-away Payload 
     | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson 
     | parse-json errorJson 
     | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName 
     | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo 
     | project-rename serviceHost="__tag__:__hostname__" 
     | extend scheduleType = json_extract_scalar(schedule, '$.type') 
     | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, project

    语法解释如下:

    • 1行:project指令:从原始结果中保留Payload、error、tag:path、caller字段,舍弃其他字段,这些字段用于后续解析。

    • 2行:parse-json指令:将Payload字符串展开为JSON,第一层字段出现在结果中,包括lastNotified、serviceUri、jobID等。

    • 3行:project-away指令:去除原始Payload字段。

    • 4行:parse-regexp指令:按照error字段中的内容,解析其中的部分JSON内容,置于errorJson字段。

    • 5行:parse-json指令:展开errorJson字段,得到httpCode、errorCode、errorMessage等字段。

    • 6行:parse-regexp指令:通过正则表达式解析出__tag__:__path__中的文件名,并命名为serviceName。

    • 7行:parse-regexp指令:通过正则表达式,解析出caller中的文件名与行数,并置于fileName、fileNo字段。

    • 8行:project-rename指令:将__tag__:__hostname__字段重命名为serviceHost。

    • 9行:extend指令:使用json_extract_scalar函数,提取schedule中的type字段,并命名为scheduleType。

    • 10行:project指令:保留需要的字段列表,其中project字段来自于Payload。

2. 创建SQL作业

  1. 登录实时计算控制台,单击目标工作空间。

  2. 在左侧导航栏,选择数据开发 > ETL

  3. 单击新建,在新建作业草稿对话框,选择SQL基础模板 > 空白的流作业草稿,单击下一步

    image

  4. 在作业草稿中输入如下创建临时表的语句。

    CREATE TEMPORARY TABLE sls_input_complex (
      errorCode STRING,
      errorMessage STRING,
      fileName STRING,
      fileNo STRING,
      httpCode STRING,
      requestID STRING,
      scheduleType STRING,
      serviceHost STRING,
      project STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-beijing-intranet.log.aliyuncs.com',
      'accessId' = '${ak}',
      'accessKey' = '${sk}',
      'starttime' = '2024-02-01 10:30:00',
      'project' ='${project}',
      'logstore' ='${logtore}',
      'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project'
      );

    SQL中的参数说明如下,请根据实际情况替换。

    参数名

    参数含义

    示例值

    参数名

    参数含义

    示例值

    connector

    连接器。更多信息,请参见支持的连接器

    sls

    endpoint

    日志服务的私网域名,获取方式请参见服务接入点

    cn-hangzhou-intranet.log.aliyuncs.com

    accessId

    用户身份识别ID,获取方式,请参见创建AccessKey

    LTAI5tK*******

    accessKey

    用于验证您拥有该AccessKey ID的密码。获取方式,请参见创建AccessKey

    3k5PJm*******

    starttime

    指定查询日志的起始时间点。

    2025-02-19 00:00:00

    project

    日志服务的Project名。

    test-project

    logstore

    日志服务的Logstore名。

    clb-access-log

    query

    填写SLSSPL语句,注意在阿里云FlinkSQL作业开发中,字符串需要使用英文单引号进行转义。

    * | where slbid = ''slb-01''

  5. 鼠标选中SQL,鼠标右击,单击运行,连接SLS。

    image

3. 连续查询及效果

  1. 在作业中输入如下分析语句,按照slbid进行聚合查询。

    SELECT * FROM sls_input_complex;
  2. 单击右上角调试按钮,在调试弹框,单击选择调试集群下拉框中的创建新的集群,参考下图,创建新的调试集群。

    image

  3. 在调试弹框选择创建好的调试集群,然后单击确定

    image

  4. 在结果区域,可以看到TABLE中每一列的值,对应SPL处理后的结果。SPL最终得到的字段列表与TABLE中字段对应。

    image

  • 本页导读 (1)
  • 背景​
  • 弱结构化日志数据
  • 结构化数据处理需求​
  • 解决方案​
  • 在Flink中使用SPL
  • 1. SLS准备数据​
  • 2. 创建SQL作业​
  • 3. 连续查询及效果​