本文介绍如何在Lindorm流引擎中使用Flink SQL进行复杂事件处理(CEP)。

背景信息

机房持续温度过高报警、自动驾驶过程中突然提速或降速、股票价格的波动等场景都存在几个共同点:数据量大、属于流式数据、且其中大部分数据属于无意义的数据。如果使用关系型数据库分析这类数据,需要先将所有数据导入到数据库中,再使用统计分析工具进行分析,虽然可以得到部分有用的数据,但是无法保证实时性。

为了满足上述需求,出现了复杂事件处理(CEP)的概念,CEP认为任何事情的发生可以当做一个“事件”,例如股票价格上涨或下跌,涨跌的发生会产生一个数据,CEP认为该数据就是一个“事件”。CEP可以通过规则,从不同的事件源中获取相关的事件组合,并对发现的事件深入处理。

前提条件

  • 已将客户端IP地址添加至Lindorm实例的白名单中。具体操作,请参见设置白名单
  • 已获取Lindorm流引擎的Lindorm Stream SQL地址。如何获取,请参见查看连接地址
  • 已获取Lindorm宽表引擎中HBase兼容的专有网络连接地址。如何获取,请参见查看连接地址
  • 已安装Lindorm-cli。安装方法,请参见安装Lindorm-cli

注意事项

如果您的应用部署在ECS实例,且想要通过专有网络访问Lindorm实例,则需要确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。
  • 所在地域相同,并建议所在可用区相同(以减少网络延时)。
  • ECS实例与Lindorm实例属于同一专有网络。

连接Lindorm流引擎使用的网络类型

网络类型说明
专有网络(推荐)专有网络VPC(Virtual Private Cloud)是您自己独有的云上私有网络,不同的专有网络之间二层逻辑隔离,拥有较高的安全性和性能。Lindorm-cli部署在ECS实例上时,通过专有网络连接至Lindorm流引擎,可获得更高的安全性和更低的网络延迟。
公网公网即互联网,当本地设备需要测试或管理Lindorm流引擎时,可在本地设备上部署Lindorm-cli,然后通过公网连接至Lindorm流引擎。
说明 通过公网连接不会产生流量费用,但存在一定的安全风险,推荐通过专有网络连接以获取更高的安全性。

操作步骤

本文将以监控股票价格的波动,识别股票持续下跌的时间段为例。股票的实时价格数据如下:

stock         rowtime         price
======  ====================  =======
'STOCK1'  '01-Dec-22 10:00:00'   12
'STOCK1'  '01-Dec-22 10:00:01'   17
'STOCK1'  '01-Dec-22 10:00:02'   19
'STOCK1'  '01-Dec-22 10:00:03'   21
'STOCK1'  '01-Dec-22 10:00:04'   25
'STOCK1'  '01-Dec-22 10:00:05'   18
'STOCK1'  '01-Dec-22 10:00:06'   15
'STOCK1'  '01-Dec-22 10:00:07'   14
'STOCK1'  '01-Dec-22 10:00:08'   24
'STOCK1'  '01-Dec-22 10:00:09'   25
'STOCK1'  '01-Dec-22 10:00:10'   19
  1. 使用Lindorm-cli连接Lindorm宽表引擎。
    ./lindorm-cli -url jdbc:lindorm:table:url=http://ld-bp17j28j2y7pm****-proxy-lindorm-pub.lindorm.rds.aliyuncs.com:30060 -username -password
  2. 在Lindorm宽表中创建一张结果表。
    CREATE TABLE IF NOT EXISTS PRICE_DOWN (
      stock_name VARCHAR,
      start_time TIMESTAMP,
      bottom_time TIMESTAMP,
      end_time TIMESTAMP,
      avg_price DOUBLE,
    primary key (stock_name, start_time) );
  3. 通过Kafka API写入原始数据(csv格式)至指定的Topic(名称为stock_price)中,具体操作请参见通过开源Kafka客户端写入Lindorm流引擎数据
    #创建Topic
    ./kafka-topics.sh --bootstrap-server ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30080 --topic stock_price --create
    
    #写入数据
    ./kafka-console-producer.sh --bootstrap-server ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30080 --topic stock_price
    STOCK1,2022-12-01 10:00:00,12
    STOCK1,2022-12-01 10:00:01,17
    STOCK1,2022-12-01 10:00:02,19
    STOCK1,2022-12-01 10:00:03,21
    STOCK1,2022-12-01 10:00:04,25
    STOCK1,2022-12-01 10:00:05,18
    STOCK1,2022-12-01 10:00:06,15
    STOCK1,2022-12-01 10:00:07,14
    STOCK1,2022-12-01 10:00:08,24
    STOCK1,2022-12-01 10:00:09,25
    STOCK1,2022-12-01 10:00:10,19
  4. 使用Lindorm-cli连接Lindorm流引擎。
    ./lindorm-sqlline -url jdbc:streamsql:url=http://ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30060
  5. 使用Flink SQL提交流引擎计算任务读取Kafka Topic中的数据,并将其写入Lindorm表中。
    CREATE FJOB recognition_stock_price_down(
        CREATE TABLE stock_price(
            `stock_name` VARCHAR,
            `row_time` TIMESTAMP(3),
            `price` INT,
            WATERMARK FOR `row_time` AS `row_time` - INTERVAL '1' SECOND
        )WITH(
            'connector'='kafka',
            'topic'='stock_price',
            'scan.startup.mode'='earliest-offset',
            'properties.bootstrap.servers'='ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30080',
            'format'='csv');
        CREATE TABLE lindorm_price_down(
            `stock_name` VARCHAR,
            `start_time` TIMESTAMP(3),
            `bottom_time` TIMESTAMP(3),
            `end_time` TIMESTAMP(3),
            `avg_price` DOUBLE,
            PRIMARY KEY (`stock_name`, `start_time`) NOT ENFORCED
        ) WITH (
            'connector'='lindorm',
            'seedServer'='ld-bp****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020',
            'userName'='root',
            'password'='root',
            'tableName'='PRICE_DOWN',
            'namespace'='default');
        INSERT INTO lindorm_price_down
        SELECT *
        FROM stock_price
            MATCH_RECOGNIZE(
                PARTITION BY `stock_name`
                ORDER BY `row_time`
                MEASURES
                    START_ROW.row_time AS start_time,
                    LAST(PRICE_DOWN.row_time) AS bottom_time,
                    LAST(PRICE_UP.row_time) AS end_time,
                    AVG(PRICE_DOWN.price) AS avg_price
                    ONE ROW PER MATCH
                    AFTER MATCH SKIP TO LAST PRICE_UP
                    PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
                    DEFINE
                        PRICE_DOWN AS
                        (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
                        PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
                        PRICE_UP AS
                        PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
            );
    );
  6. 使用Lindorm-cli连接Lindorm宽表引擎并查询结果表中的处理结果。
    SELECT * FROM PRICE_DOWN;

    返回结果如下:

    +------------+-------------------------------+-------------------------------+-------------------------------+-----------+
    | stock_name |          start_time           |          bottom_time          |           end_time            | avg_price |
    +------------+-------------------------------+-------------------------------+-------------------------------+-----------+
    | STOCK1     | 2022-12-01 18:00:04 +0000 UTC | 2022-12-01 18:00:07 +0000 UTC | 2022-12-01 18:00:08 +0000 UTC | 15        |
    +------------+-------------------------------+-------------------------------+-------------------------------+-----------+