基于RabbitMQ指标的容器水平伸缩

本文介绍如何使用RabbitMQ指标和事件驱动自动伸缩工具Keda实现应用的弹性伸缩。

前提条件

功能介绍

随着企业业务的不断发展,对应用程序的要求也逐渐提高。消息队列RabbitMQ版是一款基于高可用分布式存储架构实现的AMQP 0-9-1协议的消息产品,具备高并发、分布式、灵活扩缩容等云消息服务优势。使用RabbitMQ和Keda可以监控队列长度和消息速率指标,根据需求进行弹性伸缩应用,帮助您优化资源使用和满足应用程序的负载需求。目前阿里云RabbitMQ只支持amqp协议传输数据,RabbitMQ社区版本支持多种协议。

本文举例如何配置ack-keda,实现将RabbitMQ指标转换为HPA可用指标,并实现容器自动伸缩。

步骤一:部署工作负载创建应用

  1. 登录容器服务管理控制台,在左侧导航栏选择集群

  2. 集群列表页面,单击目标集群名称,然后在左侧导航栏,选择工作负载 > 无状态

  3. 无状态页面,单击使用YAML创建资源

  4. 创建页面,选择示例模板自定义。使用如下内容创建名为sample-app的应用,然后单击创建

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: sample-app
      namespace: default
      labels:
        app: sample-app
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: sample-app
      template:
        metadata:
          labels:
            app: sample-app
        spec:
          containers:
          - name: consumer
            image: consumer  # 修改为业务真实的RabbitMQ的消费者镜像。
            resources:
              limits:
                cpu: "500m"

步骤二:部署基于RabbitMQ指标的弹性示例

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 单击目标实例名称,进入实例详情页面。在接入点信息页签,查看并记录公网接入点Endpoint

  3. 在左侧导航栏中单击静态用户名密码,查看并记录用户名密码静态用户名密码..png

  4. 在左侧导航栏中单击Vhost列表,查看并记录Vhost的值,例如amq-test获取vhost..png

  5. 执行如下指令,创建连接认证字符串。

    echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64

    其中rabbitmq-usernamerabbitmq-password分别为记录的用户名和密码、localhost为记录的Endpoint、vhost为上一步记录的名称。

  6. 使用如下YAML创建Secret。

    apiVersion: v1
    kind: Secret
    metadata:
      name: keda-rabbitmq-secret
    data:
      host: YW1xWXpNmd4TVRBNEXN0 # 已创建的连接认证字符串。
  7. 使用如下YAML创建TriggerAuthentication对象,并将该YAML文件部署到Kubernetes集群中。

    apiVersion: keda.sh/v1alpha1
    kind: TriggerAuthentication
    metadata:
      name: keda-trigger-auth-rabbitmq-conn
      namespace: default
    spec:
      secretTargetRef:
        - parameter: host
          name: keda-rabbitmq-secret # keda-rabbitmq-secret为上一步创建的Secret。
          key: host

    执行如下命令,将YAML文件部署到Kubernetes集群中。

    kubectl apply -f secret.yaml
    kubectl apply -f rabbitmq-trigger-auth.yaml

    部署完成后,您可以在Keda中使用RabbitMQ触发器,并且可以通过引用TriggerAuthentication对象来连接RabbitMQ获取指标数据。

  8. 使用如下内容,创建YAML文件ScaledObject.yaml。

    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: rabbitmq-scaledobject
      namespace: default
    spec:
      scaleTargetRef:
        name: rabbitmq-deployment
      maxReplicaCount: 10
      minReplicaCount: 1
      triggers:
      - type: rabbitmq
        metadata:
          protocol: amqp
          queueName: amq-test
          mode: QueueLength
          value: "20"
          metricName: custom-testqueue 
        authenticationRef:
          name: keda-trigger-auth-rabbitmq-conn

    参数

    说明

    scaleTargetRef

    配置扩缩容的对象,这里配置步骤一:部署工作负载创建应用已创建的应用sample-app。

    maxReplicaCount

    最大副本数。

    minReplicaCount

    最小副本数。

    protocol

    keda组件与RabitMQ之间的通信协议。取值范围:auto、http、amqp。

    queueName

    待读取信息的队列名称。

    value

    触发扩容的阈值。

  9. 执行如下命令,创建资源。

    // 下发伸缩配置。
    kubectl apply -f ScaledObject.yaml   
    
    scaledobject.keda.sh/rabbitmq-scaledobject created
    
    // 获取伸缩配置状态。
    kubectl get ScaledObject
    
    NAME                    SCALETARGETKIND      SCALETARGETNAME   MIN   MAX   TRIGGERS   AUTHENTICATION                    READY   ACTIVE   FALLBACK   AGE
    rabbitmq-scaledobject   apps/v1.Deployment   sample-app        1     10    rabbitmq   keda-trigger-auth-rabbitmq-conn   True    False    False      17s
    
    // 检查HPA的生成情况。
    kubectl get hpa
    
    NAME                             REFERENCE               TARGETS      MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)   1         10        2          2m35s

步骤三:生产及消费数据实现扩缩容

  1. 本次测试借助下面代码生产及消费数据。

    )package main
    
    import (
    	"fmt"
    	"log"
    	"time"
    	"github.com/streadway/amqp"
    )
    
    const (
    	queueName = "queue-test"    // 替换为待读取信息的队列名称。
    	numMsgs   = 10000
    	pauseTime = 10 * time.Millisecond
      url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test"   // 访问的RabbitMQ的url,拼接方式amqp://guest:password@localhost:5672/vhost
    )
    
    func main() {
    	conn, err := amqp.Dial(url)
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    
    	q, err := ch.QueueDeclare(
    		queueName,
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    	failOnError(err, "Failed to declare a queue")
    	go produce(ch, q)
    	select {}
    }
    
    func produce(ch *amqp.Channel, q amqp.Queue) {
    	for i := 0; i < numMsgs; i++ {
    		msg := fmt.Sprintf("Message %d", i)
    		err := ch.Publish(
    			"",
    			q.Name,
    			false,
    			false,
    			amqp.Publishing{
    				ContentType: "text/plain",
    				Body:        []byte(msg),
    			},
    		)
    		failOnError(err, "Failed to publish a message")
    		log.Printf("Successed to publish a message: %s", msg)
    		time.Sleep(pauseTime)
    	}
    }
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
  2. 运行Producer程序生产数据,然后执行如下命令,查看HPA详情。

    kubectl get hpa

    预期输出:

    NAME                               REFERENCE               TARGETS           MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)   1         10        10         9m15s

    预期输出表明,sample-app已经扩容到keda组件设置的最大值。

  3. 关闭Producer程序,运行Consumer程序,然后执行如下命令。

    kubectl get hpa -w

    预期输出:

    NAME                               REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)   1         10        10         9m15s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   235000m/20 (avg)   1         10        10         9m51s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        10         10m
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        1          15m

    预期输出表明,在数据消费结束一段时间后,sample-app缩容至keda组件设置的最小值。

相关文档

关于如何利用Kubernetes事件驱动自动伸缩工具Keda,根据自定义的RocketMQ消息堆积指标,启动容器水平伸缩(HPA),请参见基于RocketMQ指标的容器水平伸缩