FlinkCEP(Complex Event Processing)用于动态处理复杂事件流,能够实时检测特定事件模式并触发预警。在电商营销中,FlinkCEP可用于实时监控用户行为、交易数据等,识别异常或关键事件,及时发出预警。
背景信息
随着电商行业的快速发展,用户行为数据和交易数据的规模呈指数级增长。传统的批处理方式已难以满足对异常行为、系统风险和用户流失的及时识别与响应。相比之下,利用动态复杂事件处理(CEP)引擎对多阶段用户行为进行建模分析,能够自动识别复杂的事件模式,并在风险发生的初期触发预警,这是动态CEP在实时业务中的核心优势所在。其具备以下三大关键特点:
实时性强:实现毫秒级响应,支持“事中预警”,而非事后分析,助力快速决策。
规则灵活可配置:支持动态更新规则策略,无需重启服务即可快速适应业务变化。
复杂事件识别能力强:支持多事件序列、时间窗口、条件组合等高级逻辑匹配,精准捕捉复杂业务场景。
在电商行业,动态CEP的典型应用场景包括但不限于以下几个方面:
场景 | 说明 |
交叉销售与追加销售机会 | 用户在浏览商品时,常表现出跨品类兴趣,例如先看手机,再查看耳机或充电宝。这种行为蕴含交叉销售和追加销售机会。通过精准推荐互补商品(如手机壳、耳机)或提供组合优惠(如“手机+耳机套餐立减”),平台不仅能提升附加商品购买率、提高客单价,还能优化用户体验,增强用户粘性,从而推动业务增长。 |
高价值购物车挽回 | 用户将高价值商品加入购物车后,可能因价格敏感或决策犹豫未完成购买,造成潜在销售损失。通过实时识别购物车放弃行为并触发干预(如限时折扣、库存预警或免运费优惠),平台可有效减少高价值商品的流失,提升订单转化率,挽回潜在收益,实现用户价值与平台收益的双赢。 |
高意向用户识别 | 用户短时间内多次浏览同一商品,表明其购买意向较高。通过识别该行为并触发个性化营销(如专属优惠券或库存提醒),平台可加速用户决策,提高转化率,同时优化用户体验,推动销售增长。 |
价格敏感用户运营 | 价格敏感用户常反复浏览某商品,仅在降价时加入购物车。通过分析该行为,平台可在价格变动时推送通知或定向优惠(如“您关注的商品已降价!”),提升转化率,同时优化用户运营效率。 |
流失风险预警 | 用户频繁浏览商品却长期未下单,可能存在流失风险。通过识别此类行为并采取挽回措施(如发送专属优惠券或推荐热门商品),平台可有效降低流失率,延长用户生命周期,同时提升用户留存与平台收益。 |
方案架构
FlinkCEP是Apache Flink中用于处理复杂事件模式的库。FlinkCEP(Complex Event Processing)通过定义复杂事件模式,实时监控事件流,并在事件流中识别出符合模式的事件序列,最终输出匹配结果。其方案架构可以概括如下:
Event Stream
事件流是CEP处理的输入源,通常是一个连续的数据流,包含一系列按时间顺序排列的事件。每个事件可以包含多个属性,用于后续的模式匹配。
Pattern and Rule Definitions
用户定义事件模式(Pattern)和规则(Rule),这些模式描述了用户感兴趣的事件序列或组合。模式可以包括事件的顺序、时间约束、条件过滤等。例如,定义“A事件后跟随B事件,且两者时间间隔不超过10秒”的模式。
CEP Engine Analysis
CEP引擎接收事件流,并根据定义的模式和规则进行分析。引擎会持续监控事件流,尝试将输入事件与定义的模式进行匹配。匹配过程中,引擎会考虑事件的时间顺序、属性条件以及时间窗口等约束。
CEP Matching Outputs
当事件流中的事件序列与定义的模式匹配成功时,CEP引擎会生成匹配结果(Output)。这些结果可以是匹配到的事件序列、触发规则的动作,或者其他用户定义的输出形式。匹配结果可以用于后续的处理,如告警、决策或存储。
前提条件
已开通实时计算Flink版,详情请参见开通实时计算Flink版。
已开通云消息队列Kafka,详情请参见部署消息队列Kafka实例。
已开通RDS MySQL,详情请参见创建RDS MySQL实例。
实时计算Flink版、云数据库RDS MySQL、云消息队列Kafka需要在同一VPC下。如果不在同一VPC,需要先打通跨VPC的网络或者使用公网的形式访问,详情请参见如何访问跨VPC的其他服务?和如何访问公网?。
通过RAM用户或RAM角色等身份访问时,需要具备操作权限。
步骤一:准备工作
创建RDS MySQL实例并准备数据源
创建RDS MySQL数据库,详情请参见创建数据库。
为目标实例创建名称为
ecommerce
的数据库。准备MySQL CDC数据源。
在目标实例详情页面,单击上方的登录数据库。
在弹出的DMS页面中,填写创建的数据库账号名和密码,然后单击登录。
登录成功后,在左侧双击
ecommerce
数据库,切换数据库。在SQL Console区域编写如下建表DDL以及插入的数据语句。
-- 创建规则表1 CREATE TABLE rds_demo1 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- 创建规则表2 CREATE TABLE rds_demo2 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- 创建规则表3 CREATE TABLE rds_demo3 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- 创建规则表4 CREATE TABLE rds_demo4 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- 创建规则表5 CREATE TABLE rds_demo5 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- 创建源表 CREATE TABLE `click_stream1` ( id bigint not null primary key auto_increment, -- 自增主键 eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream2` ( id bigint not null primary key auto_increment, -- 自增主键 eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream3` ( id bigint not null primary key auto_increment, -- 自增主键 eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream4` ( id bigint not null primary key auto_increment, -- 自增主键 eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream5` ( id bigint not null primary key auto_increment, -- 自增主键 eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) );
单击执行后,再单击直接执行。
创建云消息队列Kafka Topic和Group资源
参考创建资源创建以下Kafka资源:
Group:clickstream.consumer。
Topic:click_stream1、click_stream2、click_stream3、click_stream4和click_stream5。
创建Topic时,分区数建议设置为1,否则在某些场景下可能导致示例数据无法匹配到结果。
步骤二:MySQL实时同步Kafka
将用户点击流事件从MySQL同步到Kafka中,可以有效降低多个任务对MySQL数据库造成的压力。
创建MySQL Catalog,详情请参见创建MySQL Catalog。
本示例Catalog命名为
mysql-catalog
,默认数据库为ecommerce
。创建kafak Catalog,详情请参见管理Kafka JSON Catalog。
本示例Catalog命名为
kafka-catalog
。在
页面,新建SQL流作业,并将如下代码拷贝到SQL编辑器。CREATE TEMPORARY TABLE `clickstream1` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- 定义主键 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --定义Watermark。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream1', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream2` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- 定义主键 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --定义Watermark。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream2', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream3` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- 定义主键 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --定义Watermark。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream3', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream4` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- 定义主键 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --定义Watermark。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream4', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream5` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- 定义主键 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --定义Watermark。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream5', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); BEGIN STATEMENT SET; INSERT INTO `clickstream1` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream1`; INSERT INTO `clickstream2` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream2`; INSERT INTO `clickstream3` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream3`; INSERT INTO `clickstream4` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream4`; INSERT INTO `clickstream5` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream5`; END; --写入多个Sink时,必填。
单击右上方的部署,进行作业部署。
单击左侧导航栏的
,单击目标作业操作列的启动,选择无状态启动后单击启动。
步骤三:开发、部署与启动CEP作业
本文部署了cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar作业,该作业从Kafka中消费用户点击流事件,经过处理生成预警信息打印到实时计算开发控制台。您可以根据实际业务架构调整代码,选择合适的下游连接器以适配不同的数据输出场景。更多支持的连接器详情请参见支持的连接器。
1、代码开发
本步骤仅为您展示核心代码及其功能说明。
2、部署作业
在
页面,单击 ,分别部署5个流作业。参数配置说明:
参数 | 说明 | 示例 |
部署模式 | 流处理 | 流模式 |
部署名称 | 填写对应的JAR作业名称。 |
|
引擎版本 | 当前作业使用的Flink引擎版本。 本文代码SDK使用JDK11,需要选择带有 | vvr-8.0.11-jdk11-flink-1.17 |
JAR URI | 手动单击右侧 | oss://xxx/artifacts/namespaces/xxx/cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar |
Entry Point Class | 程序的入口类。 | com.alibaba.ververica.cep.demo.CepDemo |
Entry Point Main Arguments | 您可以在此处传入参数,在主方法中调用该参数。 本文需配置如下参数:
|
|
部署详情请参见部署JAR作业。
3、启动作业
在作业运维页面,单击目标作业操作列的启动,选择无状态启动后单击启动。依次启动名称为EcommerceCEPRunner1、EcommerceCEPRunner2、EcommerceCEPRunner3、EcommerceCEPRunner4和EcommerceCEPRunner5共5个场景的作业。
启动配置的具体详情,请参见作业启动。