以加密的方式接入模型推理功能

在调用大模型时,未加密的请求内容在传输过程中可能面临被窃听、篡改或伪造的安全风险。当请求内容中涉及个人隐私、商业机密等敏感信息,或通过公网传输时,您可以对请求体中的input字段值加密,以提高数据传输的安全性。

加解密过程

image

采用混合加密机制:数据由AES对称算法加密,其密钥通过RSA非对称加密实现安全传输。

  1. 准备请求数据。

    • 生成AES对称密钥。AES是一种高效的对称加密算法,在此用于加密input内容。

      input为大模型应用调用请求中的核心对象,包含必选的提示词(prompt)和可选的历史对话记录(messages)等参数。
    • 使用密钥对input内容进行加密。

    • 使用RSA公钥对AES密钥进行加密。从阿里云百炼平台获取托管的RSA公钥,对AES密钥进行加密,以确保AES密钥的传输安全。

      RSA是一种非对称加密算法,包括公钥和私钥。其中公钥用于加密数据,私钥用于解密数据。
  2. 发起请求。发起阿里云百炼平台调用时将加密后的input内容、密钥信息(封装在X-DashScope-EncryptionKey请求头中)传入阿里云百炼平台。

  3. 阿里云百炼平台处理请求。

    • 阿里云百炼推理链路中全程加密。

    • 阿里云百炼平台在向量召回、模型推理过程中解密数据。

    • 使用相同的AES密钥加密生成的答案,返回加密后的推理结果。

  4. 处理响应结果。用户侧收到响应内容,使用AES密钥解密推理结果,获得明文答案。

前提条件

已开通阿里云百炼服务并获得API-KEY:获取API Key

建议您将API-KEY配置到环境变量中以降低API-KEY的泄漏风险,配置方法可参考配置API Key到环境变量。您也可以在代码中配置API-KEY,但是泄漏风险会提高。

