Confluent+实时计算Flink版最佳实践
本文提供使用流数据服务Confluent和实时计算Flink版搭建实时计算平台的最佳实践。
业务背景
在阿里云上开展实际业务往往需要进行实时数据分析,实现流式数据实时计算,Confluent+VVP是一个常用的处理方案。流数据服务Confluent是阿里云基于Apache Kafka提供的企业级全托管流数据服务,而实时计算Flink版是阿里云基于Apache Flink构建的企业级实时大数据计算商业产品,两者的相互结合可以帮助流式数据处理过程变得更加简洁高效。
准备工作
创建Confluent集群和实时计算Flink版集群
登录Confluent管理控制台,创建流数据服务Confluent集群,参见开通文档。
登录实时计算Flink版管理控制台,创建实时计算Flink版集群,参见开通文档。
Flink集群和Confluent集群在创建的时候建议选择同一VPC,这样可以在VVP内部通过Confluent的内部域名访问Confluent集群。
案例1:实时统计玩家充值金额
案例1 主要使用Confluent+VVP+Hologres架构来实时统计玩家在游戏APP中的充值金额,其中玩家充值记录被封装成消息,通过Kafka Produce客户端发送至Confluent对应Topic中,然后Flink实时计算任务通过Kafka Connector把Topic消息读取出来进行ETL处理或者进行汇总,最终输出到Hologres实时数仓中进行展示。
一、新建Confluent消息队列
在Confluent集群列表页,登录Control Center控制台。

在左侧选中Topics,点击Add a topic按钮,创建一个名为game_consume_log的topic,partition设置为3。


二、配置 Hologres 结果表
登录Hologres控制台,点击Hologres实例,在DB管理中创建数据库mydb


点击进入SQL编辑器,新建SQL查询,创建Hologres数据库

Hologres中创建结果表 SQL语句
--用户累计消费结果表
CREATE TABLE consume (
appkey VARCHAR,
serverid VARCHAR,
servertime VARCHAR,
roleid VARCHAR,
amount FLOAT,
dt VARCHAR,
primary key(appkey,dt)
);
三、创建实时计算VVP作业
登录Flink控制台,选择集群所在region,点击控制台,进入开发界面。

点击作业开发,点击新建文件,文件名称:confluent-vvp-hologres,文件类型选择:流作业/SQL

在输入框写入以下代码,创建相应数据表
-- 创建读取kafka消息source表
create TEMPORARY table kafka_game_consume_source(
appkey STRING,
servertime STRING,
consumenum DOUBLE,
roleid STRING,
serverid STRING
) with (
'connector' = 'kafka',
'topic' = 'game_consume_log',
'properties.bootstrap.servers' = 'rb-{{集群clusterId}}-internal.csp.aliyuncs.com:9095',
'properties.group.id' = 'gamegroup',
'format' = 'json',
'properties.security.protocol'='SASL_SSL',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx[集群的用户]" password="xxx[相应的密码]";'
);
-- 创建累计消费hologres sink表
CREATE TEMPORARY TABLE consume(
appkey STRING,
serverid STRING,
servertime STRING,
roleid STRING,
amount DOUBLE,
dt STRING,
PRIMARY KEY (appkey,dt) NOT ENFORCED
)WITH (
'connector' = 'hologres',
'dbname' = 'mydb',
'endpoint' = 'hgprecn-cn-tl32gkaet006-cn-beijing-vpc.hologres.aliyuncs.com:80',
'password' = '[your appkey secret]',
'tablename' = 'consume',
'username' = '[your app key]',
'mutateType' = 'insertorreplace'
);
-- 计算每个用户累积消费金额
insert into consume
SELECT
appkey,LAST_VALUE(serverid) as serverid,LAST_VALUE(servertime) as servertime,LAST_VALUE(roleid) as roleid,
sum(consumenum) as amount,
substring(servertime,1,10) as dt
FROM kafka_game_consume_source
GROUP BY appkey,substring(servertime,1,10)
having sum(consumenum) > 0;
点击上线按钮,完成上线

在运维作用列表里找到刚上线的作业,点击启动按钮,等待状态更新为Running,运行成功。

在control center的【Topics->Messages】页面,逐条发送测试消息,格式为:
{"appkey":"appkey1","servertime":"2020-09-30 14:10:36","consumenum":33.8,"roleid":"roleid1","serverid":"1"}
{"appkey":"appkey2","servertime":"2020-09-30 14:11:36","consumenum":30.8,"roleid":"roleid2","serverid":"2"}
{"appkey":"appkey1","servertime":"2020-09-30 14:13:36","consumenum":31.8,"roleid":"roleid1","serverid":"1"}
{"appkey":"appkey2","servertime":"2020-09-30 14:20:36","consumenum":33.8,"roleid":"roleid2","serverid":"2"}
{"appkey":"appkey1","servertime":"2020-09-30 14:30:36","consumenum":73.8,"roleid":"roleid1","serverid":"1"}
四、查看用户充值金额实时统计效果
进入HoloWeb控制台,可以通过SQL查询看到用户充值金额实时统计信息

可以切换至柱状图,或者其它图形展示形式。

