基于MongoDB官方提供的Spark Connector,EMR Serverless Spark可以在开发时添加对应的配置来连接MongoDB。本文为您介绍在EMR Serverless Spark环境中实现MongoDB的数据读取和写入操作。
前提条件
使用限制
仅Serverless Spark以下引擎版本支持本文操作:
esr-4.x:esr-4.1.0及之后版本。
esr-3.x:esr-3.1.0及之后版本。
esr-2.x:esr-2.5.0及之后版本。
操作流程
步骤一:获取MongoDB Spark Connector JAR并上传至OSS
根据MongoDB Spark Connector官方文档,结合Spark和MongoDB的版本,从Maven下载所需的依赖包,样例中的5.0.1为MongoDB的版本号,所需JAR包如下所示:
mongo-spark-connector_2.12-10.4.1.jar
mongodb-driver-core-5.0.1.jar
mongodb-driver-sync-5.0.1.jar
bson-5.0.1.jar
将下载的MongoDB Spark Connector JAR上传至阿里云OSS中,上传操作可以参见简单上传。
步骤二:创建网络连接
Serverless Spark需要能够打通与MongoDB之间的网络才可以正常访问MongoDB。更多网络连接信息,请参见EMR Serverless Spark与其他VPC间网络互通。
配置安全组规则时,端口范围请根据实际需求选择性开放必要的端口。端口范围的取值为1-65535。
步骤三:Serverless Spark读取MongoDB表
创建Notebook会话,详情请参见管理Notebook会话。
创建会话时,在引擎版本下拉列表中选择支持本文操作的引擎版本,在网络连接中选择步骤二中创建好的网络连接,并在Spark配置中添加以下参数来加载MongoDB Spark Connector。
spark.mongodb.write.connection.uri mongodb://<MongoDB地址>:27017 spark.mongodb.read.connection.uri mongodb://<MongoDB地址>:27017 spark.user.defined.jars oss://<bucketname>/path/to/mongo-spark-connector_2.12-10.4.1.jar,oss://<bucketname>/path/to/mongodb-driver-core-5.0.1.jar,oss://<bucketname>/path/to/mongodb-driver-sync-5.0.1.jar,oss://<bucketname>/path/to/bson-5.0.1.jar
涉及参数说明如下,请根据实际情况替换。
参数
描述
示例
参数
描述
示例
spark.mongodb.write.connection.uri
Spark写入和读取数据到MongoDB的连接URI。其中:
<MongoDB地址>
:MongoDB服务的IP地址。27017
:MongoDB默认监听的端口号。
mongodb://192.168.x.x:27017
spark.mongodb.read.connection.uri
spark.user.defined.jars
Spark所需的外部依赖项。
上传至OSS的文件,例如,
oss://<yourBucketname>/spark/mongodb/mongo-spark-connector_2.12-10.4.1.jar
。在数据开发页面,选择创建一个
类型的任务,然后在右上角选择创建的Notebook会话。更多操作,请参见管理Notebook会话。
拷贝如下代码到新增的Notebook页签中,并根据需要修改相应的参数信息,然后单击运行。
df = spark.read \ .format("mongodb") \ .option("database", "<yourDatabase>") \ .option("collection", "<yourCollection>") \ .load() df.printSchema() df.show()
涉及参数信息说明如下,请根据实际情况填写。
参数
描述
参数
描述
<yourDatabase>
MongoDB服务中实际的数据库名称。例如,本文为mongo_table。
<yourCollection>
MongoDB服务中实际的集合名称。例如,本文为MongoCollection。
如果能够正常返回已有的数据,则表明配置正确。
步骤四:Serverless Spark写入MongoDB表
拷贝如下代码到前一个步骤中新增的Notebook页签中,并根据需要修改相应的参数信息,然后单击运行。
from pyspark.sql import Row
data = [
Row(name="Sam", age=25, city="New York"),
Row(name="Charlie", age=35, city="Chicago")
]
df = spark.createDataFrame(data)
df.show()
df.write \
.format("mongodb") \
.option("database", "<yourDatabase>") \
.option("collection", "<yourCollection>") \
.mode("append") \
.save()
如果能够正常返回写入的数据,则表明配置正确。
- 本页导读
- 前提条件
- 使用限制
- 操作流程
- 步骤一:获取MongoDB Spark Connector JAR并上传至OSS
- 步骤二:创建网络连接
- 步骤三:Serverless Spark读取MongoDB表
- 步骤四:Serverless Spark写入MongoDB表