基于RabbitMQ指标的容器水平伸缩

更新时间:2025-04-01 10:25:04

ack-keda提供事件驱动弹性能力,会从事件源中进行数据的周期性消费,当消息出现堆积,即可秒级触发一个批次的离线任务伸缩。RabbitMQKeda搭配,可以监控队列长度和消息速率指标。本文介绍如何利用RabbitMQ指标消息队列长度和事件驱动自动伸缩工具Keda实现应用的弹性伸缩。

前提条件

步骤一:部署工作负载创建应用

  1. 登录容器服务管理控制台,在左侧导航栏选择集群列表

  2. 集群列表页面,单击目标集群名称,然后在左侧导航栏,选择工作负载 > 无状态

  3. 无状态页面,单击使用YAML创建资源

  4. 创建页面,选择示例模板自定义。使用如下内容创建名为sample-app的应用,然后单击创建

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: sample-app
      namespace: default
      labels:
        app: sample-app
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: sample-app
      template:
        metadata:
          labels:
            app: sample-app
        spec:
          containers:
          - name: consumer
            image: consumer  # 修改为业务真实的RabbitMQ的消费者镜像。
            resources:
              limits:
                cpu: "500m"

步骤二:部署基于RabbitMQ指标的弹性示例

以下步骤通过获取已创建RabbitMQ消息队列的实例信息,对接RabbitMQHPA伸缩指标,实现将RabbitMQ指标转换为HPA可用指标,并实现容器自动伸缩。

  1. 查看消息队列RabbitMQ实例信息。

    1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

    2. 单击目标实例名称,进入实例详情页面。在接入点信息页签,查看并记录公网接入点Endpoint

      说明

      云消息队列RabbitMQ在创建实例时,使用公网接入点会增加安全风险,可能会受到外部攻击或未授权访问,本实例使用公网接入点仅作为演示。如果您的应用主要在阿里云VPC内运行,并且不需要外部访问,建议不开启公网访问,以提高安全性。

    3. 在左侧导航栏中单击静态用户名密码,查看并记录用户名密码静态用户名密码..png

    4. 在左侧导航栏中单击Vhost列表,查看并记录Vhost的值,例如amq-test获取vhost..png

  2. 执行如下指令,创建连接认证字符串。

    echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64

    其中rabbitmq-usernamerabbitmq-password分别为上一步记录的用户名和密码,localhost为记录的Endpoint,vhost为上一步记录的名称。

  3. 使用如下YAML创建Secret。

    apiVersion: v1
    kind: Secret
    metadata:
      name: keda-rabbitmq-secret
    data:
      host: YW1x****** # 已创建的连接认证字符串。
  4. 使用如下YAML创建TriggerAuthentication对象,并将该YAML文件部署到集群中。

    apiVersion: keda.sh/v1alpha1
    kind: TriggerAuthentication
    metadata:
      name: keda-trigger-auth-rabbitmq-conn
      namespace: default
    spec:
      secretTargetRef:
        - parameter: host
          name: keda-rabbitmq-secret # keda-rabbitmq-secret为上一步创建的Secret。
          key: host

    执行如下命令,将YAML文件部署到集群中。

    kubectl apply -f secret.yaml
    kubectl apply -f rabbitmq-trigger-auth.yaml

    部署完成后,您可以在Keda中使用RabbitMQ触发器,并且可以通过引用TriggerAuthentication对象来连接RabbitMQ获取指标数据。

  5. 使用如下内容,创建YAML文件ScaledObject.yaml。

    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: rabbitmq-scaledobject
      namespace: default
    spec:
      scaleTargetRef:
        name: rabbitmq-deployment
      maxReplicaCount: 10
      minReplicaCount: 1
      triggers:
      - type: rabbitmq
        metadata:
          protocol: amqp
          queueName: queue-test
          mode: QueueLength
          value: "20"
          metricName: queue-test 
        authenticationRef:
          name: keda-trigger-auth-rabbitmq-conn

    参数

    说明

    参数

    说明

    scaleTargetRef

    配置扩缩容的对象,这里配置步骤一:部署工作负载创建应用已创建的应用sample-app。

    maxReplicaCount

    最大副本数。

    minReplicaCount

    最小副本数。

    protocol

    keda组件与RabbitMQ之间的通信协议。取值范围:auto、http、amqp。

    queueName

    待读取信息的队列名称。

    value

    触发扩容的阈值。

    metricName

    自定义指标的名称,用于告诉HPA从哪个指标获取数据来决策扩缩容。在这里是RabbitMQ消息队列名称queue-test,用于获取队列长度。

  6. 应用配置,查看ScaledObject、HPA资源状态。

    执行如下命令,创建资源。

    kubectl apply -f ScaledObject.yaml  

    执行如下命令,获取伸缩配置状态。

    kubectl get ScaledObject

    预期输出:

    NAME                    SCALETARGETKIND      SCALETARGETNAME   MIN   MAX   TRIGGERS   AUTHENTICATION                    READY   ACTIVE   FALLBACK   AGE
    rabbitmq-scaledobject   apps/v1.Deployment   sample-app        1     10   rabbitmq   keda-trigger-auth-rabbitmq-conn    True    False    False      17s

    执行如下命令,查看HPA的生成情况。

    kubectl get hpa

    预期输出:

    NAME                             REFERENCE               TARGETS      MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)   1         10        2          2m35s

