实时计算Flink版支持通过DataStream作业的方式运行支持规则动态更新的Flink CEP作业。本文结合实时营销中的反作弊场景,为您介绍如何基于Flink全托管快速构建一个动态加载最新规则来处理上游Kafka数据的Flink CEP作业。
背景信息
在电商平台投放广告时,广告主通常有预算限制。例如对于按点击次数计算费用的广告而言,如果有黑灰产构造虚假流量,攻击广告主,则会很快消耗掉正常广告主的预算,使得广告内容被提前下架。在这种情况下,广告主的利益受到了损害,容易导致后续的投诉与纠纷。
为了应对上述作弊场景,我们需要快速辨识出恶意流量,采取针对性措施(例如限制恶意用户、向广告主发送告警等)来保护用户权益。同时考虑到可能有意外因素(例如达人推荐、热点事件引流)导致流量骤变,我们也需要动态调整用于识别恶意流量的规则,避免损害正常用户的利益。

实际演示中,我们会先启动Flink CEP作业,然后插入规则1:连续3条action为0的事件发生后,下一条事件的action仍非1,其业务含义为连续3次访问该产品后最后没有购买。在匹配到相应事件并进行处理后,我们会动态更新规则1内容为连续5条action为0或2的事件发生后,下一条事件的action仍非1,来应对流量整体增加的场景,同时插入一条规则2,它将和规则1的初始规则一样,用于辅助展示多规则支持等功能。当然,您也可以添加一个全新规则。
前提条件
- 已准备阿里云账号及账户余额。
- 账号注册操作步骤,请参见账号注册。
- 阿里云账户余额不少于100.00元人民币或等值的代金券或优惠券。
- 已创建Flink全托管工作空间并完成角色授权,详情请参见开通流程和阿里云账号角色授权。
- 上下游存储
- 已创建RDS MySQL实例,详情请参见创建RDS MySQL实例。
- 已创建消息队列Kafka实例,详情请参见概述。
- 仅实时计算引擎VVR 6.0.2及以上版本支持动态CEP功能。
操作流程
步骤一:准备测试数据
准备上游Kafka Topic
- 登录消息队列Kafka版控制台。
- 创建一个名称为demo_topic的Topic,存放模拟的用户行为日志。操作详情请参见步骤一:创建Topic。
准备RDS数据库
在DMS数据管理控制台上,准备RDS MySQL的测试数据。
- 使用高权限账号登录RDS MySQL。详情请参见通过DMS登录RDS MySQL。
- 创建rds_demo规则表,用来记录Flink CEP作业中需要应用的规则。在已登录的SQLConsole窗口,输入如下命令后,单击执行。
CREATE DATABASE cep_demo_db; USE cep_demo_db; CREATE TABLE rds_demo ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) );
每行代表一条规则,包含id、version等用于区分不同规则与每个规则不同版本的字段、描述CEP API中的模式对象的pattern字段,以及描述如何处理匹配模式的事件序列的function字段。
步骤二:配置IP白名单
为了让Flink能访问RDS MySQL实例,您需要将Flink全托管工作空间的网段添加到在RDS MySQL的白名单中。
- 获取Flink全托管工作空间的VPC网段。
- 在RDS MySQL的IP白名单中,添加Flink全托管网段信息。操作步骤详情请参见设置IP白名单。
步骤三:开发并启动Flink CEP作业
- 配置Maven项目中的pom.xml文件所使用的仓库。pom.xml文件的配置详情,请参见Kafka DataStream Connector。
- 在作业的Maven POM文件中添加flink-cep作为项目依赖。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-cep</artifactId> <version>1.15-vvr-6.0.2-api</version> <scope>provided</scope> </dependency>
- 开发作业代码。
- 完成作业代码开发,进行打包。为了让您可以快速测试使用,实时计算Flink版提供了打包好的JAR包。
- 创建JAR作业,将作业JAR包提交至集群运行。详情请参见部署JAR作业。其中需要配置的参数填写说明如下表所示。说明 由于目前我们上游的Kafka Source暂无数据,并且数据库中的规则表为空。因此作业运行起来之后,暂时会没有输出。
配置项 说明 JAR URL 上传打包好的JAR包,或者直接上传我们提供的测试JAR包。 Entry Point Class 填写为 com.alibaba.ververica.cep.demo.CepDemo
Entry Point Main Arguments 如果您是自己开发的作业,已经配置了相关上下游存储的信息,则此处可以不填写。但是,如果您是使用的我们提供的测试JAR包,则需要配置该参数。代码信息如下。 --kafkaBrokers YOUR_KAFKA_BROKERS --inputTopic YOUR_KAFKA_TOPIC --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME&password=YOUR_PASSWORD --tableName YOUR_TABLE_NAME --jdbcIntervalMs 3000
其中涉及的参数及含义如下:- kafkaBrokers:Kafka Broker地址。
- inputTopic:Kafka Topic名称。
- inputTopicGroup:Kafka消费组。
- jdbcUrl:数据库JDBC连接地址。说明 本示例所使用的JDBC URL中对应的账号和密码需要为普通账号和密码,且密码里仅支持英文字母和数字。在实际场景中,您可根据您的需求在作业中使用不同的鉴权方式。
- tableName:目标表名称。
- jdbcIntervalMs:轮询数据库的时间间隔。
说明- 需要将以上参数的取值修改为您实际业务上下游存储的信息。
- 参数信息长度不要大于1024,且不建议用来传复杂参数,复杂参数指包括了换行、空格或者其他特殊字符的参数(仅支持英文字母和数字)。如果您需要传入复杂参数,请使用附加依赖文件来传输。
- 在页面右侧高级配置页签的引擎版本中,添选择引擎版本为vvr-6.0.2-flink-1.15。
- 在页面右侧高级配置页签的更多Flink配置中,添加如下代码。
kubernetes.application-mode.classpath.include-user-jar: 'true' classloader.resolve-order: parent-first
- 在作业运维页面,启动Flink CEP作业。详情请参见作业启动。
步骤四:插入规则
启动Flink CEP作业,然后插入规则1:连续3条action为0的事件发生后,下一条事件的action仍非1,其业务含义为连续3次访问该产品后最后没有购买。
- 使用普通账号登录RDS MySQL。详情请参见通过DMS登录RDS MySQL。
- 插入动态更新规则。将JSON字符串与id、version、function类名等拼接后插入到RDS中。
INSERT INTO rds_demo ( `id`, `version`, `pattern`, `function` ) values( '1', 1, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}', 'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction') ;
为了方便您使用并提高数据库中的Pattern字段的可读性,实时计算Flink版定义了一套JSON格式的规则描述,详情请参见动态CEP中规则的JSON格式定义。上述SQL语句中的pattern字段的值就是按照JSON格式的规则,给出的序列化后的pattern字符串。它的物理意义是去匹配这样的模式:连续3条action为0的事件发生后,下一条事件的action仍非1。说明 在下文的EndCondition对应的代码中,定义了action仍非1。- 对应的CEP API描述如下。
Pattern<Event, Event> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent()) .where(new StartCondition("action == 0")) .timesOrMore(3) .followedBy("end") .where(new EndCondition());
- 对应的JSON字符串如下。
{ "name": "end", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "SINGLE" ], "times": null, "untilCondition": null }, "condition": null, "nodes": [ { "name": "end", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "SINGLE" ], "times": null, "untilCondition": null }, "condition": { "className": "com.alibaba.ververica.cep.demo.condition.EndCondition", "type": "CLASS" }, "type": "ATOMIC" }, { "name": "start", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "LOOPING" ], "times": { "from": 3, "to": 3, "windowTime": null }, "untilCondition": null }, "condition": { "expression": "action == 0", "type": "AVIATOR" }, "type": "ATOMIC" } ], "edges": [ { "source": "start", "target": "end", "type": "SKIP_TILL_NEXT" } ], "window": null, "afterMatchStrategy": { "type": "SKIP_PAST_LAST_EVENT", "patternName": null }, "type": "COMPOSITE", "version": 1 }
- 对应的CEP API描述如下。
- 通过Kafka Client向demo_topic中发送消息。在本Demo中,您也可以使用消息队列Kafka提供的快速体验消息收发页面发送测试消息。
1,Ken,0,1,1662022777000 1,Ken,0,1,1662022778000 1,Ken,0,1,1662022779000 1,Ken,0,1,1662022780000
demo_topic字段说明如下表所示。字段 说明 id 用户ID。 username 用户名。 action 用户动作,取值如下: - 0代表浏览操作。
- 1代表购买动作。
- 2代表分享操作。
product_id 商品ID。 event_time 该行为发生的事件时间。 - 查看JobManager日志中打印的最新规则和TaskManager日志中打印的匹配。
- 在JobManager日志中,通过JDBCPeriodicPatternProcessorDiscoverer关键词搜索,查看最新规则。
- 在TaskManager中以.out结尾的日志文件中,通过A match for Pattern of (id, version): (1, 1)关键词搜索,查看日志中打印的匹配。
- 在JobManager日志中,通过JDBCPeriodicPatternProcessorDiscoverer关键词搜索,查看最新规则。
步骤五:更新匹配规则,并查看更新的规则是否生效
在匹配到相应事件并进行处理后,动态更新规则1内容为连续5条action为0或为2的事件发生后,下一条事件的action仍非1,来应对流量整体增加的场景,同时插入一条规则2,它将和规则1的初始规则一样,用于辅助展示多规则支持等功能。
- 使用在RDS控制台上,更新匹配规则。
- 在Kafka控制台上发送8条简单的消息,来触发匹配。8条简单的消息示例如下。
1,Ken,0,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,2,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,2,1,1662022777000
- 在TaskManager中以.out结尾的日志文件中,查看匹配结果。
- 如果要搜规则1在更新为版本2之后的匹配,可以通过A match for Pattern of (id, version): (1, 2)关键词,查匹配结果。
- 如果要搜规则2在版本为1的匹配,可以通过A match for Pattern of (id, version): (2, 1)关键词,查匹配结果。
如上图中蓝框内结果所示,Flink CEP作业按照id为1,version为2的规则匹配到1次5个action为0或2的事件+1个action非1的1个事件的事件序列后输出结果,代表动态修改的规则成功生效;而对于id为2,version为1的规则,如上图中橙色框内结果所示,Flink CEP作业匹配到2次3个action为0的事件+1个action非1的1个事件的事件序列后输出结果,代表动态新增的规则也在作业中被采用。
- 如果要搜规则1在更新为版本2之后的匹配,可以通过A match for Pattern of (id, version): (1, 2)关键词,查匹配结果。