云原生多模数据库Lindorm流引擎是面向流式数据处理的引擎,提供了流式数据的存储和轻计算功能。本文介绍使用Lindorm流引擎对交易数据进行实时统计,并将统计结果(即每小时的订单数量和交易金额)存储至Lindorm宽表中。
前提条件
操作步骤
- 创建Lindorm宽表。通过Lindorm-cli连接Lindorm宽表引擎,并在Lindorm宽表中创建一张表,具体操作请参见通过Lindorm-cli连接并使用宽表引擎。说明 创建的Lindorm宽表用于存储交易数据的统计结果。
CREATE TABLE IF NOT EXISTS order_stat ( biz VARCHAR, WINDOWSTART LONG, WINDOWEND LONG, total_order_price DOUBLE, count LONG, primary key (biz,WINDOWSTART) );
- 创建数据流表。通过Lindorm-cli连接Lindorm流引擎,具体操作请参见通过Lindorm-cli连接并使用Lindorm流引擎。
CREATE STREAM IF NOT EXISTS orders ( `biz` STRING, `order_id` STRING, `price` DOUBLE, `detail` STRING, `timestamp` LONG ) WITH ( value_format = 'json', key_value = 'order_id', stream_topic = 'order_topic', TIMESTAMP = 'timestamp' );
- 创建Serving表。Serving表对于Lindorm流引擎来说是一个外表,交易数据存储在Lindorm宽表中,通过以下语句来关联Lindorm宽表。
CREATE External Table IF NOT EXISTS lindorm_order_stat WITH ( table_type = 'lindorm.table', table_name = 'order_stat', endpoint = 'ld-bp17pwu1541ia****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020' );
说明table_type
:表示源数据存储在Lindorm宽表引擎中。table_name
:表示Lindorm宽表的表名。endpoint
:Lindorm宽表引擎的HBase兼容连接地址。
- 创建流处理任务,统计出每小时的订单数量和交易金额。
CREATE CQ lindorm_order_state Insert Into lindorm_order_stat SELECT biz, SUM(price) AS total_order_price , COUNT(*) AS count from orders window TUMBLING ( size 1 hour) GROUP BY biz;
- 通过Kafka API写入数据至数据流表中指定的Topic(即order_topic)。
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.codehaus.jettison.json.JSONObject; import java.util.Properties; import java.util.Random; import java.util.UUID; import java.util.concurrent.Future; public class KafkaToLindormStreamDemo { public static void main(String[] args) { Properties props = new Properties(); //设置Lindorm Stream Kafka地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-bp17pwu1541ia****-proxy-stream-public.lindorm.rds.aliyuncs.com:30080"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); //构造一个Kafka消息队列。 String topic = "order_topic"; try { for (int i = 0; i < 100; i++) { JSONObject json = new JSONObject(); json.put("biz", "biz1"); json.put("order_id", UUID.randomUUID().toString()); json.put("price", new Random().nextInt(1000)); json.put("detail", "order detail"); json.put("timestamp", System.currentTimeMillis() - i * 30 * 60 * 1000); Future<RecordMetadata> future = producer.send( new ProducerRecord<String, String>(topic, json.getString("order_id"), json.toString())); producer.flush(); try { RecordMetadata recordMetadata = future.get(); System.out.println("Produce ok:" + recordMetadata.toString()); } catch (Throwable t) { System.out.println("Produce exception " + t.getMessage()); t.printStackTrace(); } } } catch (Exception e) { System.out.println("Produce exception " + e.getMessage()); e.printStackTrace(); } } }
- 通过Lindorm-cli连接Lindorm宽表引擎并查询统计结果。
返回结果如下:SELECT * FROM order_stat;
+------+---------------+---------------+-------------------+-------+ | biz | WINDOWSTART | WINDOWEND | total_order_price | count | +------+---------------+---------------+-------------------+-------+ | biz1 | 1645869600000 | 1645873200000 | 694.000000 | 1 | | biz1 | 1645873200000 | 1645876800000 | 1170.000000 | 2 | | biz1 | 1645876800000 | 1645880400000 | 453.000000 | 2 | | biz1 | 1645880400000 | 1645884000000 | 958.000000 | 2 | | biz1 | 1645884000000 | 1645887600000 | 365.000000 | 2 | | biz1 | 1645887600000 | 1645891200000 | 674.000000 | 2 | | biz1 | 1645891200000 | 1645894800000 | 1467.000000 | 2 | | biz1 | 1645894800000 | 1645898400000 | 1430.000000 | 2 | | biz1 | 1645898400000 | 1645902000000 | 422.000000 | 2 | | biz1 | 1645902000000 | 1645905600000 | 822.000000 | 2 | | biz1 | 1645905600000 | 1645909200000 | 1521.000000 | 2 | | biz1 | 1645909200000 | 1645912800000 | 930.000000 | 2 | | biz1 | 1645912800000 | 1645916400000 | 1501.000000 | 2 |