基于FlinkCEP的电商营销实时预警系统

更新时间:2025-03-25 09:41:05

FlinkCEP(Complex Event Processing)用于处理复杂事件流,能够实时检测特定事件模式并触发预警。在电商营销中,FlinkCEP可用于实时监控用户行为、交易数据等,识别异常或关键事件,及时发出预警。

背景信息

随着电商行业的快速发展,用户行为数据和交易数据的规模呈指数级增长。电商平台需要实时监控用户行为、交易状态以及营销活动的效果,以便及时发现问题并采取应对措施。例如:

  • 异常行为检测:如短时间内大量加入购物车但未支付,可能是恶意刷单或系统故障。

  • 营销活动监控:如促销活动期间流量激增,可能导致系统负载过高或库存不足。

  • 用户流失预警:如用户频繁浏览商品但未下单,可能是价格或体验问题。

传统的批处理方式无法满足实时性需求,而流处理技术能够实时处理和分析数据流,帮助电商平台快速响应。

方案架构

FlinkCEP(Complex Event Processing)是Apache Flink中用于处理复杂事件模式的库。其方案架构可以概括如下:

2

  1. Event Stream 事件流是CEP处理的输入源,通常是一个连续的数据流,包含一系列按时间顺序排列的事件。每个事件可以包含多个属性,用于后续的模式匹配。

  2. Pattern and Rule Definitions

    用户定义事件模式(Pattern)和规则(Rule),这些模式描述了用户感兴趣的事件序列或组合。模式可以包括事件的顺序、时间约束、条件过滤等。例如,定义“A事件后跟随B事件,且两者时间间隔不超过10秒”的模式。

  3. CEP Engine Analysis CEP引擎接收事件流,并根据定义的模式和规则进行分析。引擎会持续监控事件流,尝试将输入事件与定义的模式进行匹配。匹配过程中,引擎会考虑事件的时间顺序、属性条件以及时间窗口等约束。

  4. CEP Matching Outputs 当事件流中的事件序列与定义的模式匹配成功时,CEP引擎会生成匹配结果(Output)。这些结果可以是匹配到的事件序列、触发规则的动作,或者其他用户定义的输出形式。匹配结果可以用于后续的处理,如告警、决策或存储。

FlinkCEP通过定义复杂事件模式,实时监控事件流,并在事件流中识别出符合模式的事件序列,最终输出匹配结果。这种机制适用于实时监控、异常检测、风控等场景。

实践场景

在电子商务领域,用户行为数据是优化运营策略、提升业务增长的重要依据。通过对用户浏览、加购、下单等行为的实时分析,平台可以识别潜在机会与风险,并采取针对性的干预措施。以下五个典型场景分别展现了如何利用数据驱动策略实现精准运营,最终提升用户转化率、客单价和留存率,为平台创造更大价值。

交叉销售与追加销售机会

用户在浏览商品时,可能会对多个相关品类表现出兴趣,例如先浏览手机后又查看耳机或充电宝。这种跨品类的浏览行为是交叉销售和追加销售的潜在机会。通过精准推荐互补商品(如手机壳、耳机)或提供组合优惠(如“手机+耳机套餐立减优惠”),平台能够提升用户购买附加商品的概率,增加客单价,同时优化用户体验,增强用户粘性,最终实现业务增长。

高价值购物车挽回

用户将高价值商品加入购物车后,可能因价格敏感、决策犹豫或其他原因未完成购买,这种购物车放弃行为会导致电商平台面临潜在的销售损失。通过实时识别此类行为并触发及时干预(如发送限时折扣、库存预警或免运费优惠),平台能够有效减少高价值商品的购物车放弃率,在提升订单转化率的同时,挽回潜在订单损失,最终实现用户价值与平台收益的双向增长。

高意向用户识别

用户在短时间内多次浏览同一商品,表明其对该商品有强烈的购买意向,例如反复查看某款鞋子或电子设备。通过识别此类行为并触发个性化营销动作(如发送专属优惠券或库存提醒),平台能够加速高意向用户的购买决策,提高转化率,同时提升用户满意度和购买体验,进一步推动销售增长。

价格敏感用户运营

