本文通过示例为您介绍如何基于EMR Serverless StarRocks的视图能力构建数仓场景-即席查询解决方案。

前提条件

  • 已创建DataFlow或自定义集群,具体操作请参见创建集群
  • 已创建EMR Serverless StarRocks实例,具体操作请参见创建实例
  • 已创建RDS MySQL,具体操作请参见创建RDS MySQL实例
    说明 本文示例中DataFlow集群为EMR-3.42.0版本、MySQL为5.7版本。

使用限制

  • DataFlow集群、EMR Serverless StarRocks实例和RDS MySQL实例需要在同一个VPC下,并且在同一个可用区下。
  • DataFlow集群和EMR Serverless StarRocks实例均须开启公网访问。
  • RDS MySQL为5.7及以上版本。

注意事项

本文档仅供测试使用,生产级别的Flink作业请使用阿里云实时计算Flink版产品进行配置,或者使用YARN或者Kubernetes提交作业。

详情请参见Apache Hadoop YARNNative Kubernetes

场景介绍

随着向量化、CBO(Cost Based Optimizer,基于代价的优化器)、单机多核调度等技术的应用,StarRocks的计算能力逐步提升。很多时候您在使用StarRocks进行数仓分层建模时,大部分将数据建模到DWD层(基础整合层)或DWS层(维度宽度)。在实际业务中,运用StarRocks的计算能力,可以直接查询DWD或DWS层数据,还可以灵活地交互式即席查询。

方案架构

使用StarRocks实现数仓场景即席查询的基本架构如下图所示。架构图
整体数据流如下:
  1. Flink清洗导入Kafka的日志或者通过Flink-CDC-StarRocks工具读取MySQL Binlog导入StarRocks。根据需要选用明细、聚合、更新或主键各种模型,只物理落地ODS层(格式整理层)。
  2. 向上采用StarRocks View视图能力,利用StarRocks向量化极速查询和CBO优化器满足多表关联、嵌套子查询等复杂SQL,查询时现场计算指标结果,保证指标上卷和下钻高度同源一致。

方案特点

该方案主要特点是,计算逻辑在StarRocks侧(现场查询),适用于业务库高频数据更新的场景,实体数据只在ODS或DWD层存储。

  • 方案优势
    • 灵活性强,可随时根据业务逻辑调整View。
    • 指标修正简单,上层都是View逻辑封装,只需要更新底表数据。
  • 方案缺点

    当View的逻辑较为复杂,数据量较多时,查询性能较低。

  • 适用场景
    • 数据来源于数据库和埋点系统,适合对QPS要求不高,对灵活性要求比较高,且计算资源较为充足的场景。
    • 实时要求非常高,要求写入即可查,更新即反馈。适合有即席查询需求,且资源较为充足,查询复杂度较低的场景。

操作流程

示例操作如下:

  1. 步骤一:创建MySQL源数据表
  2. 步骤二:创建StarRocks表
  3. 步骤三:执行Flink任务,启动数据流
  4. 步骤四:验证数据

步骤一:创建MySQL源数据表

  1. 创建测试的数据库和账号,具体操作请参见创建数据库和账号
    创建完数据库和账号后,需要授权测试账号的读写权限。
    说明 本文示例中创建的数据库名称为flink_cdc,账号为emr_test。
  2. 使用创建的测试账号连接MySQL实例,具体操作请参见通过DMS登录RDS MySQL
  3. 执行以下命令,创建数据库。
    CREATE DATABASE IF NOT EXISTS flink_cdc;
  4. 执行以下命令,创建数据表orders。
    create table flink_cdc.orders (
       order_id INT NOT NULL AUTO_INCREMENT,
       order_revenue FLOAT NOT NULL,
       order_region VARCHAR(40) NOT NULL,
       customer_id INT NOT NULL,
       PRIMARY KEY ( order_id )
    );
  5. 执行以下命令,创建数据表customers。
    create table flink_cdc.customers (
       customer_id INT NOT NULL,
       customer_age INT NOT NULL,
       customer_name VARCHAR(40) NOT NULL,
       PRIMARY KEY ( customer_id )
    );