DashScope SDK调用(自动加密·开箱即用

DashScope SDK 封装了加解密逻辑,您只需启用加密功能即可实现加密调用,无需自行实现加密和解密代码。

约束与限制

接入流程

  1. 安装最新版DashScope SDK

  2. 启用加密功能。

    Java SDK

    enableEncrypt设置为true即可启用加密功能。

    GenerationParam param = GenerationParam.builder()
      // ...这里省略其他代码
      // 启用加密功能
      .enableEncrypt(true)
      .build();

    Python SDK

    enable_encryption设置为True即可启用加密功能。

    # ...这里省略其他代码
    response = dashscope.Generation.call(
        # ...这里省略其他代码
        # 启用加密功能
        enable_encryption=True
    )
  3. 调用模型。

完整示例代码

示例代码仅供参考,请勿直接在生产环境中使用。
关于请求参数的更多说明,请参见通义千问API文档

Java SDK

请求示例

import java.util.Arrays;
import java.lang.System;
import com.alibaba.dashscope.aigc.generation.Generation;
import com.alibaba.dashscope.aigc.generation.GenerationParam;
import com.alibaba.dashscope.aigc.generation.GenerationResult;
import com.alibaba.dashscope.common.Message;
import com.alibaba.dashscope.common.Role;
import com.alibaba.dashscope.exception.ApiException;
import com.alibaba.dashscope.exception.InputRequiredException;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.utils.JsonUtils;

public class Main {
    public static GenerationResult callWithMessage() throws ApiException, NoApiKeyException, InputRequiredException {
        Generation gen = new Generation();
        Message systemMsg = Message.builder()
                .role(Role.SYSTEM.getValue())
                .content("You are a helpful assistant.")
                .build();
        Message userMsg = Message.builder()
                .role(Role.USER.getValue())
                .content("你是谁?")
                .build();
        GenerationParam param = GenerationParam.builder()
                // 若没有配置环境变量,请用百炼API Key将下行替换为:.apiKey("sk-xxx")
                .apiKey(System.getenv("DASHSCOPE_API_KEY"))
                // 此处以qwen-plus为例,可按需更换模型名称。模型列表:https://help.aliyun.com/zh/model-studio/getting-started/models
                .model("qwen-plus")
                .messages(Arrays.asList(systemMsg, userMsg))
                .resultFormat(GenerationParam.ResultFormat.MESSAGE)
                // 启用加密功能
                .enableEncrypt(true)
                .build();
        return gen.call(param);
    }
    public static void main(String[] args) {
        try {
            GenerationResult result = callWithMessage();
            System.out.println(JsonUtils.toJson(result));
        } catch (ApiException | NoApiKeyException | InputRequiredException e) {
            // 使用日志框架记录异常信息
            System.err.println("An error occurred while calling the generation service: " + e.getMessage());
        }
        System.exit(0);
    }
}

响应示例

{
    "finish_reason": "stop",
    "text": "我是Qwen,由阿里云开发的超大规模语言模型。我的目标是帮助用户更高效地获取信息、解决各种问题并激发创造力。无论是回答问题、提供信息还是进行创意性的讨论,我都会尽力提供支持。有什么我可以帮到你的吗?"
}

Python SDK

请求示例

import os
import dashscope

messages = [
    {'role': 'system', 'content': 'You are a helpful assistant.'},
    {'role': 'user', 'content': '你是谁?'}
    ]
response = dashscope.Generation.call(
    # 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx",
    api_key=os.getenv('DASHSCOPE_API_KEY'),
    model="qwen-plus", # 此处以qwen-plus为例,可按需更换模型名称。模型列表:https://help.aliyun.com/zh/model-studio/getting-started/models
    messages=messages,
    result_format='message',
    # 启用加密功能
    enable_encryption=True
)
print(response)

响应示例

{
    "finish_reason": "stop",
    "text": "我是Qwen,由阿里云开发的超大规模语言模型。我的目标是帮助用户更高效地获取信息、解决各种问题并激发创造力。无论是回答问题、提供信息还是进行创意性的讨论,我都会尽力提供支持。有什么我可以帮到你的吗?"
}

HTTP调用(手动密钥管理

普通调用的区别

加密调用在普通调用的基础上,需要额外做如下三个处理(详细操作请参见接入流程):

  1. 添加X-DashScope-EncryptionKey请求头。

    X-DashScope-EncryptionKey请求头对应的内容是一个JSON字符串,各参数说明如下:

  2. 对请求参数input的内容进行加密。

    普通调用和加密调用请求数据的区别:

    普通调用

    加密调用

    // 。。。这里省略前面其他内容
    "input": {
        "messages": [
            {
                "role": "system",
                "content": "You are a helpful assistant."
            },
            {
                "role": "user",
                "content": "你是谁?"
            }
        ]
    }
    // 。。。这里省略后面其他内容
    // 。。。这里省略前面其他内容
    "input": "+J2aT8GNBUD......"
    // 。。。这里省略后面其他内容
  3. 对响应数据进行解密。

接入流程

一、准备请求数据

  1. 生成AES密钥

    • 算法:AES

    • 密钥规格

      • 长度:128位(16字节)/192位(24字节)/256位 (32字节)

        密钥长度越长,安全性越高,但计算开销越大;256位安全性最高,适用于金融核心数据等高敏感数据保护
      • 随机性:随机生成,建议使用密码学安全随机源

      • 唯一性:单次请求有效,禁止复用

    • 示例代码

      private static SecretKey generateAesSecretKey() throws Exception {
              KeyGenerator keyGen = KeyGenerator.getInstance("AES");
              SecureRandom secureRandom = new SecureRandom();
              keyGen.init(256, secureRandom);
              return keyGen.generateKey();
      }
      def generate_aes_secret_key():
          """生成256AES密钥"""
          return os.urandom(32)  # 32字节 = 256
  2. 生成初始向量(IV)

    生成GCM加密所需的随机初始化向量:使用安全的随机数生成器生成一个12字节的随机字节序列,然后将这个字节序列进行Base64编码得到IV。

    示例代码

    private static final int GCM_IV_LENGTH = 12; // 12 字节 IV
        
    private static byte[] generateIv() {
        byte[] iv = new byte[GCM_IV_LENGTH];
        SecureRandom secureRandom = new SecureRandom(); // 随机生成确保唯一性
        secureRandom.nextBytes(iv);
        return iv;
    }
    # 常量定义
    GCM_IV_LENGTH = 12       # 12字节IV
    
    def generate_iv():
        """生成12字节随机IV"""
        return os.urandom(GCM_IV_LENGTH)
  3. 加密input数据

    • 算法:AES-GCM(认证标签长度:128位)

    • 参数

    • 处理流程

      1. input序列化为JSON字符串:将下图中深色部分内容(即“{"messages":[......]}”)序列化

        image.png

      2. 对序列化后的input执行AES-GCM加密

      3. 将加密结果转成字符串格式,待后续构建并发送请求时将其拼接到input字段中

    • 示例代码

      private static final int GCM_TAG_LENGTH = 128; // 128 位 Tag(16 字节)
      
      private static String encryptInputWithAes(SecretKey aesSecretKey,byte[] iv, String input) throws Exception {
              byte[] content = input.getBytes(StandardCharsets.UTF_8);
      
              Cipher aesCipher = Cipher.getInstance("AES/GCM/NoPadding");
              GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH, iv);
              aesCipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(aesSecretKey.getEncoded(), "AES"), gcmParameterSpec);
      
              byte[] encryptedBytes = aesCipher.doFinal(content);
              return Base64.encodeBase64String(encryptedBytes);
      }
      def encrypt_input_with_aes(aes_key, iv, plaintext):
          """使用AES-GCM加密数据"""
          # 创建AES-GCM加密器
          aesgcm = Cipher(
              algorithms.AES(aes_key),
              modes.GCM(iv, tag=None),
              backend=default_backend()
          ).encryptor()
          
          # 关联数据设为空(根据需求可调整)
          aesgcm.authenticate_additional_data(b'')
          
          # 加密数据
          ciphertext = aesgcm.update(plaintext.encode('utf-8')) + aesgcm.finalize()
          
          # 获取认证标签
          tag = aesgcm.tag
          
          # 组合密文和标签
          encrypted_data = ciphertext + tag
          
          # 返回Base64编码结果
          return base64.b64encode(encrypted_data).decode('utf-8')
  4. 使用RSA公钥加密AES密钥

    • 算法:RSA

    • 处理流程

      1. Base64编码原始AES密钥(即步骤1生成的AES密钥)

      2. 使用RSA公钥(请参见获取RSA的公钥从阿里云百炼平台获取public_keypublic_key_id)对编码后的AES密钥进行加密

      3. 对加密结果进行Base64编码

    • 示例代码

      private static String encryptAesKeyWithRsaPublicKey(SecretKey aesSecretKey, String publicKey) throws Exception {
              byte[] aesKeyBytes = aesSecretKey.getEncoded();
              String base64AesKey = Base64.encodeBase64String(aesKeyBytes);
      
              byte[] publicKeyBytes = Base64.decodeBase64(publicKey);
              X509EncodedKeySpec spec = new X509EncodedKeySpec(publicKeyBytes);
              KeyFactory kf = KeyFactory.getInstance("RSA");
              PublicKey pubKey = kf.generatePublic(spec);
      
              Cipher rsaCipher = Cipher.getInstance("RSA");
              rsaCipher.init(Cipher.ENCRYPT_MODE, pubKey);
              byte[] encryptedBytes = rsaCipher.doFinal(base64AesKey.getBytes());
      
              return Base64.encodeBase64String(encryptedBytes);
      }
      def encrypt_aes_key_with_rsa(aes_key, public_key_str):
          """使用RSA公钥加密AES密钥"""
          # 解码Base64格式的公钥
          public_key_bytes = base64.b64decode(public_key_str)
          
          # 加载公钥
          public_key = serialization.load_der_public_key(
              public_key_bytes,
              backend=default_backend()
          )
          
          # 先对AES密钥进行Base64编码
          base64_aes_key = base64.b64encode(aes_key).decode('utf-8')
          
          # 使用RSA加密
          encrypted_bytes = public_key.encrypt(
              base64_aes_key.encode('utf-8'),
              padding.PKCS1v15()
          )
          
          # 返回Base64编码的加密结果
          return base64.b64encode(encrypted_bytes).decode('utf-8')

