本文为您介绍MaxFrame的常见报错问题,帮助您迅速识别并解决报错问题。
问题1:报错invalid type INT for function UDF definition, you need to set odps.sql.type.system.odps2=true; in order to use it
报错原因:在未开启MaxCompute 2.0数据类型版本的情况下,使用MaxCompute 2.0的数据类型,导致作业执行时出现错误。
解决方案:通过Flag开启MaxCompute 2.0数据类型,示例如下:
from maxframe import config # 在new_session之前添加 config.options.sql.settings = { "odps.sql.type.system.odps2": "true" }
问题2:报错UDF : No module named 'cloudpickle'
报错原因:缺少依赖的cloudpickle包。
解决方案:引用MaxCompute基础镜像,示例如下:
from maxframe import config # 在new_session之前添加 config.options.sql.settings = { "odps.session.image": "common", }
问题3:如何在DataFrame提交(apply)的UDF中实现资源复用
在部分UDF场景中,可能涉及到某些较多的资源创建或者销毁行为(例如初始化数据库连接、加载模型等),希望在每个UDF被加载时只会执行一次。
可以利用Python中函数参数默认值只被初始化一次的特性,实现资源复用。
例如,下述UDF中,模型只会被加载一次。
def predict(s, _ctx={}):
from ultralytics import YOLO
# _ctx 的初始值是一个空 dict,在 Python 执行过程中只会被初始化一次。
# 使用模型时,可以先判断 _ctx 中是否存在该模型,不存在则执行加载,然后存入 dict 中。
if not _ctx.get("model", None):
model = YOLO(os.path.join("./", "yolo11n.pt"))
_ctx["model"] = model
model = _ctx["model"]
# 后续调用模型的相关接口
下面给出了一个需要销毁资源的UDF示例,该示例中使用了一个自定义的类MyConnector负责创建和关闭数据库连接。
class MyConnector:
def __init__(self):
# 在 __init__ 中创建数据库连接
self.conn = create_connection()
def __del__(self):
# 在 __del__ 中关闭数据库连接
try:
self.conn.close()
except:
pass
def process(s, connector=MyConnector()):
# 直接调用 connector 内的数据库连接,无需在 UDF 内部再次执行连接创建和关闭
connector.conn.execute("xxxxx")
说明
初始化的实际执行次数取决于运行UDF的Worker数量,每个Worker执行UDF时都是一个单独的Python环境。例如,某个UDF调用需要处理10万行数据,总共被分配给了10个UDF Worker,每个Worker处理一万条数据,则总共会执行10次初始化,每个Worker在处理1万条数据的过程中,初始化过程只会执行一次。
文档内容是否对您有帮助?