Example: upload data using multithreaded asynchronous I/O

When you upload data using MaxCompute Tunnel, synchronous flush calls block the record-appending loop and limit throughput. This example shows how to decouple write and flush operations using StreamRecordPack and a dual-queue architecture, so flush I/O never blocks your business logic.

Prerequisites

Before you begin, ensure that you have:

  • Java 8 or later installed

  • The MaxCompute Java SDK added to your project (Maven artifact: com.aliyun.odps:odps-sdk-core)

  • A MaxCompute project with a partitioned table whose columns are all of type BIGINT, BOOLEAN, DATETIME, DOUBLE, or STRING. The sample throws a RuntimeException for any other column type.

  • A partition already created on the target table

  • A RAM user with write permission on the target table. Using your Alibaba Cloud account's AccessKey pair directly is a high-risk operation. Use a RAM user with the minimum required permissions instead.

  • The AccessKey ID and AccessKey secret of the RAM user set as environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET
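
Before running the sample, it can help to fail fast when a credential variable is missing instead of letting the SDK fail later with a less obvious error. The following is a minimal sketch, not part of the sample: the class name CredentialCheck and the helper require() are hypothetical, and only the two variable names come from the prerequisites above.

```java
import java.util.Map;

class CredentialCheck {
  // Hypothetical helper: returns the value of the named variable,
  // or throws if it is missing or blank.
  static String require(Map<String, String> env, String name) {
    String value = env.get(name);
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalStateException("Environment variable " + name + " is not set");
    }
    return value;
  }

  public static void main(String[] args) {
    Map<String, String> env = System.getenv();
    require(env, "ALIBABA_CLOUD_ACCESS_KEY_ID");
    require(env, "ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    System.out.println("Credentials found in the environment.");
  }
}
```

Passing the environment as a Map keeps the check easy to exercise without changing the real process environment.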

How it works

The sample runs two groups of threads (write threads and flush threads) in one fixed-size thread pool and connects them with two shared queues to pipeline writes and flushes:

  1. WriteThread checks the flushedQueue for a reusable pack. If none is available, it allocates a new StreamRecordPack. It then appends 10 records to the pack and places it on the flushingQueue.

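  2. FlushThread takes a pack from the flushingQueue and calls flush() to commit the data to MaxCompute Tunnel. On success, it returns the pack to the flushedQueue for reuse. On failure, it makes up to 3 flush attempts in total, pausing 500 ms after each failure. If every attempt fails, the pack is dropped and never returned to the flushedQueue, so WriteThread allocates a new one.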

This pipeline keeps write throughput high because flushing to MaxCompute Tunnel runs on dedicated flush threads and never blocks the record-appending loop.
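
The hand-off between the two queues can be tried without a MaxCompute connection. The sketch below is an illustration under stated assumptions, not the sample itself: Pack is a hypothetical stand-in for StreamRecordPack, the flushingQueue capacity of 4 is arbitrary, and both threads process a fixed number of packs instead of polling a stop flag.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class DualQueueSketch {
  // Hypothetical stand-in for StreamRecordPack: counts buffered records,
  // and flush() empties the buffer and reports how many records it held.
  static final class Pack {
    private int buffered = 0;
    void append(String record) { buffered++; }
    int flush() { int n = buffered; buffered = 0; return n; }
  }

  // Runs one writer and one flusher; returns the total number of records flushed.
  static long run(int packsToWrite, int recordsPerPack) throws InterruptedException {
    BlockingQueue<Pack> flushingQueue = new LinkedBlockingQueue<>(4); // bounded: applies backpressure
    BlockingQueue<Pack> flushedQueue = new LinkedBlockingQueue<>();   // unbounded: recycled packs
    AtomicLong flushed = new AtomicLong();
    ExecutorService pool = Executors.newFixedThreadPool(2);

    // Writer: reuse a recycled pack if one is available, fill it, hand it off.
    pool.submit(() -> {
      try {
        for (int p = 0; p < packsToWrite; p++) {
          Pack pack = flushedQueue.poll();
          if (pack == null) {
            pack = new Pack();
          }
          for (int i = 0; i < recordsPerPack; i++) {
            pack.append("sample");
          }
          flushingQueue.put(pack); // blocks while the queue is full
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    // Flusher: take a filled pack, flush it, return it for reuse.
    pool.submit(() -> {
      try {
        for (int p = 0; p < packsToWrite; p++) {
          Pack pack = flushingQueue.take();
          flushed.addAndGet(pack.flush());
          flushedQueue.offer(pack);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    return flushed.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("records flushed: " + run(100, 10));
  }
}
```

With 100 packs of 10 records each, the program prints records flushed: 1000. Because packs are recycled through the flushedQueue, far fewer than 100 Pack objects are usually allocated.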

Usage notes

| Condition | Behavior |
| --- | --- |
| flush() succeeds | Data is committed and visible. Return the StreamRecordPack to the flushedQueue to avoid frequent memory allocation. |
| flush() fails with IOException | The sample retries flush() on the same pack. If every attempt fails, discard the pack and create a new StreamRecordPack. A failed pack may be in an inconsistent state and must not be reused for new records. |
| Flush retry | The sample makes up to 3 flush attempts per pack, pausing 500 ms after each failure. |
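
The retry policy in the table can be isolated into a small helper. This is a sketch under assumptions, not SDK code: the class FlushRetrySketch and the method retry() are hypothetical names, and the Callable stands in for a call such as pack.flush(). Unlike the sample, the sketch does not pause after the final failed attempt.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

class FlushRetrySketch {
  // Calls action up to maxAttempts times, pausing pauseMillis between failed attempts.
  // Returns the action's result, or null when every attempt threw an IOException.
  static <T> T retry(Callable<T> action, int maxAttempts, long pauseMillis) throws Exception {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (IOException e) {
        System.err.println("attempt " + attempt + " failed: " + e.getMessage());
        if (attempt < maxAttempts) {
          Thread.sleep(pauseMillis);
        }
      }
    }
    return null;
  }

  public static void main(String[] args) throws Exception {
    int[] calls = {0};
    // Simulated flush: fails twice, then returns a made-up trace ID.
    String traceId = retry(() -> {
      if (++calls[0] < 3) {
        throw new IOException("simulated network error");
      }
      return "trace-" + calls[0];
    }, 3, 500);
    System.out.println("flush success:" + traceId);
  }
}
```

A null result tells the caller that the pack should be discarded rather than returned to the flushedQueue.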

Key parameters

Adjust the following constants in StreamUploadAsyncIOSample to match your workload:

| Parameter | Default | Description |
| --- | --- | --- |
| writeThreadNum | 10 | Number of threads that append records to packs. Increase for higher write concurrency. |
| flushThreadNum | 10 | Number of threads that flush packs to MaxCompute Tunnel. Balance with writeThreadNum to avoid starving either queue. |
| flushQueueSize | 100 | Capacity of the flushingQueue, that is, the maximum number of packs waiting to be flushed. If the queue is full, a write thread waits up to 1 second per attempt and retries until the pack is accepted or the thread is stopped. |
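
The effect of flushQueueSize comes from the timed offer() on a bounded queue: when the queue is full, offer() waits for the timeout and then returns false, which is the point where the sample prints a message and tries again. A minimal sketch of that behavior follows. The class BackpressureSketch, the method offerWithBackoff(), and the attempt limit are hypothetical; the sample itself retries until the thread is stopped.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BackpressureSketch {
  // Tries to enqueue item, waiting up to timeoutMillis per attempt.
  // Returns the number of timed-out attempts before success,
  // or -1 if maxAttempts attempts all timed out.
  static <T> int offerWithBackoff(BlockingQueue<T> queue, T item,
                                  long timeoutMillis, int maxAttempts)
      throws InterruptedException {
    for (int failed = 0; failed < maxAttempts; failed++) {
      if (queue.offer(item, timeoutMillis, TimeUnit.MILLISECONDS)) {
        return failed;
      }
      System.out.println("flushing queue full, retrying");
    }
    return -1;
  }

  public static void main(String[] args) throws InterruptedException {
    // The capacity plays the role of flushQueueSize; 2 is chosen to fill the queue quickly.
    BlockingQueue<String> flushingQueue = new LinkedBlockingQueue<>(2);
    System.out.println(offerWithBackoff(flushingQueue, "pack-1", 100, 3));
    System.out.println(offerWithBackoff(flushingQueue, "pack-2", 100, 3));
    // No consumer is running, so the third pack cannot be accepted.
    System.out.println(offerWithBackoff(flushingQueue, "pack-3", 100, 3));
  }
}
```

If the full-queue message appears often in a real run, the flush threads are the bottleneck, and raising flushThreadNum is likely to help more than enlarging the queue.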

Run the sample

Replace the placeholders in the code before running:

| Placeholder | Description | Example |
| --- | --- | --- |
| <endpoint> | MaxCompute project endpoint | http://service.cn-hangzhou.maxcompute.aliyun.com/api |
| <tunnel_endpoint> | MaxCompute Tunnel endpoint | http://dt.cn-hangzhou.maxcompute.aliyun.com |
| <your_project> | MaxCompute project name | my_project |
| <your_table_name> | Target table name | my_table |
| <your_partition_spec> | Partition specification | ds=20260101 |

The tunnel.setEndpoint(tunnelEndpoint) call is commented out by default. Uncomment it and set the correct Tunnel endpoint if your project uses a dedicated Tunnel endpoint.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;

// Base class for pipeline tasks: call() loops until stop() is called from another thread.
abstract class StoppableThread implements Callable<Boolean> {
  private final AtomicBoolean stop = new AtomicBoolean(false);
  public void stop() {
    stop.set(true);
  }
  public boolean isStop() {
    return stop.get();
  }
}

class FlushThread extends StoppableThread {
  private BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue;
  private BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue;

  public FlushThread(BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue,
                       BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue) {
    this.flushingQueue = flushingQueue;
    this.flushedQueue = flushedQueue;
  }

  @Override
  public Boolean call() throws Exception {
    while (!isStop()) {
      TableTunnel.StreamRecordPack pack = flushingQueue.poll(1000, TimeUnit.MILLISECONDS);
      if (pack != null) {
        int retry = 0;
        while (retry < 3) {
          try {
            String traceId = pack.flush();
            // On success: data is committed. Return the pack for reuse.
            flushedQueue.offer(pack, 1000, TimeUnit.MILLISECONDS);
            System.out.println("flush success:" + traceId);
            break;
          } catch (IOException e) {
            // On failure: retry the flush after a short pause. If all 3 attempts
            // fail, the pack is dropped and never returned to flushedQueue.
            retry++;
            e.printStackTrace();
            Thread.sleep(500);
          }
        }
      }
    }
    return true;
  }
}

class WriteThread extends StoppableThread {
  private BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue;
  private BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue;
  private String project;
  private String table;
  private String partition;
  private TableTunnel tunnel;
  public WriteThread(BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue,
                     BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue,
                     String project,
                     String table,
                     String partition,
                     TableTunnel tunnel) {
    this.flushingQueue = flushingQueue;
    this.flushedQueue = flushedQueue;
    this.project = project;
    this.table = table;
    this.partition = partition;
    this.tunnel = tunnel;
  }

  @Override
  public Boolean call() {
    try {
      PartitionSpec partitionSpec = new PartitionSpec(partition);
      TableTunnel.StreamUploadSession uploadSession = tunnel.buildStreamUploadSession(project, table).setPartitionSpec(partitionSpec).build();
      TableSchema schema = uploadSession.getSchema();

      Record record = uploadSession.newRecord();
      for (int i = 0; i < schema.getColumns().size(); i++) {
        Column column = schema.getColumn(i);
        switch (column.getType()) {
          case BIGINT:
            record.setBigint(i, 1L);
            break;
          case BOOLEAN:
            record.setBoolean(i, true);
            break;
          case DATETIME:
            record.setDatetime(i, new Date());
            break;
          case DOUBLE:
            record.setDouble(i, 0.0);
            break;
          case STRING:
            record.setString(i, "sample");
            break;
          default:
            throw new RuntimeException("Unknown column type: "
                    + column.getType());
        }
      }

      while (!isStop()) {
        // Reuse a returned pack if available; otherwise allocate a new one.
        TableTunnel.StreamRecordPack pack = flushedQueue.poll();
        if (pack == null) {
          pack = uploadSession.newRecordPack();
        }
        try {
          for (int i = 0; i < 10; i++) {
            pack.append(record);
          }

          while (!isStop() && !flushingQueue.offer(pack, 1000, TimeUnit.MILLISECONDS)) {
            System.out.println("flushing Queue full");
          }
        } catch (IOException e) {
          e.printStackTrace();
        } catch (InterruptedException e) {
          // Restore the interrupt flag and stop writing.
          Thread.currentThread().interrupt();
          return false;
        }
      }
    } catch(TunnelException e){
      e.printStackTrace();
    } catch(IOException e){
      e.printStackTrace();
    }
    return true;
  }
}
public class StreamUploadAsyncIOSample {
  // Load credentials from environment variables.
  // Do not hardcode the AccessKey ID or secret in your code.
  private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
  private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
  private static String odpsEndpoint = "<endpoint>";
  private static String tunnelEndpoint = "<tunnel_endpoint>";
  private static String project = "<your_project>";
  private static String table = "<your_table_name>";
  private static String partition = "<your_partition_spec>";
  private static int writeThreadNum = 10;
  private static int flushThreadNum = 10;
  private static int flushQueueSize = 100;
  private static BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue = new LinkedBlockingDeque<>(flushQueueSize);
  private static BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue = new LinkedBlockingDeque<>();
  public static void main(String args[]) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsEndpoint);
    odps.setDefaultProject(project);
    try {
      TableTunnel tunnel = new TableTunnel(odps);
      // tunnel.setEndpoint(tunnelEndpoint);
      ExecutorService pool = Executors.newFixedThreadPool(writeThreadNum + flushThreadNum);
      ArrayList<StoppableThread> threads = new ArrayList<StoppableThread>();
      for (int i = 0; i < writeThreadNum; i++) {
        threads.add(new WriteThread(flushingQueue, flushedQueue, project, table, partition, tunnel));
      }
      for (int i = 0; i < flushThreadNum; i++) {
        threads.add(new FlushThread(flushingQueue, flushedQueue));
      }
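      // invokeAll() would block until every task returns, but the tasks loop
      // until stop() is called, so submit them without waiting instead.
      for (StoppableThread thread : threads) {
        pool.submit(thread);
      }

      // Let the pipeline run for a fixed period. The 60-second duration is an
      // illustrative value; adjust it or replace it with your own stop condition.
      Thread.sleep(60 * 1000L);

      for (StoppableThread thread : threads) {
        thread.stop();
      }

      pool.shutdown();
      pool.awaitTermination(30, TimeUnit.SECONDS);
      System.out.println("upload success!");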
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Verify the upload

After the program exits normally, the output includes:

upload success!

Each successful flush also prints a trace ID:

flush success:<traceId>

Use the trace ID to look up the flush request in MaxCompute logs when troubleshooting a specific batch.

To confirm that data was written, query the target table:

SELECT COUNT(*) FROM <your_table_name> WHERE <your_partition_spec>;

What's next

  • To learn about StreamUploadSession and other Tunnel SDK classes, see the MaxCompute Tunnel SDK reference.

  • To upload data without multithreading, see the simple stream upload example.

  • To manage access permissions for MaxCompute, see Resource Access Management (RAM).