Use the StreamUploadSession and StreamRecordPack interfaces of the MaxCompute Tunnel SDK to stream table data into MaxCompute.
Prerequisites
Before you begin, ensure that you have:
A MaxCompute project with a target table
The MaxCompute Tunnel SDK added to your project dependencies
AccessKey ID and AccessKey secret stored as the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET
(Optional) The partition spec for the target table, if it is partitioned
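A quick way to confirm the credential prerequisite before running an upload is to check that both environment variables are set. The following is a minimal sketch that uses only the Java standard library; the class name CredentialCheck and its missing helper are hypothetical names chosen for illustration and are not part of the MaxCompute Tunnel SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class CredentialCheck {
    // Environment variable names listed in the prerequisites.
    static final String[] REQUIRED = {
        "ALIBABA_CLOUD_ACCESS_KEY_ID",
        "ALIBABA_CLOUD_ACCESS_KEY_SECRET"
    };

    // Returns the names of required variables that are unset or blank
    // in the given environment map.
    static List<String> missing(Map<String, String> env) {
        List<String> result = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> missing = missing(System.getenv());
        if (!missing.isEmpty()) {
            System.err.println("Missing environment variables: " + missing);
            System.exit(1);
        }
        System.out.println("Credentials found in the environment.");
    }
}
```

Passing the environment in as a map, rather than calling System.getenv() inside the helper, keeps the check easy to exercise with sample values.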
How streaming upload works
A streaming upload follows four steps:
1. Create a TableTunnel instance and connect it to your MaxCompute project.
2. Build a StreamUploadSession for the target table (and partition, if applicable).
3. Create a StreamRecordPack, populate records with a value for each column, and append them to the pack.
4. Call pack.flush() to commit the buffered records to MaxCompute.
Flush behavior:
| Outcome | Reuse StreamRecordPack |
|---|---|
| Flush succeeds | Yes — reuse the pack for the next batch |
| Flush fails | No — discard this pack and create a new one before retrying |
pack.flush() returns a traceId string that identifies the committed batch.
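The reuse rule in the table can be sketched as a small retry loop: keep the same pack after a successful flush, and replace it after a failed one. The example below is illustrative only. Pack and PackFactory are hypothetical stand-ins defined in the sketch itself, not SDK types, so that the pattern can be read and run without a MaxCompute connection; records are represented as plain strings for the same reason.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class FlushRetrySketch {
    // Hypothetical stand-in for StreamRecordPack; not the SDK type.
    interface Pack {
        void append(String record) throws IOException;
        String flush() throws IOException; // returns a trace ID on success
    }

    // Hypothetical stand-in for whatever creates a new pack from the session.
    interface PackFactory {
        Pack newPack() throws IOException;
    }

    // Uploads each batch in order and returns one trace ID per batch.
    // A pack is kept across batches while flushes succeed and is replaced
    // by a new one after any failure. Rethrows the last failure if a batch
    // cannot be flushed within maxAttempts.
    static List<String> upload(PackFactory factory, List<List<String>> batches, int maxAttempts)
            throws IOException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        List<String> traceIds = new ArrayList<>();
        Pack pack = factory.newPack();
        for (List<String> batch : batches) {
            String traceId = null;
            IOException last = null;
            for (int attempt = 1; attempt <= maxAttempts && traceId == null; attempt++) {
                try {
                    for (String record : batch) {
                        pack.append(record);
                    }
                    traceId = pack.flush();   // success: keep this pack for the next batch
                } catch (IOException e) {
                    last = e;
                    pack = factory.newPack(); // failure: discard the pack and start over
                }
            }
            if (traceId == null) {
                throw last;
            }
            traceIds.add(traceId);
        }
        return traceIds;
    }
}
```

With the SDK, the factory would presumably wrap uploadSession.newRecordPack(), and the batch would hold Record objects instead of strings.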
Upload table data
The following example uploads 10 records to a partitioned table and retries the flush up to three times on failure.
import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
public class StreamUploadSample {
    // Load credentials from environment variables; never hardcode AccessKey pairs in source code.
    // Use a RAM user with the minimum required permissions instead of an Alibaba Cloud account.
    private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    // Replace the placeholders below with your actual values.
    private static String odpsEndpoint = "<endpoint>";           // MaxCompute project endpoint
    private static String tunnelEndpoint = "<tunnel_endpoint>";  // MaxCompute Tunnel endpoint (optional)
    private static String project = "<your_project>";            // MaxCompute project name
    private static String table = "<your_table_name>";           // Target table name
    private static String partition = "<your_partition_spec>";   // Partition spec, e.g. pt=20240101

    public static void main(String[] args) {
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsEndpoint);
        odps.setDefaultProject(project);
        try {
            TableTunnel tunnel = new TableTunnel(odps);
            // Tunnel endpoint routing is automatic by default.
            // Specify an explicit endpoint only when the automatically routed network is unreachable:
            // tunnel.setEndpoint(tunnelEndpoint);

            PartitionSpec partitionSpec = new PartitionSpec(partition);
            // To create the partition automatically if it does not exist, add .setCreatePartition(true):
            // TableTunnel.StreamUploadSession uploadSession =
            //     tunnel.buildStreamUploadSession(project, table)
            //         .setPartitionSpec(partitionSpec)
            //         .setCreatePartition(true)
            //         .build();
            TableTunnel.StreamUploadSession uploadSession =
                tunnel.buildStreamUploadSession(project, table)
                    .setPartitionSpec(partitionSpec)
                    .build();

            TableSchema schema = uploadSession.getSchema();
            TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();
            Record record = uploadSession.newRecord();

            // Populate a sample value for each column based on its data type.
            for (int i = 0; i < schema.getColumns().size(); i++) {
                Column column = schema.getColumn(i);
                switch (column.getType()) {
                    case BIGINT:
                        record.setBigint(i, 1L);
                        break;
                    case BOOLEAN:
                        record.setBoolean(i, true);
                        break;
                    case DATETIME:
                        record.setDatetime(i, new Date());
                        break;
                    case DOUBLE:
                        record.setDouble(i, 0.0);
                        break;
                    case STRING:
                        record.setString(i, "sample");
                        break;
                    default:
                        // This sample only handles the column types listed above.
                        throw new RuntimeException("Unknown column type: " + column.getType());
                }
            }

            // Append 10 records to the pack before flushing.
            for (int i = 0; i < 10; i++) {
                pack.append(record);
            }

            // Flush with up to 3 attempts.
            // On success: data is committed and visible; the pack can be reused for the next batch.
            // On failure: the failed pack is discarded and a new StreamRecordPack is filled before retrying.
            final int maxAttempts = 3;
            boolean flushed = false;
            for (int attempt = 1; attempt <= maxAttempts && !flushed; attempt++) {
                try {
                    String traceId = pack.flush();
                    System.out.println("Flush succeeded. Trace ID: " + traceId);
                    flushed = true;
                } catch (IOException e) {
                    e.printStackTrace();
                    if (attempt < maxAttempts) {
                        Thread.sleep(500);
                        // Do not reuse the failed pack: build a new one and refill it.
                        pack = uploadSession.newRecordPack();
                        for (int i = 0; i < 10; i++) {
                            pack.append(record);
                        }
                    }
                }
            }
            System.out.println(flushed ? "Upload complete." : "Upload failed after " + maxAttempts + " attempts.");
        } catch (TunnelException e) {
            e.printStackTrace();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Parameters
Replace the following placeholders before running the example:
| Placeholder | Description |
|---|---|
| <endpoint> | Endpoint of your MaxCompute project |
| <tunnel_endpoint> | Endpoint of MaxCompute Tunnel (leave the setEndpoint call commented out to use automatic routing) |
| <your_project> | Name of your MaxCompute project |
| <your_table_name> | Name of the target table |
| <your_partition_spec> | Partition spec of the target table, for example pt=20240101 |