本文介绍如何快速通过Lindorm流引擎进行数据读写。
前提条件
操作步骤概览
操作步骤 | 执行环境 | 说明 |
宽表引擎 | 在宽表引擎中创建表,用于存储流引擎数据。 | |
流引擎 | 使用流引擎Shell客户端连接流引擎。 | |
确认流引擎是否已自动关联宽表的元数据。 | ||
在流引擎中创建流表,用于存储流数据。 | ||
创建实时入库链路,用于数据写入。 | ||
向流表实时写入流数据。 | ||
| 连接宽表引擎,查询流引擎处理结果。 |
操作步骤
创建结果表
通过Lindorm-cli连接宽表引擎。如何连接,请参见通过Lindorm-cli连接并使用宽表引擎。
创建结果表
log
。CREATE TABLE IF NOT EXISTS log ( loglevel VARCHAR, thread VARCHAR, class VARCHAR, detail VARCHAR, `timestamp` BIGINT, primary key (loglevel, thread) );
通过客户端连接流引擎
在ECS上执行以下命令,下载流引擎Shell客户端压缩包。
wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-2.0.2.tar.gz
执行以下命令,解压压缩包。
tar zxvf lindorm-sqlline-2.0.2.tar.gz
进入
lindorm-sqlline-2.0.2/bin
目录,执行以下命令连接至Lindorm流引擎。./lindorm-sqlline -url <Lindorm Stream SQL地址>
Lindorm Stream SQL地址的获取方式,请参见查看连接地址。
检查数据关联情况
在流引擎Shell客户端中,执行以下语句,使用固定Catalog。
USE CATALOG lindorm_table;
lindorm_table
为固定的Catalog,所有的宽表数据均会关联此Catalog。返回结果:
+--------+ | Result | +--------+ | result | +--------+ | OK | +--------+
执行以下语句,切换至结果表
log
所在的宽表数据库。USE `default`;
宽表默认Database为
default
。如果您创建了其他的Database来存储结果表log
,此处需要将default
修改为对应的Database名称。执行以下语句查看
default
中所有的表,检查结果表log是否已关联至流引擎。SHOW TABLES;
返回结果:
+------------+ | Result | +------------+ | table name | +------------+ | log | +------------+
返回结果中包含结果表
log
,表示已成功关联。
创建流表
执行以下语句,切回流引擎Catalog。
USE CATALOG lindorm_stream;
创建流表
originalData
。CREATE STREAM originalData( `loglevel` VARCHAR, `thread` VARCHAR, `class` VARCHAR, `detail` VARCHAR, `timestamp` BIGINT );
(可选)查看建表结果。
DESCRIBE originalData;
返回结果:
+---------------------------------------------------------+ | Result | +---------------------------------------------------------+ | name | type | null | key | extras | watermark | +---------------------------------------------------------+ | loglevel | STRING | TRUE | <NULL> | <NULL> | <NULL> | | thread | STRING | TRUE | <NULL> | <NULL> | <NULL> | | class | STRING | TRUE | <NULL> | <NULL> | <NULL> | | detail | STRING | TRUE | <NULL> | <NULL> | <NULL> | | timestamp | BIGINT | TRUE | <NULL> | <NULL> | <NULL> | +---------------------------------------------------------+
创建实时入库链路
创建实时计算链路。
CREATE CQ extract_error_logs INSERT INTO lindorm_table.`default`.log SELECT * FROM originalData WHERE loglevel = 'ERROR';
(可选)查看实时计算链路创建结果。
DESCRIBE extract_error_logs;
返回结果:
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Result | +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Query ID | Job ID | Status | Is Stoppable | Start time | End time | Duration | Max Parallelism | Sql | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | extract_error_logs | 4aca7b4e145219dd937c923996b98c2e | RUNNING | false | 1693810182829 | -1 | 15682 | -1 | CklOU0VSVCBJTlRPIGxpbmRvcm1fdGFibGUuYGhkYC5sb2cKU0VMRUNUICogRlJPTSBvcmlnaW5hbERhdGEgCldIRVJFIGxvZ2xldmVsID0gJ0VSUk9SJw==| -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ For detailed information in monitor web;
写入数据
执行以下命令,向流表中写入数据。
INSERT INTO originalData values ('INFO','thread-1', 'com.alibaba.stream.test', 'thread-1 info detail',1675840911549); INSERT INTO originalData values ('ERROR','thread-2', 'com.alibaba.stream.test', 'thread-2 error detail',1675840911549); INSERT INTO originalData values ('WARN','thread-3', 'com.alibaba.stream.test', 'thread-3 warn detail',1675840911549); INSERT INTO originalData values ('ERROR','thread-4','com.alibaba.stream.test','thread-4 error detail',1675840911549);
(可选)执行以下命令,检查数据是否正确写入。
SELECT /*+ OPTIONS('scan.startup.mode'='earliest-offset')*/ * FROM originalData;
返回结果:
+----------+----------+-------------------------+----------------------+---------------+ | loglevel | thread | class | detail | timestamp | +----------+----------+-------------------------+----------------------+---------------+ | | | | | null | | | | | | null | | | | | | null | | | | | | null | | | | | | null | | | | | | null | | | | | | null | | INFO | thread-1 | com.alibaba.stream.test | thread-1 info detail | 1675840911549 | | WARN | thread-3 | com.alibaba.stream.test | thread-3 warn detail | 1675840911549 | | ERROR | thread-2 | com.alibaba.stream.test | thread-2 error detail| 1675840911549 | | ERROR | thread-4 | com.alibaba.stream.test | thread-4 error detail| 1675840911549 | | | | | | null | | | | | | null | | | | | | null | | | | | | null | +----------+----------+-------------------------+----------------------+---------------+
查询结果数据
(推荐)在宽表引擎中查询结果数据:
SELECT * FROM log;
返回结果:
+----------+----------+-------------------------+-----------------------+---------------+ | loglevel | thread | class | detail | timestamp | +----------+----------+-------------------------+-----------------------+---------------+ | ERROR | thread-2 | com.alibaba.stream.test | thread-2 error detail | 1675840911549 | | ERROR | thread-4 | com.alibaba.stream.test | thread-4 error detail | 1675840911549 | +----------+----------+-------------------------+-----------------------+---------------+
在流引擎中查询结果数据:
重要流引擎客户端虽然支持对宽表的查询,但其效率并不高。建议通过宽表引擎的Lindorm-cli来访问。
SELECT * FROM lindorm_table.`default`.log;
返回结果:
+----------+----------+-------------------------+-----------------------+---------------+ | loglevel | thread | class | detail | timestamp | +----------+----------+-------------------------+-----------------------+---------------+ | | | | | null | | | | | | null | | | | | | null | | | | | | null | | | | | | null | | | | | | null | | | | | | null | | | | | | null | | | | | | null | | | | | | null | | | | | | null | | | | | | null | | ERROR | thread-2 | com.alibaba.stream.test | thread-2 error detail | 1675840911549 | | ERROR | thread-4 | com.alibaba.stream.test | thread-4 error detail | 1675840911549 | +----------+----------+-------------------------+-----------------------+---------------+