访问RDS MySQL数据源

更新时间:

云原生数据仓库 AnalyticDB MySQL 版支持访问同账号或其他阿里云账号(跨账号)下RDS MySQL中的数据。您可以通过ENI和SSL链路两种方式访问RDS MySQL数据,SSL链路访问RDS MySQL数据时可以加密网络连接,保证数据的安全性,对比ENI访问方式更加安全。本文主要介绍通过ENI和SSL链路访问RDS MySQL数据的具体方法。

前提条件

  • AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版

  • AnalyticDB for MySQL集群与RDS MySQL实例位于同一地域。具体操作,请参见创建集群创建RDS MySQL实例

  • 已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组

  • 已创建AnalyticDB for MySQL集群的数据库账号。

  • 已将RDS MySQL实例添加到安全组中,且安全组规则的入方向与出方向放行RDS MySQL端口的访问请求。具体操作,请参见设置安全组添加安全组规则

  • 已开通OSS服务,并创建与AnalyticDB for MySQL集群位于相同地域的存储空间。具体操作,请参见开通OSS服务创建存储空间

  • 已完成授权操作。具体操作,请参见账号授权

    重要

    同账号访问时,需具备AliyunADBSparkProcessingDataRole权限;跨账号访问时,需要对其他阿里云账号授权。

数据准备

在RDS MySQL中创建数据库和表,并插入数据。示例语句如下:

CREATE DATABASE `test`;

CREATE TABLE `test`.`persons` (
  `id` int(11) DEFAULT NULL,
  `first_name` varchar(32) DEFAULT NULL,
  `laster_name` varchar(32) DEFAULT NULL,
  `age` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
;

INSERT INTO persons VALUES(1,'a','b',5);
INSERT INTO persons VALUES(2,'c','d',6);
INSERT INTO persons VALUES(3,'e','f',7);

通过ENI访问RDS MySQL数据

上传驱动程序及Spark作业依赖的Jar包

  1. 编写访问RDS MySQL表的示例程序(即Spark作业依赖的Jar包),并进行编译打包。本文生成的Jar包名称为rds_test.jar。示例代码如下:

    package com.aliyun.spark
    import org.apache.spark.sql.SparkSession
    object SparkRDS {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder()
        .appName("rds mysql test")
        .getOrCreate()
        // RDS MySQL实例的内网地址。查看方法,请参见查看或修改内外网地址和端口。
        val url = "jdbc:mysql://rm-bp11mpql1e01****.mysql.rds.aliyuncs.com"
        // RDS MySQL的表名。格式为"db_name.table_name"。
        val dbtable = "test.persons"
        //  连接RDS MySQL数据库的账号。
        val user = "mysql_username"
        //  RDS MySQL数据库账号的密码。
        val password = "mysql_password"
        val jdbcDF = sparkSession.read
        .format("jdbc")
        .option("url", url)
        .option("driver", "com.mysql.jdbc.Driver")
        .option("dbtable", dbtable)
        .option("user", user)
        .option("password", password)
        .load()
        jdbcDF.show()
      }
    }
  2. 在官方网站下载适配RDS MySQL版本的驱动程序。下载地址,请参见https://dev.mysql.com/downloads/connector/j/

    本文以mysql-connector-java-8.0.11.jar为例。

  3. 将Spark作业依赖的Jar包及RDS MySQL驱动程序上传至OSS中。具体操作,请参见上传文件

同账号访问RDS MySQL数据

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表。在集群列表上方,选择产品系列,然后单击目标集群ID。

  2. 在左侧导航栏,单击作业开发 > Spark Jar 开发

  3. 在编辑器窗口上方,选择Job型资源组和作业类型。本文以Batch类型为例。

  4. 在编辑器中输入以下作业内容。

    {
        "name": "rds-mysql-example",
        "jars": [
            "oss://testBucketName/mysql-connector-java-8.0.11.jar"
        ],
        "file": "oss://testBucketName/rds_test.jar",
        "className": "com.aliyun.spark.SparkRDS",
        "conf": {
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****",
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small"
        }
    }

    参数说明:

    参数

    说明

    name

    Spark作业名称。

    jars

    RDS MySQL驱动程序所在的OSS路径。

    本文示例为mysql-connector-java-8.0.11.jar包所在的OSS路径。

    file

    Spark作业依赖的Jar包所在的OSS路径。

    className

    Java或者Scala程序入口类名称。

    本文示例为com.aliyun.spark.SparkRDS

    spark.adb.eni.enabled

    开启ENI访问。

    spark.adb.eni.vswitchId

    交换机ID。在RDS MySQL实例的数据库连接页面,将鼠标移动至VPC处,获取交换机ID。

    spark.adb.eni.securityGroupId

    RDS MySQL实例中添加的安全组ID。如未添加安全组,请参见设置安全组

    conf其他参数

    与开源Spark中的配置项基本一致,参数格式为key:value形式,多个参数之间以英文逗号(,)分隔。更多应用配置参数,请参见Spark应用配置参数说明

  5. 单击立即执行

  6. Spark作业执行成功后,您可以在Spark日志中查看RDS MySQL表的数据。如何查看日志,请参见查看Spark应用信息

