使用Knative Eventing发送第一个事件

Knative Eventing通过其Broker-Trigger模型为Serverless应用提供了强大的事件驱动能力,不仅支持事件的转发和过滤,还允许开发者进行复杂的事件流程编排。这种架构适用于需要响应外部或内部触发器来执行业务逻辑的应用场景,例如基于用户行为自动发送通知、处理来自不同数据源的数据流等。本文介绍如何基于Knative Eventing构建一个简单的事件驱动架构,该事件驱动架构包含从服务部署到消息传递机制的建立,再到最终的功能验证。

原理说明

Knative Eventing提供了一个完整的事件模型,方便接入各个外部系统的事件。事件接入以后,通过Cloud Event标准在内部流转,结合Broker-Trigger事件驱动模型进行事件处理。

如下图所示,在Knative事件驱动架构中,Broker是消息传递的中介,Trigger用于在特定条件下自动触发处理流程。Broker-Trigger事件驱动模型的原理是通过Trigger订阅Broker事件并过滤,最后将事件发送到对应的服务进行消费。

image
  1. Event Source:这是产生事件的源头,可以是内部系统(如数据库更新)或外部服务(如云消息服务)。

  2. Ingress:在Eventing架构中,用于接收外部事件进入Knative集群。

  3. Channel:在Knative Eventing中使用Broker-Trigger模型需要选择相应的Channel, 也就是事件流转系统,当前支持Kafka、NATS Streaming、InMemoryChannel事件转发通道,默认为InMemoryChannel。

  4. Broker:事件的中介层,负责接收来自不同事件源的事件,并根据配置的Trigger进行路由。Broker可以实现复杂的事件流和处理逻辑,支持多种协议和后端事件服务(如NATS、Kafka等),使得事件的管理和分发更加灵活高效。

  5. Trigger:定义了事件如何被路由到特定的服务。每个Trigger都关联了一个事件过滤器(通常基于事件类型或内容)和一个Service目标,只有匹配过滤条件的事件才会被发送到指定的Service。

  6. Service:在Eventing场景下,Service是事件的最终处理者,接收到由Trigger路由过来的事件后执行相应的业务逻辑。

前提条件

已安装Knative Serving和Knative Eventing。具体操作,请参见部署Knative部署Eventing

下文介绍如何利用Knative Eventing框架搭建一套简单的事件驱动系统,其中包含了事件产生、传输及消费的基本要素。

步骤一:部署Knative服务

本文以event-display为示例Knative服务,该服务的主要职责是接收事件并打印出其内容。

  1. 使用以下示例代码创建Knative Service。

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: default
    spec:
      template:
        spec:
          containers:
          - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/event-display:1215
  2. 部署Knative Service。

    kubectl apply -f event-display.yaml
  3. 查看Knative Service是否创建成功。

    kubectl get ksvc

    预期输出:

    NAME            URL                                        LATESTCREATED         LATESTREADY           READY   REASON
    event-display   http://event-display.default.example.com   event-display-00001   event-display-00001   True    

步骤二:创建Broker和Trigger

在示例中,创建了一个名为default的Broker以及一个与之关联的Triggermy-service-trigger,后者指定了所有未特别指定处理方式的事件都应转发至之前部署的event-display服务。

  1. 创建Broker。

    1. 使用以下示例代码创建Broker。

      apiVersion: eventing.knative.dev/v1
      kind: Broker
      metadata:
        name: default
        namespace: default
    2. 部署Broker。

      kubectl  apply -f broker.yaml
    3. 查看Broker是否创建成功。

      kubectl  get broker

      预期输出:

      NAME      URL                                                                        AGE   READY   REASON
      default   http://broker-ingress.knative-eventing.svc.cluster.local/default/default   9s    True    

      输出结果展示了Broker的基本信息和URL。URLzhi ding le指定了事件应该发送到哪里。

  2. 创建Trigger。

    1. 使用以下示例代码创建Trigger。

      订阅default Broker并将事件转发到event-display服务上。

      apiVersion: eventing.knative.dev/v1
      kind: Trigger
      metadata:
        name: my-service-trigger
      spec:
        broker: default
        subscriber:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display

      以上代码创建的my-service-triggerdefault Broker关联,并指定了event-display服务作为订阅者。这意味着,任何发送到default Broker的事件,只要没有其他特定的Trigger指定处理,都将被转发到event-display服务上。

    2. 部署Trigger。

      kubectl apply -f trigger.yaml
    3. 查看Trigger是否创建成功。

      kubectl  get trigger

      预期输出:

      NAME                 BROKER    SUBSCRIBER_URI                                   AGE   READY   REASON
      my-service-trigger   default   http://event-display.default.svc.cluster.local   22s   True    

      输出结果展示Trigger已经设置成功,并且处于就绪状态,表明配置正确且能够接收并路由事件。

