常见问题

本文为您介绍MaxFrame的常见报错问题,帮助您迅速识别并解决报错问题。

问题1:报错invalid type INT for function UDF definition, you need to set odps.sql.type.system.odps2=true; in order to use it

  • 报错原因:在未开启MaxCompute 2.0数据类型版本的情况下,使用MaxCompute 2.0的数据类型,导致作业执行时出现错误。

  • 解决方案:通过Flag开启MaxCompute 2.0数据类型,示例如下:

    from maxframe import config
    # 在new_session之前添加
    config.options.sql.settings = {
        "odps.sql.type.system.odps2": "true"
    }

问题2:报错UDF : No module named 'cloudpickle'

  • 报错原因:缺少依赖的cloudpickle包。

  • 解决方案:引用MaxCompute基础镜像,示例如下:

    from maxframe import config
    # 在new_session之前添加
    config.options.sql.settings = {
        "odps.session.image": "common",
    }

问题3:如何在DataFrame提交(apply)的UDF中实现资源复用

在部分UDF场景中,可能涉及到某些较多的资源创建或者销毁行为(例如初始化数据库连接、加载模型等),希望在每个UDF被加载时只会执行一次。

可以利用Python中函数参数默认值只被初始化一次的特性,实现资源复用。

例如,下述UDF中,模型只会被加载一次。

def predict(s, _ctx={}):
    from ultralytics import YOLO
    # _ctx 的初始值是一个空 dict,在 Python 执行过程中只会被初始化一次。
    # 使用模型时,可以先判断 _ctx 中是否存在该模型,不存在则执行加载,然后存入 dict 中。
    if not _ctx.get("model", None):
        model = YOLO(os.path.join("./", "yolo11n.pt"))
        _ctx["model"] = model
    model = _ctx["model"]

    # 后续调用模型的相关接口

下面给出了一个需要销毁资源的UDF示例,该示例中使用了一个自定义的类MyConnector负责创建和关闭数据库连接。

class MyConnector:

    def __init__(self):
        # 在 __init__ 中创建数据库连接
        self.conn = create_connection()

    def __del__(self):
        # 在 __del__ 中关闭数据库连接
        try:
            self.conn.close()
        except:
            pass


def process(s, connector=MyConnector()):
    # 直接调用 connector 内的数据库连接,无需在 UDF 内部再次执行连接创建和关闭
    connector.conn.execute("xxxxx")
说明

初始化的实际执行次数取决于运行UDF的Worker数量,每个Worker执行UDF时都是一个单独的Python环境。例如,某个UDF调用需要处理10万行数据,总共被分配给了10个UDF Worker,每个Worker处理一万条数据,则总共会执行10次初始化,每个Worker在处理1万条数据的过程中,初始化过程只会执行一次。