通过RRSA连接云消息队列RabbitMQ版
借助RRSA能力,您可在ACK集群中为每个Pod绑定独立的RAM角色,实现细粒度的权限隔离。应用无需硬编码AccessKey,而是通过挂载的OIDC Token向STS申请临时身份凭证,安全访问云消息队列 RabbitMQ 版,从而降低AccessKey泄露的风险。
实现原理
提交Pod:用户部署启用服务账户令牌卷投影功能的Pod。
自动挂载:ACK集群为该Pod自动创建并挂载经签名的OIDC Token文件。
扮演角色:Pod内应用读取该Token,调用STS
AssumeRoleWithOIDC接口,安全获取指定RAM角色的临时身份凭证。建立连接:应用根据获取的临时身份凭证生成动态用户名与密码,建立云消息队列 RabbitMQ 版的连接,完成消息收发及元数据操作。
适用范围
RRSA功能目前仅支持1.22及以上版本的集群,即ACK托管集群基础版、ACK托管集群Pro版、ACK Serverless集群基础版、ACK Serverless集群Pro版和ACK Edge集群Pro版。
步骤一:启用RRSA功能
参考通过RRSA配置ServiceAccount的RAM权限实现Pod权限隔离,为集群开启RRSA功能,并且为Pod注入如下配置:
类别 | 配置项名称 | 配置项说明 |
环境变量 | ALIBABA_CLOUD_ROLE_ARN | 需要扮演的RAM角色ARN。 |
ALIBABA_CLOUD_OIDC_PROVIDER_ARN | OIDC身份提供商的ARN。 | |
ALIBABA_CLOUD_OIDC_TOKEN_FILE | 包含OIDC Token的文件路径。 |
步骤二:获取临时身份凭证
接下来,基于上述已注入Pod的环境变量,获取临时身份凭证。示例代码如下:
from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
def get_sts_credentials(region_id):
# 获取环境变量中的三个认证相关信息
oidc_provider_arn = os.getenv('ALIBABA_CLOUD_OIDC_PROVIDER_ARN')
role_arn = os.getenv('ALIBABA_CLOUD_ROLE_ARN')
token_file_path = os.getenv('ALIBABA_CLOUD_OIDC_TOKEN_FILE')
if not oidc_provider_arn:
raise ValueError("ALIBABA_CLOUD_OIDC_PROVIDER_ARN is not set")
if not role_arn:
raise ValueError("ALIBABA_CLOUD_ROLE_ARN is not set")
if not token_file_path:
raise ValueError("ALIBABA_CLOUD_OIDC_TOKEN_FILE is not set")
# 读取token file的具体内容
try:
with open(token_file_path, 'r') as f:
oidc_token = f.read().strip()
except Exception as e:
raise RuntimeError(f"Failed to read OIDC token file: {e}")
# 调用AssumeRoleWithOIDC接口,进行临时身份获取
# 注意:此处endpoint需要根据您所在地域进行填充
config = open_api_models.Config(signature_algorithm='v2')
config.endpoint = 'sts.' + region_id + '.aliyuncs.com'
client = OpenApiClient(config)
params = open_api_models.Params(
action='AssumeRoleWithOIDC',
version='2015-04-01',
protocol='HTTPS',
method='POST',
auth_type='AK',
style='RPC',
pathname='/',
req_body_type='json',
body_type='json'
)
queries = {
'OIDCProviderArn': oidc_provider_arn,
'RoleArn': role_arn,
'OIDCToken': oidc_token,
'RoleSessionName': 'test'
}
request = open_api_models.OpenApiRequest(
query=OpenApiUtilClient.query(queries)
)
runtime = util_models.RuntimeOptions()
response = client.call_api(params, request, runtime)
body = response.get('body')
if not body:
raise RuntimeError("Response body is empty")
credentials = body.get('Credentials')
if not credentials:
raise RuntimeError("Credentials not found in response")
# 打印获得的临时accessKeyId, accessKeySecret, 以及securityToken
security_token = credentials.get('SecurityToken')
access_key_id = credentials.get('AccessKeyId')
access_key_secret = credentials.get('AccessKeySecret')
print("SecurityToken:", security_token)
print("AccessKeyId:", access_key_id)
print("AccessKeySecret:", access_key_secret)
return access_key_id, access_key_secret, security_token步骤三:生成用户名与密码
接下来,基于获取的临时身份凭证,生成访问云消息队列 RabbitMQ 版的用户名与密码。示例代码如下:
def get_user_name(ak, instance_id, sts_token):
# 将实例id/ak/sts token拼接,进行base64编码得到用户名
buf = f"0:{instance_id}:{ak}:{sts_token}"
return base64.b64encode(buf.encode('utf-8')).decode('utf-8')
def get_password(sk):
# 获取当前时间戳
timestamp = int(time.time() * 1000)
# 将sk, 时间戳使用HmacSHA1进行哈希
signature = hmac_sha1(sk.encode('utf-8'), str(timestamp).encode('utf-8'))
# 将哈希结果与时间进行拼接,并返回base64编码结果
result = f"{signature}:{timestamp}"
return base64.b64encode(result.encode('utf-8')).decode('utf-8')
def hmac_sha1(data, key):
mac = hmac.new(key, data, hashlib.sha1)
return mac.digest().hex().upper()
该用户名与密码不会在控制台展示,它是临时的,生命周期与临时身份凭证相同。
Java语言可通过
com.alibaba.mq.amqp.utils.UserUtils类实现生成用户名与密码,请参见示例代码。其他语言也可参考上述代码逻辑实现生成用户名与密码。
步骤四:建立连接
生成用户名与密码后,即可使用该用户名与密码建立连接。示例代码如下:
import pika
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=rabbitmq_host,
port=port,
virtual_host=vhost,
credentials=credentials
)
)
channel = connection.channel()
print("Connect success.")完整代码示例
下面是完整的Python示例代码,它可以实现:
从Pod的环境变量中获取临时身份凭证。
创建用户名密码。
建立云消息队列 RabbitMQ 版连接。
每秒收发一条消息。
每15秒进行一次临时身份凭证的轮转。
# -*- coding: utf-8 -*-
import os
import sys
import base64
import hmac
import hashlib
import time
import pika
from datetime import datetime
from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
# ==================== 配置参数 ====================
# 请根据实际环境修改以下配置
# STS相关配置
REGION_ID = "cn-hangzhou" # 阿里云地域ID
# RabbitMQ连接配置
RABBITMQ_HOST = "xxx.net.mq.amqp.aliyuncs.com" # RabbitMQ接入点地址
INSTANCE_ID = "rabbitmq-xxx-xxxx" # RabbitMQ实例ID
VHOST = "test" # 虚拟主机名称
PORT = 5672 # 连接端口(AMQP协议默认5672)
# 队列和交换机配置
QUEUE_NAME = "test_queue" # 队列名称
EXCHANGE_NAME = '' # 交换机名称,默认交换机为空字符串
# 测试运行配置
TOTAL_DURATION = 300 # 总运行时间:5分钟(秒)
REFRESH_INTERVAL = 15 # STS凭证刷新间隔:15秒
MESSAGE_INTERVAL = 1 # 消息发送间隔:1秒
# ==================================================
def get_sts_credentials(region_id):
"""
通过OIDC获取STS临时凭证
Args:
region_id: 阿里云地域ID
Returns:
tuple: (access_key_id, access_key_secret, security_token)
"""
# 获取环境变量中的三个认证相关信息
oidc_provider_arn = os.getenv('ALIBABA_CLOUD_OIDC_PROVIDER_ARN')
role_arn = os.getenv('ALIBABA_CLOUD_ROLE_ARN')
token_file_path = os.getenv('ALIBABA_CLOUD_OIDC_TOKEN_FILE')
if not oidc_provider_arn:
raise ValueError("ALIBABA_CLOUD_OIDC_PROVIDER_ARN is not set")
if not role_arn:
raise ValueError("ALIBABA_CLOUD_ROLE_ARN is not set")
if not token_file_path:
raise ValueError("ALIBABA_CLOUD_OIDC_TOKEN_FILE is not set")
# 读取token file的具体内容
try:
with open(token_file_path, 'r') as f:
oidc_token = f.read().strip()
except Exception as e:
raise RuntimeError(f"Failed to read OIDC token file: {e}")
# 调用AssumeRoleWithOIDC接口,进行临时身份获取
config = open_api_models.Config(signature_algorithm='v2')
config.endpoint = 'sts.' + region_id + '.aliyuncs.com'
client = OpenApiClient(config)
params = open_api_models.Params(
action='AssumeRoleWithOIDC',
version='2015-04-01',
protocol='HTTPS',
method='POST',
auth_type='AK',
style='RPC',
pathname='/',
req_body_type='json',
body_type='json'
)
queries = {
'OIDCProviderArn': oidc_provider_arn,
'RoleArn': role_arn,
'OIDCToken': oidc_token,
'RoleSessionName': 'test'
}
request = open_api_models.OpenApiRequest(
query=OpenApiUtilClient.query(queries)
)
runtime = util_models.RuntimeOptions()
response = client.call_api(params, request, runtime)
body = response.get('body')
if not body:
raise RuntimeError("Response body is empty")
credentials = body.get('Credentials')
if not credentials:
raise RuntimeError("Credentials not found in response")
# 获取临时accessKeyId, accessKeySecret, 以及securityToken
security_token = credentials.get('SecurityToken')
access_key_id = credentials.get('AccessKeyId')
access_key_secret = credentials.get('AccessKeySecret')
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] STS Credentials refreshed")
print(f" AccessKeyId: {access_key_id[:10]}...")
print(f" SecurityToken: {security_token[:20]}...")
return access_key_id, access_key_secret, security_token
def get_user_name(ak, instance_id, sts_token):
"""
生成RabbitMQ用户名
Args:
ak: AccessKey ID
instance_id: RabbitMQ实例ID
sts_token: Security Token
Returns:
str: Base64编码的用户名
"""
buf = f"0:{instance_id}:{ak}:{sts_token}"
return base64.b64encode(buf.encode('utf-8')).decode('utf-8')
def get_password(sk):
"""
生成RabbitMQ密码
Args:
sk: AccessKey Secret
Returns:
str: Base64编码的密码
"""
timestamp = int(time.time() * 1000)
signature = hmac_sha1(sk.encode('utf-8'), str(timestamp).encode('utf-8'))
result = f"{signature}:{timestamp}"
return base64.b64encode(result.encode('utf-8')).decode('utf-8')
def hmac_sha1(data, key):
"""
计算HMAC-SHA1签名
Args:
data: 要签名的数据
key: 签名密钥
Returns:
str: 十六进制大写签名字符串
"""
mac = hmac.new(key, data, hashlib.sha1)
return mac.digest().hex().upper()
def connect_to_rabbitmq(ak, sk, sts, rabbitmq_host, instance_id, vhost, port):
"""
连接到RabbitMQ
Args:
ak: AccessKey ID
sk: AccessKey Secret
sts: Security Token
rabbitmq_host: RabbitMQ接入点地址
instance_id: RabbitMQ实例ID
vhost: 虚拟主机名称
port: 连接端口
Returns:
tuple: (connection, channel)
"""
username = get_user_name(ak, instance_id, sts)
password = get_password(sk)
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=rabbitmq_host,
port=port,
virtual_host=vhost,
credentials=credentials,
heartbeat=600,
blocked_connection_timeout=300
)
)
channel = connection.channel()
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Connected to RabbitMQ successfully")
print(f" Host: {rabbitmq_host}:{port}")
print(f" Instance: {instance_id}")
print(f" VHost: {vhost}")
return connection, channel
def close_connection_safely(connection):
"""
安全关闭RabbitMQ连接
Args:
connection: RabbitMQ连接对象
"""
try:
if connection and connection.is_open:
connection.close()
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Connection closed")
except Exception as e:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Error closing connection: {e}")
def send_receive_messages(connection, channel, queue_name, exchange_name=''):
"""
发送并接收一条消息
Args:
connection: RabbitMQ连接对象
channel: RabbitMQ通道对象
queue_name: 队列名称
exchange_name: 交换机名称
Returns:
bool: 操作是否成功
"""
try:
# 确保队列存在
channel.queue_declare(queue=queue_name, durable=True)
# 发送消息
message = f"Test message at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
channel.basic_publish(
exchange=exchange_name,
routing_key=queue_name,
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 持久化消息
)
)
# 接收消息
method_frame, header_frame, body = channel.basic_get(queue=queue_name, auto_ack=True)
if method_frame:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Sent and received: {body.decode()[:50]}...")
return True
else:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Sent message (no message in queue to receive)")
return True
except Exception as e:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Error in send/receive: {e}")
return False
def print_config_summary():
"""打印配置摘要"""
print(f"=== Configuration Summary ===")
print(f"Region: {REGION_ID}")
print(f"RabbitMQ Host: {RABBITMQ_HOST}:{PORT}")
print(f"Instance ID: {INSTANCE_ID}")
print(f"VHost: {VHOST}")
print(f"Queue: {QUEUE_NAME}")
print(f"Exchange: {EXCHANGE_NAME if EXCHANGE_NAME else '(default)'}")
print(f"Total duration: {TOTAL_DURATION} seconds")
print(f"STS refresh interval: {REFRESH_INTERVAL} seconds")
print(f"Message interval: {MESSAGE_INTERVAL} second(s)")
print()
def print_test_summary(message_count, error_count):
"""
打印测试摘要
Args:
message_count: 成功消息数
error_count: 错误数
"""
total = message_count + error_count
success_rate = (message_count / total * 100) if total > 0 else 0
print(f"\n=== Test Summary ===")
print(f"Total messages: {message_count}")
print(f"Total errors: {error_count}")
print(f"Success rate: {success_rate:.2f}%")
def main():
"""主函数:执行RabbitMQ连接测试和消息收发"""
print("=== Starting RabbitMQ Test with STS Credentials ===\n")
print_config_summary()
start_time = time.time()
last_refresh_time = 0
connection = None
channel = None
message_count = 0
error_count = 0
try:
# 初始获取STS凭证并建立连接
print("=== Initial STS credentials and connection ===")
ak, sk, sts = get_sts_credentials(REGION_ID)
connection, channel = connect_to_rabbitmq(
ak, sk, sts, RABBITMQ_HOST, INSTANCE_ID, VHOST, PORT
)
last_refresh_time = time.time()
while True:
current_time = time.time()
elapsed_time = current_time - start_time
# 检查是否超过总运行时间
if elapsed_time >= TOTAL_DURATION:
print(f"\n=== Test completed ===")
print_test_summary(message_count, error_count)
break
# 检查是否需要刷新STS凭证
if current_time - last_refresh_time >= REFRESH_INTERVAL:
print(f"\n=== Refreshing STS credentials (elapsed: {int(elapsed_time)}s) ===")
# 关闭旧连接
close_connection_safely(connection)
try:
# 获取新凭证并重建连接
ak, sk, sts = get_sts_credentials(REGION_ID)
connection, channel = connect_to_rabbitmq(
ak, sk, sts, RABBITMQ_HOST, INSTANCE_ID, VHOST, PORT
)
last_refresh_time = current_time
except Exception as e:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Failed to refresh credentials/connection: {e}")
error_count += 1
time.sleep(1)
continue
# 发送和接收消息
try:
if send_receive_messages(connection, channel, QUEUE_NAME, EXCHANGE_NAME):
message_count += 1
else:
error_count += 1
except Exception as e:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Message operation failed: {e}")
error_count += 1
# 等待指定的时间间隔
time.sleep(MESSAGE_INTERVAL)
except KeyboardInterrupt:
print(f"\n=== Test interrupted by user ===")
print_test_summary(message_count, error_count)
except Exception as e:
print(f"\n=== Fatal error: {e} ===")
import traceback
traceback.print_exc()
sys.exit(1)
finally:
# 清理资源
close_connection_safely(connection)
print(f"\n=== Resources cleaned up ===")
if __name__ == '__main__':
main()
运行后预期输出如下:
