阿里云实时计算(Flink)通过创建日志服务源表的方式,可以直接消费日志服务的数据。本文介绍了如何为实时计算创建日志服务源表以及创建过程涉及到的属性字段提取。
背景信息
Flink消费日志支持的信息如下:
|
类别 |
详情 |
|
支持类型 |
源表和结果表。 |
|
运行模式 |
仅支持流模式。 |
|
特有监控指标 |
暂不适用。 |
|
数据格式 |
暂无。 |
|
API种类 |
SQL。 |
|
是否支持更新或删除结果表数据 |
不支持更新和删除结果表数据,只支持插入数据。 |
使用Flink消费日志的入口,请参见Flink SQL作业快速入门。
前提条件
-
如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Flink控制台相关权限,详情请参见权限管理。
-
已创建Flink工作空间,详情请参见开通实时计算Flink版。
-
已创建Project和LogStore。更多信息,请参见创建Project和LogStore。
使用限制
仅实时计算引擎VVR 11.1及以上版本支持日志服务SLS作为数据摄入YAML的同步数据源。
SLS连接器仅保证At-Least-Once语义。
强烈建议不要设置Source并发度大于Shard个数,不仅会造成资源浪费,且在8.0.5及更低版本中,如果后续Shard数目发生变化,自动Failover功能可能会失效,造成部分Shard不被消费。
创建日志服务源表和结果表
使用Flink消费日志服务数据,必须有一个完整的SQL作业。完整的SQL作业包含:源表、结果表,在经过业务逻辑处理后将源表数据插入到结果表(INSERT INTO语句)。
FlinkSQL作业开发请参见作业开发地图。
日志服务是实时数据存储,实时计算能将其作为流式数据输入。假设有如下日志内容:
__source__: 11.85.*.199
__tag__:__receive_time__: 1562125591
__topic__: test-topic
request_method: GET
status: 200
代码示例
Flink消费日志服务数据的SQL开发作业代码如下:
SQL中的表名、列名和保留字冲突时,需要使用反引号'`'括起来。
CREATE TEMPORARY TABLE sls_input(
request_method STRING,
status BIGINT,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` BIGINT METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
request_method STRING,
status BIGINT,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT ,
receive_time BIGINT
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'project' ='sls-test',
'logstore' ='sls-output'
);
INSERT INTO sls_sink
SELECT
request_method,
status,
`__topic__` ,
`__source__` ,
`__timestamp__` ,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值sls。
endPoint
EndPoint地址。
String
是
无
请填写SLS的私网服务地址,详情请参见服务接入点。
project
SLS项目名称。
String
是
无
无。
logStore
SLS LogStore或metricstore名称。
String
是
无
logStore和metricstore是相同的消费方式。
accessId
阿里云账号的AccessKey ID。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?。
重要为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。
accessKey
阿里云账号的AccessKey Secret。
String
是
无
源表独有
参数
说明
数据类型
是否必填
默认值
备注
enableNewSource
是否启用实现了FLIP-27接口的新数据源。
Boolean
否
false
新数据源可以自动适应Shard变化,同时尽可能保证Shard在所有的source并发上分布均匀。
重要仅实时计算引擎VVR 8.0.9及以上版本支持该参数。从实时计算引擎VVR 11.1版本开始该参数默认为true。
作业在该配置项发生变化后无法从状态恢复。可通过先设置配置项consumerGroup启动作业,将消费进度记录到SLS消费组中,再将配置项consumeFromCheckpoint设为true后无状态启动作业,从而实现从历史进度继续消费。
如果SLS中存在只读Shard,Flink的某些并发任务在完成对只读Shard的消费后会继续请求读取其他未完成的Shard。这可能导致部分并发任务被分配到多个Shard,从而造成不同并发任务之间的Shard分配不均衡。这种不均衡会影响整体的消费效率和系统性能。为缓解这一问题,您可以通过调整并发度、优化任务调度策略、合并小Shard等方法,以减少Shard数量和任务分配复杂度。
shardDiscoveryIntervalMs
动态检测shard变化时间间隔,单位为毫秒。
Long
否
60000
设置为负值时可以关闭动态检测。
说明该参数值不能少于1分钟(60000毫秒)。
仅当配置项enableNewSource为true时生效。
仅实时计算引擎VVR 8.0.9及以上版本支持该参数。
startupMode
源表启动模式。
String
否
timestamp
timestamp(默认):从指定的起始时间开始消费日志。latest:从最新位点开始消费日志。earliest:从最早位点开始消费日志。consumer_group:从消费组记录位点开始消费日志。若消费组未记录某shard消费位点,则会从最早位点开始消费日志。
重要实时计算引擎VVR 11.1以下版本,不支持取值为consumer_group,需要将
consumeFromCheckpoint设为true,此时会从指定消费组记录的位点开始消费日志,此处的启动模式将不会生效。
startTime
消费日志的开始时间。
String
否
当前时间
格式为
yyyy-MM-dd hh:mm:ss。仅当
startupMode设为timestamp时生效。说明startTime和stopTime基于SLS中的__receive_time__属性,而非__timestamp__属性。
stopTime
消费日志的结束时间。
String
否
无
格式为
yyyy-MM-dd hh:mm:ss。说明仅用于消费历史日志,应设置为过去时间点。若配置为未来时间,可能因暂无新日志写入而导致消费提前终止,表现为数据流中断且无异常提示。
如期望日志消费到结尾时退出Flink程序,需要同时设置exitAfterFinish=true.
consumerGroup
消费组名称。
String
否
无
消费组用于记录消费进度。您可以自定义消费组名,无固定格式。
说明不支持通过相同的消费组进行多作业的协同消费。不同的Flink作业应该设置不同的消费组。如果不同的Flink作业使用相同的消费组,它们将会消费全部数据。这是因为在Flink消费SLS的数据时,并不会经过SLS消费组进行分区分配,因此导致各个消费者独立消费各自的消息,即使消费组是相同的。
consumeFromCheckpoint
是否从指定的消费组中保存的Checkpoint开始消费日志。
String
否
false
true:必须同时指定消费组,Flink程序会从消费组中保存的Checkpoint开始消费日志,如果该消费组没有对应的Checkpoint,则从startTime配置值开始消费。false(默认值):不从指定的消费组中保存的Checkpoint开始消费日志。
重要实时计算引擎VVR 11.1版本开始不再支持配置该参数。对于VVR 11.1及更高版本,需要将
startupMode配置为consumer_group。maxRetries
读取SLS失败后重试次数。
String
否
3
无。
batchGetSize
单次请求读取logGroup的个数。
String
否
100
batchGetSize设置不能超过1000,否则会报错。exitAfterFinish
在数据消费完成后,Flink程序是否退出。
String
否
false
true:数据消费完后,Flink程序退出。false(默认):数据消费完后,Flink程序不退出。
query
重要自VVR 11.3起废弃,后续版本仍兼容。
SLS消费预处理语句。
String
否
无
通过使用query参数,您可以在消费SLS数据之前对其进行过滤,以避免将所有数据都消费到Flink中,从而实现节约成本和提高处理速度的目的。
例如
'query' = '*| where request_method = ''GET'''表示在Flink读取SLS数据前,先匹配出request_method字段值等于get的数据。说明query需使用日志服务SPL语言,请参见SPL语法。
重要仅实时计算引擎VVR 8.0.1及以上版本支持该参数。
该功能会产生日志服务SLS费用,详情请参见费用说明。
processor
SLS 消费处理器,与query字段同时存在时,query生效,processor不生效。
String
否
无
通过使用processor参数,您可以在消费SLS数据之前对其进行过滤,以避免将所有数据都消费到Flink中,从而实现节约成本和提高处理速度的目的。推荐使用processor参数而不是query参数。
例如
'processor' = 'test-filter-processor'表示在Flink读取SLS数据前,先经过SLS消费处理器的过滤。重要仅实时计算引擎VVR 11.3及以上版本支持该参数。
该功能会产生日志服务SLS费用,详情请参见费用说明。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
topicField
指定字段名,该字段的值会覆盖__topic__属性字段的值,表示日志的主题。
String
否
无
该参数值是表中已存在的字段之一。
timeField
指定字段名,该字段的值会覆盖__timestamp__属性字段的值,表示日志写入时间。
String
否
当前时间
该参数值是表中已存在的字段之一,且字段类型必须为INT。如果未指定,则默认填充当前时间。
sourceField
指定字段名,该字段的值会覆盖__source__属性字段的值,表示日志的来源地,例如产生该日志机器的IP地址。
String
否
无
该参数值是表中已存在的字段之一。
partitionField
指定字段名,数据写入时会根据该列值计算Hash值,Hash值相同的数据会写入同一个shard。
String
否
无
如果未指定,则每条数据会随机写入当前可用的Shard中。
buckets
当指定partitionField时,根据Hash值重新分组的个数。
String
否
64
该参数的取值范围是[1, 256],且必须是2的整数次幂。同时,buckets个数应当大于等于Shard个数,否则会出现部分Shard没有数据写入的情况。
flushIntervalMs
触发数据写入的时间周期。
String
否
2000
单位为毫秒。
writeNullProperties
是否将null值作为空字符串写入SLS。
Boolean
否
true
true(默认值):将null值作为空字符串写入日志。false:计算结果为null的字段不会写入到日志中。
说明仅实时计算引擎VVR 8.0.6及以上版本支持该参数。
属性字段提取
除日志字段外,支持提取如下四个属性字段,也支持提取其它自定义字段。
|
字段名 |
字段类型 |
字段说明 |
|
__source__ |
STRING METADATA VIRTUAL |
消息源。 |
|
__topic__ |
STRING METADATA VIRTUAL |
消息主题。 |
|
__timestamp__ |
BIGINT METADATA VIRTUAL |
日志时间。 |
|
__tag__ |
MAP<VARCHAR, VARCHAR> METADATA VIRTUAL |
消息TAG。 对于属性 |
属性字段的提取需要添加HEADER声明,示例如下:
create table sls_stream(
__timestamp__ bigint HEADER,
__receive_time__ bigint HEADER
b int,
c varchar
) with (
'connector' = 'sls',
'endpoint' ='cn-hangzhou.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
相关文档
使用Flink DataStream API消费数据,请参见DataStream API。