Lindorm流引擎提供了100%兼容Flink SQL的能力,本文介绍如何使用Flink SQL提交流引擎计算任务将Kafka Topic中的数据导入至Lindorm宽表。

前提条件

  • 已将客户端IP地址添加至Lindorm实例的白名单中。具体操作,请参见设置白名单
  • 已获取Lindorm流引擎的Lindorm Stream SQL地址。如何获取,请参见查看连接地址
  • 已获取Lindorm宽表引擎中HBase兼容的专有网络连接地址。如何获取,请参见查看连接地址

注意事项

如果您的应用部署在ECS实例,且想要通过专有网络访问Lindorm实例,则需要确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。
  • 所在地域相同,并建议所在可用区相同(以减少网络延时)。
  • ECS实例与Lindorm实例属于同一专有网络。

连接Lindorm流引擎使用的网络类型

网络类型说明
专有网络(推荐)专有网络VPC(Virtual Private Cloud)是您自己独有的云上私有网络,不同的专有网络之间二层逻辑隔离,拥有较高的安全性和性能。Lindorm-cli部署在ECS实例上时,通过专有网络连接至Lindorm流引擎,可获得更高的安全性和更低的网络延迟。
公网公网即互联网,当本地设备需要测试或管理Lindorm流引擎时,可在本地设备上部署Lindorm-cli,然后通过公网连接至Lindorm流引擎。
说明 通过公网连接不会产生流量费用,但存在一定的安全风险,推荐通过专有网络连接以获取更高的安全性。

操作步骤

  1. 在ECS上执行以下命令下载流引擎客户端压缩包。
    wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-0.1.5.tar.gz
  2. 执行以下命令解压流引擎客户端压缩包。
    tar zxvf lindorm-sqlline-0.1.5.tar.gz
  3. 进入lindorm-sqlline-0.1.5/bin目录,执行以下命令连接至Lindorm流引擎。
    ./lindorm-sqlline -url <连接地址>
    参数示例说明
    连接地址jdbc:streamsql:url=http://ld-bp17pwu1541ia****-proxy-stream.lindorm.rds.aliyuncs.com:30060Lindorm流引擎的Lindorm Stream SQL连接地址,获取方法请参见查看连接地址
  4. 使用Lindorm流引擎。
    通过Lindorm流引擎将日志数据进行转化后实时同步至Lindorm宽表中,并对日志数据进行多维检索操作。本示例的操作流程如下:流程图
    1. 通过Lindorm-cli连接Lindorm宽表引擎,并在Lindorm宽表中创建一张表,具体操作请参见通过Lindorm-cli连接并使用宽表引擎
      CREATE TABLE IF NOT EXISTS log (
        loglevel VARCHAR,
        thread VARCHAR,
        class VARCHAR,
        detail VARCHAR,
        timestamp BIGINT,
      primary key (loglevel, thread) );
    2. 通过流引擎客户端连接Lindorm流引擎。
      ./lindorm-sqlline -url jdbc:streamsql:url=http://ld-bp17pwu1541ia****-proxy-stream.lindorm.rds.aliyuncs.com:30060
    3. 写入数据。通过Kafka API写入数据至指定的Topic(名称为log_topic)中,具体操作请参见通过开源Kafka客户端写入Lindorm流引擎数据
      #创建Topic
      ./kafka-topics.sh --bootstrap-server ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30080 --topic log_topic --create
      
      #写入数据
      ./kafka-console-producer.sh --bootstrap-server ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30080 --topic log_topic
      {"loglevel": "INFO", "thread":"thread-1", "class": "com.alibaba.stream.test", "detail":"thread-1 info detail", "timestamp": "1675840911549"}
      {"loglevel": "ERROR", "thread":"thread-2", "class": "com.alibaba.stream.test", "detail":"thread-2 error detail", "timestamp": "1675840911549"}
      {"loglevel": "WARN", "thread":"thread-3", "class": "com.alibaba.stream.test", "detail":"thread-3 warn detail", "timestamp": "1675840911549"}
      {"loglevel": "ERROR", "thread":"thread-4", "class": "com.alibaba.stream.test", "detail":"thread-4 error detail", "timestamp": "1675840911549"}
    4. 使用Flink SQL提交流引擎计算任务读取Kafka Topic中的数据导入至Lindorm表中。
      CREATE FJOB log_to_lindorm(
          --kafka Source表
          CREATE TABLE originalData(
              `loglevel` VARCHAR,
              `thread` VARCHAR,
              `class` VARCHAR,
              `detail` VARCHAR,
              `timestamp` BIGINT
          )WITH(
              'connector'='kafka',
              'topic'='log_topic',
              'scan.startup.mode'='earliest-offset',
              'properties.bootstrap.servers'='ld-bp17pwu1541ia****-proxy-stream.lindorm.rds.aliyuncs.com:30080',
              'format'='json'
          );
          --Lindorm宽表 Sink表
          CREATE TABLE lindorm_log_table(
              `loglevel` VARCHAR,
              `thread` VARCHAR,
              `class` VARCHAR,
              `detail` VARCHAR,
              `timestamp` BIGINT,
              PRIMARY KEY (`loglevel`, `thread`) NOT ENFORCED
          )WITH(
              'connector'='lindorm',
              'seedServer'='ld-bp17pwu1541ia****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020',
              'userName'='root',
              'password'='root',
              'tableName'='log',
              'namespace'='default'
          );
          --过滤Kafka中的ERROR日志写入Lindorm宽表
          INSERT INTO lindorm_log_table SELECT * FROM originalData WHERE loglevel = 'ERROR';
      );
    5. 查询数据。通过以下两种方式都可以查询流处理链路的结果。
      • 通过Lindorm-cli连接Lindorm宽表引擎并查询处理结果。log为Serving表名。
        SELECT * FROM log LIMIT 5;
        返回结果如下。
        +----------+----------+-------------------------+-----------------------+---------------+
        | loglevel |  thread  |          class          |        detail         |   timestamp   |
        +----------+----------+-------------------------+-----------------------+---------------+
        | ERROR    | thread-2 | com.alibaba.stream.test | thread-2 error detail | 1675840911549 |
        | ERROR    | thread-4 | com.alibaba.stream.test | thread-4 error detail | 1675840911549 |
        +----------+----------+-------------------------+-----------------------+---------------+
                                                
      • 通过Lindorm宽表引擎的集群管理系统查询处理结果,具体操作请参见数据查询