通过客户端连接并使用Lindorm流引擎
Lindorm流引擎提供了100%兼容Flink SQL的能力,本文介绍如何使用Flink SQL提交流引擎计算任务将Kafka Topic中的数据导入至Lindorm宽表。
前提条件
注意事项
如果您的应用部署在ECS实例,且想要通过专有网络访问Lindorm实例,则需要确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。
- 所在地域相同,并建议所在可用区相同(以减少网络延时)。
- ECS实例与Lindorm实例属于同一专有网络。
操作步骤概述
操作步骤 | 说明 |
| |
通过流引擎提交计算任务并读取Kafka Topic中的数据,导入至结果表中。 | |
在结果表中查询计算任务处理结果。 |
操作步骤
数据准备
通过Kafka API写入数据。以通过开源Kafka脚本工具写入数据为例。
#创建Topic ./kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic --create #写入数据 ./kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic {"loglevel": "INFO", "thread":"thread-1", "class": "com.alibaba.stream.test", "detail":"thread-1 info detail", "timestamp": "1675840911549"} {"loglevel": "ERROR", "thread":"thread-2", "class": "com.alibaba.stream.test", "detail":"thread-2 error detail", "timestamp": "1675840911549"} {"loglevel": "WARN", "thread":"thread-3", "class": "com.alibaba.stream.test", "detail":"thread-3 warn detail", "timestamp": "1675840911549"} {"loglevel": "ERROR", "thread":"thread-4", "class": "com.alibaba.stream.test", "detail":"thread-4 error detail", "timestamp": "1675840911549"}
Lindorm Stream Kafka地址的获取方式请参见查看连接地址。
在宽表引擎中创建结果表,用于存储计算任务的处理结果。
通过Lindorm-cli连接宽表引擎。如何连接,请参见通过Lindorm-cli连接并使用宽表引擎。
创建结果表
log
。CREATE TABLE IF NOT EXISTS log ( loglevel VARCHAR, thread VARCHAR, class VARCHAR, detail VARCHAR, timestamp BIGINT, primary key (loglevel, thread) );
使用流引擎
在ECS上执行以下命令,下载流引擎客户端压缩包。
wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-0.1.5.tar.gz
执行以下命令,解压压缩包。
tar zxvf lindorm-sqlline-0.1.5.tar.gz
进入
lindorm-sqlline-0.1.5/bin
目录,执行以下命令连接至Lindorm流引擎。./lindorm-sqlline -url <Lindorm Stream SQL地址>
Lindorm Stream SQL地址的获取方式,请参见查看连接地址。
使用Flink SQL提交流引擎计算任务读取Kafka Topic中的数据导入至结果表中。
CREATE FJOB log_to_lindorm( --kafka Source表 CREATE TABLE originalData( `loglevel` VARCHAR, `thread` VARCHAR, `class` VARCHAR, `detail` VARCHAR, `timestamp` BIGINT )WITH( 'connector'='kafka', 'topic'='log_topic', 'scan.startup.mode'='earliest-offset', 'properties.bootstrap.servers'='Lindorm Stream Kafka地址', 'format'='json' ); --Lindorm宽表 Sink表 CREATE TABLE lindorm_log_table( `loglevel` VARCHAR, `thread` VARCHAR, `class` VARCHAR, `detail` VARCHAR, `timestamp` BIGINT, PRIMARY KEY (`loglevel`, `thread`) NOT ENFORCED )WITH( 'connector'='lindorm', 'seedServer'='Lindorm宽表引擎的HBase兼容地址', 'userName'='root', 'password'='root', 'tableName'='log', 'namespace'='default' ); --过滤Kafka中的ERROR日志写入Lindorm宽表 INSERT INTO lindorm_log_table SELECT * FROM originalData WHERE loglevel = 'ERROR'; );
Lindorm宽表引擎的HBase兼容地址的获取方式,请参见查看连接地址。
查询流引擎处理结果
支持以下两种查询方式:
通过Lindorm-cli连接宽表引擎并执行以下命令查询处理结果。
SELECT * FROM log LIMIT 5;
返回结果:
+----------+----------+-------------------------+-----------------------+---------------+ | loglevel | thread | class | detail | timestamp | +----------+----------+-------------------------+-----------------------+---------------+ | ERROR | thread-2 | com.alibaba.stream.test | thread-2 error detail | 1675840911549 | | ERROR | thread-4 | com.alibaba.stream.test | thread-4 error detail | 1675840911549 | +----------+----------+-------------------------+-----------------------+---------------+
通过宽表引擎的集群管理系统查询处理结果,具体操作请参见数据查询。