案例2:电商实时pv和uv统计
案例2 主要使用Confluent+VVP+RDS架构来统计电商平台中 pv(page views)和 uv(unique visitors),其中用户每一次访问记录都被封装成消息,通过Kafka Produce客户端发送至Confluent对应Topic中,然后Flink实时计算任务通过Kafka Connector把Topic消息读取出来按照分钟级别分组并汇总至Flink视图中,最终通过RDS Connector输出到RDS中进行展示。
一、新建Confluent消息队列
参照案例1,创建一个名为pv_uv的topic,将partition设置为3
二、创建云数据库RDS结果表
登录RDS 管理控制台页面,购买按量计费RDS实例,网络类型选择专有网络,同时选择与Flink全托管集群在相同地域(Region)和相同VPC内。

添加虚拟交换机IP段至RDS白名单,详情参考设置白名单文档


虚拟交换机IP段可在Flink控制台的工作空间详情中查询得到

在RDS实例管理页面创建数据库账号和库表结构
账号管理页面创建账号

数据库实例下数据库管理新建数据库confluent_vvp

使用系统自带的DMS服务登录RDS,登录名和密码输入上面创建的账户


双击confluent_vvp数据库,打开SQLConsole,将以下建表语句复制粘贴到 SQLConsole中,创建结果表
CREATE TABLE result_cps_total_summary_pvuv_min(
summary_date date NOT NULL COMMENT '统计日期',
summary_min varchar(255) COMMENT '统计分钟',
pv bigint COMMENT 'pv',
uv bigint COMMENT 'uv',
currenttime timestamp COMMENT '当前时间',
primary key(summary_date,summary_min)
)

三、创建实时计算VVP作业
在VVP控制台新建文件

在SQL区域输入以下代码,创建相应数据表
--创建kafka消息数据的source表
CREATE TABLE source_ods_fact_log_track_action (
account_id VARCHAR,
--用户ID
client_ip VARCHAR,
--客户端IP
client_info VARCHAR,
--设备机型信息
platform VARCHAR,
--系统版本信息
imei VARCHAR,
--设备标识
`version` VARCHAR,
--版本号
`action` VARCHAR,
--页面跳转描述
gpm VARCHAR,
--埋点链路
c_time VARCHAR,
--请求时间
target_type VARCHAR,
--目标类型
target_id VARCHAR,
--目标ID
udata VARCHAR,
--扩展信息,JSON格式
session_id VARCHAR,
--会话ID
product_id_chain VARCHAR,
--商品ID串
cart_product_id_chain VARCHAR,
--加购商品ID
tag VARCHAR,
--特殊标记
`position` VARCHAR,
--位置信息
network VARCHAR,
--网络使用情况
p_dt VARCHAR,
--时间分区天
p_platform VARCHAR
--系统版本信息
) WITH (
'connector' = 'kafka',
'topic' = 'pv_uv',
'properties.bootstrap.servers' = 'rb-{{集群clusterId}}-internal.csp.aliyuncs.com:9095',
'properties.group.id' = 'pv_uv_group',
'format' = 'json',
'properties.security.protocol'='SASL_SSL',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="【your user name】" password="【your password】";'
);
-- 创建分钟统计pvuv的mysql sink表
CREATE TABLE result_cps_total_summary_pvuv_min (
summary_date date,
--统计日期
summary_min varchar,
--统计分钟
pv bigint,
--点击量
uv bigint,
--一天内同个访客多次访问仅计算一个UV
currenttime timestamp,
--当前时间
primary key (summary_date, summary_min) NOT ENFORCED
) WITH (
'connector' = 'rds',
'url' = 'jdbc:mysql://rm-{{rds实例id}}.mysql.rds.aliyuncs.com:3306/confluent_vvp',
'tableName' = 'result_cps_total_summary_pvuv_min',
'userName' = 'pv_uv_test',
'password' = '【your rds password】'
);
-- 创建按分钟分组pvuv的视图
CREATE TEMPORARY VIEW IF NOT EXISTS result_cps_total_summary_pvuv_min_01 AS
select
cast (p_dt as date) as summary_date --时间分区
, count (client_ip) as pv --客户端的IP
, count (distinct client_ip) as uv --客户端去重
, cast (max (c_time) as TIMESTAMP) as c_time --请求的时间
from
source_ods_fact_log_track_action
group
by p_dt;
-- 输出结果至mysql sink表
INSERT
into result_cps_total_summary_pvuv_min
select
a.summary_date,
--时间分区
cast (DATE_FORMAT (c_time, 'HH:mm') as varchar) as summary_min,
--取出小时分钟级别的时间
a.pv,
a.uv,
CURRENT_TIMESTAMP as currenttime --当前时间
from
result_cps_total_summary_pvuv_min_01 AS a;
点击上线之后,在作业运维页面点击启动按钮,直到状态更新为RUNNING状态。


在control center的【Topics->Messages】页面,逐条发送测试消息,格式为:
{"account_id":"id1","client_ip":"72.11.1.111","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:11:00"}
{"account_id":"id2","client_ip":"2.11.1.112","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:12:00"}
{"account_id":"id3","client_ip":"2.11.1.113","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:13:00"}

查看pv和uv效果
可以看出rds数据表的pv和uv会随着发送的消息数据,动态的变化,同时还可以通过数据可视化来查看相应的图表信息。

pv和uv图表展示:
