使用RocketMQ时,消息堆积产生时容易导致系统负载过高。为避免服务崩溃,提高系统的可靠性和稳定性,您可以基于RocketMQ消息堆积指标,使用KEDA(Kubernetes Event-driven Autoscaling)为应用的弹性伸缩方案,实现自动化和高效的容器水平伸缩(HPA)。
功能介绍
RocketMQ是一种高性能、高可靠、高扩展性的分布式消息中间件,已被广泛应用于企业级应用中。但使用RocketMQ时,容易产生消息堆积的问题,尤其是在高负载情况下。消息堆积容易导致系统负载过高,甚至导致服务崩溃。
在此场景下,您可以使用Kubernetes事件驱动自动伸缩工具KEDA,根据自定义的RocketMQ消息堆积指标,启动容器水平伸缩应用。此方案可以帮助您实现自动化和高效性的应用程序伸缩,从而提高系统的可靠性和稳定性。如果您采用的是开源的RocketMQ,考虑通过JMX的Prometheus Exporter提供消息对接的数据也可以实现类似的能力。
本文使用阿里云Prometheus作为数据源,介绍如何实现RocketMQ的消息对接伸缩对象配置。
前提条件
已安装ack-keda组件。
已创建消息队列RocketMQ版5.x系列实例,以及名为
keda
的Topic和Group。RocketMQ 5.x Serverless实例能够根据业务负载快速伸缩资源,支持根据实际的使用量分配资源和计算费用,能够有效地节约成本。
已在ARMS中接入阿里云 RocketMQ(5.0) 服务。
步骤一:部署工作负载
本示例创建一个名为sample-app的Nginx示例应用。
登录容器计算服务控制台,在左侧导航栏选择集群列表。
在集群列表页面,单击目标集群名称,然后在左侧导航栏,选择
。在无状态页面,单击使用YAML创建资源,按照页面提示选择示例模板为自定义,使用内容创建一个名为sample-app的Nginx应用。
apiVersion: apps/v1 kind: Deployment metadata: name: sample-app namespace: default labels: app: sample-app spec: replicas: 1 selector: matchLabels: app: sample-app template: metadata: labels: app: sample-app spec: containers: - name: sample-app image: registry.cn-hangzhou.aliyuncs.com/acs-sample/nginx:latest # 您可以按需修改为业务真实的RocketMQ的消费者镜像。 resources: limits: cpu: "500m"
步骤二:配置ScaledObject伸缩策略
您可以通过配置ScaledObject来管理KEDA的伸缩策略,包括扩缩容的对象、最大最小副本数、扩缩容阈值(消息堆积量阈值)等。在配置ScaledObject前,您需要获取RocketMQ实例指标数据的Prometheus地址等信息。
获取RocketMQ实例信息
登录云消息队列 RocketMQ 版控制台,在左侧导航栏单击实例列表。
在顶部菜单栏选择地域,如华东1(杭州),然后在实例列表中,单击目标实例名称。
在实例详情页面,记录基础信息中的实例 ID。例如:rmq-cn-uax33****。
在左侧边栏单击访问控制,在智能身份识别页签查看并记录当前实例的用户名和密码。
获取RocketMQ实例的Prometheus数据源
登录ARMS控制台。
在左侧导航栏选择 ,进入可观测监控 Prometheus 版的实例列表页面。
单击目标实例云服务-{{RegionId}},在左侧导航栏,单击设置,记录HTTP API地址(Grafana 读取地址)。
创建ScaledObject配置
创建ScaledObject.yaml。
apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: name: prometheus-scaledobject namespace: default spec: scaleTargetRef: name: sample-app maxReplicaCount: 10 minReplicaCount: 2 triggers: - type: prometheus metadata: serverAddress: {{RocketMQ实例的Prometheus数据源HTTP API地址}} metricName: rocketmq_consumer_inflight_messages query: sum({__name__=~"rocketmq_consumer_ready_messages|rocketmq_consumer_inflight_messages",instance_id="{{RocketMQ实例ID}}",topic=~"keda"}) by (consumer_group) threshold: '30'
参数说明如下:
参数
说明
scaleTargetRef.name
配置扩缩容的对象。此处配置步骤一:部署工作负载创建好的应用sample-app。
maxReplicaCount
扩容的最大副本数。
minReplicaCount
缩容的最小副本数。
serverAddress
配置存储RocketMQ指标数据的Prometheus地址,即上文记录的HTTP API地址(Grafana 读取地址)。
metricName
PromQL请求数据。
query
对metricName中PromQL请求的数据做聚合操作,此处聚合方式为消息堆积量的PromQL。其中
instance_id
请替换为RocketMQ的实例ID。threshold
扩缩容的阈值。本示例将消息堆积量30作为阈值,超过30时会触发扩容。
部署并查看ScaledObject资源。
部署ScaledObject资源。
kubectl apply -f ScaledObject.yaml
获取伸缩配置状态。
kubectl get ScaledObject
预期输出:
NAME SCALETARGETKIND SCALETARGETNAME MIN MAX TRIGGERS AUTHENTICATION READY ACTIVE FALLBACK AGE prometheus-scaledobject apps/v1.Deployment sample-app 2 10 prometheus True False False 105s
检查HPA的生成情况。
kubectl get hpa
预期输出:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-prometheus-scaledobject Deployment/sample-app 0/30 (avg) 2 10 2 28m
(可选)使用Prometheus Token以提高数据读取的安全性,并配置Prometheus Token的验证。
步骤三:配置生产者和消费者
RocketMQ 5.x 实例
本示例基于rocketmq-keda-sample项目生产和消费数据。请根据注释内容更新RocketMQ实例的地址、用户名和密码。
Producer
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"time"
)
func main() {
p, err := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rmq-cn-uaxxxxxxy02.cn-beijing.rmq.aliyuncs.com:8080"})), // RocketMQ instance address
producer.WithCredentials(primitive.Credentials{
AccessKey: "xxxxxxxxxxx", // RocketMQ instance username
SecretKey: "xxxxxxxxxxx", // RocketMQ instance password
}),
producer.WithRetry(2),
)
if err != nil {
fmt.Printf("failed to start producer: %s", err.Error())
os.Exit(1)
}
err = p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
for true {
res, err := p.SendSync(context.Background(), primitive.NewMessage("keda",
[]byte("Hello RocketMQ Go Client!")))
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
time.Sleep(100 * time.Millisecond)
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
Consumer
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"os"
"time"
)
func main() {
sig := make(chan os.Signal)
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("keda"),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "xxxxxxxxxxx", // RocketMQ instance username
SecretKey: "xxxxxxxxxxx", // RocketMQ instance password
}),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rmq-cn-uaxxxxxxy02.cn-beijing.rmq.aliyuncs.com:8080"})), // RocketMQ instance address
)
err := c.Subscribe("keda", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("subscribe callback: %v \n", msgs[i])
time.Sleep(1 * time.Second)
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
<-sig
err = c.Shutdown()
if err != nil {
fmt.Printf("shutdown Consumer error: %s", err.Error())
}
}
RocketMQ 5.x Serverless实例
本示例基于RocketMQ-Client中提供的示例代码生产和消费数据。请根据注释内容更新RocketMQ实例的地址、用户名和密码,并根据Serverless版实例公网访问说明中提供的信息进行修改。
Producer
package main
import (
"context"
"fmt"
"log"
"os"
"strconv"
"time"
rmq_client "github.com/apache/rocketmq-clients/golang/v5"
"github.com/apache/rocketmq-clients/golang/v5/credentials"
)
const (
Topic = "xxxxxx" // Topic Name
Endpoint = "xxxxxx" // RocketMQ instance address
AccessKey = "xxxxxx" // RocketMQ instance username
SecretKey = "xxxxxx" // RocketMQ instance password
)
func main() {
os.Setenv("mq.consoleAppender.enabled", "true")
rmq_client.ResetLogger()
// In most case, you don't need to create many producers, singleton pattern is more recommended.
producer, err := rmq_client.NewProducer(&rmq_client.Config{
Endpoint: Endpoint,
NameSpace: "rqm-xx-xxxxxx", // RocketMQ instance address
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
AccessSecret: SecretKey,
},
},
rmq_client.WithTopics(Topic),
)
if err != nil {
log.Fatal(err)
}
// start producer
err = producer.Start()
if err != nil {
log.Fatal(err)
}
// graceful stop producer
defer producer.GracefulStop()
for i := 0; i < 10; i++ {
// new a message
msg := &rmq_client.Message{
Topic: Topic,
Body: []byte("this is a message : " + strconv.Itoa(i)),
}
// set keys and tag
msg.SetKeys("a", "b")
msg.SetTag("ab")
// send message in sync
resp, err := producer.Send(context.TODO(), msg)
if err != nil {
log.Fatal(err)
}
for i := 0; i < len(resp); i++ {
fmt.Printf("%#v\n", resp[i])
}
// wait a moment
time.Sleep(time.Second * 1)
}
}
Consumer
package main
import (
"context"
"fmt"
"log"
"os"
"time"
rmq_client "github.com/apache/rocketmq-clients/golang/v5"
"github.com/apache/rocketmq-clients/golang/v5/credentials"
)
const (
Topic = "xxxxxx" // Topic Name
ConsumerGroup = "xxxxxx" // Consumer Group Name
Endpoint = "xxxxxx" // RocketMQ instance address
AccessKey = "xxxxxx" // RocketMQ instance username
SecretKey = "xxxxxx" // RocketMQ instance password
)
var (
// maximum waiting time for receive func
awaitDuration = time.Second * 5
// maximum number of messages received at one time
maxMessageNum int32 = 16
// invisibleDuration should > 20s
invisibleDuration = time.Second * 20
// receive messages in a loop
)
func main() {
// log to console
os.Setenv("mq.consoleAppender.enabled", "true")
rmq_client.ResetLogger()
// In most case, you don't need to create many consumers, singleton pattern is more recommended.
simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
Endpoint: Endpoint,
NameSpace: "rqm-xx-xxxxxx", // RocketMQ instance address
ConsumerGroup: ConsumerGroup,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
AccessSecret: SecretKey,
},
},
rmq_client.WithSimpleAwaitDuration(awaitDuration),
rmq_client.WithSimpleSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
Topic: rmq_client.SUB_ALL,
}),
)
if err != nil {
log.Fatal(err)
}
// start simpleConsumer
err = simpleConsumer.Start()
if err != nil {
log.Fatal(err)
}
// graceful stop simpleConsumer
defer simpleConsumer.GracefulStop()
go func() {
for {
fmt.Println("start receive message")
mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
if err != nil {
fmt.Println(err)
}
// ack message
for _, mv := range mvs {
simpleConsumer.Ack(context.TODO(), mv)
fmt.Println(mv)
}
fmt.Println("wait a moment")
fmt.Println()
time.Sleep(time.Second * 3)
}
}()
// run for a while
time.Sleep(time.Minute)
}
步骤四:使用生产和消费数据实现扩缩容
以下以使用RocketMQ 5.x 实例为例进行演示。
登录云消息队列 RocketMQ 版控制台,在左侧导航栏单击实例列表。
在顶部菜单栏选择地域,如华东1(杭州),然后在实例列表中,单击目标实例名称,查看并记录接入点和网络信息。
在左侧导航栏,单击访问控制,然后单击智能身份识别页签,查看并记录实例用户名和实例密码。
运行Producer程序生产数据,查看HPA伸缩情况。
生产数据。
go run producer.go
查看HPA伸缩情况。
kubectl get hpa
预期输出:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-prometheus-scaledobject Deployment/sample-app 32700m/30 (avg) 2 10 10 47m
可以看到,sample-app应用已经扩容到KEDA组件设置的最大副本数。
关闭Producer程序,运行Consumer程序,并查看HPA伸缩情况。
消费数据。
go run consumer.go
查看HPA伸缩情况
kubectl get hpa -w
预期输出:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-prometheus-scaledobject Deployment/sample-app 222500m/30 (avg) 2 10 10 50m keda-hpa-prometheus-scaledobject Deployment/sample-app 232400m/30 (avg) 2 10 10 51m keda-hpa-prometheus-scaledobject Deployment/sample-app 0/30 (avg) 2 10 10 52m keda-hpa-prometheus-scaledobject Deployment/sample-app 0/30 (avg) 2 10 2 57m
可以看到,在数据消费结束一段时间后(约5分钟),sample-app应用缩容至KEDA组件设置的最小副本数。
相关文档
您也可以实现基于RabbitMQ指标的KEDA,监控队列长度和消息速率指标,请参见基于RabbitMQ指标的容器水平伸缩。