Streaming Tunnel SDK example (Python)

Streaming Tunnel is MaxCompute's streaming data channel for uploading records to a table in real time. This topic shows how to use the Streaming Tunnel SDK for Python to write records to a partitioned MaxCompute table.

Streaming Tunnel supports uploads only. To modify or delete existing data, use other MaxCompute data manipulation methods.

Prerequisites

Before you begin, ensure that you have:

  • The MaxCompute Python SDK (PyODPS), installed with: pip install pyodps

  • An Alibaba Cloud account with an AccessKey ID and AccessKey secret

  • A MaxCompute project and the endpoint for your region

  • A target table with at least one partition
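
The credential prerequisite can be checked up front so that a missing variable fails with a clear message rather than an authentication error later. The following is a minimal sketch, assuming the credentials live in the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables used by the upload example on this page. load_credentials is a hypothetical helper name, not part of PyODPS.

```python
import os

# Environment variable names used by the upload example on this page.
REQUIRED_VARS = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")


def load_credentials(env=None):
    """Return (access_key_id, access_key_secret); raise if either is unset or empty.

    Hypothetical helper for illustration only. `env` defaults to os.environ and
    can be replaced with a plain dict for testing.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return tuple(env[name] for name in REQUIRED_VARS)
```

The returned pair can be passed as the first two arguments to ODPS(...) in place of the direct os.getenv calls.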

Upload data

The following example creates a stream upload session, opens a record writer, and writes two records to the pt=test partition.

import os
from odps import ODPS
from odps.tunnel import TableTunnel

# Authenticate using environment variables.
# Store your credentials as environment variables instead of hardcoding them.
odps = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),      # AccessKey ID
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),  # AccessKey secret
    project='<your-project>',
    endpoint='<your-endpoint>',
)

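# Get the target table; it is used below to create records that match its schema.
table = odps.get_table('<your-table-name>')

tunnel = TableTunnel(odps)
upload_session = tunnel.create_stream_upload_session('<your-table-name>', partition_spec='pt=test')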

with upload_session.open_record_writer() as writer:
    # Method 1: assign field values by index
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    # Method 2: pass all field values as a list
    record = table.new_record(['test2', 'id2'])
    writer.write(record)

Replace the following placeholders before running the code:

Placeholder          Description                        Example
<your-project>       Name of your MaxCompute project    my_project
<your-endpoint>      Endpoint for your region           http://service.cn-hangzhou.maxcompute.aliyun.com/api
<your-table-name>    Name of the target table           my_table
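
Method 2 in the upload example takes the field values as a list in the table's column order. When source rows arrive as dictionaries, a small helper can put the values in that order first. This is a minimal sketch: row_to_values is a hypothetical helper, and the column names name and id are assumptions made for illustration, so take the real names and order from your table's schema.

```python
def row_to_values(row, column_names):
    """Return the values of `row` ordered to match `column_names`.

    Hypothetical helper for illustration. Raises KeyError if the row has a
    key that is not a known column, or lacks one of the columns.
    """
    unknown = set(row) - set(column_names)
    if unknown:
        raise KeyError(f"Unknown columns: {sorted(unknown)}")
    return [row[name] for name in column_names]


# Assumed column order; replace with the columns of your own table.
columns = ["name", "id"]
values = row_to_values({"id": "id2", "name": "test2"}, columns)
print(values)  # ['test2', 'id2']
```

The resulting list can then be passed to table.new_record(values) before calling writer.write.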

Verify the upload

After running the code, query the partition to confirm the records were written:

SELECT * FROM <your-table-name> WHERE pt='test';

The query returns two rows: ('test1', 'id1') and ('test2', 'id2').
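
The same check can be run from Python instead of a SQL console. The following is a sketch under stated assumptions: fetch_partition_rows is a hypothetical helper, o is an authenticated ODPS entry object like the one created in the upload example, and the table and partition values are trusted strings, because they are interpolated directly into the SQL text. It relies on the PyODPS execute_sql and open_reader calls.

```python
def fetch_partition_rows(o, table_name, partition_value):
    """Run the verification query and return each row as a list of values.

    Hypothetical helper for illustration. `o` is expected to be an
    authenticated odps.ODPS object. Only pass trusted values: they are
    formatted directly into the SQL statement.
    """
    sql = f"SELECT * FROM {table_name} WHERE pt='{partition_value}'"
    instance = o.execute_sql(sql)  # waits for the SQL job to finish
    with instance.open_reader() as reader:
        # Each item is a record; .values holds its column values in order.
        return [list(record.values) for record in reader]
```

For example, fetch_partition_rows(odps, '<your-table-name>', 'test') should return one list per uploaded record.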