访问OSS数据源
AnalyticDB MySQLAnalyticDB MySQL支持通过Spark访问对象存储OSS中的数据。本文介绍如何通过Spark访问对象存储OSS中的数据。
前提条件
AnalyticDB MySQL湖仓版(3.0)集群与OSS存储空间位于相同地域。
已在湖仓版(3.0)集群中创建Job型资源组。具体操作,请参见新建资源组。
已创建湖仓版(3.0)集群的数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。
已授权AnalyticDB MySQL扮演AliyunADBSparkProcessingDataRole角色来访问其他云资源。具体操作,请参见快速授权。
操作步骤
上传文件至OSS。具体操作,请参见上传文件。
准备用于数据读取的文本文件,文件内容可以自定义。将文本文件上传至OSS。本文示例的文本文件名为
readme.txt
。准备Python文件,将Python文件上传至OSS。本文示例的Python文件名为
example.py
,用于读取文本文件readme.txt的第一行内容。import sys from pyspark.sql import SparkSession # 初始Spark spark = SparkSession.builder.appName('OSS Example').getOrCreate() # 读取指定的文件,文件路径由agrs传入的参数值来指定 textFile = spark.sparkContext.textFile(sys.argv[1]) # 计算文件行数并打印 print("File total lines: " + str(textFile.count())) # 打印文件的第一行内容 print("First line is: " + textFile.first())
进入Spark开发编辑器。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版(3.0)页签,单击目标集群ID。
在左侧导航栏,单击 。
在编辑器窗口上方,选择Job型资源组和Spark应用类型。本文以Batch类型为例。
在编辑器中输入以下Spark代码。以下作业并读取OSS中的文件并打印出来行数和第一行内容。
{ "args": ["oss://testBucketName/data/readme.txt"], "name": "spark-oss-test", "file": "oss://testBucketName/data/example.py", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.resourceSpec": "small", "spark.executor.instances": 1 } }
参数说明如下。
参数名称
参数说明
args
传入Spark应用的参数值,多个参数值之间以英文逗号(,)分隔。
本文示例将文本文件的OSS路径赋值给
textFile
。file
Spark应用主文件的存储路径。主文件是入口类所在的JAR包或者Python的入口执行文件。
重要Spark应用主文件目前只支持存储在OSS中。
name
Spark应用的名称。
conf
与开源Spark中的配置项基本一致,参数格式为
key: value
形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB MySQL特有的配置参数,请参见Spark应用配置参数说明。单击立即执行。
相关文档
Spark应用的开发概述,请参见Spark离线应用开发。
Spark应用的配置参数说明,请参见Spark应用配置参数说明。