PyODPS 2节点

更新时间:2025-02-28 01:47:15

DataWorks提供了PyODPS 2节点类型,允许您使用PyODPS语法在DataWorks平台上开发PyODPS任务。PyODPS集成了MaxComputePython SDK,使您能够在PyODPS 2节点上直接编写和编辑Python代码来操作MaxCompute。

前提条件

  • (可选,RAM账号需要)进行任务开发的RAM账号已被添加至对应工作空间中,并具有开发空间管理员(权限较大,谨慎添加)角色权限,添加成员的操作详情请参见为工作空间添加空间成员

    说明

    如果您使用的是主账号,则忽略该添加操作。

  • 已开发创建项目目录,详情请参见项目目录

  • 已创建PyODPS 2节点,详情请参见创建周期任务

背景信息

PyODPSMaxComputePython SDK,提供了简洁易用的编程接口,让您能够使用Python编写作业、查询表和视图,以及管理MaxCompute资源,详情请参见PyODPS。在DataWorks中,您可以通过PyODPS节点来调度运行Python任务,并将其与其他作业进行集成操作。

注意事项

  • 如果您的PyODPS任务需要访问特殊的网络环境(如VPC网络或IDC网络中的数据源或服务等),请使用Serverless调度资源组,并参考网络连通解决方案打通Serverless资源组与目标环境的网络连通。

  • PyODPS语法及更多信息请参见PyODPS文档

  • PyODPS节点分为PyODPS 2PyODPS 3两种,二者的区别在于底层Python语言版本不同。PyODPS 2底层Python语言版本为Python 2,PyODPS 3底层Python语言版本为Python 3,请您根据实际使用的Python语言版本创建PyODPS节点。

  • 若通过PyODPS节点执行SQL无法正常产生数据血缘关系,即数据血缘在数据地图无法正常展示,您可在任务代码处通过手动设置DataWorks调度运行的相关参数解决。查看数据血缘,详情请参见查看血缘信息;参数设置,详情请参见设置运行参数hints。任务运行时所需参数可参考如下代码获取。

    import os
    ...
    # get DataWorks sheduler runtime parameters
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # setting hints while submiting a task
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...
  • Pyodps节点的输出日志最大支持4MB。建议您尽量避免在日志中直接输出大量的数据结果。相反,建议您多输出告警日志和正常进度的日志,以提供更有价值的信息。

使用限制

  • 使用独享调度资源组执行PyODPS节点时,建议在节点内获取到独享资源组本地处理的数据不超过50MB,该操作受限于独享调度资源组的规格,处理的本地数据过多并超出操作系统阈值时可能发生OOM(Got Killed)错误。请避免在PyODPS节点中写入过多的数据处理代码。详情请参见高效使用PyODPS最佳实践

  • 使用Serverless资源组执行PYODPS节点时,您可根据节点内需要处理的数据量合理配置PyODPS节点的CU。

  • 如果您发现有Got killed报错,即表明内存使用超限,进程被中止。因此,请尽量避免本地的数据操作。通过PyODPS发起的SQLDataFrame任务(除to_pandas外)不受此限制。

  • 非自定义函数代码可以使用平台预装的NumpyPandas。不支持其他带有二进制代码的第三方包。

  • 由于兼容性原因,在DataWorks中,options.tunnel.use_instance_tunnel默认设置为False。如果需要全局开启instance tunnel,需要手动将该值设置为True。

  • PyODPS 2节点底层的Python版本为2.7。

  • 不支持在PyODPS节点内配置多Python任务并发执行。

