文档

通过流引擎实现交易数据实时统计

更新时间:

云原生多模数据库 Lindorm流引擎是面向流式数据处理的引擎,提供了流式数据的存储和轻计算功能。本文介绍使用Lindorm流引擎对交易数据进行实时统计,并将统计结果(即每小时的订单数量和交易金额)存储至Lindorm宽表中。

前提条件

  • 已安装Java环境,要求安装JDK 1.7及以上版本。

  • 已将客户端IP地址添加至Lindorm实例的白名单中,具体操作请参见设置白名单

  • 已获取Lindorm流引擎的连接地址,具体操作请参见查看连接地址

操作步骤

  1. 创建Lindorm宽表。通过Lindorm-cli连接Lindorm宽表引擎,并在Lindorm宽表中创建一张表,具体操作请参见通过Lindorm-cli连接并使用宽表引擎

    说明

    创建的Lindorm宽表用于存储交易数据的统计结果。

    CREATE TABLE IF NOT EXISTS order_stat (
      biz VARCHAR,
      WINDOWSTART LONG,
      WINDOWEND LONG,
      total_order_price DOUBLE,
      count LONG,
      primary key (biz,WINDOWSTART)
    );
  2. 创建数据流表。通过Lindorm-cli连接Lindorm流引擎,具体操作请参见通过Lindorm-cli连接并使用Lindorm流引擎(旧接口不推荐)

    CREATE STREAM IF NOT EXISTS orders (
      `biz` STRING,
      `order_id` STRING,
      `price` DOUBLE,
      `detail` STRING,
      `timestamp` LONG
    ) WITH (
      value_format = 'json',
      key_value = 'order_id',
      stream_topic = 'order_topic',
      TIMESTAMP = 'timestamp'
    );
  3. 创建Serving表。Serving表对于Lindorm流引擎来说是一个外表,交易数据存储在Lindorm宽表中,通过以下语句来关联Lindorm宽表。

    CREATE External Table IF NOT EXISTS lindorm_order_stat WITH (
      table_type = 'lindorm.table',
      table_name = 'order_stat',
      endpoint = 'ld-bp17pwu1541ia****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020'
    );
    说明
    • table_type:表示源数据存储在Lindorm宽表引擎中。

    • table_name:表示Lindorm宽表的表名。

    • endpoint:Lindorm宽表引擎的HBase兼容连接地址。

  4. 创建流处理任务,统计出每小时的订单数量和交易金额。

    CREATE CQ lindorm_order_state
    Insert Into lindorm_order_stat
    SELECT biz, SUM(price) AS total_order_price , COUNT(*) AS count from orders window TUMBLING ( size 1 hour) GROUP BY biz;
  5. 通过Kafka API写入数据至数据流表中指定的Topic(即order_topic)。

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.codehaus.jettison.json.JSONObject;
    
    import java.util.Properties;
    import java.util.Random;
    import java.util.UUID;
    import java.util.concurrent.Future;
    
    public class KafkaToLindormStreamDemo {
    
      public static void main(String[] args) {
        Properties props = new Properties();
        //设置Lindorm Stream Kafka地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-bp17pwu1541ia****-proxy-stream-public.lindorm.rds.aliyuncs.com:30080");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    
        //构造一个Kafka消息队列。
        String topic = "order_topic";
    
        try {
          for (int i = 0; i < 100; i++) {
            JSONObject json = new JSONObject();
            json.put("biz", "biz1");
            json.put("order_id", UUID.randomUUID().toString());
            json.put("price", new Random().nextInt(1000));
            json.put("detail", "order detail");
            json.put("timestamp", System.currentTimeMillis() - i * 30 * 60 * 1000);
            Future<RecordMetadata> future = producer.send(
                new ProducerRecord<String, String>(topic, json.getString("order_id"),
                    json.toString()));
            producer.flush();
            try {
              RecordMetadata recordMetadata = future.get();
              System.out.println("Produce ok:" + recordMetadata.toString());
            } catch (Throwable t) {
              System.out.println("Produce exception " + t.getMessage());
              t.printStackTrace();
            }
          }
        } catch (Exception e) {
          System.out.println("Produce exception " + e.getMessage());
          e.printStackTrace();
        }
      }
    }
  6. 通过Lindorm-cli连接Lindorm宽表引擎并查询统计结果。

    SELECT * FROM order_stat;

    返回结果如下:

    +------+---------------+---------------+-------------------+-------+
    | biz  |  WINDOWSTART  |   WINDOWEND   | total_order_price | count |
    +------+---------------+---------------+-------------------+-------+
    | biz1 | 1645869600000 | 1645873200000 | 694.000000        | 1     |
    | biz1 | 1645873200000 | 1645876800000 | 1170.000000       | 2     |
    | biz1 | 1645876800000 | 1645880400000 | 453.000000        | 2     |
    | biz1 | 1645880400000 | 1645884000000 | 958.000000        | 2     |
    | biz1 | 1645884000000 | 1645887600000 | 365.000000        | 2     |
    | biz1 | 1645887600000 | 1645891200000 | 674.000000        | 2     |
    | biz1 | 1645891200000 | 1645894800000 | 1467.000000       | 2     |
    | biz1 | 1645894800000 | 1645898400000 | 1430.000000       | 2     |
    | biz1 | 1645898400000 | 1645902000000 | 422.000000        | 2     |
    | biz1 | 1645902000000 | 1645905600000 | 822.000000        | 2     |
    | biz1 | 1645905600000 | 1645909200000 | 1521.000000       | 2     |
    | biz1 | 1645909200000 | 1645912800000 | 930.000000        | 2     |
    | biz1 | 1645912800000 | 1645916400000 | 1501.000000       | 2     |
  • 本页导读 (1)