本文介绍Flink SQL基于SPL实现弱结构化分析的操作步骤。
背景
日志服务SLS是云原生观测与分析平台,为Log、Metric、Trace等数据提供大规模、低成本、实时的平台化服务。基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入SLS进行存储、分析。阿里云Flink是阿里云基于Apache Flink构建的大数据分析平台,在实时数据分析、风控监测等场景应用广泛。阿里云Flink原生支持阿里云日志服务SLS的Connector,用户可以在阿里云Flink平台将SLS作为源表或者结果表使用。
阿里云Flink SLS Connector对于结构化的日志非常直接,通过配置,SLS的日志字段可以与Flink SQL的Table字段列一一映射。然而仍有大量的业务日志并非完全的结构化,例如会将所有日志内容写入一个字段中,需要用正则表达式、分隔符拆分等手段才可以提取出结构化的字段,基于这个场景,本文介绍一种使用SLS SPL配置SLS Connector完成数据结构化的方案,覆盖日志清洗与格式规整场景。
弱结构化日志数据
下面是一条日志示例,日志格式较为复杂,既有JSON字符串,又有字符串与JSON混合的场景。其中:
Payload为JSON字符串,其中schedule字段的内容也是一段JSON结构。
requestURL为一段标准的URL Path路径。
error字段的前半部分包含CouldNotExecuteQuery字符串,后半部分是一段JSON结构。
__tag__:__path__包含日志文件的路径,其中service_a可能是业务名称。
caller中包含文件名与文件行数。
{
"Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
"TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
"TaskType": "ALERT",
"__source__": "11.199.XXX.XXX",
"__tag__:__hostname__": "iabcde12345.cloud.abc121",
"__tag__:__path__": "/var/log/service_a.LOG",
"caller": "executor/pool.go:64",
"error": "CouldNotExecuteQuery : {\n \"httpCode\": 404,\n \"errorCode\": \"LogStoreNotExist\",\n \"errorMessage\": \"logstore k8s-event does not exist\",\n \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
"requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
"ts": "2024-01-29 22:57:13"
}
结构化数据处理需求
对于这样的日志提取出更有价值的信息需要进行数据清洗,首先需要提取重要的字段,然后对这些字段进行数据分析。本文关注重要字段的提取,分析仍然可以在Flink中进行。假设提取字段具体需求如下:
提取error中的httpCode、errorCode、errorMessage和requestID。
提取 __tag__:__path_中的_service_a作为serviceName。
提取caller中的pool.go作为fileName,64作为fileNo
提取Payload中的project,提取Payload下面的schedule中的type为scheduleType。
重命名 __source__ 为serviceIP。
其余字段舍弃,最终需要的字段列表如下。
解决方案
实现这样的数据清洗,有很多种方法,这里列举几种基于SLS与Flink的方案,不同方案之间没有绝对的优劣,需要根据不同的场景选择不同的方案。
数据加工方案:在SLS控制台创建目标Logstore,通过创建数据加工任务,完成对数据的清洗。
Flink方案:将error和payload指定为源表字段,通过SQL正则函数、JSON函数对字段进行解析,将解析后的字段写入临时表,然后对临时表进行分析。
SPL方案:在Flink SLS Connector中配置SPL语句,对数据进行清洗,Flink中源表字段定义为清洗后的数据结构。
从上述三种方案的原理不难看出,在需要数据清洗的场景中,在SLS Connector 中配置SPL,是一种更轻量化的方案。在日志数据弱结构化的场景中,SPL方案既避免了方案一中创建临时中间Logstore,也避免了方案二中在Flink中创建临时表,在离数据源更近的位置进行数据清洗,在计算平台关注业务逻辑,职责分离更加清晰。
在Flink中使用SPL
1. SLS准备数据
将上文日志片段通过SDK方式写入到目标Logstore,用于样例模拟数据。
在Logstore,编写SLS SPL管道式语法,预览SPL效果。
查询分析语句如下,SLS SPL管道式语法使用
|
分隔符分割不同的指令,每次输入一个指令可以即时查看结果,然后再增加管道数,渐进式、探索式获取最终结果。更多信息,请参考扫描(Scan)查询语法。* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, '$.type') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, project
语法解释如下:
1行:project指令:从原始结果中保留Payload、error、tag:path、caller字段,舍弃其他字段,这些字段用于后续解析。
2行:parse-json指令:将Payload字符串展开为JSON,第一层字段出现在结果中,包括lastNotified、serviceUri、jobID等。
3行:project-away指令:去除原始Payload字段。
4行:parse-regexp指令:按照error字段中的内容,解析其中的部分JSON内容,置于errorJson字段。
5行:parse-json指令:展开errorJson字段,得到httpCode、errorCode、errorMessage等字段。
6行:parse-regexp指令:通过正则表达式解析出__tag__:__path__中的文件名,并命名为serviceName。
7行:parse-regexp指令:通过正则表达式,解析出caller中的文件名与行数,并置于fileName、fileNo字段。
8行:project-rename指令:将__tag__:__hostname__字段重命名为serviceHost。
9行:extend指令:使用json_extract_scalar函数,提取schedule中的type字段,并命名为scheduleType。
10行:project指令:保留需要的字段列表,其中project字段来自于Payload。
2. 创建SQL作业
登录实时计算控制台,单击目标工作空间。
在左侧导航栏,选择
。单击新建,在新建作业草稿对话框,选择
,单击下一步。在作业草稿中输入如下创建临时表的语句。
CREATE TEMPORARY TABLE sls_input_complex ( errorCode STRING, errorMessage STRING, fileName STRING, fileNo STRING, httpCode STRING, requestID STRING, scheduleType STRING, serviceHost STRING, project STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-beijing-intranet.log.aliyuncs.com', 'accessId' = '${ak}', 'accessKey' = '${sk}', 'starttime' = '2024-02-01 10:30:00', 'project' ='${project}', 'logstore' ='${logtore}', 'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project' );
SQL中的参数说明如下,请根据实际情况替换。
参数名
参数含义
示例值
参数名
参数含义
示例值
connector
连接器。更多信息,请参见支持的连接器。
sls
endpoint
日志服务的私网域名,获取方式请参见服务接入点。
cn-hangzhou-intranet.log.aliyuncs.com
accessId
用户身份识别ID,获取方式,请参见创建AccessKey。
LTAI5tK*******
accessKey
用于验证您拥有该AccessKey ID的密码。获取方式,请参见创建AccessKey。
3k5PJm*******
starttime
指定查询日志的起始时间点。
2025-02-19 00:00:00
project
日志服务的Project名。
test-project
logstore
日志服务的Logstore名。
clb-access-log
query
填写SLS的SPL语句,注意在阿里云Flink的SQL作业开发中,字符串需要使用英文单引号进行转义。
* | where slbid = ''slb-01''
鼠标选中SQL,鼠标右击,单击运行,连接SLS。
3. 连续查询及效果
在作业中输入如下分析语句,按照slbid进行聚合查询。
SELECT * FROM sls_input_complex;
单击右上角调试按钮,在调试弹框,单击选择调试集群下拉框中的创建新的集群,参考下图,创建新的调试集群。
在调试弹框选择创建好的调试集群,然后单击确定。
在结果区域,可以看到TABLE中每一列的值,对应SPL处理后的结果。SPL最终得到的字段列表与TABLE中字段对应。
- 本页导读 (1)
- 背景
- 弱结构化日志数据
- 结构化数据处理需求
- 解决方案
- 在Flink中使用SPL
- 1. SLS准备数据
- 2. 创建SQL作业
- 3. 连续查询及效果