Compute real-time statistics on transaction data with the stream engine

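The stream engine of Lindorm, a cloud-native multi-model database, is built for streaming data and provides both storage and computation for it. This topic describes how to use the Lindorm stream engine to compute real-time statistics on transaction data and store the results (the number of orders and the total transaction amount for each hour) in a Lindorm wide table.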

Prerequisites

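  • A Java environment is installed. JDK 1.8 or later is required.

  • The IP address of the client is added to the whitelist of the Lindorm instance. For details, see Configure a whitelist.

  • The endpoint of the Lindorm stream engine is obtained. For details, see View endpoints.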

Procedure

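  1. Create a Lindorm wide table. Use Lindorm-cli to connect to the Lindorm wide table engine and create a table in it. For details, see Use Lindorm-cli to connect to and use the wide table engine.

    Note

    This wide table stores the statistics computed from the transaction data.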

    CREATE TABLE IF NOT EXISTS order_stat (
      biz VARCHAR,
      window_start LONG,
      window_end LONG,
      total_order_price DOUBLE,
      count LONG,
      primary key (biz,window_start)
    );
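  2. Use the Kafka script tools to write data to the topic that the stream table reads from (order_topic in this example). For details, see Connect to the Lindorm stream engine by using open-source Kafka script tools.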

    # Create the topic
    ./kafka-topics.sh --bootstrap-server ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30080 --topic order_topic --create
    
    # Write data
    ./kafka-console-producer.sh --bootstrap-server ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30080 --topic order_topic
    {"biz":"biz1","order_id":"b972fb8f-49b2-4ab6-95fa-afae4a2c9950","price":225,"detail":"order detail","timestamp":1678130410598}
    {"biz":"biz1","order_id":"6f0d4982-0a7f-4d85-b591-fadac34b5be4","price":778,"detail":"order detail","timestamp":1678132211179}
    {"biz":"biz1","order_id":"f7af664b-eef0-43ff-b59f-25e5439350b5","price":264,"detail":"order detail","timestamp":1678134011185}
    {"biz":"biz1","order_id":"5429a220-9669-4bbd-bd9e-29fa07131dd2","price":898,"detail":"order detail","timestamp":1678135811187}
    {"biz":"biz1","order_id":"757d8422-3483-4b01-adc3-2c59d6399411","price":40,"detail":"order detail","timestamp":1678137611188}
    {"biz":"biz1","order_id":"eaf68373-ecfd-49cd-b68c-2c5cba6cadee","price":74,"detail":"order detail","timestamp":1678139411190}
    {"biz":"biz1","order_id":"00d7765f-8c91-4884-8d50-3d3604e67d25","price":4,"detail":"order detail","timestamp":1678141211191}
    {"biz":"biz1","order_id":"8b2fae6e-5dbb-40d5-bcba-2eaa990bc28f","price":837,"detail":"order detail","timestamp":1678143011192}
    {"biz":"biz1","order_id":"1e6bffff-cb9b-4cf1-b16c-49c012750438","price":637,"detail":"order detail","timestamp":1678144811194}
    {"biz":"biz1","order_id":"96248d8c-a58f-49da-b772-ef0eefca7d43","price":825,"detail":"order detail","timestamp":1678146611195}
    {"biz":"biz1","order_id":"d31469d7-0f3d-452d-8f20-692df913bea3","price":270,"detail":"order detail","timestamp":1678148411197}
    {"biz":"biz1","order_id":"b0149b41-5707-4dbb-9bec-22404521b69f","price":928,"detail":"order detail","timestamp":1678150211199}
    {"biz":"biz1","order_id":"0cbc2f81-75b8-4ec4-803d-98a5528073b1","price":477,"detail":"order detail","timestamp":1678152011200}
    {"biz":"biz1","order_id":"ca9aeea8-4d29-4b53-8cb5-c89947738d59","price":350,"detail":"order detail","timestamp":1678153811202}
    {"biz":"biz1","order_id":"1a8681e6-edea-4205-8d5f-87cd9a144604","price":702,"detail":"order detail","timestamp":1678155611203}
    {"biz":"biz1","order_id":"833f9f90-9f2f-486b-a6b8-1eb3378d4e93","price":939,"detail":"order detail","timestamp":1678157411204}
    {"biz":"biz1","order_id":"afbd9ad8-748d-4986-80ef-d828d6eebd32","price":306,"detail":"order detail","timestamp":1678159211206}
    {"biz":"biz1","order_id":"69df83e4-1ff5-4e68-9b30-29b89a5145e9","price":149,"detail":"order detail","timestamp":1678161011207}
    {"biz":"biz1","order_id":"6767c106-2ab9-4bf7-af1d-6a0c48a84a41","price":477,"detail":"order detail","timestamp":1678162811208}
    {"biz":"biz1","order_id":"728e4249-10c8-4d8f-a13b-72478fd7d452","price":763,"detail":"order detail","timestamp":1678164611249}
    {"biz":"biz1","order_id":"c69a4cc1-89e2-4d5f-9e4f-d8645f11eb22","price":302,"detail":"order detail","timestamp":1678166411251}
    {"biz":"biz1","order_id":"53e159aa-c6c2-4980-b253-27b733fe2e8a","price":129,"detail":"order detail","timestamp":1678168211253}
    {"biz":"biz1","order_id":"d51cf02b-9ac8-4554-9ccf-77ce8bb00407","price":670,"detail":"order detail","timestamp":1678170011254}
    {"biz":"biz1","order_id":"b3f19fa7-c402-404f-af6d-751057579a87","price":342,"detail":"order detail","timestamp":1678171811256}
    {"biz":"biz1","order_id":"082fe0d9-38ef-41e2-a412-88f36b78aaa9","price":407,"detail":"order detail","timestamp":1678173611257}
  3. Connect to the stream engine and submit the compute job. For details, see Use a client to connect to and use the Lindorm stream engine.

    CREATE FJOB order_compute(
       SET 'parallelism.default' = '1'; -- job parallelism
       SET 'execution.checkpointing.interval' = '60000'; -- checkpoint interval in milliseconds
      
       CREATE TABLE order_detail( 
          `biz` VARCHAR,
          `order_id` VARCHAR,
          `price` DOUBLE,
          `detail` VARCHAR,
          `timestamp` BIGINT,
          `time_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
          WATERMARK FOR `time_ltz` AS `time_ltz` - INTERVAL '5' SECOND
        ) WITH (  
          'connector'='kafka',
          'topic'='order_topic',
          'properties.group.id' = 'order_group',
          'scan.startup.mode' = 'earliest-offset',
          'properties.bootstrap.servers'='Lindorm Stream Kafka endpoint (xxx:30080)',
          'format'='json');    
    
      CREATE TABLE order_stat(
          `biz` VARCHAR,
          `window_start` TIMESTAMP(3),
          `window_end` TIMESTAMP(3),
          `total_order_price` DOUBLE,
          `count` BIGINT,
          PRIMARY KEY (biz, window_start) NOT ENFORCED
        ) WITH (  
          'connector'='lindorm',
          'tableName'='order_stat',
          'seedServer'='Lindorm wide table endpoint (xxx:30020)',
          'namespace'='default',
          'userName' = 'root',
          'password' = 'root');
    
      INSERT INTO order_stat
      SELECT biz, window_start, window_end, SUM(price) AS total_order_price, COUNT(*) AS `count`
      FROM TABLE(TUMBLE(TABLE order_detail, DESCRIPTOR(`time_ltz`), INTERVAL '1' HOUR))
      GROUP BY biz, window_start, window_end;
    );
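  4. Use Lindorm-cli to connect to the Lindorm wide table engine and query the statistics. For details, see Use Lindorm-cli to connect to and use the wide table engine.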

    SELECT * FROM order_stat; 

    The following result is returned:

    +------+-------------------------------+-------------------------------+-------------------+-------+
    | biz  |         window_start          |          window_end           | total_order_price | count |
    +------+-------------------------------+-------------------------------+-------------------+-------+
    | biz1 | 2023-03-07 11:00:00 +0000 UTC | 2023-03-07 12:00:00 +0000 UTC | 1003              | 2     |
    | biz1 | 2023-03-07 12:00:00 +0000 UTC | 2023-03-07 13:00:00 +0000 UTC | 1162              | 2     |
    | biz1 | 2023-03-07 13:00:00 +0000 UTC | 2023-03-07 14:00:00 +0000 UTC | 114               | 2     |
    | biz1 | 2023-03-07 14:00:00 +0000 UTC | 2023-03-07 15:00:00 +0000 UTC | 841               | 2     |
    | biz1 | 2023-03-07 15:00:00 +0000 UTC | 2023-03-07 16:00:00 +0000 UTC | 1462              | 2     |
    | biz1 | 2023-03-07 16:00:00 +0000 UTC | 2023-03-07 17:00:00 +0000 UTC | 1198              | 2     |
    | biz1 | 2023-03-07 17:00:00 +0000 UTC | 2023-03-07 18:00:00 +0000 UTC | 827               | 2     |
    | biz1 | 2023-03-07 18:00:00 +0000 UTC | 2023-03-07 19:00:00 +0000 UTC | 1641              | 2     |
    | biz1 | 2023-03-07 19:00:00 +0000 UTC | 2023-03-07 20:00:00 +0000 UTC | 455               | 2     |
    | biz1 | 2023-03-07 20:00:00 +0000 UTC | 2023-03-07 21:00:00 +0000 UTC | 1240              | 2     |
    | biz1 | 2023-03-07 21:00:00 +0000 UTC | 2023-03-07 22:00:00 +0000 UTC | 431               | 2     |
    | biz1 | 2023-03-07 22:00:00 +0000 UTC | 2023-03-07 23:00:00 +0000 UTC | 1012              | 2     |
    +------+-------------------------------+-------------------------------+-------------------+-------+

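    Because no data arrives after the last record, the last window is never closed, so the last record (price 407) is not written to the wide table. For details about how this works, see Window functions.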
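
To make the open-window behavior concrete, the following bash sketch models the job's 1-hour tumbling window and 5-second watermark delay with plain integer arithmetic. It is not part of the Lindorm or Kafka tooling: the function names are hypothetical, and it assumes a simplified rule in which a window closes once the watermark (the largest event timestamp seen minus the delay) reaches the window end.

```shell
#!/usr/bin/env bash
# Simplified model of the job's windowing (an illustration, not engine code).
WINDOW_MS=3600000   # INTERVAL '1' HOUR in the TUMBLE call
DELAY_MS=5000       # INTERVAL '5' SECOND in the WATERMARK clause

# Start (epoch ms, inclusive) of the tumbling window containing timestamp $1.
window_start() { echo $(( $1 - $1 % WINDOW_MS )); }

# End (epoch ms, exclusive) of that window.
window_end() { echo $(( $(window_start "$1") + WINDOW_MS )); }

# Smallest event timestamp whose watermark (timestamp - delay) reaches the
# end of the window containing $1, under the simplified rule assumed above.
closing_timestamp() { echo $(( $(window_end "$1") + DELAY_MS )); }

last_ts=1678173611257   # timestamp of the last sample record (price 407)

echo "window: [$(window_start "$last_ts"), $(window_end "$last_ts"))"
# prints: window: [1678172400000, 1678176000000)

echo "watermark after last record: $(( last_ts - DELAY_MS ))"
# prints: watermark after last record: 1678173606257

echo "closes at event timestamp >= $(closing_timestamp "$last_ts")"
# prints: closes at event timestamp >= 1678176005000
```

The watermark after the last sample record (1678173606257) is still below the window end (1678176000000), which is why that window produces no row. Under this model, writing one more record to order_topic with a timestamp at or beyond the printed closing value would let the window close; any such record is test data you supply yourself and is not part of the sample data above.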