# -*- coding: utf-8 -*-
import argparse
import alibabacloud_oss_v2 as oss
# 解析XML响应
import xml.etree.ElementTree as ET
import json
from datetime import datetime
# --- 新增导入 ---
from alibabacloud_oss_v2.models import GetObjectTaggingRequest
# --- 新增导入结束 ---
def get_inputs():
"""获取用户的语义查询(必选)和筛选条件(可选)"""
# 1. 获取必选的语义查询
print(" 请输入语义描述(示例:停着车的院子)")
query = input("> 语义关键词: ").strip()
while not query:
print(" 语义关键词不能为空!")
query = input("> 语义关键词: ").strip()
conditions = [] # 初始化筛选条件列表
target_tag = None # --- 用于存储目标标签 ---
# 2. 获取可选的标签筛选
print("\n (可选)请输入标签筛选条件(将用于客户端过滤,示例:camera=camera-a,按回车跳过)")
tag_input = input("> 标签 (格式 key=value): ").strip()
if tag_input:
if '=' in tag_input:
# conditions.append({
# 'field': 'Tags',
# 'value': tag_input,
# 'op': 'eq'
# })
target_tag = tag_input
print(f" 信息:标签 '{target_tag}' 将在获取结果后于客户端进行过滤。")
else:
print(" 标签格式不正确,已忽略。请使用 'key=value' 格式。")
# --- 返回 target_tag ---
return query, conditions, target_tag
def build_metaquery_xml(query, conditions):
"""构建符合OSS规范的MetaQuery XML (包含语义查询和可选筛选)"""
# 始终包含语义查询部分
xml_parts = [f'<Query>{query}</Query>']
# 添加媒体类型限制
xml_parts.append('<MediaTypes><MediaType>video</MediaType></MediaTypes>')
# 添加可选的筛选条件 - 在语义模式下,仅支持部分字段(如此处的 Filename)
for cond in conditions:
# 在 semantic 模式下跳过 Tags 字段的 SimpleQuery 构建 (虽然我们已经不在 conditions 里加 Tags 了,但保留以防万一)
if cond['field'] == 'Tags':
continue
json_query = json.dumps({
"Field": cond['field'],
"Value": cond['value'],
"Operation": cond['op']
}, ensure_ascii=False)
xml_parts.append(f'<SimpleQuery>{json_query}</SimpleQuery>')
# 组合成完整的 MetaQuery XML
meta_query_xml = f'''<MetaQuery>
{"".join(xml_parts)}
</MetaQuery>'''
return meta_query_xml
def format_result(key, pre_url):
"""格式化单个搜索结果"""
return f""" 文件地址:{pre_url}
文件路径:{key}
-----------------------"""
def perform_search():
# 直接赋值给命令行参数
args = argparse.Namespace(
region='cn-beijing', # 替换为你的区域
bucket='ipc-videos-oss-metaquery-demo', # 替换为你的存储空间名称
endpoint='https://oss-cn-beijing.aliyuncs.com', # 替换为你的endpoint
)
# 初始化OSS客户端
credentials = oss.credentials.EnvironmentVariableCredentialsProvider()
cfg = oss.config.load_default()
cfg.credentials_provider = credentials
cfg.region = args.region
if args.endpoint:
cfg.endpoint = args.endpoint
client = oss.Client(cfg)
# 获取用户输入 (语义查询 + 可选条件 + 目标标签)
query, conditions, target_tag = get_inputs()
# 构建请求
try:
# 构造请求体(XML 数据)
# --- 只传入 conditions (不含Tags) ---
data_str = build_metaquery_xml(query, conditions)
# 定义操作输入
req = oss.OperationInput(
op_name='DoMetaQuery',
method='POST',
parameters={
'metaQuery': '',
'mode': 'semantic', # <-- 重新加入 semantic 模式
'comp': 'query',
},
headers=None,
body=data_str.encode("utf-8"),
bucket=args.bucket,
)
# 调用泛化接口执行操作
print("\n 发送 DoMetaQuery 请求...")
resp = client.invoke_operation(req)
print(f"\n 请求成功,HTTP 状态码: {resp.http_response.status_code}")
except oss.exceptions.ServiceError as e:
print(f" 服务端错误 (ServiceError):{e.message}")
print(f" - HTTP Status Code: {e.status_code}")
print(f" - Error Code: {e.error_code}")
print(f" - Request ID: {e.request_id}")
return
except oss.exceptions.ClientError as e: # 添加对客户端错误的捕获
print(f" 客户端或网络错误 (ClientError):{e.message}")
return
except Exception as e: # 捕获其他可能的异常
print(f" 未知错误:{e}")
import traceback
traceback.print_exc() # 打印完整的 traceback
return
# 解析和处理结果...
final_results_count = 0 # --- 新增:计数器,用于统计最终符合条件的结果 ---
try:
root = ET.fromstring(resp.http_response.content.decode('utf-8'))
# 查找所有File元素
files = root.findall('.//File')
print(f"\n 从OSS获取到 {len(files)} 个初步匹配结果,开始进行客户端标签过滤...")
if not files:
# 检查是否有 NextContinuationToken,可能需要分页
next_token_elem = root.find('.//NextContinuationToken')
if next_token_elem is not None and next_token_elem.text:
print(" 提示:可能还有更多结果,当前实现未处理分页。")
print("\n 没有初步匹配结果。")
return # 没有文件时提前返回
for i, file in enumerate(files, 1):
# 获取文件名
key_element = file.find('Filename')
if key_element is None:
print(f" 警告:第 {i} 个初步结果缺少 Filename 字段,已跳过。")
continue
key = key_element.text
# --- 客户端标签过滤 ---
if target_tag:
try:
tagging_req = GetObjectTaggingRequest(bucket=args.bucket, key=key)
tagging_resp = client.get_object_tagging(tagging_req)
# 检查返回的标签集是否包含目标标签
tag_found = False
target_k, target_v = target_tag.split('=', 1)
if tagging_resp.tag_set and tagging_resp.tag_set.tags: # 使用 .tags
for tag in tagging_resp.tag_set.tags: # 使用 .tags
if tag.key == target_k and tag.value == target_v:
tag_found = True
break
if not tag_found:
continue # 标签不匹配,跳过此文件
except oss.exceptions.ServiceError as tag_err:
if tag_err.status_code == 404 and tag_err.error_code == 'NoSuchTagSet':
continue
else:
print(f" 警告:获取文件 '{key}' 的标签时出错: {tag_err.error_code} - {tag_err.message},已跳过。")
continue
except Exception as tag_e:
print(f" 警告:获取或处理文件 '{key}' 的标签时发生未知错误: {tag_e},已跳过。")
# --- 添加 traceback 方便调试 ---
import traceback
traceback.print_exc()
# --- 添加结束 ---
continue
# --- 客户端标签过滤结束 ---
# --- 如果通过了标签过滤(或者没有设置标签过滤),则处理并打印结果 ---
final_results_count += 1 # 增加最终结果计数
print(f"\n[{final_results_count}] 文件 '{key}' 符合所有条件:") # 打印最终结果编号
# 生成预签名URL
try:
pre_url = client.presign(
oss.GetObjectRequest(
bucket=args.bucket,
key=key,
)
)
print(format_result(key, pre_url.url))
except Exception as presign_e:
print(f" 警告:为文件 '{key}' 生成预签名URL时出错: {presign_e}")
print(format_result(key, "[无法生成URL]"))
# --- 循环结束后打印最终统计 ---
print(f"\n 客户端过滤完成,共找到 {final_results_count} 个最终匹配结果。")
except ET.ParseError as xml_e:
print(f" 错误:解析OSS响应XML时出错 - {xml_e}")
except Exception as parse_e:
print(f" 错误:处理结果时发生意外错误 - {parse_e}")
if __name__ == "__main__":
perform_search()