访问RDS MySQL数据源
AnalyticDB MySQL湖仓版(3.0)支持通过ENI网络和SSL链路两种方式访问RDS MySQL数据,通过SSL链路访问RDS MySQL数据的方式更加安全。本文主要介绍通过ENI网络和SSL链路访问RDS MySQL数据的具体方法。
前提条件
已购买与AnalyticDB MySQL湖仓版(3.0)集群位于同一地域的RDS MySQL实例。具体操作,请参见创建集群和创建RDS MySQL实例。
已创建Job型资源组。具体操作,请参见新建资源组。
已创建数据库账号。
如果您是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。
已将RDS MySQL实例添加到安全组中,且安全组规则的入方向与出方向放行RDS MySQL端口的访问请求。具体操作,请参见设置安全组和添加安全组规则。
已开通OSS服务,并创建与AnalyticDB MySQL湖仓版(3.0)集群位于相同地域的存储空间。具体操作,请参见开通OSS服务和创建存储空间。
准备数据
在RDS MySQL中创建数据库和表,并插入数据。示例语句如下:
CREATE DATABASE `test`;
CREATE TABLE `test`.`persons` (
`id` int(11) DEFAULT NULL,
`first_name` varchar(32) DEFAULT NULL,
`laster_name` varchar(32) DEFAULT NULL,
`age` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
INSERT INTO persons VALUES(1,'a','b',5);
INSERT INTO persons VALUES(2,'c','d',6);
INSERT INTO persons VALUES(3,'e','f',7);
通过ENI网络访问RDS MySQL数据
步骤一:上传驱动程序及Spark作业依赖的Jar包
编写访问RDS MySQL表的示例程序(即Spark作业依赖的Jar包),并进行编译打包。本文生成的Jar包名称为
rds_test.jar
。示例代码如下:package com.aliyun.spark import org.apache.spark.sql.SparkSession object SparkRDS { def main(args: Array[String]): Unit = { val sparkSession = SparkSession.builder() .appName("rds mysql test") .getOrCreate() // RDS MySQL实例的内网地址。查看方法,请参见查看或修改内外网地址和端口。 val url = "jdbc:mysql://rm-bp11mpql1e01****.mysql.rds.aliyuncs.com" // RDS MySQL的表名。格式为"db_name.table_name"。 val dbtable = "test.persons" // 连接RDS MySQL数据库的账号。 val user = "mysql_username" // RDS MySQL数据库账号的密码。 val password = "mysql_password" val jdbcDF = sparkSession.read .format("jdbc") .option("url", url) .option("driver", "com.mysql.jdbc.Driver") .option("dbtable", dbtable) .option("user", user) .option("password", password) .load() jdbcDF.show() } }
在官方网站下载适配RDS MySQL版本的驱动程序。下载地址,请参见https://dev.mysql.com/downloads/connector/j/。
将Spark作业依赖的Jar包及RDS MySQL驱动程序上传至OSS中。具体操作,请参见上传文件。
步骤二:提交Spark作业
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版(3.0)页签,单击目标集群ID。
在左侧导航栏,单击
。在编辑器窗口上方,选择Job型资源组和作业类型。本文以Batch类型为例。
在编辑器中输入以下作业内容。
{ "name": "rds-mysql-example", "jars": [ "oss://<bucket_name>/mysql-connector-java.jar" ], "file": "oss://<bucket_name>/rds_test.jar", "className": "com.aliyun.spark.SparkRDS", "conf": { "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small" } }
参数说明如下:
参数
说明
name
Spark作业名称。
jars
RDS MySQL驱动程序所在的OSS路径。
file
Spark作业依赖的Jar包所在的OSS路径。
className
Java或者Scala程序入口类名称。
spark.adb.eni.enabled
开启ENI访问。
spark.adb.eni.vswitchId
交换机ID。在RDS MySQL实例的数据库连接页面,将鼠标移动至VPC处,获取交换机ID。
spark.adb.eni.securityGroupId
RDS MySQL实例中添加的安全组ID。如未添加安全组,请参见设置安全组。
conf
其他参数与开源Spark中的配置项基本一致,参数格式为
key:value
形式,多个参数之间以英文逗号(,)分隔。更多应用配置参数,请参见Spark应用配置参数说明。单击立即执行。
Spark作业执行成功后,您可以在Spark日志中查看RDS MySQL表的数据。如何查看日志,请参见查看Spark应用信息。
通过SSL链路访问RDS MySQL数据
通过SSL链路访问RDS MySQL数据时,RDS MySQL实例需开启SSL加密,且开启SSL加密时必须选择加密内网链路。具体操作,请参见设置SSL加密。
步骤一:下载CA证书并上传至OSS。
登录RDS管理控制台,在左上角选择集群所在地域。在左侧导航栏,单击实例列表,单击目标集群ID。
在左侧导航栏单击数据安全性。
单击下载CA证书。
重要CA证书的默认有效期为1年,过期需要重新生成。 使用过期的CA证书则无法通过SSL链路访问RDS的数据。
解压CA证书压缩包,并将JKS文件上传至OSS中。具体操作,请参见上传文件。
步骤二:上传驱动程序及Spark作业依赖的Jar包
编写访问RDS MySQL表的示例程序,并进行编译打包。本文生成的Jar包名称为
test.jar
。示例代码如下:package org.example import org.apache.spark.sql.SparkSession object Test { def main(args: Array[String]): Unit = { // JKS文件所在的OSS路径<bucketname>/folder/asparadb.jks val JKS_FILE_PATH = args(0) // 连接RDS MySQL数据库的账号。 val USERNAME = args(1) // RDS MySQL数据库账号的密码。 val PASSWORD = args(2) // RDS MySQL的数据库名。 val DATABASE_NAME = args(3) // RDS MySQL的表名。 val TABLE_NAME = args(4) // RDS MySQL实例的内网地址。 val mysqlUrl = "jdbc:mysql://rm-bp11mpql1e01****.mysql.rds.aliyuncs.com:3306/?" + "useSSL=true" + s"&clientCertificateKeyStoreUrl=file:///tmp/<JKS_FILE_PATH>" + "&clientCertificateKeyStorePassword=apsaradb" + s"&trustCertificateKeyStoreUrl=file:///tmp/<JKS_FILE_PATH>" + "&trustCertificateKeyStorePassword=apsaradb" + "&trustCertificateKeyStoreType=JKS" + "&clientCertificateKeyStoreType=JKS" val spark = SparkSession.builder().getOrCreate() spark.read.format("jdbc") .option("driver", "com.mysql.cj.jdbc.Driver") .option("url", mysqlUrl) .option("user", USERNAME) .option("password", PASSWORD) .option("dbtable", s"${DATABASE_NAME}.${TABLE_NAME}") .load() .show() } }
参数说明:
参数
说明
useSSL
是否使用SSL加密链接。取值:
true:是。
false(默认值):否。
clientCertificateKeyStoreUrl
JKS证书所在的本地路径,格式为
file:///tmp/<JKS_FILE_PATH>
,其中,JKS_FILE_PATH
为JKS证书所在的OSS路径。例如:JKS证书所在的OSS路径为
oss://<bucketname>/folder/ApsaraDB-CA-Chain.jks
,则JKS证书所在的本地路径为file:///tmp/<bucketname>/folder/ApsaraDB-CA-Chain.jks
。clientCertificateKeyStorePassword
JKS证书的密码,固定为apsaradb。
trustCertificateKeyStoreUrl
JKS证书所在的本地路径,格式为
file:///tmp/<JKS_FILE_PATH>
,其中,JKS_FILE_PATH
为JKS证书所在的OSS路径。例如:JKS证书所在的OSS路径为
oss://<bucketname>/folder/ApsaraDB-CA-Chain.jks
,则JKS证书所在的本地路径为file:///tmp/<bucketname>/folder/ApsaraDB-CA-Chain.jks
。trustCertificateKeyStorePassword
JKS证书的密码,固定为apsaradb。
clientCertificateKeyStoreType
证书的存储格式,固定为JKS。
trustCertificateKeyStoreType
证书的存储格式,固定为JKS。
将
test.jar
包上传至OSS中。具体操作,请参见上传文件。
步骤二:提交Spark作业
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版(3.0)页签,单击目标集群ID。
在左侧导航栏,单击
。在编辑器窗口上方,选择Job型资源组和作业类型。本文以Batch类型为例。
在编辑器中输入以下作业内容。
{ "file": "oss://<bucketname>/test.jar", "className": "org.example.Test", "name": "MYSQL PEM Test", "args": [ "mybucket/ApsaraDB-CA-Chain.jks", "mysql_user_name", "mysql_passoword", "mysql_db", "mysql_table" ], "conf": { "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://<bucketname>/folder/ApsaraDB-CA-Chain.jks", "spark.executor.ADB_SPARK_DOWNLOAD_FILES": "oss://<bucketname>/folder/ApsaraDB-CA-Chain.jks", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****" } }
参数说明:
参数
说明
spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES
JKS证书所在的OSS路径。多个JKS证书中间用英文逗号(,)分隔。例如:
spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://<bucketname>/a.jks,oss://<bucketname>/b.jks
。spark.executor.ADB_SPARK_DOWNLOAD_FILES
JKS证书所在的OSS路径。多个JKS证书中间用英文逗号(,)分隔。
spark.executor.ADB_SPARK_DOWNLOAD_FILES": "oss://<bucketname>/a.jks,oss://<bucketname>/b.jks
。更多参数,请参见Spark应用开发介绍。
单击立即执行。
Spark作业执行成功后,您可以在Spark日志中查看RDS MySQL表的数据。如何查看日志,请参见查看Spark应用信息。