二、构建并发送请求

添加X-DashScope-EncryptionKey请求头,并将原先的input替换为加密后的input,然后用和普通调用同样的方式发送请求即可。

X-DashScope-EncryptionKey请求头对应的内容是一个JSON字符串,各参数说明如下:

最终发送给大模型的HTTP请求内容格式如下:

image

示例代码:

private static String sendEncryptedRequest(String apiKey, String publicKeyId, String encryptedAesKey, byte[] iv, String encryptedInput) throws Exception {
        OkHttpClient okHttpClient = new OkHttpClient().newBuilder().build();

        JSONObject requestBodyJson = new JSONObject();
        requestBodyJson.put("model", "qwen-max");
        requestBodyJson.put("input", encryptedInput);

        RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), requestBodyJson.toJSONString());

        JSONObject headerJson = new JSONObject();
        headerJson.put("Content-Type", "application/json");
        headerJson.put("Accept", "application/json");
        headerJson.put("Authorization","Bearer " + apiKey);

        String encryptionHeader = String.format("{\"public_key_id\": \"%s\", \"encrypt_key\": \"%s\", \"iv\": \"%s\"}", publicKeyId, encryptedAesKey, Base64.encodeBase64String(iv));
        headerJson.put("X-DashScope-EncryptionKey", encryptionHeader);

        Headers headers = Headers.of(JSONObject.parseObject(headerJson.toString(), Map.class));

        String url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation";
        Request request = new Request.Builder()
                .url(url)
                .post(requestBody)
                .headers(headers)
                .build();

        return okHttpClient.newCall(request).execute().body().string();
}
def send_encrypted_request(api_key, public_key_id, encrypted_aes_key, iv, encrypted_input):
    """发送加密请求到API端点"""
    url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
    
    # 构建请求头
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": f"Bearer {api_key}",
        "X-DashScope-EncryptionKey": json.dumps({
            "public_key_id": public_key_id,
            "encrypt_key": encrypted_aes_key,
            "iv": base64.b64encode(iv).decode('utf-8')
        })
    }
    
    # 构建请求体
    payload = {
        "model": "qwen-max",
        "input": encrypted_input
    }
    
    # 发送POST请求
    response = requests.post(
        url,
        headers=headers,
        json=payload
    )
    
    # 检查响应状态
    response.raise_for_status()
    return response.json()

