云数据库MongoDB版(ApsaraDB for MongoDB)是基于飞天分布式系统和高可靠存储引擎的在线数据库服务,完全兼容MongoDB协议,提供稳定可靠、弹性伸缩的数据库服务。本文主要介绍如何通过DLA
Serverless Spark访问云数据库MongoDB。
前提条件
- 已经开通对象存储OSS(Object Storage Service)服务。具体操作请参考开通OSS服务。
- 已经创建云数据库MongoDB实例。具体请参考创建实例。
- 创建MongoDB的连接准备数据。
通过DMS连接MongoDB分片集群实例,执行以下命令在DMS上创建数据库config,并插入以下测试数据创建连接准备数据。具体示例如下:
db.createCollection("test_collection");
db.test_collection.insert( {"id":"id01","name":"name01"});
db.test_collection.insert( {"id":"id02","name":"name02"});
db.test_collection.insert( {"id":"id03","name":"name03"});
db.test_collection.find().pretty()
- 准备DLA Spark访问MongoDB实例所需的安全组ID和交换机ID。具体操作请参见配置数据源网络。
- DLA Spark访问MongoDB实例所需的安全组ID或交换机ID,已添加到MongoDB实例的白名单中。具体操作请参见设置白名单。
操作步骤
- 准备以下测试代码和依赖包来访问MongoDB,并将此测试代码和依赖包分别编译打包生成jar包上传至您的OSS。
测试代码示例:
package com.aliyun.spark
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.bson.Document
object SparkOnMongoDB {
def main(args: Array[String]): Unit = {
//获取MongoDB的connectionStringURI,database和collection。
val connectionStringURI = args(0)
val database = args(1)
val collection = args(2)
//Spark侧的表名。
var sparkTableName = if (args.size > 3) args(3) else "spark_on_mongodb_sparksession_test01"
val sparkSession = SparkSession
.builder()
.appName("scala spark on MongoDB test")
.getOrCreate()
//Spark读取MongoDB数据有多种方式。
//使用Dataset API方式:
//设置MongoDB的参数。
val sparkConf = new SparkConf()
.set("spark.mongodb.input.uri", connectionStringURI)
.set("spark.mongodb.input.database", database)
.set("spark.mongodb.input.collection", collection)
.set("spark.mongodb.output.uri", connectionStringURI)
.set("spark.mongodb.output.database", database)
.set("spark.mongodb.output.collection", collection)
val readConf = ReadConfig(sparkConf)
//获取Dataframe。
val df = MongoSpark.load(sparkSession, readConf)
df.show(1)
//使用MongoSpark.save入库数据到MongoDB。
val docs =
"""
|{"id": "id105", "name": "name105"}
|{"id": "id106", "name": "name106"}
|{"id": "id107", "name": "name107"}
|"""
.trim.stripMargin.split("[\\r\\n]+").toSeq
val writeConfig: WriteConfig = WriteConfig(Map(
"uri" -> connectionStringURI,
"spark.mongodb.output.database" -> database,
"spark.mongodb.output.collection"-> collection))
MongoSpark.save(sparkSession.sparkContext.parallelize(docs.map(Document.parse)), writeConfig)
//使用Sql的方式,SQL的方式有两种,指定Schema和不指定Schema。
//指定Schema的创建方式,Schema中的字段必须和MongoDB中Collection的Schema一致。
var createCmd =
s"""CREATE TABLE ${sparkTableName} (
| id String,
| name String
| ) USING com.mongodb.spark.sql
| options (
| uri '$connectionStringURI',
| database '$database',
| collection '$collection'
| )""".stripMargin
sparkSession.sql(createCmd)
var querySql = "select * from " + sparkTableName + " limit 1"
sparkSession.sql(querySql).show
//不指定Schema的创建方式,不指定Schema,Spark会映射MOngoDB中collection的Schema。
sparkTableName = sparkTableName + "_noschema"
createCmd =
s"""CREATE TABLE ${sparkTableName} USING com.mongodb.spark.sql
| options (
| uri '$connectionStringURI',
| database '$database',
| collection '$collection'
| )""".stripMargin
sparkSession.sql(createCmd)
querySql = "select * from " + sparkTableName + " limit 1"
sparkSession.sql(querySql).show
sparkSession.stop()
}
}
MongoDB依赖的pom文件:
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.8.2</version>
</dependency>
- 登录Data Lake Analytics管理控制台。
- 在页面左上角,选择MongoDB实例所在地域。
- 单击左侧导航栏中的。
- 在作业编辑页面,单击创建作业。
- 在创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
- 单击Spark作业名,在Spark作业编辑框中输入以下作业内容,并按照以下参数说明进行参数值替换。保存并提交Spark作业。
{
"args": [
"mongodb://root:xxx@xxx:3717,xxx:3717/xxx", #MongoDB集群中的“连接信息 (Connection StringURI)”。
"config", #MongoDB集群中的数据库名称。
"test_collection", #MongoDB集群中的collection名称。
"spark_on_mongodb" #Spark中创建映射MongoDB中collection的表名。
],
"file": "oss://spark_test/jars/mongodb/spark-examples-0.0.1-SNAPSHOT.jar", #存放测试软件包的OSS路径。
"name": "mongodb-test",
"jars": [
"oss://spark_test/jars/mongodb/mongo-java-driver-3.8.2.jar", ##存放测试软件依赖包的OSS路径。
"oss://spark_test/jars/mongodb/mongo-spark-connector_2.11-2.4.2.jar" ##存放测试软件依赖包的OSS路径。
],
"className": "com.aliyun.spark.SparkOnMongoDB",
"conf": {
"spark.driver.resourceSpec": "small",
"spark.executor.instances": 2,
"spark.executor.resourceSpec": "small",
"spark.dla.eni.enable": "true",
"spark.dla.eni.vswitch.id": "vsw-xxx", #可访问MongoDB的交换机id。
"spark.dla.eni.security.group.id": "sg-xxx" #可访问MongoDB的安全组id。
}
}
执行结果
作业运行成功后,在任务列表中单击,查看作业日志。出现如下日志说明作业运行成功: