首页 云原生多模数据库 Lindorm 流引擎 连接指南 通过客户端连接并使用Lindorm流引擎

通过客户端连接并使用Lindorm流引擎

更新时间: 2023-08-04 11:46:07

Lindorm流引擎提供了100%兼容Flink SQL的能力,本文介绍如何使用Flink SQL提交流引擎计算任务将Kafka Topic中的数据导入至Lindorm宽表。

前提条件

  • 已将客户端IP地址添加至Lindorm实例的白名单中。具体操作,请参见设置白名单

  • 已获取Lindorm流引擎的Lindorm Stream SQL地址。如何获取,请参见查看连接地址

注意事项

如果您的应用部署在ECS实例,且想要通过专有网络访问Lindorm实例,则需要确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。
  • 所在地域相同,并建议所在可用区相同(以减少网络延时)。
  • ECS实例与Lindorm实例属于同一专有网络。

操作步骤概述

操作步骤

说明

步骤一:数据准备

步骤二:使用流引擎

通过流引擎提交计算任务并读取Kafka Topic中的数据,导入至结果表中。

步骤三:查询流引擎处理结果

在结果表中查询计算任务处理结果。

操作步骤

数据准备

  1. 通过Kafka API写入数据。以通过开源Kafka脚本工具写入数据为例。

    #创建Topic
    ./kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic --create
    
    #写入数据
    ./kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic
    {"loglevel": "INFO", "thread":"thread-1", "class": "com.alibaba.stream.test", "detail":"thread-1 info detail", "timestamp": "1675840911549"}
    {"loglevel": "ERROR", "thread":"thread-2", "class": "com.alibaba.stream.test", "detail":"thread-2 error detail", "timestamp": "1675840911549"}
    {"loglevel": "WARN", "thread":"thread-3", "class": "com.alibaba.stream.test", "detail":"thread-3 warn detail", "timestamp": "1675840911549"}
    {"loglevel": "ERROR", "thread":"thread-4", "class": "com.alibaba.stream.test", "detail":"thread-4 error detail", "timestamp": "1675840911549"}

    Lindorm Stream Kafka地址的获取方式请参见查看连接地址

  2. 在宽表引擎中创建结果表,用于存储计算任务的处理结果。

    1. 通过Lindorm-cli连接宽表引擎。如何连接,请参见通过Lindorm-cli连接并使用宽表引擎

    2. 创建结果表log

      CREATE TABLE IF NOT EXISTS log (
        loglevel VARCHAR,
        thread VARCHAR,
        class VARCHAR,
        detail VARCHAR,
        timestamp BIGINT,
      primary key (loglevel, thread) );

使用流引擎

  1. 在ECS上执行以下命令,下载流引擎客户端压缩包。

    wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-0.1.5.tar.gz
  2. 执行以下命令,解压压缩包。

    tar zxvf lindorm-sqlline-0.1.5.tar.gz
  3. 进入lindorm-sqlline-0.1.5/bin目录,执行以下命令连接至Lindorm流引擎。

    ./lindorm-sqlline -url <Lindorm Stream SQL地址>

    Lindorm Stream SQL地址的获取方式,请参见查看连接地址

  4. 使用Flink SQL提交流引擎计算任务读取Kafka Topic中的数据导入至结果表中。

    CREATE FJOB log_to_lindorm(
        --kafka Source表
        CREATE TABLE originalData(
            `loglevel` VARCHAR,
            `thread` VARCHAR,
            `class` VARCHAR,
            `detail` VARCHAR,
            `timestamp` BIGINT
        )WITH(
            'connector'='kafka',
            'topic'='log_topic',
            'scan.startup.mode'='earliest-offset',
            'properties.bootstrap.servers'='Lindorm Stream Kafka地址',
            'format'='json'
        );
        --Lindorm宽表 Sink表
        CREATE TABLE lindorm_log_table(
            `loglevel` VARCHAR,
            `thread` VARCHAR,
            `class` VARCHAR,
            `detail` VARCHAR,
            `timestamp` BIGINT,
            PRIMARY KEY (`loglevel`, `thread`) NOT ENFORCED
        )WITH(
            'connector'='lindorm',
            'seedServer'='Lindorm宽表引擎的HBase兼容地址',
            'userName'='root',
            'password'='root',
            'tableName'='log',
            'namespace'='default'
        );
        --过滤Kafka中的ERROR日志写入Lindorm宽表
        INSERT INTO lindorm_log_table SELECT * FROM originalData WHERE loglevel = 'ERROR';
    );

    Lindorm宽表引擎的HBase兼容地址的获取方式,请参见查看连接地址

查询流引擎处理结果

支持以下两种查询方式:

  • 通过Lindorm-cli连接宽表引擎并执行以下命令查询处理结果。

    SELECT * FROM log LIMIT 5;

    返回结果:

    +----------+----------+-------------------------+-----------------------+---------------+
    | loglevel |  thread  |          class          |        detail         |   timestamp   |
    +----------+----------+-------------------------+-----------------------+---------------+
    | ERROR    | thread-2 | com.alibaba.stream.test | thread-2 error detail | 1675840911549 |
    | ERROR    | thread-4 | com.alibaba.stream.test | thread-4 error detail | 1675840911549 |
    +----------+----------+-------------------------+-----------------------+---------------+                                     
  • 通过宽表引擎的集群管理系统查询处理结果,具体操作请参见数据查询

阿里云首页 云原生多模数据库 Lindorm 相关技术圈