通过RRSA连接云消息队列RabbitMQ版

更新时间:

借助RRSA能力,您可在ACK集群中为每个Pod绑定独立的RAM角色,实现细粒度的权限隔离。应用无需硬编码AccessKey,而是通过挂载的OIDC TokenSTS申请临时身份凭证,安全访问云消息队列 RabbitMQ 版,从而降低AccessKey泄露的风险。

实现原理

  1. 提交Pod:用户部署启用服务账户令牌卷投影功能的Pod。

  2. 自动挂载:ACK集群为该Pod自动创建并挂载经签名的OIDC Token文件。

  3. 扮演角色:Pod内应用读取该Token,调用STS AssumeRoleWithOIDC 接口,安全获取指定RAM角色的临时身份凭证。

  4. 建立连接:应用根据获取的临时身份凭证生成动态用户名与密码,建立云消息队列 RabbitMQ 版的连接,完成消息收发及元数据操作。

适用范围

RRSA功能目前仅支持1.22及以上版本的集群,即ACK托管集群基础版ACK托管集群ProACK Serverless集群基础版ACK Serverless集群ProACK Edge集群Pro

步骤一:启用RRSA功能

参考通过RRSA配置ServiceAccountRAM权限实现Pod权限隔离,为集群开启RRSA功能,并且为Pod注入如下配置:

类别

配置项名称

配置项说明

环境变量

ALIBABA_CLOUD_ROLE_ARN

需要扮演的RAM角色ARN。

ALIBABA_CLOUD_OIDC_PROVIDER_ARN

OIDC身份提供商的ARN。

ALIBABA_CLOUD_OIDC_TOKEN_FILE

包含OIDC Token的文件路径。

步骤二:获取临时身份凭证

接下来,基于上述已注入Pod的环境变量,获取临时身份凭证。示例代码如下:

from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient

def get_sts_credentials(region_id):
    # 获取环境变量中的三个认证相关信息
    oidc_provider_arn = os.getenv('ALIBABA_CLOUD_OIDC_PROVIDER_ARN')
    role_arn = os.getenv('ALIBABA_CLOUD_ROLE_ARN')
    token_file_path = os.getenv('ALIBABA_CLOUD_OIDC_TOKEN_FILE')

    if not oidc_provider_arn:
        raise ValueError("ALIBABA_CLOUD_OIDC_PROVIDER_ARN is not set")
    if not role_arn:
        raise ValueError("ALIBABA_CLOUD_ROLE_ARN is not set")
    if not token_file_path:
        raise ValueError("ALIBABA_CLOUD_OIDC_TOKEN_FILE is not set")

    # 读取token file的具体内容
    try:
        with open(token_file_path, 'r') as f:
            oidc_token = f.read().strip()
    except Exception as e:
        raise RuntimeError(f"Failed to read OIDC token file: {e}")

    # 调用AssumeRoleWithOIDC接口,进行临时身份获取
    # 注意:此处endpoint需要根据您所在地域进行填充
    config = open_api_models.Config(signature_algorithm='v2')
    config.endpoint = 'sts.' + region_id + '.aliyuncs.com'
    client = OpenApiClient(config)

    params = open_api_models.Params(
        action='AssumeRoleWithOIDC',
        version='2015-04-01',
        protocol='HTTPS',
        method='POST',
        auth_type='AK',
        style='RPC',
        pathname='/',
        req_body_type='json',
        body_type='json'
    )

    queries = {
        'OIDCProviderArn': oidc_provider_arn,
        'RoleArn': role_arn,
        'OIDCToken': oidc_token,
        'RoleSessionName': 'test'
    }

    request = open_api_models.OpenApiRequest(
        query=OpenApiUtilClient.query(queries)
    )
    runtime = util_models.RuntimeOptions()

    response = client.call_api(params, request, runtime)

    body = response.get('body')
    if not body:
        raise RuntimeError("Response body is empty")

    credentials = body.get('Credentials')
    if not credentials:
        raise RuntimeError("Credentials not found in response")

    # 打印获得的临时accessKeyId, accessKeySecret, 以及securityToken
    security_token = credentials.get('SecurityToken')
    access_key_id = credentials.get('AccessKeyId')
    access_key_secret = credentials.get('AccessKeySecret')

    print("SecurityToken:", security_token)
    print("AccessKeyId:", access_key_id)
    print("AccessKeySecret:", access_key_secret)

    return access_key_id, access_key_secret, security_token