三、解密响应数据

  • 算法:AES-GCM

  • 参数

    • 密钥:发送请求时生成的AES密钥(即【准备请求数据】的步骤1生成的AES密钥)。

    • IV: 发送请求时使用的初始化向量(即【准备请求数据】的步骤2生成的IV)。

  • 处理流程

    1. 使用AES-GCM解密output字段对应的内容。

    2. 返回明文结果。

  • 示例代码

    private static String decryptResponseWithAes(SecretKey aesSecretKey, byte[] iv, String response) throws Exception {
            JSONObject responseJson = JSONObject.parseObject(response, JSONObject.class);
            String encryptedOutput = responseJson.getString("output");
    
            Cipher aesCipher = Cipher.getInstance("AES/GCM/NoPadding");
            GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH, iv);
            aesCipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(aesSecretKey.getEncoded(), "AES"), gcmParameterSpec);
    
            byte[] decryptedBytes = aesCipher.doFinal(Base64.decodeBase64(encryptedOutput));
            return new String(decryptedBytes);
    }
    def decrypt_response_with_aes(aes_key, iv, response_data):
        """使用AES-GCM解密响应"""
        # 提取加密的输出
        encrypted_output = response_data.get("output")
        if not encrypted_output:
            raise ValueError("Response missing 'output' field")
        
        # 解码Base64数据
        encrypted_data = base64.b64decode(encrypted_output)
        
        # 分离密文和标签(标签长度16字节)
        ciphertext = encrypted_data[:-16]
        tag = encrypted_data[-16:]
        
        # 创建AES-GCM解密器
        aesgcm = Cipher(
            algorithms.AES(aes_key),
            modes.GCM(iv, tag),
            backend=default_backend()
        ).decryptor()
        
        # 验证关联数据(与加密时一致)
        aesgcm.authenticate_additional_data(b'')
        
        # 解密数据
        decrypted_bytes = aesgcm.update(ciphertext) + aesgcm.finalize()
        
        return decrypted_bytes.decode('utf-8')

