RDS (MySQL)
更新时间:
本文介绍如何使用Databricks 读写阿里云RDS(MySQL)数据源数据。
前提条件
通过主账号登录阿里云 Databricks控制台。
已创建MySQL实例,具体参见创建MySQL实例。
已创建DDI集群,具体请参见DDI集群创建。
创建集群并通过knox账号访问NoteBook。
使用Databricks 读写MySQL数据
DDI集群与MySQL实例网络打通。
登录RDS管理控制台RDS管理控制台
点击右侧导航栏实例列表选择实例所在region
点击实例ID进入实例详情页面
点击实例详情右侧导航栏数据库连接
如图所示查看RDS实例所在的VPC和VSwitch
登录到databricks数据洞察集群阿里云Databricks控制台
选择集群所在region进入集群列表
点击集群实例进入集群详情页面
点击详情页面上方数据源页签进入数据源页面点击添加
选择通用网络,选择对应的VPC和VSwith点击下一步点击确认等待创建成功
将数据源ENI IP添加至RDS白名单
等待1-j步骤的数据源实例创建好以后找到ENI IP
进入RDS管控实例点击数据库连接,选择白名单
按照白名单添加的规则将IP白名单列表添加进去。
添加白名单选配项,点击确认
3.登录到RDS数据库并执行创表语句
建表语句:
CREATE DATABASE case_demos;
USE case_demos;
CREATE TABLE `word_count_demo` (
`word` text,
`count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
4.登录Databricks数据洞察集群进入Notebook,代码实现MySQL数据读写。
示例文本下载:The_Sorrows_of_Young_Werther.txt
%spark
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
//链接数据库配置信息;
val dbName = "your dbName"
val tbName = "word_count_demo"
val dbUser = "your dbUser"
val dbPwd = "your dbPwd"
val dbUrl = "your bdUrl"
val dbPort = "3306"
val inputPath = "oss://ddi-test/The Sorrows of Young Werther"
val numPartitions = 3
//分区读取OSS文件,并计算WordCount;
val input = sc.textFile(inputPath, numPartitions)
val counts = input.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).map(e => Row.apply(e._1, e._2))
//创建自定义schema;
lazy val schema = StructType(
StructField("word", StringType) ::
StructField("count", IntegerType) :: Nil)
//读取MySQL配置信息;
val properties = new Properties()
properties.setProperty("user", dbUser)
properties.setProperty("password", dbPwd)
//创建的DataFrame;
val df = sqlContext.createDataFrame(counts, schema)
//将wordCount计算好结果的DF写入到MySQL;
df.write.mode("append").option("driver","com.mysql.cj.jdbc.Driver").jdbc(s"jdbc:mysql://$dbUrl:$dbPort/$dbName", tbName, properties)
说明
当链接MySQL8.x需要加入配选项option("driver","com.mysql.cj.jdbc.Driver")
如果不加默认会用com.mysql.jdbc.Driver
只能链接MySQL5.x版本。
查询数据库是否插入成功
5.读取数据库数据
%spark
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
//链接数据库配置信息;
val dbName = "your dbName"
val tbName = "word_count_demo" //选择过滤条件
val dbUser = "your dbUser"
val dbPwd = "your dbPwd"
val dbUrl = "your bdUrl"
val dbPort = "3306"
//读取MySQL配置信息;
val properties = new Properties()
properties.setProperty("user", dbUser)
properties.setProperty("password", dbPwd)
//读取数据
val rds_data=spark.read.option("driver","com.mysql.cj.jdbc.Driver").jdbc(s"jdbc:mysql://$dbUrl:$dbPort/$dbName", tbName, properties)
//数据展示
rds_data.show()
文档内容是否对您有帮助?