大数据计算服务(MaxCompute,原名ODPS)是一种快速、完全托管的TB/PB级数据仓库解决方案。本文主要介绍如何通过DLA Serverless Spark访问MaxCompute。
操作步骤
- 准备以下测试代码和依赖包来访问MaxCompute,并将此测试代码和依赖包分别编译打包生成jar包上传至您的OSS。
测试代码示例:
package com.aliyun.spark
import org.apache.spark.sql.{SaveMode, SparkSession}
object MaxComputeDataSourcePartitionSample {
def main(args: Array[String]): Unit = {
val accessKeyId = args(0)
val accessKeySecret = args(1)
val odpsUrl = args(2)
val tunnelUrl = args(3)
val project = args(4)
val table = args(5)
var numPartitions = 1
if(args.length > 6)
numPartitions = args(6).toInt
val ss = SparkSession.builder().appName("Test Odps Read").getOrCreate()
import ss.implicits._
val dataSeq = (1 to 1000000).map {
index => (index, (index-3).toString)
}.toSeq
val df = ss.sparkContext.makeRDD(dataSeq).toDF("a", "b")
System.out.println("*****" + table + ",before overwrite table")
df.write.format("org.apache.spark.aliyun.odps.datasource")
.option("odpsUrl", odpsUrl)
.option("tunnelUrl", tunnelUrl)
.option("table", table)
.option("project", project)
.option("accessKeySecret", accessKeySecret)
.option("partitionSpec", "pt='2018-04-01'")
.option("allowCreateNewPartition", true)
.option("accessKeyId", accessKeyId).mode(SaveMode.Overwrite).save()
System.out.println("*****" + table + ",after overwrite table, before read table")
val readDF = ss.read
.format("org.apache.spark.aliyun.odps.datasource")
.option("odpsUrl", odpsUrl)
.option("tunnelUrl", tunnelUrl)
.option("table", table)
.option("project", project)
.option("accessKeySecret", accessKeySecret)
.option("accessKeyId", accessKeyId)
.option("partitionSpec", "pt='2018-04-01'")
.option("numPartitions",numPartitions).load()
readDF.collect().foreach(println)
}
}
MaxCompute依赖的pom文件:
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-commons</artifactId>
<version>0.28.4-public</version>
</dependency>
<dependency>
<groupId>com.aliyun.apsaradb</groupId>
<artifactId>maxcompute-spark</artifactId>
<version>0.28.4-public_2.4.3-1.0-SNAPSHOT</version>
</dependency>
- 登录Data Lake Analytics管理控制台。
- 在页面左上角,选择MaxCompute项目空间所在地域。
- 单击左侧导航栏中的。
- 在作业编辑页面,单击创建作业。
- 在创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
- 单击Spark作业名,在Spark作业编辑框中输入以下作业内容,并按照以下参数说明进行参数值替换。保存并提交Spark作业。
{
"args": [
"xxx1", #具备访问MaxCompute权限的AccessKey ID。
"xxx2", #具备访问MaxCompute权限的AccessKey Secret。
"http://service.cn.maxcompute.aliyun-inc.com/api", #MaxCompute的VPC网络Endpoint。
"http://dt.cn-shenzhen.maxcompute.aliyun-inc.com", #MaxCompute的VPC网络Tunnel Endpoint。
"spark_on_maxcompute", #MaxCompute的工作空间名称。
"sparktest", #MaxCompute的数据表名。
"2" #MaxCompute数据表的分区数。
],
"file": "oss://spark_test/jars/maxcompute/spark-examples-0.0.1-SNAPSHOT.jar", #存放测试软件包的OSS路径。
"name": "Maxcompute-test",
"jars": [
##存放测试软件依赖包的OSS路径。
"oss://spark_test/jars/maxcompute/maxcompute-spark-0.28.4-public_2.4.3-1.0-SNAPSHOT.jar",
"oss://spark_test/jars/maxcompute/odps-sdk-commons-0.28.4-public.jar",
"oss://spark_test/jars/maxcompute/odps-sdk-core-0.28.4-public.jar",
"oss://spark_test/jars/maxcompute/mail-1.4.7.jar"
],
"className": "com.aliyun.spark.MaxComputeDataSourcePartitionSample",
"conf": {
"spark.driver.resourceSpec": "small",
"spark.executor.instances": 2,
"spark.executor.resourceSpec": "small"
}
}
执行结果
作业运行成功后,在任务列表中单击,查看作业日志。出现如下日志说明作业运行成功: