使用PyJindo访问阿里云OSS-HDFS

本文将以两种方式为您介绍如何在Python 3.6及更高版本中,利用Python的工具包PyJindo来操作OSS-HDFS。

背景信息

方式一:直接使用PyJindo包是直接使用PyJindo包的原生接口来操作OSS-HDFS。这种方式更依赖于PyJindo自己的API,且通常需要更深入了解PyJindo包的特定函数和类。

方式二:使用fsspec接口则是通过fsspec接口来操作OSS-HDFS,使用PyJindo包中实现了fsspec协议的JindoOssFileSystem类。这种方式对于已经熟悉fsspec或希望在不同存储系统间无缝切换的用户来说,无疑提供了极大的便利与灵活性。

总结来说,上述两种方式都能够有效地实现对OSS-HDFS的互动操作,但它们的接口风格和集成方式有所不同。您可以根据自己的需求和偏好选择使用方式一还是方式二。

前提条件

已创建且登录集群,详情请参见创建集群登录集群

方式一:直接使用PyJindo包

直接使用PyJindo中所提供的原生API接口和类实现对OSS-HDFS的深度操作。日志级别和API相关内容,请参见日志等级API说明

步骤一:安装PyJindo

EMR-5.17.x及之后版本、EMR-3.51.x及之后版本

自EMR-5.17.x及后续版本,以及EMR-3.51.x及其后续版本起,在创建的集群中已预装了Python 3.8版本的PyJindo库。因此,您无需进行手动安装,可直接跳过该步骤。

EMR-5.17.x之前版本、EMR-3.51.x之前版本

  1. 下载页面,下载最新的tar.gz包。

    本文示例是下载6.3.x目录下6.3.2版本的tar.gz包。例如,jindosdk-6.3.2-linux.tar.gz。

  2. 解压缩下载好的tar.gz包,并在以下目录结构中找到PyJindo安装包。

    本示例以Python 3.8环境为例。例如,pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl。

        .
        ├── bin
        │   ├── xxx
        ├── conf
        │   ├── xxx
        ├── include
        │   ├── xxx
        ├── lib
        │   ├── xxx
        │   ├── native
        │   │   ├── xxxx
        │   └── site-packages
        │       ├── pyjindo-x.y.z-cp310-abi3-linux_x86_64.whl
        │       ├── pyjindo-x.y.z-cp311-abi3-linux_x86_64.whl
        │       ├── pyjindo-x.y.z-cp312-abi3-linux_x86_64.whl
        │       ├── pyjindo-x.y.z-cp36-abi3-linux_x86_64.whl
        │       ├── pyjindo-x.y.z-cp37-abi3-linux_x86_64.whl
        │       ├── pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl
        │       └── pyjindo-x.y.z-cp39-abi3-linux_x86_64.whl
        ├── plugins
        │   └── xxxx
        ├── tools
        │   ├── xxx
        └── versions
            ├── xxx
  3. 上传pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl到目标服务器。

    本示例上传至您EMR集群的home目录中。登录集群Master节点的具体操作,请参见登录集群

  4. (可选)确认环境变量。

    • EMR环境:默认存在以下环境变量,无需配置。

    • 非EMR环境:配置方式请参见在非EMR集群中部署JindoSDK。其中,Hadoop配置文件及HADOOP_CONF_DIR不是必须,仅为兼容HADOOP环境中的配置。

      export JINDOSDK_CONF_DIR=/etc/taihao-apps/jindosdk-conf
      export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
  5. 安装和升级pip及PyJindo安装包。

    python3.8 -m ensurepip
    python3.8 -m pip install pip --upgrade --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/
    python3.8 -m pip install /home/pyjindo-6.3.2-cp38-abi3-linux_x86_64.whl

