文档

RDS (MySQL)

更新时间:
一键部署

本文介绍如何使用Databricks 读写阿里云RDS(MySQL)数据源数据。

前提条件

使用Databricks 读写MySQL数据

  1. DDI集群与MySQL实例网络打通。

    1. 登录RDS管理控制台RDS管理控制台

    2. 点击右侧导航栏实例列表选择实例所在region

    3. 点击实例ID进入实例详情页面

    4. 点击实例详情右侧导航栏数据库连接

    5. 如图所示查看RDS实例所在的VPCVSwitchdata

    6. 登录到databricks数据洞察集群阿里云Databricks控制台

    7. 选择集群所在region进入集群列表

    8. 点击集群实例进入集群详情页面

    9. 点击详情页面上方数据源页签进入数据源页面点击添加打他

    10. 选择通用网络,选择对应的VPC和VSwith点击下一步点击确认等待创建成功data

  2. 将数据源ENI IP添加至RDS白名单

    1. 等待1-j步骤的数据源实例创建好以后找到ENI IP打他

    2. 进入RDS管控实例点击数据库连接,选择白名单data

    3. 按照白名单添加的规则将IP白名单列表添加进去。data

    4. 添加白名单选配项,点击确认data

3.登录到RDS数据库并执行创表语句

dat

建表语句:

CREATE DATABASE case_demos;
USE case_demos;
CREATE TABLE `word_count_demo` (
  `word` text,
  `count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

4.登录Databricks数据洞察集群进入Notebook,代码实现MySQL数据读写。

示例文本下载:The_Sorrows_of_Young_Werther.txt

%spark
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
//链接数据库配置信息;
val dbName = "your dbName"
val tbName = "word_count_demo"
val dbUser = "your dbUser"
val dbPwd = "your dbPwd"
val dbUrl = "your bdUrl"
val dbPort = "3306"
val inputPath = "oss://ddi-test/The Sorrows of Young Werther"
val numPartitions = 3
//分区读取OSS文件,并计算WordCount;
val input = sc.textFile(inputPath, numPartitions)
val counts = input.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).map(e => Row.apply(e._1, e._2))
//创建自定义schema;
lazy val schema = StructType(
    StructField("word", StringType) ::
    StructField("count", IntegerType) :: Nil)
//读取MySQL配置信息;
val properties = new Properties()
properties.setProperty("user", dbUser)
properties.setProperty("password", dbPwd)
//创建的DataFrame;
val df = sqlContext.createDataFrame(counts, schema)
//将wordCount计算好结果的DF写入到MySQL;
df.write.mode("append").option("driver","com.mysql.cj.jdbc.Driver").jdbc(s"jdbc:mysql://$dbUrl:$dbPort/$dbName", tbName, properties)
说明

当链接MySQL8.x需要加入配选项option("driver","com.mysql.cj.jdbc.Driver") 如果不加默认会用com.mysql.jdbc.Driver只能链接MySQL5.x版本。

查询数据库是否插入成功

data

5.读取数据库数据

%spark
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
//链接数据库配置信息;
val dbName = "your dbName"
val tbName = "word_count_demo"  //选择过滤条件
val dbUser = "your dbUser"
val dbPwd = "your dbPwd"
val dbUrl = "your bdUrl"
val dbPort = "3306"
//读取MySQL配置信息;
val properties = new Properties()
properties.setProperty("user", dbUser)
properties.setProperty("password", dbPwd)
//读取数据
val rds_data=spark.read.option("driver","com.mysql.cj.jdbc.Driver").jdbc(s"jdbc:mysql://$dbUrl:$dbPort/$dbName", tbName, properties)
//数据展示
rds_data.show()
data