部分用户对价格非常敏感,他们可能会反复浏览某商品,但仅在降价时将其加入购物车。通过分析此类行为特征,平台可以在价格变动时主动推送通知或提供定向优惠(如“您关注的商品已降价!”),从而提升价格敏感用户的购买转化率,并通过个性化定价和促销策略优化用户运营效率。

流失风险预警

用户在平台上频繁浏览商品但长期未购买,可能存在流失风险,例如一周内多次查看不同商品却始终未下单。通过识别此类行为并触发挽回策略(如发送专属优惠券或推荐热门商品),平台能够降低用户流失率,延长用户生命周期,同时通过主动干预提升用户留存和平台收益。

FlinkCEP代码开发

说明

交叉销售与追加销售机会

模式定义

检测用户在同一会话中5分钟内浏览不同类别商品的行为(如 first_view -> second_view)。

Pattern<ClickEvent, ?> crossSellPattern = Pattern.<ClickEvent>begin("first_view")
         // 模式开始于一个事件类型为"view"的ClickEvent
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // 接下来是一个事件类型同样为"view"的ClickEvent
        .next("second_view")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // 整个模式必须在5分钟内完成
        .within(Time.minutes(5));

匹配结果处理

实时匹配成功后,发送交叉销售与追加销售机会预警。系统可根据预警消息自动推荐互补商品(如手机壳、耳机或充电宝),或提供组合优惠(如“手机+耳机套餐立减优惠”)。

SingleOutputStreamOperator<Alert> crossSellAlerts = crossSellPatternStream.select(new PatternSelectFunction<ClickEvent, Alert>() {
    @Override
    public Alert select(Map<String, List<ClickEvent>> pattern) {
        ClickEvent firstView = pattern.get("first_view").get(0);
        ClickEvent secondView = pattern.get("second_view").get(0);

        // 检查类别是否不同
        if (!firstView.getCategoryCode().equals(secondView.getCategoryCode())) {
            var message = "Cross-sell opportunity detected for user " + firstView.getUserId() +
                    ": viewed products in categories " + firstView.getCategoryCode() + " and " + secondView.getCategoryCode();
            // 返回交叉销售与追加销售机会预警
            return new Alert(secondView.getUserSession(), secondView.getUserId(), AlertType.CROSS_UPSELL, message);
        }
        return null; // 如果类别相同则返回null,有效地将其过滤掉
    }
}).filter(Objects::nonNull).name("CrossSellAlertsPattern").uid("CrossSellAlertsPattern");

高价值购物车挽回

模式定义

检测用户将高价值商品加入购物车(cart_add)后,10分钟内未发生购买事件(purchase)。

Pattern<ClickEvent, ?> abandonmentPattern = Pattern.<ClickEvent>begin("cart_add")
        // 模式开始于一个事件类型为"cart"且价格大于200的ClickEvent
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("cart") && event.getPrice() > 200;
            }
        })
        // 接下来是一个事件类型为"purchase"的ClickEvent
        .followedBy("purchase")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("purchase");
            }
        })
        // 整个模式必须在10分钟内完成
        .within(Time.minutes(10));

匹配结果处理

若超时未购买,发送高价值购物车挽回预警。系统可触发挽回策略,如发送提醒邮件、短信或推送通知,提供限时折扣、免运费或库存预警(如“库存紧张,赶快下单!”)。

SingleOutputStreamOperator<Alert> abandonedmentAlert = abandonmentPatternStream
        .select(new PatternTimeoutFunction<ClickEvent, Alert>() {
            @Override
            // 处理超时事件
            public Alert timeout(Map<String, List<ClickEvent>> pattern, long timeoutTimestamp) {
                ClickEvent cartAdd = pattern.get("cart_add").get(0);

                var message = "High-value cart abandonment detected for user '" + cartAdd.getUserId() + "' priced at " + cartAdd.getPrice() +
                        ". No purchase within 10 minutes.";
                // 返回高价值购物车挽回预警
                return new Alert(cartAdd.getUserSession(), cartAdd.getUserId(), AlertType.CART_ABANDONMENT, message);
            }
        }, new PatternSelectFunction<ClickEvent, Alert>() {
            @Override
             // 处理匹配的模式
            public Alert select(Map<String, List<ClickEvent>> pattern) {
                ClickEvent cartAdd = pattern.get("cart_add").get(0);
                ClickEvent purchase = pattern.get("purchase").get(0);

                var message = "Purchase completed for user " + purchase.getUserId() + " on product " + purchase.getProductId() +
                        " priced at " + purchase.getPrice();
                // 返回购买完成预警
                return new Alert(cartAdd.getUserSession(), cartAdd.getUserId(), AlertType.PURCHASE_COMPLETION, message);
            }
        }).map(new MapFunction<Either<Alert, Alert>, Alert>() {
            @Override
            public Alert map(Either<Alert, Alert> alert) throws Exception {
                if (alert.isLeft()) {
                    // 如果是超时预警,返回左值
                    return alert.left();
                } else {
                    // 如果是匹配预警,返回右值
                    return alert.right();
                }
            }
        }).name("AbandonmentAlertsPattern").uid("AbandonmentAlertsPattern");

