恶意文件检测SDK
恶意文件检测SDK功能依托云安全中心多引擎检测平台,为您提供简单易用的恶意文件检测服务,只需要编写少量代码即可识别恶意文件。恶意文件检测SDK功能也支持检测阿里云对象存储OSS是否存在恶意文件。本文介绍如何使用恶意文件检测SDK功能。
使用说明
恶意文件检测SDK为云端检测方案,使用该功能会将您的文件上传到云端进行检测。
一次恶意文件检测仅可以检测一个不超过20 MB的文件。
经过企业实名认证的阿里云账号可以免费试用恶意文件检测SDK功能。试用版包含10,000次恶意文件检测次数。购买企业版可以选择您所需的恶意文件检测次数。试用版和企业版的默认接口请求频率不同。
试用版:10次/秒。
企业版:20次/秒。
恶意文件检测SDK支持以下两种使用方式:
文件检测SDK:通过SDK接入的方式检测恶意文件,在返回结果中获取恶意文件信息。支持通过Java或Python方式接入。
OSS文件检测:在云安全中心控制台执行对阿里云对象存储Bucket内文件的检测,并支持查看存在风险的文件列表。
恶意文件的检测速度会受到网速、计算机性能、云产品限制等方面的影响。恶意文件检测SDK内部采用队列的方式,缓解外部峰值时的请求来提高并发率。当内部的队列满时,拒绝外部继续提交请求,并使用等待接口等待队列有空间可用时再处理外部请求。
您可以通过加长队列长度,提高并发量,该方法会影响部分样本的检测时长。timeout_ms参数为样本超时时间,单位为毫秒。为了减少超时,建议您将timeout_ms参数设置为60,000毫秒(即60秒)。
支持检测病毒类型
前提条件
如果您使用的是RAM用户,请确保已为RAM用户授予AliyunYundunSASFullAccess权限。具体操作,请参见为RAM用户授权。
开通服务
云安全中心支持通过免费试用和付费购买的方式开通服务。
免费试用
如果您的阿里云账号已通过企业认证,您可以通过免费试用开通恶意文件检测SDK服务,并获取1万次恶意文件检测次数。每个阿里云账号仅有一次免费试用机会。
登录云安全中心控制台。在控制台左上角,选择需防护资产所在的区域中国。
在左侧导航栏,选择 。
在恶意文件检测SDK页面,单击立即试用。
付费购买
如果免费试用无法满足需求,您可以通过付费购买的方式开通恶意文件检测SDK服务。
如果存在未使用的免费试用次数,付费购买次数后,剩余的免费试用次数将累计在您付费购买的次数中。
登录云安全中心控制台。在控制台左上角,选择需防护资产所在的区域中国。
在左侧导航栏,选择 。
在恶意文件检测SDK页面,单击立即购买。在购买面板,选择恶意文件检测SDK为是,并按照要检测文件的数量购买足够的恶意文件检测次数。
如果您已购买云安全中心付费版本,您可以直接选择所需的恶意文件检测次数。如未购买,请根据您的需求选择云安全中心版本:
无需使用云安全中心的其他安全防护功能时,版本选择仅采购增值服务。
如需使用云安全中心的其他功能,例如漏洞修复、容器威胁检测,请选择相应的云安全中心版本。各版本的功能差异详情,请参见功能特性。
仔细阅读并选中服务协议,单击立即购买并完成支付。
开通恶意文件检测SDK服务后,您可以在恶意文件检测SDK页面,查看您的恶意文件检测SDK的剩余检测次数。