步骤三:发送事件并验证服务是否能够正常接收事件

从外部直接向Broker发送HTTP POST请求模拟事件触发,然后使用curl命令发送带有特定CloudEvents头信息的请求到Broker,观察event-display服务是否接收到这条消息,并按预期展示。

  1. 从Broker中获取发送事件请求的地址broker-ingress.knative-eventing,通过向Broker发送事件请求,验证Knative Eventing系统中的Broker服务是否已经正确配置并能够接收事件。

    1. 通过kubectl的port-forward功能将集群内部服务的端口转发到本地,以访问服务。

       kubectl port-forward svc/broker-ingress -n knative-eventing 8080:80
    2. 使用curl命令发送一个POST请求到本地的8080端口,以验证服务是否能够正确接收并处理请求。

      curl -v "http://localhost:8080/default/default" \
         -X POST \
         -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" \
         -H "Ce-Specversion: 1.0" \
         -H "Ce-Type: dev.knative.samples.helloworld" \
         -H "Ce-Source: dev.knative.samples/helloworldsource" \
         -H "Content-Type: application/json" \
         -d '{"msg":"Hello World from the curl pod."}'
      

      预期输出:

      Note: Unnecessary use of -X or --request, POST is already inferred.
      *   Trying 127.0.0.1:8080...
      * Connected to localhost (127.0.0.1) port 8080 (#0)
      > POST /default/default HTTP/1.1
      > Host: localhost:8080
      > User-Agent: curl/8.1.2
      > Accept: */*
      > Ce-Id: 53****3-88be-4077-9d7a-a3f162******9
      > Ce-Specversion: 1.0
      > Ce-Type: dev.knative.samples.helloworld
      > Ce-Source: dev.knative.samples/helloworldsource
      > Content-Type: application/json
      > Content-Length: 40
      > 
      < HTTP/1.1 202 Accepted
      < Allow: POST, OPTIONS
      < Date: Tue, 15 Oct 2024 09:36:42 GMT
      < Content-Length: 0
      < 
      * Connection #0 to host localhost left intact

      预期输出表明,服务能够正确地接收到发送到http://localhost:8080/default/default的POST请求,并返回了一个202 Accepted状态码,表明请求已被接受。

  2. 查看event-display的Pod日志,确认事件来源。

    1. 获取event-display Pod的信息。查看名为event-display的Pod的日志

      kubectl get pods --namespace=default | grep event-display
      # 输出结果如下所示:
      event-display-00001-deployment-766f7b9fd6-gfcz5   2/2     Running   0          3m43s
    2. 查看日志内容。

      kubectl logs event-display-00001-deployment-766f7b9fd6-gfcz5 
      # 日志输出结果如下所示:
      Defaulted container "user-container" out of: user-container, queue-proxy
      2024/10/15 09:36:44 Hello world sample started.
      2024/10/15 09:36:44 receive cloudevents.Event: %!(EXTRA string=Validation: valid Context Attributes,   specversion: 1.0   type: dev.knative.samples.helloworld   source: dev.knative.samples/helloworldsource   id: 536808d3-88be-4077-9d7a-a3f162705f79   datacontenttype: application/json Extensions,   knativearrivaltime: 2024-10-15T09:36:42.142165068Z Data,   {     "msg": "Hello World from the curl pod."   } )

      预期输出表明,一个运行在Kubernetes上的应用正在接收来自Knative Samples Hello World源的CloudEvents事件,并打印出了这些事件的内容。