RDS(SQL Server)

本文介绍如何使用Databricks 读写阿里云RDS(SQL Server)数据源数据。

前提条件

使用Databricks 读写SQL Server数据

  1. DDI集群与SQL Server实例网络打通。

    1. 登录RDS管理控制台RDS管理控制台

    2. 点击右侧导航栏实例列表选择实例所在region

    3. 点击实例ID进入实例详情页面

    4. 点击实例详情右侧导航栏数据库连接

    5. 如图所示查看RDS实例所在的VPCVSwitchdata

    6. 登录到Databricks数据洞察集群阿里云Databricks控制台

    7. 选择集群所在region进入集群列表

    8. 点击集群实例进入集群详情页面

    9. 点击详情页面上方数据源页签进入数据源页面点击添加打他

    10. 选择通用网络,选择对应的VPC和VSwith点击下一步点击确认等待创建成功data

  2. 将数据源ENI IP添加至RDS白名单

    1. 等待1-j步骤的数据源实例创建好以后找到ENI IP打他

    2. 进入RDS管控实例点击数据库连接,选择白名单data

    3. 选择创建好的白名单组,点击修改data

    4. 添加白名单选配项,点击确认data

3.登录Databricks数据洞察集群进入Notebook,代码实现SQL Server数据读写。

示例文本下载:The_Sorrows_of_Young_Werther.txt

在Notebook中使用%spark读取OSS文件,并执行WordCount代码实现。

%spark

// 从oss读取数据到spark的rdd

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}

// 从oss地址读取文本文档(注意oss文件在账号下上传到对应目录)
val text = sc.textFile("oss://your bucket/demo/The_Sorrows_of_Young_Werther.txt")
// 使用Scala做WordCount处理
val counts = text.flatMap(_.split("\\s+")).map(s => s.replaceAll("""[\.";,!]""", "")).map(word => (word, 1L)).reduceByKey(_+_).map(e => Row.apply(e._1, e._2))

lazy val schema = StructType(
    StructField("word", StringType) ::
    StructField("count", LongType) :: Nil)
    
val count_df = sqlContext.createDataFrame(counts, schema)
count_df.show()
data

在Notebook中使用%spark读写SQL Server数据

%spark

// ddi读写sqlserver数据

val sqlServer = "your server.rds.aliyuncs.com"
val sqlServerDb = "ddi_test"
val sqlServerUrl = s"jdbc:sqlserver://$sqlServer:1433;databaseName=$sqlServerDb"
//write
count_df.write.mode("overwrite").format("jdbc").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("url", sqlServerUrl).option("user","uname").option("password", "your ps").option("dbtable", "ddi_count").save()
//read
val sqlServer_read_df = spark.read.format("jdbc").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("url", sqlServerUrl).option("user","uname").option("password", "your ps").option("dbtable", "ddi_count").load
sqlServer_read_df.show()
data