Knative已支持Kafka事件源,您可将Knative与消息队列Kafka版对接,在Knative上实现Kafka消息推送。

前提条件

  • 您的消息队列Kafka版实例为包年包月专业版,且实例版本为2.0.0或以上。
    说明
    • Knative中的Kafka事件源目前仅支持2.0.0及以上版本。
    • 目前仅包年包月专业版消息队列Kafka版实例支持升级为2.0.0或以上版本,详情请参见升级实例服务版本
  • 创建Topic
  • 创建Consumer Group
  • 部署Knative

背景信息

Knative是一款基于Kubernetes的Serverless框架,其目标是制定云原生、跨平台的Serverless编排标准。Knative系统分为三部分:

  • Build:构建系统,用于将函数和应用构建成容器镜像。
  • Serving:服务系统,用于配置应用的路由、升级策略、自动扩缩容等。
  • Eventing:事件系统,用于自动完成事件的绑定和触发。

要让Eventing(事件系统)正常运行,就必须在Knative集群中实现Bus(内部事件存储层),目前支持的Bus实现方式包括Stub、Kafka。本文以消息队列Kafka版为例介绍如何实现Bus。

操作流程

在Knative上实现Kafka消息推送的操作流程如下图所示。

dg_task_flow

部署Kafka组件

  1. 登录容器服务控制台
  2. 在左侧导航栏,选择 Knative(公测) > 组件管理
  3. Knative组件管理页面:
    1. 集群列表,选择已部署Knative组件的集群。
    2. add-on组件列表,找到Kafka,在其右侧操作列,单击部署
    在 Knative 上推送 Kafka 消息_1
  4. 部署Kafka对话框,单击确定

    部署完成后,Kafka右侧的状态显示已部署

    Knative_2

创建event-display服务

  1. 在左侧导航栏,选择 Knative(公测) > 服务管理
  2. Knative服务管理页面,单击使用模板创建
  3. 使用模板创建
    1. 集群列表,选择已部署Knative组件的集群。
    2. 命名空间列表,选择default
    3. 示例模板列表,选择自定义
    4. 模板区域,输入模板信息。
      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
            - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/eventing-sources-cmd-event_display:bf45b3eb1e7fc4cb63d6a5a6416cf696295484a7662e0cf9ccdf5c080542****
    5. 单击创建
    pg_event_display_template
    创建完成后,Knative服务管理页面显示 event-displaypg_event_display

创建Kafka Source服务

  1. 在左侧导航栏,选择Knative(公测) > 服务管理
  2. Knative服务管理页面,单击使用模板创建。
  3. 使用模板创建页面:
    1. 集群列表,选择已部署Knative组件的集群
    2. 命名空间列表,选择default
    3. 示例模板列表,选择自定义
    4. 模板区域,输入模板信息。
      apiVersion: sources.eventing.knative.dev/v1alpha1
      kind: KafkaSource
      metadata:
        name: alikafka-source
      spec:
        consumerGroup: demo-consumer
        # Broker URL. Replace this with the URLs for your kafka cluster,
        # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092.
        bootstrapServers: xxx.xxx.x.xx:9092,xxx.xxx.x.xx:9092,xxx.xxx.x.xx:9092
        topics: demo-topic
        sink:
          apiVersion: serving.knative.dev/v1alpha1
          kind: Service
          name: event-display
    5. 单击创建
    pg_kafka_source

发送消息

  1. 登录消息队列Kafka版控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击Topic管理
  4. Topic管理页面,找到要发送消息的Topic,在其右侧操作操作列中,单击发送消息发送消息
    pg_send_msg
    • Topic:发送消息的Topic,例如demo-topic。
    • Message Key:消息的键,例如demo。
    • Message Value:消息的值,必须为JSON格式,例如 {"key": "test"}

结果验证

发送消息成功后,连接Kubernetes集群,执行命令kubectl logs {pod},可查看发送消息的日志。
说明 如需连接Kubernetes集群,请参见通过kubectl连接Kubernetes集群
pg_log