通过Lindorm-cli连接并使用Lindorm流引擎(旧接口不推荐)

本文介绍通过Lindorm-cli连接Lindorm流引擎的具体操作和使用示例。

前提条件

  • 已将客户端IP地址添加至Lindorm实例的白名单中。具体操作,请参见设置白名单

  • 已获取Lindorm流引擎的Lindorm Stream SQL地址。具体操作,请参见查看连接地址

  • 已获取Lindorm宽表引擎中HBase兼容的专有网络连接地址。具体操作,请参见查看连接地址

注意事项

如果您的应用部署在ECS实例,且想要通过专有网络访问Lindorm实例,则需要确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。

  • 所在地域相同,并建议所在可用区相同(以减少网络延时)。

  • ECS实例与Lindorm实例属于同一专有网络。

连接Lindorm流引擎使用的网络类型

网络类型

说明

专有网络(推荐)

专有网络VPC(Virtual Private Cloud)是您自己独有的云上私有网络,不同的专有网络之间二层逻辑隔离,拥有较高的安全性和性能。Lindorm-cli部署在ECS实例上时,通过专有网络连接至Lindorm流引擎,可获得更高的安全性和更低的网络延迟。

公网

公网即互联网,当本地设备需要测试或管理Lindorm流引擎时,可在本地设备上部署Lindorm-cli,然后通过公网连接至Lindorm流引擎。

说明

通过公网连接不会产生流量费用,但存在一定的安全风险,推荐通过专有网络连接以获取更高的安全性。

操作步骤

  1. 在ECS上执行以下命令下载Lindorm-cli压缩包。

    wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-0.1.4.tar.gz
  2. 执行以下命令解压Lindorm-cli压缩包。

    tar zxvf lindorm-sqlline-0.1.4.tar.gz
  3. 进入lindorm-sqlline-0.1.4/bin目录,执行以下命令连接至Lindorm流引擎。

    ./lindorm-sqlline -url <连接地址> -u <用户名> -p <密码>

    参数

    示例

    说明

    连接地址

    jdbc:streamsql:url=http://ld-bp17pwu1541ia****-proxy-stream.lindorm.rds.aliyuncs.com:30060

    Lindorm流引擎的Lindorm Stream SQL连接地址,获取方法请参见查看连接地址

    用户名

    user

    如果您忘记用户名密码,可以通过Lindorm宽表引擎的集群管理系统修改密码,具体操作请参见修改用户密码

    密码

    test

  4. 使用Lindorm流引擎。

    通过Lindorm流引擎将日志数据进行转化后实时同步至Lindorm宽表中,并对日志数据进行多维检索操作。本示例的操作流程如下:流程图

    准备工作

    1. 通过Lindorm-cli连接Lindorm宽表引擎,并在Lindorm宽表中创建一张表,具体操作请参见通过Lindorm-cli连接并使用宽表引擎

      CREATE TABLE IF NOT EXISTS log (
        loglevel VARCHAR,
        thread VARCHAR,
        class VARCHAR,
        detail VARCHAR,
        timestamp BIGINT,
        primary key (timestamp)
      );
    2. 通过Lindorm-cli连接Lindorm流引擎。

      ./lindorm-sqlline -url jdbc:streamsql:url=http://ld-bp17pwu1541ia****-proxy-stream.lindorm.rds.aliyuncs.com:30060 -u root -p root

    操作步骤

    1. 在Lindorm流引擎中创建数据流表。

      CREATE STREAM IF NOT EXISTS originalData (
        `loglevel` STRING,
        `thread` STRING,
        `class` STRING,
        `detail` STRING,
        `timestamp` LONG
      ) WITH (
        value_format = 'json', //表示数据源的格式
        key_value = 'thread',  //表示分区键
        stream_topic = 'log_topic', //指定数据流表的物理数据存储在流存储的某个Topic上
        TIMESTAMP = 'timestamp'   //指定时间戳列
      );

      创建成功后,通过以下命令查看结果:

      list streams;

      执行结果如下:

      +-------------------------------------------------------------------------------------+
      |                                        ShowStreams                                  |
      +-------------------------------------------------------------------------------------+
      |  Stream Name  | Stream Topic | Format
      --------------------------------------
       originalData | log_topic    | JSON
      --------------------------------------
        |
      +-------------------------------------------------------------------------------------+
      1 row selected (0.13 seconds)
    2. 创建Serving表。Serving表示用于提供查询能力,例如点查询、范围查询、多维检索、全文检索、向量检索、时序查询、时空查询、图查询、交互式分析等。Serving表对于Lindorm流引擎来说是一个外表,源数据存储在Lindorm宽表中,通过以下语句来关联Lindorm宽表。

      CREATE External Table IF NOT EXISTS lindorm_log_table WITH (
        table_type = 'lindorm.table',
        table_name = 'log',
        endpoint = 'ld-bp17pwu1541ia****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020'
      );
      说明
      • table_type:表示源数据存储在Lindorm宽表引擎中。

      • table_name:表示Lindorm宽表的表名。

      • endpoint:Lindorm宽表引擎中HBase兼容的专有网络连接地址。

      创建成功后,通过以下命令查看结果:

      list external tables;

      执行结果如下:

      +-------------------------------------------------------------------------------------------+
      |                                    ShowExternalTables                                     |
      +-------------------------------------------------------------------------------------------+
      |  External Table Name
      ---------------------
       lindorm_log_table
      ---------------------
        |
      +-------------------------------------------------------------------------------------------+
      1 row selected (0.024 seconds)
    3. 创建流处理链路。将数据流表中的日志数据同步写入Serving表中,同时过滤出loglevel等于error的日志。

      CREATE CQ log_to_lindorm 
      Insert Into lindorm_log_table 
      Select loglevel, thread, class, detail, timestamp From originalData 
      Where loglevel = 'ERROR';
    4. 写入数据。通过Kafka API写入数据至指定的Topic(名称为log_topic)中,具体操作请参见通过开源Kafka客户端写入Lindorm流引擎数据

    5. 查询数据。通过以下两种方式都可以查询流处理链路的结果。

      • 通过Lindorm-cli连接Lindorm宽表引擎并查询处理结果。lindorm_log_table为Serving表的名称。

        SELECT * FROM lindorm_log_table; 
      • 通过Lindorm宽表引擎的集群管理系统查询处理结果,具体操作请参见数据查询