Use PyODPS in DataWorks

PyODPS is the Python SDK for MaxCompute. You can develop and run PyODPS tasks in DataWorks. This topic describes the limitations, outlines the main workflow, and provides simple examples to get you started.

Limitations

  • Usage limits

    If you receive a Got killed error, the process was terminated for exceeding the memory limit. Avoid downloading data within a PyODPS node for processing in DataWorks. Instead, submit data processing tasks to MaxCompute for distributed execution. For a comparison of the two methods, see Precautions: Do not download full data locally to run PyODPS.

  • Package support limits

    • PyODPS nodes in DataWorks do not include some packages, such as matplotlib. This may limit the following features:

      • The plot function of DataFrame.

      • DataFrame user-defined functions (UDFs). UDFs must be submitted to MaxCompute for execution, where the Python sandbox supports only pure Python libraries and NumPy as third-party libraries. You therefore cannot use pandas directly in UDFs.

      • Code that runs in DataWorks outside of UDFs. Such code can use the pre-installed NumPy and pandas packages. Other third-party packages that contain binary code are not supported.

    • PyODPS nodes in DataWorks do not support the Python atexit module. Use a try-finally block to implement similar functionality.

  • Limit on the number of data records to read

    By default, options.tunnel.use_instance_tunnel is set to False for PyODPS nodes in DataWorks, limiting reads to a maximum of 10,000 records. To read more records, you must globally enable instance tunnel by setting options.tunnel.use_instance_tunnel to True.
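
The try-finally substitute for atexit mentioned in the list above can be sketched as follows. This is a minimal illustration in plain Python. The names run_with_cleanup, main_task, and cleanup are hypothetical and are not part of PyODPS or DataWorks.

```python
def run_with_cleanup(main_task, cleanup):
    """Run main_task, then always run cleanup, even if main_task raises."""
    try:
        return main_task()
    finally:
        # Runs on normal completion and on exceptions, which covers
        # the cleanup work an atexit handler would otherwise do.
        cleanup()


events = []  # records the order of calls for illustration


def main_task():
    events.append("work")
    return "done"


def cleanup():
    events.append("cleanup")


result = run_with_cleanup(main_task, cleanup)
print(result, events)
```

In a PyODPS node, the body of main_task would hold the node logic and cleanup would release whatever the node acquired, such as temporary tables.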

Procedure

  1. Create a PyODPS node.

    Go to the Data Development page in DataWorks to create a PyODPS node. PyODPS nodes come in two types:

    • PyODPS 2, which is based on Python 2.

    • PyODPS 3, which is based on Python 3.

    Create a PyODPS node based on the Python version you use. For detailed steps, see Develop a PyODPS 2 task and Develop a PyODPS 3 task.

  2. Develop code for the PyODPS task.

    After creating the node, use the examples in the sections below to learn about the main capabilities of PyODPS.

    For more information about how to use PyODPS, see Overview of basic operations and DataFrame (not recommended). You can also follow the example in Use a PyODPS node for Jieba Chinese word segmentation to perform a simple end-to-end operation.

  3. Configure the scheduling properties, then save, commit, and deploy the node to enable it to run periodically.

MaxCompute entry point

In a PyODPS node, DataWorks provides a global variable, odps or o, as the MaxCompute entry point. You do not need to define it manually. For example:

# Check whether the pyodps_iris table exists.
print(o.exist_table('pyodps_iris'))

The code returns True, indicating that the pyodps_iris table exists.

Note

The credentials for the entry point object o can access only MaxCompute, not other cloud services. DataWorks provides only these credentials during the execution of a PyODPS node. You cannot obtain additional credentials by calling methods on this object, such as o.from_global.

Execute SQL

General capabilities

  • You can run SQL commands in a PyODPS node by using methods such as execute_sql() or run_sql(). These methods primarily support Data Definition Language (DDL) and Data Manipulation Language (DML) statements.

    Note

    The execute_sql() and run_sql() methods do not support all SQL statements. For statements that are neither DDL nor DML, you must use other methods. For example, use the run_security_query method to run GRANT or REVOKE statements, and use the run_xflow or execute_xflow method to run PAI commands.

  • To read SQL execution results in a PyODPS node, use open_reader().

For more information about SQL-related operations in a PyODPS node, see SQL.
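
As a rough sketch of how execute_sql() and open_reader() fit together, the helper below runs a statement and collects a few records. The helper name fetch_rows and its max_rows parameter are hypothetical. The entry argument stands for the o object that DataWorks provides in a PyODPS node.

```python
def fetch_rows(entry, sql, max_rows=10):
    """Run sql on a MaxCompute entry object and return up to max_rows records."""
    # execute_sql() waits for the statement to finish and returns an instance.
    instance = entry.execute_sql(sql)
    rows = []
    # open_reader() exposes the result set as an iterable of records.
    with instance.open_reader() as reader:
        for record in reader:
            if len(rows) >= max_rows:
                break
            rows.append(record)
    return rows
```

In a PyODPS node, a call such as fetch_rows(o, 'select * from pyodps_iris', max_rows=5) would return the first five records of the example table used in this topic.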

Precautions: data format and record limits

For historical compatibility reasons, Instance Tunnel is not enabled by default in DataWorks. As a result, instance.open_reader uses the Result interface by default. This interface returns at most 10,000 records and has limited support for complex data types. If your project does not have data protection enabled and you need to iterate over all data or read fields of complex data types such as Array, enable Instance Tunnel and disable the limit.

  • Disabling the limit at runtime

    You can use the following statements to enable Instance Tunnel globally and remove the record limit.

    from odps import options
    options.tunnel.use_instance_tunnel = True
    options.tunnel.limit_instance_tunnel = False  # Disable the limit to read all data.
    instance = o.execute_sql('select * from pyodps_iris')
    with instance.open_reader() as reader:
        # All data is read through Instance Tunnel.
        print(reader.count)  # reader.count returns the record count.
        for record in reader:
            print(record)
  • Disabling the limit for the current reader only

    You can also add tunnel=True to open_reader to enable the instance tunnel for the current open_reader call only. Additionally, you can add limit=False to disable the limit restriction for the current call only.

    Important

    If you do not enable Instance Tunnel, you may encounter data format errors. For solutions, see Python SDK FAQ.

    with instance.open_reader(tunnel=True, limit=False) as reader:
        # This open_reader call uses the Instance Tunnel interface and can read all data.
        for record in reader:
            print(record)

For more information about Instance Tunnel and data reading limits, see Read SQL execution results.

DataFrame

  • Execution

    In DataWorks, DataFrame operations require an explicit call to an immediate execution method, such as execute or persist. For example:

    # Call an immediate execution method to process each record and print all data from the pyodps_iris table where iris.sepalwidth is less than 3.
    from odps.df import DataFrame
    iris = DataFrame(o.get_table('pyodps_iris'))
    for record in iris[iris.sepalwidth < 3].execute():  
        print(record)
  • Verbose output

    By default, the options.verbose option is enabled in DataWorks, printing detailed information such as the Logview URL during node execution. You can configure this option to control whether these details are displayed.

For more examples of DataFrame operations, see DataFrame (not recommended).

Scheduling parameters

In a PyODPS node, you can use scheduling parameters to obtain values such as the business date for a task run. While the definition and use of scheduling parameters are consistent with SQL nodes, they are referenced differently in code.

  • SQL nodes use a string such as ${param_name} directly in the code.

  • PyODPS nodes do not perform string replacement for ${param_name}. Instead, DataWorks adds a global dictionary named args before the node runs. You can obtain the value of a scheduling parameter from this dictionary, for example, args['param_name'].

For example, if you set the scheduling parameter ds=${yyyymmdd} in the Basic properties > Parameters pane, you can retrieve this parameter in your code as follows:

  • Obtain the value of the ds parameter.

    print('ds=' + args['ds'])
    # Prints the value of ds, for example, ds=20161116
  • Obtain the partition named ds=${yyyymmdd} of a table.

    o.get_table('table_name').get_partition('ds=' + args['ds'])
    # Returns the partition object for the ds partition of the table_name table.

For more information about scheduling parameters, see Configure and use scheduling parameters.
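
The lookup in the args dictionary can be wrapped in a small helper that fails with a clear message when a parameter is not configured. This is only a sketch: the helper partition_spec is hypothetical, and sample_args stands in for the args dictionary that DataWorks injects at run time.

```python
def partition_spec(args, key="ds"):
    """Build a partition spec such as 'ds=20161116' from scheduling parameters."""
    if key not in args:
        # Fail early with a clear message if the parameter was not configured.
        raise KeyError("scheduling parameter %r is not set" % key)
    return "%s=%s" % (key, args[key])


# Stand-in for the args dictionary that DataWorks provides in a PyODPS node.
sample_args = {"ds": "20161116"}
spec = partition_spec(sample_args)
print(spec)  # ds=20161116
```

In a PyODPS node, the result could be passed on directly, for example o.get_table('table_name').get_partition(partition_spec(args)).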

Runtime parameter hints

To set runtime parameters for a task, use the hints parameter, which is a dict.

o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})

You can also configure sql.settings globally. Once configured, these settings are automatically added to each execution.

from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris')  # Hints are added based on the global configuration.

Third-party packages

PyODPS nodes in DataWorks come with the following third-party packages pre-installed. The table lists the version available for each Python version.

Package name    Python 2 version    Python 3 version
requests        2.11.1              2.26.0
numpy           1.16.6              1.18.1
pandas          0.24.2              1.0.5
scipy           0.19.0              1.3.0
scikit_learn    0.18.1              0.22.1
pyarrow         0.16.0              2.0.0
lz4             2.1.4               3.1.10
zstandard       0.14.1              0.17.0

If you need a package that is not pre-installed, you can use the load_resource_package method to load a third-party package from a MaxCompute resource. First, create a package archive with pyodps-pack, then use load_resource_package to load it before importing its contents. For more information about how to use pyodps-pack, see Create a third-party package for PyODPS and Use a third-party package in PyODPS.

Note

When you create a package for a Python 2 node, add the --dwpy27 parameter to the pyodps-pack command.

Example:

  1. Use the following command to package ipaddress.

    pyodps-pack -o ipaddress-bundle.tar.gz ipaddress
  2. After uploading and committing ipaddress-bundle.tar.gz as a resource, you can use the ipaddress package in a PyODPS 3 node as follows:

    load_resource_package("ipaddress-bundle.tar.gz")
    import ipaddress

DataWorks limits the total size of downloaded packages to 100 MB. To exclude pre-installed packages from your bundle, use the --exclude parameter of pyodps-pack. For example, the following command excludes the pre-installed numpy and pandas packages.

pyodps-pack -o bundle.tar.gz --exclude numpy --exclude pandas <YOUR_PACKAGE>
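
If you generate pyodps-pack commands from a script, the options described above can be assembled as in the following sketch. The helper build_pack_command is hypothetical. It only builds the argument list from the -o, --exclude, and --dwpy27 options named in this topic and does not run pyodps-pack.

```python
import shlex


def build_pack_command(packages, output, exclude=(), python2=False):
    """Return the pyodps-pack argument list for the given packages."""
    cmd = ["pyodps-pack", "-o", output]
    if python2:
        # Needed when the bundle targets a Python 2 (PyODPS 2) node.
        cmd.append("--dwpy27")
    for name in exclude:
        # Leave out packages that are already pre-installed on the node.
        cmd += ["--exclude", name]
    cmd += list(packages)
    return cmd


cmd = build_pack_command(["ipaddress"], "bundle.tar.gz", exclude=["numpy", "pandas"])
print(" ".join(shlex.quote(part) for part in cmd))
```

The printed command can be run in a shell on the machine where pyodps-pack is installed, or the list can be passed to subprocess.run.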

Other accounts

To access MaxCompute with a different account, use the as_account method on the MaxCompute entry point object. This creates a new entry point object for the specified account, which is independent of the default o instance.

Important

The as_account method requires PyODPS 0.11.3 or later. You cannot use this method if your DataWorks environment runs an earlier version.

Procedure

  1. Grant the other user the required project permissions. For more information, see Appendix: Grant user permissions.

  2. In the PyODPS node, use the as_account method to switch accounts and create a new entry point object.

    import os
    # Make sure the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set to your AccessKey ID,
    # and the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set to your AccessKey Secret.
    # We recommend that you do not use the AccessKey ID and AccessKey Secret strings directly.
    new_odps = o.as_account(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
    )
  3. Check whether the account was switched.

    Query the current user information by adding the following statement to your code. If the output shows the UID of the other user, you are successfully accessing MaxCompute with that account.

    print(new_odps.get_project().current_user)
    Note

    new_odps represents the entry point object for the new account.
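
Step 2 above can be wrapped in a helper that stops early when the environment variables are missing. This is a sketch under assumptions: the helper name switch_account and its parameters are hypothetical, and entry stands for the o object of the node.

```python
import os


def switch_account(entry, id_var="ALIBABA_CLOUD_ACCESS_KEY_ID",
                   secret_var="ALIBABA_CLOUD_ACCESS_KEY_SECRET"):
    """Return a new entry object for the account whose AccessKey pair is in the environment."""
    access_id = os.getenv(id_var)
    secret = os.getenv(secret_var)
    if not access_id or not secret:
        # Stop early instead of passing missing credentials to as_account.
        raise RuntimeError("set %s and %s before switching accounts" % (id_var, secret_var))
    return entry.as_account(access_id, secret)
```

In a PyODPS node, new_odps = switch_account(o) would then replace the direct as_account call, and the check in step 3 applies unchanged.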

Example

  1. Create a table and import data.

    1. Download the Iris dataset iris.data and rename it to iris.csv.

    2. Create a table named pyodps_iris and upload the iris.csv dataset. For instructions, see Create a table and upload data.

      The following is the table creation statement.

      CREATE TABLE IF NOT EXISTS pyodps_iris
      (
          sepallength  DOUBLE COMMENT 'sepal length (cm)',
          sepalwidth   DOUBLE COMMENT 'sepal width (cm)',
          petallength  DOUBLE COMMENT 'petal length (cm)',
          petalwidth   DOUBLE COMMENT 'petal width (cm)',
          name         STRING COMMENT 'species'
      );
  2. Create an ODPS SQL node to grant the necessary permissions. For more information, see Appendix: Grant user permissions.

  3. Create a PyODPS node to switch accounts. The following code provides an example. For more information, see Develop a PyODPS 3 task.

    import os
    from odps.df import DataFrame
    # The AccessKey pair of the other account is read from the ALIBABA_CLOUD_ACCESS_KEY_ID
    # and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.
    # The two assignments below use placeholders for demonstration only.
    # We recommend that you do not use the AccessKey ID and AccessKey Secret strings directly.
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'] = '<AccessKey ID>'
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'] = '<AccessKey Secret>'
    od = o.as_account(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
    )
    iris = DataFrame(od.get_table('pyodps_iris'))
    # Query the rows whose sepallength is greater than 5 and print them.
    with od.execute_sql('select * from pyodps_iris WHERE sepallength > 5 ').open_reader() as reader4:
        print(reader4.raw)
        for record in reader4:
            print(record["sepallength"], record["sepalwidth"], record["petallength"], record["petalwidth"], record["name"])
    # Print the UID of the current user.
    print(od.get_project().current_user)
  4. Run the code and view the result.

    Executing user script with PyODPS 0.11.4.post0
    "sepallength","sepalwidth","petallength","petalwidth","name"
    5.4,3.9,1.7,0.4,"Iris-setosa"
    5.4,3.7,1.5,0.2,"Iris-setosa"
    5.8,4.0,1.2,0.2,"Iris-setosa"
    5.7,4.4,1.5,0.4,"Iris-setosa"
    5.4,3.9,1.3,0.4,"Iris-setosa"
    5.1,3.5,1.4,0.3,"Iris-setosa"
    5.7,3.8,1.7,0.3,"Iris-setosa"
    5.1,3.8,1.5,0.3,"Iris-setosa"
    5.4,3.4,1.7,0.2,"Iris-setosa"
    5.1,3.7,1.5,0.4,"Iris-setosa"
    5.1,3.3,1.7,0.5,"Iris-setosa"
    5.2,3.5,1.5,0.2,"Iris-setosa"
    5.2,3.4,1.4,0.2,"Iris-setosa"
    5.4,3.4,1.5,0.4,"Iris-setosa"
    5.2,4.1,1.5,0.1,"Iris-setosa"
    5.5,4.2,1.4,0.2,"Iris-setosa"
    5.5,3.5,1.3,0.2,"Iris-setosa"
    5.1,3.4,1.5,0.2,"Iris-setosa"
    5.1,3.8,1.9,0.4,"Iris-setosa"
    5.1,3.8,1.6,0.2,"Iris-setosa"
    5.3,3.7,1.5,0.2,"Iris-setosa"
    7.0,3.2,4.7,1.4,"Iris-versicolor"
    6.4,3.2,4.5,1.5,"Iris-versicolor"
    6.9,3.1,4.9,1.5,"Iris-versicolor"
    5.5,2.3,4.0,1.3,"Iris-versicolor"
    6.5,2.8,4.6,1.5,"Iris-versicolor"
    5.7,2.8,4.5,1.3,"Iris-versicolor"
    6.3,3.3,4.7,1.6,"Iris-versicolor"
    6.6,2.9,4.6,1.3,"Iris-versicolor"
    5.2,2.7,3.9,1.4,"Iris-versicolor"
    5.9,3.0,4.2,1.5,"Iris-versicolor"
    6.0,2.2,4.0,1.0,"Iris-versicolor"
    6.1,2.9,4.7,1.4,"Iris-versicolor"
    5.6,2.9,3.6,1.3,"Iris-versicolor"
    6.7,3.1,4.4,1.4,"Iris-versicolor"
    5.6,3.0,4.5,1.5,"Iris-versicolor"
    5.8,2.7,4.1,1.0,"Iris-versicolor"
    6.2,2.2,4.5,1.5,"Iris-versicolor"
    5.6,2.5,3.9,1.1,"Iris-versicolor"
    5.9,3.2,4.8,1.8,"Iris-versicolor"
    6.1,2.8,4.0,1.3,"Iris-versicolor"
    6.3,2.5,4.9,1.5,"Iris-versicolor"
    6.1,2.8,4.7,1.2,"Iris-versicolor"
    6.4,2.9,4.3,1.3,"Iris-versicolor"
    6.6,3.0,4.4,1.4,"Iris-versicolor"
    6.8,2.8,4.8,1.4,"Iris-versicolor"
    6.7,3.0,5.0,1.7,"Iris-versicolor"
    6.0,2.9,4.5,1.5,"Iris-versicolor"
    5.7,2.6,3.5,1.0,"Iris-versicolor"
    5.5,2.4,3.8,1.1,"Iris-versicolor"
    5.5,2.4,3.7,1.0,"Iris-versicolor"
    5.8,2.7,3.9,1.2,"Iris-versicolor"
    6.0,2.7,5.1,1.6,"Iris-versicolor"
    5.4,3.0,4.5,1.5,"Iris-versicolor"
    6.0,3.4,4.5,1.6,"Iris-versicolor"
    6.7,3.1,4.7,1.5,"Iris-versicolor"
    6.3,2.3,4.4,1.3,"Iris-versicolor"
    5.6,3.0,4.1,1.3,"Iris-versicolor"
    5.5,2.5,4.0,1.3,"Iris-versicolor"
    5.5,2.6,4.4,1.2,"Iris-versicolor"
    6.1,3.0,4.6,1.4,"Iris-versicolor"
    5.8,2.6,4.0,1.2,"Iris-versicolor"
    5.6,2.7,4.2,1.3,"Iris-versicolor"
    5.7,3.0,4.2,1.2,"Iris-versicolor"
    5.7,2.9,4.2,1.3,"Iris-versicolor"
    6.2,2.9,4.3,1.3,"Iris-versicolor"
    5.1,2.5,3.0,1.1,"Iris-versicolor"
    5.7,2.8,4.1,1.3,"Iris-versicolor"
    6.3,3.3,6.0,2.5,"Iris-virginica"
    5.8,2.7,5.1,1.9,"Iris-virginica"
    7.1,3.0,5.9,2.1,"Iris-virginica"
    6.3,2.9,5.6,1.8,"Iris-virginica"
    6.5,3.0,5.8,2.2,"Iris-virginica"
    7.6,3.0,6.6,2.1,"Iris-virginica"
    7.3,2.9,6.3,1.8,"Iris-virginica"
    6.7,2.5,5.8,1.8,"Iris-virginica"
    7.2,3.6,6.1,2.5,"Iris-virginica"
    6.5,3.2,5.1,2.0,"Iris-virginica"
    6.4,2.7,5.3,1.9,"Iris-virginica"
    6.8,3.0,5.5,2.1,"Iris-virginica"
    5.7,2.5,5.0,2.0,"Iris-virginica"
    5.8,2.8,5.1,2.4,"Iris-virginica"
    6.4,3.2,5.3,2.3,"Iris-virginica"
    6.5,3.0,5.5,1.8,"Iris-virginica"
    7.7,3.8,6.7,2.2,"Iris-virginica"
    7.7,2.6,6.9,2.3,"Iris-virginica"
    6.0,2.2,5.0,1.5,"Iris-virginica"
    6.9,3.2,5.7,2.3,"Iris-virginica"
    5.6,2.8,4.9,2.0,"Iris-virginica"
    7.7,2.8,6.7,2.0,"Iris-virginica"
    6.3,2.7,4.9,1.8,"Iris-virginica"
    6.7,3.3,5.7,2.1,"Iris-virginica"
    7.2,3.2,6.0,1.8,"Iris-virginica"
    6.2,2.8,4.8,1.8,"Iris-virginica"
    6.1,3.0,4.9,1.8,"Iris-virginica"
    6.4,2.8,5.6,2.1,"Iris-virginica"
    7.2,3.0,5.8,1.6,"Iris-virginica"
    7.4,2.8,6.1,1.9,"Iris-virginica"
    7.9,3.8,6.4,2.0,"Iris-virginica"
    6.4,2.8,5.6,2.2,"Iris-virginica"
    6.3,2.8,5.1,1.5,"Iris-virginica"
    6.1,2.6,5.6,1.4,"Iris-virginica"
    7.7,3.0,6.1,2.3,"Iris-virginica"
    6.3,3.4,5.6,2.4,"Iris-virginica"
    6.4,3.1,5.5,1.8,"Iris-virginica"
    6.0,3.0,4.8,1.8,"Iris-virginica"
    6.9,3.1,5.4,2.1,"Iris-virginica"
    6.7,3.1,5.6,2.4,"Iris-virginica"
    6.9,3.1,5.1,2.3,"Iris-virginica"
    5.8,2.7,5.1,1.9,"Iris-virginica"
    6.8,3.2,5.9,2.3,"Iris-virginica"
    6.7,3.3,5.7,2.5,"Iris-virginica"
    6.7,3.0,5.2,2.3,"Iris-virginica"
    6.3,2.5,5.0,1.9,"Iris-virginica"
    6.5,3.0,5.2,2.0,"Iris-virginica"
    6.2,3.4,5.4,2.3,"Iris-virginica"
    5.9,3.0,5.1,1.8,"Iris-virginica"
    <User 139xxxxxxxxxxxxx>

Troubleshooting

If your code becomes unresponsive during execution and produces no output, you can add the following comment at the beginning of your code. DataWorks then prints the stack trace of all threads every 30 seconds.

# -*- dump_traceback: true -*-
Note

This method applies to PyODPS 3 nodes that run PyODPS 0.11.4.1 or later.

PyODPS version

You can check the current PyODPS version by running Python code or by viewing the runtime log of any PyODPS task.

  • Run the following code in a PyODPS node:

    # Print the PyODPS version.
    import odps; print(odps.__version__)
    # Sample result: 0.11.2.3
  • Check the runtime log of a PyODPS task. The version appears in the line that starts with Executing user script with PyODPS, as in this example:

    2023-07-26 15:39:53  INFO SKYNET_DAGTYPE=100:
    2023-07-26 15:39:53  INFO SKYNET_SYSTEM_ENV=dev:
    2023-07-26 15:39:53  INFO SKYNET_GMTDATE=20230726:
    2023-07-26 15:39:53  INFO SKYNET_BIZDATE=20230726:
    2023-07-26 15:39:53  INFO SKYNET_ENVTYPE=1:
    2023-07-26 15:39:53  INFO SKYNET_PROJECTID=342277:
    2023-07-26 15:39:53  INFO SKYNET_BUSINESS_ID=10756717:
    2023-07-26 15:39:53  INFO SKYNET_ARGS_ENABLE=true:
    2023-07-26 15:39:53  INFO SKYNET_ACCOUNT_ID=xxx
    2023-07-26 15:39:53  INFO TASK_SOURCE=dataworks_datastudio:
    2023-07-26 15:39:53  INFO SKYNET_REGION=xxx:
    2023-07-26 15:39:53  INFO LINK_FILE_ID=503440413:
    2023-07-26 15:39:53  INFO TASK_PLUGIN_NAME=pyodps:
    2023-07-26 15:39:53  INFO ALISA_UNIQUEKEY=xxx
    2023-07-26 15:39:53  INFO ALISA_TASK_ID=T3_xxx
    2023-07-26 15:39:53  INFO ALISA_TASK_EXEC_TARGET=group_599314507759873_dev:
    2023-07-26 15:39:53  INFO ALISA_TASK_PRIORITY=0:
    2023-07-26 15:39:53  INFO --- Invoking Shell command line now ---
    2023-07-26 15:39:53  INFO ============================================================
    Executing user script with PyODPS 0.11.4.post0
    "sepallength","sepalwidth","petallength","petalwidth","name"
    5.4,3.9,1.7,0.4,"Iris-setosa"
    5.4,3.7,1.5,0.2,"Iris-setosa"
    5.8,4.0,1.2,0.2,"Iris-setosa"
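
Once you know the version string, you can compare it with a minimum version, such as the 0.11.3 that this topic states for as_account. The following is a simplified sketch: version_tuple and supports_as_account are hypothetical helpers, and the parsing keeps only the leading numeric components rather than implementing full Python version ordering.

```python
import re


def version_tuple(version):
    """Return the leading numeric components of a version string as a tuple of ints."""
    parts = []
    for piece in version.split("."):
        if not re.match(r"[0-9]+$", piece):
            break  # stop at suffixes such as "post0"
        parts.append(int(piece))
    return tuple(parts)


def supports_as_account(version):
    """Check the version against the 0.11.3 minimum that this topic states for as_account."""
    return version_tuple(version) >= (0, 11, 3)


# The two sample version strings that appear in this topic.
for sample in ("0.11.2.3", "0.11.4.post0"):
    print(sample, supports_as_account(sample))
```

In a PyODPS node, you would pass odps.__version__ to supports_as_account instead of a literal string.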

Appendix: Grant user permissions

To allow another Alibaba Cloud user (not a RAM user) to access projects and tables in your DataWorks workspace, you must create an ODPS SQL node and run the following authorization commands. For more information about how to create an ODPS SQL node, see Create an ODPS SQL node. For more information about permissions, see Users and permissions.

-- Add another Alibaba Cloud account user at the project level.
add user ALIYUN$<account_name>;
-- Grant the CreateInstance permission on the project.
grant CreateInstance on project <project_name> to USER ALIYUN$<account_name>;
-- Grant the Describe and Select permissions on the table.
grant Describe, Select on table <table_name> to USER ALIYUN$<account_name>;
-- View the authorization results.
show grants for ALIYUN$<account_name>;

The following is a sample authorization result from running show grants for ALIYUN$<account_name>;:

Authorization Type: ACL
[user/ALIYUN$:xxx.com]
A    projects/mc_xxx : CreateInstance
A    projects/mc_xxx/tables/pyodps_xxx : Describe | Select

Appendix: Sample data

To follow the examples in this topic, first create the pyodps_iris table and populate it with data by following the instructions in Step 1: Create a table and import data. This table is used to demonstrate basic operations.

Related topics

Use the Operating history feature to view the history of all tasks run in DataStudio in the last three days. From this view, you can stop running tasks and save SQL statements from task records as temporary files. For more information, see Operating history.