PyODPS的Sequence及执行操作

本文介绍如何使用PyODPSSequence及执行操作。

操作步骤

  1. 创建MaxCompute项目

  2. 创建DataWorks工作空间。本文以参加数据开发(Data Studio)公测的新版工作空间为例。

  3. DataWorks中新建表pyodps_iris

    1. 登录DataWorks控制台,在左上角选择地域。

    2. 工作空间列表页面,单击目标工作空间对应的操作快速进入 > Data Studio

    3. 调试配置页面,选择计算资源资源组

      如果显示没有资源组,点击新建资源组后需要等待几分钟创建完成。并在资源组列表中,为该资源组绑定使用工作空间。

    4. MaxCompute SQL节点文件中,按照如下语句新建表pyodps_iris

      CREATE TABLE if not exists pyodps_iris
      (
      sepallength  DOUBLE comment '片长度(cm)',
      sepalwidth   DOUBLE comment '片宽度(cm)',
      petallength  DOUBLE comment '瓣长度(cm)',
      petalwidth   DOUBLE comment '瓣宽度(cm)',
      name         STRING comment '种类'
      );
  4. 下载测试数据集并导入MaxCompute。

    1. 下载并解压鸢尾花数据集,将iris.data重命名为iris.csv

    2. 登录DataWorks控制台,在左上角选择地域。

    3. 在左侧导航栏选择数据集成 > 数据上传与下载

    4. 单击进入数据上传与下载

    5. 在左侧导航栏单击上传图标image,单击数据上传

  5. Data Studio页面中新建MaxCompute PyODPS 2节点。输入如下示例代码,单击运行。

    from odps import DataFrame
    iris = DataFrame(o.get_table('iristable_new'))
    
    #获取列。
    print iris.sepallength.head(5)
    
    print iris['sepallength'].head(5)
    
    #查看列的类型。
    print iris.sepallength.dtype
    
    #修改列的类型。
    iris.sepallength.astype('int')
    
    #计算。
    print iris.groupby('name').sepallength.max().head(5)
    
    print iris.sepallength.max()
    
    #重命名列。
    print iris.sepalwidth.rename('speal_width').head(5)
    
    #简单的列变化。
    print (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)
  6. 新建并运行PyODPS节点PyExecute。代码如下:

    from odps import options
    from odps import DataFrame
    
    #查看运行时的instancelogview。
    options.verbose = True
    iris = DataFrame(o.get_table('pyodps_iris'))
    iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
    
    my_logs = []
    def my_loggers(x):
      my_logs.append(x)
    
    options.verbose_log = my_loggers
    
    iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
    
    print(my_logs)
    
    #缓存中间Collection结果。
    cached = iris[iris.sepalwidth < 3.5].cache()
    print cached.head(3)
    
    #异步和并行执行。
    from odps.df import Delay
    delay = Delay() #创建Delay对象。
    df = iris[iris.sepalwidth < 5].cache()  #有一个共同的依赖。
    future1 = df.sepalwidth.sum().execute(delay=delay) #立即返回future对象,此时并没有执行。
    future2 = df.sepalwidth.mean().execute(delay=delay)
    future3 = df.sepalwidth.max().execute(delay=delay)
    
    delay.execute(n_parallel=3)
    
    print future1.result()
    print future2.result()
    print future3.result()