本文介绍如何使用云原生数据仓库 AnalyticDB MySQL 版Spark访问云数据库MongoDB数据。
前提条件
已创建企业版或湖仓版集群。具体操作,请参见创建集群。
已创建数据库账号。
如果您是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定RAM用户与数据库账号。
已创建Job型资源组。具体操作,请参见新建资源组。
已创建与企业版或湖仓版集群同地域的云数据库MongoDB,并在MongoDB中创建数据库、创建集合和写入数据。具体操作,请参见MongoDB入门概述。
已将云数据库MongoDB的交换机IP添加到MongoDB的白名单中。具体操作,请参见设置白名单。
说明在云数据库MongoDB控制台的基本信息页面查看交换机ID。登录专有网络管理控制台,查看目标交换机的IP。
已将云数据库MongoDB添加到安全组中,且安全组规则的入方向与出方向放行MongoDB端口的访问请求。具体操作,请参见添加安全组和添加安全组规则。
操作步骤
下载AnalyticDB for MySQLSpark访问云数据库MongoDB依赖的Jar包。下载链接mongo-spark-connector_2.12-10.1.1.jar、mongodb-driver-sync-4.8.2.jar、bson-4.8.2.jar、bson-record-codec-4.8.2.jar和mongodb-driver-core-4.8.2.jar。
在pom.xml文件的dependencies中添加依赖项。
<dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.12</artifactId> <version>10.1.1</version> </dependency>
编写如下示例程序访问云数据库MongoDB,并进行编译打包。本文生成的Jar包名称为
spark-mongodb.jar
。示例代码如下:package com.aliyun.spark import org.apache.spark.sql.SparkSession object SparkOnMongoDB { def main(args: Array[String]): Unit = { // 访问MongoDB的专有网络地址,可在MongoDB控制台的数据库连接页面查看。 val connectionUri = args(0) // MongoDB数据库名称。 val database = args(1) // MongoDB集合名称。 val collection = args(2) val spark = SparkSession.builder() .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", connectionUri) .config("spark.mongodb.write.connection.uri", connectionUri) .getOrCreate() val df = spark.read.format("mongodb").option("database", database).option("collection", collection).load() df.show() spark.stop() } }
说明Spark访问MongoDB的更多配置请参见Configuration Options,更多代码请参见Write to MongoDB和Read from MongoDB。
将步骤1和步骤3中的Jar包上传至OSS。具体操作,请参见简单上传。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击
。在编辑器窗口上方,选择Job型资源组和作业类型。本文以Batch类型为例。
在编辑器中输入以下作业内容。
重要AnalyticDB for MySQLSpark访问云数据库MongoDB,支持通过专有网络和公网访问。
建议使用专有网络访问。
{ "args": [ -- 访问MongoDB的专有网络地址,可在MongoDB控制台的数据库连接页面查看。 "mongodb://<username>:<password>@<host1>:<port1>,<host2>:<port2>,...,<hostN>:<portN>/<database_name>", -- MongoDB数据库的名称。 "<database_name>", -- MongoDB集合名称。 "<collection_name>" ], "file": "oss://<bucket_name>/spark-mongodb.jar", "jars": [ "oss://<bucket_name>/mongo-spark-connector_2.12-10.1.1.jar", "oss://<bucket_name>/mongodb-driver-sync-4.8.2.jar", "oss://<bucket_name>/bson-4.8.2.jar", "oss://<bucket_name>/bson-record-codec-4.8.2.jar", "oss://<bucket_name>/mongodb-driver-core-4.8.2.jar" ], "name": "MongoSparkConnectorIntro", "className": "com.aliyun.spark.SparkOnMongoDB", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 2, "spark.executor.resourceSpec": "medium", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp14pj8h0****", "spark.adb.eni.securityGroupId": "sg-bp11m93k021tp****" } }
参数说明如下。
参数
说明
args
请根据业务需求,填写使用Jar包时需要的参数。多个参数之间以英文逗号(,)分隔。
file
示例程序
spark-mongodb.jar
所在的OSS路径。jars
Spark访问MongoDB依赖的Jar包所在的OSS路径。
name
Spark作业名称。
className
Java或者Scala程序入口类名称。Python不需要指定入口类。
spark.adb.eni.enabled
是否开启ENI访问。
使用湖仓版Spark访问MongoDB数据源时,需要开启ENI访问。
spark.adb.eni.vswitchId
交换机ID。在目标云数据库MongoDB控制台的基本信息页面获取交换机ID。
spark.adb.eni.securityGroupId
云数据库MongoDB中添加的安全组ID。如未添加安全组,请参见添加安全组。
conf其他参数
与开源Spark中的配置项基本一致,参数格式为
key:value
形式,多个参数之间以英文逗号(,)分隔。更多conf
参数,请参见Conf配置参数。单击立即执行。
待应用列表中目标应用的状态为已完成,您可以单击操作列的日志查看MongoDB表的数据。