完整示例代码

本节提供JavaPython示例代码,如您使用其他编程语言,请参照接入流程自行实现。

一个小技巧:您可以使用大模型,让它把下面的代码翻译成您想要的编程语言的代码。
示例代码仅供参考,请勿直接在生产环境中使用。
关于请求参数的更多说明,请参见通义千问API文档

请求示例

import java.nio.charset.StandardCharsets;
import java.security.KeyFactory;
import java.security.PublicKey;
import java.security.SecureRandom;
import java.security.spec.X509EncodedKeySpec;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.codec.binary.Base64;
import com.alibaba.fastjson.JSONObject;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;

public class Main {
    // 采用AES-GCM 加密模式,以提升数据安全和完整性保护,在此指定IV、Tag长度
    private static final int GCM_TAG_LENGTH = 128; // 128 位 Tag(16 字节)
    private static final int GCM_IV_LENGTH = 12; // 12 字节 IV

    public static void main(String[] args) throws Exception {
        String publicKeyId = "xxx"; // 需要替换为实际获取到的RSA公钥 public_key_id 值
        String publicKey = "xxx"; // 需要替换为实际获取到的RSA公钥 public_key 值

        // 若没有配置环境变量,请用阿里云百炼API Key将下行替换为:String apiKey = "sk-xxx",但不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
        String apiKey = System.getenv("DASHSCOPE_API_KEY");

        // 生成 AES 密钥
        SecretKey aesSecretKey = generateAesSecretKey();

        // 生成唯一性iv
        byte[] iv = generateIv();

        // 加密 input 数据
        String input = "{\"messages\": [{\"role\":\"user\",\"content\":\"你是谁?\"}]}";
        String encryptedInput = encryptInputWithAes(aesSecretKey, iv, input);

        // 使用 RSA 公钥加密 AES 密钥
        String encryptedAesKey = encryptAesKeyWithRsaPublicKey(aesSecretKey, publicKey);

        // 构建并发送请求
        String response = sendEncryptedRequest(apiKey, publicKeyId, encryptedAesKey, iv, encryptedInput);

        // 解密响应内容
        String decryptOutput = decryptResponseWithAes(aesSecretKey, iv, response);
        System.out.println("输出的内容是: " + decryptOutput);
    }

    private static SecretKey generateAesSecretKey() throws Exception {
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        SecureRandom secureRandom = new SecureRandom();
        keyGen.init(256, secureRandom); // AES密钥长度支持128、192、256 位。
        return keyGen.generateKey();
    }

    private static byte[] generateIv() {
        byte[] iv = new byte[GCM_IV_LENGTH];
        SecureRandom secureRandom = new SecureRandom(); // 随机生成确保唯一性
        secureRandom.nextBytes(iv);
        return iv;
    }

    private static String encryptAesKeyWithRsaPublicKey(SecretKey aesSecretKey, String publicKey) throws Exception {
        byte[] aesKeyBytes = aesSecretKey.getEncoded();
        String base64AesKey = Base64.encodeBase64String(aesKeyBytes);

        byte[] publicKeyBytes = Base64.decodeBase64(publicKey);
        X509EncodedKeySpec spec = new X509EncodedKeySpec(publicKeyBytes);
        KeyFactory kf = KeyFactory.getInstance("RSA");
        PublicKey pubKey = kf.generatePublic(spec);

        Cipher rsaCipher = Cipher.getInstance("RSA");
        rsaCipher.init(Cipher.ENCRYPT_MODE, pubKey);
        byte[] encryptedBytes = rsaCipher.doFinal(base64AesKey.getBytes());

        return Base64.encodeBase64String(encryptedBytes);
    }

