Real-Time Early-Warning System for E-commerce Marketing Based on Dynamic FlinkCEP

FlinkCEP (Complex Event Processing) processes complex event streams dynamically: it detects specific event patterns in real time and triggers alerts. In e-commerce marketing, FlinkCEP can monitor user behavior and transaction data in real time, identify abnormal or key events, and issue warnings promptly.

Background

With the rapid growth of the e-commerce industry, user-behavior and transaction data are increasing exponentially. Traditional batch processing can no longer identify and respond to abnormal behavior, system risk, and user churn in time. By contrast, a dynamic complex event processing (CEP) engine that models multi-stage user behavior can automatically recognize complex event patterns and raise alerts at the earliest stage of a risk; this is the core advantage of dynamic CEP in real-time business. It has three key characteristics:

  • Strong real-time performance: millisecond-level response enables in-the-moment alerting rather than after-the-fact analysis, supporting fast decisions.

  • Flexible, configurable rules: rule strategies can be updated dynamically and take effect without restarting the service, so the system adapts quickly to business changes.

  • Powerful complex-event recognition: multi-event sequences, time windows, and combined conditions can be matched, precisely capturing complex business scenarios.

Typical applications of dynamic CEP in e-commerce include, but are not limited to, the following scenarios:

  • Cross-selling and upselling opportunities: While browsing, users often show cross-category interest, for example viewing a phone and then earphones or a power bank. This behavior signals cross-sell and upsell opportunities. By recommending complementary products (such as phone cases or earphones) or offering bundle discounts (such as a "phone + earphones combo deal"), the platform can raise attach rates and average order value while improving the user experience and stickiness, driving business growth.

  • High-value cart recovery: After adding a high-value item to the cart, a user may abandon the purchase out of price sensitivity or hesitation, losing a potential sale. By detecting cart abandonment in real time and intervening (limited-time discounts, low-stock alerts, or free-shipping offers), the platform can reduce the loss of high-value items, lift order conversion, and recover potential revenue.

  • High-intent user identification: A user who views the same product repeatedly within a short time shows strong purchase intent. Detecting this behavior and triggering personalized marketing (an exclusive coupon or a stock reminder) accelerates the decision, improves conversion, and drives sales growth.

  • Price-sensitive user operations: Price-sensitive users often revisit a product and only add it to the cart after a price drop. By analyzing this behavior, the platform can push a notification or targeted offer when the price changes (for example, "An item you follow just dropped in price!"), improving both conversion and the efficiency of user operations.

  • Churn-risk alerting: A user who browses frequently but has not placed an order for a long time may be at risk of churning. Detecting this behavior and taking recovery actions (an exclusive coupon or popular-product recommendations) lowers the churn rate, extends the user lifecycle, and improves retention and platform revenue.

Solution Architecture

FlinkCEP is the Apache Flink library for matching complex event patterns. FlinkCEP (Complex Event Processing) defines event patterns, monitors the event stream in real time, identifies event sequences that match those patterns, and emits the match results. The architecture can be summarized as follows:


  1. Event Stream

    The event stream is the input to CEP processing: a continuous data stream containing events ordered by time. Each event can carry multiple attributes used in later pattern matching.

  2. Pattern and Rule Definitions

    Users define event patterns and rules that describe the event sequences or combinations of interest. A pattern can include event order, time constraints, and condition filters, for example "event A followed by event B, with at most 10 seconds between them".

  3. CEP Engine Analysis

    The CEP engine receives the event stream and analyzes it against the defined patterns and rules. It continuously monitors the stream and attempts to match incoming events, taking event order, attribute conditions, and time-window constraints into account.

  4. CEP Matching Outputs

    When an event sequence in the stream matches a defined pattern, the engine produces a match result. The result can be the matched event sequence, an action triggered by the rule, or another user-defined output, and can feed downstream processing such as alerting, decision making, or storage.
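The "event A followed by event B within 10 seconds" pattern from item 2 can be expressed with the Flink CEP Pattern API. Below is a minimal sketch; the `Event` POJO and its `type` field are illustrative placeholders, not classes from this demo project:

```java
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AThenBPattern {

    // Illustrative event type; the demo project defines its own Event class.
    public static class Event {
        public String type;
    }

    public static Pattern<Event, ?> build() {
        // "Event A followed by event B, no more than 10 seconds apart."
        return Pattern.<Event>begin("a")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return "A".equals(e.type);
                    }
                })
                .followedBy("b")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return "B".equals(e.type);
                    }
                })
                .within(Time.seconds(10));
    }
}
```

With dynamic CEP, the same structure is serialized as JSON and loaded from a rule table at runtime, as shown in Step 4.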

Prerequisites

Step 1: Preparation

Create an RDS MySQL instance and prepare the data source

  1. Create an RDS MySQL database. For details, see Create a Database.

    Create a database named ecommerce for the target instance.

  2. Prepare the MySQL CDC data source.

    1. On the target instance's details page, click Log On to Database at the top.

    2. On the DMS page that appears, enter the database account name and password you created, then click Login.

    3. After logging in, double-click the ecommerce database on the left to switch to it.

    4. In the SQL Console area, write the following table-creation DDL and data-insertion statements.

      -- Create rule table 1
      CREATE TABLE rds_demo1 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- Create rule table 2
      CREATE TABLE rds_demo2 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- Create rule table 3
      CREATE TABLE rds_demo3 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      
      -- Create rule table 4
      CREATE TABLE rds_demo4 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- Create rule table 5
      CREATE TABLE rds_demo5 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      
      -- Create the source tables
      CREATE TABLE `click_stream1` (
        id bigint not null primary key auto_increment,  -- auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream2` (
        id bigint not null primary key auto_increment,  -- auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream3` (
        id bigint not null primary key auto_increment,  -- auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream4` (
        id bigint not null primary key auto_increment,  -- auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      
      CREATE TABLE `click_stream5` (
        id bigint not null primary key auto_increment,  -- auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
    5. Click Execute, and then click Execute Directly.

Create ApsaraMQ for Kafka Topic and Group resources

Create the following Kafka resources as described in Create Resources:

  • Group: clickstream.consumer.

  • Topics: click_stream1, click_stream2, click_stream3, click_stream4, and click_stream5.

    When creating the topics, set the number of partitions to 1; otherwise, in some cases the sample data may fail to produce matches.


Step 2: Synchronize MySQL to Kafka in real time

Synchronizing user click-stream events from MySQL to Kafka effectively reduces the load that multiple jobs would otherwise place on the MySQL database.

  1. Create a MySQL catalog. For details, see Create a MySQL Catalog.

    In this example, the catalog is named mysql-catalog and the default database is ecommerce.

  2. Create a Kafka catalog. For details, see Manage Kafka JSON Catalogs.

    In this example, the catalog is named kafka-catalog.

  3. On the Development > ETL page, create a SQL streaming draft and copy the following code into the SQL editor.

    CREATE TEMPORARY TABLE `clickstream1` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define the watermark
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream1',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    CREATE TEMPORARY TABLE `clickstream2` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define the watermark
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream2',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    
    CREATE TEMPORARY TABLE `clickstream3` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define the watermark
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream3',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    
    CREATE TEMPORARY TABLE `clickstream4` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define the watermark
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream4',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    CREATE TEMPORARY TABLE `clickstream5` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define the watermark
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream5',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    BEGIN STATEMENT SET; 
    INSERT INTO `clickstream1`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream1`;
    
    
    INSERT INTO `clickstream2`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream2`;
    
    
    INSERT INTO `clickstream3`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream3`;
    
    
    INSERT INTO `clickstream4`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream4`;
    
    
    INSERT INTO `clickstream5`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream5`;
    END;      -- Required when the job writes to multiple sinks.
  4. Click Deploy in the upper-right corner to deploy the job.

  5. In the left-side navigation pane, choose O&M > Deployments, click Start in the Actions column of the target deployment, select Start without state, and then click Start.

Step 3: Develop, deploy, and start the CEP jobs

This topic deploys the cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar job, which consumes user click-stream events from Kafka, processes them into alert messages, and prints the messages to the Realtime Compute development console. You can adjust the code to fit your business architecture and choose a suitable downstream connector for other output targets. For the supported connectors, see Supported Connectors.

1. Code development

This step shows only the core code and explains what it does.

Main class

public class CepDemo {

    public static void checkArg(String argName, MultipleParameterTool params) {
        if (!params.has(argName)) {
            throw new IllegalArgumentException(argName + " must be set!");
        }
    }

    // Parse a match-result line produced against the rule table
    private static Match_results parseOutput(String output) {
        String rule = "\\(id, version\\): \\((\\d+), (\\d+)\\).*?Event\\((\\d+), (\\w+), (\\d+), (\\d+)";
        Pattern pattern = Pattern.compile(rule);
        Matcher matcher = pattern.matcher(output);
        if (matcher.find()) {
            return new Match_results(Integer.parseInt(matcher.group(1)), Integer.parseInt(matcher.group(2)), Integer.parseInt(matcher.group(3)), matcher.group(4), Integer.parseInt(matcher.group(6)));
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        // Process args
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // Kafka broker list
        checkArg(KAFKA_BROKERS_ARG, params);
        // Input topic
        checkArg(INPUT_TOPIC_ARG, params);
        // Consumer group
        checkArg(INPUT_TOPIC_GROUP_ARG, params);

        // MySQL JDBC URL
        checkArg(JDBC_URL_ARG, params);
        // MySQL table name
        checkArg(TABLE_NAME_ARG, params);
        // Interval for polling the database, in milliseconds
        checkArg(JDBC_INTERVAL_MILLIS_ARG, params);
        // Whether to use event-time processing (true/false)
        checkArg(USING_EVENT_TIME, params);

        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build Kafka source with the new Source API based on FLIP-27
        KafkaSource<Event> kafkaSource =
                KafkaSource.<Event>builder()
                        .setBootstrapServers(params.get(KAFKA_BROKERS_ARG))
                        .setTopics(params.get(INPUT_TOPIC_ARG))
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG))
                        .setDeserializer(new EventDeSerializationSchema())
                        .build();



        // DataStream source with a watermark strategy
        DataStreamSource<Event> source =
                env.fromSource(
                        kafkaSource,
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event, ts) -> event.getEventTime()),
                        "Kafka Source");


        // Key the stream by userId for the subsequent pattern matching
        KeyedStream<Event, String> keyedStream =
                source.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forGenerator(ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5)))
                ).keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event value) throws Exception {
                        return value.getUserId(); // Use only userId as the key
                    }
                });

        // Dynamic CEP pattern matching: JDBCPeriodicPatternProcessorDiscovererFactory
        // periodically loads pattern processors from the MySQL rule table and applies them
        SingleOutputStreamOperator<String> output =
                CEP.dynamicPatterns(
                        keyedStream, // source data stream
                        new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                                params.get(JDBC_URL_ARG),
                                JDBC_DRIVE,
                                params.get(TABLE_NAME_ARG),
                                null,
                                Long.parseLong(params.get(JDBC_INTERVAL_MILLIS_ARG))),
                        Boolean.parseBoolean(params.get(USING_EVENT_TIME)) ? TimeBehaviour.EventTime : TimeBehaviour.ProcessingTime,
                        TypeInformation.of(new TypeHint<String>() {})
                );

        // Print results to the console
        output.print();
  
        env.execute("CEP Demo");
    }
}
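The regex in parseOutput can be exercised outside Flink. Below is a minimal, self-contained sketch; the sample log line is illustrative, modeled on the "A match for Pattern of (id, version)" output shown in Step 4:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ParseOutputDemo {

    // Same regex as parseOutput: captures the rule id/version and the leading fields
    // of the matched Event.
    static final String RULE =
            "\\(id, version\\): \\((\\d+), (\\d+)\\).*?Event\\((\\d+), (\\w+), (\\d+), (\\d+)";

    public static void main(String[] args) {
        // Illustrative log line in the shape printed by the demo job.
        String output =
                "A match for Pattern of (id, version): (1, 1): Event(100, view, 1005073, 519698804)";
        Matcher m = Pattern.compile(RULE).matcher(output);
        if (m.find()) {
            // group(1)=rule id, group(2)=version, group(4)=event type
            System.out.println(m.group(1) + "," + m.group(2) + "," + m.group(4));
        }
    }
}
```

Note that parseOutput consumes groups 1-4 and 6; group 5 (the productId field) is captured but unused.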

Scenario 1: Detect a user browsing products in different categories within 5 minutes in the same session

Event order: the pattern starts with a view event, followed by another view event.

public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
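Assembled with the Pattern API, the rule stored in rds_demo1 roughly corresponds to the sketch below (assuming the ClickEvent type and the FirstViewCondition class above; the JSON rule inserted in Step 4 is the authoritative definition):

```java
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

// first_view strictly followed by second_view within 5 minutes, mirroring the
// STRICT edge and the FIRST_AND_LAST 5-minute window of the rds_demo1 rule.
Pattern<ClickEvent, ?> crossCategoryView =
        Pattern.<ClickEvent>begin("first_view")
                .where(new FirstViewCondition())
                .next("second_view")
                .where(new FirstViewCondition())
                .within(Time.minutes(5));
```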

Scenario 2: Detect a user adding a high-value product to the cart with no purchase within the next 10 minutes

Event order: the pattern starts with a cart (add-to-cart) event whose price is greater than 200, followed by a purchase event.

public class CartAddCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("cart") && event.getPrice() > 200;
    }
}
public class PurchaseCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("purchase");
    }
}
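A corresponding Pattern API sketch, assuming the ClickEvent type and the two condition classes above. Note that the heading describes a 10-minute window while the sample rule in Step 4 uses a 30-minute window; also, a production abandoned-cart alert would typically be raised from the timed-out partial matches rather than from completed matches:

```java
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

// cart_add followed by purchase within the window (SKIP_TILL_NEXT edge,
// as in the rds_demo2 rule). The window size here follows the scenario heading.
Pattern<ClickEvent, ?> highValueCart =
        Pattern.<ClickEvent>begin("cart_add")
                .where(new CartAddCondition())
                .followedBy("purchase")
                .where(new PurchaseCondition())
                .within(Time.minutes(10));
```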

Scenario 3: Detect a user viewing the same product repeatedly within 15 minutes

Event order: the pattern starts with a view event, followed by another view event that repeats 3 times.

// The condition class is the same as in Scenario 1
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
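The equivalent Pattern API sketch, assuming the ClickEvent type and the FirstViewCondition class (the JSON rule in Step 4 is authoritative):

```java
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

// initial_view followed by repeat_view occurring 3 times within 15 minutes,
// mirroring the TIMES quantifier (from 3 to 3) of the rds_demo3 rule.
Pattern<ClickEvent, ?> repeatViews =
        Pattern.<ClickEvent>begin("initial_view")
                .where(new FirstViewCondition())
                .followedBy("repeat_view")
                .where(new FirstViewCondition())
                .times(3)
                .within(Time.minutes(15));
```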

Scenario 4: Detect a user adding a product to the cart only after its price drops

Event order: the pattern starts with a view event, followed by another view event for the same product at a price lower than the initial one, and ends with a cart event.

// The condition class is the same as in Scenario 1
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
public class InitialCondition extends IterativeCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event, Context<ClickEvent> ctx) throws Exception {
        ClickEvent initialView = ctx.getEventsForPattern("initial_view").iterator().next();
        return event.getEventType().equals("view") && event.getProductId().equals(initialView.getProductId()) && event.getPrice() < initialView.getPrice();
    }

}
public class CartCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("cart");
    }
}
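The three conditions chain into a strict sequence; a Pattern API sketch under the same assumptions (ClickEvent plus the condition classes above). InitialCondition is an IterativeCondition: it looks back at the "initial_view" match via ctx.getEventsForPattern, so the node names must match:

```java
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

// initial_view -> view_price_drop -> cart_after_price_drop, both edges STRICT,
// within 10 minutes, mirroring the rds_demo4 rule.
Pattern<ClickEvent, ?> priceDropCart =
        Pattern.<ClickEvent>begin("initial_view")
                .where(new FirstViewCondition())
                .next("view_price_drop")
                .where(new InitialCondition())
                .next("cart_after_price_drop")
                .where(new CartCondition())
                .within(Time.minutes(10));
```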

Scenario 5: Detect a user viewing products repeatedly within a week without placing an order

Event order: the pattern starts with view events, and no purchase event may occur within the subsequent time window.

// The condition class is the same as in Scenario 1
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
// The condition class is the same as in Scenario 2
public class PurchaseCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("purchase");
    }
}
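A Pattern API sketch of this rule, under the same assumptions as the earlier scenarios. The rds_demo5 rule uses a LOOPING quantifier with times from 10 to 10 and a NOT_NEXT edge to purchase:

```java
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

// 10 view events with no purchase immediately after, within a 7-day window,
// mirroring the TIMES quantifier and NOT_NEXT edge of the rds_demo5 rule.
Pattern<ClickEvent, ?> browseNoPurchase =
        Pattern.<ClickEvent>begin("first_view")
                .where(new FirstViewCondition())
                .times(10)
                .notNext("purchase")
                .where(new PurchaseCondition())
                .within(Time.days(7));
```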

2. Deploy the jobs

On the O&M > Deployments page, choose Deploy > JAR Deployment and deploy the five streaming jobs separately.


Parameter configuration:

  • Deployment mode: stream processing. Example: Stream mode.

  • Deployment name: the name of the corresponding JAR job.

    • Scenario 1: EcommerceCEPRunner1

    • Scenario 2: EcommerceCEPRunner2

    • Scenario 3: EcommerceCEPRunner3

    • Scenario 4: EcommerceCEPRunner4

    • Scenario 5: EcommerceCEPRunner5

  • Engine version: the Flink engine version used by the job. The code in this topic is built with JDK 11, so choose a version that includes jdk11; the latest VVR version is recommended. Example: vvr-8.0.11-jdk11-flink-1.17.

  • JAR URI: click the upload icon on the right and select cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar. Example: oss://xxx/artifacts/namespaces/xxx/cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar.

  • Entry Point Class: the program's entry class. Example: com.alibaba.ververica.cep.demo.CepDemo.

  • Entry Point Main Arguments: parameters passed in here are read in the main method. This topic requires the following parameters:

    • kafkaBrokers: Kafka cluster address.

    • inputTopic: the click-stream Kafka topic to consume.

    • inputTopicGroup: consumer group ID.

    • jdbcUrl: MySQL service address, including the user name (user) and password (password) of the database account.

    • tableName: name of the rule table in MySQL.

    • jdbcIntervalMs: interval for polling the rule table, in milliseconds.

    • usingEventTime: whether to use event time.

    • Scenario 1 configuration: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream1 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo1 --jdbcIntervalMs 3000 --usingEventTime false

    • Scenario 2 configuration: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream2 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo2 --jdbcIntervalMs 3000 --usingEventTime false

    • Scenario 3 configuration: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream3 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo3 --jdbcIntervalMs 3000 --usingEventTime false

    • Scenario 4 configuration: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream4 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo4 --jdbcIntervalMs 3000 --usingEventTime false

    • Scenario 5 configuration: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream5 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo5 --jdbcIntervalMs 3000 --usingEventTime false

For deployment details, see Deploy a JAR Job.

3. Start the jobs

On the Deployments page, click Start in the Actions column of each target deployment, select Start without state, and then click Start. Start the five scenario jobs EcommerceCEPRunner1, EcommerceCEPRunner2, EcommerceCEPRunner3, EcommerceCEPRunner4, and EcommerceCEPRunner5 in turn.

For details about the start configuration, see Start a Deployment.

Step 4: Alert queries

Scenario 1: Detect a user browsing products in different categories within 5 minutes in the same session

  1. Insert the rule data into MySQL.

    -- Cross-sell and upsell scenario: rule data
    INSERT INTO rds_demo1 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"second_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"second_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"first_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"first_view","target":"second_view","type":"STRICT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":5}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Insert the user-behavior test data into MySQL.

    -- Cross-sell and upsell scenario: test data
    INSERT INTO `click_stream1` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 00:01:00.0', 'view', 1005073, 2232732093077520756, 'construction.tools.light', 'samsung', 1130.02, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:03.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes', 'xiaomi', 29.95, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:07.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes.step_ins', 'intel', 167.20, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:08.0', 'view', 1005205, 2232732093077520756, 'appliances.personal.massager', 'samsung', 576.33, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0');
  3. Check the results in the Realtime Compute development console logs.

    • In the JobManager log, search for the keyword JDBCPeriodicPatternProcessorDiscoverer to confirm that the latest rule was loaded.

    • In the TaskManager Stdout log file, search for the keyword A match for Pattern of (id, version): (1, 1) to see the printed matches.

Scenario 2: Detect a user adding a high-value product to the cart with no purchase within the next 10 minutes

  1. Insert the rule data into MySQL.

    -- High-value cart recovery scenario: rule data
    INSERT INTO rds_demo2 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.PurchaseCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"cart_add","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.CartAddCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"cart_add","target":"purchase","type":"SKIP_TILL_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":30}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Insert the user-behavior test data into MySQL.

    -- High-value cart recovery scenario: test data
    INSERT INTO `click_stream2` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 01:01:01.0','cart',1002923,2053013555631882655,'electronics.smartphone','huawei',249.30,517014550,'b666b914-8abf-4ebe-b674-aa31a1d0f7ce'),
    ('2020-01-01 01:11:02.0','cart',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded'),
    ('2020-01-01 01:11:03.0','purchase',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded');
  3. Check the results in the Realtime Compute development console logs.

    • In the JobManager log, search for the keyword JDBCPeriodicPatternProcessorDiscoverer to confirm that the latest rule was loaded.

    • In the TaskManager Stdout log file, search for the keyword A match for Pattern of (id, version): (1, 1) to see the printed matches.

Scenario 3: Detect a user viewing the same product repeatedly within 15 minutes

  1. Insert the rule data into MySQL.

    -- High-intent user identification scenario: rule data
    INSERT INTO rds_demo3 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"repeat_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["TIMES"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":null,"nodes":[{"name":"repeat_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["TIMES"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"initial_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"initial_view","target":"repeat_view","type":"SKIP_TILL_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":15}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Insert the user-behavior test data into MySQL.

    -- High-intent user identification scenario: test data
    INSERT INTO `click_stream3` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 02:01:01.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:02.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:03.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:04.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039');
  3. Check the results in the Realtime Compute development console logs.

    • In the JobManager log, search for the keyword JDBCPeriodicPatternProcessorDiscoverer to confirm that the latest rule was loaded.

    • In the TaskManager Stdout log file, search for the keyword A match for Pattern of (id, version): (1, 1) to see the printed matches.

Scenario 4: Detect a user adding a product to the cart only after its price drops

  1. Insert the rule data into MySQL.

    -- Price-sensitive user operations scenario: rule data
    INSERT INTO rds_demo4 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"cart_after_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"cart_after_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.CartCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"view_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.InitialCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"initial_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"view_price_drop","target":"cart_after_price_drop","type":"STRICT"},{"source":"initial_view","target":"view_price_drop","type":"STRICT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":10}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Insert the user-behavior test data into MySQL.

    -- Price-sensitive user operations scenario: test data
    INSERT INTO `click_stream4` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 03:01:01.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',38.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
    ('2020-01-01 03:01:02.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
    ('2020-01-01 03:01:03.0','cart',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a');
  3. Check the results in the Realtime Compute development console logs.

    • In the JobManager log, search for the keyword JDBCPeriodicPatternProcessorDiscoverer to confirm that the latest rule was loaded.

    • In the TaskManager Stdout log file, search for the keyword A match for Pattern of (id, version): (1, 1) to see the printed matches.

Scenario 5: Detect a user viewing products repeatedly within a week without placing an order

  1. Insert the rule data into MySQL.

    -- Churn-risk scenario: rule data
    INSERT INTO rds_demo5 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.PurchaseCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"first_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":10,"to":10,"windowTime":null},"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"first_view","target":"purchase","type":"NOT_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"DAYS","size":7}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Insert the user-behavior test data into MySQL.

    -- Churn-risk scenario: test data
    INSERT INTO `click_stream5` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-12-10 00:01:01.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:02:02.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:03:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:04:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:05:04.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:06:05.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:07:06.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:07.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:09:09.0', 'cart', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09');
  3. Check the results in the Realtime Compute development console logs.

    • In the JobManager log, search for the keyword JDBCPeriodicPatternProcessorDiscoverer to confirm that the latest rule was loaded.

    • In the TaskManager Stdout log file, search for the keyword A match for Pattern of (id, version): (1, 1) to see the printed matches.

Related documents