步骤二:创建StarRocks表

  1. 连接EMR Serverless StarRocks实例,详情请参见连接StarRocks实例(客户端方式)
  2. 执行以下命令,创建数据库。
    CREATE DATABASE IF NOT EXISTS `flink_cdc`;
  3. 执行以下命令,创建数据表customers。
    CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` (
      `customer_id` INT NOT NULL  COMMENT "",
      `customer_age` FLOAT NOT NULL  COMMENT "",
      `customer_name` STRING NOT NULL  COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`customer_id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  4. 执行以下命令,创建数据表orders。
    CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` (
      `order_id` INT NOT NULL  COMMENT "",
      `order_revenue` FLOAT NOT NULL  COMMENT "",
      `order_region` STRING NOT NULL  COMMENT "",
      `customer_id` INT NOT NULL  COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`order_id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  5. 执行以下命令,基于ODS表创建DWD视图。
    CREATE VIEW flink_cdc.dwd_order_customer_valid (
      order_id,
      order_revenue,
      order_region,
      customer_id,
      customer_age,
      customer_name
    )
    AS
    SELECT o.order_id, o.order_revenue, o.order_region, c.customer_id, c.customer_age, c.customer_name
    FROM flink_cdc.customers c JOIN flink_cdc.orders o
    ON c.customer_id=o.customer_id
    WHERE c.customer_id != -1;
  6. 执行以下命令,基于DWD表创建DWS视图。
    CREATE VIEW flink_cdc.dws_agg_by_region (
      order_region,
      order_cnt,
      order_total_revenue)
    AS
    SELECT order_region, count(order_region), sum(order_revenue)
    FROM flink_cdc.dwd_order_customer_valid
    GROUP BY order_region;

