阿里云实时计算(Flink)通过创建日志服务源表的方式,可以直接消费日志服务的数据。本文介绍了如何为实时计算创建日志服务源表以及创建过程涉及到的属性字段提取。
背景信息
Flink消费日志支持的信息如下:
| 类别 | 详情 | 
| 支持类型 | 源表和结果表。 | 
| 运行模式 | 仅支持流模式。 | 
| 特有监控指标 | 暂不适用。 | 
| 数据格式 | 暂无。 | 
| API种类 | SQL。 | 
| 是否支持更新或删除结果表数据 | 不支持更新和删除结果表数据,只支持插入数据。 | 
使用Flink消费日志的入口,请参见Flink SQL作业快速入门。
前提条件
- 如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Flink控制台相关权限,详情请参见权限管理。 
- 已创建Flink工作空间,详情请参见开通实时计算Flink版。 
- 已创建Project和Logstore。更多信息,请参见创建Project和Logstore。 
使用限制
- 仅实时计算引擎VVR 2.0.0及以上版本支持日志服务SLS连接器。 
- SLS连接器仅保证At-Least-Once语义。 
- 仅实时计算引擎VVR 4.0.13及以上版本支持Shard数目变化触发自动Failover功能。 
- 强烈建议不要设置Source并发度大于Shard个数,不仅会造成资源浪费,且在8.0.5及更低版本中,如果后续Shard数目发生变化,自动Failover功能可能会失效,造成部分Shard不被消费。 
创建日志服务源表和结果表
使用Flink消费日志服务数据,必须有一个完整的SQL作业。完整的SQL作业包含:源表、结果表,在经过业务逻辑处理后将源表数据插入到结果表(INSERT INTO语句)。
FlinkSQL作业开发请参见SQL作业开发。
日志服务是实时数据存储,实时计算能将其作为流式数据输入。假设有如下日志内容:
__source__:  11.85.*.199
__tag__:__receive_time__:  1562125591
__topic__:  test-topic
request_method:  GET
status:  200代码示例
Flink消费日志服务数据的SQL开发作业代码如下:
SQL中的表名、列名和保留字冲突时,需要使用反引号'`'括起来。
CREATE TEMPORARY TABLE sls_input(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' ='sls-test',
  'logstore' ='sls-output'
);
INSERT INTO sls_sink
SELECT 
  request_method,
  status,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; WITH参数
- 通用 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - connector - 表类型。 - String - 是 - 无 - 固定值sls。 - endPoint - EndPoint地址。 - String - 是 - 无 - 请填写SLS的私网服务地址,详情请参见服务接入点。 说明- 实时计算Flink版默认不具备访问公网的能力,但阿里云提供的NAT网关可以实现VPC网络与公网网络互通,详情请参见实时计算Flink版如何访问公网?。 
- 不建议跨公网访问SLS。如确有需要,请使用HTTPS网络传输协议并且开启SLS全球加速服务,详情请参见管理传输加速。 
 - project - SLS项目名称。 - String - 是 - 无 - 无。 - logStore - SLS LogStore或metricstore名称。 - String - 是 - 无 - logStore和metricstore是相同的消费方式。 - accessId - 阿里云账号的AccessKey ID。 - String - 是 - 无 - 详情请参见如何查看AccessKey ID和AccessKey Secret信息? 重要- 为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量管理。 - accessKey - 阿里云账号的AccessKey Secret。 - String - 是 - 无 - 详情请参见如何查看AccessKey ID和AccessKey Secret信息? 重要- 为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量管理。 
- 源表独有 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - enableNewSource - 是否启用实现了FLIP-27接口的新数据源。 - Boolean - 否 - false - 新数据源可以自动适应shard变化,同时尽可能保证shard在所有的source并发上分布均匀。 说明- 仅实时计算引擎VVR 8.0.9及以上版本支持该参数。 重要- 作业在该配置项发生变化后无法从状态恢复。 - 可通过先设置配置项consumerGroup启动作业,将消费进度记录到SLS消费组中,再将配置项consumeFromCheckpoint设为true后无状态启动作业,从而实现从历史进度继续消费。 - shardDiscoveryIntervalMs - 动态检测shard变化时间间隔,单位为毫秒。 - Long - 否 - 60000 - 设置为负值时可以关闭动态检测。 说明- 仅当配置项enableNewSource为true时生效。 
- 仅实时计算引擎VVR 8.0.9及以上版本支持该参数。 
 - startupMode - 源表启动模式。 - String - 否 - timestamp - 参数取值如下: - timestamp(默认):从指定的起始时间开始消费日志。 
