Notebook 进阶开发

更新时间:
复制为 MD 格式

本文介绍如何通过代码复用、数据挂载、参数管理等工程化手段提升开发效率,以及连接 MaxCompute Spark、EMR Serverless Spark 及 AnalyticDB for Spark 等多种计算引擎的实践技巧与调试方法。

说明

推荐您优先阅读Notebook 基础开发

理解开发与生产环境的差异

DataWorks Notebook 的核心定位是可被调度执行的开发和分析工具。这意味着它存在两种运行环境:

  • 开发环境:在Data StudioNotebook 节点编辑页面,直接点击运行单元格,代码在个人开发环境实例中执行。此环境为快速验证、调试代码逻辑而设计。

  • 生产环境:Notebook 节点被提交并发布后,通过周期调度或补数据等触发执行。代码在一个独立的、临时的任务实例中运行。此环境为稳定、可靠地执行生产任务而设计。

这两种环境在功能支持上存在显著差异,提前理解这些差异是高效开发的关键。

开发与生产环境能力差异速查表

功能点

开发环境(运行单元格)

生产环境(周期调度/补数据等)

引用项目资源 (.py)

  • 首次引用:自动下载并生效。

  • 更新后:您需要点击工具栏的重启按钮,以重新加载已更新的 .py 模块。

    重要

    需在Data Studio设置中,修改资源冲突处理策略Dataworks › Notebook › Resource Reference: Download StrategyautoOverwrite。

自动生效。

读写数据集 (OSS/NAS)

需在个人开发环境中挂载数据集。

需在调度配置中挂载数据集。

引用工作空间参数 (${...})

生效,在代码执行前自动进行文本替换。

生效,在任务执行前自动进行文本替换。

Spark 会话管理

Spark会话的默认闲置释放时间为2小时,2小时内无新的代码执行将自动释放。

任务实例级短会话,随任务实例执行而自动创建与销毁。

在生产环境复用代码与数据

引用项目资源(.py 文件)

将通用的函数或类封装在独立的 .py 文件中,通过##@resource_reference{"自定义名.py"}引用MaxCompute资源的方式,实现代码的模块化与复用,提升代码的可维护性。

  1. 创建并发布 Python 资源

    1. 在 DataWorks Data Studio左侧导航栏单击image,进入资源管理

    2. 在资源管理目录树下,右键单击目标目录或单击右上角的 + ,选择新建资源 > MaxCompute Python,命名为 my_utils.py

    3. 文件内容处,点击在线编辑,将您调试好的工具函数代码粘贴到代码编辑框并保存

      # my_utils.py
      def greet(name):
          return f"Hello, {name} from resource file!"
    4. 单击工具栏中的保存,然后发布该资源,使其对开发和生产任务可见

  2. 在 Notebook 中引用资源

    在 Notebook 的Python 单元格第一行,使用 ##@resource_reference 语法引用已发布的资源。

    ##@resource_reference{"my_utils.py"}
    # 如果资源在目录下,比如 my_folder/my_utils.py,也是通过##@resource_reference{"my_utils.py"},无需带目录名称。
    from my_utils import greet
    
    message = greet('DataWorks')
    print(message)
  3. 开发环境调试运行

    运行Python单元格,将打印如下结果:

    Hello, DataWorks from resource file!
    重要

    在开发环境调试运行时,如果识别到代码中包含##@resource_reference,系统将会把资源管理中的目标文件自动下载至个人目录的workspace/_dataworks/resource_references路径中,从而进行目标文件的引用。

    若报错ModuleNotFoundError,点击编辑器上方工具栏的重启按钮重新加载资源后再重试。

  4. 发布至生产环境并验证

    保存发布此 Notebook 节点后,前往运维中心 > 周期任务单击测试运行。任务成功后,您将在日志中看到输出 Hello, DataWorks from resource file!

    重要

    若报错There is no file with id ...,请先将Python资源发布至生产环境。

更多操作,请参见MaxCompute资源与函数

读写数据集(OSS/NAS)

在 Notebook 任务运行时,方便地读写存储在 OSS 或 NAS 上的大规模文件数据。

