本文介绍如何在Lindorm流引擎中使用Flink SQL进行复杂事件处理(CEP)。
背景信息
机房持续温度过高报警、自动驾驶过程中突然提速或降速、股票价格的波动等场景都存在几个共同点:数据量大、属于流式数据、且其中大部分数据属于无意义的数据。如果使用关系型数据库分析这类数据,需要先将所有数据导入到数据库中,再使用统计分析工具进行分析,虽然可以得到部分有用的数据,但是无法保证实时性。
为了满足上述需求,出现了复杂事件处理(CEP)的概念,CEP认为任何事情的发生可以当做一个“事件”,例如股票价格上涨或下跌,涨跌的发生会产生一个数据,CEP认为该数据就是一个“事件”。CEP可以通过规则,从不同的事件源中获取相关的事件组合,并对发现的事件深入处理。
前提条件
- 已将客户端IP地址添加至Lindorm实例的白名单中。具体操作,请参见设置白名单。
- 已获取Lindorm流引擎的Lindorm Stream SQL地址。如何获取,请参见查看连接地址。
- 已获取Lindorm宽表引擎中HBase兼容的专有网络连接地址。如何获取,请参见查看连接地址。
- 已安装Lindorm-cli。安装方法,请参见安装Lindorm-cli。
注意事项
如果您的应用部署在ECS实例,且想要通过专有网络访问Lindorm实例,则需要确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。
- 所在地域相同,并建议所在可用区相同(以减少网络延时)。
- ECS实例与Lindorm实例属于同一专有网络。
连接Lindorm流引擎使用的网络类型
网络类型 | 说明 |
---|---|
专有网络(推荐) | 专有网络VPC(Virtual Private Cloud)是您自己独有的云上私有网络,不同的专有网络之间二层逻辑隔离,拥有较高的安全性和性能。Lindorm-cli部署在ECS实例上时,通过专有网络连接至Lindorm流引擎,可获得更高的安全性和更低的网络延迟。 |
公网 | 公网即互联网,当本地设备需要测试或管理Lindorm流引擎时,可在本地设备上部署Lindorm-cli,然后通过公网连接至Lindorm流引擎。 说明 通过公网连接不会产生流量费用,但存在一定的安全风险,推荐通过专有网络连接以获取更高的安全性。 |
操作步骤
本文将以监控股票价格的波动,识别股票持续下跌的时间段为例。股票的实时价格数据如下:
stock rowtime price
====== ==================== =======
'STOCK1' '01-Dec-22 10:00:00' 12
'STOCK1' '01-Dec-22 10:00:01' 17
'STOCK1' '01-Dec-22 10:00:02' 19
'STOCK1' '01-Dec-22 10:00:03' 21
'STOCK1' '01-Dec-22 10:00:04' 25
'STOCK1' '01-Dec-22 10:00:05' 18
'STOCK1' '01-Dec-22 10:00:06' 15
'STOCK1' '01-Dec-22 10:00:07' 14
'STOCK1' '01-Dec-22 10:00:08' 24
'STOCK1' '01-Dec-22 10:00:09' 25
'STOCK1' '01-Dec-22 10:00:10' 19
- 使用Lindorm-cli连接Lindorm宽表引擎。
./lindorm-cli -url jdbc:lindorm:table:url=http://ld-bp17j28j2y7pm****-proxy-lindorm-pub.lindorm.rds.aliyuncs.com:30060 -username -password
- 在Lindorm宽表中创建一张结果表。
CREATE TABLE IF NOT EXISTS PRICE_DOWN ( stock_name VARCHAR, start_time TIMESTAMP, bottom_time TIMESTAMP, end_time TIMESTAMP, avg_price DOUBLE, primary key (stock_name, start_time) );
- 通过Kafka API写入原始数据(csv格式)至指定的Topic(名称为stock_price)中,具体操作请参见通过开源Kafka客户端写入Lindorm流引擎数据。
#创建Topic ./kafka-topics.sh --bootstrap-server ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30080 --topic stock_price --create #写入数据 ./kafka-console-producer.sh --bootstrap-server ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30080 --topic stock_price STOCK1,2022-12-01 10:00:00,12 STOCK1,2022-12-01 10:00:01,17 STOCK1,2022-12-01 10:00:02,19 STOCK1,2022-12-01 10:00:03,21 STOCK1,2022-12-01 10:00:04,25 STOCK1,2022-12-01 10:00:05,18 STOCK1,2022-12-01 10:00:06,15 STOCK1,2022-12-01 10:00:07,14 STOCK1,2022-12-01 10:00:08,24 STOCK1,2022-12-01 10:00:09,25 STOCK1,2022-12-01 10:00:10,19
- 使用Lindorm-cli连接Lindorm流引擎。
./lindorm-sqlline -url jdbc:streamsql:url=http://ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30060
- 使用Flink SQL提交流引擎计算任务读取Kafka Topic中的数据,并将其写入Lindorm表中。
CREATE FJOB recognition_stock_price_down( CREATE TABLE stock_price( `stock_name` VARCHAR, `row_time` TIMESTAMP(3), `price` INT, WATERMARK FOR `row_time` AS `row_time` - INTERVAL '1' SECOND )WITH( 'connector'='kafka', 'topic'='stock_price', 'scan.startup.mode'='earliest-offset', 'properties.bootstrap.servers'='ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30080', 'format'='csv'); CREATE TABLE lindorm_price_down( `stock_name` VARCHAR, `start_time` TIMESTAMP(3), `bottom_time` TIMESTAMP(3), `end_time` TIMESTAMP(3), `avg_price` DOUBLE, PRIMARY KEY (`stock_name`, `start_time`) NOT ENFORCED ) WITH ( 'connector'='lindorm', 'seedServer'='ld-bp****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020', 'userName'='root', 'password'='root', 'tableName'='PRICE_DOWN', 'namespace'='default'); INSERT INTO lindorm_price_down SELECT * FROM stock_price MATCH_RECOGNIZE( PARTITION BY `stock_name` ORDER BY `row_time` MEASURES START_ROW.row_time AS start_time, LAST(PRICE_DOWN.row_time) AS bottom_time, LAST(PRICE_UP.row_time) AS end_time, AVG(PRICE_DOWN.price) AS avg_price ONE ROW PER MATCH AFTER MATCH SKIP TO LAST PRICE_UP PATTERN (START_ROW PRICE_DOWN+ PRICE_UP) DEFINE PRICE_DOWN AS (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1), PRICE_UP AS PRICE_UP.price > LAST(PRICE_DOWN.price, 1) ); );
- Flink SQL的语法说明,请参见Flink官方文档。
- Flink的Kafka连接器的配置信息,请参见Flink官网文档。
- 流引擎的宽表连接器的配置信息,请参见配置流引擎的宽表连接器。
- 使用Lindorm-cli连接Lindorm宽表引擎并查询结果表中的处理结果。
SELECT * FROM PRICE_DOWN;
返回结果如下:
+------------+-------------------------------+-------------------------------+-------------------------------+-----------+ | stock_name | start_time | bottom_time | end_time | avg_price | +------------+-------------------------------+-------------------------------+-------------------------------+-----------+ | STOCK1 | 2022-12-01 18:00:04 +0000 UTC | 2022-12-01 18:00:07 +0000 UTC | 2022-12-01 18:00:08 +0000 UTC | 15 | +------------+-------------------------------+-------------------------------+-------------------------------+-----------+