云原生数据仓库 AnalyticDB MySQL 版Spark支持访问OSS-HDFS数据源,本文介绍如何使用Spark来操作OSS-HDFS数据。
前提条件
集群的产品系列为企业版、基础版或湖仓版。
集群与OSS存储空间位于相同地域。
已创建Job型资源组。具体操作,请参见新建资源组。
已创建AnalyticDB for MySQL集群的数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。
开启OSS-HDFS服务。具体操作,请参见开通OSS-HDFS服务。
Spark Jar模式读写OSS-HDFS数据源
编写访问OSS-HDFS的示例程序(即Spark作业依赖的JAR包),进行编译打包后生成的JAR包名称为
oss_hdfs_demo.jar
。示例代码如下:package com.aliyun.spark import org.apache.spark.sql.SparkSession object SparkHDFS { def main(args: Array[String]): Unit = { val sparkSession = SparkSession .builder() .appName("Spark HDFS TEST") .getOrCreate() val welcome = "hello, adb-spark" //hdfs目录用于存放内容 val hdfsPath = args(0); //将welcome字符串存入指定的hdfs目录 sparkSession.sparkContext.parallelize(Seq(welcome)).saveAsTextFile(hdfsPath) //从指定的hdfs目录中读取内容,并打印 sparkSession.sparkContext.textFile(hdfsPath).collect.foreach(println) } }
将
oss_hdfs_demo.jar
包上传到OSS-HDFS。具体操作,请参见通过Hadoop Shell命令访问。登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击
。在编辑器窗口上方,选择Job型资源组和Spark应用类型。本文以Batch类型为例。
在编辑器中输入以下Spark代码。读取OSS中的文件并打印出来行数和第一行内容。
{ "args": ["oss://testBucketName/data/oss_hdfs"], "file": "oss://testBucketName/data/oss_hdfs_demo.jar", "name": "spark-on-hdfs", "className": "com.aliyun.spark.SparkHDFS", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 1, "spark.executor.resourceSpec": "medium", "spark.adb.connectors": "jindo" } }
参数说明:
参数名称
参数说明
args
Spark JAR作业运行的参数。本文示例代码需要在args传入读写的OSS-HDFS路径。
本文示例为:
oss://testBucketName/data/oss_hdfs
。file
JAR包所属的OSS-HDFS路径。
本文示例为:
oss://testBucketName/data/oss_hdfs_demo.jar
。name
Spark应用的名称。
spark.adb.connectors
本文读取OSS-HDFS数据使用的连接器为:
jindo
。conf
与开源Spark中的配置项基本一致,参数格式为
key: value
形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明。单击立即执行,执行完成后,您可以在Spark Jar开发页面应用列表页签中的日志查看数据。详情请参见Spark开发编辑器。
Spark SQL模式读写OSS-HDFS数据源
在OSS-HDFS上创建库路径和表路径。具体操作,请参见通过Hadoop Shell命令访问。本文示例为:
库路径:
oss://{bucket}/jindo_test
;表路径:oss://{bucket}/jindo_test/tbl
。编写访问OSS-HDFS的Spark SQL。
SET spark.driver.resourceSpec=small; SET spark.executor.instances=1; SET spark.executor.resourceSpec=small; SET spark.adb.connectors=jindo; CREATE DATABASE IF NOT EXISTS jindo_test LOCATION 'oss://{bucket}/jindo_test'; USE jindo_test; CREATE TABLE IF NOT EXISTS tbl(id int, name string) LOCATION 'oss://{bucket}/jindo_test/tbl'; INSERT INTO tbl values(1, 'aaa'); SELECT * FROM tbl;
单击立即执行。