高意向用户识别

模式定义

检测用户15分钟内多次浏览同一商品的行为(如 initial_view -> repeat_view)。

Pattern<ClickEvent, ?> purchaseIntentPattern = Pattern.<ClickEvent>begin("initial_view")
        // 模式开始于一个事件类型为"view"的ClickEvent
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // 接下来是一个事件类型同样为"view"的ClickEvent,并且这个事件重复3次
        .followedBy("repeat_view")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        }).times(3)
        // 整个模式必须在15分钟内完成
        .within(Time.minutes(15));

匹配结果处理

匹配成功后,发送高意向用户识别预警。系统可标记为高意向用户,并触发个性化营销动作,如发送专属优惠券、限时折扣或库存提醒(如“仅剩3件,赶快下单!”)。

SingleOutputStreamOperator<Alert> purchaseIntentAlerts = purchaseIntentStream.select(new PatternSelectFunction<ClickEvent, Alert>() {
    @Override
    public Alert select(Map<String, List<ClickEvent>> pattern) {
        ClickEvent initialView = pattern.get("initial_view").get(0);
        var message = "High purchase intent detected for user " + initialView.getUserId() +
                " on product " + initialView.getProductId();
        // 返回高意向用户识别预警
        return new Alert(initialView.getUserSession(), initialView.getUserId(), AlertType.PRICE_SENSITIVITY, message);
    }
}).name("PurchaseIntentAlertsPattern").uid("PurchaseIntentAlertsPattern");

价格敏感用户运营

模式定义

检测用户浏览商品后,仅在价格下降时加入购物车的行为(如 initial_view -> view_price_drop -> cart_after_price_drop)。

Pattern<ClickEvent, ?> priceSensitivityPattern = Pattern.<ClickEvent>begin("initial_view")
        // 模式开始于一个事件类型为"view"的ClickEvent
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // 接下来是一个事件类型同样为"view"的ClickEvent,并且该产品的价格低于初始产品的价格
        .next("view_price_drop")
        .where(new IterativeCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event, Context<ClickEvent> ctx) throws Exception {
                ClickEvent initialView = ctx.getEventsForPattern("initial_view").iterator().next();
                return event.getEventType().equals("view") && event.getProductId().equals(initialView.getProductId()) && event.getPrice() < initialView.getPrice();
            }
        })
        // 接下来是一个事件类型为"cart"的ClickEvent
        .next("cart_after_price_drop")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("cart");
            }
        })
        // 整个模式必须在10分钟内完成
        .within(Time.minutes(10));

匹配结果处理

匹配成功后,发送价格敏感用户运营预警。系统可记录用户的价格敏感特征,并在未来价格变动时主动推送通知或提供定向优惠(如“您关注的商品已降价!”)。

SingleOutputStreamOperator<Alert> priceSensitivityAlerts = priceSensitivityStream.select(new PatternSelectFunction<ClickEvent, Alert>() {
    @Override
    public Alert select(Map<String, List<ClickEvent>> pattern) {
        ClickEvent initialView = pattern.get("initial_view").get(0);
        var message = "Price-sensitive customer detected for user " + initialView.getUserId() +
                " on product " + initialView.getProductId() + " after a price drop.";
        // 返回价格敏感用户运营预警
        return new Alert(initialView.getUserSession(), initialView.getUserId(), AlertType.PRICE_SENSITIVITY, message);
    }
}).name("PriceSensitivityAlertsPattern").uid("PriceSensitivityAlertsPattern");