开发环境调试

  1. 挂载数据集:进入个人开发环境详情页,在存储配置 > 数据集中配置数据集。

    image

  2. 在代码中访问:数据集将被挂载到个人开发环境的挂载路径下,在代码中直接读写该路径。

    # 假设已在个人开发环境设置了挂载路径为/mnt/data/dataset的数据集
    import pandas as pd
    
    # 直接使用挂载路径
    file_path = '/mnt/data/dataset/testfile.csv'
    df = pd.read_csv(file_path)
    
    # 使用 PyODPS 将数据写入 MaxCompute
    o = %odps
    o.write_table('mc_test_table', df, overwrite=True)
    print(f"成功将数据写入 MaxCompute 表 mc_test_table。")

生产环境部署

  1. 挂载数据集:在 Notebook 节点编辑页面的右侧导航栏,进入调度配置 > 调度策略,添加上述数据集。

    image

  2. 在代码中访问:提交发布之后,数据集将被挂载到生产环境的挂载路径下,在代码中直接读写该路径。

    # 假设已在个人开发环境设置了挂载路径为/mnt/data/dataset的数据集
    import pandas as pd
    
    # 直接使用挂载路径
    file_path = '/mnt/data/dataset/testfile.csv'
    df = pd.read_csv(file_path)
    
    # 使用 PyODPS 将数据写入 MaxCompute
    o = %odps
    o.write_table('mc_test_table', df, overwrite=True)
    print(f"成功将数据写入 MaxCompute 表 mc_test_table。")
更多操作,请参见在个人开发环境中使用数据集

引用工作空间参数

重要

仅支持DataWorks专业版及以上。

DataWorks 在已有调度参数的基础上引入工作空间参数,主要是为了解决跨任务、跨节点的全局配置复用与环境隔离问题。可在SQL单元格和Python单元格内以${workspace.param}格式来引用工作空间参数,其中param是您创建的工作空间参数名称。

1、创建工作空间参数:使用前,需前往 DataWorks 运维中心 > 调度设置 > 工作空间参数 页面创建所需参数。

2、引用工作空间参数

  • SQL单元格引用工作空间参数。

    SELECT '${workspace.param}';

    查询工作空间参数,运行成功后,将打印输出工作空间参数的具体赋值。

  • Python单元格引用工作空间参数。

    print('${workspace.param}')

    输出工作空间参数,运行成功后,将打印输出工作空间参数的具体赋值。

更多详情,请参见使用工作空间参数

Magic Command与计算引擎交互

Magic Command 是以 %%% 开头的特殊命令,用于简化Python单元格与各类计算资源的交互。

连接 MaxCompute

说明

在建立MaxCompute计算资源连接前,请确保已绑定MaxCompute计算资源

  • %odps:获取 PyODPS 入口对象

    此命令返回一个与当前MaxCompute项目绑定、已认证的 PyODPS 对象,避免在代码中硬编码 AccessKey,是与 MaxCompute 交互的推荐方式。

    1. 使用Magic Command创建MaxCompute连接。 输入%odps,右下角会出现MaxCompute的计算资源选择器入口(并自动选择计算资源)。点击右下角的MaxCompute项目名称,可更换MaxCompute项目。

      o=%odps 
    2. 使用获取到的MaxCompute计算资源运行PyODPS脚本。

      例如,获取当前项目下的所有表:

      with o.execute_sql('show tables').open_reader() as reader:
          print(reader.raw)
  • %maxframe:建立 MaxFrame 连接

    此命令用于创建 MaxFrame 会话,为 MaxCompute 提供类似 Pandas 的分布式数据处理能力。

    # 连接并访问MaxCompute MaxFrame Session
    mf_session = %maxframe
    
    df = mf_session.read_odps_table('your_mc_table')
    print(df.head())
    
    # 开发调试结束后,应手动销毁会话以释放资源
    mf_session.destroy()

连接 Spark 计算资源

DataWorks Notebook 支持连接多种 Spark 引擎。不同引擎在连接方式、执行上下文和资源管理上存在差异。

重要

同一个Notebook节点,仅支持使用Magic Command连接一种计算资源。

引擎对比

特性

MaxCompute Spark

