在EMR Serverless Spark中实现MongoDB读写操作

更新时间:2025-03-31 10:03:38

基于MongoDB官方提供的Spark Connector,EMR Serverless Spark可以在开发时添加对应的配置来连接MongoDB。本文为您介绍在EMR Serverless Spark环境中实现MongoDB的数据读取和写入操作。

前提条件

使用限制

Serverless Spark以下引擎版本支持本文操作:

  • esr-4.x:esr-4.1.0及之后版本。

  • esr-3.x:esr-3.1.0及之后版本。

  • esr-2.x:esr-2.5.0及之后版本。

操作流程

步骤一:获取MongoDB Spark Connector JAR并上传至OSS

  1. 根据MongoDB Spark Connector官方文档,结合SparkMongoDB的版本,从Maven下载所需的依赖包,样例中的5.0.1MongoDB的版本号,所需JAR包如下所示:

    • mongo-spark-connector_2.12-10.4.1.jar

    • mongodb-driver-core-5.0.1.jar

    • mongodb-driver-sync-5.0.1.jar

    • bson-5.0.1.jar

  2. 将下载的MongoDB Spark Connector JAR上传至阿里云OSS中,上传操作可以参见简单上传

步骤二:创建网络连接

Serverless Spark需要能够打通与MongoDB之间的网络才可以正常访问MongoDB。更多网络连接信息,请参见EMR Serverless Spark与其他VPC间网络互通

重要

配置安全组规则时,端口范围请根据实际需求选择性开放必要的端口。端口范围的取值为1-65535。

步骤三:Serverless Spark读取MongoDB

  1. 创建Notebook会话,详情请参见管理Notebook会话

    创建会话时,在引擎版本下拉列表中选择支持本文操作的引擎版本,在网络连接中选择步骤二中创建好的网络连接,并在Spark配置中添加以下参数来加载MongoDB Spark Connector。

    spark.mongodb.write.connection.uri                mongodb://<MongoDB地址>:27017
    spark.mongodb.read.connection.uri                 mongodb://<MongoDB地址>:27017
    spark.user.defined.jars                           oss://<bucketname>/path/to/mongo-spark-connector_2.12-10.4.1.jar,oss://<bucketname>/path/to/mongodb-driver-core-5.0.1.jar,oss://<bucketname>/path/to/mongodb-driver-sync-5.0.1.jar,oss://<bucketname>/path/to/bson-5.0.1.jar

    涉及参数说明如下,请根据实际情况替换。

    参数

    描述

    示例

    参数

    描述

    示例

    spark.mongodb.write.connection.uri

    Spark写入和读取数据到MongoDB的连接URI。其中:

    • <MongoDB地址>:MongoDB服务的IP地址。

    • 27017:MongoDB默认监听的端口号。

    mongodb://192.168.x.x:27017

    spark.mongodb.read.connection.uri

    spark.user.defined.jars

    Spark所需的外部依赖项。

    上传至OSS的文件,例如,oss://<yourBucketname>/spark/mongodb/mongo-spark-connector_2.12-10.4.1.jar

  2. 数据开发页面,选择创建一个Python > Notebook类型的任务,然后在右上角选择创建的Notebook会话。

    更多操作,请参见管理Notebook会话

  3. 拷贝如下代码到新增的Notebook页签中,并根据需要修改相应的参数信息,然后单击运行

    df = spark.read \
        .format("mongodb") \
        .option("database", "<yourDatabase>") \
        .option("collection", "<yourCollection>") \
        .load()
    
    df.printSchema()
    df.show()
    

    涉及参数信息说明如下,请根据实际情况填写。

    参数

    描述

    参数

    描述

    <yourDatabase>

    MongoDB服务中实际的数据库名称。例如,本文为mongo_table。

    <yourCollection>

    MongoDB服务中实际的集合名称。例如,本文为MongoCollection。

    如果能够正常返回已有的数据,则表明配置正确。

    image

步骤四:Serverless Spark写入MongoDB

拷贝如下代码到前一个步骤中新增的Notebook页签中,并根据需要修改相应的参数信息,然后单击运行

from pyspark.sql import Row

data = [
    Row(name="Sam", age=25, city="New York"),
    Row(name="Charlie", age=35, city="Chicago")
]

df = spark.createDataFrame(data)
df.show()

df.write \
    .format("mongodb") \
    .option("database", "<yourDatabase>") \
    .option("collection", "<yourCollection>") \
    .mode("append") \
    .save()
    

如果能够正常返回写入的数据,则表明配置正确。

image

  • 本页导读
  • 前提条件
  • 使用限制
  • 操作流程
  • 步骤一:获取MongoDB Spark Connector JAR并上传至OSS
  • 步骤二:创建网络连接
  • 步骤三:Serverless Spark读取MongoDB表
  • 步骤四:Serverless Spark写入MongoDB表
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等