ack-keda提供事件驱动弹性能力,会从事件源中进行数据的周期性消费,当消息出现堆积,即可秒级触发一个批次的离线任务伸缩。RabbitMQ和Keda搭配,可以监控队列长度和消息速率指标。本文介绍如何利用RabbitMQ指标消息队列长度和事件驱动自动伸缩工具Keda实现应用的弹性伸缩。
前提条件
已部署ack-keda组件,请参见事件驱动弹性。
已创建云消息队列 RabbitMQ 版实例,请参见创建资源。
通过kubectl已连接Kubernetes集群,请参见通过kubectl连接集群。
已安装Go语言运行环境。
步骤一:部署工作负载创建应用
登录容器服务管理控制台,在左侧导航栏选择集群列表。
在集群列表页面,单击目标集群名称,然后在左侧导航栏,选择 。
在无状态页面,单击使用YAML创建资源。
在创建页面,选择示例模板为自定义。使用如下内容创建名为sample-app的应用,然后单击创建。
apiVersion: apps/v1 kind: Deployment metadata: name: sample-app namespace: default labels: app: sample-app spec: replicas: 2 selector: matchLabels: app: sample-app template: metadata: labels: app: sample-app spec: containers: - name: consumer image: consumer # 修改为业务真实的RabbitMQ的消费者镜像。 resources: limits: cpu: "500m"
步骤二:部署基于RabbitMQ指标的弹性示例
以下步骤通过获取已创建RabbitMQ消息队列的实例信息,对接RabbitMQ与HPA伸缩指标,实现将RabbitMQ指标转换为HPA可用指标,并实现容器自动伸缩。
查看消息队列RabbitMQ实例信息。
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
单击目标实例名称,进入实例详情页面。在接入点信息页签,查看并记录公网接入点的Endpoint。
云消息队列RabbitMQ在创建实例时,使用公网接入点会增加安全风险,可能会受到外部攻击或未授权访问,本实例使用公网接入点仅作为演示。如果您的应用主要在阿里云VPC内运行,并且不需要外部访问,建议不开启公网访问,以提高安全性。
在左侧导航栏中单击静态用户名密码,查看并记录用户名和密码。
在左侧导航栏中单击Vhost列表,查看并记录Vhost的值,例如amq-test。
执行如下指令,创建连接认证字符串。
echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64
其中
rabbitmq-username
和rabbitmq-password
分别为上一步记录的用户名和密码,localhost
为记录的Endpoint,vhost
为上一步记录的名称。使用如下YAML创建Secret。
apiVersion: v1 kind: Secret metadata: name: keda-rabbitmq-secret data: host: YW1x****** # 已创建的连接认证字符串。
使用如下YAML创建TriggerAuthentication对象,并将该YAML文件部署到集群中。
apiVersion: keda.sh/v1alpha1 kind: TriggerAuthentication metadata: name: keda-trigger-auth-rabbitmq-conn namespace: default spec: secretTargetRef: - parameter: host name: keda-rabbitmq-secret # keda-rabbitmq-secret为上一步创建的Secret。 key: host
执行如下命令,将YAML文件部署到集群中。
kubectl apply -f secret.yaml kubectl apply -f rabbitmq-trigger-auth.yaml
部署完成后,您可以在Keda中使用RabbitMQ触发器,并且可以通过引用TriggerAuthentication对象来连接RabbitMQ获取指标数据。
使用如下内容,创建YAML文件ScaledObject.yaml。
apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: name: rabbitmq-scaledobject namespace: default spec: scaleTargetRef: name: rabbitmq-deployment maxReplicaCount: 10 minReplicaCount: 1 triggers: - type: rabbitmq metadata: protocol: amqp queueName: queue-test mode: QueueLength value: "20" metricName: queue-test authenticationRef: name: keda-trigger-auth-rabbitmq-conn
参数
说明
参数
说明
scaleTargetRef
配置扩缩容的对象,这里配置步骤一:部署工作负载创建应用已创建的应用sample-app。
maxReplicaCount
最大副本数。
minReplicaCount
最小副本数。
protocol
keda组件与RabbitMQ之间的通信协议。取值范围:auto、http、amqp。
queueName
待读取信息的队列名称。
value
触发扩容的阈值。
metricName
自定义指标的名称,用于告诉HPA从哪个指标获取数据来决策扩缩容。在这里是RabbitMQ消息队列名称queue-test,用于获取队列长度。
应用配置,查看ScaledObject、HPA资源状态。
执行如下命令,创建资源。
kubectl apply -f ScaledObject.yaml
执行如下命令,获取伸缩配置状态。
kubectl get ScaledObject
预期输出:
NAME SCALETARGETKIND SCALETARGETNAME MIN MAX TRIGGERS AUTHENTICATION READY ACTIVE FALLBACK AGE rabbitmq-scaledobject apps/v1.Deployment sample-app 1 10 rabbitmq keda-trigger-auth-rabbitmq-conn True False False 17s
执行如下命令,查看HPA的生成情况。
kubectl get hpa
预期输出:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 2 2m35s
步骤三:生产及消费数据实现扩缩容
下面通过RabbitMQ消息队列的生产者与消费者代码,展示利用RabbitMQ消息队列长度实现容器的伸缩效果。
基于以下producer代码生产RabbitMQ消息。
package main import ( "fmt" "log" "time" "github.com/streadway/amqp" ) const ( queueName = "queue-test" // 替换为待读取信息的队列名称。 numMsgs = 10000 pauseTime = 10 * time.Millisecond url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // 访问的RabbitMQ的url,拼接方式amqp://username:password@localhost:5672/vhost ) func main() { conn, err := amqp.Dial(url) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( queueName, true, false, false, false, nil, ) failOnError(err, "Failed to declare a queue") go produce(ch, q) select {} } // 生产消息队列 func produce(ch *amqp.Channel, q amqp.Queue) { for i := 0; i < numMsgs; i++ { msg := fmt.Sprintf("Message %d", i) err := ch.Publish( "", q.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }, ) failOnError(err, "Failed to publish a message") log.Printf("Successed to publish a message: %s", msg) time.Sleep(pauseTime) } } func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
执行如下命令,查看HPA详情。
运行producer代码生产消息队列。
go run producer.go
执行如下命令,查看HPA详情
kubectl get hpa
预期输出:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 443000m/20 (avg) 1 10 10 9m15s
预期输出表明,sample-app已经扩容到keda组件设置的最大值。
关闭producer程序,运行下面consumer程序消费队列。
package main import ( "fmt" "log" "github.com/streadway/amqp" ) const ( queueName = "queue-test" url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // 访问的RabbitMQ的url,拼接方式amqp://username:password@localhost:5672/vhost ) func main() { // 连接到 RabbitMQ conn, err := amqp.Dial(url) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 创建一个通道 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 声明队列(确保队列存在) _, err = ch.QueueDeclare( queueName, // 队列名称 true, // 是否持久化 false, // 是否排外 false, // 是否在队列中等待 false, // 是否自动删除 nil, // 额外属性 ) failOnError(err, "Failed to declare a queue") // 获取消息通道 msgs, err := ch.Consume( queueName, // 队列名称 "", // 消费者的名字,如果不需要可以留空 true, // 自动确认 false, // 是否排外 false, // 是否在队列中等待 false, // 是否必须优先 nil, // 额外的参数 ) failOnError(err, "Failed to register a consumer") // 创建一个 goroutine 处理接收到的消息 go func() { for msg := range msgs { log.Printf("Received a message: %s", msg.Body) // 在这里可以处理接收到的消息,例如进行业务逻辑处理 fmt.Printf("Processed message: %s\n", msg.Body) } }() // 阻塞主程序,保持运行 select {} } // 简单的错误处理函数 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
运行consumer代码消费消息。
go run consumer.go
执行如下命令,监控HPA缩容,
kubectl get hpa -w
预期输出:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 443000m/20 (avg) 1 10 10 9m15s keda-hpa-rabbitmq-scaledobject Deployment/sample-app 235000m/20 (avg) 1 10 10 9m51s keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 10 10m keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 1 15m
预期输出表明,在数据消费结束一段时间后,sample-app缩容至Keda组件设置的最小值。
相关文档
您还可以根据自定义的 RocketMQ 消息堆积指标配置 HPA,实现更灵活的消息驱动扩缩容,具体操作请参见基于RocketMQ指标的KEDA。
- 本页导读 (1)
- 前提条件
- 步骤一:部署工作负载创建应用
- 步骤二:部署基于RabbitMQ指标的弹性示例
- 步骤三:生产及消费数据实现扩缩容
- 相关文档