    private static String encryptInputWithAes(SecretKey aesSecretKey, byte[] iv, String input) throws Exception {
        byte[] content = input.getBytes(StandardCharsets.UTF_8);

        Cipher aesCipher = Cipher.getInstance("AES/GCM/NoPadding");
        GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH, iv);
        aesCipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(aesSecretKey.getEncoded(), "AES"), gcmParameterSpec);

        byte[] encryptedBytes = aesCipher.doFinal(content);
        return Base64.encodeBase64String(encryptedBytes);
    }

    private static String sendEncryptedRequest(String apiKey, String publicKeyId, String encryptedAesKey, byte[] iv, String encryptedInput) throws Exception {
        OkHttpClient okHttpClient = new OkHttpClient().newBuilder().build();
        JSONObject requestBodyJson = new JSONObject();
        requestBodyJson.put("model", "qwen-max");
        requestBodyJson.put("input", encryptedInput);

        RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), requestBodyJson.toJSONString());

        JSONObject headerJson = new JSONObject();
        headerJson.put("Content-Type", "application/json");
        headerJson.put("Accept", "application/json");
        headerJson.put("Authorization","Bearer " + apiKey);

        String encryptionHeader = String.format("{\"public_key_id\": \"%s\", \"encrypt_key\": \"%s\", \"iv\": \"%s\"}", publicKeyId, encryptedAesKey, Base64.encodeBase64String(iv));
        headerJson.put("X-DashScope-EncryptionKey", encryptionHeader);

        Headers headers = Headers.of(JSONObject.parseObject(headerJson.toString(), Map.class));

        String url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation";
        Request request = new Request.Builder()
                .url(url)
                .post(requestBody)
                .headers(headers)
                .build();

        return okHttpClient.newCall(request).execute().body().string();
    }

    private static String decryptResponseWithAes(SecretKey aesSecretKey, byte[] iv,String response) throws Exception {
        JSONObject responseJson = JSONObject.parseObject(response, JSONObject.class);
        String encryptedOutput = responseJson.getString("output");

        Cipher aesCipher = Cipher.getInstance("AES/GCM/NoPadding");
        GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH, iv);
        aesCipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(aesSecretKey.getEncoded(), "AES"), gcmParameterSpec);

        byte[] decryptedBytes = aesCipher.doFinal(Base64.decodeBase64(encryptedOutput));
        return new String(decryptedBytes);
    }
import os
import base64
import json
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.backends import default_backend
import requests

# 常量定义
GCM_IV_LENGTH = 12       # 12字节IV

def main():
    public_key_id = "xxx"  # 需要替换为实际获取到的RSA公钥 public_key_id 值
    public_key = "xxx"     # 需要替换为实际获取到的RSA公钥 public_key 值
    
    # 从环境变量获取API密钥
    api_key = os.getenv("DASHSCOPE_API_KEY")
    if not api_key:
        # 若没有配置环境变量,请放开下面这行注释代码,并将"sk-xxx"替换为实际的阿里云百炼API Key,不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
        # api_key = "sk-xxx"
        raise ValueError("DASHSCOPE_API_KEY environment variable not set")

    # 生成AES密钥
    aes_secret_key = generate_aes_secret_key()
    
    # 生成IV
    iv = generate_iv()
    
    # 加密输入数据
    input_data = '{"messages": [{"role":"user","content":"你是谁?"}]}'
    encrypted_input = encrypt_input_with_aes(aes_secret_key, iv, input_data)
    
    # 使用RSA公钥加密AES密钥
    encrypted_aes_key = encrypt_aes_key_with_rsa(aes_secret_key, public_key)
    
    # 发送加密请求
    response = send_encrypted_request(api_key, public_key_id, encrypted_aes_key, iv, encrypted_input)
    
    # 解密响应
    decrypted_output = decrypt_response_with_aes(aes_secret_key, iv, response)
    print("输出的内容是:", decrypted_output)

def generate_aes_secret_key():
    """生成256位AES密钥"""
    return os.urandom(32)  # 32字节 = 256位

