基于RocketMQ指标的事件驱动伸缩(KEDA)

使用RocketMQ时,消息堆积产生时容易导致系统负载过高。为避免服务崩溃,提高系统的可靠性和稳定性,您可以基于RocketMQ消息堆积指标,使用KEDA(Kubernetes Event-driven Autoscaling)为应用的弹性伸缩方案,实现自动化和高效的容器水平伸缩(HPA)。

功能介绍

RocketMQ是一种高性能、高可靠、高扩展性的分布式消息中间件,已被广泛应用于企业级应用中。但使用RocketMQ时,容易产生消息堆积的问题,尤其是在高负载情况下。消息堆积容易导致系统负载过高,甚至导致服务崩溃。

在此场景下,您可以使用Kubernetes事件驱动自动伸缩工具KEDA,根据自定义的RocketMQ消息堆积指标,启动容器水平伸缩应用。此方案可以帮助您实现自动化和高效性的应用程序伸缩,从而提高系统的可靠性和稳定性。如果您采用的是开源的RocketMQ,考虑通过JMXPrometheus Exporter提供消息对接的数据也可以实现类似的能力。

本文使用阿里云Prometheus作为数据源,介绍如何实现RocketMQ的消息对接伸缩对象配置。

前提条件

步骤一:部署工作负载

本示例创建一个名为sample-appNginx示例应用。

  1. 登录容器计算服务控制台,在左侧导航栏选择集群列表

  2. 在集群列表页面,单击目标集群名称,然后在左侧导航栏,选择工作负载 > 无状态

  3. 无状态页面,单击使用YAML创建资源,按照页面提示选择示例模板自定义,使用内容创建一个名为sample-appNginx应用。

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: sample-app
      namespace: default
      labels:
        app: sample-app
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: sample-app
      template:
        metadata:
          labels:
            app: sample-app
        spec:
          containers:
          - name: sample-app
            image: registry.cn-hangzhou.aliyuncs.com/acs-sample/nginx:latest # 您可以按需修改为业务真实的RocketMQ的消费者镜像。
            resources:
              limits:
                cpu: "500m"

步骤二:配置ScaledObject伸缩策略

您可以通过配置ScaledObject来管理KEDA的伸缩策略,包括扩缩容的对象、最大最小副本数、扩缩容阈值(消息堆积量阈值)等。在配置ScaledObject前,您需要获取RocketMQ实例指标数据的Prometheus地址等信息。

获取RocketMQ实例信息

  1. 登录云消息队列 RocketMQ 版控制台,在左侧导航栏单击实例列表

  2. 在顶部菜单栏选择地域,如华东1(杭州),然后在实例列表中,单击目标实例名称。

  3. 实例详情页面,记录基础信息中的实例 ID。例如:rmq-cn-uax33****。

  4. 在左侧边栏单击访问控制,在智能身份识别页签查看并记录当前实例的用户名和密码。

获取RocketMQ实例的Prometheus数据源

  1. 登录ARMS控制台

  2. 在左侧导航栏选择Prometheus监控 > 实例列表,进入可观测监控 Prometheus 版的实例列表页面。

  3. 单击目标实例云服务-{{RegionId}},在左侧导航栏,单击设置,记录HTTP API地址(Grafana 读取地址)

    image

