云数据库 MongoDB 版(ApsaraDB for MongoDB)基于飞天分布式系统和高可靠存储引擎,完全兼容MongoDB协议,提供稳定可靠、弹性伸缩的数据库服务。云Spark分析引擎已支持对接云数据库 MongoDB 版,提供分析MongoDB数据库的能力。这里主要介绍通过“数据工作台”使用Spark对接MongoDB数据的使用方法。

前置条件

  1. Spark集群和MongoDB在同一个VPC下。进入Spark分析集群页面,选择“数据库连接”>“连接信息”,查看Spark集群的VPC ID信息。如下图:
  2. Spark集群关联MongoDB。进入Spark分析集群页面,选择“关联数据库”>“关联MongoDB”页面中,支持关联以及取消关联。如下图:
  3. 创建mongo的collection准备数据。参考文档连接到mongo实例,然后执行下面命令在config这个database下面创建test_collection,然后插入测试数据:
    use config;
    db.createCollection("test_collection");
    show collections;
    db.test_collection.insert( {"id":"id01","name":"name02"});
    db.test_collection.insert( {"id":"id02","name":"name02"});
    db.test_collection.insert( {"id":"id03","name":"name03"});
    db.test_collection.find().pretty()
    test_collection,内容结构如下:
    [{"id":"id01",
    "name":"name02"
    },
    {"id":"id02",
    "name":"name02"
    },
    {"id":"id03",
    "name":"name03"
    }]

使用“数据工作台”>“作业管理”运行样例

步骤 1:通过“资源管理”上传样例代码Jar包

下载样例代码jar包“spark-examples-0.0.1-SNAPSHOT.jar”以及MongoDB依赖的jar包“alimongo-spark-2.4.0_2.3.2-1.0.jar”和“mongo-java-driver-3.8.2.jar”到本地目录。

wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_example/spark-examples-0.0.1-SNAPSHOT.jar
wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/alimongo-spark-2.4.0_2.3.2-1.0.jar
wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/mongo-java-driver-3.8.2.jar

在“数据工作台”>“资源管理”中添加文件夹“spark_on_mongodb”。上传下载的三个jar包到此文件夹。如下图:

步骤 2:通过“作业管理”创建并编辑作业内容

在“数据工作台”>“作业管理”中创建Spark作业,作业内容如下:

--class com.aliyun.spark.mongodb.SparkOnMongoDBSparkSession
--jars /spark_on_mongodb/alimongo-spark-2.4.0_2.3.2-1.0.jar,/spark_on_mongodb/mongo-java-driver-3.8.2.jar
--driver-memory 1G 
--driver-cores 1
--executor-cores 1
--executor-memory 2G
--num-executors 1
--name spark_on_mongodb
 /spark_on_mongodb/spark-examples-0.0.1-SNAPSHOT.jar
mongodb://testuser:passwordxxx@xxx1:3717,xxx2:3717/xxx3
 config test_collection spark_on_mongodb

作业内容参数说明:

参数说明
/spark_on_mongodb/spark-examples-0.0.1-SNAPSHOT.jar /spark_on_mongodb/alimongo-spark-2.4.0_2.3.2-1.0.jar /spark_on_mongodb/mongo-java-driver-3.8.2.jar步骤 1上传的Jar包在“数据工作台”>“资源管理”中的路径。
mongodb://testuser:passwordxxx@xxx1:3717,xxx2:3717/xxx3MongoDB集群中的“连接信息 (Connection String URI)”。
config test_collection分别为MongoDB集群中的数据库名称“config”和collection名称“test_collection”。
spark_on_mongodbSpark中创建映射MongoDB中collection的表名。

作业内容如下图:

步骤 3:通过“作业管理”运行作业并查看结果

作业编辑完成后点击“运行”,选择Spark集群。运行状态会在下侧显示,如图:

运行成功后点击“YarnUI”,翻到最后看到如下结果表明Spark读取MongoDB成功。如下:

+--------------------+----+------+
|                 _id|  id|  name|
+--------------------+----+------+
|[5ca5d1ed130f717a...|id01|name01|
+--------------------+----+------+
only showing top 1 row

+----+------+
|  id|  name|
+----+------+
|id01|name01|
+----+------+

+--------------------+----+------+
|                 _id|  id|  name|
+--------------------+----+------+
|[5ca5d1ed130f717a...|id01|name01|
+--------------------+----+------+

使用“交互式查询”运行样例

步骤 1:通过“会话管理”创建会话

在“数据工作台”>“会话管理”中点击“创建会话”。填写“会话名称”:spark_on_mongodb, 选择需要执行的Spark集群。点击“确认”后,编辑“会话内容”,内容如下:

--driver-memory 1G 
--driver-cores 1
--executor-cores 1
--executor-memory 2G
--num-executors 1
--jars /spark_on_mongodb/alimongo-spark-2.4.0_2.3.2-1.0.jar,/spark_on_mongodb/mongo-java-driver-3.8.2.jar
--name spark_on_mongodb

其中,/spark_on_mongodb/alimongo-spark-2.4.0_2.3.2-1.0.jar,/spark_on_mongodb/mongo-java-driver-3.8.2.jar为:使用“作业管理”运行样例中步骤1上传的jar包。内容编辑完成后,点击“运行”(此处点击运行是为了下一步骤中可以选择会话“spark_on_mongodb”)。如下图:

步骤 2:通过“交互式查询”创建查询

进入“数据工作台”>“交互式查询”,在“会话列表”下拉框中选择“spark_on_mongodb”,然后点击“新建查询”。填写“查询名称”:spark_on_mongodb,“查询类型”选择“sql”。如下图:

步骤 3:通过“交互式查询”编辑查询

打开上步骤创建的“spark_on_mongodb”,输入如下内容:

create table spark_on_mongodb01(
    id string,
    name string
)using com.mongodb.spark.sql
options (
uri 'mongodb://testuser:passwordxxx@xxx1:3717,xxx2:3717/xxx3',
database 'config',
collection 'test_collection'
)

如下图:

建表语句关键字说明:

关键字说明
spark_on_mongodb01Spark SQL中创建的表名。
id,nameSpark SQL表中的字段,需要和MongoDB中collection的字段相同。
mongodb://testuser:passwordxxx@xxx1:3717,xxx2:3717/xxx3MongoDB集群中的“连接信息 (Connection String URI)”。
config指定MongoDB中数据库的名称。
test_collection指定MongoDB中collection的名称。

步骤 4:通过“交互式查询”运行查询

编辑查询“spark_on_mongodb”完成后,点击“运行”;运行成功后,在“查询内容”中输入“select * from spark_on_mongodb01”,然后点击“运行”验证结果,出现如下图结果表示运行成功。如下图:

小结