Streaming Tunnel is MaxCompute's streaming data channel for uploading records to a table in real time. This topic shows how to use the Streaming Tunnel SDK for Python to write records to a partitioned MaxCompute table.
Streaming Tunnel supports uploads only. To modify or delete existing data, use other MaxCompute data manipulation methods.
Prerequisites
Before you begin, ensure that you have:
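- Installed the MaxCompute Python SDK (pyodps): pip install pyodps
- An Alibaba Cloud account with an AccessKey ID and AccessKey secret, stored in the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables
- A MaxCompute project and the endpoint for your region
- A partitioned target table. The example below writes to the pt=test partition of a table with two STRING columns.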
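The upload example reads the AccessKey pair with os.getenv, which returns None when a variable is unset. A typo in a variable name therefore shows up only later, usually as a less obvious error. A small pre-flight check can catch this early. The sketch below assumes nothing beyond the Python standard library. The helper name require_env is hypothetical and is not part of pyodps.

```python
import os


def require_env(*names):
    """Return the values of the given environment variables as a tuple.

    Raises RuntimeError listing every variable that is unset or empty,
    so a missing credential fails before any call to MaxCompute.
    """
    missing = [name for name in names if not os.environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return tuple(os.environ[name] for name in names)


# Hypothetical usage with the variable names used in the upload example:
# access_id, secret = require_env(
#     'ALIBABA_CLOUD_ACCESS_KEY_ID', 'ALIBABA_CLOUD_ACCESS_KEY_SECRET')
```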
Upload data
The following example creates a stream upload session, opens a record writer, and writes two records to the pt=test partition.
import os
from odps import ODPS
from odps.tunnel import TableTunnel

# Authenticate with credentials stored in environment variables
# instead of hardcoding them in the script.
odps = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),      # AccessKey ID
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),  # AccessKey secret
    project='<your-project>',
    endpoint='<your-endpoint>',
)

# Get the table object; new_record() builds records that match its schema.
table = odps.get_table('<your-table-name>')

tunnel = TableTunnel(odps)
# Create a stream upload session for the pt=test partition.
upload_session = tunnel.create_stream_upload_session(table.name, partition_spec='pt=test')

# The writer is closed automatically when the with block exits.
with upload_session.open_record_writer() as writer:
    # Method 1: assign field values by column index
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    # Method 2: pass all field values as a list, in column order
    record = table.new_record(['test2', 'id2'])
    writer.write(record)
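The upload example writes two records by hand. For a larger batch, you can push every row through the single writer opened by open_record_writer(). The following is a minimal sketch under these assumptions:

- write_rows is a hypothetical helper, not a pyodps API.
- It accepts any object with a write() method, plus a record factory such as table.new_record.
- Each row lists its values in column order, the same convention as Method 2.
- The column count is passed explicitly.

```python
from typing import Any, Callable, Iterable, Sequence


def write_rows(writer: Any, new_record: Callable[[Sequence[Any]], Any],
               rows: Iterable[Sequence[Any]], num_columns: int) -> int:
    """Write each row through one writer and return how many were written.

    Rows with the wrong number of values raise ValueError before they
    reach the writer; rows written earlier are not rolled back.
    """
    count = 0
    for row in rows:
        if len(row) != num_columns:
            raise ValueError(
                f"row {count} has {len(row)} values, expected {num_columns}")
        writer.write(new_record(list(row)))
        count += 1
    return count


# Hypothetical usage inside the with block of the upload example:
# write_rows(writer, table.new_record,
#            [('test1', 'id1'), ('test2', 'id2')], num_columns=2)
```

Because the helper only relies on write() and a factory callable, you can also exercise it with a fake writer in unit tests, without a MaxCompute connection.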
Replace the following placeholders in the upload example before running it:

| Placeholder | Description | Example |
|---|---|---|
| `<your-project>` | Name of your MaxCompute project | `my_project` |
| `<your-endpoint>` | Endpoint for your region | `http://service.cn-hangzhou.maxcompute.aliyun.com/api` |
| `<your-table-name>` | Name of the target table | `my_table` |
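If you switch regions often, you can derive the endpoint from a region ID instead of editing it by hand. The sketch below is hypothetical. It assumes your region follows the same public-endpoint pattern as the cn-hangzhou example in the table. VPC and other network types use different host names, so confirm the result against the official MaxCompute endpoint list.

```python
def public_endpoint(region_id: str) -> str:
    """Build a MaxCompute endpoint URL from a region ID.

    Assumes the public-endpoint pattern of the example
    http://service.cn-hangzhou.maxcompute.aliyun.com/api.
    """
    region_id = region_id.strip()
    if not region_id:
        raise ValueError("region_id must not be empty")
    return f"http://service.{region_id}.maxcompute.aliyun.com/api"
```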
Verify the upload
After running the code, query the partition to confirm the records were written:
SELECT * FROM <your-table-name> WHERE pt='test';
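If the partition was empty before the upload, the query returns two rows: ('test1', 'id1', 'test') and ('test2', 'id2', 'test'). The last value in each row is the pt partition column, which SELECT * includes.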
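To script this check, you can compare the rows you uploaded with the rows the query returns. For example, you could collect the returned rows from the pyodps SQL result reader (execute_sql(...).open_reader()). Under these assumptions, the sketch works as follows:

- missing_rows is a hypothetical helper that uses only the standard library.
- It ignores row order, because a SELECT without ORDER BY does not guarantee one.
- It counts duplicates, so writing the same row twice is checked correctly.

```python
from collections import Counter
from typing import Iterable, List, Sequence, Tuple


def missing_rows(expected: Iterable[Sequence], actual: Iterable[Sequence]) -> List[Tuple]:
    """Return the expected rows that do not appear in actual.

    Each actual row can satisfy at most one expected row.
    """
    remaining = Counter(tuple(row) for row in actual)
    missing = []
    for row in expected:
        key = tuple(row)
        if remaining[key] > 0:
            remaining[key] -= 1
        else:
            missing.append(key)
    return missing


# Expected rows include the pt partition value, because SELECT * returns it.
expected = [('test1', 'id1', 'test'), ('test2', 'id2', 'test')]
```

An empty result from missing_rows(expected, returned_rows) means every uploaded row was found.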