场景实践

MaxCompute提供用户自定义函数(UDF)及Python(PyODPS和MaxFrame)开发能力,本文为您介绍如何在MaxCompute UDF、PyODPS及MaxFrame作业开发中使用镜像。

在SQL UDF开发中使用镜像

以下以使用Pandas实现一个列求和的UDF为例,为您介绍如何在SQL UDF开发中使用镜像。

  1. 编写Python UDF脚本,并将其保存为sum_pandas.py文件。脚本示例如下:

    from odps.udf import annotate
    import pandas as pd
    
    @annotate("string, string -> string")
    class SumColumns(object):
        def evaluate(self, arg1, arg2):
            # 将输入参数转换为pandas DataFrame
            df = pd.DataFrame({'col1': arg1.split(','), 'col2': arg2.split(',')})
    
            # 使用pandas进行数据处理操作
            # 这里以计算两列的和为例
            df['sum'] = df['col1'].astype(int) + df['col2'].astype(int)
    
            # 将处理结果转换为字符串并返回
            result = ','.join(df['sum'].astype(str).values)
            return result
  2. sum_pandas.py脚本以资源形式上传至MaxCompute项目空间,详情请参见添加资源。命令示例如下:

    ADD PY sum_pandas.py -f;
  3. 将已上传的sum_pandas.py脚本注册为自定义函数SumColumns,具体操作请参见注册函数。命令示例如下:

    CREATE FUNCTION SumColumns AS 'sum_pandas.SumColumns' USING 'sum_pandas.py';
  4. 准备测试表testsum及测试数据。

    CREATE TABLE testsum (col1 string, col2 string);
    INSERT INTO testsum VALUES ('1,2,3','1,2,3'),('1,2,3','3,2,1'),('1,2,3','4,5,6');
  5. 调用UDF函数时通过Flag指定已存在的镜像。

    set odps.sql.python.version=cp37;
    set odps.session.image = <镜像名称>;
    SELECT SumColumns(col1,col2) AS result FROM testsum;

    返回结果:

    +------------+
    | result     |
    +------------+
    | 2,4,6      |
    | 4,4,4      |
    | 5,7,9      |
    +------------+

在PyODPS开发中使用镜像

以下以实现scipy包中的psi函数为例,为您介绍如何在PyODPS中使用镜像。

  1. 准备测试表test_float_col及测试数据。

    CREATE TABLE test_float_col (col1 double);
    INSERT INTO test_float_col VALUES (3.75),(2.51);
  2. 编写PyODPS代码,计算psi(col1)的值,并保存为psi_col.py文件执行。代码示例如下:

    import os
    from odps import ODPS, options
    
    def my_psi(v):
        from scipy.special import psi
    
        return float(psi(v))
    
    # 如果 Project 开启了 Isolation,下面的选项不是必需的
    options.sql.settings = {"odps.isolation.session.enable": True}
    
    o = ODPS(
          # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
          # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
          # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
          project='your-default-project',
          endpoint='your-end-point'
    )
    
    df = o.get_table("test_float_col").to_df()
    # 直接执行并取得结果
    df.col1.map(my_psi).execute(image='scipy')
    # 保存到另一张表
    df.col1.map(my_psi).persist("result_table", image='scipy')

    参数说明:

    • ALIBABA_CLOUD_ACCESS_KEY_ID:需将该环境变量设置为具备目标MaxCompute项目中待操作对象相关MaxCompute权限的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。

    • ALIBABA_CLOUD_ACCESS_KEY_SECRET:需将该环境变量设置为AccessKey ID对应的AccessKey Secret。

    • your-default-project:使用的MaxCompute项目名称。您可以登录MaxCompute控制台,在左侧导航栏选择工作区>项目管理,查看MaxCompute项目名称。

    • your-end-point:目标MaxCompute项目所在地域的Endpoint,可根据网络连接方式自行选择,例如http://service.cn-chengdu.maxcompute.aliyun.com/api。详情请参见Endpoint

  3. 查看结果表result_table。

    SELECT * FROM result_table

    返回结果:

    +------------+
    | col1       |
    +------------+
    | 1.1825373886117962 |
    | 0.7080484451910534 |
    +------------+

在MaxFrame开发中使用镜像

以下以实现scipy包中的psi函数为例,为您介绍如何在MaxFrame作业中使用镜像。

  1. 准备测试表test_float_col及测试数据。

    CREATE TABLE test_float_col (col1 double);
    INSERT INTO test_float_col VALUES (3.75),(2.51);
  2. 编写MaxFrame代码,计算psi(col1)的值,并保存为psi_col.py文件执行。代码示例如下:

    import os
    from odps import ODPS, options
    from maxframe.session import new_session
    import maxframe.dataframe as md
    
    
    from maxframe.config import options
    from maxframe import config
    
    # 引用内置scipy镜像
    config.options.sql.settings = {
        "odps.session.image": "scipy"
    }
    def my_psi(v):
        from scipy.special import psi
        return float(psi(v))
    
    o = ODPS(
          # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
          # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
          # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
          project='your-default-project',
          endpoint='your-end-point'
    )
    
    # 创建MaxFrame Session
    session = new_session(o)
    df = md.read_odps_table('test_float_col')
    
    # 直接执行并取得结果
    print(df.col1.map(my_psi).execute().fetch())

    参数说明:

    • ALIBABA_CLOUD_ACCESS_KEY_ID:需将该环境变量设置为具备目标MaxCompute项目中待操作对象相关MaxCompute权限的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。

    • ALIBABA_CLOUD_ACCESS_KEY_SECRET:需将该环境变量设置为AccessKey ID对应的AccessKey Secret。

    • your-default-project:使用的MaxCompute项目名称。您可以登录MaxCompute控制台,在左侧导航栏选择工作区>项目管理,查看MaxCompute项目名称。

    • your-end-point:目标MaxCompute项目所在地域的Endpoint,可根据网络连接方式自行选择,例如http://service.cn-chengdu.maxcompute.aliyun.com/api。详情请参见Endpoint

    返回结果:

    0    1.182537
    1    0.708048
    Name: col1, dtype: float64