Notebook开发Spark作业

更新时间:2025-04-18 09:44:30

Notebook开发是一个交互式数据分析开发平台,提供了作业编辑、数据分析、数据可视化等功能。如果您需要使用Notebook开发Spark SQL作业,可以参考本文档,使用DMSNotebook功能完成作业开发。

前提条件

使用限制

目前仅部分地域支持Notebook功能,包括:华东1(杭州)、华北2(北京)、华东2(上海)、华南1(深圳)地域。

操作步骤

  1. 创建项目并进入项目空间。

    1. 登录数据管理DMS 5.0
    2. 单击控制台左上角的2023-01-28_15-57-17.png图标,选择全部功能 > Data+AI > Notebook

      说明

      若您使用的是非极简模式的控制台,请在顶部菜单栏中,选择Data+AI > Notebook

    3. 单击新建项目空间,在对话框中填入项目空间名地域等信息,单击确认

    4. 单击操作列的进入空间,进入目标项目空间。

  2. (可选)添加项目成员。如果项目空间有多个用户参与,则需要完成此步骤,给用户授予不同的项目空间角色。

  3. 配置代码存储空间。

    1. 在项目设置image页签内,单击存储配置

    2. 配置代码存储OSS路径。

  4. 添加资源。

    1. 在项目设置image页签内,单击资源配置

    2. 单击添加资源,配置资源的相关信息。

      参数

      是否必填

      说明

      参数

      是否必填

      说明

      资源名称

      资源名称。您可以自定义。

      资源简介

      填写该资源的使用用途。您可以自定义。

      镜像

      仅支持选择Spark3.5+Python3.9镜像。

      ADB实例

      AnalyticDB for MySQL集群ID。

      说明

      如果搜索不到目标集群,请检查集群是否录入至DMS

      ADB资源组

      选择目标Job型资源组。

      Executor规格

      选择Spark Executor的资源规格。本文以默认值medium规格为例。

      不同型号的取值对应不同的规格,详情请参见Spark应用配置参数说明的型号列。

      Executor上限

      Executor下限

      Spark Executor数量。

      选择Spark3.5+Python3.9镜像后,Executor下限默认为2,Executor上限默认为8。

      Notebook规格

      选择Notebook规格,本文以General_Tiny_v1(14 GB)为例。

      VPC ID

      选择AnalyticDB for MySQL所属的VPC,以保证NotebookAnalyticDB for MySQL可以正常通信。

      重要

      如果后续AnalyticDB for MySQL切换了新的专有网络和交换机,则需要将资源中的VPC IDVSwitch ID修改为新的专有网络和交换机,否则作业会提交失败。

      Zone ID

      选择AnalyticDB for MySQL所在的可用区。

      VSwitch ID

      选择AnalyticDB for MySQL所属的交换机。

      安全组ID

      选择可用的安全组,以保证NotebookAnalyticDB for MySQL可以正常通信。

      资源释放

      资源的闲置状态到达指定时长后,将自动释放。

      依赖的Jars

      Jar包的OSS存储路径。仅通过Python提交作业且使用了Jar包时,填写该参数。

      SparkConf

      与开源Spark中的配置项基本一致,参数格式为key: value形式。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明

    3. 单击保存

    4. 单击目标资源右侧操作列的启动,启动资源。

  5. 初始化数据。

    1. 单击控制台左上角的2023-01-28_15-57-17.png图标,选择全部功能 > 数据资产 > 实例管理

    2. 单击+新增,在弹出的新增实例对话框中配置如下参数:

      参数

      说明

      参数

      说明

      数据来源

      阿里云页签下,选择OSS

      基本信息

      文件及日志

      固定选择OSS

      实例地区

      选择AnalyticDB for MySQL集群所在地域。

      录入方式

      固定选择连接串地址

      连接串地址

      固定填写oss-cn-hangzhou.aliyuncs.com

      Bucket

      选择Bucket名称。

      访问方式

      访问方式,本文以安全托管-手动为例。

      AccessKey ID

      阿里云账号或具备OSS访问权限的RAM用户的AccessKey ID。

      如何获取AccessKey IDAccessKey Secret,请参见账号与权限

      AccessKey Sercert

      阿里云账号或具备OSS访问权限的RAM用户的AccessKey Secret。

      如何获取AccessKey IDAccessKey Secret,请参见账号与权限

      高级信息

      选填参数。参数说明,请参见高级信息

    3. 填写完成以上信息后,单击左下角的测试连接

      说明

      如果测试连接失败,请按照报错提示检查您录入的实例信息。

    4. 出现连接成功提示后,单击提交

    5. 进入项目空间,单击image页签下。

    6. 湖仓数据页签下,单击添加OSS,选择步骤b新增的Bucket。

  6. 创建Notebook。

    在文件image页签下,单击image,选择Notebook

    image

  7. Notebook页面开发Spark SQL作业。

    说明

    Notebook页面中各个按钮的详细介绍,请参见Notebook界面介绍

    1. 执行以下命令,下载Python依赖:

      pip install delta
    2. Cell类型切换为SQL,执行以下语句新建数据库。

      说明

      步骤b创建的db_delta库和步骤c创建的sample_data外表会在AnalyticDB for MySQL中自动显示,后续您可在AnalyticDB for MySQL控制台上分析sample_data表。

      image

      CREATE DATABASE db_delta 
      LOCATION 'oss://testBucketName/db_delta/';    -- 指定db_delta库中数据的存储路径
    3. Cell类型切换为Code,执行以下代码,创建sample_data外表并插入数据。sample_data外表数据会存储在步骤b指定的OSS路径中。

      # -*- coding: utf-8 -*-
      
      import pyspark
      from delta import *
      from pyspark.sql.types import *
      from pyspark.sql.functions import *
      
      
      print("Starting Delta table creation")
      
      data = [
          ("Robert", "Baratheon", "Baratheon", "Storms End", 48),
          ("Eddard", "Stark", "Stark", "Winterfell", 46),
          ("Jamie", "Lannister", "Lannister", "Casterly Rock", 29),
          ("Robert", "Baratheon", "Baratheon", "Storms End", 48),
          ("Eddard", "Stark", "Stark", "Winterfell", 46),
          ("Jamie", "Lannister", "Lannister", "Casterly Rock", 29),
          ("Robert", "Baratheon", "Baratheon", "Storms End", 48),
          ("Eddard", "Stark", "Stark", "Winterfell", 46),
          ("Jamie", "Lannister", "Lannister", "Casterly Rock", 29)
              ]
      
      schema = StructType([
          StructField("firstname", StringType(), True),
          StructField("lastname", StringType(), True),
          StructField("house", StringType(), True),
          StructField("location", StringType(), True),
          StructField("age", IntegerType(), True)
      ])
      
      sample_dataframe = spark.createDataFrame(data=data, schema=schema)
      
      sample_dataframe.write.format('delta').mode("overwrite").option('mergeSchema','true').saveAsTable("db_delta.sample_data")
    4. Cell类型切换为SQL,执行以下语句,查询sample_data表中的数据。

      SELECT * FROM db_delta.sample_data;
  8. 如果您想在AnalyticDB for MySQL控制台中使用Spark SQL分析sample_data表,可执行以下步骤。

    1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表。在集群列表上方,选择产品系列,然后单击目标集群ID。

    2. 在左侧导航栏单击作业开发 > SQL开发页面,选择Spark引擎和Interactive型资源组。

    3. 查询sample_data数据。

      SELECT * FROM db_delta.sample_data LIMIT 1000;

相关文档

Notebook:了解更多关于Notebook的信息。

  • 本页导读 (1)
  • 前提条件
  • 使用限制
  • 操作步骤
  • 相关文档
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等