本文介绍如何使用PyODPS的Sequence及执行操作。
操作步骤
已创建DataWorks工作空间。本文以参加数据开发(Data Studio)公测的新版工作空间为例。
在DataWorks中新建表
pyodps_iris。登录DataWorks控制台,在左上角选择地域。
在工作空间列表页面,单击目标工作空间对应的操作列。
在调试配置页面,选择计算资源和资源组。
如果显示没有资源组,点击新建资源组后需要等待几分钟创建完成。并在资源组列表中,为该资源组绑定使用工作空间。
在MaxCompute SQL节点文件中,按照如下语句新建表
pyodps_iris。CREATE TABLE if not exists pyodps_iris ( sepallength DOUBLE comment '片长度(cm)', sepalwidth DOUBLE comment '片宽度(cm)', petallength DOUBLE comment '瓣长度(cm)', petalwidth DOUBLE comment '瓣宽度(cm)', name STRING comment '种类' );
下载测试数据集并导入MaxCompute。
下载并解压鸢尾花数据集,将
iris.data重命名为iris.csv。登录DataWorks控制台,在左上角选择地域。
在左侧导航栏选择。
单击进入数据上传与下载。
在左侧导航栏单击上传图标
,单击数据上传。
在Data Studio页面中新建MaxCompute PyODPS 2节点。输入如下示例代码,单击运行。
from odps import DataFrame iris = DataFrame(o.get_table('iristable_new')) #获取列。 print iris.sepallength.head(5) print iris['sepallength'].head(5) #查看列的类型。 print iris.sepallength.dtype #修改列的类型。 iris.sepallength.astype('int') #计算。 print iris.groupby('name').sepallength.max().head(5) print iris.sepallength.max() #重命名列。 print iris.sepalwidth.rename('speal_width').head(5) #简单的列变化。 print (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)新建并运行PyODPS节点PyExecute。代码如下:
from odps import options from odps import DataFrame #查看运行时的instance的logview。 options.verbose = True iris = DataFrame(o.get_table('pyodps_iris')) iris[iris.sepallength < 5].exclude('sepallength')[:5].execute() my_logs = [] def my_loggers(x): my_logs.append(x) options.verbose_log = my_loggers iris[iris.sepallength < 5].exclude('sepallength')[:5].execute() print(my_logs) #缓存中间Collection结果。 cached = iris[iris.sepalwidth < 3.5].cache() print cached.head(3) #异步和并行执行。 from odps.df import Delay delay = Delay() #创建Delay对象。 df = iris[iris.sepalwidth < 5].cache() #有一个共同的依赖。 future1 = df.sepalwidth.sum().execute(delay=delay) #立即返回future对象,此时并没有执行。 future2 = df.sepalwidth.mean().execute(delay=delay) future3 = df.sepalwidth.max().execute(delay=delay) delay.execute(n_parallel=3) print future1.result() print future2.result() print future3.result()
该文章对您有帮助吗?