流引擎支持通过ETL SQL进行表级别的实时预计算,提供实时的类物化视图能力。本文介绍如何使用ETL SQL对宽表数据进行实时同步与实时预计算。
前提条件
已开通流引擎。
已开通宽表引擎。
场景一:实时镜像表
说明
实时同步源表数据,可用于数据共享、数据备份、读写分离、异构索引等多种业务场景。
数据准备
创建源表和镜像表。
-- 创建源表 CREATE TABLE source(p1 INT, c1 DOUBLE, PRIMARY KEY(p1)); -- 创建镜像表 CREATE TABLE sink(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
提交ETL
CREATE ETL sync_etl AS INSERT INTO sink SELECT * FROM source;
数据验证
向源表中插入数据。
INSERT INTO source(p1, c1) VALUES(0, 0.0); INSERT INTO source(p1, c1) VALUES(1, 1.0); INSERT INTO source(p1, c1) VALUES(2, 2.0); INSERT INTO source(p1, c1) VALUES(3, 3.0); INSERT INTO source(p1, c1) VALUES(4, 4.0);
查询镜像表中的数据。
SELECT * FROM sink;
返回结果:
+------+------+ | p1 | c1 | +------+------+ | 0 | 0.0 | | 1 | 1.0 | | 2 | 2.0 | | 3 | 3.0 | | 4 | 4.0 | +------+------+
场景二:实时预计算
说明
流引擎提供丰富的预计算能力。此处以多表预JOIN(提前对多张表执行数据合并)为例,对多张表的数据进行预打宽处理,避免每次查询都需要执行JOIN操作,导致响应时间变长、计算开销过大的问题。
数据准备
创建两张源表:
user_tbl
(用户信息)和order_tbl
(用户订单信息),并为order_tbl
表创建二级索引idx1
。-- 创建源表1:user_tbl CREATE TABLE `user_tbl` ( `user_id` varchar NOT NULL, `user_name` varchar, `user_addr` varchar, PRIMARY KEY (`user_id`) ); -- 创建源表2:order_tbl CREATE TABLE `order_tbl` ( `order_id` varchar NOT NULL, `user_id` varchar, `product_name` varchar, `price` decimal(38, 20), PRIMARY KEY (`order_id`) ); -- 为源表order_tbl创建索引idx1 CREATE INDEX idx1 ON `order_tbl`(user_id desc) WITH (COMPRESSION='ZSTD');
创建结果表
user_order_tbl
,用于保存数据整合后的结果。CREATE TABLE `user_order_tbl` ( `order_id` varchar NOT NULL, `user_id` varchar, `product_name` varchar, `price` decimal(38, 20), `user_name` varchar, `user_addr` varchar, `user_addr_code` varchar, PRIMARY KEY (`order_id`) ) WITH (MUTABILITY = 'MUTABLE_UDT');
提交ETL
创建ETL,添加数据整合逻辑。
具体如下:
通过
user_id
字段关联源表order_tbl
和user_tbl
,将用户信息和其订单数据关联。使用正则表达式
REGEXP_EXTRACT
提取user_addr
中的地址代码。创建ETL,将整合后的数据插入到目标表
user_order_tbl
中,整理为一个同时包含用户信息及其订单数据的表。
CREATE ETL join_etl AS INSERT INTO `lindorm_table`.`default`.`user_order_tbl` ( order_id, user_id, product_name, price, user_name, user_addr, user_addr_code ) SELECT o.order_id, o.user_id, o.product_name, o.price, u.user_name, u.user_addr, REGEXP_EXTRACT(u.user_addr, '#(.*?)$', 1) AS user_addr_code FROM `lindorm_table`.`default`.`order_tbl` o JOIN `lindorm_table`.`default`.`user_tbl` u ON o.user_id = u.user_id;
数据验证
向源表中插入数据。
INSERT INTO user_tbl (user_id, user_name, user_addr) VALUES ('U001', '张三', '北京市朝阳区#100000'), ('U002', '李四', '上海市浦东新区#200000'), ('U003', '王五', '广州市天河区#510000'), ('U004', '赵六', '深圳市南山区#518000'); INSERT INTO order_tbl (order_id, user_id, product_name, price) VALUES ('O1001', 'U001', '笔记本电脑', 8999.00), ('O1002', 'U001', '无线鼠标', 159.00), ('O1003', 'U002', '智能手机', 6999.00), ('O1004', 'U002', '蓝牙耳机', 299.00), ('O1005', 'U003', '平板电脑', 3499.00), ('O1006', 'U004', '机械键盘', 799.00), ('O1007', 'U004', '显示器', 1299.00);
查询结果表。ETL会根据数据整合逻辑筛选数据,并将处理结果实时写入结果表中。
SELECT * FROM user_order_tbl;
返回结果:
+----------+---------+-----------------+---------------------------+-----------+------------------------------+----------------+ | order_id | user_id | product_name | price | user_name | user_addr | user_addr_code | +----------+---------+-----------------+---------------------------+-----------+------------------------------+----------------+ | O1001 | U001 | 笔记本电脑 | 8999.00000000000000000000 | 张三 | 北京市朝阳区#100000 | 100000 | | O1002 | U001 | 无线鼠标 | 159.00000000000000000000 | 张三 | 北京市朝阳区#100000 | 100000 | | O1003 | U002 | 智能手机 | 6999.00000000000000000000 | 李四 | 上海市浦东新区#200000 | 200000 | | O1004 | U002 | 蓝牙耳机 | 299.00000000000000000000 | 李四 | 上海市浦东新区#200000 | 200000 | | O1005 | U003 | 平板电脑 | 3499.00000000000000000000 | 王五 | 广州市天河区#510000 | 510000 | | O1006 | U004 | 机械键盘 | 799.00000000000000000000 | 赵六 | 深圳市南山区#518000 | 518000 | | O1007 | U004 | 显示器 | 1299.00000000000000000000 | 赵六 | 深圳市南山区#518000 | 518000 | +----------+---------+-----------------+---------------------------+-----------+------------------------------+----------------+
该文章对您有帮助吗?