跨账号访问RDS MySQL数据

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表。在集群列表上方,选择产品系列,然后单击目标集群ID。

  2. 在左侧导航栏,单击作业开发 > Spark Jar 开发

  3. 在编辑器窗口上方,选择Job型资源组和作业类型。本文以Batch类型为例。

  4. 在编辑器中输入以下作业内容。

    {
        "name": "rds-mysql-example",
        "jars": [
            "oss://testBucketName/mysql-connector-java-8.0.11.jar"
        ],
        "file": "oss://testBucketName/rds_test.jar",
        "className": "com.aliyun.spark.SparkRDS",
        "conf": {
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****",
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small"
            "spark.adb.eni.roleArn":"acs:ram::testAccountID:role/testUserName"
        }
    }

    参数说明:

    参数

    说明

    spark.adb.eni.roleArn

    跨账号访问RDS数据源时使用的RAM角色。多个角色之间使用英文逗号(,)隔开。格式为acs:ram::testAccountID:role/testUserName

    • testAccountID:RDS数据源所属的阿里云账号ID。

    • testUserName:跨账号授权时创建的RAM角色。详细信息,请参见跨账号授权

    更多参数,请参见参数说明

  5. 单击立即执行

  6. Spark作业执行成功后,您可以在Spark日志中查看RDS MySQL表的数据。如何查看日志,请参见查看Spark应用信息

通过SSL链路访问RDS MySQL数据

通过SSL链路访问RDS MySQL数据时,RDS MySQL实例需开启SSL加密,且开启SSL加密时必须选择加密内网链路。具体操作,请参见使用云端证书快速开启SSL链路加密

下载CA证书并上传至OSS

  1. 登录RDS管理控制台,在左上角选择集群所在地域。在左侧导航栏,单击实例列表,单击目标集群ID。

  2. 在左侧导航栏单击数据安全性

  3. 单击下载CA证书

    重要

    CA证书的默认有效期为1年,过期需要重新生成。 使用过期的CA证书则无法通过SSL链路访问RDS的数据。

  4. 解压CA证书压缩包,并将JKS文件上传至OSS中。具体操作,请参见上传文件

上传驱动程序及Spark作业依赖的Jar包

  1. 编写访问RDS MySQL表的示例程序,并进行编译打包。本文生成的Jar包名称为test.jar。示例代码如下:

    package org.example
    import org.apache.spark.sql.SparkSession
    
    object Test {
      def main(args: Array[String]): Unit = {
        // JKS文件所在的OSS路径oss://testBucketName/folder/ApsaraDB-CA-Chain.jks。
        val JKS_FILE_PATH = args(0)
        
        //  连接RDS MySQL数据库的账号。
        val USERNAME = args(1)
    
        //  RDS MySQL数据库账号的密码。
        val PASSWORD = args(2)
    
        // RDS MySQL的数据库名。
        val DATABASE_NAME = args(3)
    
        // RDS MySQL的表名。
        val TABLE_NAME = args(4)
        // RDS MySQL实例的内网地址。
        val mysqlUrl = "jdbc:mysql://rm-bp11mpql1e01****.mysql.rds.aliyuncs.com:3306/?" +
          "useSSL=true" +
          s"&trustCertificateKeyStoreUrl=file:///tmp/testBucketName/folder/ApsaraDB-CA-Chain.jks" +
          "&trustCertificateKeyStorePassword=apsaradb" +
          "&trustCertificateKeyStoreType=JKS" +
        
        val spark = SparkSession.builder().getOrCreate()
    
         spark.read.format("jdbc")
          .option("driver", "com.mysql.cj.jdbc.Driver")
          .option("url", mysqlUrl)
          .option("user", USERNAME)
          .option("password",  PASSWORD)
          .option("dbtable", s"${DATABASE_NAME}.${TABLE_NAME}")
          .load()
          .show()
      }
    }

    参数说明:

    参数

    说明

    useSSL

    是否使用SSL加密链接。取值:

    • true:是。

    • false(默认值):否。

    本文示例需选择true。

    trustCertificateKeyStoreUrl

    JKS证书所在的本地路径,格式为file:///tmp/<JKS_FILE_PATH>,其中,JKS_FILE_PATH为JKS证书所在的OSS路径。

    例如:JKS证书所在的OSS路径为oss://testBucketName/folder/ApsaraDB-CA-Chain.jks,则JKS证书所在的本地路径为file:///tmp/testBucketName/folder/ApsaraDB-CA-Chain.jks

    trustCertificateKeyStorePassword

    JKS证书的密码,固定为apsaradb。

    trustCertificateKeyStoreType

    证书的存储格式,固定为JKS

  2. test.jar包上传至OSS中。具体操作,请参见上传文件

