本文为您介绍如何在DataWorks中进行PyODPS参数的传递。

前提条件

请提前开通MaxCompute和DataWorks服务,并在DataWorks完成业务流程的创建。本例使用DataWorks简单模式。

操作步骤

  1. 准备数据
    1. 登录Dataworks控制台,以DDL模式创建分区表user_detail。
      创建分区表
      代码如下。
      create table if not exists user_detail
      (
      userid    BIGINT comment '用户id',
      job       STRING comment '工作类型',
      education STRING comment '教育程度'
      ) comment '用户信息表'
      partitioned by (dt STRING comment '日期',region STRING comment '地区');
    2. DDL模式创建源数据表user_detail_ods。
      源数据表
      代码如下。
      create table if not exists user_detail_ods
      (
        userid    BIGINT comment '用户id',
        job       STRING comment '工作类型',
        education STRING comment '教育程度',
        dt STRING comment '日期',
        region STRING comment '地区'
      );
    3. 将以下测试数据保存为user_detail.txt文件。
      0001,互联网,本科,20190715,beijing
      0002,教育,大专,20190716,beijing
      0003,金融,硕士,20190715,shandong
      0004,互联网,硕士,20190715,beijing
    4. user_detail.txt中的数据导入到表user_detail_ods。
      数据导入
    5. 业务流程 > 数据开发中,新建ODPS SQL节点,并将节点命名为test。
      新建SQL节点
      test.sql代码如下。
      insert overwrite table user_detail partition (dt,region)
      select userid,job,education,dt,region from user_detail_ods;
    6. 单击运行按钮,将动态分区插入到分区表user_detail。
      新动态分区插入
  2. 传递参数
    1. 业务流程 > 数据开发中,新建PyODPS节点,并将节点命名为传递参数示例。
      传递参数
      代码如下。
      import sys
      reload(sys)
      print('dt=' + args['dt'])
      #修改系统默认编码。
      sys.setdefaultencoding('utf8')
      #获取表。
      t = o.get_table('user_detail')
      #接受传入的分区参数。
      with t.open_reader(partition='dt=' + args['dt'] + ',region=beijing') as reader1:
          count = reader1.count
      print("查询分区表数据:")
      for record in reader1:
          print record[0],record[1],record[2]
    2. 调度配置窗口中,将参数设置为undefineddt=20190715
      调度配置
    3. 单击高级运行(带参数运行)按钮。
      高级运行
    4. 运行日志中查看运行结果。
      运行日志