本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
业务中用户上传文件、代码发布、数据共享等场景,可能引入勒索病毒、挖矿程序、WebShell等恶意文件,对业务系统和数据安全构成威胁。恶意文件检测功能依托云安全中心的多引擎检测平台,提供多种方式识别和处理这些风险。本文档旨在帮助不同角色的用户根据业务场景,快速选择最适合的检测方式。
功能概述
常见应用场景
应用场景 | 说明 |
服务器攻击防范 | 防范蠕虫病毒、挖矿木马等,避免服务器资源被消耗或被用于DDoS攻击。 |
定向攻击防御 | 检测后门、黑客工具等,防止数据被窃取或系统被远程控制。 |
办公网络与文件存储 | 识别含有恶意宏或脚本的文档/压缩包,防范钓鱼和凭据失窃。 |
全系统环境检测 | 查杀勒索病毒和感染型病毒,避免数据被加密勒索或系统大范围瘫痪。 |
检测方式
云安全中心提供集成SDK和控制台操作两种方式,执行检测任务。
特性 | 集成SDK | 控制台操作 |
使用方式 | 在业务代码中集成SDK进行调用,支持通过Java或Python方式接入。 | 登录云安全中心控制台,通过图形化界面操作。 |
支持检测的文件 |
| 仅支持对象存储OSS文件。 |
适用范围
支持的文件类型
压缩文件检测: 支持解压并扫描未加密的压缩包,解压层级与文件数可配置。
加密文件检测: 支持自动解密并扫描经OSS服务端加密(SSE-KMS、SSE-OSS)的数据。
支持的文件格式:
压缩包:
.7z、.zip、.rar、.tar、.gz、.bz2、.xz、.lzma、.ar、.tar.gz。脚本/WebShell:
.php、.jsp、.jspx、.asp、.aspx、.sh、.py、.ps1、.pl、.bat。文档类型:
.doc、.pdf、.ppt、.xsl、.rtf、.hta、.chm。可执行/二进制文件:
.apk、.exe、.dll、.ocx、.com、.so、.sys、.ko、.obj。
支持的病毒文件:支持检测反弹Shell后门、DDoS木马、挖矿程序等病毒文件,更多内容参见支持检测的病毒类型。
存储类型限制:仅支持检测存储类型为“标准存储”和“低频访问”的OSS文件,不支持“归档存储”类型。
支持检测的OSS Bucket的地域:
华北1(青岛)、华北2(北京)、华北3(张家口)、华北5(呼和浩特)、华北6(乌兰察布)
华东1(杭州)、华东2(上海)
华南1(深圳)、华南2(河源)、华南3(广州)
西南1(成都)、中国香港
新加坡、马来西亚(吉隆坡)、印度尼西亚(雅加达)、菲律宾(马尼拉)、泰国(曼谷)、日本(东京)、韩国(首尔)、美国(硅谷)、美国(弗吉尼亚)、德国(法兰克福)、英国(伦敦)
开通服务与配置通知
购买并开通服务
访问云安全中心控制台-风险治理-恶意文件SDK,在页面左侧顶部,选择需防护资产所在的区域:中国内地或非中国内地。
根据页面引导,选择立即试用、立即购买(包年包月)或 开通按量付费模式 来激活功能。
立即试用:如果阿里云账号已通过企业认证,可以通过免费试用开通恶意文件检测SDK功能。
重要每个阿里云账号仅能申请一次免费试用。
试用版提供10,000次恶意文件检测次数。
包年包月:引导跳转至购买页面,请参照如下说明完成购买配置并支付。
在恶意文件检测区域,将是否选购置为是。
按需输入所需的检测文件次数即购买数量(10万次起售)。
按量付费:
在按量付费开通弹窗中,单击开通按量付费。
开启策略(可选):勾选后将自动开启对所选Bucket中新上传文件的周期性检测策略。该策略立即生效。如需调整检测范围,可在策略配置中修改。
配置通知(可选)
云安全中心提供钉钉机器人通知功能,可将检测到的恶意文件告警信息自动推送至钉钉群组,以便及时响应检测风险。操作步骤如下:
访问云安全中心控制台-系统配置-通知设置,在页面左侧顶部,选择需防护资产所在的区域:中国内地或非中国内地。
在钉钉机器人页签,单击添加新的机器人。
通知范围:建议勾选恶意文件检测的所有等级。
其他信息:请参见配置钉钉机器人通知。
配置并执行检测任务
控制台操作
如果待检测的文件存储在阿里云对象存储OSS Bucket中,可以直接在云安全中心控制台执行对目标OSS Bucket内文件进行批量或周期性扫描。
手动检测:此方式适用于对存量文件进行一次性的扫描,支持全量检测和增量检测。
自动检测:此方式适用于对新增文件进行持续性的自动扫描。
手动检测(存量文件)
进入OSS文件检测页面
访问云安全中心控制台-风险治理-恶意文件SDK,在页面左侧顶部,选择需防护资产所在的区域:中国内地或非中国内地。并进入OSS文件检测页签。
发起检测任务
重要如果Bucket没有在OSS文件检测的列表中,请单击同步Bucket,同步最新的Bucket列表。
全量检测:
功能说明:检测单个或多个 Bucket 内的所有文件。
操作入口:
单个检测:在目标 Bucket 操作列单击检测。
批量检测:勾选多个 Bucket 后,单击列表下方的批量检测。
增量检测
功能说明:针对已检测且上次检测后有文件更新的Bucket,仅检测自上次任务以来新增或变更的文件。
操作入口:在目标 Bucket 操作列,单击增量检测。
配置检测参数
在弹出的对话框中,根据需求进行配置。
文件检测类型:选择要扫描的文件扩展名,默认检测全部类型的文件。
解压层级和单个压缩包解压文件数量限制:如果需要扫描压缩包内部,请设置解压层数(最多5层)和单个压缩包文件数量(最多1000个),默认为不解压。
文件解密类型:如果OSS文件使用了服务端加密(SSE-OSS或SSE-KMS),请选择对应的解密方式,以便云安全中心解密后进行检测。
扫描路径:
按前缀匹配:输入文件名前缀来匹配扫描指定文件。
配置到整个Bucket:扫描整个Bucket文件。
等待检测任务完成
说明检测任务完成后,后台需要进行数据统计和同步,结果通常会有 1-2小时 的延迟,请您耐心等待。
可在Bucket列表页,查看目标文件检测状态,当状态由未检测变成已检测时,即表示检测已完成。
自动检测(增量文件)
进入策略管理页面
访问云安全中心控制台-风险治理-恶意文件SDK,在页面左侧顶部,选择需防护资产所在的区域:中国内地或非中国内地。
单击右上角策略管理。
创建或修改检测策略
创建新策略:在策略管理面板,单击新增策略。
修改现有策略:在已有策略的操作列的编辑,将目标Bucket添加到该策略的生效范围内。
参照如下说明完成配置后,单击确定。
实时增量检测:
说明仅中国内地区域支持配置实时增量检测,非中国内地区域不支持。
开启:当有新文件上传到指定的Bucket时,会立即触发检测。
关闭:按照设定的检测周期(例如每天、每3天)和文件检测时间(例如凌晨02:00-04:00)对周期内新增的文件进行扫描。
生效Bucket:选择此策略要应用的一个或多个Bucket。若此Bucket已被其他策略占用,则不可重复选择。
警告策略生效后,不会自动检测新增Bucket中的文件,若有检测需求请手动编辑策略,将其添加到“生效 Bucket”列表中。
检测周期:选择检测任务执行周期。
文件检测时间:选择检测任务执行时间,间隔不能低于1小时。
警告若检测任务运行超过设定检测时间,将自动暂停,并在下一个检测周期内自动重启任务。
其他参数(如解压、解密、扫描路径):请参见手动检测的配置检测参数。
集成SDK
在业务代码中集成恶意文件检测SDK,在返回结果中获取恶意文件信息,实现对文件的检测。
步骤一:准备访问凭证与权限
创建访问密钥
为保障账户安全,请勿使用主账号AccessKey。建议创建专用于API访问的RAM用户,并为其授予最小必要权限具体操作,请参见创建RAM用户、创建AccessKey。
新建账号
已有RAM账号
使用主账号或RAM管理员账号登录RAM控制台,并进入目标用户详情页。
在认证管理页签下的AccessKey区域,单击创建AccessKey。
根据实际业务选择使用场景,并勾选我确认必须创建 AccessKey。单击继续创建。
完成安全验证,,系统会自动为RAM用户生成一个AccessKey ID和AccessKey Secret,请妥善保管密钥信息。
说明可下载密钥CSV文件或复制内容保存到本地。更多内容,请参见创建AccessKey。
配置权限策略
在用户页面,单击目标RAM用户,单击操作列的添加权限。
在权限策略区域,选择AliyunYundunSASFullAccess后,单击确认新增授权。
说明更多内容请参见为RAM用户授权。
配置环境变量
阿里云SDK支持通过定义
ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET环境变量来创建默认的访问凭证。调用接口时,程序直接访问凭证,读取访问密钥(即AccessKey)并自动完成鉴权。更多信息,请参见配置环境变量。Linux/macOS
通过export命令配置环境变量
重要使用export命令配置的临时环境变量仅当前会话有效,当会话退出之后所设置的环境变量将会丢失。若需长期保留环境变量,可将export命令配置到对应操作系统的启动配置文件中。
配置AccessKey ID并按回车。
# 将<ACCESS_KEY_ID>替换为您自己的AccessKey ID。 export ALIBABA_CLOUD_ACCESS_KEY_ID=yourAccessKeyID配置AccessKey Secret并回车。
# 将<ACCESS_KEY_SECRET>替换为您自己的AccessKey Secret。 export ALIBABA_CLOUD_ACCESS_KEY_SECRET=yourAccessKeySecret验证是否配置成功。
执行
echo $ALIBABA_CLOUD_ACCESS_KEY_ID命令,如果返回正确的AccessKey ID,则说明配置成功。
Windows
通过图形用户界面GUI
操作步骤
以下为Windows 10中通过图形用户界面设置环境变量的步骤。
在桌面右键单击此电脑,选择属性>高级系统设置>环境变量>系统变量/用户变量>新建,完成以下配置:
变量
示例值
AccessKey ID
变量名:ALIBABA_CLOUD_ACCESS_KEY_ID
变量值:LTAI****************
AccessKey Secret
变量名:ALIBABA_CLOUD_ACCESS_KEY_SECRET
变量值:yourAccessKeySecret
测试设置是否成功
单击开始(或快捷键:Win+R)> 运行(输入 cmd)> 确定(或按 Enter 键),打开命令提示符,执行
echo %ALIBABA_CLOUD_ACCESS_KEY_ID%、echo %ALIBABA_CLOUD_ACCESS_KEY_SECRET%命令。若返回正确的AccessKey,则说明配置成功。
通过命令行提示符CMD
操作步骤
以管理员身份打开命令提示符,并使用以下命令在系统中新增环境变量。
setx ALIBABA_CLOUD_ACCESS_KEY_ID yourAccessKeyID /M setx ALIBABA_CLOUD_ACCESS_KEY_SECRET yourAccessKeySecret /M其中
/M表示系统级环境变量,设置用户级环境变量时可以不携带该参数。测试设置是否成功
单击开始(或快捷键:Win+R)> 运行(输入 cmd)> 确定(或按 Enter 键),打开命令提示符,执行
echo %ALIBABA_CLOUD_ACCESS_KEY_ID%、echo %ALIBABA_CLOUD_ACCESS_KEY_SECRET%命令。若返回正确的AccessKey,则说明配置成功。
通过Windows PowerShell
在PowerShell中,设置新的环境变量(对所有新会话都有效):
[System.Environment]::SetEnvironmentVariable('ALIBABA_CLOUD_ACCESS_KEY_ID', 'yourAccessKeyID', [System.EnvironmentVariableTarget]::User) [System.Environment]::SetEnvironmentVariable('ALIBABA_CLOUD_ACCESS_KEY_SECRET', 'yourAccessKeySecret', [System.EnvironmentVariableTarget]::User)为所有用户设置环境变量(需要管理员权限):
[System.Environment]::SetEnvironmentVariable('ALIBABA_CLOUD_ACCESS_KEY_ID', 'yourAccessKeyID', [System.EnvironmentVariableTarget]::Machine) [System.Environment]::SetEnvironmentVariable('ALIBABA_CLOUD_ACCESS_KEY_SECRET', 'yourAccessKeySecret', [System.EnvironmentVariableTarget]::Machine)设置临时的环境变量(仅当前会话有效):
$env:ALIBABA_CLOUD_ACCESS_KEY_ID = "yourAccessKeyID" $env:ALIBABA_CLOUD_ACCESS_KEY_SECRET = "yourAccessKeySecret"在PowerShell中,执行
Get-ChildItem env:ALIBABA_CLOUD_ACCESS_KEY_ID、Get-ChildItem env:ALIBABA_CLOUD_ACCESS_KEY_SECRET命令。若返回正确的AccessKey,则说明配置成功。
步骤二:安装SDK
根据开发语言选择相应的安装方式,目前仅支持Java接入和Python接入。
Java 接入
环境要求:JDK 1.8 或更高版本。
获取方式:
访问Java SDK代码库下载最新版本的 Java SDK 文件。
将下载的SDK文件引用至项目中。
Python 接入
环境要求:Python 3.6 或更高版本。
获取方式:
在线安装:使用pip命令快速安装。
<BASH> pip install -U alibabacloud_filedetect离线安装:
在有网络的环境中,访问 Python代码库并下载最新版本的Python SDK。
将下载的源码包上传至项目环境并解压。
进入解压后的SDK根目录,执行安装命令:
<BASH> # 切换至python SDK根目录 # 目录名可能因版本而异,如alibabacloud_filedetect-1.0.0 cd alibabacloud_filedetect-x.x.x/ # 使用您的python版本进行安装 python setup.py install
步骤三:编写代码并执行检测
环境配置完成后,可参考如下Python和Java代码示例,编写代码。
请根据实际情况修改示例代码中的待检测文件/目录路径path、URL和MD5等参数。
package com.aliyun.filedetect.sample;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import com.aliyun.filedetect.*;
public class Sample {
/**
* 同步检测文件接口
* @param detector 检测器对象
* @param path 待检测的文件路径
* @param timeout_ms 设置超时时间,单位为毫秒
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
* @throws InterruptedException
*/
public static DetectResult detectFileSync(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == path) return null;
DetectResult result = null;
while(true) {
result = detector.detectSync(path, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 异步检测文件接口
* @param detector 检测器对象
* @param path 待检测的文件路径
* @param timeout_ms 设置超时时间,单位为毫秒
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
* @param callback 结果回调函数
* @throws InterruptedException
*/
public static int detectFile(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == path || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detect(path, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 同步检测URL文件接口
* @param detector 检测器对象
* @param url 待检测的文件URL
* @param md5 待检测的文件md5
* @param timeout_ms 设置超时时间,单位为毫秒
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
* @throws InterruptedException
*/
public static DetectResult detectUrlSync(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == url || null == md5) return null;
DetectResult result = null;
while(true) {
result = detector.detectUrlSync(url, md5, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 异步检测URL文件接口
* @param detector 检测器对象
* @param url 待检测的文件URL
* @param md5 待检测的文件md5
* @param timeout_ms 设置超时时间,单位为毫秒
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
* @param callback 结果回调函数
* @throws InterruptedException
*/
public static int detectUrl(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == url || null == md5 || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detectUrl(url, md5, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 格式化检测结果
* @param result 检测结果对象
* @return 格式化后的字符串
*/
public static String formatDetectResult(DetectResult result) {
if (result.isSucc()) {
DetectResult.DetectResultInfo info = result.getDetectResultInfo();
String msg = String.format("[DETECT RESULT] [SUCCEED] %s", formatDetectResultInfo(info));
if (info.compresslist != null) {
int idx = 1;
for (DetectResult.CompressFileDetectResultInfo comp_res : info.compresslist) {
msg += String.format("\n\t\t\t [COMPRESS FILE] [IDX:%d] %s", idx++, formatCompressFileDetectResultInfo(comp_res));
}
}
return msg;
}
DetectResult.ErrorInfo info = result.getErrorInfo();
return String.format("[DETECT RESULT] [FAIL] md5: %s, time: %d, error_code: %s, error_message: %s"
, info.md5, info.time, info.error_code.name(), info.error_string);
}
private static String formatDetectResultInfo(DetectResult.DetectResultInfo info) {
String msg = String.format("MD5: %s, TIME: %d, RESULT: %s, SCORE: %d", info.md5, info.time, info.result.name(), info.score);
if (info.compresslist != null) {
msg += String.format(", COMPRESS_FILES: %d", info.compresslist.size());
}
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
private static String formatCompressFileDetectResultInfo(DetectResult.CompressFileDetectResultInfo info) {
String msg = String.format("PATH: %s, \t\t RESULT: %s, SCORE: %d", info.path, info.result.name(), info.score);
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
/**
* 同步检测目录或文件
* @param path 指定路径,可以是文件或者目录。如果路径为目录,则递归遍历其内容
* @param is_sync 是否使用同步接口,推荐使用异步。 true是同步, false是异步
* @throws InterruptedException
*/
public static void detectDirOrFileSync(OpenAPIDetector detector, String path, int timeout_ms, Map<String, DetectResult> result_map) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFileSync(detector, subpath, timeout_ms, result_map);
}
return;
}
System.out.println(String.format("[detectFileSync] [BEGIN] queueSize: %d, path: %s, timeout: %d", detector.getQueueSize(), abs_path, timeout_ms));
DetectResult res = detectFileSync(detector, abs_path, timeout_ms, true);
System.err.println(String.format(" [ END ] %s", formatDetectResult(res)));
result_map.put(abs_path, res);
}
/**
* 异步检测目录或文件
* @param path 指定路径,可以是文件或者目录。如果路径为目录,则递归遍历其内容
* @param is_sync 是否使用同步接口,推荐使用异步。 true是同步, false是异步
* @throws InterruptedException
*/
public static void detectDirOrFile(OpenAPIDetector detector, String path, int timeout_ms, IDetectResultCallback callback) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFile(detector, subpath, timeout_ms, callback);
}
return;
}
int seq = detectFile(detector, abs_path, timeout_ms, true, callback);
System.out.println(String.format("[detectFile] [BEGIN] seq: %d, queueSize: %d, path: %s, timeout: %d", seq, detector.getQueueSize(), abs_path, timeout_ms));
}
/**
* 开始对文件或目录进行检测
* @param path 指定路径,可以是文件或者目录。如果路径为目录,则递归遍历其内容
* @param is_sync 是否使用同步接口,推荐使用异步。 true是同步, false是异步
* @throws InterruptedException
*/
public static void scan(final OpenAPIDetector detector, String path, int detect_timeout_ms, boolean is_sync) throws InterruptedException {
System.out.println(String.format("[SCAN] [START] path: %s, detect_timeout_ms: %d, is_sync: %b", path, detect_timeout_ms, is_sync));
long start_time = System.currentTimeMillis();
final Map<String, DetectResult> result_map = new HashMap<>();
if (is_sync) {
detectDirOrFileSync(detector, path, detect_timeout_ms, result_map);
} else {
detectDirOrFile(detector, path, detect_timeout_ms, new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
System.err.println(String.format("[detectFile] [ END ] seq: %d, queueSize: %d, %s", seq, detector.getQueueSize(), formatDetectResult(callback_res)));
result_map.put(file_path, callback_res);
}
});
// 等待任务执行完成
detector.waitQueueEmpty(-1);
}
long used_time = System.currentTimeMillis() - start_time;
System.out.println(String.format("[SCAN] [ END ] used_time: %d, files: %d", used_time, result_map.size()));
int fail_count = 0;
int white_count = 0;
int black_count = 0;
for (Map.Entry<String, DetectResult> entry : result_map.entrySet()) {
DetectResult res = entry.getValue();
if (res.isSucc()) {
if (res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK) {
black_count ++;
} else {
white_count ++;
}
} else {
fail_count ++;
}
}
System.out.println(String.format(" fail_count: %d, white_count: %d, black_count: %d"
, fail_count, white_count, black_count));
}
public static void main(String[] args_) throws Exception {
// 获取检测器实例
OpenAPIDetector detector = OpenAPIDetector.getInstance();
// 初始化
ERR_CODE init_ret = detector.init(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
System.out.println("INIT RET: " + init_ret.name());
// 设置解压缩参数(可选,默认不解压压缩包)
boolean decompress = true; // 是否识别压缩文件并解压,默认为false
int decompressMaxLayer = 5; // 最大解压层数,decompress参数为true时生效
int decompressMaxFileCount = 1000; // 最大解压文件数,decompress参数为true时生效
ERR_CODE initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount);
System.out.println("INIT_DECOMPRESS RET: " + initdec_ret.name());
if (true) {
// 示例用法1:扫描本地目录或文件
boolean is_sync_scan = false; // 是异步检测还是同步检测。异步检测性能更好。false表示异步检测
int timeout_ms = 500000; // 单个样本检测时间,单位为毫秒
String path = "test2.php"; // 待扫描的文件或目录
// 启动扫描,直到扫描结束
scan(detector, path, timeout_ms, is_sync_scan);
}
if (true) {
// 示例用法2:扫描URL文件
int timeout_ms = 500000; // 单个样本检测时间,单位为毫秒
String url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1*****25&OSSAccessKeyId=xxx"; // 待扫描的URL文件
String md5 = "a767f*************6e21d000000"; // 待扫描的文件MD5
// 同步扫描。如果需要异步扫描,调用detectUrl接口
System.out.println(String.format("[detectUrlSync] [BEGIN] URL: %s, MD5: %s, TIMEOUT: %d", url, md5, timeout_ms));
DetectResult result = detectUrlSync(detector, url, md5, timeout_ms, true);
System.err.println(String.format("[detectUrlSync] [ END ] %s", formatDetectResult(result)));
}
// 反初始化
System.out.println("Over.");
detector.uninit();
}
}
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
import threading
import time
import traceback
from alibabacloud_filedetect.OpenAPIDetector import OpenAPIDetector
from alibabacloud_filedetect.IDetectResultCallback import IDetectResultCallback
from alibabacloud_filedetect.ERR_CODE import ERR_CODE
from alibabacloud_filedetect.DetectResult import DetectResult
class Sample(object):
def __init__(self):
pass
"""
同步检测文件接口
@param detector 检测器对象
@param path 待检测的文件路径
@param timeout_ms 设置超时时间,单位为毫秒
@param wait_if_queuefull 如果检测队列满了,False表示不等待直接返回错误,True表示一直等待直到队列不满时
"""
def detectFileSync(self, detector, path, timeout_ms, wait_if_queuefull):
if detector is None or path is None:
return None
result = None
while True:
result = detector.detectSync(path, timeout_ms)
if result is None:
break
if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
异步检测文件接口
@param detector 检测器对象
@param path 待检测的文件路径
@param timeout_ms 设置超时时间,单位为毫秒
@param wait_if_queuefull 如果检测队列满了,False表示不等待直接返回错误,True表示一直等待直到队列不满时
@param callback 结果回调函数
"""
def detectFile(self, detector, path, timeout_ms, wait_if_queuefull, callback):
if detector is None or path is None or callback is None:
return ERR_CODE.ERR_INIT.value
result = ERR_CODE.ERR_INIT.value
if wait_if_queuefull:
real_callback = callback
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
return
real_callback.onScanResult(seq, file_path, callback_res)
callback = AsyncTaskCallback()
while True:
result = detector.detect(path, timeout_ms, callback)
if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
同步检测URL文件接口
@param detector 检测器对象
@param url 待检测的文件URL
@param md5 待检测的文件md5
@param timeout_ms 设置超时时间,单位为毫秒
@param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
"""
def detectUrlSync(self, detector, url, md5, timeout_ms, wait_if_queuefull):
if detector is None or url is None or md5 is None:
return None
result = None
while True:
result = detector.detectUrlSync(url, md5, timeout_ms)
if result is None:
break
if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
异步检测URL文件接口
@param detector 检测器对象
@param url 待检测的文件URL
@param md5 待检测的文件md5
@param timeout_ms 设置超时时间,单位为毫秒
@param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
@param callback 结果回调函数
"""
def detectUrl(self, detector, url, md5, timeout_ms, wait_if_queuefull, callback):
if detector is None or url is None or md5 is None or callback is None:
return ERR_CODE.ERR_INIT.value
result = ERR_CODE.ERR_INIT.value
if wait_if_queuefull:
real_callback = callback
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
return
real_callback.onScanResult(seq, file_path, callback_res)
callback = AsyncTaskCallback()
while True:
result = detector.detectUrl(url, md5, timeout_ms, callback)
if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
格式化检测结果
@param result 检测结果对象
@return 格式化后的字符串
"""
@staticmethod
def formatDetectResult(result):
msg = ""
if result.isSucc():
info = result.getDetectResultInfo()
msg = "[DETECT RESULT] [SUCCEED] {}".format(Sample.formatDetectResultInfo(info))
if info.compresslist is not None:
idx = 1
for comp_res in info.compresslist:
msg += "\n\t\t\t [COMPRESS FILE] [IDX:{}] {}".format(idx, Sample.formatCompressFileDetectResultInfo(comp_res))
idx += 1
else:
info = result.getErrorInfo()
msg = "[DETECT RESULT] [FAIL] md5: {}, time: {}, error_code: {}, error_message: {}".format(info.md5,
info.time, info.error_code.name, info.error_string)
return msg
@staticmethod
def formatDetectResultInfo(info):
msg = "MD5: {}, TIME: {}, RESULT: {}, SCORE: {}".format(info.md5, info.time, info.result.name, info.score)
if info.compresslist is not None:
msg += ", COMPRESS_FILES: {}".format(len(info.compresslist))
vinfo = info.getVirusInfo()
if vinfo is not None:
msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
return msg
@staticmethod
def formatCompressFileDetectResultInfo(info):
msg = "PATH: {}, \t\t RESULT: {}, SCORE: {}".format(info.path, info.result.name, info.score)
vinfo = info.getVirusInfo()
if vinfo is not None:
msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
return msg
"""
同步检测目录或文件
@param path 指定路径,可以是文件或者目录。如果路径为目录,则递归遍历其内容
@param is_sync 是否使用同步接口,推荐使用异步。 True是同步,False是异步
"""
def detectDirOrFileSync(self, detector, path, timeout_ms, result_map):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFileSync(detector, sub_path, timeout_ms, result_map)
return
print("[detectFileSync] [BEGIN] queueSize: {}, path: {}, timeout: {}".format(
detector.getQueueSize(), abs_path, timeout_ms))
res = self.detectFileSync(detector, abs_path, timeout_ms, True)
print(" [ END ] {}".format(Sample.formatDetectResult(res)))
result_map[abs_path] = res
return
"""
异步检测目录或文件
@param path 指定路径,可以是文件或者目录。如果路径为目录,则递归遍历其内容
@param is_sync 是否使用同步接口,推荐使用异步。True是同步, False是异步
"""
def detectDirOrFile(self, detector, path, timeout_ms, callback):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFile(detector, sub_path, timeout_ms, callback)
return
seq = self.detectFile(detector, abs_path, timeout_ms, True, callback)
print("[detectFile] [BEGIN] seq: {}, queueSize: {}, path: {}, timeout: {}".format(
seq, detector.getQueueSize(), abs_path, timeout_ms))
return
"""
开始对文件或目录进行检测
@param path 指定路径,可以是文件或者目录。如果路径为目录,则递归遍历其内容
@param is_sync 是否使用同步接口,推荐使用异步。 True是同步,False是异步
"""
def scan(self, detector, path, detect_timeout_ms, is_sync):
try:
print("[SCAN] [START] path: {}, detect_timeout_ms: {}, is_sync: {}".format(path, detect_timeout_ms, is_sync))
start_time = time.time()
result_map = {}
if is_sync:
self.detectDirOrFileSync(detector, path, detect_timeout_ms, result_map)
else:
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
print("[detectFile] [ END ] seq: {}, queueSize: {}, {}".format(seq,
detector.getQueueSize(), Sample.formatDetectResult(callback_res)))
result_map[file_path] = callback_res
self.detectDirOrFile(detector, path, detect_timeout_ms, AsyncTaskCallback())
# 等待任务执行完成
detector.waitQueueEmpty(-1)
used_time_ms = (time.time() - start_time) * 1000
print("[SCAN] [ END ] used_time: {}, files: {}".format(int(used_time_ms), len(result_map)))
failed_count = 0
white_count = 0
black_count = 0
for file_path, res in result_map.items():
if res.isSucc():
if res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK:
black_count += 1
else:
white_count += 1
else:
failed_count += 1
print(" fail_count: {}, white_count: {}, black_count: {}".format(
failed_count, white_count, black_count))
except Exception as e:
print(traceback.format_exc(), file=sys.stderr)
def main(self):
# 获取检测器实例
detector = OpenAPIDetector.get_instance()
# 读取环境变量中的Access Key ID和Access Key Secret
access_key_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')
access_key_secret = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
# 初始化
init_ret = detector.init(access_key_id, access_key_secret)
print("INIT RET: {}".format(init_ret.name))
# 设置解压缩参数(可选,默认不解压压缩包)
decompress = True # 是否识别压缩文件并解压,默认为false
decompressMaxLayer = 5 # 最大解压层数,decompress参数为true时生效
decompressMaxFileCount = 1000 # 最大解压文件数,decompress参数为true时生效
initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount)
print("INIT_DECOMPRESS RET: {}".format(initdec_ret.name))
if True:
# 示例用法1:扫描本地目录或文件
is_sync_scan = False # 是异步检测还是同步检测。异步检测性能更好。False表示异步检测
timeout_ms = 500000 # 单个样本检测时间,单位为毫秒
path = "test.bin" # 待扫描的文件或目录
# 启动扫描,直到扫描结束
self.scan(detector, path, timeout_ms, is_sync_scan)
if True:
# 示例用法2:扫描URL文件
timeout_ms = 500000
url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1671448125&OSSAccessKeyId=xxx" # 待扫描的URL文件
md5 = "a767ffc59d93125c7505b6e21d000000"
# 同步扫描。如果需要异步扫描,调用detectUrl接口
print("[detectUrlSync] [BEGIN] URL: {}, MD5: {}, TIMEOUT: {}".format(url, md5, timeout_ms))
result = self.detectUrlSync(detector, url, md5, timeout_ms, True)
print("[detectUrlSync] [ END ] {}".format(Sample.formatDetectResult(result)))
# 反初始化
print("Over.")
detector.uninit()
if __name__ == "__main__":
sample = Sample()
sample.main()
步骤四:解析返回结果
调用SDK进行恶意文件检测后,可在程序的运行结果中查看检测结果,同时结果将会同步到云安全中心控制台中。
结果数据结构:每次检测都会返回一个
DetectResult对象,其核心字段如下:字段名
类型
描述
md5String
文件的MD5哈希值。
timelong
本次检测所用时间(毫秒)。
error_codeERR_CODE
错误码,
ERR_SUCC表示成功。error_stringString
详细的错误信息描述。
resultRESULT (enum)
检测结果枚举:
RES_WHITE:安全RES_BLACK:可疑/恶意RES_PENDING: 检测中。
scoreint
检测分值,范围为 0-100,分数越高,风险越高。
0 ~ 60:安全,文件可信通常无需处理。
61 ~ 70:风险 (低危),文件存在轻微风险,建议人工复核。
71 ~ 80:可疑 (中危),文件有较大概率是恶意的,建议隔离或删除。
81 ~ 100:恶意 (高危),文件基本可确定为恶意文件,应立即隔离或删除。
virus_typeString
病毒类型,例如WebShell、MalScript、Hacktool" 等。
ext_infoString
扩展信息,通常为JSON格式字符串,包含更详细的检测上下文。
compresslistList
若检测的是压缩包且开启了解压,此列表包含压缩包内每个文件的检测结果。
错误码与故障排查
错误码
说明
错误原因
建议处理方式
ERR_SUCC成功
检测成功完成。
无
ERR_INIT初始化错误
SDK需要初始化,或重复进行了初始化。
确保在调用检测接口前已成功调用
init()。ERR_FILE_NOT_FOUND文件未找到
指定的本地文件路径不存在。
检查文件路径是否正确,以及程序是否有文件读取权限。
ERR_CALL_APIAPI调用错误
调用云端API时发生未知错误。
检测代码及参数值。
ERR_TIMEOUT超时
检测时间超过了设置的
timeout_ms。增加
timeout_ms参数的值,或检查网络连接。ERR_UPLOAD上传失败
文件上传至云端进行分析时失败,可重试。
通常为网络问题,建议增加重试逻辑。
ERR_ABORT任务终止
程序在检测完成前退出,可能当前检测文件数量已超过预付费购买数量或账户余额不足,无法支持后付费。
请追加购买检测文件数量或完成账户充值。
ERR_DETECT_QUEUE_FULL检测队列满
提交检测任务过于频繁,服务端队列已满。
业务高峰期可能触发。可调用
waitQueueAvailable()等待队列可用,或稍后重试。ERR_TIMEOUT_QUEUE队列超时
在队列中等待的时间过长。
ERR_MD5MD5格式错误
提供的MD5字符串格式不正确。
修改MD5格式。
ERR_URLURL格式错误
提供的URL字符串格式不正确。
修改URL格式。
查看与处理检测结果
查看结果
风险文件总览页签:
集中展示所有检测方式(OSS检测和API调用)发现的风险文件。可在此搜索、筛选风险文件,并查看其风险等级、威胁标签等信息。
如果是压缩包文件,可单击文件前的展开
图标,查看该压缩包内的风险文件列表。
OSS文件检测页签:
以Bucket维度展示风险统计信息,如每个Bucket的风险文件数、检测进度等。
可单击操作列的详情,查看关联的恶意文件列表。
结果投递:如果已开通云安全中心日志分析或日志管理功能,恶意文件检测日志将自动存入云安全中心的专属Logstore,方便您进行深度查询与溯源分析。详情请参见恶意文件检测日志和日志管理。
结果处理:支持对检测结果进行加白名单、忽略、禁止访问等操作,具体操作请参见处理检测结果。
退订服务
包年包月
在云安全中心控制台-总览页面的包年包月服务区域,单击。
在订单降配页签的恶意文件检测区域,将是否选购置为否后,单击立即下单。
按量付费
方式一:在云安全中心控制台-总览页面按量付费服务区域,关闭恶意文件检测SDK开关。
方式二:
访问云安全中心控制台-风险治理-恶意文件SDK,在页面左侧顶部,选择需防护资产所在的区域:中国内地或非中国内地。
在风险文件总览页签,单击的已使用检测次数区域的停止使用。
计费说明
使用恶意文件检测SDK功能按照检测的文件个数计算文件检测次数进行收费。
计费方式:支持免费试用、包年包月(预付费)和按量付费(后付费)。
免费试用:完成企业实名认证的阿里云账号可免费试用,提供10,000次检测次数。
重要若免费试用次数未使用完,开通付费版功能后,剩余的免费试用次数可继续且优先抵扣使用。
包年包月:资源包单价为0.001元/次/月,10万次起售,购买后在所选有效期内可用,到期后清零。
按量付费:
文件检测次数以自然日为计费周期,单价为0.0015元/次。
开通按量付费服务后,系统将收取云安全中心的基础服务费(0.05元/小时)。
压缩包计费:如果开启了压缩包检测,将按照解压后的文件个数计算消耗的检测次数。
示例:检测一个包含1000个小文件的压缩包,将消耗1000次检测次数。
处理检测结果:包含在功能服务费中,不额外收费。
更多付费模式和计费逻辑说明,请参见计费概述。
配额与限制
购买限制:在同一时间内,同一阿里云账号只能选择一种计费方式。
说明切换计费方式后,不影响已检测结果的展示和周期性策略的执行。
文件大小限制:
任务总文件大小: 无限制。
单个文件检测大小限制:
通过 SDK 检测离线文件:单个文件不超过 100 MB。
通过控制台检测 OSS 文件:单个文件不超过 1 GB。
超限说明:
当文件大小超过限制或解压后的文件超出数量/大小限制时,相关文件将不被检测,未检测文件不会统计计费次数。
恶意文件检测请求若超出队列容量时需等待,或优化并发处理能力。
压缩包文件检测限制:最多解压5层、1000个文件、总大小最多1GB。
环境限制:不支持金融云和政务云环境。
默认接口请求频率(QPS):试用版为10 次/秒,付费版为20 次/秒。
说明检测速度受网络状况、服务器性能、云产品规格等多种因素影响。
SDK 内部采用异步队列机制来应对请求峰值,提高并发处理能力。当内部队列满时,会暂停处理新请求,直至队列有可用空间。
附录
API参考文档
可以通过调用API接口方式使用恶意文件检测功能,部分参考文档如下,更多内容请参见恶意文件检测SDK、恶意文件检测OSS。
支持检测的病毒类型
病毒类型(virus_type) | 病毒名称 |
Backdoor | 反弹Shell后门 |
DDoS | DDoS木马 |
Downloader | 下载器木马 |
Engtest | 引擎测试程序 |
Hacktool | 黑客工具 |
Trojan | 高危程序 |
Malbaseware | 被污染的基础软件 |
MalScript | 恶意脚本 |
Malware | 恶意程序 |
Miner | 挖矿程序 |
Proxytool | 代理工具 |
RansomWare | 勒索病毒 |
RiskWare | 风险软件 |
Rootkit | Rootkit |
Stealer | 窃密工具 |
Scanner | 扫描器 |
Suspicious | 可疑程序 |
Virus | 感染型病毒 |
WebShell | 网站后门 |
Worm | 蠕虫病毒 |
AdWare | 广告程序 |
Patcher | 破解程序 |
Gametool | 私服工具 |
常见问题
计费与额度问题
对于已经使用的文件检测次数,支持返还吗?
不支持。检测前请确认检测的文件范围和检测策略,以免因误操作,浪费文件检测次数。
为什么OSS文件检测页签的“文件总数”和“检测文件总数”不一致?
文件总数:指Bucket中文件的自然个数(一个压缩包计为 1 个)。
检测文件总数:实际扫描的文件总数量。若开启压缩包扫描,其内部文件也会计入,总数取决于解压层级和最大解压文件数的设置。
当扫描了包含大量文件的压缩包时,“检测文件总数”会远大于“文件总数”,按实际检测的文件数收费。
因额度不足导致检测任务中断了怎么办?
如果使用包年包月模式,当剩余检测次数不足时,扫描任务会中断。可参考如下解决方案:
升级配置:在云安全中心控制台-总览页面的包年包月服务区域,单击变更配置>立即升级,买足够的恶意文件检测次数。具体说明,请参见升级与降配。
修改检测策略:在检测设置中关闭压缩包解压,以减少单次任务消耗的次数,然后重新执行扫描。
授权管理问题
如何查看恶意文件SDK的授权详情(如剩余次数、到期时间)?
可在控制台查看当前账号的个人SDK授权情况,包含版本、开通状态、授权限制,剩余次数、频率限制、到期时间等信息。
访问云安全中心控制台-风险治理-恶意文件SDK,在页面左侧顶部,选择需防护资产所在的区域:中国内地或非中国内地。
单击右上角的接口及权限详情。