Knative已支持Kafka事件源,您可将Knative与云消息队列 Kafka 版对接,在Knative上实现Kafka消息推送。
前提条件
- 重要
云消息队列 Kafka 版实例和Knative必须处于同一VPC内。
云消息队列 Kafka 版实例的版本必须为2.0.0或以上。
背景信息
Knative是一款基于Kubernetes的Serverless框架,其目标是制定云原生、跨平台的Serverless编排标准。Knative主要包括:
Serving:服务系统,用于配置应用的路由、升级策略、自动扩缩容等。
Eventing:事件系统,用于自动完成事件的绑定和触发。
要让Eventing(事件系统)正常运行,就必须在Knative集群中实现Channel(内部事件存储层),目前支持的Channel实现方式包括Kafka、NATS。本文以云消息队列 Kafka 版为例介绍如何实现Channel。
适用场景
在线短任务处理
AI音视频消息处理
监控告警
数据格式转换
操作流程
在Knative上实现云消息队列 Kafka 版消息推送的操作流程如下图所示。
部署Kafka组件
登录容器服务控制台。
在左侧导航栏,单击集群。
在集群列表页面,单击要部署Kafka组件的集群的名称。
在左侧导航栏,选择 。
在组件管理页签下的add-on组件区域,找到Kafka,在其右侧操作列,单击部署。
在部署Kafka对话框,单击确定。
部署完成后,Kafka右侧的状态显示已部署。
创建event-display服务
继续单击服务管理页签。
在Knative服务管理页面,从命名空间列表选择default,然后在右侧单击使用模板创建。
在使用模板创建页面:
从示例模板列表,选择自定义。
在模板区域,输入模板信息。
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: metadata: annotations: autoscaling.knative.dev/minScale: "1" spec: containers: - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/eventing-sources-cmd-event_display:bf45b3eb1e7fc4cb63d6a5a6416cf696295484a7662e0cf9ccdf5c080542c21d
单击创建。
单击返回。
创建完成后,event-display右侧的状态显示成功。
创建kafka-source服务
创建KafkaSource服务的配置文件kafka-source.yaml。
apiVersion: sources.eventing.knative.dev/v1alpha1 kind: KafkaSource metadata: name: kafka-source spec: consumerGroup: demo-topic # Broker URL. Replace this with the URLs for your kafka cluster, # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092. bootstrapServers: alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092 topics: demo sink: apiVersion: serving.knative.dev/v1alpha1 kind: Service name: event-display
参数
说明
示例值
consumerGroup
创建的Group的名称。
demo-consumer
bootstrapServers
云消息队列 Kafka 版实例的默认接入点。
alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
topics
创建的Topic的名称。
demo-topic
执行以下命令创建KafkaSource服务。
kubectl apply -f kafka-source.yaml
返回示例如下:
cb kafkasource.sources.knative.dev/kafka-source created
发送消息
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在左侧导航栏,单击Topic 管理。
在Topic 管理页面,找到目标Topic,在其操作列中,选择 。
在快速体验消息收发面板,发送测试消息。
发送方式选择控制台。
在消息 Key文本框中输入消息的Key值,例如demo。
在消息内容文本框输入测试的消息内容,例如 {"key": "test"}。
设置发送到指定分区,选择是否指定分区。
单击是,在分区 ID文本框中输入分区的ID,例如0。如果您需查询分区的ID,请参见查看分区状态。
单击否,不指定分区。
根据界面提示信息,通过SDK订阅消息,或者执行Docker命令订阅消息。
发送方式选择Docker,运行Docker容器。
执行运行 Docker 容器生产示例消息区域的Docker命令,发送消息。
执行发送后如何消费消息?区域的Docker命令,订阅消息。
发送方式选择SDK,根据您的业务需求,选择需要的语言或者框架的SDK以及接入方式,通过SDK体验消息收发。
结果验证
发送消息后,通过kubectl logs
命令查看event-display服务的日志,确认event-display服务已接收到云消息队列 Kafka 版发送的消息。