def generate_iv():
    """生成12字节随机IV"""
    return os.urandom(GCM_IV_LENGTH)

def encrypt_aes_key_with_rsa(aes_key, public_key_str):
    """使用RSA公钥加密AES密钥"""
    # 解码Base64格式的公钥
    public_key_bytes = base64.b64decode(public_key_str)
    
    # 加载公钥
    public_key = serialization.load_der_public_key(
        public_key_bytes,
        backend=default_backend()
    )
    
    # 先对AES密钥进行Base64编码
    base64_aes_key = base64.b64encode(aes_key).decode('utf-8')
    
    # 使用RSA加密
    encrypted_bytes = public_key.encrypt(
        base64_aes_key.encode('utf-8'),
        padding.PKCS1v15()
    )
    
    # 返回Base64编码的加密结果
    return base64.b64encode(encrypted_bytes).decode('utf-8')

def encrypt_input_with_aes(aes_key, iv, plaintext):
    """使用AES-GCM加密数据"""
    # 创建AES-GCM加密器
    aesgcm = Cipher(
        algorithms.AES(aes_key),
        modes.GCM(iv, tag=None),
        backend=default_backend()
    ).encryptor()
    
    # 关联数据设为空(根据需求可调整)
    aesgcm.authenticate_additional_data(b'')
    
    # 加密数据
    ciphertext = aesgcm.update(plaintext.encode('utf-8')) + aesgcm.finalize()
    
    # 获取认证标签
    tag = aesgcm.tag
    
    # 组合密文和标签
    encrypted_data = ciphertext + tag
    
    # 返回Base64编码结果
    return base64.b64encode(encrypted_data).decode('utf-8')

def send_encrypted_request(api_key, public_key_id, encrypted_aes_key, iv, encrypted_input):
    """发送加密请求到API端点"""
    url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
    
    # 构建请求头
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": f"Bearer {api_key}",
        "X-DashScope-EncryptionKey": json.dumps({
            "public_key_id": public_key_id,
            "encrypt_key": encrypted_aes_key,
            "iv": base64.b64encode(iv).decode('utf-8')
        })
    }
    
    # 构建请求体
    payload = {
        "model": "qwen-max",
        "input": encrypted_input
    }
    
    # 发送POST请求
    response = requests.post(
        url,
        headers=headers,
        json=payload
    )
    
    # 检查响应状态
    response.raise_for_status()
    return response.json()

def decrypt_response_with_aes(aes_key, iv, response_data):
    """使用AES-GCM解密响应"""
    # 提取加密的输出
    encrypted_output = response_data.get("output")
    if not encrypted_output:
        raise ValueError("Response missing 'output' field")
    
    # 解码Base64数据
    encrypted_data = base64.b64decode(encrypted_output)
    
    # 分离密文和标签(标签长度16字节)
    ciphertext = encrypted_data[:-16]
    tag = encrypted_data[-16:]
    
    # 创建AES-GCM解密器
    aesgcm = Cipher(
        algorithms.AES(aes_key),
        modes.GCM(iv, tag),
        backend=default_backend()
    ).decryptor()
    
    # 验证关联数据(与加密时一致)
    aesgcm.authenticate_additional_data(b'')
    
    # 解密数据
    decrypted_bytes = aesgcm.update(ciphertext) + aesgcm.finalize()
    
    return decrypted_bytes.decode('utf-8')

if __name__ == "__main__":
    main()

响应示例

{
    "finish_reason": "stop",
    "text": "我是Qwen,由阿里云开发的超大规模语言模型。我的目标是帮助用户更高效地获取信息、解决各种问题并激发创造力。无论是回答问题、提供信息还是进行创意性的讨论,我都会尽力提供支持。有什么我可以帮到你的吗?"
}

常见问题

Q:代码执行报错,“Invalid AES key Length: 294 bytes”。

A:AES密钥长度支持128位(16字节)、192位(24字节)、256 位(32字节),请检查密钥长度设置是否符合要求。

错误码

如果调用失败并返回报错信息,请参见错误信息进行解决。