步骤三:生成用户名与密码

接下来,基于获取的临时身份凭证,生成访问云消息队列 RabbitMQ 版的用户名与密码。示例代码如下:

def get_user_name(ak, instance_id, sts_token):
    # 将实例id/ak/sts token拼接,进行base64编码得到用户名
    buf = f"0:{instance_id}:{ak}:{sts_token}"
    return base64.b64encode(buf.encode('utf-8')).decode('utf-8')


def get_password(sk):
    # 获取当前时间戳
    timestamp = int(time.time() * 1000)
    # 将sk, 时间戳使用HmacSHA1进行哈希
    signature = hmac_sha1(sk.encode('utf-8'), str(timestamp).encode('utf-8'))
    # 将哈希结果与时间进行拼接,并返回base64编码结果
    result = f"{signature}:{timestamp}"
    return base64.b64encode(result.encode('utf-8')).decode('utf-8')


def hmac_sha1(data, key):
    mac = hmac.new(key, data, hashlib.sha1)
    return mac.digest().hex().upper()
说明
  • 该用户名与密码不会在控制台展示,它是临时的,生命周期与临时身份凭证相同。

  • Java语言可通过com.alibaba.mq.amqp.utils.UserUtils类实现生成用户名与密码,请参见示例代码

  • 其他语言也可参考上述代码逻辑实现生成用户名与密码。

步骤四:建立连接

生成用户名与密码后,即可使用该用户名与密码建立连接。示例代码如下:

import pika

credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=rabbitmq_host,
        port=port,
        virtual_host=vhost,
        credentials=credentials
    )
)
channel = connection.channel()
print("Connect success.")

完整代码示例

下面是完整的Python示例代码,它可以实现:

  1. Pod的环境变量中获取临时身份凭证。

  2. 创建用户名密码。

  3. 建立云消息队列 RabbitMQ 版连接。

  4. 每秒收发一条消息。

  5. 15秒进行一次临时身份凭证的轮转。

# -*- coding: utf-8 -*-
import os
import sys
import base64
import hmac
import hashlib
import time
import pika
from datetime import datetime

from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient


# ==================== 配置参数 ====================
# 请根据实际环境修改以下配置

# STS相关配置
REGION_ID = "cn-hangzhou"  # 阿里云地域ID

# RabbitMQ连接配置
RABBITMQ_HOST = "xxx.net.mq.amqp.aliyuncs.com"  # RabbitMQ接入点地址
INSTANCE_ID = "rabbitmq-xxx-xxxx"  # RabbitMQ实例ID
VHOST = "test"  # 虚拟主机名称
PORT = 5672  # 连接端口(AMQP协议默认5672)

# 队列和交换机配置
QUEUE_NAME = "test_queue"  # 队列名称
EXCHANGE_NAME = ''  # 交换机名称,默认交换机为空字符串

# 测试运行配置
TOTAL_DURATION = 300  # 总运行时间:5分钟(秒)
REFRESH_INTERVAL = 15  # STS凭证刷新间隔:15秒
MESSAGE_INTERVAL = 1  # 消息发送间隔:1秒

# ==================================================