步骤三:执行Flink任务,启动数据流

  1. 下载Flink CDC connectorFlink StarRocks Connector,并上传到DataFlow集群的/opt/apps/FLINK/flink-current/lib目录下。
  2. 使用SSH方式登录DataFlow集群,详情请参见登录集群
  3. 执行以下命令,启动集群。
    重要 本文示例仅供测试,如果是生产级别的Flink作业请使用YARN或Kubernetes方式提交,详情请参见Apache Hadoop YARNNative Kubernetes
    /opt/apps/FLINK/flink-current/bin/start-cluster.sh
  4. 添加端口配置。
    1. 执行以下命令,编辑文件flink-conf.yaml
      vim /etc/taihao-apps/flink-conf/flink-conf.yaml
    2. 添加以下内容至文件最后一行。
      rest.port: 8083
  5. 编写Flink SQL作业,并保存为demo.sql
    执行以下命令,编辑demo.sql文件。
    vim demo.sql

    文件内容如下所示。

    CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`;
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` (
      `customer_id` INT NOT NULL,
      `customer_age` FLOAT NOT NULL,
      `customer_name` STRING NOT NULL,
      PRIMARY KEY(`customer_id`)
     NOT ENFORCED
    ) with (
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze8398257383****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****',
      'database-name' = 'flink_cdc',
      'table-name' = 'customers'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_sink` (
      `customer_id` INT NOT NULL,
      `customer_age` FLOAT NOT NULL,
      `customer_name` STRING NOT NULL,
      PRIMARY KEY(`customer_id`)
     NOT ENFORCED
    ) with (
      'load-url' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'database-name' = 'flink_cdc',
      'jdbc-url' = 'jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'sink.buffer-flush.interval-ms' = '15000',
      'sink.properties.format' = 'json',
      'username' = 'admin',
      'table-name' = 'customers',
      'sink.properties.strip_outer_array' = 'true',
      'password' = '1qaz!QAZ',
      'sink.max-retries' = '10',
      'connector' = 'starrocks'
    );
    INSERT INTO `default_catalog`.`flink_cdc`.`customers_sink` SELECT * FROM `default_catalog`.`flink_cdc`.`customers_src`;
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src` (
      `order_id` INT NOT NULL,
      `order_revenue` FLOAT NOT NULL,
      `order_region` STRING NOT NULL,
      `customer_id` INT NOT NULL,
      PRIMARY KEY(`order_id`)
     NOT ENFORCED
    ) with (
      'database-name' = 'flink_cdc',
      'table-name' = 'orders',
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze8398257383****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_sink` (
      `order_id` INT NOT NULL,
      `order_revenue` FLOAT NOT NULL,
      `order_region` STRING NOT NULL,
      `customer_id` INT NOT NULL,
      PRIMARY KEY(`order_id`)
     NOT ENFORCED
    ) with (
      'sink.properties.strip_outer_array' = 'true',
      'password' = '1qaz!QAZ',
      'sink.max-retries' = '10',
      'connector' = 'starrocks',
      'table-name' = 'orders',
      'jdbc-url' = 'jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'sink.buffer-flush.interval-ms' = '15000',
      'sink.properties.format' = 'json',
      'username' = 'admin',
      'load-url' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'database-name' = 'flink_cdc'
    );
    
    INSERT INTO `default_catalog`.`flink_cdc`.`orders_sink` SELECT * FROM `default_catalog`.`flink_cdc`.`orders_src`;
    涉及参数如下所示:
    • 创建数据表customers_src。
      参数描述
      connector固定值为mysql-cdc。
      hostnameRDS的内网地址。

      您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-2ze8398257383****.mysql.rds.aliyuncs.com。

      port固定值为3306。
      username步骤一:创建MySQL源数据表中创建的账号名。本示例为emr_test。
      password步骤一:创建MySQL源数据表中创建的账号的密码。本示例为Yz12****。
      database-name步骤一:创建MySQL源数据表中创建的数据库名。本示例为flink_cdc。
      table-name步骤一:创建MySQL源数据表中创建的数据表。本示例为customers。
    • 创建数据表customers_sink和orders_sink。
      参数描述
      load-url指定FE节点的内网地址和HTTP端口,格式为EMR Serverless StarRocks实例FE节点的内网地址:8030。例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。
      说明 关于如何获取EMR Serverless StarRocks实例FE节点的内网地址,请参见查看实例列表与详情
      database-name步骤一:创建MySQL源数据表中创建的数据库名。本示例为flink_cdc。
      jdbc-url用于在StarRocks中执行查询操作。
      例如,jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。其中,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com为EMR Serverless StarRocks实例FE节点的内网地址。
      说明 关于如何获取EMR Serverless StarRocks实例FE节点的内网地址,请参见查看实例列表与详情
      usernameStarRocks连接用户名。固定为admin。
      table-name本示例固定值为customers。
      connector固定值为starrocks。
  6. 执行以下命令,启动Flink任务。
     /opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql

步骤四:验证数据

  1. 使用步骤一:创建MySQL源数据表中创建的测试账号连接MySQL实例,具体操作请参见通过DMS登录RDS MySQL
  2. 在RDS数据库窗口执行以下命令,向表orders和customers中插入数据。
    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1);
    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1);
    INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test");
  3. 连接EMR Serverless StarRocks实例,详情请参见连接StarRocks实例(客户端方式)
  4. 执行以下命令,查询ODS层数据。
    1. 执行以下命令,使用数据库。
      use flink_cdc;
    2. 执行以下命令,查看orders表信息。
      select * from orders;
      返回信息如下所示。
      +----------+---------------+--------------+-------------+
      | order_id | order_revenue | order_region | customer_id |
      +----------+---------------+--------------+-------------+
      |        1 |            10 | beijing      |           1 |
      |        2 |            10 | beijing      |           1 |
      +----------+---------------+--------------+-------------+
    3. 执行以下命令,查看customers表信息。
      select * from customers;
      返回信息如下所示。
      +-------------+--------------+---------------+
      | customer_id | customer_age | customer_name |
      +-------------+--------------+---------------+
      |           1 |           22 | emr_test      |
      +-------------+--------------+---------------+
  5. 执行以下命令,查询DWD层数据。
    1. 执行以下命令,使用数据库。
      use flink_cdc;
    2. 执行以下命令,查看orders表信息。
      select * from dwd_order_customer_valid;
      返回信息如下所示。
      +----------+---------------+--------------+-------------+--------------+---------------+
      | order_id | order_revenue | order_region | customer_id | customer_age | customer_name |
      +----------+---------------+--------------+-------------+--------------+---------------+
      |        1 |            10 | beijing      |           1 |           22 | emr_test      |
      |        2 |            10 | beijing      |           1 |           22 | emr_test      |
      +----------+---------------+--------------+-------------+--------------+---------------+
      2 rows in set (0.00 sec)
  6. 执行以下命令,查询DWS层数据。
    1. 执行以下命令,使用数据库。
      use flink_cdc;
    2. 执行以下命令,查看orders表信息。
      select * from dws_agg_by_region;
      返回信息如下所示。
      +--------------+-----------+---------------------+
      | order_region | order_cnt | order_total_revenue |
      +--------------+-----------+---------------------+
      | beijing      |         2 |                  20 |
      +--------------+-----------+---------------------+
      1 row in set (0.01 sec)