Knative已支持Kafka事件源,您可将Knative与消息队列Kafka版对接,在Knative上实现Kafka消息推送。
背景信息
Knative是一款基于Kubernetes的Serverless框架,其目标是制定云原生、跨平台的Serverless编排标准。Knative主要包括:
- Serving:服务系统,用于配置应用的路由、升级策略、自动扩缩容等。
- Eventing:事件系统,用于自动完成事件的绑定和触发。
要让Eventing(事件系统)正常运行,就必须在Knative集群中实现Channel(内部事件存储层),目前支持的Channel实现方式包括Kafka、NATS。本文以消息队列Kafka版为例介绍如何实现Channel。
适用场景
- 在线短任务处理
- AI音视频消息处理
- 监控告警
- 数据格式转换
操作流程
在Knative上实现Kafka消息推送的操作流程如下图所示。
部署Kafka组件
- 登录容器服务控制台。
- 在左侧导航栏,单击 集群。
- 在集群列表页面,单击要部署Kafka组件的集群的集群名称。
- 在左侧导航栏,选择。
- 在 组件管理页签下的add-on组件区域,找到Kafka,在其右侧 操作列,单击 部署。
- 在部署Kafka对话框,单击确定。
部署完成后,
Kafka右侧的
状态显示
已部署。

创建event-display服务
- 在Knative组件管理页面,单击 服务管理页签。
- 在Knative服务管理页面,单击使用模板创建。
- 在使用模板创建页面:
- 从集群列表,选择已部署Knative组件的集群。
- 从命名空间列表,选择default。
- 从示例模板列表,选择自定义。
- 在模板区域,输入模板信息。
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: event-display
spec:
template:
metadata:
annotations:
autoscaling.knative.dev/minScale: "1"
spec:
containers:
- image: registry.cn-hangzhou.aliyuncs.com/knative-sample/eventing-sources-cmd-event_display:bf45b3eb1e7fc4cb63d6a5a6416cf696295484a7662e0cf9ccdf5c080542c21d
- 单击创建。
- 单击返回。
创建完成后,
event-display右侧的
状态显示
成功。

创建kafka-source服务
- 通过kubectl连接Kubernetes集群。
- 创建KafkaSource服务的配置文件kafka-source.yaml。
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: demo-topic
# Broker URL. Replace this with the URLs for your kafka cluster,
# which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092.
bootstrapServers: 192.168.X.XXX:9092,192.168.X.XXX:9092,192.168.X.XXX:9092
topics: demo
sink:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: event-display
参数 |
说明 |
示例值 |
consumerGroup |
创建的Consumer Group的名称。 |
demo-consumer |
bootstrapServers |
消息队列Kafka版实例的默认接入点。
|
192.168.X.XXX:9092,192.168.X.XXX:9092,192.168.X.XXX:9092 |
topics |
创建的Topic的名称。 |
demo-topic |
- 执行以下命令创建KafkaSource服务。
kubectl apply -f kafka-source.yaml
返回示例如下。
cb kafkasource.sources.knative.dev/kafka-source created
发送消息
- 登录消息队列Kafka版控制台。
- 在顶部菜单栏,选择地域。
- 在左侧导航栏,单击实例列表。
- 在实例列表页面,找到目标实例,在其右侧操作列,单击详情。
- 在左侧导航栏,单击Topic管理。
- 在Topic管理页面,找到目标Topic,在其右侧操作列,选择。
- 在发送消息面板:
- 在Message Key文本框,输入demo。
- 在Message Value文本框,输入{"key": "test"}。
注意 Message Value必须为JSON格式。
结果验证
发送消息后,通过kubectl logs
命令查看event-display服务的日志,确认event-display服务已接收到消息队列Kafka版发送的消息。
在文档使用中是否遇到以下问题
更多建议
匿名提交