创建ScaledObject配置

  1. 创建ScaledObject.yaml。

    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: prometheus-scaledobject
      namespace: default
    spec:
      scaleTargetRef:
        name: sample-app
      maxReplicaCount: 10
      minReplicaCount: 2
      triggers:
      - type: prometheus
        metadata:
          serverAddress: {{RocketMQ实例的Prometheus数据源HTTP API地址}}
          metricName: rocketmq_consumer_inflight_messages
          query: sum({__name__=~"rocketmq_consumer_ready_messages|rocketmq_consumer_inflight_messages",instance_id="{{RocketMQ实例ID}}",topic=~"keda"}) by (consumer_group)
          threshold: '30'

    参数说明如下:

    参数

    说明

    scaleTargetRef.name

    配置扩缩容的对象。此处配置步骤一:部署工作负载创建好的应用sample-app。

    maxReplicaCount

    扩容的最大副本数。

    minReplicaCount

    缩容的最小副本数。

    serverAddress

    配置存储RocketMQ指标数据的Prometheus地址,即上文记录的HTTP API地址(Grafana 读取地址)

    metricName

    PromQL请求数据。

    query

    metricNamePromQL请求的数据做聚合操作,此处聚合方式为消息堆积量的PromQL。其中instance_id请替换为RocketMQ的实例ID。

    threshold

    扩缩容的阈值。本示例将消息堆积量30作为阈值,超过30时会触发扩容。

  2. 部署并查看ScaledObject资源。

    1. 部署ScaledObject资源。

      kubectl apply -f ScaledObject.yaml
    2. 获取伸缩配置状态。

      kubectl get ScaledObject

      预期输出:

      NAME                      SCALETARGETKIND      SCALETARGETNAME   MIN   MAX   TRIGGERS     AUTHENTICATION   READY   ACTIVE   FALLBACK   AGE
      prometheus-scaledobject   apps/v1.Deployment   sample-app        2     10    prometheus                    True    False    False      105s
    3. 检查HPA的生成情况。

      kubectl get hpa

      预期输出:

      NAME                               REFERENCE               TARGETS      MINPODS   MAXPODS   REPLICAS   AGE
      keda-hpa-prometheus-scaledobject   Deployment/sample-app   0/30 (avg)   2         10        2          28m
  3. (可选)使用Prometheus Token以提高数据读取的安全性,并配置Prometheus Token的验证。

    展开查看详细步骤

    1. 按照页面提示,生成Prometheus Token。

      image

    2. 创建一个Secret,其中需对customAuthHeader: "Authorization"customAuthValue这两个字段的Value值进行Base64编码。

      apiVersion: v1
      kind: Secret
      metadata:
        name: keda-prom-secret
        namespace: default
      data:
        customAuthHeader: "QXV0Xxxxxxxlvbg=="
        customAuthValue: "kR2tpT2lJeFpXSmxaVFV6WlMTxxxxxxxxRMVFE0TUdRdE9USXpaQzFqWkRZd09EZ3dOVFV5WWpZaWZRLjlDaFBYU0Q2dEhWc1dQaFlyMGh3ZU5FQjZQZWVETXFjTlYydVNqOU82TTQ="
    3. 参见下方示例代码,创建KEDA请求数据的凭证,并部署到集群中。

      apiVersion: keda.sh/v1alpha1
      kind: TriggerAuthentication
      metadata:
        name: keda-prom-creds
        namespace: default
      spec:
        secretTargetRef:
          - parameter: customAuthHeader
            name: keda-prom-secret
            key: customAuthHeader
          - parameter: customAuthValue
            name: keda-prom-secret
            key: customAuthValue
    4. ScaledObject YAML文件中,配置authenticationRef字段,填写创建的凭证名称。

      apiVersion: keda.sh/v1alpha1
      kind: ScaledObject
      metadata:
        name: prometheus-scaledobject
        namespace: default
      spec:
        scaleTargetRef:
          name: sample-app
        maxReplicaCount: 10
        minReplicaCount: 2
        triggers:
        - type: prometheus
          metadata:
            serverAddress: http://cn-beijing.arms.aliyuncs.com:9090/api/v1/prometheus/8cba801fff65546a3012e9a684****/****538168824185/cloud-product-rocketmq/cn-beijing
            metricName: rocketmq_consumer_inflight_messages
            query: sum({__name__=~"rocketmq_consumer_ready_messages|rocketmq_consumer_inflight_messages",instance_id="rmq-cn-uax3xxxxxx",topic=~"keda"}) by (consumer_group)
            threshold: '30'
            authModes: "custom"
          authenticationRef: # 配置字段。
            name: keda-prom-creds # 填写凭证名称。
      说明

      本示例使用Custom authentication的认证类型。您可以参见KEDA社区文档选择其他认证方式。

