This example shows how to use StreamUploadSession and StreamRecordPack to upload data to MaxCompute from multiple concurrent threads.
Each thread opens its own independent StreamUploadSession. Sessions are not shared across threads, which lets them flush data in parallel without synchronization overhead.
Prerequisites
Before you begin, ensure that you have:
- A MaxCompute project with a partitioned table whose columns are all of type BIGINT, BOOLEAN, DATETIME, DOUBLE, or STRING (the sample code rejects any other column type)
- An AccessKey ID and AccessKey secret stored in the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables
- The MaxCompute endpoint for your region and, optionally, the tunnel endpoint
Use a RAM (Resource Access Management) user instead of your Alibaba Cloud account root credentials. The AccessKey pair of an Alibaba Cloud account has permissions on all API operations, which makes it a high-risk credential for routine use. To create a RAM user, go to the RAM console.
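The sample passes the result of System.getenv directly to AliyunAccount, so an unset variable is passed on as null instead of stopping the program at startup. A small plain-Java check can catch this early. The following is a minimal sketch; the class CredentialCheck and its requireEnv method are hypothetical names used only for illustration and are not part of the MaxCompute SDK.

```java
import java.util.Map;

// Hypothetical helper (not part of the MaxCompute SDK): fails fast when a
// required environment variable is missing or blank.
final class CredentialCheck {

    static String requireEnv(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        try {
            requireEnv(env, "ALIBABA_CLOUD_ACCESS_KEY_ID");
            requireEnv(env, "ALIBABA_CLOUD_ACCESS_KEY_SECRET");
            // Never print the values themselves; the secret must stay out of logs.
            System.out.println("Both AccessKey environment variables are set.");
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
    }
}
```

Taking the environment as a Map parameter keeps the check testable without modifying the real process environment.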
How it works
- Create a TableTunnel for your MaxCompute project.
- Start a fixed thread pool using Executors.newFixedThreadPool().
- Each thread independently builds a StreamUploadSession for the target table and partition.
- Inside each thread, create a StreamRecordPack, append records to it, and call pack.flush() to commit the data.
- After a successful flush, the data is immediately visible and the StreamRecordPack can be reused for the next batch.
- If flush() throws an IOException, discard the current StreamRecordPack, create a new one, and retry. The example makes up to three flush attempts, with a 500 ms delay between attempts.
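The control flow in these steps can be modeled without a MaxCompute connection. The following plain-Java sketch uses Pack and Session, hypothetical stand-ins for StreamRecordPack and StreamUploadSession that only simulate the flush contract described above (a simulated session fails its first N flushes). They are not the SDK API. The sketch shows the two properties the steps rely on: each pooled task owns its own session, and every retry starts from a new pack.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for StreamRecordPack and StreamUploadSession; they
// only model the control flow and are NOT the MaxCompute SDK API.
interface Pack {
    void append(String record) throws IOException;
    String flush() throws IOException;
}

interface Session {
    Pack newRecordPack();
}

final class RetryPattern {

    /** Appends records to a fresh pack and flushes; a failed pack is discarded, never reused. */
    static boolean uploadWithRetry(Session session, List<String> records, int maxAttempts, long delayMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Pack pack = session.newRecordPack(); // new pack for every attempt
            try {
                for (String r : records) {
                    pack.append(r);
                }
                pack.flush();
                return true;
            } catch (IOException e) {
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delayMs); // wait only between attempts
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            }
        }
        return false;
    }

    /** Simulated session whose first `failures` flushes throw IOException. */
    static Session flakySession(int failures, AtomicInteger packsCreated) {
        AtomicInteger remaining = new AtomicInteger(failures);
        return () -> {
            packsCreated.incrementAndGet();
            List<String> buffer = new ArrayList<>();
            return new Pack() {
                public void append(String record) { buffer.add(record); }
                public String flush() throws IOException {
                    if (remaining.getAndDecrement() > 0) {
                        throw new IOException("simulated flush failure");
                    }
                    return "trace-" + buffer.size();
                }
            };
        };
    }

    public static void main(String[] args) throws Exception {
        int threadNum = 4;
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        try {
            List<Callable<Boolean>> tasks = new ArrayList<>();
            for (int i = 0; i < threadNum; i++) {
                final int failures = i; // task i fails its first i flushes
                // Each task builds its own session; nothing is shared between tasks.
                tasks.add(() -> uploadWithRetry(flakySession(failures, new AtomicInteger()),
                        Arrays.asList("a", "b", "c"), 3, 500));
            }
            for (Future<Boolean> f : pool.invokeAll(tasks)) {
                System.out.println("succeeded: " + f.get());
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

Running main reports success for the tasks that fail zero, one, or two flushes and failure for the task whose three attempts all fail.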
Upload data using multiple threads
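The following example uses 10 threads. Each thread builds one sample record, sets each column to a value that matches its type (BIGINT, BOOLEAN, DATETIME, DOUBLE, or STRING), appends the record to a StreamRecordPack 10 times, and then flushes the pack.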
Replace the placeholders in the StreamUploadThreadSample class before running the code:
| Placeholder | Description | Example |
|---|---|---|
| `<endpoint>` | MaxCompute endpoint for your region | https://service.ap-southeast-1.maxcompute.aliyun.com/api |
| `<tunnel_endpoint>` | Tunnel endpoint (optional; takes effect only after you uncomment the tunnel.setEndpoint line) | https://dt.ap-southeast-1.maxcompute.aliyun.com |
| `<your_project>` | Name of your MaxCompute project | my_project |
| `<your_table_name>` | Name of the target table | my_table |
| `<your_partition_spec>` | Partition specification of the table | ds=20240101 |
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;

class StreamUploadThread implements Callable<Boolean> {
    private static final int MAX_ATTEMPTS = 3;
    private static final long RETRY_DELAY_MS = 500;
    private static final int RECORDS_PER_PACK = 10;

    private final String project;
    private final String table;
    private final String partition;
    private final TableTunnel tunnel;

    public StreamUploadThread(String project, String table, String partition, TableTunnel tunnel) {
        this.project = project;
        this.table = table;
        this.partition = partition;
        this.tunnel = tunnel;
    }

    @Override
    public Boolean call() {
        try {
            PartitionSpec partitionSpec = new PartitionSpec(partition);
            // Each thread builds its own session; sessions are never shared between threads.
            TableTunnel.StreamUploadSession uploadSession = tunnel.buildStreamUploadSession(project, table)
                    .setPartitionSpec(partitionSpec)
                    .build();
            TableSchema schema = uploadSession.getSchema();

            // Build one sample record, filling each column with a value that matches its type.
            Record record = uploadSession.newRecord();
            for (int i = 0; i < schema.getColumns().size(); i++) {
                Column column = schema.getColumn(i);
                switch (column.getType()) {
                    case BIGINT:
                        record.setBigint(i, 1L);
                        break;
                    case BOOLEAN:
                        record.setBoolean(i, true);
                        break;
                    case DATETIME:
                        record.setDatetime(i, new Date());
                        break;
                    case DOUBLE:
                        record.setDouble(i, 0.0);
                        break;
                    case STRING:
                        record.setString(i, "sample");
                        break;
                    default:
                        throw new RuntimeException("Unsupported column type: " + column.getType());
                }
            }

            for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
                // A pack whose flush failed cannot be reused, so every attempt starts with a new pack.
                TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();
                for (int i = 0; i < RECORDS_PER_PACK; i++) {
                    pack.append(record);
                }
                try {
                    String traceId = pack.flush();
                    System.out.println("flush success: " + traceId);
                    return true;
                } catch (IOException e) {
                    e.printStackTrace();
                    if (attempt < MAX_ATTEMPTS) {
                        Thread.sleep(RETRY_DELAY_MS);
                    }
                }
            }
            System.err.println("flush failed after " + MAX_ATTEMPTS + " attempts");
        } catch (TunnelException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
        }
        return false;
    }
}

public class StreamUploadThreadSample {
    // Credentials are read from environment variables to avoid hardcoding sensitive values.
    private static final String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    private static final String odpsEndpoint = "<endpoint>";
    private static final String tunnelEndpoint = "<tunnel_endpoint>";
    private static final String project = "<your_project>";
    private static final String table = "<your_table_name>";
    private static final String partition = "<your_partition_spec>";
    private static final int threadNum = 10;

    public static void main(String[] args) {
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsEndpoint);
        odps.setDefaultProject(project);

        TableTunnel tunnel = new TableTunnel(odps);
        // Optional: uncomment to send tunnel traffic to a specific tunnel endpoint.
        // tunnel.setEndpoint(tunnelEndpoint);

        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        try {
            List<Callable<Boolean>> callers = new ArrayList<>();
            for (int i = 0; i < threadNum; i++) {
                callers.add(new StreamUploadThread(project, table, partition, tunnel));
            }
            // invokeAll blocks until every task has finished.
            List<Future<Boolean>> results = pool.invokeAll(callers);
            int failed = 0;
            for (Future<Boolean> result : results) {
                try {
                    if (!result.get()) {
                        failed++;
                    }
                } catch (ExecutionException e) {
                    // For example, the RuntimeException thrown for an unsupported column type.
                    e.getCause().printStackTrace();
                    failed++;
                }
            }
            if (failed == 0) {
                System.out.println("upload success!");
            } else {
                System.err.println(failed + " of " + threadNum + " upload threads failed");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
    }
}
flush() behavior and retry
| Outcome | Data visible | StreamRecordPack reusable | Action |
|---|---|---|---|
| Flush succeeds | Yes | Yes; reuse it to avoid repeated memory allocation | Append the next batch and call pack.flush() again |
| Flush fails (IOException) | No | No; the pack cannot be reused | Discard the pack, create a new StreamRecordPack, and retry |
The example makes up to three flush attempts (the initial flush plus two retries), waits 500 ms between attempts, and prints the stack trace of each failure.
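The sample flushes a single batch per thread. To upload several batches, the rules in the table can be combined into one loop: keep the pack after a successful flush and replace it only after a failure. The following plain-Java sketch does not call the MaxCompute SDK. BatchPack and BatchSession are hypothetical stand-ins for StreamRecordPack and StreamUploadSession, the simulated session fails on chosen flush calls, and throwing UncheckedIOException after the last attempt is an illustrative choice, not SDK behavior.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for StreamRecordPack/StreamUploadSession that model
// only the flush contract from the table above; not the MaxCompute SDK API.
interface BatchPack {
    void append(String record) throws IOException;
    String flush() throws IOException;
}

interface BatchSession {
    BatchPack newRecordPack();
}

final class BatchUploader {

    /**
     * Uploads each batch in turn. After a successful flush the same pack is
     * reused for the next batch; after a failed flush the pack is discarded
     * and the batch is retried on a new pack. Returns the number of packs created.
     */
    static int uploadBatches(BatchSession session, List<List<String>> batches, int maxAttempts) {
        int packsCreated = 1;
        BatchPack pack = session.newRecordPack();
        for (List<String> batch : batches) {
            for (int attempt = 1; ; attempt++) {
                try {
                    for (String record : batch) {
                        pack.append(record);
                    }
                    pack.flush();
                    break; // success: keep this pack for the next batch
                } catch (IOException e) {
                    if (attempt == maxAttempts) {
                        throw new UncheckedIOException("batch failed after " + maxAttempts + " attempts", e);
                    }
                    pack = session.newRecordPack(); // a failed pack must not be reused
                    packsCreated++;
                }
            }
        }
        return packsCreated;
    }

    /** Simulated session: flush calls whose 1-based global index is listed fail. */
    static BatchSession simulated(List<Integer> failingFlushCalls) {
        int[] flushCalls = {0};
        return () -> new BatchPack() {
            private final List<String> buffer = new ArrayList<>();
            public void append(String record) { buffer.add(record); }
            public String flush() throws IOException {
                flushCalls[0]++;
                if (failingFlushCalls.contains(flushCalls[0])) {
                    throw new IOException("simulated failure on flush call " + flushCalls[0]);
                }
                buffer.clear(); // models a committed flush leaving the pack empty
                return "trace-" + flushCalls[0];
            }
        };
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("r1", "r2"), Arrays.asList("r3"), Arrays.asList("r4", "r5"));
        // The second flush call fails once, so batch 2 is retried on a new pack.
        int packs = uploadBatches(simulated(Arrays.asList(2)), batches, 3);
        System.out.println("packs created: " + packs);
    }
}
```

In main, the single failed flush causes exactly one replacement pack, and no new pack is created after any successful flush.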