本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
本文介绍如何使用Databricks 读写MaxCompute数据。
前提条件
已创建MaxCompute实例,具体参见快速体验MaxCompute
通过主账号登录阿里云 Databricks控制台。
已创建DDI集群,具体请参见DDI集群创建。
创建集群并通过knox账号访问NoteBook。
使用Databricks 读写MaxCompute数据
读取maxCompute数据集的dwd_product_movie_basic_info表中ds=20170519分区的数据到DataFrame中,代码实现。
警告
odpsUrl和tunnelUrl都需要设置为VPC内网访问格式,否则提交job的时候会因为集群中worker节点(无公网ip)连接不上odps导致job超时。
%spark
val akId = "your akId"
val aks = "your aks"
val project = "maxcompute_public_data"
val table = "dwd_product_movie_basic_info"
val odpsUrl = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api" //参考文档
val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com" //参考文档
//spark读取dwd_product_movie_basic_info表中ds=20170519分区的数据到DataFrame中;
val readDF = spark.read.format("org.apache.spark.aliyun.odps.datasource").option("odpsUrl", odpsUrl).option("tunnelUrl", tunnelUrl).option("project",project).option("table",table).option("accessKeySecret",aks).option("accessKeyId", akId).option("partitionSpec", "ds=20170519").load()
readDF.show()
通过自定义Schema创建MaxCompute分区表数据,
说明
MC数据读取依赖ddi-datasources_shaded_2.11-1.0-SNAPSHOT.jar,该包中封装了spark调用odps数据的sdk,并封装了MaxCompute建表的工具类OdpsUtils。
%spark
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.aliyun.odps.{Column, OdpsType, TableSchema}
import org.apache.spark.aliyun.utils.OdpsUtils
//定义参数
val akId = "your akId"
val aks = "your aks"
val project = "your project"
val table = "table_movie"
val odpsUrl = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api" //参考文档
val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com" //参考文档
val flag=true
//创建schema
val schema = new TableSchema
schema.addColumn(new Column("movie_name", OdpsType.STRING))
schema.addColumn(new Column("dirctor", OdpsType.STRING))
schema.addColumn(new Column("scriptwriter", OdpsType.STRING))
schema.addColumn(new Column("area", OdpsType.STRING))
schema.addColumn(new Column("actors", OdpsType.STRING))
schema.addColumn(new Column("type", OdpsType.STRING))
schema.addColumn(new Column("movie_length", OdpsType.STRING))
schema.addColumn(new Column("movie_date", OdpsType.STRING))
schema.addColumn(new Column("movie_language", OdpsType.STRING))
schema.addColumn(new Column("imdb_url", OdpsType.STRING))
schema.addPartitionColumn(new Column("ds", OdpsType.STRING))
//databricks整合了odps建表工具类OdpsUtils
val odpsUtils= OdpsUtils(akId, aks, odpsUrl)
//创建MC表
odpsUtils.createTable(project, table, schema, flag)
//参数说明:flag:是否覆盖原有的表(true:覆盖,false:不覆盖)
写入数据到分区表table_movie中
%spark
val akId = "your akId"
val aks = "your aks"
val project = "your project"
val table = "table_movie"
val odpsUrl = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api" //参考文档
val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com" //参考文档
//写入数据到maxcompute新创建的分区表中
readDF.write.format("org.apache.spark.aliyun.odps.datasource")
.option("odpsUrl", odpsUrl)
.option("tunnelUrl", tunnelUrl)
.option("table", table)
.option("project",project)
.option("accessKeySecret", aks)
.option("accessKeyId", akId)
.option("partitionSpec", "ds='20190522'").mode(SaveMode.Overwrite).save()
查看数据是否写入成功
%spark
val project = "your project"
val table = "table_movie"
//读取分区ds=20190522数据
val DF = spark.read.format("org.apache.spark.aliyun.odps.datasource")
.option("odpsUrl", odpsUrl)
.option("tunnelUrl", tunnelUrl)
.option("project",project)
.option("table",table)
.option("accessKeySecret",aks)
.option("accessKeyId", akId)
.option("inferSchema","true")
.option("partitionSpec", "ds=20190522")
.load()
DF.show()
文档内容是否对您有帮助?