Building a Security-Enhanced RAG Application with Knowledge Base Encryption

To strengthen the security of the knowledge base data in a RAG (Retrieval-Augmented Generation) application, you can encrypt both the text chunks and the vectors (vectors can also leak the original semantics), so that only ciphertext is stored and transmitted over the network. Vector encryption requires a special algorithm that keeps encrypted vectors searchable by similarity. This tutorial shows how to build a RAG application with an encrypted knowledge base in the DSW development environment, using the LangChain framework as an example.

Solution Overview

Key workflow:

  1. Encrypted storage: after documents are parsed, chunked, and vectorized, encrypt the text chunks and the embedding vectors, then store the ciphertext in a vector database.

    This tutorial uses the Python rai_sam library for encryption and decryption. It encrypts as follows:

    • Text chunk encryption: the industry-standard AES-CTR-256 algorithm.

    • Vector encryption: the DCPE algorithm, which supports similarity computation and ranking over encrypted vectors. Its security is stronger than order-preserving encryption; for details, see the DCPE paper. A small sketch at the end of this overview illustrates the property.

  2. Decryption and inference: after the user's question is vectorized and encrypted, search the vector database, decrypt the retrieval results (ciphertext) inside the inference service, and feed them together with the question (plaintext) into the large language model (LLM) to generate a response.

(Architecture diagram)
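
The following minimal sketch (not part of the original walkthrough) illustrates the DCPE property above: distance rankings computed over encrypted vectors should match the rankings over the plaintext vectors. It assumes the SAM_KEY_ID/SAM_KEY_SECRET environment variables from the setup step below, and uses the same SamPkgEncryptVectors call that appears later in this tutorial.

import os
import numpy as np
from rai_sam.engine.packager.vector import SamVectorPackager

def rank_by_distance(query, vectors):
    # Rank vector indices by Euclidean distance to the query, nearest first.
    dists = [np.linalg.norm(np.array(query) - np.array(v)) for v in vectors]
    return sorted(range(len(vectors)), key=lambda i: dists[i])

rng = np.random.default_rng(0)
plain_vectors = [rng.normal(size=8).tolist() for _ in range(5)]
plain_query = rng.normal(size=8).tolist()

packager = SamVectorPackager()
key_id, key_secret = os.getenv("SAM_KEY_ID"), os.getenv("SAM_KEY_SECRET")
enc_vectors = packager.SamPkgEncryptVectors(key_id, key_secret, plain_vectors)
enc_query = packager.SamPkgEncryptVectors(key_id, key_secret, [plain_query])[0]

# The two rankings are expected to (approximately) agree, per the DCPE design.
print(rank_by_distance(plain_query, plain_vectors))
print(rank_by_distance(enc_query, enc_vectors))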

1. Development Environment Setup

  1. Open the DSW development environment (you can also use a local or other development environment).

    1. Create a DSW instance. This tutorial uses the following DSW configuration:

      • Official image: modelscope:1.26.0-pytorch2.3.1tensorflow2.16.1-gpu-py311-cu121-ubuntu22.04

      • Instance type: ecs.gn7i-c8g1.2xlarge

    2. On the Interactive Modeling (DSW) page, click Open in the Actions column of the target instance.

    3. On the Notebook tab, click Create Notebook.

  2. Install the dependencies required to run the code.

    !pip install -U langchain langchain-community langchain_huggingface
    !pip install -U pypdf
    !pip install -U modelscope
    !pip install -U alibabacloud-ha3engine-vector
    !pip install -U pymilvus
    !pip install -U rai_sam
    !pip install -U flask
  3. Download the embedding model used to convert text chunks into vectors. This tutorial uses bge-large-zh-v1.5; you can choose another embedding model based on your data type, language, or domain (for example, legal).

    from modelscope import snapshot_download
    model_dir = snapshot_download('BAAI/bge-large-zh-v1.5', cache_dir='.')
  4. Prepare the encryption key. Encryption with the rai_sam module requires a user-defined key ID (4 to 48 bytes) and key secret (4 to 128 bytes). This tutorial uses the key LD_Secret_0123456789 and the key ID LD_ID_123456, configured as environment variables as follows and read from the environment later. In production, manage keys with a more secure mechanism, such as Key Management Service (KMS). An optional length check follows the code below.

    import os
    
    # User-defined key ID (4 to 48 bytes)
    os.environ["SAM_KEY_ID"] = "LD_ID_123456"
    # User-defined key secret (4 to 128 bytes)
    os.environ["SAM_KEY_SECRET"] = "LD_Secret_0123456789"

2. Document Processing and Encrypted Storage

This section describes how to encrypt a document and store it in a vector database.

2.1 Document Chunking and Vectorization

  1. Upload the file 大模型安全实践白皮书2024.pdf to the current working directory of the DSW development environment using the upload icon in the Notebook file browser.

  2. Run the following code to chunk and vectorize the document.

    from langchain_community.document_loaders import PyPDFLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain_huggingface import HuggingFaceEmbeddings
    
    # 1. Load the knowledge base file
    
    # Path to the file
    file_path = "./大模型安全实践白皮书2024.pdf"
    
    loader = PyPDFLoader(file_path)
    docs = loader.load()
    
    print(len(docs))
    
    # 2. Split the loaded documents into smaller text chunks
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    split_docs = text_splitter.split_documents(docs)
    
    print(len(split_docs))
    
    # 3. Vectorize the text
    # Load the embedding model
    embeddings_model = HuggingFaceEmbeddings(model_name="./BAAI/bge-large-zh-v1.5")
    # Convert the text chunks to vectors
    documents = [doc.page_content for doc in split_docs]
    embeddings = embeddings_model.embed_documents(documents)
    
    print(len(embeddings), len(embeddings[0]))
    
    # Vector dimension
    embedding_dimension = len(embeddings[0])

2.2 Encrypting Text Chunks and Vectors

Encrypt the embedding vectors and the original text content.

import os
from rai_sam.engine.packager.content import SamContentPackager
from rai_sam.engine.packager.vector import SamVectorPackager

# Encrypt the vectors
vector_packager = SamVectorPackager()
enc_embeddings = vector_packager.SamPkgEncryptVectors(os.getenv("SAM_KEY_ID"), os.getenv("SAM_KEY_SECRET"), embeddings)

# Encrypt the text chunks
content_packager = SamContentPackager()
contents = [doc.page_content for doc in split_docs]
enc_contents = content_packager.SamPkgEncryptContents(os.getenv("SAM_KEY_ID"), os.getenv("SAM_KEY_SECRET"), contents)

# Data to store in the vector database
data = [ {"id": i, 
          "vector": enc_embeddings[i],
          "content": enc_contents[i],
          "metadata": split_docs[i].metadata["source"],
          "key_id": os.getenv("SAM_KEY_ID")
         } for i in range(len(embeddings))
       ]
print(len(data))
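
As a quick check (not part of the original walkthrough), you can confirm that the prepared records hold ciphertext. This sketch assumes, as the Milvus schema in the next step implies, that SamPkgEncryptVectors returns numeric vectors of the original dimension:

# The encrypted content should no longer be readable text, and the encrypted
# vector should keep the embedding dimension while differing in its values.
print(str(enc_contents[0])[:80])
print(len(enc_embeddings[0]) == len(embeddings[0]))
print(enc_embeddings[0][:3], embeddings[0][:3])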

2.3 Storing Ciphertext in the Vector Database

This tutorial uses the Milvus Lite vector database for quick experimentation; for production, choose a mature, cloud-hosted vector database service. See Using Alibaba Cloud Vector Databases below.

from pymilvus import MilvusClient

demo_collection_name = "milvus_demo_collection"

# Connect to the vector database (on first use, this creates a database file named milvus_demo.db in the current folder)
client = MilvusClient("./milvus_demo.db")

# Create the collection
if client.has_collection(demo_collection_name):
    client.drop_collection(demo_collection_name)

client.create_collection(
    collection_name=demo_collection_name,
    dimension=embedding_dimension
)

# Insert the data into the vector database
res = client.insert(
    collection_name=demo_collection_name,
    data=data
)
print(res)

3. Deploying the Model Service

For data security, the vector retrieval results sent with a request to the model service are ciphertext and must be decrypted before being fed to the large language model (LLM) for inference. You can adapt your online prediction code to the encrypted knowledge base by referring to the sample app.py.

Complete app.py code:

# -*- coding: utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
import os
import re
import json
import torch
import logging
from flask import Flask, request
from modelscope import AutoModelForCausalLM, AutoTokenizer
from rai_sam.engine.client.content import SamContentClient

logging.basicConfig(level=logging.DEBUG)
log: logging.Logger = logging.getLogger(__name__)

app = Flask(__name__)

# Keys used to encrypt and decrypt the knowledge base. They must match the keys used to
# encrypt the knowledge base files; here they are read from environment variables.
# SAM_KEY_ID: knowledge base key ID (4-48 bytes); SAM_KEY_SECRET: key secret (4-128 bytes).
sam_key_sets = {
    os.getenv("SAM_KEY_ID"): os.getenv("SAM_KEY_SECRET"),
}

# Set to the absolute path of the large language model files
model_name = "/mnt/workspace/Qwen/Qwen2___5-3B-Instruct"

# Pre-defined context start and end identifiers
rai_context_start = "<|rai_sam_encrypted_context_start|>"
rai_context_end = "<|rai_sam_encrypted_context_end|>"

device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",
    device_map=device
)
tokenizer = AutoTokenizer.from_pretrained(model_name)

rai_sam_client = SamContentClient()

def rai_sam_decrypt_content(key_ids: list[str], contents: list[str]) -> list[str]:
    # If every chunk was encrypted with the same key, decrypt them in one batch call.
    combine = len(set(key_ids)) == 1
    log.info("combine: %s", combine)

    if combine:
        key_id = key_ids[0]
        key_secret = sam_key_sets.get(key_id)
        if key_secret is None:
            raise RuntimeError("No sam key secret found")

        dec_contents = rai_sam_client.SamClientDecryptContents(key_id, key_secret, contents)
        if dec_contents is None:
            log.error("Failed to decrypt contents")
            return None
    else:
        # Mixed keys: decrypt each chunk with its own key.
        dec_contents = []
        for key_id, content in zip(key_ids, contents):
            key_secret = sam_key_sets.get(key_id)
            if key_secret is None:
                raise RuntimeError("No sam key secret found")

            dec_content = rai_sam_client.SamClientDecryptContents(key_id, key_secret, [content])
            if dec_content is None:
                log.error("Failed to decrypt content")
                return None

            dec_contents.append(dec_content[0])

    return dec_contents

def generate_prompt_plaintext(in_prompt: str) -> str:
    start_pos = in_prompt.find(rai_context_start)
    if start_pos == -1:
        log.info("The input prompt is plaintext")
        return in_prompt

    log.debug("rai_context_start pos: %d", start_pos)

    end_pos = in_prompt.rfind(rai_context_end)
    if end_pos == -1:
        log.error("Context end tag not found: %s", rai_context_end)
        return None

    log.debug("rai_context_end pos: %d", end_pos)

    # Extract the encrypted context embedded in the prompt
    context = in_prompt[start_pos + len(rai_context_start):end_pos]
    context_json = json.loads(context)
    log.debug("context_json: %s", context_json)

    contents = context_json["contents"]
    log.debug("contents: %s", contents)

    key_ids = context_json["key_ids"]
    log.debug("key_ids: %s", key_ids)

    if len(contents) != len(key_ids):
        raise RuntimeError("the lengths of contents and key_ids are not equal")

    dec_contents = rai_sam_decrypt_content(key_ids, contents)
    if dec_contents is None:
        return None
    log.debug("dec_contents: %s", dec_contents)

    # Replace the encrypted context with the decrypted plaintext
    context = "\n\n".join(dec_contents)

    out_prompt = in_prompt[:start_pos] + context + in_prompt[end_pos + len(rai_context_end):]

    return out_prompt

def generate_model_response(
        prompt: str,
        max_new_tokens: int = 512,
        temperature: float = 1.0,
        top_k: int = 50,
        top_p: float = 1.0) -> str:

    model_inputs = tokenizer(prompt, return_tensors="pt").to(device)

    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p
    )

    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]

    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    log.debug("response: %s", response)

    tag = "Assistant:"
    pos = response.find(tag)
    if pos != -1:
        print("pos: %d", pos)
        response = response[pos + len(tag):]

    return response.strip()

@app.route('/model', methods=['POST'])
def process_model_generate():
    log.debug('process model generate start.')

    body = request.json
    log.debug("request body: %s", body)

    if "prompt" in body:
        in_prompt = body['prompt']
    else:
        raise RuntimeError("No prompt found")

    if "max_new_tokens" in body:
        max_new_tokens = body['max_new_tokens']
    else:
        max_new_tokens = 512

    if "temperature" in body:
        temperature = body['temperature']
    else:
        temperature = 0.95

    if "top_k" in body:
        top_k = body['top_k']
    else:
        top_k = 50

    if "top_p" in body:
        top_p = body['top_p']
    else:
        top_p = 1.0

    log.debug("prompt: %s", in_prompt)
    log.debug("max_new_tokens: %d", max_new_tokens)
    log.debug("temperature: %f", temperature)
    log.debug("top_k: %d", top_k)
    log.debug("top_p: %f", top_p)

    prompt = generate_prompt_plaintext(in_prompt)
    if prompt is None:
        log.error("Failed to generate prompt plaintext")
        raise RuntimeError("generate prompt fail")

    log.debug("generated prompt: %s", prompt)

    model_response = generate_model_response(prompt,
                         max_new_tokens, temperature, top_k, top_p)
    if model_response is None:
        log.error("Failed to generate model response")
        raise RuntimeError("generate model response fail")

    response = {
        "response": model_response
    }

    return response

@app.route('/v1/chat/completions', methods=['POST'])
def process_model_chat_completions():
    log.debug('process model chat completions start.')

    body = request.json
    log.debug("request json: %s", body)
 
    if "messages" in body:
        messages = body['messages']
    else:
        raise RuntimeError("No messages found")

    if "temperature" in body:
        temperature = body['temperature']
    else:
        temperature = 0.95

    log.debug("temperature: %s", temperature)

    in_prompt = str(messages[0])
    prompt = generate_prompt_plaintext(in_prompt)
    if prompt == None:
        raise RuntimeError("generate prompt fail")

    log.debug("generated prompt: %s", prompt)

    model_response = generate_model_response(prompt, temperature=temperature)
    if model_response == None:
        raise RuntimeError("generate model response fail")

    message = {
        'role': 'assistant',
        'content': model_response
    }

    content = {
        'message': message
    }

    response = {
        'choices': [content]
    }

    return response

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=8000, debug=True)

For easy testing, this section uses the app.py file above to start an inference service in the DSW development environment.

  1. Download the model.

    # Download the model
    from modelscope import snapshot_download
    model_dir = snapshot_download('Qwen/Qwen2.5-3B-Instruct', cache_dir='/mnt/workspace/')
  2. Start the inference service. Open a Terminal and run the following commands in the directory containing app.py:

    # Set temporary environment variables
    export SAM_KEY_ID=LD_ID_123456
    export SAM_KEY_SECRET=LD_Secret_0123456789
    # Run the Python script
    python app.py

    When the service starts successfully, Flask logs that it is running and listening on port 8000.
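
    As a quick smoke test (a sketch, not part of the original steps; install the requests package if needed), send a plaintext prompt to the /model endpoint. A prompt without the encrypted-context tags passes through generate_prompt_plaintext unchanged:

    # Send a plaintext prompt to the local service (assumes it listens on 127.0.0.1:8000).
    import requests

    resp = requests.post(
        "http://127.0.0.1:8000/model",
        json={"prompt": "你好,请简单介绍一下你自己。", "max_new_tokens": 128},
    )
    print(resp.json()["response"])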

4. Vector Retrieval and Inference

4.1 Vectorizing and Encrypting User Input

Run the following code to convert the user query into a vector and encrypt it.

import os
from langchain_huggingface import HuggingFaceEmbeddings
from rai_sam.engine.packager.vector import SamVectorPackager

query = "大模型安全建设的指导思想是什么?"

# Load the embedding model and vectorize the question
embeddings_model = HuggingFaceEmbeddings(model_name="./BAAI/bge-large-zh-v1.5")
query_embedding = embeddings_model.embed_query(query)

# Encrypt the question vector
vector_packager = SamVectorPackager()
enc_query_embedding = vector_packager.SamPkgEncryptVectors(os.getenv("SAM_KEY_ID"), os.getenv("SAM_KEY_SECRET"), [query_embedding])[0]

print(len(enc_query_embedding))

4.2 Retrieving Relevant Knowledge Chunks

Run the following code to retrieve the most relevant encrypted chunks from the vector database.

from pymilvus import MilvusClient

client = MilvusClient("./milvus_demo.db")

demo_collection_name = "milvus_demo_collection"

# Search with the encrypted query vector
search_res = client.search(
    collection_name=demo_collection_name,
    data=[enc_query_embedding],
    limit=3,
    output_fields=["content", "key_id"],
)

# Print the search results
for res in search_res[0]:
    print("Index:", res["id"])
    print("Distance:", res["distance"])
    print("Content:", res["entity"]["content"])
    print("KeyID:", res["entity"]["key_id"])
    print("\n")

retrieved_contents = [
    (res["entity"]["content"]) for res in search_res[0]
]
key_ids = [
    (res["entity"]["key_id"]) for res in search_res[0]
]

4.3 Decryption and Inference

Decrypt the retrieved content, then feed it together with the question into the large language model (LLM) to generate a response.

import os
import json
from langchain import hub
from langchain_community.llms.pai_eas_endpoint import PaiEasEndpoint
from langchain_core.output_parsers import StrOutputParser
from langchain_community.llms import chatglm

rai_context_start = "<|rai_sam_encrypted_context_start|>"
rai_context_end = "<|rai_sam_encrypted_context_end|>"

# Local model service
llm = chatglm.ChatGLM(endpoint_url="http://127.0.0.1:8000/model")

# EAS model service
# llm = PaiEasEndpoint(
#     eas_service_url="<service_url>/model",
#     eas_service_token="<service_token>",
# )

prompt = hub.pull("rlm/rag-prompt")

chain = prompt | llm | StrOutputParser()

contents = {
    "contents": retrieved_contents,
    "key_ids": key_ids
}

content_str = json.dumps(contents)
context = rai_context_start + content_str + rai_context_end

print("context: ", context)
print("\n")

response = chain.invoke({"context": context, "question": query})
print(response)

The output looks like this:

context:  <|rai_sam_encrypted_context_start|>{"contents": [******], "key_ids": ["LD_ID_****", "LD_ID_****", "LD_ID_****"]}<|rai_sam_encrypted_context_end|>
The core guiding principle of large-model security is to put people first, ensuring that technological development both conforms to ethics and brings positive impact to society. This means placing human interests, needs, and safety first throughout the development and application of large models. The people-first principle requires all participants, including designers, developers, and users, to maintain this mindset and to genuinely safeguard the security and interests of users and society. Deviating from this core can lead to security risks and challenges, triggering unforeseen problems such as privacy violations, social unfairness, and ethical conflicts.

Production Deployment

To apply this solution in a production environment, refer to the following to use Alibaba Cloud vector databases and deploy a secure, encrypted inference service in Elastic Algorithm Service (EAS), strengthening your application's security.

Using Alibaba Cloud Vector Databases

Refer to the following code to store and retrieve encrypted data with Alibaba Cloud Milvus, OpenSearch, or Elasticsearch.

Alibaba Cloud Milvus

Storing data

from pymilvus import MilvusClient

demo_collection_name = "milvus_demo_collection"

# Connect to the Alibaba Cloud Milvus instance
client = MilvusClient(
    uri="http://c-xxxx-internal.milvus.aliyuncs.com:19530",
    token="User:Password",
    db_name="default"
)

# Recreate the collection if it already exists
if client.has_collection(demo_collection_name):
    client.drop_collection(demo_collection_name)

client.create_collection(
    collection_name=demo_collection_name,
    dimension=embedding_dimension
)

# Insert the encrypted data
res = client.insert(
    collection_name=demo_collection_name,
    data=data
)
print(res)

Retrieving data

from pymilvus import MilvusClient

demo_collection_name = "milvus_demo_collection"

client = MilvusClient(
    uri="http://c-xxx-internal.milvus.aliyuncs.com:19530",
    token="User:YourPassword",
    db_name="default"
)

# Search with the encrypted query vector
search_res = client.search(
    collection_name=demo_collection_name,
    data=[enc_query_embedding],
    limit=3,
    output_fields=["content", "key_id"],
)

for res in search_res[0]:
    print("Index:", res["id"])
    print("Distance:", res["distance"])
    print("Content:", res["entity"]["content"])
    print("KeyID:", res["entity"]["key_id"])
    print("\n")

retrieved_contents = [
    (res["entity"]["content"]) for res in search_res[0]
]
key_ids = [
    (res["entity"]["key_id"]) for res in search_res[0]
]

Key configuration parameters:

  • demo_collection_name: the name of the collection in the Milvus instance, for example milvus_demo_collection.

  • uri: the endpoint of the Milvus instance (internal or public access), in the format http://<endpoint>:<port>.

  • token: in the format User:Password, that is, <Milvus instance username>:<Milvus instance password>.

  • db_name: the name of an existing database, for example default.
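
As an optional connectivity check (a sketch; the endpoint and credentials above are placeholders), you can list the collections in the instance:

# Verify connectivity to the Milvus instance by listing its collections.
print(client.list_collections())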

Alibaba Cloud OpenSearch

Storing data

from alibabacloud_ha3engine_vector import models, client
from Tea.exceptions import TeaException, RetryError

# Instance ID
instance_id = "ha-cn-xxx"

# Table name
table_name = "OPS_demo"

# Instance endpoint, username, and password
config = models.Config(
    endpoint=instance_id + ".public.ha.aliyuncs.com",
    instance_id=instance_id,
    protocol="http",
    access_user_name="root",
    access_pass_word="YourPassword"
)

ha3EngineClient = client.Client(config)

try:
    documentArrayList = []
    for i in range(len(data)):
        add2Document = {
            "fields": data[i],
            "cmd": "add"
        }
        documentArrayList.append(add2Document)

    print(len(documentArrayList))

    optionsHeaders = {}
    pushDocumentsRequest = models.PushDocumentsRequest(optionsHeaders, documentArrayList)

    pkField = "id"
    response = ha3EngineClient.push_documents(instance_id + "_" + table_name, pkField, pushDocumentsRequest)

    print(response.body)
except TeaException as e:
    print(f"send request with TeaException: {e}")
except RetryError as e:
    print(f"send request with Connection Exception: {e}")

Retrieving data

import json
from alibabacloud_ha3engine_vector import models, client
from alibabacloud_ha3engine_vector.models import QueryRequest

# Instance ID
instance_id = "ha-cn-xxx"

# Table name
table_name = "OPS_demo"

# Instance endpoint, username, and password
config = models.Config(
    endpoint=instance_id + ".public.ha.aliyuncs.com",
    instance_id=instance_id,
    protocol="http",
    access_user_name="root",
    access_pass_word="YourPassword"
)

ha3EngineClient = client.Client(config)

# Search with the encrypted query vector
request = QueryRequest(
    table_name=table_name,
    vector=enc_query_embedding,
    search_params="{\"qc.searcher.scan_ratio\":0.01}",
    top_k=3,
    output_fields=["content", "key_id"],
    sort="__vs_vector_score__")

response = ha3EngineClient.query(request)
search_res = json.loads(response.body)

for res in search_res['result']:
    print("Index:", res["id"])
    print("Distance:", res["score"])
    print("Content:", res["fields"]["content"])
    print("KeyId:", res["fields"]["key_id"])
    print("\n")

retrieved_contents = [
    (res["fields"]["content"]) for res in search_res['result']
]
key_ids = [
    (res["fields"]["key_id"]) for res in search_res['result']
]

Where:

  • instance_id: the OpenSearch instance ID.

  • table_name: the OpenSearch index table name.

  • access_user_name: the username of the OpenSearch instance.

  • access_pass_word: the password of the OpenSearch instance.

Alibaba Cloud Elasticsearch

Storing data

from elasticsearch import Elasticsearch

index_name = "elasticsearch_demo"

index_config = {
    "mappings": {
        "properties": {
            "vector": {
                "type": "dense_vector",
                "dims": embedding_dimension,
                "similarity": "cosine"
            },
            "content": {
                "type": "text"
            },
           "metadata": {
                "type": "text"
            },
            "key_id": {
                "type": "text"
            }
        }
    }
}

client = Elasticsearch(
        '<Elasticsearch URL>',
        basic_auth=('elastic', '<YourPassword>')
)

exists = client.indices.exists(index=index_name)
if not exists:
    result = client.indices.create(index=index_name, body=index_config)
    print(result)
else:
    print("{0} already exists".format(index_name))


for i in range(len(data)):
    document = data[i]
    client.index(
        index=index_name,
        id=document['id'],
        document=document
    )
print("Documents indexed successfully")

Retrieving data

from elasticsearch import Elasticsearch

index_name = "elasticsearch_demo"

client = Elasticsearch(
        '<Elasticsearch URL>',
        basic_auth=('elastic', '<YourPassword>')
)

# Search with the encrypted query vector
response = client.search(
    index = index_name,
    query = {
        "knn": {
            "field": "vector",
            "query_vector": enc_query_embedding,
            "k": 3
        }
    },
    fields=["content", "key_id"]
)
search_res = response["hits"]

# Print the search results
for res in search_res["hits"]:
    print("Index:", res["_id"])
    print("Score:", res["_score"])
    print("Content:", res["_source"]["content"])
    print("KeyId:", res["_source"]["key_id"])
    print("\n")

retrieved_contents = [
    (res["_source"]["content"]) for res in search_res["hits"]
]
key_ids = [
    (res["_source"]["key_id"]) for res in search_res["hits"]
]

Where:

  • index_name: a custom index name; on the Elasticsearch instance page, update the YML configuration to allow automatic index creation first.

  • <Elasticsearch URL>: the Elasticsearch instance endpoint (internal or public access), in the format http://<host>:<port>.

  • <YourPassword>: the login password of the Elasticsearch instance.
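
As an optional connectivity check (a sketch; the URL and password above are placeholders), you can print the cluster information:

# Verify connectivity by printing Elasticsearch cluster info.
print(client.info())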

Deploying the PAI-EAS Model Service

EAS支持配置安全加密环境,通过配置系统信任管理服务,保证服务部署和调用的过程中数据、模型和代码等信息可以安全加密,实现安全可验证的推理服务。详情请参考安全加密推理服务