表格存储结合实时计算Flink实现交易数据的实时统计

本文为您介绍使用表格存储和Flink实现交易数据实时统计的样例场景、架构设计和实现方案。

背景信息

在金融市场、电子商务和支付系统等领域,交易数据的实时统计具有重要的意义。它不仅能够提供精准的数据分析结果,帮助用户及时掌握业务动态并优化运营策略,同时也为风险管理、市场趋势的预测等提供了可靠的参考依据。

阿里云实时计算Flink是一套基于Apache Flink构建的一站式实时大数据分析平台,提供端到端亚秒级实时数据分析能力,并通过标准SQL降低业务开发门槛,助力企业向实时化、智能化大数据计算升级转型。

表格存储(Tablestore)是阿里云自研的多模型结构化数据存储,可提供海量结构化数据的存储和查询分析服务。表格存储的分布式存储和强大的索引引擎能够支持PB级存储、千万TPS以及毫秒级延迟的服务能力。

说明

表格存储通道服务(Tunnel Service)是基于表格存储数据接口上的全增量一体化服务。在为数据表建立数据通道后,可以通过流式计算的方式对表中历史存量和新增数据进行消费处理。

样例场景

某电商网站产生了海量的交易数据,通过对这些数据进行实时统计与分析,能够帮助用户实时监测网站的整体销售状况,并迅速评估“新销售策略”的效果。

架构设计

  • 样例场景的实现过程说明如下。

    将电商网站的实时交易数据写入TablestoreSource表后,使用FlinkSource表数据通道的流式数据进行聚合计算,并将计算结果重新写入TablestoreSink表,最终通过“大屏”实时展示计算结果。具体实现只需3步:

    1. 创建数据表和数据通道:在表格存储中创建源表和结果表,并为源表创建数据通道。

    2. 创建并启动Flink作业:在Flink中通过流作业对源表数据通道的流式数据进行聚合计算,并将结果写入结果表。

    3. 压入测试数据并实时获取计算结果:部署stream-compute服务,模拟写入交易数据后,实时展示计算结果。

      说明

      本文将计算结果直接输出至控制台。您也可以在DataV中添加Tablestore数据源,在DataV的大屏上展示实时数据。更多信息,请参见对接DataV-Board 7.0.

  • 架构图如下图所示。

    image

准备工作

  • 已开通表格存储服务并创建实例。具体操作,请参见开通服务并创建实例

  • 已开通实时计算Flink版。具体操作,请参见开通实时计算Flink

    重要

    实时计算Flink必须与表格存储服务位于同一地域。实时计算Flink支持的地域,请参见地域列表

  • 已准备带有Linux系统的服务器,并开启公网访问。本文以Alibaba Cloud Linux 3为例进行操作说明。

  • 已获取AccessKey信息。

    重要

    出于安全考虑,强烈建议您通过RAM用户使用表格存储功能。具体操作,请参见创建RAM用户并授权

方案实现

步骤一:在表格存储中创建数据表和数据通道

1. 创建数据表

  1. 登录表格存储控制台

  2. 在页面上方,选择资源组和地域。

  3. 概览页面,单击实例名称或在实例操作列单击实例管理

  4. 实例详情页签,单击创建数据表。具体操作,请参见创建数据表

    分别创建数据表source_ordersink_order,表结构信息请参见源表结果表

    说明

    表格存储的数据表具有Schema-Free特点,因此只需要定义主键,无需定义属性列。

2. 创建数据通道

source_order表(源表)创建数据通道。具体操作,请参见创建数据通道

数据通道的配置信息,请参见下表。

参数

示例

说明

通道名

flink_agg

数据通道名称。

通道类型

全量加增量

数据通道类型,通道服务提供了增量全量全量加增量三种类型的分布式数据实时消费通道。

步骤二:在实时计算Flink中创建作业并启动

1. 创建作业

  1. 进入SQL作业创建页面。

    1. 登录实时计算控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击数据开发 > ETL

  2. 单击新建后,在新建作业草稿对话框,选择空白的流作业草稿,单击下一步

    说明

    Flink也为您提供了丰富的代码模板和数据同步,每种代码模板都为您提供了具体的使用场景、代码示例和使用指导。您可以直接单击对应的模板快速地了解Flink产品功能和相关语法,实现您的业务逻辑,详情请参见代码模板数据同步模板

  3. 填写作业信息

    作业参数

    示例

    说明

    文件名称

    agg_order

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    存储位置

    作业草稿

    指定该作业的代码文件所属的文件夹。

    您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    引擎版本

    vvr-8.0.8-flink-1.17

    当前作业使用的Flink的引擎版本,引擎版本详情请参见功能发布记录引擎版本介绍

  4. 单击创建