步骤三:配置生产者和消费者

RocketMQ 5.x 实例

本示例基于rocketmq-keda-sample项目生产和消费数据。请根据注释内容更新RocketMQ实例的地址、用户名和密码。

Producer

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
	"os"
	"time"
)

func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rmq-cn-uaxxxxxxy02.cn-beijing.rmq.aliyuncs.com:8080"})), // RocketMQ instance address
		producer.WithCredentials(primitive.Credentials{
			AccessKey: "xxxxxxxxxxx",  // RocketMQ instance username
			SecretKey: "xxxxxxxxxxx",  // RocketMQ instance password
		}),
		producer.WithRetry(2),
	)

	if err != nil {
		fmt.Printf("failed to start producer: %s", err.Error())
		os.Exit(1)
	}

	err = p.Start()
	if err != nil {
		fmt.Printf("start producer error: %s", err.Error())
		os.Exit(1)
	}
	for true {
		res, err := p.SendSync(context.Background(), primitive.NewMessage("keda",
			[]byte("Hello RocketMQ Go Client!")))

		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Printf("send message success: result=%s\n", res.String())
		}
		time.Sleep(100 * time.Millisecond)
	}
	err = p.Shutdown()
	if err != nil {
		fmt.Printf("shutdown producer error: %s", err.Error())
	}
}

Consumer

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"os"
	"time"
)

func main() {
	sig := make(chan os.Signal)
	c, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName("keda"),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: "xxxxxxxxxxx", // RocketMQ instance username
			SecretKey: "xxxxxxxxxxx", // RocketMQ instance password
		}),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rmq-cn-uaxxxxxxy02.cn-beijing.rmq.aliyuncs.com:8080"})), // RocketMQ instance address
	)
	err := c.Subscribe("keda", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range msgs {
			fmt.Printf("subscribe callback: %v \n", msgs[i])
			time.Sleep(1 * time.Second)
		}

		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	// Note: start after subscribe
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	<-sig
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("shutdown Consumer error: %s", err.Error())
	}
}

RocketMQ 5.x Serverless实例

本示例基于RocketMQ-Client中提供的示例代码生产和消费数据。请根据注释内容更新RocketMQ实例的地址、用户名和密码,并根据Serverless版实例公网访问说明中提供的信息进行修改。

Producer

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	rmq_client "github.com/apache/rocketmq-clients/golang/v5"
	"github.com/apache/rocketmq-clients/golang/v5/credentials"
)

const (
	Topic     = "xxxxxx" // Topic Name
	Endpoint  = "xxxxxx" // RocketMQ instance address
	AccessKey = "xxxxxx" // RocketMQ instance username
	SecretKey = "xxxxxx" // RocketMQ instance password
)

