PyODPS是MaxCompute的Python版本的SDK,提供简单方便的Python编程接口。PyODPS支持类似Pandas的快速、灵活和富有表现力的数据结构。您可以通过PyODPS提供的DataFrame API使用Pandas的数据结果处理功能。本文用于帮助您快速开始使用PyODPS,并且能够用于实际项目。

前提条件

请您确保已经完成如下操作:

背景信息

MaxCompute大部分开发都可以通过SQL语句实现,但对于复杂的业务场景以及自定义函数(UDF)都需要使用Python,例如:
  • 接口对接

    MaxCompute表的每条数据,都需要通过身份证号码和外部HTTP的验证接口进行连通。

  • 异步调用

    对于上千个处理逻辑相似的任务,如果对于不同任务都单独创建一个节点,不利于管理,并且在同一时间会占用过多资源。利用PyODPS可以异步生成SQL执行任务队列,通过队列实现任务数的并发,方便统一管理所有节点。

  • UDF

    当MaxCompute内置函数无法满足需求时,需要快速开发UDF自定义函数。例如,要针对DECIMAL类型进行千分位数字格式化展示。

  • 提升SQL性能

    要计算库存事务是否遵循先进先出的规则。由于只有一张表,每条数据都需要跟历史数据进行比对,而通常库存事务表体量巨大,时间复杂度是O(N²),往往无法运行处理历史至今的结果。用Python结合Cache循环每条记录,只需遍历一次即可,时间复杂度是O(N),可极大提升效率。

  • 其他SQL很难解决的场景

    金额分摊模型,10元由3个人均分,把尾差摊到其中一人身上。

更多PyODPS应用场景请参见:

操作步骤

  1. 新建PyODPS节点。
    为方便您快速开始,本文中使用DataWorks PyODPS节点进行开发,详情请参见PyODPS
    说明
    • PyODPS节点底层的Python版本为2.7 。
    • PyODPS节点获取本地处理的数据量不能超过50MB,节点运行时占用的内存不能超过1GB,否则节点任务会被系统中止。因此请避免在PyODPS任务中写入数据量较大的Python处理代码。
    • 在DataWorks上编写代码并进行调试效率较低,为提升运行效率,建议本地安装IDEA进行代码开发。
    1. 新建业务流程。
      进入数据开发页面,右键单击业务流程,选择新建业务流程
    2. 新建PyODPS节点。
      右键单击新建的业务流程,选择新建 > MaxCompute > PyODPS,输入节点名称,单击提交
  2. 编辑PyODPS节点。
    1. 编写程序代码。
      在PyODPS节点的编辑框中输入测试代码。以下是一个完整的使用PyODPS提供的接口执行表操作示例,更多关于表操作以及SQL操作的方法请参见SQL
      from odps import ODPS
      import sys
      
      reload (sys)
      #修改系统默认编码。数据中存在中文字符时需要执行此操作。
      sys.setdefaultencoding('utf8')
      
      #以直接指定字段名以及字段类型的方式创建非分区表my_new_table。
      table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True)
      
      #向非分区表my_new_table中插入数据。
      records = [[111, 'aaa'],
                [222, 'bbb'],
                [333, 'ccc'],
                [444, '中文']]
      o.write_table(table, records)
      
      #读取非分区表my_new_table中的数据。
      for record in o.read_table(table):
          print record[0],record[1]
      
      #以运行SQL的方式读取表中的数据。
      result = o.execute_sql('select * from my_new_table;',hints={'odps.sql.allow.fullscan': 'true'})
      
      #读取SQL执行结果。
      with result.open_reader() as reader:    
          for record in reader:            
              print record[0],record[1]
      
      #删除表以清除资源。
      table.drop()
    2. 运行代码。
      完成编辑后,单击运行按钮。运行结束后,您可以在下方的运行日志中看到运行结果。输出如下代表执行成功。11