调用SDK检测恶意文件
准备工作
已配置环境变量
ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
。阿里云SDK支持通过定义
ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
环境变量来创建默认的访问凭证。调用接口时,程序直接访问凭证,读取您的访问密钥(即AccessKey)并自动完成鉴权。更多信息,请参见身份验证配置。您需要选择接入方式并参考下表的说明获取SDK包。
接入方式
版本要求
获取SDK包
Java接入
必须使用1.8或更高版本的JDK。
您可以通过以下方式获取Java SDK。
通过Maven直接获取(联网环境下推荐):打开Maven项目下的pom.xml文件,添加aliyun-sas-filedetect依赖。
<dependency> <groupId>com.aliyun</groupId> <artifactId>filedetect</artifactId> <version>1.0.0</version> </dependency>
离线导入安装 (无互联网连接的环境):在联网环境下访问Java SDK代码库并下载Java SDK,将其下载的Java SDK添加到项目工程中。
Python接入
Python 3.6及以上版本。
您可以通过以下方式获取部署SDK包。
使用pip快速安装(联网环境下推荐)
pip install -U alibabacloud_filedetect
离线安装 (无互联网连接的环境):在联网环境下访问Python代码库并下载Python SDK。将Python SDK上传至项目工程环境,解压压缩包后,运行以下安装命令:
pip install alibabacloud_sas_filedetect-1.0.0-py3-none-any.whl
示例代码
使用Python接入时,请替换下面示例中的测试用例路径参数,即path
。
package com.aliyun.filedetect.sample;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import com.aliyun.filedetect.*;
public class Sample {
/**
* 同步检测文件接口。
* @param detector 检测器对象。
* @param path 待检测的文件路径。
* @param timeout_ms 设置超时时间,单位为毫秒。
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满。
* @throws InterruptedException
*/
public static DetectResult detectFileSync(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == path) return null;
DetectResult result = null;
while(true) {
result = detector.detectSync(path, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 异步检测文件接口。
* @param detector 检测器对象。
* @param path 待检测的文件路径。
* @param timeout_ms 设置超时时间,单位为毫秒。
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满。
* @param callback 结果回调函数。
* @throws InterruptedException
*/
public static int detectFile(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == path || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detect(path, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 格式化检测结果。
* @param result 检测结果对象。
* @return 格式化后的字符串。
*/
public static String formatDetectResult(DetectResult result) {
if (result.isSucc()) {
DetectResult.DetectResultInfo info = result.getDetectResultInfo();
String msg = String.format("[DETECT RESULT] [SUCCEED] md5: %s, time: %d, result: %s, score: %d"
, info.md5, info.time, info.result.name(), info.score);
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", virus_type: %s, ext_info: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
DetectResult.ErrorInfo info = result.getErrorInfo();
return String.format("[DETECT RESULT] [FAIL] md5: %s, time: %d, error_code: %s, error_message: %s"
, info.md5, info.time, info.error_code.name(), info.error_string);
}
/**
* 同步检测目录或文件。
* @param path 指定路径,支持选择文件或目录。指定目录时使用递归遍历。
* @param is_sync_scan 是否使用同步接口,推荐使用异步接口。true表示使用同步接口,false表示使用异步接口。
* @throws InterruptedException
*/
public static void detectDirOrFileSync(OpenAPIDetector detector, String path, int timeout_ms, Map<String, DetectResult> result_map) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFileSync(detector, subpath, timeout_ms, result_map);
}
return;
}
System.out.println(String.format("[detectFileSync] [BEGIN] queueSize: %d, path: %s, timeout: %d", detector.getQueueSize(), abs_path, timeout_ms));
DetectResult res = detectFileSync(detector, abs_path, timeout_ms, true);
System.err.println(String.format(" [ END ] %s", formatDetectResult(res)));
result_map.put(abs_path, res);
}
/**
* 异步检测目录或文件。
* @param path 指定路径,支持选择文件或目录。指定目录时使用递归遍历。
* @param is_sync_scan 是否使用同步接口,推荐使用异步接口。true表示使用同步接口,false表示使用异步接口。
* @throws InterruptedException
*/
public static void detectDirOrFile(OpenAPIDetector detector, String path, int timeout_ms, IDetectResultCallback callback) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFile(detector, subpath, timeout_ms, callback);
}
return;
}
int seq = detectFile(detector, abs_path, timeout_ms, true, callback);
System.out.println(String.format("[detectFile] [BEGIN] seq: %d, queueSize: %d, path: %s, timeout: %d", seq, detector.getQueueSize(), abs_path, timeout_ms));
}
/**
* 开始对文件或目录进行检测。
* @param path 指定路径,支持选择文件或目录。指定目录时使用递归遍历。
* @param is_sync_scan 是否使用同步接口,推荐使用异步接口。true表示使用同步接口,false表示使用异步接口。
* @throws InterruptedException
*/
public static void scan(final OpenAPIDetector detector, String path, int detect_timeout_ms, boolean is_sync) throws InterruptedException {
System.out.println(String.format("[SCAN] [START] path: %s, detect_timeout_ms: %d, is_sync: %b", path, detect_timeout_ms, is_sync));
long start_time = System.currentTimeMillis();
final Map<String, DetectResult> result_map = new HashMap<>();
if (is_sync) {
detectDirOrFileSync(detector, path, detect_timeout_ms, result_map);
} else {
detectDirOrFile(detector, path, detect_timeout_ms, new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
System.err.println(String.format("[detectFile] [ END ] seq: %d, queueSize: %d, %s", seq, detector.getQueueSize(), formatDetectResult(callback_res)));
result_map.put(file_path, callback_res);
}
});
// 等待任务执行完成。
detector.waitQueueEmpty(-1);
}
long used_time = System.currentTimeMillis() - start_time;
System.out.println(String.format("[SCAN] [ END ] used_time: %d, files: %d", used_time, result_map.size()));
int fail_count = 0;
int white_count = 0;
int black_count = 0;
for (Map.Entry<String, DetectResult> entry : result_map.entrySet()) {
DetectResult res = entry.getValue();
if (res.isSucc()) {
if (res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK) {
black_count ++;
} else {
white_count ++;
}
} else {
fail_count ++;
}
}
System.out.println(String.format(" fail_count: %d, white_count: %d, black_count: %d"
, fail_count, white_count, black_count));
}
public static void main(String[] args_) throws Exception {
// 获取检测器实例。
OpenAPIDetector detector = OpenAPIDetector.getInstance();
// 初始化。
ERR_CODE init_ret = detector.init(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
boolean is_sync_scan = false; // 指定是异步检测还是同步检测。异步检测性能更优。false表示异步检测,true表示同步检测。
int timeout_ms = 120000; // 单个样本检测时间,单位为毫秒。
String path = "test2.php"; // 待扫描的文件或目录。
// 启动扫描,直到扫描结束。
scan(detector, path, timeout_ms, is_sync_scan);
// 反初始化
System.out.println("Over.");
detector.uninit();
}
}
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
import threading
import time
import traceback
from alibabacloud_filedetect.OpenAPIDetector import OpenAPIDetector
from alibabacloud_filedetect.IDetectResultCallback import IDetectResultCallback
from alibabacloud_filedetect.ERR_CODE import ERR_CODE
from alibabacloud_filedetect.DetectResult import DetectResult
class Sample(object):
def __init__(self):
pass
"""
同步检测文件接口。
@param detector 检测器对象。
@param path 待检测的文件路径。
@param timeout_ms 设置超时时间,单位为毫秒。
@param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满。
"""
def detectFileSync(self, detector, path, timeout_ms, wait_if_queuefull):
if detector is None or path is None:
return None
result = None
while True:
result = detector.detectSync(path, timeout_ms)
if result is None:
break
if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
异步检测文件接口。
@param detector 检测器对象。
@param path 待检测的文件路径。
@param timeout_ms 设置超时时间,单位为毫秒。
@param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满。
@param callback 结果回调函数。
"""
def detectFile(self, detector, path, timeout_ms, wait_if_queuefull, callback):
if detector is None or path is None or callback is None:
return ERR_CODE.ERR_INIT.value
result = ERR_CODE.ERR_INIT.value
if wait_if_queuefull:
real_callback = callback
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
return
real_callback.onScanResult(seq, file_path, callback_res)
callback = AsyncTaskCallback()
while True:
result = detector.detect(path, timeout_ms, callback)
if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
格式化检测结果。
@param result 检测结果对象。
@return 格式化后的字符串。
"""
@staticmethod
def formatDetectResult(result):
if result.isSucc():
info = result.getDetectResultInfo()
msg = "[DETECT RESULT] [SUCCEED] md5: {}, time: {}, result: {}, score: {}".format(info.md5,
info.time, info.result, info.score)
vinfo = info.getVirusInfo()
if vinfo is not None:
msg += ", virus_type: {}, ext_info: {}".format(vinfo.virus_type, vinfo.ext_info)
return msg
info = result.getErrorInfo()
msg = "[DETECT RESULT] [FAIL] md5: {}, time: {}, error_code: {}, error_message: {}".format(info.md5,
info.time, info.error_code, info.error_string)
return msg
"""
同步检测目录或文件。
@param path 指定路径,支持选择文件或目录。指定目录时使用递归遍历。
@param is_sync 是否使用同步接口,推荐使用异步接口。true表示使用同步接口,false表示使用异步接口。
"""
def detectDirOrFileSync(self, detector, path, timeout_ms, result_map):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFileSync(detector, sub_path, timeout_ms, result_map)
return
elif os.path.isfile(abs_path):
print("[detectFileSync] [BEGIN] queueSize: {}, path: {}, timeout: {}".format(
detector.getQueueSize(), abs_path, timeout_ms))
res = self.detectFileSync(detector, abs_path, timeout_ms, True)
print(" [ END ] {}".format(self.formatDetectResult(res)))
result_map[abs_path] = res
return
"""
异步检测目录或文件。
@param path 指定路径,支持选择文件或目录。指定目录时使用递归遍历。
@param is_sync 是否使用同步接口,推荐使用异步接口。true表示使用同步接口,false表示使用异步接口。
"""
def detectDirOrFile(self, detector, path, timeout_ms, callback):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFile(detector, sub_path, timeout_ms, callback)
return
elif os.path.isfile(abs_path):
seq = self.detectFile(detector, abs_path, timeout_ms, True, callback)
print("[detectFile] [BEGIN] seq: {}, queueSize: {}, path: {}, timeout: {}".format(
seq, detector.getQueueSize(), abs_path, timeout_ms))
return
"""
开始对文件或目录进行检测。
@param path 指定路径,支持选择文件或目录。指定目录时使用递归遍历。
@param is_sync 是否使用同步接口,推荐使用异步接口。true表示使用同步接口,false表示使用异步接口。
"""
def scan(self, detector, path, detect_timeout_ms, is_sync):
try:
print("[SCAN] [START] path: {}, detect_timeout_ms: {}, is_sync: {}".format(path, detect_timeout_ms, is_sync))
start_time = time.time()
result_map = {}
if is_sync:
self.detectDirOrFileSync(detector, path, detect_timeout_ms, result_map)
else:
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
print("[detectFile] [ END ] seq: {}, queueSize: {}, {}".format(seq,
detector.getQueueSize(), Sample.formatDetectResult(callback_res)))
result_map[file_path] = callback_res
self.detectDirOrFile(detector, path, detect_timeout_ms, AsyncTaskCallback())
# 等待任务完成。
detector.waitQueueEmpty(-1)
used_time_ms = (time.time() - start_time) * 1000
print("[SCAN] [ END ] used_time: {}, files: {}".format(used_time_ms, len(result_map)))
failed_count = 0
white_count = 0
black_count = 0
for file_path, res in result_map.items():
if res.isSucc():
if res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK:
black_count += 1
else:
white_count += 1
else:
failed_count += 1
print(" fail_count: {}, white_count: {}, black_count: {}".format(
failed_count, white_count, black_count))
except Exception as e:
print(traceback.format_exc(), file=sys.stderr)
def main(self):
# 获取检测器实例。
detector = OpenAPIDetector.get_instance()
# 读取环境变量中的Access Key ID和Access Key Secret
access_key_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')
access_key_secret = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
# 初始化。
init_ret = detector.init(access_key_id, access_key_secret)
print("INIT RET: {}".format(init_ret))
# 自定义扫描参数。
is_sync_scan = False # 是异步检测还是同步检测。异步检测性能更优。false表示异步检测。
timeout_ms = 120000 # 单个样本检测时间,单位为毫秒。
path = "./test" # 待扫描的文件或目录。
# 启动扫描,直到扫描结束。
self.scan(detector, path, timeout_ms, is_sync_scan)
# 反初始化
print("Over.")
detector.uninit()
if __name__ == "__main__":
sample = Sample()
sample.main()
返回结果
struct DetectResult {
std::string md5; // 样本md5
long time = 0; // 用时,单位为毫秒
ERR_CODE error_code; // 错误码
std::string error_string; // 扩展错误信息
enum RESULT {
RES_WHITE = 0, //安全文件
RES_BLACK = 1, //可疑文件
RES_PENDING = 3 //检测中
};
RESULT result; //检测结果
int score; //检测分值,取值范围0~100
std::string virus_type; //病毒类型,如“WebShell/MalScript/Hacktool”
std::string ext_info; //扩展信息为JSON字符串
};
错误码说明如下:
enum ERR_CODE {
ERR_INIT = -100, // 需要初始化,或者重复初始化
ERR_FILE_NOT_FOUND = -99, // 文件未找到
ERR_DETECT_QUEUE_FULL = -98, // 检测队列满
ERR_CALL_API = -97, // 调用API错误
ERR_TIMEOUT = -96, // 超时
ERR_SUCC = 0 // 成功
};
检测分值越高,文件可能存在的风险越高。检测分值和危险等级对应表如下:
分数区间 | 危险等级 |
0~60 | 安全 |
61~70 | 风险 |
71~80 | 可疑 |
81~100 | 恶意 |
OSS文件检测
执行Bucket文件检测
执行Bucket文件检测前,请确保当前阿里云账号下有足够的剩余检测次数。如果剩余检测次数不足,您可以在恶意文件检测SDK页面的风险文件总览页签下,单击升级配置购买足够的检测次数。
登录云安全中心控制台。在控制台左上角,选择需防护资产所在的区域中国。
在左侧导航栏,选择 。
单击OSS文件检测页签,选择合适的检测方式并执行检测。
如果您的Bucket没有在OSS文件检测的列表中,您可以单击同步Bucket,同步最新的Bucket列表。
检测方式
说明
操作步骤
手动全量检测
检测单个或多个Bucket内的所有文件。
在OSS文件检测页签,单击单个Bucke操作列的检测或选中多个Bucket后单击批量检测。
在策略配置面板,指定需要检测文件的类型并单击确定。
手动增量检测
针对已检测且上次检测后有文件更新的Bucket,提供只检测未检测过的文件的功能。
在OSS文件检测页签,单击目标Bucke操作列的增量检测。
在策略配置面板,指定需要检测文件的类型并单击确定。
自动检测
支持对指定Bucket设置检测条件开启周期性自动检测。自动检测仅检测OSS新增的文件,不会重复检测同一个文件。
在OSS文件检测页签,单击策略配置。
在策略配置面板,打开文件自动检测开关。
在策略配置面板,配置检测周期、文件检测时间、文件检测类型和生效Bucket,单击确定。
查看检测结果
查看OSS风险文件总览
在风险文件总览页签的统计数据区域,您可以查看已检测的OSS文件总数、存在不同等级风险(高危、中危、低危)的文件数量和剩余检测次数。
查看存在风险文件详情
在风险文件详情列表,您可以单击目标风险文件操作列的详情。在该文件的详情面板,查看检测出的存在风险的文件详情、事件说明等信息。
重要检测出风险的文件可能被植入了恶意脚本、DDoS木马等病毒,建议您根据事件说明中的处置建议及时处理该文件。
查看OSS文件检测统计数据
在OSS文件检测页签的统计数据区域,您可以查看检测出恶意文件的Bucket数、存在不同等级风险(高危、中危、低危)的文件数量、未检测Bucket数量和Bucket总数。
查看单个Bucket的检测结果
在OSS文件检测页签,您可以单击目标Bucke操作列的详情,查看Bucket的基本信息和风险文件详情。
导出检测结果
导出所有风险文件列表
在风险文件总览页签,单击
图标,等待文件导出完成后,在提示框中单击下载。
按Bucket维度导出所有风险文件列表
在OSS文件检测页签,单击
图标,等待文件导出完成后,在提示框中单击下载。