KsqlDB是一个用于Apache Kafka的流式SQL引擎,KsqlDB降低了进入流处理的门槛,提供了一个简单的、完全交互式的SQL接口,用于处理Kafka的数据,可以让我们在流数据上持续执行SQL查询,KsqlDB支持广泛的强大的流处理操作,包括聚合、连接、窗口、会话等。
架构图
下图分别为传统的流处理应用架构和基于KsqlDB的应用架构示例。通过对比不难看出,流处理引擎以及连接器部分均从之前的独立角色集成到了KsqlDB。除此之外,KsqlDB还通过物化视图提供了流处理过程中的查询功能。更多关于ksqlDB的信息。请参见KsqlDB官方文档。
传统的流处理应用架构图
基于KsqlDB应用的架构图
使用KsqlDB
创建Topic并进行配置
创建Topic,本文将以Topic
ksql_test
为例进行说明。创建Schema,选择Avro校验模式并添加以下校验规则。
{ "namespace": "io.confluent.examples.clients.basicavro", "type": "record", "name": "Payment", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" } ] }
为Topic
ksql_test
开启Schema格式校验。
授权
云消息队列 Confluent 版支持对KsqlDB集群进行RBAC授权,本文将以新创建的用户test
为例进行说明。
创建
test
用户,按照下面所述为其添加集群授权。详情请参见用户管理和授权。用户名
集群
资源
角色
test
Kafka cluster
Cluster
SystemAdmin
test
KSQL
Cluster
ResourceOwner
test
Schema Registry
Cluster
SystemAdmin
为KsqlDB添加Topic
ksql_test
的只读访问权限(KsqlDB的默认用户为ksql)。用户名
集群
资源
角色
ksql
Kafka cluster
Topic
DeveloperRead
操作
登录云消息队列 Confluent 版控制台,在左侧导航栏,单击实例列表。
在顶部菜单栏,选择地域,然后在实例列表页面,单击目标实例名称。
在实例详情页面,单击右上角的登录控制台进行Control Center登录。
登录Control Center控制台,在Home页面单击controlcenter.clusterk卡片,进入到Cluster overview页面。
在主页的左侧导航栏,单击ksqlDB,然后单击目标KsqlDB集群名称。
在KsqlDB集群详情页,单击Editor页签,按需创建Stream、使用Ksql命令进行Select查询等操作。详情请参见KSQL快速使用。
创建Stream
CREATE STREAM ksql_test_stream WITH (KAFKA_TOPIC='ksql_test',VALUE_FORMAT='AVRO');
从Stream中查询数据
SELECT * FROM ksql_test_stream EMIT CHANGES;
测试验证
开启Stream数据查询。
在ksqldb页面中,单击Editor页签,输入下面查询语句后,单击Run query。
SELECT * FROM ksql_test_stream EMIT CHANGES;
发送测试消息。
重新打开一个Control Center控制台窗口。
在Topic
ksql_test
详情页面,单击Messages页签后,再单击Produce a new message。在Produce a new message面板中,填入消息内容后,单击Produce。
{ "id": "Tome", "amount": 18 }
验证发送的消息。
返回到之前开启的Stream数据查询窗口,已经查询到了发送的测试消息。