def get_sts_credentials(region_id):
    """
    通过OIDC获取STS临时凭证
    
    Args:
        region_id: 阿里云地域ID
        
    Returns:
        tuple: (access_key_id, access_key_secret, security_token)
    """
    # 获取环境变量中的三个认证相关信息
    oidc_provider_arn = os.getenv('ALIBABA_CLOUD_OIDC_PROVIDER_ARN')
    role_arn = os.getenv('ALIBABA_CLOUD_ROLE_ARN')
    token_file_path = os.getenv('ALIBABA_CLOUD_OIDC_TOKEN_FILE')

    if not oidc_provider_arn:
        raise ValueError("ALIBABA_CLOUD_OIDC_PROVIDER_ARN is not set")
    if not role_arn:
        raise ValueError("ALIBABA_CLOUD_ROLE_ARN is not set")
    if not token_file_path:
        raise ValueError("ALIBABA_CLOUD_OIDC_TOKEN_FILE is not set")

    # 读取token file的具体内容
    try:
        with open(token_file_path, 'r') as f:
            oidc_token = f.read().strip()
    except Exception as e:
        raise RuntimeError(f"Failed to read OIDC token file: {e}")

    # 调用AssumeRoleWithOIDC接口,进行临时身份获取
    config = open_api_models.Config(signature_algorithm='v2')
    config.endpoint = 'sts.' + region_id + '.aliyuncs.com'
    client = OpenApiClient(config)

    params = open_api_models.Params(
        action='AssumeRoleWithOIDC',
        version='2015-04-01',
        protocol='HTTPS',
        method='POST',
        auth_type='AK',
        style='RPC',
        pathname='/',
        req_body_type='json',
        body_type='json'
    )

    queries = {
        'OIDCProviderArn': oidc_provider_arn,
        'RoleArn': role_arn,
        'OIDCToken': oidc_token,
        'RoleSessionName': 'test'
    }

    request = open_api_models.OpenApiRequest(
        query=OpenApiUtilClient.query(queries)
    )
    runtime = util_models.RuntimeOptions()

    response = client.call_api(params, request, runtime)

    body = response.get('body')
    if not body:
        raise RuntimeError("Response body is empty")

    credentials = body.get('Credentials')
    if not credentials:
        raise RuntimeError("Credentials not found in response")

    # 获取临时accessKeyId, accessKeySecret, 以及securityToken
    security_token = credentials.get('SecurityToken')
    access_key_id = credentials.get('AccessKeyId')
    access_key_secret = credentials.get('AccessKeySecret')

    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] STS Credentials refreshed")
    print(f"  AccessKeyId: {access_key_id[:10]}...")
    print(f"  SecurityToken: {security_token[:20]}...")

    return access_key_id, access_key_secret, security_token

    
def get_user_name(ak, instance_id, sts_token):
    """
    生成RabbitMQ用户名
    
    Args:
        ak: AccessKey ID
        instance_id: RabbitMQ实例ID
        sts_token: Security Token
        
    Returns:
        str: Base64编码的用户名
    """
    buf = f"0:{instance_id}:{ak}:{sts_token}"
    return base64.b64encode(buf.encode('utf-8')).decode('utf-8')


def get_password(sk):
    """
    生成RabbitMQ密码
    
    Args:
        sk: AccessKey Secret
        
    Returns:
        str: Base64编码的密码
    """
    timestamp = int(time.time() * 1000)
    signature = hmac_sha1(sk.encode('utf-8'), str(timestamp).encode('utf-8'))
    result = f"{signature}:{timestamp}"
    return base64.b64encode(result.encode('utf-8')).decode('utf-8')


def hmac_sha1(data, key):
    """
    计算HMAC-SHA1签名
    
    Args:
        data: 要签名的数据
        key: 签名密钥
        
    Returns:
        str: 十六进制大写签名字符串
    """
    mac = hmac.new(key, data, hashlib.sha1)
    return mac.digest().hex().upper()


def connect_to_rabbitmq(ak, sk, sts, rabbitmq_host, instance_id, vhost, port):
    """
    连接到RabbitMQ
    
    Args:
        ak: AccessKey ID
        sk: AccessKey Secret
        sts: Security Token
        rabbitmq_host: RabbitMQ接入点地址
        instance_id: RabbitMQ实例ID
        vhost: 虚拟主机名称
        port: 连接端口
        
    Returns:
        tuple: (connection, channel)
    """
    username = get_user_name(ak, instance_id, sts)
    password = get_password(sk)
    
    credentials = pika.PlainCredentials(username, password)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host=rabbitmq_host,
            port=port,
            virtual_host=vhost,
            credentials=credentials,
            heartbeat=600,
            blocked_connection_timeout=300
        )
    )
    channel = connection.channel()
    
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Connected to RabbitMQ successfully")
    print(f"  Host: {rabbitmq_host}:{port}")
    print(f"  Instance: {instance_id}")
    print(f"  VHost: {vhost}")
    
    return connection, channel


def close_connection_safely(connection):
    """
    安全关闭RabbitMQ连接
    
    Args:
        connection: RabbitMQ连接对象
    """
    try:
        if connection and connection.is_open:
            connection.close()
            print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Connection closed")
    except Exception as e:
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Error closing connection: {e}")