操作步骤

  1. PyODPS 2节点编辑页面,执行如下开发操作。

    PyODPS 2代码示例

    创建PyODPS节点后,您可以进行代码编辑及运行,更多关于PyODPS语法说明,请参见基本操作概述。本文为您介绍以下五种代码示例,您可根据实际业务需要,选择示例内容。

    ODPS入口
    执行SQL
    设置运行参数
    读取运行结果
    DataFrame

    DataWorksPyODPS节点中,将会包含一个全局的变量odpso,即ODPS入口,您无需手动定义ODPS入口。

    print(odps.exist_table('PyODPS_iris'))

    您可以在PyODPS节点中执行SQL,详情请参见SQL

    • DataWorks上默认未开启instance tunnel,即instance.open_reader默认使用Result接口(最多一万条记录)。您可以通过reader.count获取记录数。如果您需要迭代获取全部数据,则需要关闭limit限制。您可以通过下列语句在全局范围内打开Instance Tunnel并关闭limit限制。

      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # 关闭limit限制,读取全部数据。
      
      with instance.open_reader() as reader:
        # 通过Instance Tunnel可读取全部数据。
    • 您也可以通过在open_reader上添加tunnel=True,实现仅对本次open_reader开启instance tunnel。同时,您还可以添加 limit=False,实现仅对本次关闭limit限制。

      # 本次open_reader使用Instance Tunnel接口,且能读取全部数据。
      with instance.open_reader(tunnel=True, limit=False) as reader:
    说明

    若您未开启Instance Tunnel,可能导致获取数据格式错误,解决方法请参见Python SDK常见问题

    • 您可以通过设置hints参数,来设置运行时的参数,参数类型是dict。 Hints参数的详情请参见SET操作

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • 对全局配置设置sql.settings后,每次运行时,都需要添加相关的运行时的参数。

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # 根据全局配置添加hints。

    运行SQL的实例能够直接执行open_reader的操作,有以下两种情况:

    • SQL返回了结构化的数据。

      with o.execute_sql('select * from dual').open_reader() as reader:
      	for record in reader:  # 处理每一个record。
    • 可能执行的是descSQL语句,通过reader.raw属性,获取到原始的SQL执行结果。

      with o.execute_sql('desc dual').open_reader() as reader:
      	print(reader.raw)
      说明

      如果使用了自定义调度参数,页面上直接触发运行PyODPS 2节点时,需要写死时间,PyODPS节点无法直接替换。

    您还可以通过DataFrame的方式处理数据。

    • 执行

      DataWorks的环境里,DataFrame的执行需要显式调用立即执行的方法

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # 调用立即执行的方法,处理每条Record。

      如果您需要在Print时调用立即执行,需要开启options.interactive

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # 在开始处打开开关。
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # Print时会立即执行。
    • 打印详细信息

      通过设置options.verbose选项。在DataWorks上,默认已经处于打开状态,运行过程会打印Logview等详细信息。

    PyODPS 2代码开发

    以下以一个简单示例为您介绍PyODPS节点的使用:

    1. 准备数据集,创建pyodps_iris示例表,具体操作请参见Dataframe数据处理

    2. 创建DataFrame,详情请参见MaxCompute表创建DataFrame

    3. PyODPS节点中输入以下代码。

      from odps.df import DataFrame
      
      # 从ODPS表创建DataFrame。
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepallength.head(5))

    执行PyODPS任务

    1. 调试配置计算资源中,选择配置计算资源、计算配额DataWorks资源组

      说明
      • 访问公共网络或VPC网络环境的数据源需要使用与数据源测试连通性成功的调度资源组。详情请参见网络连通方案

      • 您可以根据任务情况选择配置镜像信息。

    2. 在工具栏单击运行PyODPS任务。

  2. 如需定期执行节点任务,请根据业务需求配置调度信息。配置详情请参见调度配置

    DataWorks中的SQL节点不同,为了避免影响代码,PyODPS节点不会在代码中替换类似 ${param_name}的字符串,而是在执行代码前,在全局变量中增加一个名为argsdict,调度参数可以在此获取。例如,在参数中设置ds=${yyyymmdd},则可以通过以下方式在代码中获取该参数。

    print('ds=' + args['ds'])
    ds=20240930
    说明

    如果您需要获取名为ds的分区,则可以使用如下方法。

    o.get_table('table_name').get_partition('ds=' + args['ds'])
  3. 节点任务配置完成后,需对节点进行发布。详情请参见节点/工作流发布

  4. 任务发布后,您可以在运维中心查看周期任务的运行情况。详情请参见运维中心入门

后续步骤

PyODPS常见问题:您可了解PyODPS执行过程中的常见问题,便于出现异常时快速排查解决。

  • 本页导读 (1)
  • 前提条件
  • 背景信息
  • 注意事项
  • 使用限制
  • 操作步骤
  • 后续步骤