步骤三:生产及消费数据实现扩缩容

下面通过RabbitMQ消息队列的生产者与消费者代码,展示利用RabbitMQ消息队列长度实现容器的伸缩效果。

  1. 基于以下producer代码生产RabbitMQ消息。

    package main
    
    import (
    	"fmt"
    	"log"
    	"time"
    	"github.com/streadway/amqp"
    )
    
    const (
    	queueName = "queue-test"    // 替换为待读取信息的队列名称。
    	numMsgs   = 10000
    	pauseTime = 10 * time.Millisecond
            url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test"   // 访问的RabbitMQ的url,拼接方式amqp://username:password@localhost:5672/vhost
    )
    
    func main() {
    	conn, err := amqp.Dial(url)
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    
    	q, err := ch.QueueDeclare(
    		queueName,
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    	failOnError(err, "Failed to declare a queue")
    	go produce(ch, q)
    	select {}
    }
    // 生产消息队列
    func produce(ch *amqp.Channel, q amqp.Queue) {
    	for i := 0; i < numMsgs; i++ {
    		msg := fmt.Sprintf("Message %d", i)
    		err := ch.Publish(
    			"",
    			q.Name,
    			false,
    			false,
    			amqp.Publishing{
    				ContentType: "text/plain",
    				Body:        []byte(msg),
    			},
    		)
    		failOnError(err, "Failed to publish a message")
    		log.Printf("Successed to publish a message: %s", msg)
    		time.Sleep(pauseTime)
    	}
    }
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
  2. 执行如下命令,查看HPA详情。

    运行producer代码生产消息队列。

    go run producer.go

    执行如下命令,查看HPA详情

    kubectl get hpa

    预期输出:

    NAME                               REFERENCE               TARGETS           MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app    443000m/20 (avg)   1         10        10         9m15s

    预期输出表明,sample-app已经扩容到keda组件设置的最大值。

  3. 关闭producer程序,运行下面consumer程序消费队列。

    package main
    
    import (
    	"fmt"
    	"log"
    	"github.com/streadway/amqp"
    )
    
    const (
    	queueName = "queue-test"
    	url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test"   // 访问的RabbitMQ的url,拼接方式amqp://username:password@localhost:5672/vhost
    )
    
    func main() {
    	// 连接到 RabbitMQ
    	conn, err := amqp.Dial(url)
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    
    	// 创建一个通道
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    
    	// 声明队列(确保队列存在)
    	_, err = ch.QueueDeclare(
    		queueName, // 队列名称
    		true,      // 是否持久化
    		false,     // 是否排外
    		false,     // 是否在队列中等待
    		false,     // 是否自动删除
    		nil,       // 额外属性
    	)
    	failOnError(err, "Failed to declare a queue")
    
    	// 获取消息通道
    	msgs, err := ch.Consume(
    		queueName, // 队列名称
    		"",        // 消费者的名字,如果不需要可以留空
    		true,      // 自动确认
    		false,     // 是否排外
    		false,     // 是否在队列中等待
    		false,     // 是否必须优先
    		nil,       // 额外的参数
    	)
    	failOnError(err, "Failed to register a consumer")
    
    	// 创建一个 goroutine 处理接收到的消息
    	go func() {
    		for msg := range msgs {
    			log.Printf("Received a message: %s", msg.Body)
    			// 在这里可以处理接收到的消息,例如进行业务逻辑处理
    			fmt.Printf("Processed message: %s\n", msg.Body)
    		}
    	}()
    
    	// 阻塞主程序,保持运行
    	select {}
    }
    
    // 简单的错误处理函数
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }

    运行consumer代码消费消息。

    go run consumer.go

    执行如下命令,监控HPA缩容,

    kubectl get hpa -w

    预期输出:

    NAME                               REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)   1         10        10         9m15s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   235000m/20 (avg)   1         10        10         9m51s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        10         10m
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        1          15m

    预期输出表明,在数据消费结束一段时间后,sample-app缩容至Keda组件设置的最小值。

相关文档

您还可以根据自定义的 RocketMQ 消息堆积指标配置 HPA,实现更灵活的消息驱动扩缩容,具体操作请参见基于RocketMQ指标的KEDA

  • 本页导读 (1)
  • 前提条件
  • 步骤一:部署工作负载创建应用
  • 步骤二:部署基于RabbitMQ指标的弹性示例
  • 步骤三:生产及消费数据实现扩缩容
  • 相关文档
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等