def send_receive_messages(connection, channel, queue_name, exchange_name=''):
    """
    发送并接收一条消息
    
    Args:
        connection: RabbitMQ连接对象
        channel: RabbitMQ通道对象
        queue_name: 队列名称
        exchange_name: 交换机名称
        
    Returns:
        bool: 操作是否成功
    """
    try:
        # 确保队列存在
        channel.queue_declare(queue=queue_name, durable=True)
        
        # 发送消息
        message = f"Test message at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        channel.basic_publish(
            exchange=exchange_name,
            routing_key=queue_name,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # 持久化消息
            )
        )
        
        # 接收消息
        method_frame, header_frame, body = channel.basic_get(queue=queue_name, auto_ack=True)
        
        if method_frame:
            print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Sent and received: {body.decode()[:50]}...")
            return True
        else:
            print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Sent message (no message in queue to receive)")
            return True
            
    except Exception as e:
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Error in send/receive: {e}")
        return False


def print_config_summary():
    """打印配置摘要"""
    print(f"=== Configuration Summary ===")
    print(f"Region: {REGION_ID}")
    print(f"RabbitMQ Host: {RABBITMQ_HOST}:{PORT}")
    print(f"Instance ID: {INSTANCE_ID}")
    print(f"VHost: {VHOST}")
    print(f"Queue: {QUEUE_NAME}")
    print(f"Exchange: {EXCHANGE_NAME if EXCHANGE_NAME else '(default)'}")
    print(f"Total duration: {TOTAL_DURATION} seconds")
    print(f"STS refresh interval: {REFRESH_INTERVAL} seconds")
    print(f"Message interval: {MESSAGE_INTERVAL} second(s)")
    print()


def print_test_summary(message_count, error_count):
    """
    打印测试摘要
    
    Args:
        message_count: 成功消息数
        error_count: 错误数
    """
    total = message_count + error_count
    success_rate = (message_count / total * 100) if total > 0 else 0
    
    print(f"\n=== Test Summary ===")
    print(f"Total messages: {message_count}")
    print(f"Total errors: {error_count}")
    print(f"Success rate: {success_rate:.2f}%")


def main():
    """主函数:执行RabbitMQ连接测试和消息收发"""
    
    print("=== Starting RabbitMQ Test with STS Credentials ===\n")
    print_config_summary()
    
    start_time = time.time()
    last_refresh_time = 0
    connection = None
    channel = None
    message_count = 0
    error_count = 0
    
    try:
        # 初始获取STS凭证并建立连接
        print("=== Initial STS credentials and connection ===")
        ak, sk, sts = get_sts_credentials(REGION_ID)
        connection, channel = connect_to_rabbitmq(
            ak, sk, sts, RABBITMQ_HOST, INSTANCE_ID, VHOST, PORT
        )
        last_refresh_time = time.time()
        
        while True:
            current_time = time.time()
            elapsed_time = current_time - start_time
            
            # 检查是否超过总运行时间
            if elapsed_time >= TOTAL_DURATION:
                print(f"\n=== Test completed ===")
                print_test_summary(message_count, error_count)
                break
            
            # 检查是否需要刷新STS凭证
            if current_time - last_refresh_time >= REFRESH_INTERVAL:
                print(f"\n=== Refreshing STS credentials (elapsed: {int(elapsed_time)}s) ===")
                
                # 关闭旧连接
                close_connection_safely(connection)
                
                try:
                    # 获取新凭证并重建连接
                    ak, sk, sts = get_sts_credentials(REGION_ID)
                    connection, channel = connect_to_rabbitmq(
                        ak, sk, sts, RABBITMQ_HOST, INSTANCE_ID, VHOST, PORT
                    )
                    last_refresh_time = current_time
                except Exception as e:
                    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Failed to refresh credentials/connection: {e}")
                    error_count += 1
                    time.sleep(1)
                    continue
            
            # 发送和接收消息
            try:
                if send_receive_messages(connection, channel, QUEUE_NAME, EXCHANGE_NAME):
                    message_count += 1
                else:
                    error_count += 1
            except Exception as e:
                print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Message operation failed: {e}")
                error_count += 1
            
            # 等待指定的时间间隔
            time.sleep(MESSAGE_INTERVAL)
            
    except KeyboardInterrupt:
        print(f"\n=== Test interrupted by user ===")
        print_test_summary(message_count, error_count)
    except Exception as e:
        print(f"\n=== Fatal error: {e} ===")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        # 清理资源
        close_connection_safely(connection)
        print(f"\n=== Resources cleaned up ===")


if __name__ == '__main__':
    main()

运行后预期输出如下:

image