同账号访问RDS MySQL数据

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表。在集群列表上方,选择产品系列,然后单击目标集群ID。

  2. 在左侧导航栏,单击作业开发 > Spark Jar 开发

  3. 在编辑器窗口上方,选择Job型资源组和作业类型。本文以Batch类型为例。

  4. 在编辑器中输入以下作业内容。

    {
      "file": "oss://testBucketName/test.jar",
    	"className": "org.example.Test",
      "name": "MYSQL PEM Test",
      "conf": {
            "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketName/folder/ApsaraDB-CA-Chain.jks",
    	      "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketName/folder/ApsaraDB-CA-Chain.jks",
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
    	      "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****",
    	      "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****"
        }
    }
    

    参数说明:

    参数

    说明

    name

    Spark作业名称。

    file

    Spark作业依赖的Jar包所在的OSS路径。

    className

    Java或者Scala程序入口类名称。

    本文示例为org.example.Test

    spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES

    Spark Driver节点参数,用于指定JKS证书所在的OSS路径。多个JKS证书中间用英文逗号(,)分隔。例如:spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES: "oss://testBucketName/a.jks,oss://testBucketName/b.jks"

    spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES

    Spark Executor节点参数,用于指定JKS证书所在的OSS路径。多个JKS证书中间用英文逗号(,)分隔。例如:spark.executor.ADB_SPARK_DOWNLOAD_FILES: "oss://testBucketName/a.jks,oss://testBucketName/b.jks"

    spark.adb.eni.enabled

    开启ENI访问。

    spark.adb.eni.vswitchId

    交换机ID。在RDS MySQL实例的数据库连接页面,将鼠标移动至VPC处,获取交换机ID。

    spark.adb.eni.securityGroupId

    RDS MySQL实例中添加的安全组ID。如未添加安全组,请参见设置安全组

    conf其他参数

    与开源Spark中的配置项基本一致,参数格式为key:value形式,多个参数之间以英文逗号(,)分隔。更多应用配置参数,请参见Spark应用配置参数说明

  5. 单击立即执行

  6. Spark作业执行成功后,您可以在Spark日志中查看RDS MySQL表的数据。如何查看日志,请参见查看Spark应用信息

跨账号访问RDS MySQL数据

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表。在集群列表上方,选择产品系列,然后单击目标集群ID。

  2. 在左侧导航栏,单击作业开发 > Spark Jar 开发

  3. 在编辑器窗口上方,选择Job型资源组和作业类型。本文以Batch类型为例。

  4. 在编辑器中输入以下作业内容。

    {
      "file": "oss://testBucketName/test.jar",
      "className": "org.example.Test",
      "name": "MYSQL PEM Test",
      "conf": {
            "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketName/folder/ApsaraDB-CA-Chain.jks",
            "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketName/folder/ApsaraDB-CA-Chain.jks",
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****",
            "spark.adb.eni.roleArn":"acs:ram::testAccountID:role/testUserName", 
        }
    }

    参数说明:

    参数

    说明

    spark.adb.eni.roleArn

    跨账号访问RDS数据源时使用的RAM角色。多个角色之间使用英文逗号(,)隔开。格式为acs:ram::testAccountID:role/testUserName

    • testAccountID:RDS数据源所属的阿里云账号ID。

    • testUserName:跨账号授权时创建的RAM角色。详细信息,请参见跨账号授权

    更多参数,请参见参数说明

  5. 单击立即执行

  6. Spark作业执行成功后,您可以在Spark日志中查看RDS MySQL表的数据。如何查看日志,请参见查看Spark应用信息