本文介绍如何使用Databricks 读写阿里云RDS(SQL Server)数据源数据。
前提条件
通过主账号登录阿里云 Databricks控制台。
已创建SQL Server实例,具体参见创建SQL Server实例。
已创建DDI集群,具体请参见DDI集群创建。
创建集群并通过knox账号访问NoteBook。
使用Databricks 读写SQL Server数据
DDI集群与SQL Server实例网络打通。
登录RDS管理控制台RDS管理控制台
点击右侧导航栏实例列表选择实例所在region
点击实例ID进入实例详情页面
点击实例详情右侧导航栏数据库连接
如图所示查看RDS实例所在的VPC和VSwitch
登录到Databricks数据洞察集群阿里云Databricks控制台
选择集群所在region进入集群列表
点击集群实例进入集群详情页面
点击详情页面上方数据源页签进入数据源页面点击添加
选择通用网络,选择对应的VPC和VSwith点击下一步点击确认等待创建成功
将数据源ENI IP添加至RDS白名单
等待1-j步骤的数据源实例创建好以后找到ENI IP
进入RDS管控实例点击数据库连接,选择白名单
选择创建好的白名单组,点击修改
添加白名单选配项,点击确认
3.登录Databricks数据洞察集群进入Notebook,代码实现SQL Server数据读写。
示例文本下载:The_Sorrows_of_Young_Werther.txt
在Notebook中使用%spark读取OSS文件,并执行WordCount代码实现。
%spark
// 从oss读取数据到spark的rdd
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
// 从oss地址读取文本文档(注意oss文件在账号下上传到对应目录)
val text = sc.textFile("oss://your bucket/demo/The_Sorrows_of_Young_Werther.txt")
// 使用Scala做WordCount处理
val counts = text.flatMap(_.split("\\s+")).map(s => s.replaceAll("""[\.";,!]""", "")).map(word => (word, 1L)).reduceByKey(_+_).map(e => Row.apply(e._1, e._2))
lazy val schema = StructType(
StructField("word", StringType) ::
StructField("count", LongType) :: Nil)
val count_df = sqlContext.createDataFrame(counts, schema)
count_df.show()
在Notebook中使用%spark读写SQL Server数据
%spark
// ddi读写sqlserver数据
val sqlServer = "your server.rds.aliyuncs.com"
val sqlServerDb = "ddi_test"
val sqlServerUrl = s"jdbc:sqlserver://$sqlServer:1433;databaseName=$sqlServerDb"
//write
count_df.write.mode("overwrite").format("jdbc").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("url", sqlServerUrl).option("user","uname").option("password", "your ps").option("dbtable", "ddi_count").save()
//read
val sqlServer_read_df = spark.read.format("jdbc").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("url", sqlServerUrl).option("user","uname").option("password", "your ps").option("dbtable", "ddi_count").load
sqlServer_read_df.show()
文档内容是否对您有帮助?