在调用大模型时,未加密的请求内容在传输过程中可能面临被窃听、篡改或伪造的安全风险。当请求内容中涉及个人隐私、商业机密等敏感信息,或通过公网传输时,您可以对请求体中的input字段值加密,以提高数据传输的安全性。
加解密过程
采用混合加密机制:数据由AES对称算法加密,其密钥通过RSA非对称加密实现安全传输。
准备请求数据。
生成AES对称密钥。AES是一种高效的对称加密算法,在此用于加密
input
内容。input
为大模型应用调用请求中的核心对象,包含必选的提示词(prompt
)和可选的历史对话记录(messages
)等参数。使用密钥对
input
内容进行加密。使用RSA公钥对AES密钥进行加密。从阿里云百炼平台获取托管的RSA公钥,对AES密钥进行加密,以确保AES密钥的传输安全。
RSA是一种非对称加密算法,包括公钥和私钥。其中公钥用于加密数据,私钥用于解密数据。
发起请求。发起阿里云百炼平台调用时将加密后的
input
内容、密钥信息(封装在X-DashScope-EncryptionKey
请求头中)传入阿里云百炼平台。阿里云百炼平台处理请求。
阿里云百炼推理链路中全程加密。
阿里云百炼平台在向量召回、模型推理过程中解密数据。
使用相同的AES密钥加密生成的答案,返回加密后的推理结果。
处理响应结果。用户侧收到响应内容,使用AES密钥解密推理结果,获得明文答案。
前提条件
已开通阿里云百炼服务并获得API-KEY:获取API Key。
建议您将API-KEY配置到环境变量中以降低API-KEY的泄漏风险,配置方法可参考配置API Key到环境变量。您也可以在代码中配置API-KEY,但是泄漏风险会提高。
DashScope SDK调用(自动加密·开箱即用)
DashScope SDK 封装了加解密逻辑,您只需启用加密功能即可实现加密调用,无需自行实现加密和解密代码。
约束与限制
不支持自定义密钥,如需自定义密钥,请参见HTTP调用(手动密钥管理)。
仅支持Java和Python,其他编程语言请参见HTTP调用(手动密钥管理)。
接入流程
启用加密功能。
Java SDK
将
enableEncrypt
设置为true
即可启用加密功能。GenerationParam param = GenerationParam.builder() // ...这里省略其他代码 // 启用加密功能 .enableEncrypt(true) .build();
Python SDK
将
enable_encryption
设置为True
即可启用加密功能。# ...这里省略其他代码 response = dashscope.Generation.call( # ...这里省略其他代码 # 启用加密功能 enable_encryption=True )
调用模型。
完整示例代码
示例代码仅供参考,请勿直接在生产环境中使用。
关于请求参数的更多说明,请参见通义千问API文档。
Java SDK
请求示例
import java.util.Arrays;
import java.lang.System;
import com.alibaba.dashscope.aigc.generation.Generation;
import com.alibaba.dashscope.aigc.generation.GenerationParam;
import com.alibaba.dashscope.aigc.generation.GenerationResult;
import com.alibaba.dashscope.common.Message;
import com.alibaba.dashscope.common.Role;
import com.alibaba.dashscope.exception.ApiException;
import com.alibaba.dashscope.exception.InputRequiredException;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.utils.JsonUtils;
public class Main {
public static GenerationResult callWithMessage() throws ApiException, NoApiKeyException, InputRequiredException {
Generation gen = new Generation();
Message systemMsg = Message.builder()
.role(Role.SYSTEM.getValue())
.content("You are a helpful assistant.")
.build();
Message userMsg = Message.builder()
.role(Role.USER.getValue())
.content("你是谁?")
.build();
GenerationParam param = GenerationParam.builder()
// 若没有配置环境变量,请用百炼API Key将下行替换为:.apiKey("sk-xxx")
.apiKey(System.getenv("DASHSCOPE_API_KEY"))
// 此处以qwen-plus为例,可按需更换模型名称。模型列表:https://help.aliyun.com/zh/model-studio/getting-started/models
.model("qwen-plus")
.messages(Arrays.asList(systemMsg, userMsg))
.resultFormat(GenerationParam.ResultFormat.MESSAGE)
// 启用加密功能
.enableEncrypt(true)
.build();
return gen.call(param);
}
public static void main(String[] args) {
try {
GenerationResult result = callWithMessage();
System.out.println(JsonUtils.toJson(result));
} catch (ApiException | NoApiKeyException | InputRequiredException e) {
// 使用日志框架记录异常信息
System.err.println("An error occurred while calling the generation service: " + e.getMessage());
}
System.exit(0);
}
}
响应示例
{
"finish_reason": "stop",
"text": "我是Qwen,由阿里云开发的超大规模语言模型。我的目标是帮助用户更高效地获取信息、解决各种问题并激发创造力。无论是回答问题、提供信息还是进行创意性的讨论,我都会尽力提供支持。有什么我可以帮到你的吗?"
}
Python SDK
请求示例
import os
import dashscope
messages = [
{'role': 'system', 'content': 'You are a helpful assistant.'},
{'role': 'user', 'content': '你是谁?'}
]
response = dashscope.Generation.call(
# 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx",
api_key=os.getenv('DASHSCOPE_API_KEY'),
model="qwen-plus", # 此处以qwen-plus为例,可按需更换模型名称。模型列表:https://help.aliyun.com/zh/model-studio/getting-started/models
messages=messages,
result_format='message',
# 启用加密功能
enable_encryption=True
)
print(response)
响应示例
{
"finish_reason": "stop",
"text": "我是Qwen,由阿里云开发的超大规模语言模型。我的目标是帮助用户更高效地获取信息、解决各种问题并激发创造力。无论是回答问题、提供信息还是进行创意性的讨论,我都会尽力提供支持。有什么我可以帮到你的吗?"
}
HTTP调用(手动密钥管理)
和普通调用的区别
加密调用在普通调用的基础上,需要额外做如下三个处理(详细操作请参见接入流程):
添加
X-DashScope-EncryptionKey
请求头。X-DashScope-EncryptionKey
请求头对应的内容是一个JSON字符串,各参数说明如下:public_key_id
:您的公钥ID,请参见获取RSA的公钥从阿里云百炼平台获取。encrypt_key
:RSA公钥加密的AES密钥,获取方式请参见使用RSA公钥加密AES密钥。iv
:初始向量IV,获取方式请参加参见生成初始向量(IV)。
对请求参数
input
的内容进行加密。普通调用和加密调用请求数据的区别:
普通调用
加密调用
// 。。。这里省略前面其他内容 "input": { "messages": [ { "role": "system", "content": "You are a helpful assistant." }, { "role": "user", "content": "你是谁?" } ] } // 。。。这里省略后面其他内容
// 。。。这里省略前面其他内容 "input": "+J2aT8GNBUD......" // 。。。这里省略后面其他内容
对响应数据进行解密。
接入流程
一、准备请求数据
生成AES密钥
算法:AES
密钥规格:
长度:128位(16字节)/192位(24字节)/256位 (32字节)
密钥长度越长,安全性越高,但计算开销越大;256位安全性最高,适用于金融核心数据等高敏感数据保护
随机性:随机生成,建议使用密码学安全随机源
唯一性:单次请求有效,禁止复用
示例代码:
private static SecretKey generateAesSecretKey() throws Exception { KeyGenerator keyGen = KeyGenerator.getInstance("AES"); SecureRandom secureRandom = new SecureRandom(); keyGen.init(256, secureRandom); return keyGen.generateKey(); }
def generate_aes_secret_key(): """生成256位AES密钥""" return os.urandom(32) # 32字节 = 256位
生成初始向量(IV)
生成GCM加密所需的随机初始化向量:使用安全的随机数生成器生成一个12字节的随机字节序列,然后将这个字节序列进行Base64编码得到IV。
示例代码:
private static final int GCM_IV_LENGTH = 12; // 12 字节 IV private static byte[] generateIv() { byte[] iv = new byte[GCM_IV_LENGTH]; SecureRandom secureRandom = new SecureRandom(); // 随机生成确保唯一性 secureRandom.nextBytes(iv); return iv; }
# 常量定义 GCM_IV_LENGTH = 12 # 12字节IV def generate_iv(): """生成12字节随机IV""" return os.urandom(GCM_IV_LENGTH)
加密input数据
算法:AES-GCM(认证标签长度:128位)
参数:
处理流程:
将input序列化为JSON字符串:将下图中深色部分内容(即“
{"messages":[......]}
”)序列化对序列化后的
input
执行AES-GCM加密将加密结果转成字符串格式,待后续构建并发送请求时将其拼接到
input
字段中
示例代码:
private static final int GCM_TAG_LENGTH = 128; // 128 位 Tag(16 字节) private static String encryptInputWithAes(SecretKey aesSecretKey,byte[] iv, String input) throws Exception { byte[] content = input.getBytes(StandardCharsets.UTF_8); Cipher aesCipher = Cipher.getInstance("AES/GCM/NoPadding"); GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH, iv); aesCipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(aesSecretKey.getEncoded(), "AES"), gcmParameterSpec); byte[] encryptedBytes = aesCipher.doFinal(content); return Base64.encodeBase64String(encryptedBytes); }
def encrypt_input_with_aes(aes_key, iv, plaintext): """使用AES-GCM加密数据""" # 创建AES-GCM加密器 aesgcm = Cipher( algorithms.AES(aes_key), modes.GCM(iv, tag=None), backend=default_backend() ).encryptor() # 关联数据设为空(根据需求可调整) aesgcm.authenticate_additional_data(b'') # 加密数据 ciphertext = aesgcm.update(plaintext.encode('utf-8')) + aesgcm.finalize() # 获取认证标签 tag = aesgcm.tag # 组合密文和标签 encrypted_data = ciphertext + tag # 返回Base64编码结果 return base64.b64encode(encrypted_data).decode('utf-8')
使用RSA公钥加密AES密钥
算法:RSA
处理流程:
示例代码:
private static String encryptAesKeyWithRsaPublicKey(SecretKey aesSecretKey, String publicKey) throws Exception { byte[] aesKeyBytes = aesSecretKey.getEncoded(); String base64AesKey = Base64.encodeBase64String(aesKeyBytes); byte[] publicKeyBytes = Base64.decodeBase64(publicKey); X509EncodedKeySpec spec = new X509EncodedKeySpec(publicKeyBytes); KeyFactory kf = KeyFactory.getInstance("RSA"); PublicKey pubKey = kf.generatePublic(spec); Cipher rsaCipher = Cipher.getInstance("RSA"); rsaCipher.init(Cipher.ENCRYPT_MODE, pubKey); byte[] encryptedBytes = rsaCipher.doFinal(base64AesKey.getBytes()); return Base64.encodeBase64String(encryptedBytes); }
def encrypt_aes_key_with_rsa(aes_key, public_key_str): """使用RSA公钥加密AES密钥""" # 解码Base64格式的公钥 public_key_bytes = base64.b64decode(public_key_str) # 加载公钥 public_key = serialization.load_der_public_key( public_key_bytes, backend=default_backend() ) # 先对AES密钥进行Base64编码 base64_aes_key = base64.b64encode(aes_key).decode('utf-8') # 使用RSA加密 encrypted_bytes = public_key.encrypt( base64_aes_key.encode('utf-8'), padding.PKCS1v15() ) # 返回Base64编码的加密结果 return base64.b64encode(encrypted_bytes).decode('utf-8')
二、构建并发送请求
添加X-DashScope-EncryptionKey
请求头,并将原先的input
替换为加密后的input
,然后用和普通调用同样的方式发送请求即可。
X-DashScope-EncryptionKey
请求头对应的内容是一个JSON字符串,各参数说明如下:
public_key_id
:您的公钥ID,请参见获取RSA的公钥从阿里云百炼平台获取。encrypt_key
:RSA公钥加密的AES密钥,获取方式请参见使用RSA公钥加密AES密钥。iv
:初始向量IV,获取方式请参加参见生成初始向量(IV)。
最终发送给大模型的HTTP请求内容格式如下:
示例代码:
private static String sendEncryptedRequest(String apiKey, String publicKeyId, String encryptedAesKey, byte[] iv, String encryptedInput) throws Exception {
OkHttpClient okHttpClient = new OkHttpClient().newBuilder().build();
JSONObject requestBodyJson = new JSONObject();
requestBodyJson.put("model", "qwen-max");
requestBodyJson.put("input", encryptedInput);
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), requestBodyJson.toJSONString());
JSONObject headerJson = new JSONObject();
headerJson.put("Content-Type", "application/json");
headerJson.put("Accept", "application/json");
headerJson.put("Authorization","Bearer " + apiKey);
String encryptionHeader = String.format("{\"public_key_id\": \"%s\", \"encrypt_key\": \"%s\", \"iv\": \"%s\"}", publicKeyId, encryptedAesKey, Base64.encodeBase64String(iv));
headerJson.put("X-DashScope-EncryptionKey", encryptionHeader);
Headers headers = Headers.of(JSONObject.parseObject(headerJson.toString(), Map.class));
String url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation";
Request request = new Request.Builder()
.url(url)
.post(requestBody)
.headers(headers)
.build();
return okHttpClient.newCall(request).execute().body().string();
}
def send_encrypted_request(api_key, public_key_id, encrypted_aes_key, iv, encrypted_input):
"""发送加密请求到API端点"""
url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
# 构建请求头
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {api_key}",
"X-DashScope-EncryptionKey": json.dumps({
"public_key_id": public_key_id,
"encrypt_key": encrypted_aes_key,
"iv": base64.b64encode(iv).decode('utf-8')
})
}
# 构建请求体
payload = {
"model": "qwen-max",
"input": encrypted_input
}
# 发送POST请求
response = requests.post(
url,
headers=headers,
json=payload
)
# 检查响应状态
response.raise_for_status()
return response.json()
三、解密响应数据
算法:AES-GCM
参数:
处理流程:
使用AES-GCM解密
output
字段对应的内容。返回明文结果。
示例代码:
private static String decryptResponseWithAes(SecretKey aesSecretKey, byte[] iv, String response) throws Exception { JSONObject responseJson = JSONObject.parseObject(response, JSONObject.class); String encryptedOutput = responseJson.getString("output"); Cipher aesCipher = Cipher.getInstance("AES/GCM/NoPadding"); GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH, iv); aesCipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(aesSecretKey.getEncoded(), "AES"), gcmParameterSpec); byte[] decryptedBytes = aesCipher.doFinal(Base64.decodeBase64(encryptedOutput)); return new String(decryptedBytes); }
def decrypt_response_with_aes(aes_key, iv, response_data): """使用AES-GCM解密响应""" # 提取加密的输出 encrypted_output = response_data.get("output") if not encrypted_output: raise ValueError("Response missing 'output' field") # 解码Base64数据 encrypted_data = base64.b64decode(encrypted_output) # 分离密文和标签(标签长度16字节) ciphertext = encrypted_data[:-16] tag = encrypted_data[-16:] # 创建AES-GCM解密器 aesgcm = Cipher( algorithms.AES(aes_key), modes.GCM(iv, tag), backend=default_backend() ).decryptor() # 验证关联数据(与加密时一致) aesgcm.authenticate_additional_data(b'') # 解密数据 decrypted_bytes = aesgcm.update(ciphertext) + aesgcm.finalize() return decrypted_bytes.decode('utf-8')
完整示例代码
本节提供Java和Python示例代码,如您使用其他编程语言,请参照接入流程自行实现。
一个小技巧:您可以使用大模型,让它把下面的代码翻译成您想要的编程语言的代码。
示例代码仅供参考,请勿直接在生产环境中使用。
关于请求参数的更多说明,请参见通义千问API文档。
请求示例
import java.nio.charset.StandardCharsets;
import java.security.KeyFactory;
import java.security.PublicKey;
import java.security.SecureRandom;
import java.security.spec.X509EncodedKeySpec;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.codec.binary.Base64;
import com.alibaba.fastjson.JSONObject;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
public class Main {
// 采用AES-GCM 加密模式,以提升数据安全和完整性保护,在此指定IV、Tag长度
private static final int GCM_TAG_LENGTH = 128; // 128 位 Tag(16 字节)
private static final int GCM_IV_LENGTH = 12; // 12 字节 IV
public static void main(String[] args) throws Exception {
String publicKeyId = "xxx"; // 需要替换为实际获取到的RSA公钥 public_key_id 值
String publicKey = "xxx"; // 需要替换为实际获取到的RSA公钥 public_key 值
// 若没有配置环境变量,请用阿里云百炼API Key将下行替换为:String apiKey = "sk-xxx",但不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
String apiKey = System.getenv("DASHSCOPE_API_KEY");
// 生成 AES 密钥
SecretKey aesSecretKey = generateAesSecretKey();
// 生成唯一性iv
byte[] iv = generateIv();
// 加密 input 数据
String input = "{\"messages\": [{\"role\":\"user\",\"content\":\"你是谁?\"}]}";
String encryptedInput = encryptInputWithAes(aesSecretKey, iv, input);
// 使用 RSA 公钥加密 AES 密钥
String encryptedAesKey = encryptAesKeyWithRsaPublicKey(aesSecretKey, publicKey);
// 构建并发送请求
String response = sendEncryptedRequest(apiKey, publicKeyId, encryptedAesKey, iv, encryptedInput);
// 解密响应内容
String decryptOutput = decryptResponseWithAes(aesSecretKey, iv, response);
System.out.println("输出的内容是: " + decryptOutput);
}
private static SecretKey generateAesSecretKey() throws Exception {
KeyGenerator keyGen = KeyGenerator.getInstance("AES");
SecureRandom secureRandom = new SecureRandom();
keyGen.init(256, secureRandom); // AES密钥长度支持128、192、256 位。
return keyGen.generateKey();
}
private static byte[] generateIv() {
byte[] iv = new byte[GCM_IV_LENGTH];
SecureRandom secureRandom = new SecureRandom(); // 随机生成确保唯一性
secureRandom.nextBytes(iv);
return iv;
}
private static String encryptAesKeyWithRsaPublicKey(SecretKey aesSecretKey, String publicKey) throws Exception {
byte[] aesKeyBytes = aesSecretKey.getEncoded();
String base64AesKey = Base64.encodeBase64String(aesKeyBytes);
byte[] publicKeyBytes = Base64.decodeBase64(publicKey);
X509EncodedKeySpec spec = new X509EncodedKeySpec(publicKeyBytes);
KeyFactory kf = KeyFactory.getInstance("RSA");
PublicKey pubKey = kf.generatePublic(spec);
Cipher rsaCipher = Cipher.getInstance("RSA");
rsaCipher.init(Cipher.ENCRYPT_MODE, pubKey);
byte[] encryptedBytes = rsaCipher.doFinal(base64AesKey.getBytes());
return Base64.encodeBase64String(encryptedBytes);
}
private static String encryptInputWithAes(SecretKey aesSecretKey, byte[] iv, String input) throws Exception {
byte[] content = input.getBytes(StandardCharsets.UTF_8);
Cipher aesCipher = Cipher.getInstance("AES/GCM/NoPadding");
GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH, iv);
aesCipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(aesSecretKey.getEncoded(), "AES"), gcmParameterSpec);
byte[] encryptedBytes = aesCipher.doFinal(content);
return Base64.encodeBase64String(encryptedBytes);
}
private static String sendEncryptedRequest(String apiKey, String publicKeyId, String encryptedAesKey, byte[] iv, String encryptedInput) throws Exception {
OkHttpClient okHttpClient = new OkHttpClient().newBuilder().build();
JSONObject requestBodyJson = new JSONObject();
requestBodyJson.put("model", "qwen-max");
requestBodyJson.put("input", encryptedInput);
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), requestBodyJson.toJSONString());
JSONObject headerJson = new JSONObject();
headerJson.put("Content-Type", "application/json");
headerJson.put("Accept", "application/json");
headerJson.put("Authorization","Bearer " + apiKey);
String encryptionHeader = String.format("{\"public_key_id\": \"%s\", \"encrypt_key\": \"%s\", \"iv\": \"%s\"}", publicKeyId, encryptedAesKey, Base64.encodeBase64String(iv));
headerJson.put("X-DashScope-EncryptionKey", encryptionHeader);
Headers headers = Headers.of(JSONObject.parseObject(headerJson.toString(), Map.class));
String url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation";
Request request = new Request.Builder()
.url(url)
.post(requestBody)
.headers(headers)
.build();
return okHttpClient.newCall(request).execute().body().string();
}
private static String decryptResponseWithAes(SecretKey aesSecretKey, byte[] iv,String response) throws Exception {
JSONObject responseJson = JSONObject.parseObject(response, JSONObject.class);
String encryptedOutput = responseJson.getString("output");
Cipher aesCipher = Cipher.getInstance("AES/GCM/NoPadding");
GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH, iv);
aesCipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(aesSecretKey.getEncoded(), "AES"), gcmParameterSpec);
byte[] decryptedBytes = aesCipher.doFinal(Base64.decodeBase64(encryptedOutput));
return new String(decryptedBytes);
}
import os
import base64
import json
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.backends import default_backend
import requests
# 常量定义
GCM_IV_LENGTH = 12 # 12字节IV
def main():
public_key_id = "xxx" # 需要替换为实际获取到的RSA公钥 public_key_id 值
public_key = "xxx" # 需要替换为实际获取到的RSA公钥 public_key 值
# 从环境变量获取API密钥
api_key = os.getenv("DASHSCOPE_API_KEY")
if not api_key:
# 若没有配置环境变量,请放开下面这行注释代码,并将"sk-xxx"替换为实际的阿里云百炼API Key,不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
# api_key = "sk-xxx"
raise ValueError("DASHSCOPE_API_KEY environment variable not set")
# 生成AES密钥
aes_secret_key = generate_aes_secret_key()
# 生成IV
iv = generate_iv()
# 加密输入数据
input_data = '{"messages": [{"role":"user","content":"你是谁?"}]}'
encrypted_input = encrypt_input_with_aes(aes_secret_key, iv, input_data)
# 使用RSA公钥加密AES密钥
encrypted_aes_key = encrypt_aes_key_with_rsa(aes_secret_key, public_key)
# 发送加密请求
response = send_encrypted_request(api_key, public_key_id, encrypted_aes_key, iv, encrypted_input)
# 解密响应
decrypted_output = decrypt_response_with_aes(aes_secret_key, iv, response)
print("输出的内容是:", decrypted_output)
def generate_aes_secret_key():
"""生成256位AES密钥"""
return os.urandom(32) # 32字节 = 256位
def generate_iv():
"""生成12字节随机IV"""
return os.urandom(GCM_IV_LENGTH)
def encrypt_aes_key_with_rsa(aes_key, public_key_str):
"""使用RSA公钥加密AES密钥"""
# 解码Base64格式的公钥
public_key_bytes = base64.b64decode(public_key_str)
# 加载公钥
public_key = serialization.load_der_public_key(
public_key_bytes,
backend=default_backend()
)
# 先对AES密钥进行Base64编码
base64_aes_key = base64.b64encode(aes_key).decode('utf-8')
# 使用RSA加密
encrypted_bytes = public_key.encrypt(
base64_aes_key.encode('utf-8'),
padding.PKCS1v15()
)
# 返回Base64编码的加密结果
return base64.b64encode(encrypted_bytes).decode('utf-8')
def encrypt_input_with_aes(aes_key, iv, plaintext):
"""使用AES-GCM加密数据"""
# 创建AES-GCM加密器
aesgcm = Cipher(
algorithms.AES(aes_key),
modes.GCM(iv, tag=None),
backend=default_backend()
).encryptor()
# 关联数据设为空(根据需求可调整)
aesgcm.authenticate_additional_data(b'')
# 加密数据
ciphertext = aesgcm.update(plaintext.encode('utf-8')) + aesgcm.finalize()
# 获取认证标签
tag = aesgcm.tag
# 组合密文和标签
encrypted_data = ciphertext + tag
# 返回Base64编码结果
return base64.b64encode(encrypted_data).decode('utf-8')
def send_encrypted_request(api_key, public_key_id, encrypted_aes_key, iv, encrypted_input):
"""发送加密请求到API端点"""
url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
# 构建请求头
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {api_key}",
"X-DashScope-EncryptionKey": json.dumps({
"public_key_id": public_key_id,
"encrypt_key": encrypted_aes_key,
"iv": base64.b64encode(iv).decode('utf-8')
})
}
# 构建请求体
payload = {
"model": "qwen-max",
"input": encrypted_input
}
# 发送POST请求
response = requests.post(
url,
headers=headers,
json=payload
)
# 检查响应状态
response.raise_for_status()
return response.json()
def decrypt_response_with_aes(aes_key, iv, response_data):
"""使用AES-GCM解密响应"""
# 提取加密的输出
encrypted_output = response_data.get("output")
if not encrypted_output:
raise ValueError("Response missing 'output' field")
# 解码Base64数据
encrypted_data = base64.b64decode(encrypted_output)
# 分离密文和标签(标签长度16字节)
ciphertext = encrypted_data[:-16]
tag = encrypted_data[-16:]
# 创建AES-GCM解密器
aesgcm = Cipher(
algorithms.AES(aes_key),
modes.GCM(iv, tag),
backend=default_backend()
).decryptor()
# 验证关联数据(与加密时一致)
aesgcm.authenticate_additional_data(b'')
# 解密数据
decrypted_bytes = aesgcm.update(ciphertext) + aesgcm.finalize()
return decrypted_bytes.decode('utf-8')
if __name__ == "__main__":
main()
响应示例
{
"finish_reason": "stop",
"text": "我是Qwen,由阿里云开发的超大规模语言模型。我的目标是帮助用户更高效地获取信息、解决各种问题并激发创造力。无论是回答问题、提供信息还是进行创意性的讨论,我都会尽力提供支持。有什么我可以帮到你的吗?"
}
常见问题
Q:代码执行报错,“Invalid AES key Length: 294 bytes”。
A:AES密钥长度支持128位(16字节)、192位(24字节)、256 位(32字节),请检查密钥长度设置是否符合要求。
错误码
如果调用失败并返回报错信息,请参见错误信息进行解决。