2. 编写SQL作业

  1. 分别创建source_order表(源表)和sink_order表(结果表)的临时表。

    详细配置信息,请参见附录1:Tablestore连接器

    -- 创建源表的临时表
    CREATE TEMPORARY TABLE tablestore_input (
        metering VARCHAR,
        orderid VARCHAR,
        price DOUBLE,
        byerid BIGINT,
        sellerid BIGINT,
        productid BIGINT,
        ts BIGINT,
        ptime AS TO_TIMESTAMP( ts * 1000),
        WATERMARK FOR ptime AS ptime - INTERVAL '2' SECOND
    ) WITH (
        'connector' = 'ots', -- 源表的连接器类型。固定取值为ots。
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- 表格存储实例的VPC地址。
        'instanceName' = 'xxx', -- 表格存储的实例名称。
        'tableName' = 'source_order', -- 表格存储的源表名称。
        'tunnelName' = 'flink_agg', -- 表格存储源表的数据通道名称。
        'accessId' = 'xxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey ID。
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx' -- 阿里云账号或者RAM用户的AccessKey Secret。
    );
    
    -- 创建结果表的临时表
    CREATE TEMPORARY TABLE tablestore_output (
        metering VARCHAR,
        ts BIGINT,
        price DOUBLE,
        ordercount BIGINT,
        primary key(metering, ts) NOT ENFORCED -- 主键。
    ) WITH (
        'connector' = 'ots',  -- 结果表的连接器类型。固定取值为ots。
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- 表格存储实例的VPC地址。
        'instanceName' = 'xxx', -- 表格存储的实例名称。
        'tableName' = 'sink_order', -- 表格存储的结果表名称。
        'accessId' = 'xxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey ID。
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey Secret。
        'valueColumns' = 'price,ordercount' --插入字段的列名。
    );
  2. 编写作业逻辑。

    INSERT INTO tablestore_output
    SELECT 
        DISTINCT metering as metering,
        UNIX_TIMESTAMP(CAST(TUMBLE_START(ptime, INTERVAL '5' SECOND) as STRING)) as ts,
        SUM(price) as price,
        COUNT(orderid) as ordercount
    FROM tablestore_input
    GROUP BY TUMBLE(ptime, INTERVAL '5' SECOND), metering;

3. 部署作业

SQL编辑区域右上方,单击部署,在部署新版本对话框,可根据需要填写或选中相关内容,单击确定

说明

Session集群适用于非生产环境的开发测试环境,通过部署或调试作业提高作业JM(Job Manager)资源利用率和提高作业启动速度。但不推荐您将生产作业提交至Session集群中,可能会导致业务稳定性问题。

4. 启动作业

  1. 在左侧导航栏,单击运维中心 > 作业运维

  2. 单击目标作业操作列中的启动,打开作业启动面板。

  3. 选择无状态启动后,单击启动。当作业状态转变为运行中时,代表作业运行正常。

    作业启动参数配置,详情请参见作业启动

    说明

    如果您对作业进行了修改(例如更改代码、增删改WITH参数、更改作业版本等),且希望修改生效,则需要重新部署作业,然后停止再启动。另外,如果作业无法复用State,希望作业全新启动时,或者更新非动态生效的参数配置时,也需要停止后再启动作业。作业停止详情请参见作业停止

步骤三:压入测试数据并实时获取计算结果

1. 安装依赖

登录至Linux系统的服务器,执行以下命令安装JDK(1.8及以上,推荐1.8)。

yum -y install java-1.8.0-openjdk-devel.x86_64

2. 部署stream-compute服务

  1. 下载stream-compute压缩包。

    具体下载路径,请参见stream-compute-1.0-SNAPSHOT-release.tar.gz

    说明

    如果您需要自行编译stream-compute,请参见GitHub源码

  2. stream-compute压缩包上传到服务器,并执行如下命令进行解压。

    tar -zxvf stream-compute-1.0-SNAPSHOT-release.tar.gz
  3. 编辑配置文件。

    执行以下命令创建配置文件。

    vim ~/tablestoreConf.json
    • 配置文件示例如下。

      {
          "endpoint":"https://xxx.cn-hangzhou.ots.aliyuncs.com",
          "accessId":"xxxxxxxxxxx",
          "accessKey":"xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
          "instanceName":"xxx"
      }
    • 详细配置项请参见下表。

      配置项

      说明

      endpoint

      表格存储实例的服务地址。更多信息,请参见服务地址

      • 如果使用自建Linux服务器,需填写公网地址

      • 如果使用云服务器ECS并且与Tablestore处于同一专有网络VPC,推荐VPC地址

      instanceName

      表格存储的实例名称。

      accessId

      阿里云账号或者RAM用户的AccessKey(包括AccessKey IDAccessKey Secret)。

      accessKey

3. 启动压力器和模拟大屏

  1. 启动压力器,模拟写入大量交易数据。

    ./bin/mock_order_generator

    压力器启动成功的示例如下图所示,其中每一行数据代表一条交易记录。

    image

  2. 新建窗口后,启动模拟大屏,实时查看计算结果。

    ./bin/data_show_screen

    模拟大屏启动成功的示例如下图所示。Flink每隔5秒对交易数据进行一次聚合计算,图中数据代表该时间段内的交易总金额和订单数量。

    image

附录:示例表结构

源表

源表为原始数据表,存储了所有的交易记录,主要包含字段:计量类型、订单号ID、交易时间、交易金额、买家ID、卖家ID和商品ID等。表设计如下:

表名:source_order

字段

类型

注释

metering

String

计量类型(主键)。样例中默认是web。

orderid

String

订单号ID(主键)。

ts

Integer

交易时间。Unix时间戳,精度为秒。

price

Double

交易金额。

buyerid

Integer

买家ID。

sellerid

Integer

卖家ID。

productid

Integer

商品ID。

结果表

结果表存储了聚合计算后的结果数据,主要包含字段:计量类型、交易时间、交易金额和交易次数。表设计如下:

表名:sink_order

字段

类型

注释

metering

String

计量类型(主键)。样例中默认是web。

ts

Integer

交易时间(主键)。Unix时间戳,精度为秒。

price

Double

交易金额。

ordercount

Integer

交易次数。