方案实现

本文介绍实现大数据分析方案的详细配置操作,包括在RDS MySQL中创建源表、在表格存储中创建结果表、在实时计算Flink中创建作业并启动和在表格存储中分析数据。

前提条件

已了解方案背景信息和完成准备工作。更多信息,请参见方案背景准备工作

步骤一:在RDS MySQL中创建源表

  1. 通过DMS登录RDS MySQL。具体操作,请参见通过DMS登录RDS MySQL

  2. 创建数据表consume_record。

    consume_record数据表作为流计算任务源表,用于接入实时计算Flink。

    1. 编写创数据表的SQL语句。

      本方案中SQL示例如下:

      CREATE TABLE `consume_record` (
          `consume_id` varchar(20) NOT NULL,
          `product_id` varchar(20) NOT NULL,
          `consume_time` bigint(20) NOT NULL,
          `consume_name` varchar(20) NOT NULL,
          `consume_phone` varchar(20) NOT NULL,
          PRIMARY KEY (`consume_id`)
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8
      COMMENT='消费记录数据源表';
    2. 单击执行(F8)

    image

步骤二:在表格存储中创建维表和结果表

  1. 登录表格存储控制台

  2. 创建数据表consume_productproduct。具体操作,请参见创建数据表

    说明

    由于表格存储的数据表是schema free的,只需要定义主键,无需定义属性列。

    consume_product数据表作为流计算任务结果表,用于保存商品消费信息与商品元数据信息。表结构请参见下表。

    字段名称

    数据类型

    是否主键

    描述

    consume_id

    STRING

    消费ID(主键)。

    product_id

    STRING

    商品ID。

    price

    DOUBLE

    商品单价。

    consume_time

    BIGINT(20)

    消费时间。

    consume_name

    STRING

    消费者名称。

    consume_phone

    STRING

    消费者联系方式。

    product数据表是流计算任务维表,用于保存商品元数据信息。表结构请参见下表。

    字段名称

    数据类型

    是否主键

    描述

    product_id

    STRING

    商品ID。

    price

    DOUBLE

    商品单价。

    product_type

    STRING

    商品类别。

步骤三:在实时计算Flink中创建作业并启动

  1. 登录实时计算管理控制台

  2. 实时计算控制台页面的Flink全托管区域,单击目标工作空间操作列的控制台

  3. 创建作业gmv_post_aggregation。

    1. 在左侧导航栏,单击SQL开发

    2. 单击新建

    3. 新建作业草稿对话框,单击空白的流作业草稿

    4. 单击下一步

    5. 填写作业配置信息。

      作业参数

      示例

      说明

      文件名称

      flink-test

      作业的名称。

      说明

      作业名称在当前项目中必须保持唯一。

      存储位置

      作业开发

      指定该作业的代码文件所属的文件夹。默认存放在作业开发目录。

      您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

      引擎版本

      vvr-6.0.4-flink-1.15

      当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍

    6. 单击创建

  4. 配置作业并上线。

    将以下作业代码示例拷贝到作业文本编辑区并根据实际修改配置信息。

    -- mysql-cdc源表。
    CREATE TEMPORARY TABLE consume_record (
      `consume_id` VARCHAR(20),
      `product_id` VARCHAR(20),
      `consume_time` BIGINT,
      `consume_name` VARCHAR(20),
      `consume_phone` VARCHAR(20),
      PRIMARY KEY(consume_id)  NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '<hostname>',
      'port' = '3306',
      'username' = '',
      'password' = '',
      'database-name' = '',
      'table-name' = ''
    );
    -- tablestore维表。
    CREATE TEMPORARY TABLE product (
      product_id STRING,
      price DOUBLE,
      product_type STRING,
      PRIMARY KEY (product_id) NOT ENFORCED
    ) WITH (
      'connector' = 'ots',
      'endPoint' = '',
      'instanceName' = '',
      'tableName' = '',
      'accessId' = '',
      'accessKey' = ''
    );
    -- tablestore结果表。
    CREATE TEMPORARY TABLE consume_product (
       consume_id STRING,
       product_id STRING,
       price DOUBLE,
       consume_time BIGINT,
       consume_name STRING,
       consume_phone STRING,
       PRIMARY KEY (consume_id,product_id) NOT ENFORCED
    ) WITH (
      'connector' = 'ots',
      'endPoint' = '',
      'instanceName' = '',
      'tableName' = '',
      'accessId' = '',
      'accessKey' = '',
      'valueColumns' = 'price,consume_time,consume_name,consume_phone'
    );
    insert into consume_product
    select s.consume_id,d.product_ID as product_id,d.price,
            UNIX_TIMESTAMP(s.consume_time,'yy-MM-dd') as consume_time,
            s.consume_name,s.consume_phone
            from `consume_record` as s
            join `product` for system_time as of proctime() as d
            on s.product_id = d.product_ID

    配置项说明请参见下表。

    分类

    配置项

    示例

    描述

    mysql-cdc源表配置

    connector

    mysql-cdc

    MySQL连接器,必须设置为mysql-cdc。

    hostname

    myhost

    主机名称。

    port

    3306

    主机端口。

    username

    myflinktest

    数据库账号。

    password

    ********

    数据库密码。

    database-name

    flinktest

    数据库名称。

    table-name

    consume_record

    数据表名称。

    tablestore结果表

    connector

    ots

    表格存储连接器,必须设置为ots。

    endPoint

    https://myinstance.cn-hangzhou.ots.aliyuncs.com

    实例的服务地址。更多信息,请参见服务地址

    instanceName

    myinstance

    实例名称。

    tableName

    consume_product

    数据表名称。

    accessId

    ************

    阿里云账号或者RAM用户的AccessKey IDAccessKey Secret。关于获取AccessKey IDAccessKey Secret的具体操作,请参见获取AccessKey

    accessKey

    ****************

    valueColumns

    price,consume_time

    保存到结果表中的列以列表形式表示,多个列之间用半角逗号(,)分隔。

    只有当表格存储作为结果表时才需要配置此项。

  5. 部署作业。

    1. 单击部署

    2. 部署新版本对话框,填写备注信息和为作业添加标签以及根据需要选择是否提交到Session集群或者是否跳过部署前的深度检查。

      如果未选中跳过部署前的深度检查,则系统会进行深度检查,深度检查通过后再进行部署。如果选中了跳过部署前的深度检查,则系统会直接进行部署。

    3. 单击确定

  6. 启动作业。

    1. 在左侧导航栏,单击作业运维

    2. 单击目标作业名称操作列中的启动

    3. 作业启动对话框,根据需要选择启动配置,单击启动

步骤四:在表格存储中分析数据

您可以通过多元索引或者SQL查询功能进行数据分析。

通过多元索引进行数据分析的具体步骤如下:

  1. 为数据表consume_product创建多元索引。具体操作,请参见创建多元索引

  2. 通过多元索引的统计聚合功能实现数据分析。

    关于统计聚合功能的更多信息,请参见统计聚合

     public void agg(SyncClient client) {
        //构建查询语句。
        SearchRequest searchRequest = SearchRequest.newBuilder()
                .tableName("consume_product") //设置数据表名称。
                .indexName("consume_product_index")  //设置多元索引名称。
                .searchQuery(SearchQuery.newBuilder()
                        .query(QueryBuilders.matchAll())
                        .addGroupBy(GroupByBuilders.groupByField("groupByProductID","product_id").addSubAggregation(
                                AggregationBuilders.sum("sumagg","price")
                        ))
                        .build())
                .build();
        SearchResponse searchResponse = syncClient.search(searchRequest);
        for(GroupByFieldResultItem item : searchResponse.getGroupByResults().getAsGroupByFieldResult("groupByProductID").getGroupByFieldResultItems()){
            System.out.println("商品ID:"+item.getKey()+",交易总额:"+item.getSubAggregationResults().getAsSumAggregationResult("sumagg").getValue());
        }
    }

    结果示例如下:

    商品ID:A001,交易总额:20.0
    商品ID:A002,交易总额:40.0
    商品ID:A004,交易总额:20.0
    商品ID:A003,交易总额:5.0
    商品ID:A005,交易总额:15.0
    商品ID:A006,交易总额:5.0
    商品ID:A008,交易总额:5.0

通过SQL查询进行数据分析的具体步骤如下:

  1. 为数据表consume_product创建映射关系。具体操作,请参见步骤一:创建映射关系

    创建映射关系的SQL语句示例如下:

    CREATE TABLE consume_product (
       consume_id VARCHAR(1024),
       product_id VARCHAR(1024),
       
       price DOUBLE,
       consume_time BIGINT,
       consume_name MEDIUMTEXT,
       consume_phone MEDIUMTEXT,
       PRIMARY KEY (consume_id,product_id)
    )
  2. 执行以下SQL语句进行数据分析。

    select product_id, sum(price) from consume_product group by product_id

    结果示例如下图所示。fig_20220128_sql