When you upload data using MaxCompute Tunnel, synchronous flush calls block the record-appending loop and limit throughput. This example shows how to decouple write and flush operations using StreamRecordPack and a dual-queue architecture, so that flush I/O does not stall your business logic as long as the flush threads keep up.
Prerequisites
Before you begin, ensure that you have:
Java 8 or later installed
The MaxCompute Java SDK added to your project (Maven artifact: com.aliyun.odps:odps-sdk-core)
A MaxCompute project with a partitioned table whose columns are all of type BIGINT, BOOLEAN, DATETIME, DOUBLE, or STRING (the sample throws an exception for any other column type)
A partition already created on the target table
A RAM user with write permission on the target table. Using the AccessKey pair of your Alibaba Cloud account directly is a high-risk operation. Use a RAM user with the minimum required permissions instead.
The AccessKey ID and AccessKey secret of the RAM user set as the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET
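Because the sample reads both credentials with System.getenv, a missing variable may only surface later, when the SDK tries to authenticate. The following is a minimal sketch of a fail-fast check that you could run first. The class name EnvCheck and the helper requireEnv are hypothetical and are not part of the MaxCompute SDK.

```java
import java.util.Map;

public class EnvCheck {
    // Hypothetical helper: returns the value of a variable or fails with a clear message.
    // The environment is passed in as a map so the check can be exercised without real credentials.
    static String requireEnv(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        try {
            requireEnv(env, "ALIBABA_CLOUD_ACCESS_KEY_ID");
            requireEnv(env, "ALIBABA_CLOUD_ACCESS_KEY_SECRET");
            System.out.println("Both credential variables are set.");
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```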
How it works
The sample uses two groups of worker threads, which run in one shared thread pool, and two shared queues to pipeline writes and flushes:
WriteThread checks the flushedQueue for a reusable pack. If none is available, it allocates a new StreamRecordPack. It then appends 10 records to the pack and places it on the flushingQueue.
FlushThread takes a pack from the flushingQueue and calls flush() to commit the data to MaxCompute Tunnel. On success, it returns the pack to the flushedQueue for reuse. On failure, it retries the flush up to three times. If every attempt fails, it drops the pack instead of returning it for reuse.
This pipeline keeps write throughput high because flushing to MaxCompute Tunnel runs on separate threads, so the record-appending loop waits only when the flushingQueue is full.
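The handoff between the two queues can be sketched without the SDK. The example below is an illustration under stated assumptions, not the sample itself: Pack is a hypothetical stand-in for StreamRecordPack that only counts records, and the class and method names are invented for this sketch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class DualQueueSketch {
    // Hypothetical stand-in for StreamRecordPack: it only counts appended records.
    static class Pack {
        int records;
        void append() { records++; }
        // Returns the number of buffered records and resets the pack for reuse.
        int flush() { int n = records; records = 0; return n; }
    }

    static final BlockingQueue<Pack> flushingQueue = new LinkedBlockingQueue<>(4);
    static final BlockingQueue<Pack> flushedQueue = new LinkedBlockingQueue<>();
    static final AtomicInteger allocated = new AtomicInteger();

    // Writer side: reuse a flushed pack if one is waiting, fill it, hand it to the flusher.
    static void writePacks(int packs, int recordsPerPack) throws InterruptedException {
        for (int p = 0; p < packs; p++) {
            Pack pack = flushedQueue.poll();
            if (pack == null) {
                pack = new Pack();
                allocated.incrementAndGet();
            }
            for (int i = 0; i < recordsPerPack; i++) {
                pack.append();
            }
            flushingQueue.put(pack); // blocks only while the bounded queue is full
        }
    }

    // Flusher side: take a filled pack, flush it, return it for reuse.
    static int flushPacks(int packs) throws InterruptedException {
        int total = 0;
        for (int p = 0; p < packs; p++) {
            Pack pack = flushingQueue.take();
            total += pack.flush();
            flushedQueue.put(pack);
        }
        return total;
    }

    // Runs one writer (the calling thread) and one flusher thread; returns the records flushed.
    static int run(int packs, int recordsPerPack) {
        final int[] flushed = new int[1];
        Thread flusher = new Thread(() -> {
            try {
                flushed[0] = flushPacks(packs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        flusher.start();
        try {
            writePacks(packs, recordsPerPack);
            flusher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return flushed[0];
    }

    public static void main(String[] args) {
        System.out.println("records flushed: " + run(20, 10));
        System.out.println("packs allocated: " + allocated.get());
    }
}
```

The number of packs allocated depends on thread timing, but it stays at or below the number of packs written, because the writer takes a pack from flushedQueue whenever one is available.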
Usage notes
| Condition | Behavior |
|---|---|
| flush() succeeds | Data is committed and visible. Return the StreamRecordPack to the flushedQueue to avoid frequent memory allocation. |
| flush() fails with IOException | The flush can be retried directly on the same pack. If it still fails, discard the pack and create a new StreamRecordPack. A failed pack may be in an inconsistent state and must not be returned for reuse. |
| Flush retry | The sample retries a failed flush up to 3 times with a 500 ms pause between attempts. |
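The retry rule in the table can be isolated into a small helper. This is a minimal sketch with invented names: FlushAction is a hypothetical stand-in for a call such as pack.flush(), and flushWithRetry is not an SDK method. Unlike the sample, it skips the pause after the final failed attempt.

```java
import java.io.IOException;

public class FlushRetrySketch {
    // Hypothetical stand-in for a call such as pack.flush(), which can throw IOException.
    interface FlushAction {
        void flush() throws IOException;
    }

    // Tries the action up to maxAttempts times, pausing between failed attempts.
    // Returns true on the first success and false if every attempt failed.
    static boolean flushWithRetry(FlushAction action, int maxAttempts, long pauseMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.flush();
                return true;
            } catch (IOException e) {
                System.err.println("flush attempt " + attempt + " failed: " + e.getMessage());
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(pauseMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated flush that fails twice and then succeeds.
        boolean ok = flushWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("simulated network error");
            }
        }, 3, 10);
        System.out.println("flushed=" + ok + " after " + calls[0] + " attempts");
    }
}
```

A caller would return the pack to the flushedQueue only when the helper returns true, and drop the pack otherwise.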
Key parameters
Adjust the following constants in StreamUploadAsyncIOSample to match your workload:
| Parameter | Default | Description |
|---|---|---|
| writeThreadNum | 10 | Number of threads that append records to packs. Increase for higher write concurrency. |
| flushThreadNum | 10 | Number of threads that flush packs to MaxCompute Tunnel. Balance with writeThreadNum to avoid starving either queue. |
| flushQueueSize | 100 | Maximum number of packs waiting to be flushed. If the queue is full, write threads back off and retry every second. |
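The back-off behavior of flushQueueSize comes from the timed offer() call on a bounded LinkedBlockingDeque. The sketch below shows that behavior in isolation, with no consumer draining the queue. The class and method names are hypothetical.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class BoundedQueueSketch {
    // Offers up to `items` elements to a queue of the given capacity while nothing consumes it,
    // and returns how many were accepted before a timed offer() gave up.
    static int acceptedBeforeFull(int capacity, int items, long timeoutMillis) {
        BlockingDeque<Integer> queue = new LinkedBlockingDeque<>(capacity);
        int accepted = 0;
        try {
            for (int i = 0; i < items; i++) {
                if (queue.offer(i, timeoutMillis, TimeUnit.MILLISECONDS)) {
                    accepted++;
                } else {
                    // Queue stayed full for the whole timeout: this is where a writer backs off.
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptedBeforeFull(2, 5, 50)); // prints "accepted: 2"
    }
}
```

In the sample, a larger flushQueueSize absorbs longer flush stalls at the cost of holding more packs in memory.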
Run the sample
Replace the placeholders in the code before running:
| Placeholder | Description | Example |
|---|---|---|
| <endpoint> | MaxCompute project endpoint | http://service.cn-hangzhou.maxcompute.aliyun.com/api |
| <tunnel_endpoint> | MaxCompute Tunnel endpoint | http://dt.cn-hangzhou.maxcompute.aliyun.com |
| <your_project> | MaxCompute project name | my_project |
| <your_table_name> | Target table name | my_table |
| <your_partition_spec> | Partition specification | ds=20260101 |
The tunnelEndpoint line is commented out by default. Uncomment it and set the correct Tunnel endpoint if your project uses a dedicated Tunnel endpoint.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
// Base class for worker tasks that run until stop() is called.
abstract class StoppableThread implements Callable<Boolean> {
    private final AtomicBoolean stop = new AtomicBoolean(false);

    public void stop() {
        stop.set(true);
    }

    public boolean isStop() {
        return stop.get();
    }
}
class FlushThread extends StoppableThread {
    private final BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue;
    private final BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue;

    public FlushThread(BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue,
                       BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue) {
        this.flushingQueue = flushingQueue;
        this.flushedQueue = flushedQueue;
    }

    @Override
    public Boolean call() throws Exception {
        while (!isStop()) {
            // Wait up to 1 second for a filled pack so that the stop flag is checked regularly.
            TableTunnel.StreamRecordPack pack = flushingQueue.poll(1000, TimeUnit.MILLISECONDS);
            if (pack != null) {
                int retry = 0;
                while (retry < 3) {
                    try {
                        String traceId = pack.flush();
                        // On success: data is committed. Return the pack for reuse.
                        flushedQueue.offer(pack, 1000, TimeUnit.MILLISECONDS);
                        System.out.println("flush success:" + traceId);
                        break;
                    } catch (IOException e) {
                        // On failure: retry the flush. After the third failure the pack is
                        // dropped, because it is never returned to flushedQueue.
                        retry++;
                        e.printStackTrace();
                        Thread.sleep(500);
                    }
                }
            }
        }
        return true;
    }
}
class WriteThread extends StoppableThread {
    private final BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue;
    private final BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue;
    private final String project;
    private final String table;
    private final String partition;
    private final TableTunnel tunnel;

    public WriteThread(BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue,
                       BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue,
                       String project,
                       String table,
                       String partition,
                       TableTunnel tunnel) {
        this.flushingQueue = flushingQueue;
        this.flushedQueue = flushedQueue;
        this.project = project;
        this.table = table;
        this.partition = partition;
        this.tunnel = tunnel;
    }

    @Override
    public Boolean call() {
        try {
            PartitionSpec partitionSpec = new PartitionSpec(partition);
            TableTunnel.StreamUploadSession uploadSession = tunnel
                    .buildStreamUploadSession(project, table)
                    .setPartitionSpec(partitionSpec)
                    .build();
            TableSchema schema = uploadSession.getSchema();
            // Build one sample record; the loop below appends it repeatedly.
            Record record = uploadSession.newRecord();
            for (int i = 0; i < schema.getColumns().size(); i++) {
                Column column = schema.getColumn(i);
                switch (column.getType()) {
                    case BIGINT:
                        record.setBigint(i, 1L);
                        break;
                    case BOOLEAN:
                        record.setBoolean(i, true);
                        break;
                    case DATETIME:
                        record.setDatetime(i, new Date());
                        break;
                    case DOUBLE:
                        record.setDouble(i, 0.0);
                        break;
                    case STRING:
                        record.setString(i, "sample");
                        break;
                    default:
                        throw new RuntimeException("Unknown column type: "
                                + column.getType());
                }
            }
            while (!isStop()) {
                // Reuse a returned pack if available; otherwise allocate a new one.
                TableTunnel.StreamRecordPack pack = flushedQueue.poll();
                if (pack == null) {
                    pack = uploadSession.newRecordPack();
                }
                try {
                    for (int i = 0; i < 10; i++) {
                        pack.append(record);
                    }
                    // Hand the pack to the flush threads; wait in 1-second steps while the queue is full.
                    while (!isStop() && !flushingQueue.offer(pack, 1000, TimeUnit.MILLISECONDS)) {
                        System.out.println("flushing Queue full");
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop this writer instead of looping on.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        } catch (TunnelException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return true;
    }
}
public class StreamUploadAsyncIOSample {
    // Load credentials from environment variables.
    // Do not hardcode the AccessKey ID or secret in your code.
    private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    private static String odpsEndpoint = "<endpoint>";
    private static String tunnelEndpoint = "<tunnel_endpoint>";
    private static String project = "<your_project>";
    private static String table = "<your_table_name>";
    private static String partition = "<your_partition_spec>";
    private static int writeThreadNum = 10;
    private static int flushThreadNum = 10;
    private static int flushQueueSize = 100;
    // How long the sample keeps writing before it shuts down.
    // Illustrative value chosen for this sample; it is not an SDK setting.
    private static long runMillis = 60_000L;
    private static BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue = new LinkedBlockingDeque<>(flushQueueSize);
    private static BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue = new LinkedBlockingDeque<>();

    public static void main(String[] args) {
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsEndpoint);
        odps.setDefaultProject(project);
        try {
            TableTunnel tunnel = new TableTunnel(odps);
            // tunnel.setEndpoint(tunnelEndpoint);
            ExecutorService pool = Executors.newFixedThreadPool(writeThreadNum + flushThreadNum);
            ArrayList<StoppableThread> writers = new ArrayList<StoppableThread>();
            ArrayList<StoppableThread> flushers = new ArrayList<StoppableThread>();
            // The workers loop until stop() is called, so invokeAll() would block forever.
            // Submit them instead, let them run for runMillis, and then stop them.
            for (int i = 0; i < writeThreadNum; i++) {
                WriteThread writer = new WriteThread(flushingQueue, flushedQueue, project, table, partition, tunnel);
                writers.add(writer);
                pool.submit(writer);
            }
            for (int i = 0; i < flushThreadNum; i++) {
                FlushThread flusher = new FlushThread(flushingQueue, flushedQueue);
                flushers.add(flusher);
                pool.submit(flusher);
            }
            Thread.sleep(runMillis);
            for (StoppableThread writer : writers) {
                writer.stop();
            }
            // Give the flush threads up to 30 seconds to drain the packs that are still queued.
            long deadline = System.currentTimeMillis() + 30_000L;
            while (!flushingQueue.isEmpty() && System.currentTimeMillis() < deadline) {
                Thread.sleep(100);
            }
            for (StoppableThread flusher : flushers) {
                flusher.stop();
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
            System.out.println("upload success!");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
Verify the upload
After the program exits normally, the output includes:
upload success!
Each successful flush also prints a trace ID:
flush success:<traceId>
Use the trace ID to look up the flush request in MaxCompute logs when troubleshooting a specific batch.
To confirm that data was written, query the target table:
SELECT COUNT(*) FROM <your_table_name> WHERE <your_partition_spec>;
What's next
To learn about StreamUploadSession and other Tunnel SDK classes, see the MaxCompute Tunnel SDK reference.
To upload data without multithreading, see the simple stream upload example.
To manage access permissions for MaxCompute, see Resource Access Management (RAM).