访问MongoDB数据源

本文介绍如何使用云原生数据仓库 AnalyticDB MySQL 版Spark访问云数据库MongoDB数据。

前提条件

  • 已创建企业版或湖仓版集群。具体操作,请参见创建集群

  • 已创建数据库账号。

  • 已创建Job型资源组。具体操作,请参见新建资源组

  • 已创建与企业版或湖仓版集群同地域的云数据库MongoDB,并在MongoDB中创建数据库、创建集合和写入数据。具体操作,请参见MongoDB入门概述

  • 已将云数据库MongoDB的交换机IP添加到MongoDB的白名单中。具体操作,请参见设置白名单

    说明

    在云数据库MongoDB控制台的基本信息页面查看交换机ID。登录专有网络管理控制台,查看目标交换机的IP。

  • 已将云数据库MongoDB添加到安全组中,且安全组规则的入方向与出方向放行MongoDB端口的访问请求。具体操作,请参见添加安全组添加安全组规则

操作步骤

  1. 下载AnalyticDB for MySQLSpark访问云数据库MongoDB依赖的Jar包。下载链接mongo-spark-connector_2.12-10.1.1.jarmongodb-driver-sync-4.8.2.jarbson-4.8.2.jarbson-record-codec-4.8.2.jarmongodb-driver-core-4.8.2.jar

  2. 在pom.xml文件的dependencies中添加依赖项。

      <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.12</artifactId>
        <version>10.1.1</version>
      </dependency>
  3. 编写如下示例程序访问云数据库MongoDB,并进行编译打包。本文生成的Jar包名称为spark-mongodb.jar。示例代码如下:

    package com.aliyun.spark
    
    import org.apache.spark.sql.SparkSession
    
    object SparkOnMongoDB {
      def main(args: Array[String]): Unit = {
        // 访问MongoDB的专有网络地址,可在MongoDB控制台的数据库连接页面查看。
        val connectionUri = args(0)
        // MongoDB数据库名称。
        val database = args(1)
        // MongoDB集合名称。
        val collection = args(2)
        
        val spark = SparkSession.builder()
          .appName("MongoSparkConnectorIntro")
          .config("spark.mongodb.read.connection.uri", connectionUri)
          .config("spark.mongodb.write.connection.uri", connectionUri)
          .getOrCreate()
    
        val df = spark.read.format("mongodb").option("database", database).option("collection", collection).load()
        
        df.show()
        
        spark.stop()
      }
    }
    说明

    Spark访问MongoDB的更多配置请参见Configuration Options,更多代码请参见Write to MongoDBRead from MongoDB

  4. 将步骤1和步骤3中的Jar包上传至OSS。具体操作,请参见简单上传

  5. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版湖仓版页签下,单击目标集群ID。

  6. 在左侧导航栏,单击作业开发 > Spark Jar开发

  7. 在编辑器窗口上方,选择Job型资源组和作业类型。本文以Batch类型为例。

  8. 在编辑器中输入以下作业内容。

    重要
    • AnalyticDB for MySQLSpark访问云数据库MongoDB,支持通过专有网络和公网访问。

    • 建议使用专有网络访问。

    {
      "args": [
        -- 访问MongoDB的专有网络地址,可在MongoDB控制台的数据库连接页面查看。
    	  "mongodb://<username>:<password>@<host1>:<port1>,<host2>:<port2>,...,<hostN>:<portN>/<database_name>",
        -- MongoDB数据库的名称。
        "<database_name>",
        -- MongoDB集合名称。
        "<collection_name>"
    	],
      "file": "oss://<bucket_name>/spark-mongodb.jar",
      "jars": [
        "oss://<bucket_name>/mongo-spark-connector_2.12-10.1.1.jar",
        "oss://<bucket_name>/mongodb-driver-sync-4.8.2.jar",
        "oss://<bucket_name>/bson-4.8.2.jar",
        "oss://<bucket_name>/bson-record-codec-4.8.2.jar",
        "oss://<bucket_name>/mongodb-driver-core-4.8.2.jar"
    	],
      "name": "MongoSparkConnectorIntro",
      "className": "com.aliyun.spark.SparkOnMongoDB",
      "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "medium",
        "spark.adb.eni.enabled": "true",
        "spark.adb.eni.vswitchId": "vsw-bp14pj8h0****",
        "spark.adb.eni.securityGroupId": "sg-bp11m93k021tp****"
      }
    }

    参数说明如下。

    参数

    说明

    args

    请根据业务需求,填写使用Jar包时需要的参数。多个参数之间以英文逗号(,)分隔。

    file

    示例程序spark-mongodb.jar所在的OSS路径。

    jars

    Spark访问MongoDB依赖的Jar包所在的OSS路径。

    name

    Spark作业名称。

    className

    Java或者Scala程序入口类名称。Python不需要指定入口类。

    spark.adb.eni.enabled

    是否开启ENI访问。

    使用湖仓版Spark访问MongoDB数据源时,需要开启ENI访问。

    spark.adb.eni.vswitchId

    交换机ID。在目标云数据库MongoDB控制台的基本信息页面获取交换机ID。

    spark.adb.eni.securityGroupId

    云数据库MongoDB中添加的安全组ID。如未添加安全组,请参见添加安全组

    conf其他参数

    与开源Spark中的配置项基本一致,参数格式为key:value形式,多个参数之间以英文逗号(,)分隔。更多conf参数,请参见Conf配置参数

  9. 单击立即执行

  10. 应用列表中目标应用的状态为已完成,您可以单击操作列的日志查看MongoDB表的数据。