全部产品

Spark对接MongoDB快速入门

更新时间:2019-06-18 11:25:55

简介

云数据库 MongoDB 版(ApsaraDB for MongoDB)基于飞天分布式系统和高可靠存储引擎,完全兼容MongoDB协议,提供稳定可靠、弹性伸缩的数据库服务。云Spark分析引擎已支持对接云数据库 MongoDB 版,提供分析MongoDB数据库的能力。这里主要介绍通过“数据工作台”使用Spark对接MongoDB数据的使用方法。

前置条件

  1. Spark集群和MongoDB在同一个VPC下。
    进入Spark分析集群页面,选择“数据库连接”>“连接信息”,查看Spark集群的VPC ID信息。如下图:
  2. Spark集群关联MongoDB。
    进入Spark分析集群页面,选择“关联数据库”>“关联MongoDB”页面中,支持关联以及取消关联。如下图:

  3. 创建mongo的collection准备数据。参考文档连接到mongo实例,然后执行下面命令在config这个database下面创建test_collection,然后插入测试数据:
    1. use config;
    2. db.createCollection("test_collection");
    3. show collections;
    4. db.test_collection.insert( {"id":"id01","name":"name02"});
    5. db.test_collection.insert( {"id":"id02","name":"name02"});
    6. db.test_collection.insert( {"id":"id03","name":"name03"});
    7. db.test_collection.find().pretty()
    test_collection,内容结构如下:
    1. [{"id":"id01",
    2. "name":"name02"
    3. },
    4. {"id":"id02",
    5. "name":"name02"
    6. },
    7. {"id":"id03",
    8. "name":"name03"
    9. }]

使用“数据工作台”>“作业管理”运行样例

步骤 1:通过“资源管理”上传样例代码Jar包

下载样例代码jar包“spark-examples-0.0.1-SNAPSHOT.jar”以及MongoDB依赖的jar包“alimongo-spark-2.4.0_2.3.2-1.0.jar”和“mongo-java-driver-3.8.2.jar”到本地目录。

  1. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_example/spark-examples-0.0.1-SNAPSHOT.jar
  2. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/alimongo-spark-2.4.0_2.3.2-1.0.jar
  3. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/mongo-java-driver-3.8.2.jar

在“数据工作台”>“资源管理”中添加文件夹“spark_on_mongodb”。上传下载的三个jar包到此文件夹。如下图:

步骤 2:通过“作业管理”创建并编辑作业内容

在“数据工作台”>“作业管理”中创建Spark作业,作业内容如下:

  1. --class com.aliyun.spark.mongodb.SparkOnMongoDBSparkSession
  2. --jars /spark_on_mongodb/alimongo-spark-2.4.0_2.3.2-1.0.jar,/spark_on_mongodb/mongo-java-driver-3.8.2.jar
  3. --driver-memory 1G
  4. --driver-cores 1
  5. --executor-cores 1
  6. --executor-memory 2G
  7. --num-executors 1
  8. --name spark_on_mongodb
  9. /spark_on_mongodb/spark-examples-0.0.1-SNAPSHOT.jar
  10. mongodb://root:passwordxxx@xxx1:3717,xxx2:3717/xxx3
  11. config test_collection spark_on_mongodb

作业内容参数说明:

参数 说明
/spark_on_mongodb/spark-examples-0.0.1-SNAPSHOT.jar
/spark_on_mongodb/alimongo-spark-2.4.0_2.3.2-1.0.jar
/spark_on_mongodb/mongo-java-driver-3.8.2.jar
步骤 1上传的Jar包在“数据工作台”>“资源管理”中的路径。
mongodb://root:passwordxxx@xxx1:3717,xxx2:3717/xxx3 MongoDB集群中的“连接信息 (Connection String URI)”。
config test_collection 分别为MongoDB集群中的数据库名称“config”和collection名称“test_collection”。
spark_on_mongodb Spark中创建映射MongoDB中collection的表名。

作业内容如下图:

步骤 3:通过“作业管理”运行作业并查看结果

作业编辑完成后点击“运行”,选择Spark集群。运行状态会在下侧显示,如图:


运行成功后点击“YarnUI”,翻到最后看到如下结果表明Spark读取MongoDB成功。如下:

  1. +--------------------+----+------+
  2. | _id| id| name|
  3. +--------------------+----+------+
  4. |[5ca5d1ed130f717a...|id01|name01|
  5. +--------------------+----+------+
  6. only showing top 1 row
  7. +----+------+
  8. | id| name|
  9. +----+------+
  10. |id01|name01|
  11. +----+------+
  12. +--------------------+----+------+
  13. | _id| id| name|
  14. +--------------------+----+------+
  15. |[5ca5d1ed130f717a...|id01|name01|
  16. +--------------------+----+------+

使用“交互式查询”运行样例

步骤 1:通过“会话管理”创建会话

在“数据工作台”>“会话管理”中点击“创建会话”。填写“会话名称”:spark_on_mongodb, 选择需要执行的Spark集群。
点击“确认”后,编辑“会话内容”,内容如下:

  1. --driver-memory 1G
  2. --driver-cores 1
  3. --executor-cores 1
  4. --executor-memory 2G
  5. --num-executors 1
  6. --jars /spark_on_mongodb/alimongo-spark-2.4.0_2.3.2-1.0.jar,/spark_on_mongodb/mongo-java-driver-3.8.2.jar
  7. --name spark_on_mongodb

其中,/spark_on_mongodb/alimongo-spark-2.4.0_2.3.2-1.0.jar,/spark_on_mongodb/mongo-java-driver-3.8.2.jar为:使用“作业管理”运行样例中步骤1上传的jar包。
内容编辑完成后,点击“运行”(此处点击运行是为了下一步骤中可以选择会话“spark_on_mongodb”)。如下图:

步骤 2:通过“交互式查询”创建查询

进入“数据工作台”>“交互式查询”,在“会话列表”下拉框中选择“spark_on_mongodb”,然后点击“新建查询”。填写“查询名称”:spark_on_mongodb,“查询类型”选择“sql”。如下图:

步骤 3:通过“交互式查询”编辑查询

打开上步骤创建的“spark_on_mongodb”,输入如下内容:

  1. create table spark_on_mongodb01(
  2. id string,
  3. name string
  4. )using com.mongodb.spark.sql
  5. options (
  6. uri 'mongodb://root:passwordxxx@xxx1:3717,xxx2:3717/xxx3',
  7. database 'config',
  8. collection 'test_collection'
  9. )

如下图:


建表语句关键字说明:

关键字 说明
spark_on_mongodb01 Sprk SQL中创建的表名。
id,name Spark SQL表中的字段,需要和MongoDB中collection的字段相同。
mongodb://root:passwordxxx@xxx1:3717,xxx2:3717/xxx3 MongoDB集群中的“连接信息 (Connection String URI)”。
config 指定MongoDB中数据库的名称。
test_collection 指定MongoDB中collection的名称。

步骤 4:通过“交互式查询”运行查询

编辑查询“spark_on_mongodb”完成后,点击“运行”;运行成功后,在“查询内容”中输入“select * from spark_on_mongodb01”,然后点击“运行”验证结果,出现如下图结果表示运行成功。如下图:

小结