KsqlDB

KsqlDB是一个用于Apache Kafka的流式SQL引擎,KsqlDB降低了进入流处理的门槛,提供了一个简单的、完全交互式的SQL接口,用于处理Kafka的数据,可以让我们在流数据上持续执行SQL查询,KsqlDB支持广泛的强大的流处理操作,包括聚合、连接、窗口、会话等。

架构图

下图分别为传统的流处理应用架构和基于KsqlDB的应用架构示例。通过对比不难看出,流处理引擎以及连接器部分均从之前的独立角色集成到了KsqlDB。除此之外,KsqlDB还通过物化视图提供了流处理过程中的查询功能。更多关于ksqlDB的信息。请参见KsqlDB官方文档

  • 传统的流处理应用架构图image

  • 基于KsqlDB应用的架构图image

使用KsqlDB

创建Topic并进行配置

  1. 创建Topic,本文将以Topicksql_test为例进行说明。

  2. 创建Schema,选择Avro校验模式并添加以下校验规则。

    {
        "namespace": "io.confluent.examples.clients.basicavro",
        "type": "record",
        "name": "Payment",
        "fields": [
            {
                "name": "id",
                "type": "string"
            },
            {
                "name": "amount",
                "type": "double"
            }
        ]
    }
  3. Topicksql_test开启Schema格式校验

授权

云消息队列 Confluent 版支持对KsqlDB集群进行RBAC授权,本文将以新创建的用户test为例进行说明。

  1. 创建test用户,按照下面所述为其添加集群授权。详情请参见用户管理和授权

    用户名

    集群

    资源

    角色

    test

    Kafka cluster

    Cluster

    SystemAdmin

    test

    KSQL

    Cluster

    ResourceOwner

    test

    Schema Registry

    Cluster

    SystemAdmin

  2. KsqlDB添加Topicksql_test的只读访问权限(KsqlDB的默认用户为ksql)。

    用户名

    集群

    资源

    角色

    ksql

    Kafka cluster

    Topic

    DeveloperRead

操作

  1. 登录云消息队列 Confluent 版控制台,在左侧导航栏,单击实例列表

  2. 在顶部菜单栏,选择地域,然后在实例列表页面,单击目标实例名称。

  3. 实例详情页面,单击右上角的登录控制台进行Control Center登录。

  4. 登录Control Center控制台,在Home页面单击controlcenter.clusterk卡片,进入到Cluster overview页面。

    image

  5. 在主页的左侧导航栏,单击ksqlDB,然后单击目标KsqlDB集群名称。

  6. KsqlDB集群详情页,单击Editor页签,按需创建Stream、使用Ksql命令进行Select查询等操作。详情请参见KSQL快速使用

    • 创建Stream

      CREATE STREAM ksql_test_stream WITH (KAFKA_TOPIC='ksql_test',VALUE_FORMAT='AVRO');
    • Stream中查询数据

      SELECT * FROM ksql_test_stream EMIT CHANGES;

测试验证

  1. 开启Stream数据查询。

    ksqldb页面中,单击Editor页签,输入下面查询语句后,单击Run query

    SELECT * FROM ksql_test_stream EMIT CHANGES;

    image

  2. 发送测试消息。

    1. 重新打开一个Control Center控制台窗口。

    2. Topicksql_test详情页面,单击Messages页签后,再单击Produce a new message

    3. Produce a new message面板中,填入消息内容后,单击Produce

      {
          "id": "Tome",
          "amount": 18
      }

      image

  3. 验证发送的消息。

    返回到之前开启的Stream数据查询窗口,已经查询到了发送的测试消息。

    image

其他操作