Flink SQL基于SPL实现行过滤与列裁剪

更新时间:2025-02-24 02:21:59

本文介绍Flink SQLSPL完成行过滤与列裁剪的操作步骤。

背景

在阿里云Flink配置SLS作为源表时,默认会消费SLSLogstore数据进行动态表的构建,在消费的过程中,可以指定起始时间点,其消费的数据是指定时间点以后的全量数据。这样做有两个问题:

  1. Connector 从源头拉取了过多不必要的数据行或者数据列,造成了网络的开销。

  2. 这些不必要的数据需要在Flink中进行过滤投影计算,这些清洗工作并不是数据分析的关注重点,造成了计算浪费。

为此,SLS SPLFlink SLS Connector提供了过滤下推和投影下推的能力。通过配置SLS Connectorquery语句或参数,可以实现过滤条件和投影字段的下推,避免全量数据传输和计算,提升效率。

方案原理

  • 未配置SPL语句时:Flink会拉取SLS的全量日志数据(包含所有列、所有行)进行计算,如图所示。

    image
  • 配置SPL语句时:当SPL语句包含行过滤或列裁剪操作时,Flink拉取的数据是经过这些操作处理后的部分数据,用于后续计算,如图所示。

    image

准备工作

  • 开通日志服务,已创建ProjectLogstore

  • 本文Logstore数据使用SLSSLB七层日志模拟接入方式产生模拟数据,其中包含10多个字段。模拟接入会持续产生随机的日志数据,日志内容示例如下:

    {
      "__source__": "127.0.0.1",
      "__tag__:__receive_time__": "1706531737",
      "__time__": "1706531727",
      "__topic__": "slb_layer7",
      "body_bytes_sent": "3577",
      "client_ip": "114.137.XXX.XXX",
      "host": "www.pi.mock.com",
      "http_host": "www.cwj.mock.com",
      "http_user_agent": "Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0",
      "request_length": "1662",
      "request_method": "GET",
      "request_time": "31",
      "request_uri": "/request/path-0/file-3",
      "scheme": "https",
      "slbid": "slb-02",
      "status": "200",
      "upstream_addr": "42.63.XXX.XXX",
      "upstream_response_time": "32",
      "upstream_status": "200",
      "vip_addr": "223.18.XX.XXX"
    }
  • Logstoreslbid字段有三种值,对15分钟的日志数据进行slbid统计,可以发现slb-01slb-02数量相当。

    image

操作步骤

行过滤:SLS SPLFlink SLS Connector提供了一种支持过滤下推的能力,通过配置SLS Connectorquery语句中的过滤条件,即可实现过滤条件下推。避免全量数据传输和全量数据过滤计算。

列过滤:SLS SPLFlink SLS Connector提供了一种支持投影下推的能力,通过配置SLS Connectorquery参数,即可实现投影字段下推。避免全量数据传输和全量数据过滤计算。

行过滤场景
列裁剪场景

步骤一:创建SQL作业

  1. 登录实时计算控制台,单击目标工作空间。

  2. 在左侧导航栏,选择数据开发 > ETL

  3. 单击新建,在新建作业草稿对话框,选择SQL基础模板 > 空白的流作业草稿,单击下一步

    image

  4. 拷贝如下创建临时表的SQLSQL编辑区域。

    CREATE TEMPORARY TABLE sls_input(
      request_uri STRING,
      scheme STRING,
      slbid STRING,
      status STRING,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = 'LTAI5tK*******',
      'accessKey' = '3k5PJm*******',
      'starttime' = '2025-02-19 00:00:00',
      'project' ='test-project',
      'logstore' ='clb-access-log',
      'query' = '* | where slbid = ''slb-01'''
    );

    SQL中的参数说明如下:

    参数名

    参数含义

    示例值

    connector

    连接器。更多信息,请参见支持的连接器

    sls

    endpoint

    日志服务的私网域名,获取方式请参见服务接入点

    cn-hangzhou-intranet.log.aliyuncs.com

    accessId

    用户身份识别ID,获取方式,请参见创建AccessKey

    LTAI5tK*******

    accessKey

    用于验证您拥有该AccessKey ID的密码。获取方式,请参见创建AccessKey

    3k5PJm*******

    starttime

    指定查询日志的起始时间点。

    2025-02-19 00:00:00

    project

    日志服务的Project名。

    test-project

    logstore

    日志服务的Logstore名。

    clb-access-log

    query

    填写SLSSPL语句,注意在阿里云FlinkSQL作业开发中,字符串需要使用英文单引号进行转义。

    * | where slbid = ''slb-01''

  5. 鼠标选中SQL,鼠标右击,单击运行,连接SLS。

    image

