文档

实时计算(Flink)消费

更新时间:

阿里云实时计算(Flink)通过创建日志服务源表的方式,可以直接消费日志服务的数据。本文介绍了如何为实时计算创建日志服务源表以及创建过程涉及到的属性字段提取。

背景信息

Flink消费日志支持的信息如下:

类别

详情

支持类型

源表和结果表。

运行模式

仅支持流模式。

特有监控指标

暂不适用。

数据格式

暂无。

API种类

SQL。

是否支持更新或删除结果表数据

不支持更新和删除结果表数据,只支持插入数据。

使用Flink消费日志的入口,请参见Flink SQL作业快速入门

前提条件

使用限制

  • 仅实时计算引擎VVR 2.0.0及以上版本支持日志服务SLS连接器。

  • SLS连接器仅保证At-Least-Once语义。

  • 仅实时计算引擎VVR 4.0.13及以上版本支持Shard数目变化触发自动Failover功能。

  • 强烈建议不要设置Source并发度大于Shard个数,不仅会造成资源浪费,且在8.0.5及更低版本中,如果后续Shard数目发生变化,自动Failover功能可能会失效,造成部分Shard不被消费。

创建日志服务源表和结果表

重要

使用Flink消费日志服务数据,必须有一个完整的SQL作业。完整的SQL作业包含:源表、结果表,在经过业务逻辑处理后将源表数据插入到结果表(INSERT INTO语句)。

FlinkSQL作业开发请参见SQL作业开发

日志服务是实时数据存储,实时计算能将其作为流式数据输入。假设有如下日志内容:

__source__:  11.85.*.199
__tag__:__receive_time__:  1562125591
__topic__:  test-topic
request_method:  GET
status:  200

代码示例

Flink消费日志服务数据的SQL开发作业代码如下:

重要

SQL中的表名、列名和保留字冲突时,需要使用反引号'`'括起来。

CREATE TEMPORARY TABLE sls_input(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' ='sls-test',
  'logstore' ='sls-output'
);

