Dify是一款第三方提供的可视化编排生成式AI应用功能的专业工作站。开发者通过Dify平台部署的大模型或AI应用部署,同样可以接入AI安全护栏, 以获得全面的大模型安全防护能力。本文介绍如何在Dify平台的工作流、Agent中完成AI安全护栏产品的集成。
实践教程概述
当前在Dify平台上,针对Agent和工作流场景,支持插件、扩展内容审查API多种集成方案:
工作流-插件集成:通过本地安装插件方式,快速安装安全护栏插件实现对大模型输入和输出进行防控。
Agent-扩展内容审查API:通过API扩展的方式对内容审查API进行扩展,需要部署一套AI安全护栏与Dify内容审查API标准协议进行适配的服务,建议本地化进行服务部署。
工作流-扩展内容审查API:社区版和云服务存在差异,可以参考Agent-扩展内容审查API。
集成方式 | 支持场景 | 是否支持流式 | 支持的Dify版本 | 对接方式 |
插件集成 | 工作流 | 否 | 云服务、Dify社区版 | 0代码一键集成 |
扩展内容审查API | Agent、chatflow工作流 | 是 | 云服务、Dify社区版 | 本地部署转发服务,有少量代码开发量。 |
前提条件
在Dify平台进行AI安全护栏的集成前,需要完成以下准备工作:
开通AI安全护栏产品。点击开通AI安全护栏按量付费。
创建RAM用户,为RAM用户授权系统策略权限,并获取RAM账号的AccessKey。
实践方案一:工作流插件集成
以下实践教程中,相关的截图和说明均基于Dify云服务。对于使用Dify社区版或Dify Premium的用户,操作流程一致,但页面可能略有不同。
下载插件
插件名称:AI安全护栏-Dify插件
版本号:1.0.2
发布时间:2025.08.05
安装插件
在插件页面中,点击安装插件,选择本地插件的方式。上传已下载的AI安全护栏插件。
在工作流中使用插件
大模型输入风险检测:在LLM节点之前新增安全护栏的插件节点,检测内容为input,模态类型为文本,检测类型为输入。
大模型输出风险检测:在LLM节点下一步新增AI安全护栏节点,检测内容为LLM的text,模态类型为文本,检测类型为输出。
输出变量:_finalSuggestion是所有检测项的综合防护建议,取值有block/pass/watch/mask等,可根据需要对不同建议做后续处理。
AI安全护栏控制台配置
在正式运行前,还需要在AI安全护栏的控制台对防护功能进行配置。包括打开敏感数据检测、提示词攻击的检测开关,或对每一防护类型进行细检测项的配置等。更多的配置方案,参考文档:检测项配置。
效果示例
完成插件集成、检测项配置后,即可在Dify平台上应用AI安全护栏的防护能力。以下视频演示了如何对大模型输出的异常结果进行识别。
实践方案二:通过Agent扩展内容审核API的方式
AI安全护栏还可以支持通过Agent扩展内容审核API的方式实现被Dify平台应用集成。以下以Dify社区版作为演示。云服务、Dify Premium等其他版本,整体集成流程基本一致。
部署转发服务
AI安全护栏产品的API最大支持单次2000字符输入,因此输入长度大于2000字符的情况下,需要进行适配,处理方法如下:
输入审查:将输入切分为多段,每段不超过2000个字符,并发调用安全护栏API。
输出审查:Dify每隔300字符左右发起一次内容审核API调用,处理上截取最近2000字符进行调用。
以下分别为处理逻辑和启动脚本的示例代码:
from fastapi import FastAPI, Body, HTTPException, Header
from pydantic import BaseModel
import base64
from collections.abc import Generator
from typing import Any
import hmac
import hashlib
from urllib.parse import quote
import requests
from datetime import datetime
from datetime import timezone
import uuid
import json
import re
import concurrent.futures
# 可以根据需要调用不同区域的服务,支持上海(cn-shanghai)、北京(cn-beijing)、杭州(cn-hangzhou)、深圳(cn-shenzhen)
SERVICE_URL = "https://green-cip.cn-shanghai.aliyuncs.com"
# 超过这个长度时对文本进行切分
MAX_LENGTH = 2000
# 调用安全护栏的输入检测和输出检测的ServiceCode
SERVICE_INPUT = "query_security_check"
SERVICE_OUTPUT = "response_security_check"
ENCODING = "UTF-8"
ISO8601_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
ALGORITHM = "HmacSHA1"
def format_iso8601_date():
return datetime.now(timezone.utc).strftime(ISO8601_DATE_FORMAT)
def percent_encode(value):
if value is None:
return ""
return (
quote(value.encode(ENCODING), safe="~").replace("+", "%20").replace("*", "%2A")
)
def create_signature(string_to_sign, secret):
secret = secret + "&"
signature = hmac.new(
secret.encode(ENCODING), string_to_sign.encode(ENCODING), hashlib.sha1
).digest()
return base64.b64encode(signature).decode(ENCODING)
def create_string_to_sign(http_method, parameters):
sorted_keys = sorted(parameters.keys())
canonicalized_query_string = ""
for key in sorted_keys:
canonicalized_query_string += (
"&" + percent_encode(key) + "=" + percent_encode(parameters[key])
)
string_to_sign = (
http_method
+ "&"
+ percent_encode("/")
+ "&"
+ percent_encode(canonicalized_query_string[1:])
)
return string_to_sign
def split_text(text: str, max_length: int = 1950) -> list[str]:
"""将文本按 max_length 分段,尽量保留完整句子(识别多种标点)"""
segments = []
while len(text) > max_length:
# 提取当前最大长度范围内的子串
chunk = text[:max_length]
# 使用正则查找最后一个句号、感叹号、问号等断句符号的位置
match = None
for pattern in [r"[。!?;:\.?!]+"]: # 匹配多种结束符号
matches = list(re.finditer(pattern, chunk))
if matches:
match = matches[-1] # 取最后一个匹配项
if match:
cut_point = match.end() # 包含标点符号
else:
cut_point = max_length # 找不到就强制截断
segments.append(text[:cut_point])
text = text[cut_point:]
if text:
segments.append(text)
return segments
def request(content_segment, type, aliyun_access_key, aliyun_access_secret):
print(datetime.now(), f" [{type} request content]-> {content_segment}")
# 3.1 构造请求参数
parameters = {
"Action": "MultiModalGuard",
"Version": "2022-03-02",
"AccessKeyId": aliyun_access_key,
"Timestamp": format_iso8601_date(),
"SignatureMethod": "HMAC-SHA1",
"SignatureVersion": "1.0",
"SignatureNonce": str(uuid.uuid4()),
"Format": "JSON",
"Service": (
SERVICE_INPUT if type == "input" else SERVICE_OUTPUT
),
"ServiceParameters": json.dumps(
{"content": content_segment}, ensure_ascii=False
),
}
string_to_sign = create_string_to_sign("POST", parameters)
signature = create_signature(string_to_sign, aliyun_access_secret)
parameters["Signature"] = signature
# 3.2 发送请求
response = requests.post(SERVICE_URL, data=parameters)
body = response.json()
print(datetime.now(), " [response body]-> ", body)
if response.status_code != 200:
raise Exception(
f"response http status_code not 200. status_code: {response.status_code}, body: {body}"
)
if body.get("Code") != 200:
raise Exception(
f"response code not 200. code: {body.get('Code')}, body: {body}"
)
return body
app = FastAPI()
class InputData(BaseModel):
point: str
params: dict = {}
@app.post("/api/dify/receive")
async def dify_receive(data: InputData = Body(...), authorization: str = Header(None)):
"""
Receive API query data from Dify.
"""
#print(data)
auth_scheme, _, api_key = authorization.partition(" ")
if auth_scheme.lower() != "bearer":
raise HTTPException(status_code=401, detail="Unauthorized")
# api_key decode
try:
decoded_bytes = base64.b64decode(api_key)
decoded_str = decoded_bytes.decode("utf-8")
ak, sk = decoded_str.split(":", 1)
except Exception as e:
# 如果调用失败,抛出异常
raise HTTPException(status_code=401, detail=f"Base64 Decode AK/SK fail: {e}")
point = data.point
if point == "ping":
return {"result": "pong"}
if point == "app.moderation.input":
return handle_app_moderation_input(params=data.params, ak=ak, sk=sk)
elif point == "app.moderation.output":
return handle_app_moderation_output(params=data.params, ak=ak, sk=sk)
raise HTTPException(status_code=400, detail="Not implemented")
def handle_app_moderation_input(params: dict, ak: str, sk: str):
app_id = params.get("app_id")
inputs = params.get("inputs", {})
query = params.get("query")
contents = (
[query] if len(query) <= MAX_LENGTH else split_text(query, MAX_LENGTH - 50)
)
# 并发执行
bodys = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(request, seg, "input", ak, sk) for seg in contents]
for future in concurrent.futures.as_completed(futures):
bodys.append(future.result())
contentModerationSuggestion=""
sensitiveDataSuggestion=""
promptAttackSuggestion=""
maliciousUrlSuggestion=""
_finalSuggestion="pass"
desensitization=""
# 遍历bodys解析出各个检测项的建议
for body in bodys:
finalSuggestion = body.get("Data", {}).get("Suggestion", "")
detailList = body.get("Data", {}).get("Detail", [])
if finalSuggestion and _finalSuggestion!="block" :
_finalSuggestion = finalSuggestion
for detail in detailList:
suggestion = detail.get("Suggestion", "")
type = detail.get("Type", "")
if type == "contentModeration":
if suggestion and contentModerationSuggestion!="block" :
contentModerationSuggestion = suggestion
elif type == "sensitiveData":
desensitization = detail.get("Result",[])[0].get("Ext",{}).get("Desensitization","")
if suggestion and sensitiveDataSuggestion!="block" :
sensitiveDataSuggestion = suggestion
elif type == "promptAttack":
if suggestion and promptAttackSuggestion!="block" :
promptAttackSuggestion = suggestion
elif type == "maliciousUrl":
if suggestion and maliciousUrlSuggestion!="block" :
maliciousUrlSuggestion = suggestion
# 可以根据不同的场景返回不同的回答内容
output_response = "Your content violates our usage policy."
if contentModerationSuggestion=="block":
output_response = "Your content involves content security."
elif sensitiveDataSuggestion=="block" or sensitiveDataSuggestion=="mask":
output_response = "Your content involves sensitive data."
elif promptAttackSuggestion=="block":
output_response = "Your content involves prompt attack."
elif maliciousUrlSuggestion=="block":
output_response = "Your content involves malicious url."
flagged = False
action = "direct_output"
if _finalSuggestion == "block" :
flagged = True
elif sensitiveDataSuggestion=="mask":
flagged = True
action = "overridden"
query = desensitization
response = {"flagged": flagged, "action": action}
if flagged:
if action == "direct_output":
response["preset_response"] = output_response
elif action == "overridden":
response["inputs"] = inputs
response["query"] = query
print(response)
return response
def handle_app_moderation_output(params: dict, ak: str, sk: str):
app_id = params.get("app_id")
text = params.get("text", "")
print(f"handle_app_moderation_output length:{len(text)}")
# 获取最近的2000字符,大小根据需要调整,建议大于dify的窗口大小
if len(text) > MAX_LENGTH:
content = text[-MAX_LENGTH:]
else:
content = text
# 执行检测
body = request(content, "output", ak, sk)
contentModerationSuggestion=""
sensitiveDataSuggestion=""
promptAttackSuggestion=""
maliciousUrlSuggestion=""
desensitization=""
_finalSuggestion=body.get("Data", {}).get("Suggestion", "")
detailList = body.get("Data", {}).get("Detail", [])
for detail in detailList:
suggestion = detail.get("Suggestion", "")
type = detail.get("Type", "")
if type == "contentModeration":
contentModerationSuggestion = suggestion
elif type == "sensitiveData":
desensitization = detail.get("Result",[])[0].get("Ext",{}).get("Desensitization","")
sensitiveDataSuggestion = suggestion
elif type == "promptAttack":
promptAttackSuggestion = suggestion
elif type == "maliciousUrl":
maliciousUrlSuggestion = suggestion
# 可以根据不同的场景返回不同的回答内容
output_response = "Your content violates our usage policy."
if contentModerationSuggestion=="block":
output_response = "Your content involves content security."
elif sensitiveDataSuggestion=="block":
output_response = "Your content involves sensitive data."
elif promptAttackSuggestion=="block":
output_response = "Your content involves prompt attack."
elif maliciousUrlSuggestion=="block":
output_response = "Your content involves malicious url."
flagged = False
action = "direct_output"
if _finalSuggestion == "block":
flagged = True
elif sensitiveDataSuggestion=="mask":
flagged = True
action = "overridden"
response = {"flagged": flagged, "action": action}
if flagged:
if action == "direct_output":
response["preset_response"] = output_response
elif action == "overridden":
response["text"] = desensitization
print(response)
return response
if __name__ == "__main__":
import uvicorn
# 开放端口可以根据自定义选择
uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
将上述Python代码保存到main.py中,使用如下命令启动
# 启动脚本示例
pip install fastapi uvicorn
uvicorn main:app --reload --host 0.0.0.0
新增API扩展
通过设置-API扩展页面,来实现新增API扩展。
API Endpoint:填写部署转发服务脚本后的可访问地址。
API-Key:填写的是阿里云AK/SK通过:拼接后的Base64字符串,伪代码参考base64({aliyun_accessKey_id}:{aliyun_accessKey_secret})
import base64
# AccessKeyId 和 AccessKeySecret
access_key_id = ""
access_key_secret = ""
# 拼接并编码
auth_str = f"{access_key_id}:{access_key_secret}"
encoded_auth = base64.b64encode(auth_str.encode('utf-8')).decode('utf-8')
print(encoded_auth)
在Agent中配置API扩展
需要在Agent中完成API扩展的配置,已实现成功集成。
在Agent页面右下角选择管理,去配置内容审查。
选择API扩展。
选中已创建完成的AI安全护栏的API扩展。
根据业务需求选择是否打开输入和输出内容的开关。
在输出时Dify会累计约300个字符做一次内容审查。
效果示例
以下是AI安全护栏通过该方式集成在Agent中的效果示例:
常见问题
Dify社区版安装时提示失败
如您在安装Dify社区版或其他私有化版本时,遇到以下报错,可以通过禁用签名校验、使用签名版进行解决。
解决方法一:禁用签名校验
cd ${dify_path}/docker
# 停止
docker compose down
vi .env
# 将FORCE_VERIFYING_SIGNATURE修改为false
FORCE_VERIFYING_SIGNATURE=false
# 重启
docker compose up -d
解决方法二:使用签名版
可以参考Dify平台官方文档:第三方签名
下载签名版和签名所用的公钥
将用公钥放在插件守护程序可以访问的位置。
例如,在
docker/volumes/plugin_daemon
下创建public_keys
目录,并将公钥文件复制到对应路径:mkdir docker/volumes/plugin_daemon/public_keys cp dify_sign.public.pem docker/volumes/plugin_daemon/public_keys
修改
docker-compose.yaml
services: plugin_daemon: environment: FORCE_VERIFYING_SIGNATURE: true THIRD_PARTY_SIGNATURE_VERIFICATION_ENABLED: true THIRD_PARTY_SIGNATURE_VERIFICATION_PUBLIC_KEYS: /app/storage/public_keys/dify_sign.public.pem
说明docker/volumes/plugin_daemon
在plugin_daemon
容器中被挂载到/app/storage
。确保在THIRD_PARTY_SIGNATURE_VERIFICATION_PUBLIC_KEYS
中指定的路径对应于容器内的路径。重启容器
cd docker docker compose down docker compose up -d
重启服务后,第三方签名验证功能将在当前社区版环境中启用。