EMR Serverless Spark

AnalyticDB for Spark

连接命令

%maxcompute_spark

%emr_serverless_spark

%adb_spark add

说明

执行后,整个 Notebook 内核的执行上下文会切换至远端的 PySpark 环境。后续单元格可直接编写 PySpark 代码。

前提条件

绑定 MaxCompute 资源

绑定 EMR 计算资源 并创建 Livy Gateway。

绑定 ADB Spark 计算资源。

开发环境模式

自动创建/复用 Livy 会话。

连接已有的 Livy Gateway,创建会话。

自动创建/复用 Spark Connect Server。

生产环境模式

Livy模式:通过 Livy 服务提交 Spark 作业。

spark-submit 批处理模式:纯批处理,不保留会话状态。

Spark Connect Server 模式:通过 Spark 连接服务进行交互。

生产资源释放

任务实例结束后自动释放会话。

任务实例结束后自动清理资源。

任务实例结束后自动释放资源。

适用场景

与 MaxCompute 生态紧密集成的通用批处理、ETL 任务。

需要灵活配置、与开源大数据生态(如 Hudi, Iceberg)交互的复杂分析任务。

针对 ADB for MySQL C-Store 表的高性能交互式查询与分析。

MaxCompute Spark

说明

在建立MaxCompute计算资源连接前,请确保已绑定MaxCompute计算资源

通过 Livy 连接到 MaxCompute 项目内置的 Spark 引擎。

  1. 建立连接:在 Python 单元格中运行以下命令。系统将自动创建或复用 Spark 会话。

    # 创建Spark Session,并停止Livy。
    %maxcompute_spark
  2. 执行 PySpark 代码:连接成功后,在新的 Python 单元格中,使用 %%spark Cell Magic 执行 PySpark 代码。

    # 在使用MaxCompute Spark时,Python单元格中必须以%%spark作为首行代码
    %%spark
    
    df = spark.sql("SELECT * FROM your_mc_table LIMIT 10")
    df.show()
  3. 手动释放连接:在开发调试结束后,可手动停止或删除会话。在生产环境运行时,系统将自动停止并删除当前任务实例的Livy,无需手动处理。

    # 清理Spark Session,并停止Livy。
    %maxcompute_spark stop
    
    # 清理Spark Session,并停止Livy,最后删除Livy
    %maxcompute_spark delete

EMR Serverless Spark

说明

在建立计算资源连接前,请先在工作空间绑定EMR Serverless Spark计算资源,并创建Livy Gateway

通过连接您预先创建的 Livy Gateway 与 EMR Serverless Spark 交互。

  1. 建立连接:在执行命令前,需在单元格右下角选择要连接的 EMR 计算资源Livy Gateway

    # 基础连接
    %emr_serverless_spark
    
    # 或在连接时传入自定义 Spark 参数,注意需要自定义Spark参数时,需要写两个百分号
    %%emr_serverless_spark
    {
      "spark_conf": {
        "spark.emr.serverless.environmentId": "<EMR Serverless Spark 运行环境ID>",
        "spark.emr.serverless.network.service.name": "<EMR Serverless Spark 网络连接ID>",
        "spark.driver.cores": "1",
        "spark.driver.memory": "8g",
        "spark.executor.cores": "1",
        "spark.executor.memory": "2g",
        "spark.driver.maxResultSize": "32g"
      }
    }
    说明

    自定义参数与全局配置的关系

    • 默认行为:您在此处定义的自定义参数仅对本次连接(Session)生效,是一次性的。如果您未提供自定义参数,系统将自动使用在管理中心中配置的全局参数。

    • 推荐用法:对于需要在多个任务或被多人复用的配置,建议在管理中心 > Serverless Spark > SPARK参数中进行全局配置,以保证一致性并方便统一管理。

    • 优先级规则:当同一个参数在自定义参数和全局配置中都被设置时,哪个最终生效,取决于管理中心中的局配置是否优先选项:

      • 勾选:全局配置将覆盖本次的自定义参数。

      • 不勾选:本次的自定义参数将覆盖全局配置。

  2. (可选)重新连接:Livy gateway页面的token被管理员误删后可通过该命令重新创建。

    # 重新连接,刷新当前个人开发环境的livy token
    %emr_serverless_spark refresh_token
  3. 执行 PySpark 或 SQL 代码:连接成功后,内核已切换。您可以在 Python 单元格 中直接编写 PySpark 代码,或在 EMR Spark SQL 单元格 中编写 SQL。

    1. 通过EMR Spark SQL向计算资源提交并执行SQL代码

      经过%emr_serverless_spark成功建立连接后,可在EMR Spark SQL Cell中,直接写SQL语句,无需在单元格中选择计算资源。

      EMR Spark SQL Cell将复用%emr_serverless_spark的连接,提交至目标计算资源中运行。

      image

    2. 通过Python向计算资源提交并执行PySpark代码

      通过 %emr_serverless_spark 成功建立连接后,可以在新的Python单元格中提交和执行PySpark代码,无需在单元格中添加%%spark前缀。

      image

  4. 手动释放连接

    重要

    在多人共享一个 Livy Gateway 的情况下,`stop` 或 `delete` 命令会影响所有正在使用该网关的用户,请谨慎操作。

    # 清理Spark Session,并停止Livy。
    %emr_serverless_spark stop
    
    # 清理Spark Session,并停止Livy,最后删除Livy
    %emr_serverless_spark delete

