通过Knative事件驱动实现消息发送
Knative是一款基于Kubernetes的开源Serverless应用编排框架,其目标是制定云原生、跨平台的Serverless应用编排标准。阿里云Knative基于ACK Serverless之上,在完全兼容社区Knaitve的同时对FC、ECI工作负载进行统一应用编排,支持事件驱动、自动弹性。本文通过一个Demo示例介绍如何通过Knative部署消息处理、事件驱动和Homepage,实现消息发送。
前提条件
Demo介绍
Demo主要包含以下三部分:
HomePage:用于发送和接收弹幕,通过Knative Serving部署到函数计算FC。
事件驱动:用于接收事件,并进行事件过滤、流转,通过Knative Serving部署到ECI。
消息处理:用于处理弹幕消息,通过Knative Serving部署到ECI。
弹幕服务Demo实现弹幕功能的主要流程如下图所示。

您通过前端发送弹幕消息到HomePage。
HomePage接着将弹幕发送到Kafka,事件驱动接收弹幕消息,然后路由到消息处理进行加工。
待弹幕加工完之后,将弹幕结果发送到表格存储中。
页面展示前端获取的弹幕结果。
步骤一:部署消息处理
消息处理用于接收事件驱动发送的弹幕请求,并根据请求数进行自动扩缩容,待弹幕消息处理完成之后将结果发送到表格存储。部署之前,需要先确认当前无工作负载,以便观察部署之后的结果。
在容器服务管理控制台确认当前无工作负载。
登录容器服务管理控制台。
在控制台左侧导航栏,单击集群。
在集群列表页面,单击目标集群名称或者目标集群右侧操作列下的详情。
在集群管理页左侧导航栏,选择 。
在顶部的命名空间下拉列表中,选择default,确认default命名空间下无工作负载。
在容器服务管理控制台通过Knative把消息处理部署到ECI类型的工作负载。
在集群管理页左侧导航栏中,选择 。
在Knative页面,单击服务管理页签,然后单击使用模板创建。
从命名空间下拉列表中,选择default。从示例模板下拉列表中,选择自定义。复制以下YAML示例粘贴至模板,然后单击创建。
YAML示例如下。
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: test-barrage-process spec: template: metadata: annotations: autoscaling.knative.dev/maxScale: "100" #服务配置的最大Pod数。 autoscaling.knative.dev/minScale: "0" #服务配置的最小Pod数。 k8s.aliyun.com/eci-image-snapshot-id: imc-uf636kjjx8xr4e75**** labels: danmu.role: "manager" spec: containerConcurrency: 2 #配置的Pod最大请求并发数。 serviceAccountName: barrage-install-sa containers: - args: - /manager env: - name: OTS_ENDPOINT #配置的表格存储访问地址。 value: https://barrage.cn-hangzhou.tablestore.aliyuncs.com - name: TABLE_NAME value: barrage - name: OTS_INSTANCENAME value: barrage - name: OTS_KEYID value: xxx - name: OTS_SECRET value: xxx - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: ROLE value: manager - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace - name: TRACE_NAME value: "process" - name: PARENT_SPAN value: "barrage-sender" - name: SUB_SPAN value: "process" - name: TRACING #配置的调用链地址。 value: "http://tracing-analysis-dc-sh.aliyuncs.com/adapt_g2it2kg78n@5cf06035aec2eb9_g2it2kg78n@53df7ad2afe****/api/traces" image: registry-vpc.cn-shanghai.aliyuncs.com/knative-sample/barrage-manager:forrester-****_4cd77c84-20210618215458 name: user-container ports: - containerPort: 8000 name: http1
步骤二:部署事件驱动
事件驱动用于接收事件并进行事件流过滤、流转。本文示例使用Kafka事件源作为事件驱动,用于从Kafka接收弹幕消息,然后把弹幕路由到消息处理。
在集群管理页左侧导航栏,选择 。
在Knative页面,单击服务管理页签,然后单击使用模板创建。
从命名空间下拉列表中,选择default。从示例模板下拉列表中,选择自定义。复制以下YAML示例粘贴至模板,然后单击创建。
YAML示例如下。
apiVersion: sources.knative.dev/v1alpha1 kind: KafkaSource metadata: annotations: k8s.aliyun.com/req-timeout: "60" k8s.aliyun.com/retry-count: "5" k8s.aliyun.com/retry-interval: "2" name: barrage namespace: default spec: bootstrapServers: 192.168.42.205:9092,192.168.42.204:9092,192.168.42.203:9092 #配置消息队列Kafka版实例的默认接入点。 consumerGroup: barrage-info-consumer #配置消息队列Kafka版实例消费组。 sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: test-barrage-process #路由的目标消息处理。 namespace: default topics: barrage-info #配置消息队列Kafka版实例Topic的名称。
消息处理和事件驱动都部署成功后,可以在集群管理页左侧导航栏,选择
,在容器组页面看到消息处理以及Kafka事件源实例的状态都是running。
步骤三:部署HomePage
HomePage用于接收前端弹幕消息,并将弹幕消息发送到Kafka,同时从表格存储中接收弹幕结果。HomePage通过Knative函数方式部署之后,会自动在FC中创建服务、函数、自定义域名。部署HomePage之前需要先确认FC中无弹幕服务、函数以及自定义域名。
在函数计算控制台确认无弹幕服务、函数以及自定义域名。
在容器服务管理控制台,通过Knative把HomePage部署到FC类型的工作负载。
登录容器服务管理控制台。
在控制台左侧导航栏,单击集群。
在集群列表页面,单击目标集群名称或者目标集群右侧操作列下的详情。
在集群管理页左侧导航栏中,选择 。
在Knative页面,单击服务管理页签,然后单击使用模板创建。
从命名空间下拉列表中,选择default。从示例模板下拉列表中,选择自定义。复制以下YAML示例粘贴至模板,然后单击创建。
YAML示例如下。
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: demo-barrage annotations: workload.serving.knative.aliyun.com/class: "fc" #部署FC类型的工作负载。 spec: template: metadata: annotations: fc.revision.serving.knative.aliyun.com/code-space: "image" #指定镜像类型。 fc.revision.serving.knative.aliyun.com/role-arm: "acs:ram::xxxx:role/knative-fc" #输入ARN绑定权限策略。 fc.revision.serving.knative.aliyun.com/domain: '{"domain":"barrage.demo.knative.top","path":"/*"}' #指定访问域名barrage.demo.knative.top。 spec: containers: - image: registry.cn-shanghai.aliyuncs.com/knative-sample/barrage-main:forrester-****_4cd77c84-20210618214527 env: - name: OTS_ENDPOINT #配置表格存储访问地址为OTS_ENDPOINT。 value: https://barrage.cn-hangzhou.ots.aliyuncs.com - name: TABLE_NAME value: barrage - name: OTS_INSTANCENAME value: barrage - name: OTS_KEYID value: xxx - name: OTS_SECRET value: xxx - name: KAFKA_SERVER #配置Kafka服务地址。 value: "106.15.XX.XX:9093,47.100.XX.XX:9093,47.102.XX.XX:9093" - name: KAFKA_USER value: "alikafka_pre-cn-xxx" - name: KAFKA_PWD value: "xxx" - name: KAFKA_TOPIC #配置弹幕消息Topic。 value: "barrage-info" - name: TRACING #配置调用链地址。 value: "http://tracing-analysis-dc-sh.aliyuncs.com/adapt_g2it2kg78n@5cf06035aec****_g2it2kg78n@53df7ad2afe8301/api/traces" - name: TRACE_NAME1 value: "sender" - name: TRACE_NAME2 value: "receiver" - name: TRACE_NAME3 value: "result" - name: PARENT_SPAN value: "barrage-sender" - name: SUB_SPAN1 value: "sender" - name: SUB_SPAN2 value: "result"
步骤四:访问服务
通过在函数计算控制台的域名管理页面得到的自定义域名,访问服务。
本文示例为http://barrage.demo.knative.top。
设置弹幕的参数,单击Send。
Message:具体发送的弹幕消息。
Concurrency:弹幕的并发数。
Duration:弹幕的持续时间。
本文使用默认配置发送弹幕。
执行结果
可以看到不断有弹幕消息展示出来,如下图所示。