流失风险预警

模式定义

检测用户一周内多次浏览商品但未下单的行为(如 first_view -> ... -> no_purchase)。

Pattern<ClickEvent, ?> churnPredictionPattern = Pattern.<ClickEvent>begin("first_view")
        // 模式开始于一个事件类型为"view"的ClickEvent
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
         // 用户必须至少查看10次
        .timesOrMore(10)  
        // 在接下来的时间窗口内不能有"purchase"事件
        .notNext("purchase")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("purchase");
            }
        })
        // 整个模式必须在7天内完成
        .within(Time.days(7));

匹配结果处理

匹配成功后,发送流失风险预警。系统可标记为潜在流失用户,并触发挽回策略,如发送专属优惠券、推荐热门商品或提供免费试用。

SingleOutputStreamOperator<Alert> churnPredictionAlerts = churnPredictionStream.select(new PatternSelectFunction<ClickEvent, Alert>() {
    @Override
    public Alert select(Map<String, List<ClickEvent>> pattern) {
        ClickEvent firstView = pattern.get("first_view").get(0);
        var message = "Churn risk detected for user " + firstView.getUserId() +
                ": viewed multiple products over the week without making a purchase.";
        // 返回流失风险预警
        return new Alert(firstView.getUserSession(), firstView.getUserId(), AlertType.CHURN_RISK, message);
    }
}).name("ChurnPredictionAlertsPattern").uid("ChurnPredictionAlertsPattern");

前提条件

步骤一:准备工作