AnalyticDB for Spark

说明

在建立计算资源连接前,请先在工作空间绑定AnalyticDB for Spark计算资源

通过创建 Spark Connect Server 连接到 AnalyticDB for Spark 引擎。

  1. 建立连接:为确保网络连通,必须在连接参数中正确配置交换机 ID 和安全组 ID。 在执行命令前,需在单元格右下角选择 ADB Spark 计算资源

    # 必须配置交换机ID和安全组ID以建立网络连接
    %adb_spark add \
     --spark-conf spark.adb.version=3.5 \
     --spark-conf spark.adb.eni.enabled=true \
     --spark-conf spark.adb.eni.vswitchId=<ADB的交换机ID> \
     --spark-conf spark.adb.eni.securityGroupId=<个人开发环境的安全组ID>

    如何查找交换机ID和安全组ID?

    • 交换机ID (vswitchId):前往阿里云AnalyticDB MySQL控制台,在实例详情页查看网络信息交换机ID

    • 安全组ID (securityGroupId):进入个人开发环境详情页的网络设置中所选安全组,查看以sg-开头的安全组ID。

      重要

      为确保网络连通,创建个人开发环境时,建议选择与您的AnalyticDB for Spark实例相同的VPC和交换机。

  2. 执行 PySpark 代码:连接成功后,在新的 Python 单元格中,执行 PySpark 代码。

    # 只能对 C-Store 表执行操作
    df = spark.sql("SELECT * FROM my_adb_cstore_table LIMIT 10")
    df.show()
    说明:AnalyticDB for Spark 引擎当前只能处理具有 'storagePolicy'='COLD' 属性的 C-Store 表。
  3. 手动释放连接:在开发环境调试结束后,手动清理连接会话以节约资源。生产环境运行时,系统将自动清理资源。

    %adb_spark cleanup

连接 Lindorm Ray 计算资源

Lindorm计算引擎的RAY资源组提供分布式计算服务,支持AI负载端到端处理。通过 Magic Command,在 Notebook 中无缝连接 Lindorm Ray 资源,进行交互式开发、调试,并将其发布为生产调度任务。