INSERT INTO sls_sink
SELECT 
  request_method,
  status,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; 

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值sls。

    endPoint

    EndPoint地址。

    String

    请填写SLS的私网服务地址,详情请参见私网服务入口

    说明

    实时计算Flink版默认不具备访问公网的能力,但阿里云提供的NAT网关可以实现VPC网络与公网网络互通,详情请参见实时计算Flink版如何访问公网?

    project

    SLS项目名称。

    String

    无。

    logStore

    SLS LogStore或metricstore名称。

    String

    logStore和metricstore是相同的消费方式。

    accessId

    阿里云账号的AccessKey ID。

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量和密钥管理

    accessKey

    阿里云账号的AccessKey Secret。

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量和密钥管理

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    startupMode

    源表启动模式。

    String

    timestamp

    参数取值如下:

    • timestamp(默认):从指定的起始时间开始消费日志。

    • latest:从最新位点开始消费日志。

    • earliest:从最早位点开始消费日志。

    说明

    若将配置项consumeFromCheckpoint设为true,则会从指定的消费组中保存的Checkpoint开始消费日志,此处的启动模式将不会生效。

    startTime

    消费日志的开始时间。

    String

    当前时间

    格式为yyyy-MM-dd hh:mm:ss。

    仅当startupMode设为timestamp时生效。

    说明

    startTime和stopTime基于SLS中的__receive_time__属性,而非__timestamp__属性。

    stopTime

    消费日志的结束时间。

    String

    格式为yyyy-MM-dd hh:mm:ss。

    说明

    如期望日志消费到结尾时退出Flink程序,需要同时设置exitAfterFinish=true.

    consumerGroup

    消费组名称。

    String

    消费组用于记录消费进度。您可以自定义消费组名,无固定格式。

    说明

    不支持通过相同的消费组进行多作业的协同消费。不同的Flink作业应该设置不同的消费组。如果不同的Flink作业使用相同的消费组,它们将会消费全部数据。这是因为在Flink消费SLS的数据时,并不会经过SLS消费组进行分区分配,因此导致各个消费者独立消费各自的消息,即使消费组是相同的。

    consumeFromCheckpoint

    是否从指定的消费组中保存的Checkpoint开始消费日志。

    String

    false

    参数取值如下:

    • true:必须同时指定消费组,Flink程序会从消费组中保存的Checkpoint开始消费日志,如果该消费组没有对应的Checkpoint,则从startTime配置值开始消费。

    • false(默认值):不从指定的消费组中保存的Checkpoint开始消费日志。

    说明

    仅实时计算引擎VVR 6.0.5及以上版本支持该参数。

    directMode

    是否使用SLS的直连模式。

    String

    false

    参数取值如下:

    • true:使用该功能。

    • false(默认):禁用该功能。

    maxRetries

    读取SLS失败后重试次数。

    String

    3

    无。

    batchGetSize

    单次请求读取logGroup的个数。

    String

    100

    batchGetSize设置不能超过1000,否则会报错。

    exitAfterFinish

    在数据消费完成后,Flink程序是否退出。

    String

    false

    参数取值如下:

    • true:数据消费完后,Flink程序退出。

    • false(默认):数据消费完后,Flink程序不退出。

    说明

    仅实时计算引擎VVR 4.0.13及以上版本支持该参数。

    query

    SLS消费预处理语句。

    String

    通过使用query参数,您可以在消费SLS数据之前对其进行过滤,以避免将所有数据都消费到Flink中,从而实现节约成本和提高处理速度的目的。

    例如 'query' = '*| where request_method = ''GET'''表示在Flink读取SLS数据前,先匹配出request_method字段值等于get的数据。

    说明

    query需使用日志服务SPL语言,请参见SPL概述

    重要
    • 仅实时计算引擎VVR 8.0.1及以上版本支持该参数。

    • 日志服务SLS支持该功能的地域请参见基于规则消费日志

    • 公测阶段免费,后续可能会在产生日志服务SLS费用,详情请参见费用说明

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    topicField

    指定字段名,该字段的值会覆盖__topic__属性字段的值,表示日志的主题。

    String

    该参数值是表中已存在的字段之一。

    timeField

    指定字段名,该字段的值会覆盖__timestamp__属性字段的值,表示日志写入时间。

    String

    当前时间

    该参数值是表中已存在的字段之一,且字段类型必须为INT。如果未指定,则默认填充当前时间。

    sourceField

    指定字段名,该字段的值会覆盖__source__属性字段的值,表示日志的来源地,例如产生该日志机器的IP地址。

    String

    该参数值是表中已存在的字段之一。

    partitionField

    指定字段名,数据写入时会根据该列值计算Hash值,Hash值相同的数据会写入同一个shard。

    String

    如果未指定,则每条数据会随机写入当前可用的Shard中。

    buckets

    当指定partitionField时,根据Hash值重新分组的个数。

    String

    64

    该参数的取值范围是[1, 256],且必须是2的整数次幂。同时,buckets个数应当大于等于Shard个数,否则会出现部分Shard没有数据写入的情况。

    说明

    仅实时计算引擎VVR 6.0.5及以上版本支持该参数。

    flushIntervalMs

    触发数据写入的时间周期。

    String

    2000

    单位为毫秒。

    writeNullProperties

    是否将null值作为空字符串写入SLS。

    Boolean

    true

    参数取值如下:

    • true(默认值):将null值作为空字符串写入日志。

    • false:计算结果为null的字段不会写入到日志中。

    说明

    仅实时计算引擎VVR 8.0.6及以上版本支持该参数。

属性字段提取

除日志字段外,支持提取如下三个属性字段,也支持提取其它自定义字段。

字段名

字段类型

字段说明

__source__

STRING METADATA VIRTUAL

消息源。

__topic__

STRING METADATA VIRTUAL

消息主题。

__timestamp__

BIGINT METADATA VIRTUAL

日志时间。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

消息TAG。

对于属性"__tag__:__receive_time__":"1616742274"'__receive_time__'和'1616742274'会被作为KV对,记录在Map中,在SQL中通过__tag__['__receive_time__']的方式访问。

属性字段的提取需要添加HEADER声明,示例如下:

create table sls_stream(
  __timestamp__ bigint HEADER,
  __receive_time__ bigint HEADER
  b int,
  c varchar
) with (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

相关文档

使用Flink DataStream API消费数据,请参见DataStream API