本文通过示例为您介绍如何基于StarRocks构建数仓场景-增量数据实时统计。
前提条件
已创建DataFlow或自定义集群,且集群中已包含Flink、Kafka服务,具体操作请参见创建集群。
已创建StarRocks集群,具体操作请参见创建StarRocks集群。
已创建RDS MySQL,具体操作请参见快速创建RDS MySQL实例。
说明本文示例中DataFlow集群为EMR-3.40.0版本、StarRocks集群为EMR-5.6.0版本,MySQL为5.7版本。
使用限制
DataFlow集群、StarRocks集群和RDS MySQL实例需要在同一个VPC下,并且在同一个可用区下。
DataFlow集群和StarRocks集群均须开启公网访问。
RDS MySQL为5.7及以上版本。
场景介绍
因为部分场景对数据延迟非常敏感,数据产生的时候必须完成加工,所以此时您可以通过增量数据实时统计的方式,提前使用Flink将明细层、汇总层等层数据进行汇聚,汇聚之后把结果集存储下来再对外提供服务。
方案架构
增量数据实时统计的基本架构如下图所示。
整体数据流如下:
直接使用Flink构建实时数仓,由Flink进行清洗加工转换和聚合汇总,将各层结果集写入Kafka中。
StarRocks从Kafka分别订阅各层数据,将各层数据持久化到StarRocks中,用于之后的查询分析。
方案特点
该方案主要特点如下:
增量计算的数据由Flink进行清洗加工转换和聚合汇总,各层应用数据通过Kafka分别持久化到StarRocks中。
Flink加工的结果集可以采取双写的方式,一方面继续投递给下一层消息流Topic,一方面Sink到同层的StarRocks中;也可以采用单写Kafka再通过StarRocks实时消费Kafka对应Topic上的数据,方便后续历史数据的状态检查与刷新。
StarRocks通过表的形式直接对接上层应用,实现应用实时查询。
方案优势
实时性强,能满足业务对实时性敏感的场景。
指标修正简单,与传统增量计算方式不一样的是,该方案将中间的状态也持久存储在StarRocks中,提升了后续分析的灵活性,当中间数据质量有问题时,直接对表修正,重刷数据即可。
方案缺点
大部分实时增量计算依赖于Flink,需要使用者有一定的Flink技能。
不适合数据频繁更新,无法进行累加计算的场景。
不适合多流Join等计算复杂资源开销大的场景。
适用场景
实时需求简单,数据量不大,以埋点数据统计为主的数据,实时性最强。
操作流程
示例操作如下:
步骤一:创建MySQL源数据表
创建测试的数据库和账号,具体操作请参见创建数据库和账号。
创建完数据库和账号后,需要授权测试账号的读写权限。
说明本文示例中创建的数据库名称为flink_cdc,账号为emr_test。
使用创建的测试账号连接MySQL实例,具体操作请参见通过DMS登录RDS MySQL。
执行以下命令,创建数据表orders和customers。
创建orders表。
CREATE TABLE flink_cdc.orders ( order_id INT NOT NULL AUTO_INCREMENT, order_revenue FLOAT NOT NULL, order_region VARCHAR(40) NOT NULL, customer_id INT NOT NULL, PRIMARY KEY ( order_id ) );
创建customers表。
CREATE TABLE flink_cdc.customers ( customer_id INT NOT NULL, customer_age INT NOT NULL, customer_name VARCHAR(40) NOT NULL, PRIMARY KEY ( customer_id ) );
步骤二:创建Kafka的Topic
使用SSH方式登录DataFlow集群,具体操作请参见登录集群。
执行以下命令, 进入Kafka的bin目录。
cd /opt/apps/FLINK/flink-current/bin
执行以下命令,创建对应的Topic。
kafka-topics.sh --create --topic ods_order --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092 kafka-topics.sh --create --topic ods_customers --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092 kafka-topics.sh --create --topic dwd_order_customer_valid --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092 kafka-topics.sh --create --topic dws_agg_by_region --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092
步骤三:创建StarRocks表和导入任务
使用SSH方式登录StarRocks集群,具体操作请参见登录集群。
执行以下命令,连接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
执行以下命令,创建数据库。
CREATE DATABASE IF NOT EXISTS `flink_cdc`;
执行以下命令,创建数据表customers和orders。
创建customers表
CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` ( `customer_id` INT NOT NULL COMMENT "", `customer_age` FLOAT NOT NULL COMMENT "", `customer_name` STRING NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`customer_id`) COMMENT "" DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
创建orders表
CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` ( `order_id` INT NOT NULL COMMENT "", `order_revenue` FLOAT NOT NULL COMMENT "", `order_region` STRING NOT NULL COMMENT "", `customer_id` INT NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`order_id`) COMMENT "" DISTRIBUTED BY HASH(`order_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
执行以下命令,创建DWD表。
CREATE TABLE IF NOT EXISTS `flink_cdc`.`dwd_order_customer_valid`( `order_id` INT NOT NULL COMMENT "", `order_revenue` FLOAT NOT NULL COMMENT "", `order_region` STRING NOT NULL COMMENT "", `customer_id` INT NOT NULL COMMENT "", `customer_age` FLOAT NOT NULL COMMENT "", `customer_name` STRING NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`order_id`) COMMENT "" DISTRIBUTED BY HASH(`order_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
执行以下命令,创建DWS表。
CREATE TABLE IF NOT EXISTS `flink_cdc`.`dws_agg_by_region` ( `order_region` STRING NOT NULL COMMENT "", `order_cnt` INT NOT NULL COMMENT "", `order_total_revenue` INT NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`order_region`) COMMENT "" DISTRIBUTED BY HASH(`order_region`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
执行以下命令,创建Routine Load导入任务,订阅Kafka数据源的数据。
CREATE ROUTINE LOAD flink_cdc.routine_load_orders ON orders COLUMNS (order_id, order_revenue, order_region, customer_id) PROPERTIES ( "format" = "json", "jsonpaths" = "[\"$.order_id\",\"$.order_revenue\",\"$.order_region\",\"$.customer_id\"]" ) FROM KAFKA ( "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "ods_order" ); CREATE ROUTINE LOAD flink_cdc.routine_load_customers ON customers COLUMNS (customer_id, customer_age, customer_name) PROPERTIES ( "format" = "json", "jsonpaths" = "[\"$.customer_id\",\"$.customer_age\",\"$.customer_name\"]" ) FROM KAFKA ( "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "ods_customers" ); CREATE ROUTINE LOAD flink_cdc.routine_load_dwd_order_customer_valid ON dwd_order_customer_valid COLUMNS (order_id, order_revenue, order_region, customer_id, customer_age, customer_name) PROPERTIES ( "format" = "json", "jsonpaths" = "[\"$.order_id\",\"$.order_revenue\",\"$.order_region\",\"$.customer_id\",\"$.customer_age\",\"$.customer_name\"]" ) FROM KAFKA ( "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "dwd_order_customer_valid" ); CREATE ROUTINE LOAD flink_cdc.routine_load_dws_agg_by_region ON dws_agg_by_region COLUMNS (order_region, order_cnt, order_total_revenue) PROPERTIES ( "format" = "json", "jsonpaths" = "[\"$.order_region\",\"$.order_cnt\",\"$.order_total_revenue\"]" ) FROM KAFKA ( "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "dws_agg_by_region" );
步骤四:执行Flink任务,启动数据流
下载Flink CDC connector和Flink StarRocks Connector,并上传至DataFlow集群的/opt/apps/FLINK/flink-current/lib目录下。
拷贝DataFlow集群的/opt/apps/FLINK/flink-current/opt/connectors/kafka目录下的JAR包至/opt/apps/FLINK/flink-current/lib目录下。
使用SSH方式登录DataFlow集群,具体操作请参见登录集群。
执行以下命令,启动集群。
重要本文示例仅供测试,如果是生产级别的Flink作业请使用YARN或Kubernetes方式提交,详情请参见Apache Hadoop YARN和Native Kubernetes。
/opt/apps/FLINK/flink-current/bin/start-cluster.sh
编写Flink SQL作业,并保存为demo.sql。
执行以下命令,编辑demo.sql文件。
vim demo.sql
文件内容如下所示。
CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`; --数据的订单源表。 CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src`( `order_id` INT NOT NULL, `order_revenue` FLOAT NOT NULL, `order_region` STRING NOT NULL, `customer_id` INT NOT NULL, PRIMARY KEY(`order_id`) NOT ENFORCED ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = 'Yz12****', 'database-name' = 'flink_cdc', 'table-name' = 'orders' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` ( `customer_id` INT NOT NULL, `customer_age` FLOAT NOT NULL, `customer_name` STRING NOT NULL, PRIMARY KEY(`customer_id`) NOT ENFORCED ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = 'Yz12****', 'database-name' = 'flink_cdc', 'table-name' = 'customers' ); -- create ods dwd and dws tables CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`ods_order_table` ( `order_id` INT, `order_revenue` FLOAT, `order_region` VARCHAR(40), `customer_id` INT, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'ods_order', 'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092', 'key.format' = 'json', 'value.format' = 'json' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`ods_customers_table` ( `customer_id` INT, `customer_age` FLOAT, `customer_name` STRING, PRIMARY KEY (customer_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'ods_customers', 'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092', 'key.format' = 'json', 'value.format' = 'json' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`dwd_order_customer_valid` ( `order_id` INT, `order_revenue` FLOAT, `order_region` STRING, `customer_id` INT, `customer_age` FLOAT, `customer_name` STRING, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'dwd_order_customer_valid', 'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092', 'key.format' = 'json', 'value.format' = 'json' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`dws_agg_by_region` ( `order_region` VARCHAR(40), `order_cnt` BIGINT, `order_total_revenue` FLOAT, PRIMARY KEY (order_region) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'dws_agg_by_region', 'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092', 'key.format' = 'json', 'value.format' = 'json' ); USE flink_cdc; BEGIN STATEMENT SET; INSERT INTO ods_order_table SELECT * FROM orders_src; INSERT INTO ods_customers_table SELECT * FROM customers_src; INSERT INTO dwd_order_customer_valid SELECT o.order_id, o.order_revenue, o.order_region, c.customer_id, c.customer_age, c.customer_name FROM customers_src c JOIN orders_src o ON c.customer_id=o.customer_id WHERE c.customer_id <> -1; INSERT INTO dws_agg_by_region SELECT order_region, count(*) as order_cnt, sum(order_revenue) as order_total_revenue FROM dwd_order_customer_valid GROUP BY order_region; END;
涉及参数如下所示:
创建数据表orders_src和customers_src。
参数
描述
connector
固定值为mysql-cdc。
hostname
RDS的内网地址。
您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com。
port
固定值为3306。
username
步骤一:创建MySQL源数据表中创建的账号名。本示例为emr_test。
password
步骤一:创建MySQL源数据表中创建的账号的密码。本示例为Yz12****。
database-name
步骤一:创建MySQL源数据表中创建的数据库名。本示例为flink_cdc。
table-name
步骤一:创建MySQL源数据表中创建的数据表。
orders_src:本示例为orders。
customers_src:本示例为customers。
创建数据表ods_order_table、ods_customers_table、dwd_order_customer_valid和dws_agg_by_region。
参数
描述
connector
固定值为upsert-kafka。
topic
步骤二:创建Kafka的Topic中创建的Topic名称。
ods_order_table:本示例为ods_order。
ods_customers_table:本示例为ods_customers。
dwd_order_customer_valid:本示例为dwd_order_customer_valid。
dws_agg_by_region:本示例为dws_agg_by_region。
properties.bootstrap.servers
固定格式为
192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092
。
执行以下命令,启动Flink任务。
/opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql
步骤五:查看数据库和表信息
使用SSH方式登录StarRocks集群,具体操作请参见登录集群。
执行以下,连接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
执行以下命令,查询数据库信息。
执行以下命令,使用数据库。
use flink_cdc;
执行以下命令,查看表信息。
show tables;
返回信息如下所示。
+--------------------------+ | Tables_in_flink_cdc | +--------------------------+ | customers | | dwd_order_customer_valid | | dws_agg_by_region | | orders | +--------------------------+ 4 rows in set (0.01 sec)
步骤六:场景演示,查询插入后的数据
使用步骤一:创建MySQL源数据表中创建的测试账号连接MySQL实例,具体操作请参见通过DMS登录RDS MySQL。
在RDS数据库窗口执行以下命令,向表orders和customers中插入数据。
INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1); INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1); INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test");
使用SSH方式登录StarRocks集群,具体操作请参见登录集群。
执行以下命令,连接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
执行以下命令,查询ODS层数据。
执行以下命令,使用数据库。
use flink_cdc;
执行以下命令,查看orders表信息。
select * from orders;
返回信息如下所示。
+----------+---------------+--------------+-------------+ | order_id | order_revenue | order_region | customer_id | +----------+---------------+--------------+-------------+ | 1 | 10 | beijing | 1 | | 2 | 10 | beijing | 1 | +----------+---------------+--------------+-------------+
执行以下命令,查看customers表信息。
select * from customers;
返回信息如下所示。
+-------------+--------------+---------------+ | customer_id | customer_age | customer_name | +-------------+--------------+---------------+ | 1 | 22 | emr_test | +-------------+--------------+---------------+
执行以下命令,查询DWD层数据。
执行以下命令,使用数据库。
use flink_cdc;
执行以下命令,查看orders表信息。
select * from dwd_order_customer_valid;
返回信息如下所示。
+----------+---------------+--------------+-------------+--------------+---------------+ | order_id | order_revenue | order_region | customer_id | customer_age | customer_name | +----------+---------------+--------------+-------------+--------------+---------------+ | 1 | 10 | beijing | 1 | 22 | emr_test | | 2 | 10 | beijing | 1 | 22 | emr_test | +----------+---------------+--------------+-------------+--------------+---------------+ 2 rows in set (0.00 sec)
执行以下命令,查询DWS层数据。
执行以下命令,使用数据库。
use flink_cdc;
执行以下命令,查看orders表信息。
select * from dws_agg_by_region;
返回信息如下所示。
+--------------+-----------+---------------------+ | order_region | order_cnt | order_total_revenue | +--------------+-----------+---------------------+ | beijing | 2 | 20 | +--------------+-----------+---------------------+ 1 row in set (0.01 sec)