文档

MaxCompute

更新时间:
一键部署
重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

本文介绍如何使用Databricks 读写MaxCompute数据。

前提条件

使用Databricks 读写MaxCompute数据

  • 读取maxCompute数据集的dwd_product_movie_basic_info表中ds=20170519分区的数据到DataFrame中,代码实现。

说明

警告

odpsUrl和tunnelUrl都需要设置为VPC内网访问格式,否则提交job的时候会因为集群中worker节点(无公网ip)连接不上odps导致job超时。

data
%spark

val akId = "your akId"
val aks = "your aks"
val project = "maxcompute_public_data"
val table = "dwd_product_movie_basic_info"
val odpsUrl = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api"  //参考文档
val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"  //参考文档
//spark读取dwd_product_movie_basic_info表中ds=20170519分区的数据到DataFrame中;
val readDF = spark.read.format("org.apache.spark.aliyun.odps.datasource").option("odpsUrl", odpsUrl).option("tunnelUrl", tunnelUrl).option("project",project).option("table",table).option("accessKeySecret",aks).option("accessKeyId", akId).option("partitionSpec", "ds=20170519").load()
readDF.show()
data
  • 通过自定义Schema创建MaxCompute分区表数据,

说明

MC数据读取依赖ddi-datasources_shaded_2.11-1.0-SNAPSHOT.jar,该包中封装了spark调用odps数据的sdk,并封装了MaxCompute建表的工具类OdpsUtils。

%spark
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.aliyun.odps.{Column, OdpsType, TableSchema}
import org.apache.spark.aliyun.utils.OdpsUtils
//定义参数
val akId = "your akId"
val aks = "your aks"
val project = "your project"
val table = "table_movie"
val odpsUrl = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api"  //参考文档
val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"  //参考文档
val flag=true
//创建schema
val schema = new TableSchema
schema.addColumn(new Column("movie_name", OdpsType.STRING))
schema.addColumn(new Column("dirctor", OdpsType.STRING))
schema.addColumn(new Column("scriptwriter", OdpsType.STRING))
schema.addColumn(new Column("area", OdpsType.STRING))
schema.addColumn(new Column("actors", OdpsType.STRING))
schema.addColumn(new Column("type", OdpsType.STRING))
schema.addColumn(new Column("movie_length", OdpsType.STRING))
schema.addColumn(new Column("movie_date", OdpsType.STRING))
schema.addColumn(new Column("movie_language", OdpsType.STRING))
schema.addColumn(new Column("imdb_url", OdpsType.STRING))
schema.addPartitionColumn(new Column("ds", OdpsType.STRING))
//databricks整合了odps建表工具类OdpsUtils
val odpsUtils= OdpsUtils(akId, aks, odpsUrl)
//创建MC表
odpsUtils.createTable(project, table, schema, flag)
//参数说明:flag:是否覆盖原有的表(true:覆盖,false:不覆盖)
  • 写入数据到分区表table_movie中

%spark
val akId = "your akId"
val aks = "your aks"
val project = "your project"
val table = "table_movie"
val odpsUrl = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api"  //参考文档
val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"  //参考文档
//写入数据到maxcompute新创建的分区表中
readDF.write.format("org.apache.spark.aliyun.odps.datasource")
  .option("odpsUrl", odpsUrl)
  .option("tunnelUrl", tunnelUrl)
  .option("table", table)
  .option("project",project)
  .option("accessKeySecret", aks)
  .option("accessKeyId", akId)
  .option("partitionSpec", "ds='20190522'").mode(SaveMode.Overwrite).save()
  • 查看数据是否写入成功

%spark
val project = "your project"
val table = "table_movie"
//读取分区ds=20190522数据
val DF = spark.read.format("org.apache.spark.aliyun.odps.datasource")
        .option("odpsUrl", odpsUrl)
        .option("tunnelUrl", tunnelUrl)
        .option("project",project)
        .option("table",table)
        .option("accessKeySecret",aks)
        .option("accessKeyId", akId)
        .option("inferSchema","true")
        .option("partitionSpec", "ds=20190522")
        .load()
DF.show()
data