使用Python SDK语音反垃圾接口检测实时语音流或语音文件中的垃圾内容。

背景信息

语音流检测和语音文件检测均为异步检测,检测结果需要您以轮询或者回调的方式获取。关于调用请求中的检测场景参数(scenes)、返回结果中的分类参数(label)和操作建议参数(suggestion)的说明,请参见语音异步检测

语音检测按照检测的语音文件、语音流的时间长度进行计费,计费粒度为分钟。按照每天累计检测的总时长计费,每天检测总时长不足一分钟的按照1分钟计费。

准备工作

在进行具体的服务调用之前,请完成以下准备工作:

提交语音异步检测任务

接口 描述 支持的Region
VoiceAsyncScanRequest 异步检测语音流或语音文件中是否包含违规内容。语音流格式支持:
  • RTMP
  • HTTP
  • FLV
cn-shanghai
示例代码
  • 提交语音URL
    # coding=utf-8
    # 以下代码将调用语音异步检测接口。
    from aliyunsdkcore import client
    from aliyunsdkcore.profile import region_provider
    from aliyunsdkgreen.request.v20180509 import VoiceAsyncScanRequest
    from aliyunsdkgreenextension.request.extension import HttpContentHelper
    import json
    
    # 请使用您自己的AccessKey信息。
    clt = client.AcsClient("yourAccessKeyId", "yourAccessKeySecret","cn-shanghai")
    # 每次请求时需要新建request,请勿复用request对象。
    region_provider.modify_point('Green', 'cn-shanghai', 'green.cn-shanghai.aliyuncs.com')
    
    request = VoiceAsyncScanRequest.VoiceAsyncScanRequest()
    request.set_accept_format('JSON')
    
    task = {
        "url": "https://xxxx.flv",
        }
    
    # 如果需要检测语音流,请将live设置为true。
    request.set_content(HttpContentHelper.toValue({"tasks": [task], "scenes": ["antispam"], "live":false}))
    response = clt.do_action_with_exception(request)
    print(response)
    result = json.loads(response)
    if 200 == result["code"]:
        taskResults = result["data"]
        for taskResult in taskResults:
            code = taskResult["code"]
            if 200 == code:
                # 请保存taskId,用于查询语音检测结果。
                print(taskResult["taskId"])
  • 提交本地语音文件
    # coding=utf-8
    # 以下代码将调用语音异步检测接口。
    from aliyunsdkcore import client
    from aliyunsdkcore.profile import region_provider
    from aliyunsdkgreen.request.v20180509 import VoiceAsyncScanRequest
    from aliyunsdkgreenextension.request.extension import HttpContentHelper
    from aliyunsdkgreenextension.request.extension import ClientUploader
    import json
    
    # 请使用您自己的AccessKey信息。
    clt = client.AcsClient("yourAccessKeyId", "yourAccessKeySecret","cn-shanghai")
    # 每次请求时需要新建request,请勿复用request对象。
    region_provider.modify_point('Green', 'cn-shanghai', 'green.cn-shanghai.aliyuncs.com')
    
    request = VoiceAsyncScanRequest.VoiceAsyncScanRequest()
    request.set_accept_format('JSON')
    
    # 上传本地文件到服务端。请修改成您自己的本地文件路径。
    uploader = ClientUploader.getVoiceClientUploader(clt)
    url = uploader.uploadFile('d:/暴恐涉政1.mp3')
    # 将type设置为file,表示检测语音文件。
    task = {
        "url": url,
        "type": "file"
        }
    
    request.set_content(HttpContentHelper.toValue({"tasks": [task], "scenes": ["antispam"]}))
    response = clt.do_action_with_exception(request)
    print(response)
    result = json.loads(response)
    if 200 == result["code"]:
        taskResults = result["data"]
        for taskResult in taskResults:
            code = taskResult["code"]
            if 200 == code:
                # 请保存taskId,用于查询语音检测结果。
                print(taskResult["taskId"])

查询语音异步检测结果

接口 描述 支持的Region
VoiceAsyncScanResultsRequest 查询异步语音检测结果。 cn-shanghai
示例代码
# coding=utf-8
# 以下代码将调用异步语音检测结果查询接口。
from aliyunsdkcore import client
from aliyunsdkcore.profile import region_provider
from aliyunsdkgreen.request.v20180509 import VoiceAsyncScanResultsRequest
import json
import time

# 请使用您自己的AccessKey信息。
clt = client.AcsClient("yourAccessKeyId", "yourAccessKeySecret","cn-shanghai")
# 每次请求时需要新建request,请勿复用request对象。
region_provider.modify_point('Green', 'cn-shanghai', 'green.cn-shanghai.aliyuncs.com')

def get_task(task_id):
    request = VoiceAsyncScanResultsRequest.VoiceAsyncScanResultsRequest()
    request.set_accept_format('JSON')
    task = {
        "taskId": task_id,
    }
    request.set_content(bytearray(json.dumps([task]), "utf-8"))
    response = clt.do_action_with_exception(request)
    result = json.loads(response)
    if 200 == result["code"]:
        taskResults = result["data"]
        for taskResult in taskResults:
            code = taskResult["code"]
            if 280 == code:
                print(taskResult["msg"])
            else:
                print(response)
            return code
    result["code"]

while True:
    code = get_task("input taskid")
    if code == 280:
        print("Scanning:")
        time.sleep(10)
    elif code == 200:
        print("=====SUCCESS=====")
        break
    else:
        print("=====ERROR=====")
        break