Tunnel SDK示例(Python)

Tunnel是MaxCompute的数据通道,您可以通过Tunnel向MaxCompute中上传或者下载数据,TunnelSDK是PyODPS的一部分,本文为您介绍使用Python版TunnelSDK上传下载数据的简单示例。

注意事项

  • 下文为您介绍简单的上传、下载数据的Python SDK示例,更多其他应用场景的Python SDK示例请参见Python SDK文档

  • 如果您安装了Cython,在安装PyODPS时会编译C代码,在上传和下载场景下,会加速Tunnel的上传和下载速度。

上传示例

import os
from odps import ODPS
from odps.tunnel import TableTunnel

# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# 不建议直接使用 Access Key ID / Access Key Secret 字符串
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)

table = o.get_table('my_table')

tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')

with upload_session.open_record_writer(0) as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    record = table.new_record(['test2', 'id2'])
    writer.write(record)

# 需要在 with 代码块外 commit,否则数据未写入即 commit,会导致报错
upload_session.commit([0])

下载示例

from odps.tunnel import TableTunnel

tunnel = TableTunnel(odps)
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')

with download_session.open_record_reader(0, download_session.count) as reader:
     for record in reader:
         # 处理每条记录

with download_session.open_arrow_reader(0, download_session.count) as reader:
     for batch in reader:
         # 处理每个 Arrow RecordBatch