步骤二:连续查询及效果

  1. 在作业中输入如下分析语句,按照slbid进行聚合查询。

    SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid;
  2. 单击右上角调试按钮,在调试弹框,单击选择调试集群下拉框中的创建新的集群,参考下图,创建新的调试集群。

    image

  3. 在调试弹框选择创建好的调试集群,然后单击确定

    image

  4. 在结果区域,可以看到结果中slbid的字段值,始终是slb-01。可以看出设置了SPL语句后,sls_input仅包含slbid='slb-01'的数据,其他不符合条件的数据被过滤掉了。

    image

步骤一:创建SQL作业

  1. 登录实时计算控制台,单击目标工作空间。

  2. 在左侧导航栏,选择数据开发 > ETL

  3. 单击新建,在新建作业草稿对话框,选择SQL基础模板 > 空白的流作业草稿,单击下一步

    image

  4. 拷贝如下创建临时表的SQLSQL编辑区域。与行过滤场景不同的是,这里query参数配置进行了修改,在过滤的基础上增加了投影语句,使用|符号(类似Unix管道)将不同指令进行分割,上一条指令的输出作为下一条指令的输入,最后指令的输出表示整个管道的输出。实现从SLS服务端仅拉取特定字段的内容。

    CREATE TEMPORARY TABLE sls_input_project(
      request_uri STRING,
      scheme STRING,
      slbid STRING,
      status STRING,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = 'LTAI5tK*******',
      'accessKey' = '3k5PJm*******',
      'starttime' = '2025-02-19 00:00:00',
      'project' ='test-project',
      'logstore' ='clb-access-log',
      'query' = '* | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"'
    );

    SQL中的参数说明如下:

    参数名

    参数含义

    示例值

    connector

    连接器。更多信息,请参见支持的连接器

    sls

    endpoint

    日志服务的私网域名,获取方式请参见服务接入点

    cn-hangzhou-intranet.log.aliyuncs.com

    accessId

    用户身份识别ID,获取方式,请参见创建AccessKey

    LTAI5tK*******

    accessKey

    用于验证您拥有该AccessKey ID的密码。获取方式,请参见创建AccessKey

    3k5PJm*******

    starttime

    指定查询日志的起始时间点。

    2025-02-19 00:00:00

    project

    日志服务的Project名。

    test-project

    logstore

    日志服务的Logstore名。

    clb-access-log

    query

    填写SLSSPL语句,注意在阿里云FlinkSQL作业开发中,字符串需要使用英文单引号进行转义。

    * | where slbid = ''slb-01''

  5. 鼠标选中SQL,鼠标右击,单击运行,连接SLS。

    image

步骤二:连续查询及效果

  1. 在作业中输入如下分析语句,按照slbid进行聚合查询。

    SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid;
  2. 单击右上角调试按钮,在调试弹框,单击选择调试集群下拉框中的创建新的集群,参考下图,创建新的调试集群。

    image

  3. 在调试弹框选择创建好的调试集群,然后单击确定

    image

  4. 在结果区域,可以看到结果与行过滤场景结果类似。

    说明

    注意:这里与行过滤不同的是,行过滤场景会返回全量的字段,而当前的语句令SLS Connector只返回特定的字段,再次减少了数据的网络传输。

    image

  • 本页导读 (1)
  • 背景​
  • 方案原理​
  • 准备工作
  • 操作步骤