本文将以两种方式为您介绍如何在Python 3.6及更高版本中,利用Python的工具包PyJindo来操作OSS-HDFS。
背景信息
方式一:直接使用PyJindo包是直接使用PyJindo包的原生接口来操作OSS-HDFS。这种方式更依赖于PyJindo自己的API,且通常需要更深入了解PyJindo包的特定函数和类。
方式二:使用fsspec接口则是通过fsspec
接口来操作OSS-HDFS,使用PyJindo包中实现了fsspec
协议的JindoOssFileSystem
类。这种方式对于已经熟悉fsspec
或希望在不同存储系统间无缝切换的用户来说,无疑提供了极大的便利与灵活性。
总结来说,上述两种方式都能够有效地实现对OSS-HDFS的互动操作,但它们的接口风格和集成方式有所不同。您可以根据自己的需求和偏好选择使用方式一还是方式二。
前提条件
方式一:直接使用PyJindo包
直接使用PyJindo中所提供的原生API接口和类实现对OSS-HDFS的深度操作。日志级别和API相关内容,请参见日志等级和API说明。
步骤一:安装PyJindo
EMR-5.17.x及之后版本、EMR-3.51.x及之后版本
自EMR-5.17.x及后续版本,以及EMR-3.51.x及其后续版本起,在创建的集群中已预装了Python 3.8版本的PyJindo库。因此,您无需进行手动安装,可直接跳过该步骤。
EMR-5.17.x之前版本、EMR-3.51.x之前版本
在下载页面,下载最新的tar.gz包。
本文示例是下载6.3.x目录下6.3.2版本的tar.gz包。例如,jindosdk-6.3.2-linux.tar.gz。
解压缩下载好的tar.gz包,并在以下目录结构中找到PyJindo安装包。
本示例以Python 3.8环境为例。例如,pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl。
. ├── bin │ ├── xxx ├── conf │ ├── xxx ├── include │ ├── xxx ├── lib │ ├── xxx │ ├── native │ │ ├── xxxx │ └── site-packages │ ├── pyjindo-x.y.z-cp310-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp311-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp312-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp36-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp37-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl │ └── pyjindo-x.y.z-cp39-abi3-linux_x86_64.whl ├── plugins │ └── xxxx ├── tools │ ├── xxx └── versions ├── xxx
上传pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl到目标服务器。
本示例上传至您EMR集群的home目录中。登录集群Master节点的具体操作,请参见登录集群。
(可选)确认环境变量。
EMR环境:默认存在以下环境变量,无需配置。
非EMR环境:配置方式请参见在非EMR集群中部署JindoSDK。其中,Hadoop配置文件及HADOOP_CONF_DIR不是必须,仅为兼容HADOOP环境中的配置。
export JINDOSDK_CONF_DIR=/etc/taihao-apps/jindosdk-conf export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
安装和升级pip及PyJindo安装包。
python3.8 -m ensurepip python3.8 -m pip install pip --upgrade --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/ python3.8 -m pip install /home/pyjindo-6.3.2-cp38-abi3-linux_x86_64.whl
步骤二:编写并执行测试文件
编写测试文件fs_test.py。
from pyjindo import fs # 阿里云OSS的Bucket名称,请根据实际情况替换。 bucket = "jindosdk-****" # 阿里云OSS-HDFS的Endpoint,请根据实际情况替换。 endpoint = bucket + ".cn-****.oss-dls.aliyuncs.com" root_path = "oss://" + endpoint + "/" sub_dir = root_path + "pyjindotest/" file_path = root_path + "hello.txt" file_path2 = sub_dir + "hello.txt" config = fs.read_config() fs = fs.connect(root_path, "root", config) # 使用fs.open()函数以二进制写入模式打开指定路径的文件,如果文件不存在则创建新文件。 out_file = fs.open(file_path, "wb") # 写入数据。 out_file.write(str.encode("hello world, pyjindo")) out_file.close() in_file = fs.open(file_path, "rb") # 读取文件全部内容并保存在变量data中。 data = in_file.read() print("写入的数据为%s." % (data)) in_file.close() # 列出文件。 ls_file = fs.listdir(root_path) print("目录文件为%s." % (ls_file)) # 创建目录。 fs.mkdir(sub_dir) # 移动并重命名文件。 fs.rename(file_path, file_path2) # 列出文件。 mv_file = fs.listdir(sub_dir) print("移动后的目录文件为%s." % (mv_file)) # 删除测试文件,重新列出文件。 fs.remove(file_path2) de_file = fs.listdir(sub_dir) print("删除文件后的pyjindotest目录下文件为%s." % (de_file))
执行测试文件。
python3.8 fs_test.py
执行结果如下所示。
写入的数据为b'hello world, pyjindo'. 目录文件为[<FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/.sysinfo/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/apps/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/flume/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hbase/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hello.txt': type=File, size=20>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyarrowtest/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/spark-history/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/tmp/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/user/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/yarn/': type=Directory>]. 移动后的目录文件为[<FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyjindotest/hello.txt': type=File, size=20>]. 删除文件后的pyjindotest目录下文件为[].
方式二:使用fsspec接口
通过使用fsspec这一通用文件系统接口标准,您可以方便地使用集成的JindoOssFileSystem类来与阿里云OSS-HDFS进行交互。日志级别和API相关内容,请参见日志等级和API说明。更多接口上说明,请参见fsspec。
步骤一:安装PyJindo
EMR-5.17.x及之后版本、EMR-3.51.x及之后版本
自EMR-5.17.x及后续版本,以及EMR-3.51.x及其后续版本起,在创建的集群中已预装了Python 3.8版本的PyJindo库。因此,您无需进行手动安装,可直接跳过该步骤。
EMR-5.17.x之前版本、EMR-3.51.x之前版本
在下载页面,下载最新的tar.gz包。
本文示例是下载6.3.x目录下6.3.2版本的tar.gz包。例如,jindosdk-6.3.2-linux.tar.gz。
解压缩下载好的tar.gz包,并在以下目录结构中找到PyJindo安装包。
本示例以Python 3.8环境为例。例如,pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl。
. ├── bin │ ├── xxx ├── conf │ ├── xxx ├── include │ ├── xxx ├── lib │ ├── xxx │ ├── native │ │ ├── xxxx │ └── site-packages │ ├── pyjindo-x.y.z-cp310-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp311-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp312-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp36-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp37-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl │ └── pyjindo-x.y.z-cp39-abi3-linux_x86_64.whl ├── plugins │ └── xxxx ├── tools │ ├── xxx └── versions ├── xxx
上传pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl到目标服务器。
本示例上传至您EMR集群的home目录中。登录集群Master节点的具体操作,请参见登录集群。
(可选)确认环境变量。
EMR环境:默认存在以下环境变量,无需配置。
非EMR环境:配置方式请参见在非EMR集群中部署JindoSDK。其中,Hadoop配置文件及HADOOP_CONF_DIR不是必须,仅为兼容HADOOP环境中的配置。
export JINDOSDK_CONF_DIR=/etc/taihao-apps/jindosdk-conf export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
安装和升级pip及PyJindo安装包。
python3.8 -m ensurepip python3.8 -m pip install pip --upgrade --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/ python3.8 -m pip install /home/pyjindo-6.3.2-cp38-abi3-linux_x86_64.whl
步骤二:安装依赖fsspec
本文以Python3.8环境安装fssepc为例,安装命令如下所示。
python3.8 -m pip install fsspec --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/
步骤三:编写并执行测试文件
编写测试文件ossfs_test.py。
from pyjindo.ossfs import JindoOssFileSystem # 阿里云OSS的Bucket名称,请根据实际情况替换。 bucket = "jindosdk-****" # 阿里云OSS-HDFS的Endpoint,请根据实际情况替换。 endpoint = bucket + ".cn-****.oss-dls.aliyuncs.com" root_path = "oss://" + endpoint + "/" sub_dir = root_path + "pyjindotest/" file_path = root_path + "hello.txt" file_path2 = sub_dir + "hello.txt" fs = JindoOssFileSystem(root_path) # 使用fs.open()函数以二进制写入模式打开指定路径的文件,如果文件不存在则创建新文件。 out_file = fs.open(file_path, "wb") # 写入数据。 out_file.write(str.encode("hello world, pyjindo")) out_file.close() in_file = fs.open(file_path, "rb") # 读取文件全部内容并保存在变量data中。 data = in_file.read() print("写入的数据为%s." % (data)) in_file.close() # 列出文件。 ls_file = fs.ls(root_path, detail=False) print("目录文件为%s." % (ls_file)) assert file_path in fs.glob(root_path + "*") # 创建目录。 fs.mkdir(sub_dir) # 移动并重命名文件。 fs.rename(file_path, file_path2) # 列出文件。 mv_file = fs.listdir(sub_dir, detail=False) print("移动后的目录文件为%s." % (mv_file)) # 删除测试文件,重新列出文件。 fs.rm(file_path2) de_file = fs.ls(sub_dir) print("删除文件后的pyjindotest目录下文件为%s." % (de_file))
执行测试文件。
python3.8 ossfs_test.py
执行结果如下所示。
写入的数据为b'hello world, pyjindo'. 目录文件为['oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/.sysinfo/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/apps/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/flume/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hbase/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hello.txt', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyarrowtest/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyjindotest/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/spark-history/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/test/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/tmp/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/user/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/yarn/']. 移动后的目录文件为['oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyjindotest/hello.txt']. 删除文件后的pyjindotest目录下文件为[].
日志等级
调整JINDOSDK_CONF_DIR下的jindosdk.cfg配置,EMR环境中对应/etc/taihao-apps/jindosdk-conf。
[common]
logger.dir = /var/log/emr/jindosdk
logger.level = 2
logger.verbose = 0
logger.sync = false
logger.jnilogger = true
logger.consolelogger = false
logger.cleaner.enable = true
配置项 | 描述 |
logger.dir | 日志文件的存储目录。 |
logger.level | 日志等级。推荐设置为2,这通常意味着记录INFO级别以上的日志。设置小于等于1,表示设置日记级别为WARN及以上。 |
logger.verbose | 详细日志的等级,数字越大,记录的日志越详细。取值范围为0~99。 |
logger.sync | 是否同步输出日志。推荐为false,表示不同步输出日志。 |
logger.jnilogger | 与Java Native Interface (JNI) 相关的日志设置,与PyJindo无关。 |
logger.consolelogger | 是否在终端输出日志,通常用于调试,与PyJindo无关。 |
logger.cleaner.enable | 是否开启日志自动清理功能。这个功能会定期清理旧的日志文件,以避免占用过多磁盘空间。推荐为true,表示开启日志自动清理。 |
API说明
Config类
成员函数 | 返回值类型 | 描述 |
set(key, val) | 无 | 设置字符串类型的配置项,其中key和val都是字符串。 |
get(key, default='') | str | 获取字符串类型的配置项值。 |
contains(key) | bool | 检查配置中是否存在指定的key。 |
FileType枚举
枚举类型 | 枚举值 | 描述 |
Unknown | 0 | 未知类型或者无法识别的文件。 |
Directory | 1 | 目录。 |
File | 2 | 文件。 |
Symlink | 3 | 软链接。 |
FileInfo类
成员属性 | 返回值类型 | 描述 |
type | FileType | 文件类型。 |
is_file | bool | 表示是否为文件。 |
is_dir | bool | 表示是否为目录。 |
is_symlink | bool | 表述是否为软链接。 |
path | str | 表示文件的完整路径。 |
user | str | 表示文件的所有者用户名。 |
group | str | 表示文件所属的用户组名。 |
size | int | 表示文件大小。 |
perm | int | 表示文件的权限位。 |
atime | datetime | 表示文件最后一次访问的时间。 |
mtime | datetime | 表示文件最后一次修改的时间。 |
FileStream类
成员函数 | 返回值类型 | 描述 |
readable() | bool | 表示该文件流是否可读。 |
writable() | bool | 表示该文件流是否可写。 |
seekable() | bool | 表示该文件流是否支持随机访问(即能否通过seek()函数改变文件读写位置)。 |
closed() | bool | 表示该文件流是否已关闭。 |
close() | 无 | 关闭当前文件流。在关闭过程中如果发生错误,则抛出IOError异常。 |
size() | int | 表示文件大小(仅当文件可读时可用)。若失败则抛出IOError异常。 |
tell() | int | 当前文件流的位置。若失败则抛出IOError异常。 |
flush() | 无 | 将缓冲区中的数据强制写入到文件中。若失败则抛出IOError异常。 |
write(data) | 无 | 接收一个bytes类型的数据,并将其写入到文件中。若失败则抛出IOError异常。 |
read(nbytes) | bytes | 从文件中读取指定大小(nbytes)的数据,返回一个bytes对象。若失败则抛出IOError异常。 |
pread(nbytes, offset) | bytes | 从文件的特定偏移量(offset)处开始,读取指定大小(nbytes)的数据。若失败则抛出IOError异常。 |
readall() | bytes | 读取整个文件内容。若失败则抛出IOError异常。 |
download(stream_or_path, buffer_size) | 无 | 从当前文件流下载内容,并将其写入到本地路径或目标流中。stream_or_path参数可以是本地路径也可以是另一个文件流。若失败则抛出IOError异常。 |
upload(stream, buffer_size) | 无 | 从给定的stream流中读取数据并写入到当前文件中。若失败则抛出IOError异常。 |
FileSystem类型
成员函数 | 返回值类型 | 描述 |
mkdir(path, recursive) | bool | 创建指定路径的目录,如果recursive参数为True,则会递归创建所有不存在的父目录。若失败则抛出IOError异常。 |
rename(src, dest) | bool | 将src路径的文件或目录重命名为dest路径。若失败则抛出IOError异常。 |
get_file_info(path) | FileInfo | 获取指定路径文件的详细信息。若失败则抛出IOError异常。 |
exists(path) | bool | 检查指定路径的文件或目录是否存在。若失败则抛出IOError异常。 |
listdir(path, recursive) | FileInfo列表 | 列举指定路径下的文件或子目录信息。若recursive为True,则递归列出所有子目录下的文件信息。若失败则抛出IOError异常。 |
chmod(path, perm) | bool | 修改指定路径文件或目录的权限,类似setPermission。perm参数是八进制表示的权限码,如0o777。若失败则抛出IOError异常。 |
chown(path, owner, group) | bool | 改变指定路径文件或目录的所有者和所属组,类似setOwner。owner和group分别代表新的用户名和用户组名。若失败则抛出IOError异常。 |
open(path, mode, buffer_size=None) | FileStream | 打开指定路径的文件,mode支持 |
download(path, stream_or_path, buffer_size=None) | 无 | 从远程文件系统下载指定路径的文件到本地,stream_or_path可以是一个本地文件路径或一个文件流。buffer_size默认为64KB,实际使用时如果配置中存在fs.oss.read.buffer.size,则优先采用该配置值来确定读取缓冲区大小。若失败则抛出IOError异常。 |
upload(path, stream, buffer_size=None) | 无 | 将给定的流stream上传至远程文件系统指定的路径。buffer_size默认为64KB,实际使用时如果配置中存在fs.oss.write.buffer.size,则优先采用该配置值来确定写入缓冲区大小。若失败则抛出IOError异常。 |
copy_file(src, dest, buffer_size=None) | 无 | 在文件系统内部拷贝文件,从src路径拷贝到dest路径。buffer_size默认为64KB,实际使用时如果配置中存在fs.oss.read.buffer.size,则优先采用该配置值来确定读取缓冲区大小。若失败则抛出IOError异常。 |
fs模块
全局函数 | 返回值类型 | 描述 |
read_config() | Config | 用于读取配置信息。 它会优先检查环境变量 |
connect(uri, user, config) | FileSystem | 初始化FileSystem。若失败则抛出IOError异常。 |