func main() {
	os.Setenv("mq.consoleAppender.enabled", "true")
	rmq_client.ResetLogger()
	// In most case, you don't need to create many producers, singleton pattern is more recommended.
	producer, err := rmq_client.NewProducer(&rmq_client.Config{
		Endpoint: Endpoint,
		NameSpace: "rqm-xx-xxxxxx", // RocketMQ instance address
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		rmq_client.WithTopics(Topic),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start producer
	err = producer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// graceful stop producer
	defer producer.GracefulStop()

	for i := 0; i < 10; i++ {
		// new a message
		msg := &rmq_client.Message{
			Topic: Topic,
			Body:  []byte("this is a message : " + strconv.Itoa(i)),
		}
		// set keys and tag
		msg.SetKeys("a", "b")
		msg.SetTag("ab")
		// send message in sync
		resp, err := producer.Send(context.TODO(), msg)
		if err != nil {
			log.Fatal(err)
		}
		for i := 0; i < len(resp); i++ {
			fmt.Printf("%#v\n", resp[i])
		}
		// wait a moment
		time.Sleep(time.Second * 1)
	}
}

Consumer

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	rmq_client "github.com/apache/rocketmq-clients/golang/v5"
	"github.com/apache/rocketmq-clients/golang/v5/credentials"
)

const (
	Topic         = "xxxxxx" // Topic Name
	ConsumerGroup = "xxxxxx" // Consumer Group Name
	Endpoint  = "xxxxxx" // RocketMQ instance address
	AccessKey = "xxxxxx" // RocketMQ instance username
	SecretKey = "xxxxxx" // RocketMQ instance password
)

var (
	// maximum waiting time for receive func
	awaitDuration = time.Second * 5
	// maximum number of messages received at one time
	maxMessageNum int32 = 16
	// invisibleDuration should > 20s
	invisibleDuration = time.Second * 20
	// receive messages in a loop
)

func main() {
	// log to console
	os.Setenv("mq.consoleAppender.enabled", "true")
	rmq_client.ResetLogger()
	// In most case, you don't need to create many consumers, singleton pattern is more recommended.
	simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
		Endpoint:      Endpoint,
		NameSpace: "rqm-xx-xxxxxx", // RocketMQ instance address
		ConsumerGroup: ConsumerGroup,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		rmq_client.WithSimpleAwaitDuration(awaitDuration),
		rmq_client.WithSimpleSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
			Topic: rmq_client.SUB_ALL,
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start simpleConsumer
	err = simpleConsumer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// graceful stop simpleConsumer
	defer simpleConsumer.GracefulStop()

	go func() {
		for {
			fmt.Println("start receive message")
			mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
			if err != nil {
				fmt.Println(err)
			}
			// ack message
			for _, mv := range mvs {
				simpleConsumer.Ack(context.TODO(), mv)
				fmt.Println(mv)
			}
			fmt.Println("wait a moment")
			fmt.Println()
			time.Sleep(time.Second * 3)
		}
	}()
	// run for a while
	time.Sleep(time.Minute)
}

步骤四:使用生产和消费数据实现扩缩容

以下以使用RocketMQ 5.x 实例为例进行演示。

  1. 登录云消息队列 RocketMQ 版控制台,在左侧导航栏单击实例列表

  2. 在顶部菜单栏选择地域,如华东1(杭州),然后在实例列表中,单击目标实例名称,查看并记录接入点和网络信息

  3. 在左侧导航栏,单击访问控制,然后单击智能身份识别页签,查看并记录实例用户名和实例密码。

  4. 运行Producer程序生产数据,查看HPA伸缩情况。

    1. 生产数据。

      go run producer.go
    2. 查看HPA伸缩情况。

      kubectl get hpa

      预期输出:

      NAME                               REFERENCE               TARGETS           MINPODS   MAXPODS   REPLICAS   AGE
      keda-hpa-prometheus-scaledobject   Deployment/sample-app   32700m/30 (avg)   2         10        10         47m

      可以看到,sample-app应用已经扩容到KEDA组件设置的最大副本数。

  5. 关闭Producer程序,运行Consumer程序,并查看HPA伸缩情况。

    1. 消费数据。

      go run consumer.go
    2. 查看HPA伸缩情况

      kubectl get hpa -w

      预期输出:

      NAME                               REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
      keda-hpa-prometheus-scaledobject   Deployment/sample-app   222500m/30 (avg)   2         10        10         50m
      keda-hpa-prometheus-scaledobject   Deployment/sample-app   232400m/30 (avg)   2         10        10         51m
      keda-hpa-prometheus-scaledobject   Deployment/sample-app   0/30 (avg)         2         10        10         52m
      keda-hpa-prometheus-scaledobject   Deployment/sample-app   0/30 (avg)         2         10        2          57m

    可以看到,在数据消费结束一段时间后(约5分钟),sample-app应用缩容至KEDA组件设置的最小副本数。

相关文档

您也可以实现基于RabbitMQ指标的KEDA,监控队列长度和消息速率指标,请参见基于RabbitMQ指标的容器水平伸缩