通过Spark SQL读MySQL数据

云原生数据仓库 AnalyticDB MySQL 版支持提交Spark SQL作业,您可以通过View或Catalog两种方式访问自建MySQL数据库或云数据库RDS MySQL、云原生数据库 PolarDB MySQL。本文以RDS MySQL为例,介绍如何通过Spark SQL访问RDS MySQL数据。

前提条件

  • AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版

  • AnalyticDB for MySQL集群与RDS MySQL实例位于同一地域。

  • 已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组

  • 已创建AnalyticDB for MySQL集群的数据库账号。

  • 已将RDS MySQL实例添加到安全组中,且安全组规则的入方向与出方向放行RDS MySQL端口的访问请求。具体操作,请参见设置安全组添加安全组规则

步骤一:数据准备

在RDS MySQL中创建数据库和表,并插入数据。示例语句如下:

CREATE DATABASE `db`;

CREATE TABLE `db`.`test` (
  `id` int(11) DEFAULT NULL,
  `first_name` varchar(32) DEFAULT NULL,
  `laster_name` varchar(32) DEFAULT NULL,
  `age` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
;

INSERT INTO test VALUES(1,'a','b',5);
INSERT INTO test VALUES(2,'c','d',6);
INSERT INTO test VALUES(3,'e','f',7);

(可选)步骤二:上传CA证书及RDS MySQL驱动程序

说明

如果您不需要通过SSL链路访问RDS MySQL数据,可跳过该步骤,直接提交Spark SQL作业,详情请参见提交Spark SQL作业

  1. 为RDS MySQL实例开启SSL加密,并下载CA证书。具体操作,请参见使用云端证书快速开启SSL链路加密

    重要
    • 开启SSL加密时可以选择加密内网或公网链路。本文中必须加密内网链路。

    • CA证书的默认有效期为1年,过期需要重新生成。 使用过期的CA证书则无法通过SSL链路访问RDS的数据。

  2. 在官方网站下载适配RDS MySQL版本的驱动程序。下载地址,请参见https://dev.mysql.com/downloads/connector/j/

    本文以mysql-connector-java-8.0.29.jar包为例。

  3. 解压CA证书压缩包,并将JKS文件和RDS MySQL驱动程序上传至OSS中。具体操作,请参见上传文件

步骤三:提交Spark SQL作业

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。

  2. 在左侧导航栏,单击作业开发 > SQL开发

  3. SQLConsole窗口,选择Spark引擎和Job型资源组。

  4. SQLConsole窗口中按访问方式输入以下作业内容:

    View方式访问

    View方式访问数据时创建的视图为临时视图,不会被持久化,每次启动作业时需重新创建视图。通过View方式访问RDS MySQL数据时,也可进一步选择通过ENI或SSL加密链路访问

    通过ENI访问RDS MySQL数据

    set spark.adb.eni.enabled=true;
    set spark.adb.eni.vswitchId=<vsw-bp1sxxsodv28ey5dl****>;   
    set spark.adb.eni.securityGroupId=<sg-bp19mr685pmg4ihc****>;    
    
    CREATE TEMPORARY VIEW table_tmp
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      url '<jdbc:mysql://rm-bp1k87323a7ia****.mysql.rds.aliyuncs.com:3306>',     
      dbtable '<db.test>',    
      user '<user>',       
      password '<password>'      
    );
    
    select * from table_tmp;

    参数说明如下:

    参数

    说明

    spark.adb.eni.enabled

    开启ENI访问。

    访问数据时,需将spark.adb.eni.enabled参数设置为true

    spark.adb.eni.vswitchId

    交换机ID。在RDS MySQL实例的数据库连接页面,将鼠标移动至VPC处,获取交换机ID。

    spark.adb.eni.securityGroupId

    RDS MySQL实例中添加的安全组ID。如未添加安全组,请参见设置安全组

    table_tmp

    视图名称。本文以table_tmp为例。

    USING org.apache.spark.sql.jdbc

    参数取值固定为USING org.apache.spark.sql.jdbc

    OPTIONS

    • url:RDS MySQL实例的内网地址和端口。格式为:jdbc:mysql://rm-bp1k87323a7ia****.mysql.rds.aliyuncs.com:3306

    • dbtable:RDS MySQL的表名。格式为db_name.table_name。本文以db_1.table_11为例。

    • user:RDS MySQL数据库的账号。

    • password:RDS MySQL数据库账号的密码。

    通过SSL链路访问RDS MySQL数据

    add jar oss://<bucketname>/mysql-connector-java-8.0.11.jar;
    set spark.app.name=SSL_RDS_SQL;
    set spark.adb.eni.enabled=true;
    set spark.adb.eni.vswitchId=<vsw-bp1sxxsodv28ey5dl****>;
    set spark.adb.eni.securityGroupId=<sg-bp19mr685pmg4ihc****>;
    set spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES=oss://<bucketname>/ApsaraDB-CA-Chain.jks;
    set spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES=oss://<bucketname>/ApsaraDB-CA-Chain.jks;
    
    CREATE TEMPORARY VIEW table_view
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      url '<jdbc:mysql://rm-bp1k87323a7ia****.mysql.rds.aliyuncs.com:3306/?useSSL=true&clientCertificateKeyStoreUrl=file:///tmp/<bucketname>/folder/ApsaraDB-CA-Chain.jks&clientCertificateKeyStorePassword=apsaradb&trustCertificateKeyStoreUrl=file:///tmp/<bucketname>/folder/ApsaraDB-CA-Chain.jks&trustCertificateKeyStorePassword=apsaradb&trustCertificateKeyStoreType=JKS&clientCertificateKeyStoreType=JKS>',
      dbtable '<db.test>',
      user '<user>',
      password '<password>'
    );

    参数说明如下:

    参数

    说明

    add jar

    RDS MySQL驱动程序所在的OSS路径。

    本文示例为mysql-connector-java-8.0.11.jar包所在的OSS路径

    spark.app.name

    Spark SQL作业名称。

    spark.adb.eni.enabled

    开启ENI访问。

    访问数据时,需将spark.adb.eni.enabled参数设置为true

    spark.adb.eni.vswitchId

    交换机ID。在RDS MySQL实例的数据库连接页面,将鼠标移动至VPC处,获取交换机ID。

    spark.adb.eni.securityGroupId

    RDS MySQL实例中添加的安全组ID。如未添加安全组,请参见设置安全组

    spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES

    Spark Driver节点参数,用于指定JKS证书所在的OSS路径。多个JKS证书中间用英文逗号(,)分隔。例如:spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES: "oss://testBucketName/a.jks,oss://testBucketName/b.jks"

    spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES

    Spark Executor节点参数,用于指定JKS证书所在的OSS路径。多个JKS证书中间用英文逗号(,)分隔。例如:spark.executor.ADB_SPARK_DOWNLOAD_FILES: "oss://testBucketName/a.jks,oss://testBucketName/b.jks"

    table_view

    视图名称。本文以table_view为例。

    USING org.apache.spark.sql.jdbc

    参数取值固定为USING org.apache.spark.sql.jdbc

    OPTIONS

    • url:RDS MySQL实例的内网地址、端口及SSL链路对应参数。格式为jdbc:mysql://rm-bp1k87323a7ia****.mysql.rds.aliyuncs.com:3306/?useSSL=true&clientCertificateKeyStoreUrl=file:///tmp/<bucketname>/folder/ApsaraDB-CA-Chain.jks&clientCertificateKeyStorePassword=apsaradb&trustCertificateKeyStoreUrl=file:///tmp/<bucketname>/folder/ApsaraDB-CA-Chain.jks&trustCertificateKeyStorePassword=apsaradb&trustCertificateKeyStoreType=JKS&clientCertificateKeyStoreType=JKS

      SSL链路对应参数的详细信息,请参见参数说明

    • dbtable:RDS MySQL的表名。格式为db_name.table_name。本文以db.test为例。

    • user:RDS MySQL数据库的账号。

    • password:RDS MySQL数据库账号的密码。

    Catalog方式访问

    set spark.adb.eni.enabled=true;
    set spark.adb.eni.vswitchId=<vsw-bp1d14ddiw46fkgu1****>;
    set spark.adb.eni.securityGroupId=<sg-bp19varsa8j0hyb****>;
    set spark.sql.catalog.jdbc=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog;
    set spark.sql.catalog.jdbc.url=<jdbc:mysql://rm-bp11mpql1e01l****.mysql.rds.aliyuncs.com:3306>;
    set spark.sql.catalog.jdbc.user=<user>;
    set spark.sql.catalog.jdbc.password=<password>;
    use jdbc;
    select * from db.test;

    参数说明如下:

    参数

    说明

    spark.sql.catalog.jdbc

    Spark SQL支持的配置数据源的方式。

    参数取值固定为org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog

    说明

    spark.sql.catalog.catalog_name参数名称中的catalog_name可自定义,本文示例均为jdbc

    spark.sql.catalog.jdbc.url

    RDS MySQL实例的内网地址和端口。格式为:jdbc:mysql://rm-bp1k87323a7ia****.mysql.rds.aliyuncs.com:3306

    spark.sql.catalog.jdbc.user

    RDS MySQL数据库的账号。

    spark.sql.catalog.jdbc.password

    RDS MySQL数据库账号的密码。

    说明

    若您想了解更多关于JDBC访问数据源的参数,请参见JDBC To Other Databases