步骤二:编写并执行测试文件

  1. 编写测试文件fs_test.py。

    from pyjindo import fs
    
    # 阿里云OSS的Bucket名称,请根据实际情况替换。
    bucket = "jindosdk-****"
    # 阿里云OSS-HDFS的Endpoint,请根据实际情况替换。
    endpoint = bucket + ".cn-****.oss-dls.aliyuncs.com"
    root_path = "oss://" + endpoint + "/"
    sub_dir = root_path + "pyjindotest/"
    file_path = root_path + "hello.txt"
    file_path2 = sub_dir + "hello.txt"
    config = fs.read_config()
    fs = fs.connect(root_path, "root", config)
    # 使用fs.open()函数以二进制写入模式打开指定路径的文件,如果文件不存在则创建新文件。
    out_file = fs.open(file_path, "wb")
    # 写入数据。
    out_file.write(str.encode("hello world, pyjindo"))
    out_file.close()
    
    in_file = fs.open(file_path, "rb")
    # 读取文件全部内容并保存在变量data中。
    data = in_file.read()
    print("写入的数据为%s." % (data))
    in_file.close()
    
    # 列出文件。
    ls_file = fs.listdir(root_path)
    print("目录文件为%s." % (ls_file))
    
    # 创建目录。
    fs.mkdir(sub_dir)
    # 移动并重命名文件。
    fs.rename(file_path, file_path2)
    # 列出文件。
    mv_file = fs.listdir(sub_dir)
    print("移动后的目录文件为%s." % (mv_file))
    
    # 删除测试文件,重新列出文件。
    fs.remove(file_path2)
    de_file = fs.listdir(sub_dir)
    print("删除文件后的pyjindotest目录下文件为%s." % (de_file))
    
  2. 执行测试文件。

    python3.8 fs_test.py

    执行结果如下所示。

    写入的数据为b'hello world, pyjindo'.
    目录文件为[<FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/.sysinfo/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/apps/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/flume/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hbase/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hello.txt': type=File, size=20>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyarrowtest/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/spark-history/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/tmp/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/user/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/yarn/': type=Directory>].
    移动后的目录文件为[<FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyjindotest/hello.txt': type=File, size=20>].
    删除文件后的pyjindotest目录下文件为[].

方式二:使用fsspec接口

通过使用fsspec这一通用文件系统接口标准,您可以方便地使用集成的JindoOssFileSystem类来与阿里云OSS-HDFS进行交互。日志级别和API相关内容,请参见日志等级API说明。更多接口上说明,请参见fsspec

步骤一:安装PyJindo

EMR-5.17.x及之后版本、EMR-3.51.x及之后版本

自EMR-5.17.x及后续版本,以及EMR-3.51.x及其后续版本起,在创建的集群中已预装了Python 3.8版本的PyJindo库。因此,您无需进行手动安装,可直接跳过该步骤。

EMR-5.17.x之前版本、EMR-3.51.x之前版本

  1. 下载页面,下载最新的tar.gz包。

    本文示例是下载6.3.x目录下6.3.2版本的tar.gz包。例如,jindosdk-6.3.2-linux.tar.gz。

  2. 解压缩下载好的tar.gz包,并在以下目录结构中找到PyJindo安装包。

    本示例以Python 3.8环境为例。例如,pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl。

        .
        ├── bin
        │   ├── xxx
        ├── conf
        │   ├── xxx
        ├── include
        │   ├── xxx
        ├── lib
        │   ├── xxx
        │   ├── native
        │   │   ├── xxxx
        │   └── site-packages
        │       ├── pyjindo-x.y.z-cp310-abi3-linux_x86_64.whl
        │       ├── pyjindo-x.y.z-cp311-abi3-linux_x86_64.whl
        │       ├── pyjindo-x.y.z-cp312-abi3-linux_x86_64.whl
        │       ├── pyjindo-x.y.z-cp36-abi3-linux_x86_64.whl
        │       ├── pyjindo-x.y.z-cp37-abi3-linux_x86_64.whl
        │       ├── pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl
        │       └── pyjindo-x.y.z-cp39-abi3-linux_x86_64.whl
        ├── plugins
        │   └── xxxx
        ├── tools
        │   ├── xxx
        └── versions
            ├── xxx
  3. 上传pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl到目标服务器。

    本示例上传至您EMR集群的home目录中。登录集群Master节点的具体操作,请参见登录集群

  4. (可选)确认环境变量。

    • EMR环境:默认存在以下环境变量,无需配置。

    • 非EMR环境:配置方式请参见在非EMR集群中部署JindoSDK。其中,Hadoop配置文件及HADOOP_CONF_DIR不是必须,仅为兼容HADOOP环境中的配置。

      export JINDOSDK_CONF_DIR=/etc/taihao-apps/jindosdk-conf
      export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
  5. 安装和升级pip及PyJindo安装包。

    python3.8 -m ensurepip
    python3.8 -m pip install pip --upgrade --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/
    python3.8 -m pip install /home/pyjindo-6.3.2-cp38-abi3-linux_x86_64.whl

步骤二:安装依赖fsspec

本文以Python3.8环境安装fssepc为例,安装命令如下所示。

python3.8 -m pip install fsspec --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/

步骤三:编写并执行测试文件

  1. 编写测试文件ossfs_test.py。

    from pyjindo.ossfs import JindoOssFileSystem
    
    # 阿里云OSS的Bucket名称,请根据实际情况替换。
    bucket = "jindosdk-****"
    # 阿里云OSS-HDFS的Endpoint,请根据实际情况替换。
    endpoint = bucket + ".cn-****.oss-dls.aliyuncs.com"
    root_path = "oss://" + endpoint + "/"
    sub_dir = root_path + "pyjindotest/"
    file_path = root_path + "hello.txt"
    file_path2 = sub_dir + "hello.txt"
    fs = JindoOssFileSystem(root_path)
    # 使用fs.open()函数以二进制写入模式打开指定路径的文件,如果文件不存在则创建新文件。
    out_file = fs.open(file_path, "wb")
    # 写入数据。
    out_file.write(str.encode("hello world, pyjindo"))
    out_file.close()
    
    in_file = fs.open(file_path, "rb")
    # 读取文件全部内容并保存在变量data中。
    data = in_file.read()
    print("写入的数据为%s." % (data))
    in_file.close()
    
    # 列出文件。
    ls_file = fs.ls(root_path, detail=False)
    print("目录文件为%s." % (ls_file))
    assert file_path in fs.glob(root_path + "*")
    
    # 创建目录。
    fs.mkdir(sub_dir)
    # 移动并重命名文件。
    fs.rename(file_path, file_path2)
    # 列出文件。
    mv_file = fs.listdir(sub_dir, detail=False)
    print("移动后的目录文件为%s." % (mv_file))
    
    # 删除测试文件,重新列出文件。
    fs.rm(file_path2)
    de_file = fs.ls(sub_dir)
    print("删除文件后的pyjindotest目录下文件为%s." % (de_file))
    
  2. 执行测试文件。

    python3.8 ossfs_test.py

    执行结果如下所示。

    写入的数据为b'hello world, pyjindo'.
    目录文件为['oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/.sysinfo/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/apps/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/flume/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hbase/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hello.txt', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyarrowtest/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyjindotest/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/spark-history/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/test/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/tmp/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/user/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/yarn/'].
    移动后的目录文件为['oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyjindotest/hello.txt'].
    删除文件后的pyjindotest目录下文件为[].

日志等级

调整JINDOSDK_CONF_DIR下的jindosdk.cfg配置,EMR环境中对应/etc/taihao-apps/jindosdk-conf。

[common]
logger.dir = /var/log/emr/jindosdk
logger.level = 2
logger.verbose = 0
logger.sync = false
logger.jnilogger = true
logger.consolelogger = false
logger.cleaner.enable = true

配置项

描述

logger.dir

日志文件的存储目录。

logger.level

日志等级。推荐设置为2,这通常意味着记录INFO级别以上的日志。设置小于等于1,表示设置日记级别为WARN及以上。

logger.verbose

详细日志的等级,数字越大,记录的日志越详细。取值范围为0~99。

logger.sync

是否同步输出日志。推荐为false,表示不同步输出日志。

logger.jnilogger

与Java Native Interface (JNI) 相关的日志设置,与PyJindo无关。

logger.consolelogger

是否在终端输出日志,通常用于调试,与PyJindo无关。

logger.cleaner.enable

是否开启日志自动清理功能。这个功能会定期清理旧的日志文件,以避免占用过多磁盘空间。推荐为true,表示开启日志自动清理。

API说明

Config类

成员函数

返回值类型

描述

set(key, val)

设置字符串类型的配置项,其中key和val都是字符串。

get(key, default='')

str

获取字符串类型的配置项值。

contains(key)

bool

检查配置中是否存在指定的key。

FileType枚举

枚举类型

枚举值

描述

Unknown

0

未知类型或者无法识别的文件。

Directory

1

目录。

File

2

文件。

Symlink

3

软链接。

FileInfo类

成员属性

返回值类型

描述

type

FileType

文件类型。

is_file

bool

表示是否为文件。

is_dir

bool

表示是否为目录。

is_symlink

bool

表述是否为软链接。

path

str

表示文件的完整路径。

user

str

表示文件的所有者用户名。

group

str

表示文件所属的用户组名。

size

int

表示文件大小。

perm

int

表示文件的权限位。

atime

datetime

表示文件最后一次访问的时间。

mtime

datetime

表示文件最后一次修改的时间。

FileStream类

成员函数

返回值类型

描述

readable()

bool

表示该文件流是否可读。

writable()

bool

表示该文件流是否可写。

seekable()

bool

表示该文件流是否支持随机访问(即能否通过seek()函数改变文件读写位置)。

closed()

bool

表示该文件流是否已关闭。

close()

关闭当前文件流。在关闭过程中如果发生错误,则抛出IOError异常。

size()

int

表示文件大小(仅当文件可读时可用)。若失败则抛出IOError异常。

tell()

int

当前文件流的位置。若失败则抛出IOError异常。

flush()

将缓冲区中的数据强制写入到文件中。若失败则抛出IOError异常。

write(data)

接收一个bytes类型的数据,并将其写入到文件中。若失败则抛出IOError异常。

read(nbytes)

bytes

从文件中读取指定大小(nbytes)的数据,返回一个bytes对象。若失败则抛出IOError异常。

pread(nbytes, offset)

bytes

从文件的特定偏移量(offset)处开始,读取指定大小(nbytes)的数据。若失败则抛出IOError异常。

readall()

bytes

读取整个文件内容。若失败则抛出IOError异常。

download(stream_or_path, buffer_size)

从当前文件流下载内容,并将其写入到本地路径或目标流中。stream_or_path参数可以是本地路径也可以是另一个文件流。若失败则抛出IOError异常。

upload(stream, buffer_size)

从给定的stream流中读取数据并写入到当前文件中。若失败则抛出IOError异常。

FileSystem类型

成员函数

返回值类型

描述

mkdir(path, recursive)

bool

创建指定路径的目录,如果recursive参数为True,则会递归创建所有不存在的父目录。若失败则抛出IOError异常。

rename(src, dest)

bool

将src路径的文件或目录重命名为dest路径。若失败则抛出IOError异常。

get_file_info(path)

FileInfo

获取指定路径文件的详细信息。若失败则抛出IOError异常。

exists(path)

bool

检查指定路径的文件或目录是否存在。若失败则抛出IOError异常。

listdir(path, recursive)

FileInfo列表

列举指定路径下的文件或子目录信息。若recursive为True,则递归列出所有子目录下的文件信息。若失败则抛出IOError异常。

chmod(path, perm)

bool

修改指定路径文件或目录的权限,类似setPermission。perm参数是八进制表示的权限码,如0o777。若失败则抛出IOError异常。

chown(path, owner, group)

bool

改变指定路径文件或目录的所有者和所属组,类似setOwner。owner和group分别代表新的用户名和用户组名。若失败则抛出IOError异常。

open(path, mode, buffer_size=None)

FileStream

打开指定路径的文件,mode支持rbwb。buffer_size默认为64KB,用于指定在执行upload、download、copy_file操作时的buffer大小。若失败则抛出IOError异常。

download(path, stream_or_path, buffer_size=None)

从远程文件系统下载指定路径的文件到本地,stream_or_path可以是一个本地文件路径或一个文件流。buffer_size默认为64KB,实际使用时如果配置中存在fs.oss.read.buffer.size,则优先采用该配置值来确定读取缓冲区大小。若失败则抛出IOError异常。

upload(path, stream, buffer_size=None)

将给定的流stream上传至远程文件系统指定的路径。buffer_size默认为64KB,实际使用时如果配置中存在fs.oss.write.buffer.size,则优先采用该配置值来确定写入缓冲区大小。若失败则抛出IOError异常。

copy_file(src, dest, buffer_size=None)

在文件系统内部拷贝文件,从src路径拷贝到dest路径。buffer_size默认为64KB,实际使用时如果配置中存在fs.oss.read.buffer.size,则优先采用该配置值来确定读取缓冲区大小。若失败则抛出IOError异常。

fs模块

全局函数

返回值类型

描述

read_config()

Config

用于读取配置信息。

它会优先检查环境变量JINDOSDK_CONF_DIR是否存在,如果存在,则尝试从${JINDOSDK_CONF_DIR}/jindosdk.cfg中加载配置。如果在该配置文件中未禁用Hadoop相关配置(即hadoopConf.enable不是false),还会进一步检查环境变量HADOOP_CONF_DIR,并尝试从${HADOOP_CONF_DIR}/core-site.xml中获取附加的配置信息。

connect(uri, user, config)

FileSystem

初始化FileSystem。若失败则抛出IOError异常。