- latest:从最新位点开始消费日志。 
- earliest:从最早位点开始消费日志。 
 说明- 若将配置项consumeFromCheckpoint设为true,则会从指定的消费组中保存的Checkpoint开始消费日志,此处的启动模式将不会生效。 - startTime - 消费日志的开始时间。 - String - 否 - 当前时间 - 格式为yyyy-MM-dd hh:mm:ss。 - 仅当startupMode设为timestamp时生效。 说明- startTime和stopTime基于SLS中的__receive_time__属性,而非__timestamp__属性。 - stopTime - 消费日志的结束时间。 - String - 否 - 无 - 格式为yyyy-MM-dd hh:mm:ss。 说明- 如期望日志消费到结尾时退出Flink程序,需要同时设置exitAfterFinish=true. - consumerGroup - 消费组名称。 - String - 否 - 无 - 消费组用于记录消费进度。您可以自定义消费组名,无固定格式。 说明- 不支持通过相同的消费组进行多作业的协同消费。不同的Flink作业应该设置不同的消费组。如果不同的Flink作业使用相同的消费组,它们将会消费全部数据。这是因为在Flink消费SLS的数据时,并不会经过SLS消费组进行分区分配,因此导致各个消费者独立消费各自的消息,即使消费组是相同的。 - consumeFromCheckpoint - 是否从指定的消费组中保存的Checkpoint开始消费日志。 - String - 否 - false - 参数取值如下: - true:必须同时指定消费组,Flink程序会从消费组中保存的Checkpoint开始消费日志,如果该消费组没有对应的Checkpoint,则从startTime配置值开始消费。 
- false(默认值):不从指定的消费组中保存的Checkpoint开始消费日志。 
 说明- 仅实时计算引擎VVR 6.0.5及以上版本支持该参数。 - maxRetries - 读取SLS失败后重试次数。 - String - 否 - 3 - 无。 - batchGetSize - 单次请求读取logGroup的个数。 - String - 否 - 100 - batchGetSize设置不能超过1000,否则会报错。 - exitAfterFinish - 在数据消费完成后,Flink程序是否退出。 - String - 否 - false - 参数取值如下: - true:数据消费完后,Flink程序退出。 
- false(默认):数据消费完后,Flink程序不退出。 
 说明- 仅实时计算引擎VVR 4.0.13及以上版本支持该参数。 - query - SLS消费预处理语句。 - String - 否 - 无 - 通过使用query参数,您可以在消费SLS数据之前对其进行过滤,以避免将所有数据都消费到Flink中,从而实现节约成本和提高处理速度的目的。 - 例如 - 'query' = '*| where request_method = ''GET'''表示在Flink读取SLS数据前,先匹配出request_method字段值等于get的数据。说明- query需使用日志服务SPL语言,请参见SPL语法。 
- 结果表独有 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - topicField - 指定字段名,该字段的值会覆盖__topic__属性字段的值,表示日志的主题。 - String - 否 - 无 - 该参数值是表中已存在的字段之一。 - timeField - 指定字段名,该字段的值会覆盖__timestamp__属性字段的值,表示日志写入时间。 - String - 否 - 当前时间 - 该参数值是表中已存在的字段之一,且字段类型必须为INT。如果未指定,则默认填充当前时间。 - sourceField - 指定字段名,该字段的值会覆盖__source__属性字段的值,表示日志的来源地,例如产生该日志机器的IP地址。 - String - 否 - 无 - 该参数值是表中已存在的字段之一。 - partitionField - 指定字段名,数据写入时会根据该列值计算Hash值,Hash值相同的数据会写入同一个shard。 - String - 否 - 无 - 如果未指定,则每条数据会随机写入当前可用的Shard中。 - buckets - 当指定partitionField时,根据Hash值重新分组的个数。 - String - 否 - 64 - 该参数的取值范围是[1, 256],且必须是2的整数次幂。同时,buckets个数应当大于等于Shard个数,否则会出现部分Shard没有数据写入的情况。 说明- 仅实时计算引擎VVR 6.0.5及以上版本支持该参数。 - flushIntervalMs - 触发数据写入的时间周期。 - String - 否 - 2000 - 单位为毫秒。 - writeNullProperties - 是否将null值作为空字符串写入SLS。 - Boolean - 否 - true - 参数取值如下: - true(默认值):将null值作为空字符串写入日志。 
- false:计算结果为null的字段不会写入到日志中。 
 说明- 仅实时计算引擎VVR 8.0.6及以上版本支持该参数。 
属性字段提取
除日志字段外,支持提取如下四个属性字段,也支持提取其它自定义字段。
| 字段名 | 字段类型 | 字段说明 | 
| __source__ | STRING METADATA VIRTUAL | 消息源。 | 
| __topic__ | STRING METADATA VIRTUAL | 消息主题。 | 
| __timestamp__ | BIGINT METADATA VIRTUAL | 日志时间。 | 
| __tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | 消息TAG。 对于属性 | 
属性字段的提取需要添加HEADER声明,示例如下:
create table sls_stream(
  __timestamp__ bigint HEADER,
  __receive_time__ bigint HEADER
  b int,
  c varchar
) with (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);相关文档
使用Flink DataStream API消费数据,请参见DataStream API。