Knative已支持Kafka事件源,您可将Knative与消息队列Kafka版对接,在Knative上实现Kafka消息推送。

前提条件

背景信息

Knative是一款基于Kubernetes的Serverless框架,其目标是制定云原生、跨平台的Serverless编排标准。Knative主要包括:

  • Serving:服务系统,用于配置应用的路由、升级策略、自动扩缩容等。
  • Eventing:事件系统,用于自动完成事件的绑定和触发。

要让Eventing(事件系统)正常运行,就必须在Knative集群中实现Channel(内部事件存储层),目前支持的Channel实现方式包括Kafka、NATS。本文以消息队列Kafka版为例介绍如何实现Channel。

适用场景

  • 在线短任务处理
  • AI音视频消息处理
  • 监控告警
  • 数据格式转换

操作流程

在Knative上实现Kafka消息推送的操作流程如下图所示。

dg_task_flow

部署Kafka组件

  1. 登录容器服务控制台
  2. 在左侧导航栏,单击 集群
  3. 集群列表页面,单击要部署Kafka组件的集群的集群名称。
  4. 左侧导航栏,选择应用 > Knative
  5. 组件管理页签下的add-on组件区域,找到Kafka,在其右侧 操作列,单击 部署
  6. 部署Kafka对话框,单击确定
    部署完成后,Kafka右侧的状态显示已部署Knative_2

创建event-display服务

  1. Knative组件管理页面,单击 服务管理页签。
  2. Knative服务管理页面,单击使用模板创建
  3. 使用模板创建页面:
    1. 集群列表,选择已部署Knative组件的集群。
    2. 命名空间列表,选择default
    3. 示例模板列表,选择自定义
    4. 模板区域,输入模板信息。
      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          metadata:
            annotations:
              autoscaling.knative.dev/minScale: "1"
          spec:
            containers:
            - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/eventing-sources-cmd-event_display:bf45b3eb1e7fc4cb63d6a5a6416cf696295484a7662e0cf9ccdf5c080542c21d
    5. 单击创建
    6. 单击返回
      创建完成后,event-display右侧的状态显示成功pg_event_display

创建kafka-source服务

  1. 通过kubectl连接Kubernetes集群
  2. 创建KafkaSource服务的配置文件kafka-source.yaml
    apiVersion: sources.eventing.knative.dev/v1alpha1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: demo-topic
      # Broker URL. Replace this with the URLs for your kafka cluster,
      # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092.
      bootstrapServers: 192.168.X.XXX:9092,192.168.X.XXX:9092,192.168.X.XXX:9092
      topics: demo
      sink:
        apiVersion: serving.knative.dev/v1alpha1
        kind: Service
        name: event-display
    参数 说明 示例值
    consumerGroup 创建的Consumer Group的名称。 demo-consumer
    bootstrapServers 消息队列Kafka版实例的默认接入点。 192.168.X.XXX:9092,192.168.X.XXX:9092,192.168.X.XXX:9092
    topics 创建的Topic的名称。 demo-topic
  3. 执行以下命令创建KafkaSource服务。
    kubectl apply -f kafka-source.yaml
    返回示例如下。
    cb kafkasource.sources.knative.dev/kafka-source created

发送消息

  1. 登录消息队列Kafka版控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击实例列表
  4. 实例列表页面,找到目标实例,在其右侧操作列,单击详情
  5. 在左侧导航栏,单击Topic管理
  6. Topic管理页面,找到目标Topic,在其右侧操作列,选择more > 发送消息
  7. 发送消息面板:
    1. Message Key文本框,输入demo
    2. Message Value文本框,输入{"key": "test"}
      注意 Message Value必须为JSON格式。
      send_message

结果验证

发送消息后,通过kubectl logs命令查看event-display服务的日志,确认event-display服务已接收到消息队列Kafka版发送的消息。