在开始之前,请确保您已完成以下准备工作(点击展开)。

  • 购买Lindorm实例时,请在开通计算引擎中勾选

  • 已将您的 Lindorm 集群添加为 DataWorks 计算资源,详情请参见绑定Lindorm计算资源

  • 已在 Lindorm 控制台为您的集群开通RAY资源组。创建时,请务必在高级配置中指定正确的镜像,以确保环境一致性。

    如何配置 Ray 资源组镜像?

    在 Lindorm 控制台创建 Ray 资源组时,找到高级配置,填入以下 JSON 内容。请注意,您需要将镜像地址中的 region 替换为您 Lindorm 集群所在的地域,例如将 beijing 替换为 shanghai

    {
      "IMAGE": "spark-repo-beijing-registry-vpc.cn-beijing.cr.aliyuncs.com/lindorm-compute/ray:2.39.0-0.7.0-py311-cpu"
    }
  • 确保您的个人开发环境、Serverless资源组网络配置与 Lindorm 集群处于同一个 VPC 网络中,以保障网络连通性。

  1. 建立连接:在 Python 单元格中运行 %lindorm_ray 命令。运行后,单元格右下角将出现计算资源选择器,请在此处选择您的Lindorm 计算资源和创建好的Ray 资源组

    # 连接到指定的Lindorm Ray资源组
    %lindorm_ray
    重要
    • 连接Lindorm Ray计算资源后,同一个 Notebook将不再支持运行SQL单元格。Lindorm Ray 引擎专注于执行 Python 和 Ray 代码。

    • 当您多次运行同一个代码单元格时,系统会自动终止上一个正在运行的 Ray Job,并启动一个新的 Job。这可以有效避免资源浪费和任务冲突。

  2. 执行 Ray 代码:连接成功后,在新的 Python 单元格中直接编写和执行 Ray 代码。日志会实时回传到单元格的输出区域,方便您进行交互式调试。

    以下示例定义了一个简单的远程任务(使用 @ray.remote 装饰器),它会在 Ray 集群上执行,并将日志和最终结果返回到单元格的输出区。

    import ray
    import time
    
    @ray.remote
    def hello_world():
      print("Hello from Lindorm Ray!")
      time.sleep(5)
      return "Task finished."
    # 提交远程任务
    result_ref = hello_world.remote()
    print(ray.get(result_ref))
  3. (可选)自定义启动参数:如需为 Ray 环境指定额外的配置,例如安装第三方 Python 包或上传本地代码文件,可以使用 %%lindorm_ray 命令建立连接。

    • 示例1:安装依赖包

      通过 pip 参数在 Ray 环境中安装 jieba 包。

      %%lindorm_ray
      {
        "runtime_env": {
          "pip": ["jieba"]
        }
      }

      环境准备就绪后,在后续的 Ray 任务中导入并使用该包。以下示例演示如何在远程函数中调用jieba进行中文分词:

      import ray 
      
      @ray.remote
      def do_work(x):
          import jieba
      
          return "/".join(jieba.cut(x))
      
      print(ray.get(do_work.remote("欢迎使用DataWorks+LindormRay解决方案")))
    • 示例2:上传并使用DataWorks资源

      通过 working_dir 参数,将 DataWorks 资源管理中已上传的资源进一步上传到 Ray 集群,使其可以在任务中被导入和调用。

      重要
      • 使用 working_dir 上传资源时,文件会直接从您的开发环境上传到 Ray 集群,存在 100MB 的大小限制。如果资源包过大,可能导致上传失败或 Ray 节点不稳定。

      • 对于体积较大的资源文件或依赖(超过100MB),建议您先将其上传到 OSS,然后在代码中从 OSS 下载和读取,或者直接将其打包到自定义镜像中。这能提供更好的稳定性和性能。

      # 引用Data Studio资源管理中上传的资源,声明路径
      %%lindorm_ray
      {
          "runtime_env": {
              "working_dir": "/mnt/workspace/_dataworks/resource_references"
          }
      }

      假设在DataWorksData Stuido资源管理中上传了一个ray_resource.py文件,编写并执行以下单元格时,系统会自动解析后续代码中的##@resource_reference 声明,并将下载相应资源到 /mnt/workspace/_dataworks/resource_references路径。

      ray_resource.py的代码内容示例如下:

      def fun():
          print("This is a test function in ray_resource.py")
      重要

      在开发环境中,当执行完包含##@resource_reference的单元格后,您需要重新运行上面的%%lindorm_ray单元格,会将下载好的资源包含在working_dir中上传到Ray集群。在生产环境中,您无需重新运行。

      import ray 
      
      ##@resource_reference{"ray_resource.py"}
      
      @ray.remote
      def do_work(x):
          print('Ray says:', x)
      
          from ray_resource import fun
          fun()
          return x
      
      worker = do_work.remote("欢迎使用DataWorks+LindormRay解决方案")
      print(ray.get(worker))
  4. 生产调度与运维:开发调试完成后,您可以将此 Notebook 节点提交发布,它将作为一个 Lindorm Ray 节点在 DAG 中进行周期性调度。

    • 参数化:您的代码可以正常使用 DataWorks 的标准调度参数,例如 ${bizdate}

    • 日志查看:在生产环境中,为了避免日志过多影响性能,系统默认只加载前 1MB 的日志。如果日志被截断,输出结果中会提供一个链接,引导您跳转至 Lindorm 控制台查看完整的任务日志。

    • 资源释放:生产调度任务结束后,Lindorm Ray任务的状态会变为终态,不再占用资源;交互式开发时,你可以通过重启Kernel、关闭NotebookLindorm Ray的任务变为终态。