创建RDS MySQL实例并准备数据源

  1. 创建RDS MySQL数据库,详情请参见创建数据库

    为目标实例创建名称为ecommerce的数据库。

  2. 准备MySQL CDC数据源。

    1. 在目标实例详情页面,单击上方的登录数据库

    2. 在弹出的DMS页面中,填写创建的数据库账号名和密码,然后单击登录

    3. 登录成功后,在左侧双击ecommerce数据库,切换数据库。

    4. SQL Console区域编写如下建表DDL以及插入的数据语句。

      CREATE TABLE `click_stream` (
        id bigint not null primary key auto_increment,  -- 自增主键
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );

      插入表数据

      -- 交叉销售与追加销售机会场景测试数据
      INSERT INTO `click_stream` 
      (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
      VALUES
      ('2020-01-01 00:01:00.0', 'view', 1005073, 2232732093077520756, 'construction.tools.light', 'samsung', 1130.02, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
      ('2020-01-01 00:01:03.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes', 'xiaomi', 29.95, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
      ('2020-01-01 00:01:07.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes.step_ins', 'intel', 167.20, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
      ('2020-01-01 00:01:08.0', 'view', 1005205, 2232732093077520756, 'appliances.personal.massager', 'samsung', 576.33, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0');
      
      -- 高价值购物车挽回场景测试数据
      INSERT INTO `click_stream` 
      (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
      VALUES
      ('2020-01-01 01:01:01.0','cart',1002923,2053013555631882655,'electronics.smartphone','huawei',249.30,517014550,'b666b914-8abf-4ebe-b674-aa31a1d0f7ce'),
      ('2020-01-01 01:11:02.0','cart',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded'),
      ('2020-01-01 01:11:03.0','purchase',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded');
      
      -- 高意向用户识别场景测试数据
      INSERT INTO `click_stream` 
      (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
      VALUES
      ('2020-01-01 02:01:01.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
      ('2020-01-01 02:01:02.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
      ('2020-01-01 02:01:03.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
      ('2020-01-01 02:01:04.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039');
      
      -- 价格敏感用户运营场景测试数据
      INSERT INTO `click_stream` 
      (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
      VALUES
      ('2020-01-01 03:01:01.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',38.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
      ('2020-01-01 03:01:02.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
      ('2020-01-01 03:01:03.0','cart',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a');
      
      -- 流失风险场景测试数据 
      INSERT INTO `click_stream` 
      (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
      VALUES
      ('2020-12-10 00:01:01.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:02:02.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:03:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:04:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:05:04.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:06:05.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:07:06.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:08:07.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:09:09.0', 'cart', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09');
  3. 单击执行后,再单击直接执行

创建云消息队列Kafka TopicGroup资源

创建以下Kafka资源:

  • Topic:clickstreamalerts。

  • Group:clickstream.consumer。

具体操作步骤请参考创建资源

步骤二:MySQL实时同步Kafka

将用户点击流事件从MySQL同步到Kafka中,可以有效降低多个任务对MySQL数据库造成的压力。

  1. 创建MySQL Catalog,详情请参见创建MySQL Catalog

    本示例Catalog命名为mysql-catalog,默认数据库为ecommerce

  2. 数据开发 > ETL页面,新建SQL流作业,并将如下代码拷贝到SQL编辑器。

    CREATE TEMPORARY TABLE `clickstream` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- 定义主键
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --定义Watermark。
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'clickstream',
      'properties.bootstrap.servers' = '<您的Kafka集群接入点地址>',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    INSERT INTO `clickstream`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,    -- 转成毫秒单位
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream`;
  3. 单击右上方的部署,进行作业部署。

  4. 单击左侧导航栏的运维中心 > 作业运维,单击目标作业操作列的启动,选择无状态启动后单击启动

步骤三:部署作业并启动

本文部署ecommerce-cep-demos-0.1.0.jar作业,从Kafka中消费用户点击流事件,产生预警再写回Kafka Topic中。您可以根据实际业务架构调整代码,选择合适的下游连接器,详情请参考支持的连接器

  1. 部署电商营销预警作业。

    运维中心 > 作业运维页面,单击部署作业 > JAR作业

    image

    参数配置说明:

    参数

    说明

    示例

    参数

    说明

    示例

    部署模式

    流处理

    流模式

    部署名称

    -

    EcommerceCEPRunner

    引擎版本

    本文代码SDK使用JDK11,需要选择带有jdk11的版本,VVR推荐使用最新版本。

    vvr-8.0.11-jdk11-flink-1.17

    JAR URI

    手动单击右侧上传图标上传,选择ecommerce-cep-demos-0.1.0.jar。

    oss://xxx/artifacts/namespaces/xxx/ecommerce-cep-demos-0.1.0.jar

    Entry Point Class

    程序的入口类。

    com.ververica.cep.EcommerceCEPRunner

    Entry Point Main Arguments

    您可以在此处传入参数,在主方法中调用该参数。

    本文需配置如下参数:

    • bootstrap.servers:Kafka集群地址。

    • clickstream_topic:消费的点击流Kafka Topic。

    • group:消费者组ID。

    • alerts_topic:生产的预警信息Kafka Topic。

    --bootstrap.servers <您的Kafka集群接入点地址>

    --clickstream_topic clickstream

    --group clickstream.consumer

    --alerts_topic alerts

    部署详情请参见部署JAR作业

  2. 启动作业。

    单击目标作业操作列的启动,选择无状态启动后单击启动

    启动配置详情请参见作业启动

步骤四:预警查询

预警消息已成功发送至云消息队列Kafka版,通过分析alerts Topic中的数据,您可以获取用户行为信息。此外,您还可以通过实时计算分析处理预警信息,以实现全链路的实时自动化。

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击目标实例名称。

  4. 在左侧导航栏,单击消息查询

  5. 消息查询页面的查询方式列表中,选择按位点查询

  6. Topic列表中,选择消息所属的alerts Topic名称。在分区列表中,选择消息所属的分区,并在起始位点文本框中输入该分区的位点,然后单击查询

    image

相关文档

  • 本页导读 (1)
  • 背景信息
  • 方案架构
  • 实践场景
  • 交叉销售与追加销售机会
  • 高价值购物车挽回
  • 高意向用户识别
  • 价格敏感用户运营
  • 流失风险预警
  • FlinkCEP代码开发
  • 交叉销售与追加销售机会
  • 高价值购物车挽回
  • 高意向用户识别
  • 价格敏感用户运营
  • 流失风险预警
  • 前提条件
  • 步骤一:准备工作
  • 创建RDS MySQL实例并准备数据源
  • 创建云消息队列Kafka Topic和Group资源
  • 步骤二:MySQL实时同步Kafka
  • 步骤三:部署作业并启动
  • 步骤四:预警查询
  • 相关文档