UDF(User-Defined Function)即自定义函数,当iTAG提供的函数无法满足您的业务需求时,您可以根据本文中的开发流程及使用示例,自行编写代码逻辑创建自定义函数,以满足多样化的业务需求。
背景信息
iTAG UDF定义为小批量计算,尽量不要在UDF内部进行内存占用高的复杂计算。UDF数据操作目前支持保证数据条目的值修改和水平字段拓展,暂不支持垂直行拓展。
前提条件
创建工作空间,具体操作,请参见创建工作空间。
操作步骤
进入智能标注(iTAG)。
登录PAI控制台。
在左侧导航栏单击工作空间列表,在工作空间列表页面中单击待操作的工作空间名称,进入对应的工作空间。
在左侧导航栏,选择数据准备>智能标注(iTAG)。
单击前往管理页,然后单击服务市场,跳转至智能标注新版页面。
在管理中心>资产管理页面的服务管理页签下,单击新建UDF服务,配置相关参数。
参数
说明
基础信息
服务名称
自定义名称,全局唯一。
服务描述
自定义UDF服务的应用场景或注意事项。
UDF协议
DataFrame:工作流UDF,表示该UDF的输入和输出使用DataFrame协议进行,可以在标注链路/工作流/预标注中使用。
Json:标注链路UDF,表示该UDF的输入和输出使用JSON协议进行, 可以在标注链路中使用。
UDF权限
官方:由官方创建并提供给用户进行功能调试,全租户通用。
私有:用户根据实际需求自定义的UDF,租户内部通用。
参数定义
自定义入参
配置UDF需要接收的真实字段信息并完成调试。
用户提供的Mock值,在用户调试时提供给UDF,完成调试,模拟真实数据和场景。
决定UDF的Schema,Mock值的Schema将会固定为UDF输入的Schema。
出参预览(自动生成)
出参不需要用户进行配置,会展示用户调试后返回的结果进行打印,来确定返回的Schema。
在UDF调试区域编写UDF,然后单击调试,检查UDF逻辑是否正确。
调试时,会将Mock值赋值到UDF入参里,然后在出参预览展示出参名和出参值。
调试无问题后,单击保存。
(可选)UDF创建成功后,可在以下场景中使用。
场景一:创建任务
在管理中心>任务管理页面的任务管理页签下,单击创建任务。完成数据选择和模板选择后,在任务配置页面设置UDF相关参数。
场景二:创建任务流
在管理中心>任务管理页面的任务流管理页签下,单击创建任务流。构建任务流后,选中UDF组件,在右侧配置UDF相关参数。
场景三:创建模板
在管理中心>资产管理页面的模板管理页签下,单击新建模板,根据实际需求选择目标模板后,单击编辑,进入创建模板页面。单击,在右侧数据校验选择已创建的UDF。
UDF代码实践
本文为您提供以下UDF代码示例,应用时请结合您实际业务情况适当调整。
对问题进行预标注
在自定义入参区域配置如下参数。
参数名为
input_data
,Mock值为test
。在UDF调试区域填入以下代码,单击调试。
#! /usr/bin/env python # -*- coding:utf-8 -*- # Author: <Author Name> import time import json import requests import uuid from aitag.enums import EventKeyEnum, ContextKeyEnum from aitag.annotations import Result, ImageAnnotation from aitag.decorator.decorator import udf_data_process import logging logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.INFO) def ask_gpt(query, sys_prompt="你是一个多领域的专家,职责是回答对话中的问题给出的答案有以下原则:尽量简洁;"): """接入 GPT 服务""" dialog_messages = [] dialog_messages.append({"role": "system", "content": sys_prompt}) dialog_messages.append({"role": "user", "content": query}) url = 'https://api.mit-spider.alibaba-inc.com/chatgpt/api/ask' authorization = "" header = { "Content-Type": "application/json;charset=utf-8", "Authorization": authorization } data = { "model": "gpt-4", "messages": dialog_messages, "temperature": 0.7 } round = 0 retry = 2 response_content = "" while round < retry: response = requests.post(url, json=data, headers=header) try: resp_dict = response.json() if resp_dict.get("code") == 500: message = resp_dict.get("message") if "HTTPSConnectionPool" in message: round = round + 1 continue else: res = {"reranked_responses": "Token过长"} break elif resp_dict.get("code") == 429: res = {"reranked_responses": "今日token超出限额"} break else: resp_data = resp_dict.get("data", {}) contents = [] for element in resp_data.get("response", ""): contents.append(element.get("content")) response_content = contents[-1] break except: res = {"reranked_responses": "Error, Try Again"} break return response_content def build_pre_label_obj(post_text, rewrite_text): """构建预标注格式""" return { "tabId": "MultimodalRLHFExtensions", "annotations": [ { "id": None, "labels": { }, "exif": { "data": [ { "postText": post_text, "postList": [ { "index": None, "text": post_text, "title": "机器人1", "questionsValue": { } } ], "rewriteText": rewrite_text, "replyList": [ { "index": None, "text": rewrite_text, "title": "机器人1", "questionsValue": { "是否通过": "是", "是否参考信息不足无法回答": "是" } } ], "id": str(uuid.uuid4()) } ] } } ], "type": "MultimodalRLHFExtensions", "version": "v2" } @udf_data_process def handler(event, context): """ udf 执行的入口函数,系统会构造入参, 函数出参要是pnadas udf对象(由装饰器来进行约束和校验)。 :param event: 是aitag.decorator.in_out_process.Event的实例, 会存放本次要处理的数据等 :param context: 是aitag.decorator.in_out_process.Context 存放本次执行的上下文信息 :return: DataFrame, 必须返回pandas DataFrame的格式 """ # 打印执行上下文呢 logging.info(context) # event是aitag.decorator.in_out_process.Event的实例, 使用如下方式获得Pandas DataFrame类型的输入数据 data_df = event.userInputDataFrame # [user code start] ==================================================== # 假如入参需要用到"a","b"两列(请在自定义参数里配置这两个参数),然后将两列的值做个拼接,存到一个新列 "c" c_values, objects = [], [] for index, row in data_df.iterrows(): rsp_content = ask_gpt(row.humman_prompt) c_values.append(rsp_content) pre_label_dict = build_pre_label_obj(row.humman_prompt, rsp_content) objects.append(json.dumps(pre_label_dict)) # DataFrame里插入新的一列,指定索引,列名,列值数组 data_df.insert(loc=len(data_df.columns), column="对话预标注", value=c_values) data_df.insert(loc=len(data_df.columns), column="objects", value=objects) return data_df
在工作流中对PDF进行预处理
将PDF文件上传到OSS。
在自定义入参区域配置如下参数。
参数名为
pdf_url
,Mock值为PDF文件所在的OSS路径。在UDF调试区域填入以下代码,单击调试。
import json from urllib import request import logging from io import BytesIO import fitz from aitag.decorator.decorator import udf_data_process logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.INFO) @udf_data_process def handler(event, context): """ udf 执行的入口函数,系统会构造入参, 函数出参要是pnadas udf对象(由装饰器来进行约束和校验)。 :param event: 是aitag.decorator.in_out_process.Event的实例, 会存放本次要处理的数据等 :param context: 是aitag.decorator.in_out_process.Context 存放本次执行的上下文信息 :return: DataFrame, 必须返回pandas DataFrame的格式 """ logging.info(context) data_df = event.userInputDataFrame def get_pdf_text_info(file): """获取 PDF 文本信息""" document = fitz.open(stream=file.read()) try: page_infos, texts = [], [] for page_num, page in enumerate(document.pages()): text = page.get_text("json") texts.append(page.get_text("text")) page_info = json.loads(text) page_info["page_num"] = page_num + 1 page_infos.append(page_info) finally: document.close() return { "text_json_info": json.dumps(page_infos), "text_info": "\n".join(texts) } def process_row(row): """处理 DataFrame 行""" pdf_url = row["pdf_url"] with request.urlopen(pdf_url) as file: pdf_text_info = get_pdf_text_info(file) for key, value in pdf_text_info.items(): row[key] = value return row # 返回处理后的 DataFrame 对象 return data_df.apply(process_row, axis=1)
对对话排序标注链路中用户提交的内容进行自定义限制和校验
在自定义入参区域配置如下参数。
UDF协议选择Json,参数名为
input_data
,Mock值如下:[ { "Rank 1(best)": [ { "value": "很抱歉听到您在抖音直播间购买手机时遭遇了欺骗。以下是一些您可以考虑采取的步骤:\n\n与商家沟通:如果您认为自己被欺骗了,您可以与商家联系并说明情况。要求他们退款或提供更好的解决方案。\n向平台投诉:如果商家无法解决问题,您可以向抖音平台投诉。抖音有一个投诉渠道,您可以通过该渠道提交投诉并提供相关证据。平台会对此进行调查,并可能采取相应的行动。\n寻求法律帮助:如果您认为商家的行为违反了消费者权益,您可以考虑寻求法律帮助。您可以咨询律师,了解您的权利和应采取的措施。\n总之,如果您遭遇了欺骗,您应该采取积极的措施来保护自己的权益,并确保类似事件不再发生。", "feedback": "", "key": "item2", "sortStamp": [ "1" ], "radio": [ "1" ] } ], "Rank 2": [ { "value": "首先,您可以与商家协商解决。您可以要求商家提供购买凭证和相关的销售记录,以便确认商家是否存在欺诈行为。如果商家不能提供相关凭证,您可以考虑向消费者协会或市场监管部门投诉,以维护您的合法权益。\n\n另外,根据消费者权益保护法,如果商家存在欺诈行为,您可以要求商家退货或者赔偿。您可以根据自己的实际损失和商家的违法情节,要求商家承担相应的责任。\n\n最后,您可以在购买商品时多加谨慎,尽量选择信誉好、口碑佳的商家,以减少购物风险。", "feedback": "", "key": "item5", "sortStamp": [ "2" ], "radio": [ "2" ] }, { "value": "首先,您可以联系抖音平台客服,向平台投诉该商家的行为,平台将会协调处理您的投诉。同时,您也可以联系消费者协会或者相关行政部门投诉商家的欺诈行为。另外,您可以保留相关证据,如购买记录、聊天记录、交易记录等,以便更好地维护自己的权益。最后,建议您在购买商品时,选择信誉良好的商家,并且保留好相关证据,以便在出现问题时能够及时维权。", "feedback": "", "key": "item1", "sortStamp": [ "3" ], "radio": [ "3" ] } ], "Rank 3": [ { "value": "遇到这种情况,建议先与商家沟通,了解具体情况。对于商家的说法,可以通过查看手机的详细参数和照片来进行核实。如果商家仍然坚持说这是一部手机,而且没有其他证据表明它是模型机,可以考虑向消费者协会或者相关行政部门投诉。此外,如果商家存在其他违法行为,可以向相关部门提供证据并申请维权。", "feedback": "", "key": "item4", "sortStamp": [ "4" ], "radio": [ "4" ] } ], "Rank 4": [ { "value": "首先感谢您选择在抖音直播间购买商品,同时也非常遗憾您在购买过程中遇到了问题。根据您提供的情况,您在直播间中抽奖中得到了一部手机,但是拿到手的却是一个模型机,这确实给您带来了困扰。\n\n根据您提供的信息,商家表示手机是以实物为准,但是您认为手机并不是实物,而是一个模型机。建议您可以与商家进行沟通,要求商家提供相关的购买记录和凭证,以便您能够确认手机的真实性和存在性。同时,您也可以要求商家提供质检报告等相关证明文件,以便确认手机是否符合相关的质量标准。\n\n如果商家无法提供相关证明文件,您可以考虑向消费者协会或者市场监管部门进行投诉,以维护您的合法权益。同时,您也可以尝试与商家协商处理方案,例如退货或者更换其他商品。\n\n最后,我建议您在购买商品时要保持警惕,尽可能选择信誉好、口碑佳的商家,同时也要注意保留相关的购买凭证和证明文件,以便在发生纠纷时能够提供相关证据。", "feedback": "", "key": "item3", "sortStamp": [ "5" ], "radio": [ "5" ] } ], "Rank 5": [ ], "Rank 6": [ ], "Rank 7(worst)": [ ] } ]
在UDF调试区域填入以下代码,单击调试。
#! /usr/bin/env python # -*- coding:utf-8 -*- # Author: <Author Name> import json import random as r from aitag.enums import EventKeyEnum, ContextKeyEnum from aitag.annotations import Result, ImageAnnotation import logging def handler(event, context): """ udf 执行的入口函数,系统会构造入参, 函数出参要是pnadas udf对象(由装饰器来进行约束和校验)。 :param event: 是aitag.decorator.in_out_process.Event的实例, 会存放本次要处理的数据等 :param context: 是aitag.decorator.in_out_process.Context 存放本次执行的上下文信息 :return: DataFrame, 必须返回pandas DataFrame的格式 """ # 判断前后的字段 field_name = "总分" # 入参event、context详细说明见:https://yuque.antfin.com/ilabel/nd60n9/firbp3lfe8m64c8s#BxeBe try: event_user_dict = json.loads(event).get(EventKeyEnum.userInput.name) # context 由系统复制,主要包括一些运行时信息,用户可不关心。具体包括哪些可以通过ContextKeyEnum枚举,或者文档查看。 context_dict = json.loads(context) trace_id = context_dict.get(ContextKeyEnum.requestId.name) # 初始化校验结果 udf_result_dict = {"Success": True, "Data": '{"Message": "ok"}'} # 从标注结果获取对应的rank结果 rank_result = json.loads(event_user_dict["input_data"]) # rank_items = json.loads(rank_result[0].get("exif", {}).get("data", "[]")) # 修改校验规则,假设第一个rank结果必须大于1,可以根据自己的需要进行修改 if len(rank_result) == 0: # 防止没有标注结果 udf_result_dict = {"Success": False, "Data": '{"Message": "no data submit!!!"}'} else: # {'input_data': '[{"exif": {"data": "{\\"rank 1(best)\\": [{\\"value\\": \\"\\"}]}"}}]'} # harmful_s = {'harmful_all', 'harmful_zero'} for idx, mr_d in enumerate(rank_result): rank_keys = sorted(mr_d.keys()) # 全部的排序值 model_num = sum([len(mr_d[rank]) for rank in rank_keys]) current_num = 0 success_flag = True max_rank_value = 0 for rank in rank_keys: # 如果当前为空,则必定已经填满所有model # rule: 是排序不是打分 if len(mr_d[rank]) == 0 and current_num < model_num: udf_result_dict = {"Success": False, "Data": '{"Message": "[CUSTOM ERROR]本页第%d题%s为空,后面rank却不为空"}' % ( idx + 1, rank)} success_flag = False # 必须要填写复选框 good_flag = True rank_radio_s = set() for model_info in mr_d[rank]: cur_rank_value = int(model_info.get(field_name, "0")) max_rank_value = max_rank_value or cur_rank_value if max_rank_value < cur_rank_value: udf_result_dict = {"Success": False, "Data": '{"Message": "[CUSTOM ERROR]本页第%d题 候选项:(文本)(%02d) %s目前最小为%d 小于当前 %d"}' % ( idx + 1, int(model_info['key'][len('item'):]), rank, max_rank_value, cur_rank_value)} success_flag = False break else: max_rank_value = cur_rank_value if not success_flag: break current_num += len(mr_d[rank]) itag_udf_result = Result(0, "success", udf_result_dict) # 返回字符串格式。str() 或 to_json() except: udf_result_dict = {"Success": True, "Data": '{"Message": "ok"}'} itag_udf_result = Result(0, "success", udf_result_dict) return itag_udf_result.to_json()
附录:SDK说明
UDF SDK提供了数据处理,定义了数据规范,您可以通过SDK来完成业务。
enums
记录常用的枚举值,您可以从枚举值中取到输入的数据和当前批次的数据。
ContextKeyEnum
名称
值
说明
requestId
0
请求ID
datasetId
1
数据集ID
dataId
2
数据ID
taskId
3
任务ID
templateId
4
模板ID
udfId
5
UDF ID
udfExecSource
6
执行来源
EventKeyEnum
名称
值
说明
userInput
0
用户输入
upStreamInput
1
流输入
inputList
2
输入列表
decorator
udf_data_process
对输入的字符串的事件和上下文信息转化为DataFrame。
函数内参数说明
Event,UDF的事件说明,具体参数说明如下:
userInput
用户输入的内容,同V1版本的
event.get(EventKeyEnum.userInput.name)
。userInputList
用户输入的内容列表,同V1版本的
event.get(EventKeyEnum.inputList.name)
。userInputDataFrame
用户输入数据的DataFrame对象,用户可以直接操作该对象进行逻辑处理。
Context,记录了执行UDF的上下文内容,具体参数说明如下:
requestId:请求ID。
datasetId:数据集ID。
dataId:处理的数据ID。
taskId:当前任务ID。
templateId:当前模板ID。
udfId:当前执行UDF的ID。
udfExecSource:UDF执行的来源信息,一般为Workflow、标注流程、NoteBook。
输入和输出
新版的装饰器函数将会处理输入为DataFrame对象的支持,需要保证输出也为DataFrame对象,不允许使用其他对象进行返回,否则会报错。
附录:目前支持的Python包
ccessControl==5.7
Acquisition==5.0
aiofiles==0.6.0
aiohttp==3.8.3
aiosignal==1.3.1
aitag
alibabacloud-credentials==0.3.2
alibabacloud-docmind-api20220711==1.0.0
alibabacloud-endpoint-util==0.0.3
alibabacloud-gateway-spi==0.0.1
alibabacloud-openapi-util==0.2.1
alibabacloud-openplatform20191219==2.0.0
alibabacloud-oss-sdk==0.1.0
alibabacloud-oss-util==0.0.6
alibabacloud-tea==0.3.3
alibabacloud-tea-fileform==0.0.5
alibabacloud-tea-openapi==0.3.7
alibabacloud-tea-util==0.3.11
alibabacloud-tea-xml==0.0.2
aliyun-python-sdk-core==2.13.35
aliyun-python-sdk-core-v3==2.13.32
aliyun-python-sdk-kms==2.14.0
arxiv==1.4.8
async-timeout==4.0.3
asyncpg==0.22.0
attrs==21.2.0
AuthEncoding==5.0
BTrees==5.0
cachetools==5.3.1
certifi==2020.12.5
cffi==1.14.5
chardet==3.0.4
charset-normalizer==3.2.0
cleanlab==2.4.0
click==6.7
cmake==3.16.3
concurrent-log==1.0.1
coreapi==2.3.3
coreschema==0.0.4
crcmod==1.7
croniter==1.0.13
crontab==1.0.1
cryptography==3.0
cycler==0.10.0
Cython==0.29.13
dashscope==1.7.0
dataclasses==0.6
dataclasses-json==0.5.14
DateTime==5.2
decorator==4.4.2
defusedxml==0.7.1
Deprecated==1.2.14
dj-pagination==2.4.0
Django==2.2.4
django-allauth==0.44.0
django-appconf==1.0.2
django-cacheops==4.0.6
django-compressor==2.2
django-cors-headers==3.0.2
django-filter==2.0.0
django-redis==4.12.1
django-rest-auth==0.9.5
django-revproxy==0.9.15
django-rq==2.8.1
django-sendfile==0.3.11
djangorestframework==3.9.1
dlib==19.22.0
drf-yasg==1.16.0
easydict==1.9
EasyProcess==0.3
entrypoint2==1.1
expiringdict==1.1.4
ExtensionClass==5.0
faiio
feedparser==6.0.10
ffmpy==0.2.2
filelock==3.12.3
freezegun==1.2.2
frozenlist==1.4.0
fsspec==2023.9.0
funcy==1.16
furl==2.0.0
geojson==2.5.0
gevent==21.1.2
gitdb==4.0.7
gitdb2==4.0.2
GitPython==2.1.11
google-api-core==2.11.1
google-api-python-client==2.98.0
google-api-wrapper==2.0.0a1
google-auth==2.22.0
google-auth-httplib2==0.1.0
googleapis-common-protos==1.60.0
greenlet==1.1.0
grpcio==1.37.1
grpcio-tools==1.37.1
httplib2==0.22.0
huggingface-hub==0.16.4
idna==2.10
imageio==2.9.0
inflection==0.5.1
itypes==1.2.0
Jinja2==2.11.1
jmespath==0.10.0
joblib==1.0.1
jsonpath==0.82
kiwisolver==1.3.1
langchain==0.0.274
langsmith==0.0.33
levenshtein==0.12.0
Markdown==3.0.1
MarkupSafe==2.0.1
marshmallow==3.20.1
matplotlib==3.4.2
multidict==5.1.0
multipart==0.2.4
munch==2.5.0
mypy-extensions==1.0.0
mysqlclient==2.0.3
nacos-sdk-python==0.1.12
networkx==2.5.1
numexpr==2.8.5
numpy==1.22.4
oauthlib==3.1.0
openai==0.27.8
opencv-contrib-python==4.5.5.62
opencv-python==4.8.0.76
orderedmultidict==1.0.1
oss2==2.14.0
packaging==23.1
pandas==2.0.3
pascal-voc-writer==0.1.4
patool==1.12
pdf2image==1.6.0
Persistence==4.0.post1
persistent==5.0
Pillow==8.2.0
portalocker==2.3.0
protobuf==3.14.0
psutil==5.7.0
psycopg2-binary==2.8.6
py==1.10.0
pyasn1==0.4.8
pyasn1-modules==0.3.0
pycparser==2.20
pycryptodome==3.10.1
pydantic==1.10.12
pyDes==2.0.1
Pygments==2.3.1
PyJWT==2.1.0
pymilvus==0.2.13
PyMuPDF==1.23.1
PyMuPDFb==1.23.0
pyodps==0.10.7
pyparsing==2.4.7
pypdf==3.15.4
python-dateutil==2.8.1
python-gettext==5.0
python-Levenshtein==0.12.2
python-logstash==0.4.6
python3-openid==3.2.0
pytz==2020.1
pyunpack==0.3
PyWavelets==1.1.1
PyYAML==5.4.1
rcssmin==1.0.6
redis==4.0.2
regex==2023.8.8
requests==2.31.0
requests-oauthlib==1.3.0
RestrictedPython==6.0
retry==0.9.2
retrying==1.3.3
rjsmin==1.0.12
rq==1.15.1
rq-scheduler==0.13.1
rsa==4.6
ruamel.yaml==0.17.4
ruamel.yaml.clib==0.2.2
rules==2.0
safetensors==0.3.3
scikit-image==0.18.1
scikit-learn==0.24.2
scipy==1.6.3
sgmllib3k==1.0.0
Shapely==1.7.1
six==1.16.0
sklearn==0.0
smmap==4.0.0
SQLAlchemy==1.4.0
sqlalchemy-json==0.4.0
sqlparse==0.2.4
tenacity==8.2.3
termcolor==2.3.0
tfsClient
threadpoolctl==2.1.0
tifffile==2021.4.8
tiktoken==0.4.0
tokenizers==0.13.3
torch==1.12.1
torchaudio==0.12.1
torchvision==0.13.1
tqdm==4.66.1
transaction==3.1.0
transformers==4.32.0
typing-extensions==4.2.0
typing-inspect==0.9.0
tzdata==2023.3
ujson==4.0.2
uritemplate==3.0.1
urllib3==1.25.11
uWSGI==2.0.19.1
Vizer==0.1.5
wrapt==1.15.0
ws4py==0.5.1
yarl==1.6.3
zExceptions==5.0
zope.browser==3.0
zope.component==6.0
zope.configuration==5.0
zope.contenttype==5.0
zope.deferredimport==5.0
zope.deprecation==5.0
zope.event==4.5.0
zope.exceptions==5.0.1
zope.hookable==5.4
zope.i18n==5.1
zope.i18nmessageid==6.0.1
zope.interface==5.4.0
zope.location==5.0
zope.proxy==5.0.0
zope.publisher==7.0
zope.schema==7.0.1
zope.security==6.1
zope.testing==5.0.1