附录:Magic Command 速查表

Magic Command

Magic Command含义

适用计算引擎

o = %odps

获取 PyODPS 入口对象

MaxCompute

mf_session = %maxframe

建立 MaxFrame 连接

%maxcompute_spark

创建Spark Session

MaxCompute Spark

%maxcompute_spark stop

清理Spark Session,并停止Livy。

%maxcompute_spark delete

清理Spark Session,停止并删除Livy。

%%spark

Python单元格中,连接已创建的Spark计算资源。

%emr_serverless_spark

创建Spark Session

EMR Serverless Spark

%emr_serverless_spark info

查看Livy Gateway的详细信息。

%emr_serverless_spark stop

清理Spark Session,并停止Livy。

%emr_serverless_spark delete

清理Spark Session,停止并删除Livy。

%emr_serverless_spark refresh_token

刷新个人开发环境的Livy Token

%adb_spark add

创建并连接到一个可复用的ADB Spark会话(Session)。

AnalyticDB for Spark

%adb_spark info

查看Spark Session信息.

%adb_spark cleanup

停止并清理当前的Spark连接会话。

%lindorm_ray

建立 Lindorm Ray 连接。

Lindorm Ray

%%lindorm_ray

建立 Lindorm Ray 连接,并配置自定义运行时环境(如安装依赖、上传代码)。

常见问题

  • Q:引用工作空间资源时,报错ModuleNotFoundErrorThere is no file with id ...

    A:请按照以下步骤检查。

    • 请前往数据开发 > 资源管理,确保MaxCompute Python资源已保存;如果是生产环境报该错误,需确认资源已发布至生产环境。

    • 请点击Notebook编辑器上方工具栏的重启按钮,尝试重新加载资源。

  • Q:当我更新工作空间资源后,为什么还是引用旧资源?

    A:当对资源修改重新发布时,需在Data Studio设置中,修改资源冲突处理策略Dataworks › Notebook › Resource Reference: Download StrategyautoOverwrite,并在Notebook顶部工具栏点击重启Kernel

    image

  • Q:引用数据集时,开发环境报错FileNotFoundError

    A:请确保已在当前选择的个人开发环境中挂载数据集。

    image

  • Q:引用数据集时,开发环境成功,但生产环境报错Execute mount dataset exception! Please check your dataset config

    A:请确保已在Notebook节点的调度配置挂载数据集,并给OSS数据集完成授权

    image

  • Q:如何查看个人开发环境的版本?

    A:进入个人开发环境后,通过快捷键CMD+SHIFT+P,并输入ABOUT查看当前版本。若随着产品功能更新与迭代,需要0.5.69及以上版本的个人开发环境实例,可通过界面升级提示弹窗执行一键升级

  • Q:连接 Spark 引擎失败?

    A:请按照以下步骤检查。

    • 通用检查:前往工作空间详情的计算资源列表,确认对应的计算资源(MaxCompute/EMR/ADB)是否已正确绑定到当前工作空间,且您的账号具备相应权限。

    • EMR Serverless Spark:检查 Livy Gateway 是否已创建且状态正常。

    • AnalyticDB for Spark:重点排查网络问题。确认 vswitchIdsecurityGroupId 配置正确,确保个人开发环境与 ADB Spark 实例之间网络互通。检查安全组规则是否允许必要的端口通信。