Example of multithreaded uploads


This example shows how to use StreamUploadSession and StreamRecordPack to upload data to MaxCompute from multiple concurrent threads.

All threads share one TableTunnel, but each thread opens its own independent StreamUploadSession. Because sessions are not shared across threads, the threads can append and flush data in parallel without synchronization overhead.
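The per-thread pattern can be sketched with the JDK alone. In the sketch below, FakeSession is a hypothetical stand-in for StreamUploadSession that only buffers and counts strings; it is not part of the MaxCompute SDK. What it illustrates is that each task constructs its own session inside the task body, so no locks are needed, and invokeAll() collects the results after every task has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for StreamUploadSession: buffers records for a single thread only.
class FakeSession {
  private final List<String> buffer = new ArrayList<>();

  void append(String record) {
    buffer.add(record);
  }

  // Returns the number of buffered records and clears the buffer.
  int flush() {
    int n = buffer.size();
    buffer.clear();
    return n;
  }
}

class PerThreadSessions {
  // Runs `threads` tasks; each task creates its own FakeSession, so nothing is shared.
  static int uploadAll(int threads, int recordsPerThread)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Callable<Integer>> tasks = new ArrayList<>();
      for (int t = 0; t < threads; t++) {
        final int id = t;
        tasks.add(() -> {
          FakeSession session = new FakeSession(); // created inside the task, never shared
          for (int i = 0; i < recordsPerThread; i++) {
            session.append("thread-" + id + "-record-" + i);
          }
          return session.flush();
        });
      }
      // invokeAll blocks until every task has completed.
      int total = 0;
      for (Future<Integer> f : pool.invokeAll(tasks)) {
        total += f.get();
      }
      return total;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(uploadAll(10, 10)); // prints 100
  }
}
```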

Prerequisites

Before you begin, ensure that you have:

  • A MaxCompute project with a partitioned table

  • An AccessKey ID and AccessKey secret stored as environment variables (ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET)

  • The MaxCompute endpoint for your region and, optionally, the tunnel endpoint

Use a RAM (Resource Access Management) user instead of your Alibaba Cloud account root credentials. The AccessKey pair of an Alibaba Cloud account has permissions on all API operations, which makes it a high-risk credential for routine use. To create a RAM user, go to the RAM console.
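The sample passes the values returned by System.getenv() straight to AliyunAccount, so a missing variable is not noticed until the SDK tries to use the null value. A minimal fail-fast check might look like the sketch below. The Credentials class and its methods are hypothetical helpers, not SDK APIs; the environment is taken as a Map so the check can be tested without modifying real environment variables.

```java
import java.util.Map;

// Hypothetical helper: holds the AccessKey pair read from the environment.
class Credentials {
  final String accessKeyId;
  final String accessKeySecret;

  private Credentials(String accessKeyId, String accessKeySecret) {
    this.accessKeyId = accessKeyId;
    this.accessKeySecret = accessKeySecret;
  }

  // Reads both variables from `env` and fails fast if either is missing or blank.
  // In a real program, pass System.getenv().
  static Credentials fromEnv(Map<String, String> env) {
    return new Credentials(
        require(env, "ALIBABA_CLOUD_ACCESS_KEY_ID"),
        require(env, "ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
  }

  private static String require(Map<String, String> env, String name) {
    String value = env.get(name);
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalStateException("Environment variable " + name + " is not set");
    }
    return value;
  }
}
```

The two fields can then be passed to new AliyunAccount(...) in place of the raw System.getenv() calls.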

How it works

  1. Create a TableTunnel connected to your MaxCompute project.

  2. Create a fixed-size thread pool with Executors.newFixedThreadPool() and submit one upload task per thread.

  3. Each thread independently builds a StreamUploadSession for the target table and partition.

  4. Inside each thread, create a StreamRecordPack, append records to it, and call pack.flush() to commit the data.

  5. After a successful flush, the data is immediately visible and the StreamRecordPack can be reused for the next batch.

  6. If flush() throws an IOException, discard the current StreamRecordPack, create a new one, and retry. The example retries up to three times with a 500 ms delay between attempts.
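Steps 4 to 6 can be sketched with the JDK alone. Pack below is a hypothetical interface standing in for StreamRecordPack, reduced to append and flush; no SDK types are used. Each attempt asks the supplier for a fresh pack and appends the records again, which follows the rule that a pack whose flush failed must not be reused.

```java
import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for StreamRecordPack, reduced to the two calls the retry loop needs.
interface Pack<R> {
  void append(R record) throws IOException;

  String flush() throws IOException; // returns a trace ID on success
}

class FlushRetry {
  // Appends `records` to a fresh pack and flushes it, making up to `maxAttempts` attempts.
  // Every attempt gets a new pack from `newPack`, because a pack whose flush failed must
  // not be reused. Sleeps `delayMs` between attempts and rethrows the last failure.
  static <R> String flushWithRetry(Supplier<Pack<R>> newPack, List<R> records,
      int maxAttempts, long delayMs) throws IOException, InterruptedException {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    IOException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      Pack<R> pack = newPack.get(); // discard any earlier, failed pack
      try {
        for (R record : records) {
          pack.append(record);
        }
        return pack.flush();
      } catch (IOException e) {
        last = e;
        if (attempt < maxAttempts) {
          Thread.sleep(delayMs); // wait only between attempts, not after the last one
        }
      }
    }
    throw last; // all attempts failed
  }
}
```

Taking a Supplier keeps the retry loop independent of how packs are created, and the attempt count and delay are parameters rather than being fixed at three attempts and 500 ms.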

Upload data using multiple threads

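The following example uses 10 threads. Each thread fills one record with a sample value for every column in the table schema, appends that record 10 times, and then flushes. The sample handles BIGINT, BOOLEAN, DATETIME, DOUBLE, and STRING columns; a column of any other type causes the upload task to throw an exception.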
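The schema-driven column filling can be isolated as a small pure function. The sketch below uses a hypothetical ColumnType enum in place of the SDK's column type (OdpsType) and returns plain Java values instead of calling Record setters. It mirrors the values the example writes and its rejection of unsupported types.

```java
import java.util.Date;
import java.util.List;

class SampleValues {
  // Hypothetical subset of column types; the SDK itself reports types through OdpsType.
  enum ColumnType { BIGINT, BOOLEAN, DATETIME, DOUBLE, STRING, DECIMAL }

  // Returns the sample value the upload example writes for a column of the given type.
  static Object sampleFor(ColumnType type) {
    switch (type) {
      case BIGINT:
        return 1L;
      case BOOLEAN:
        return true;
      case DATETIME:
        return new Date();
      case DOUBLE:
        return 0.0;
      case STRING:
        return "sample";
      default:
        // DECIMAL (and any other type) is rejected, as in the upload example.
        throw new IllegalArgumentException("Unsupported column type: " + type);
    }
  }

  // Builds one row of sample values for a schema given as an ordered list of column types.
  static Object[] rowFor(List<ColumnType> schema) {
    Object[] row = new Object[schema.size()];
    for (int i = 0; i < row.length; i++) {
      row[i] = sampleFor(schema.get(i));
    }
    return row;
  }
}
```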

Replace the placeholders in the StreamUploadThreadSample class before running the code:

| Placeholder | Description | Example |
|---|---|---|
| `<endpoint>` | MaxCompute endpoint for your region | https://service.ap-southeast-1.maxcompute.aliyun.com/api |
| `<tunnel_endpoint>` | Tunnel endpoint (optional; uncomment the tunnel.setEndpoint line to activate) | https://dt.ap-southeast-1.maxcompute.aliyun.com |
| `<your_project>` | Name of your MaxCompute project | my_project |
| `<your_table_name>` | Name of the target table | my_table |
| `<your_partition_spec>` | Partition specification of the table | ds=20240101 |

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;

class StreamUploadThread implements Callable<Boolean> {
  private static final int RECORDS_PER_PACK = 10;
  private static final int MAX_RETRIES = 3;
  private static final long RETRY_DELAY_MS = 500;

  private final String project;
  private final String table;
  private final String partition;
  private final TableTunnel tunnel;

  public StreamUploadThread(String project, String table, String partition, TableTunnel tunnel) {
    this.project = project;
    this.table = table;
    this.partition = partition;
    this.tunnel = tunnel;
  }

  // Returns true if this thread's data was flushed, false otherwise.
  @Override
  public Boolean call() {
    try {
      // Each thread builds its own session; sessions are never shared between threads.
      PartitionSpec partitionSpec = new PartitionSpec(partition);
      TableTunnel.StreamUploadSession uploadSession = tunnel
          .buildStreamUploadSession(project, table)
          .setPartitionSpec(partitionSpec)
          .build();
      TableSchema schema = uploadSession.getSchema();

      // Fill one record with a sample value for every column in the table schema.
      Record record = uploadSession.newRecord();
      for (int i = 0; i < schema.getColumns().size(); i++) {
        Column column = schema.getColumn(i);
        switch (column.getType()) {
          case BIGINT:
            record.setBigint(i, 1L);
            break;
          case BOOLEAN:
            record.setBoolean(i, true);
            break;
          case DATETIME:
            record.setDatetime(i, new Date());
            break;
          case DOUBLE:
            record.setDouble(i, 0.0);
            break;
          case STRING:
            record.setString(i, "sample");
            break;
          default:
            throw new IllegalArgumentException("Unsupported column type: " + column.getType());
        }
      }

      for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
        try {
          // A pack whose flush failed cannot be reused, so every attempt starts with a new pack.
          TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();
          for (int i = 0; i < RECORDS_PER_PACK; i++) {
            pack.append(record);
          }
          String traceId = pack.flush();
          System.out.println("flush success: " + traceId);
          return true;
        } catch (IOException e) {
          e.printStackTrace();
          if (attempt < MAX_RETRIES) {
            Thread.sleep(RETRY_DELAY_MS);
          }
        }
      }
      System.err.println("flush failed after " + MAX_RETRIES + " attempts");
    } catch (TunnelException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the caller can see that the thread was interrupted.
      Thread.currentThread().interrupt();
    }
    return false;
  }
}

public class StreamUploadThreadSample {
  // Credentials are read from environment variables to avoid hardcoding sensitive values.
  private static final String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
  private static final String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

  private static final String odpsEndpoint = "<endpoint>";
  private static final String tunnelEndpoint = "<tunnel_endpoint>";
  private static final String project = "<your_project>";
  private static final String table = "<your_table_name>";
  private static final String partition = "<your_partition_spec>";
  private static final int threadNum = 10;

  public static void main(String[] args) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsEndpoint);
    odps.setDefaultProject(project);

    // One TableTunnel is shared; each task opens its own StreamUploadSession from it.
    TableTunnel tunnel = new TableTunnel(odps);
    // Optional: uncomment to use a specific tunnel endpoint.
    // tunnel.setEndpoint(tunnelEndpoint);

    ExecutorService pool = Executors.newFixedThreadPool(threadNum);
    try {
      List<Callable<Boolean>> callers = new ArrayList<>();
      for (int i = 0; i < threadNum; i++) {
        callers.add(new StreamUploadThread(project, table, partition, tunnel));
      }
      // invokeAll blocks until every task has finished; count the tasks that succeeded.
      int succeeded = 0;
      for (Future<Boolean> result : pool.invokeAll(callers)) {
        try {
          if (result.get()) {
            succeeded++;
          }
        } catch (ExecutionException e) {
          // The task threw an unchecked exception, for example for an unsupported column type.
          e.getCause().printStackTrace();
        }
      }
      System.out.println(succeeded + " of " + threadNum + " threads uploaded successfully");
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdown();
    }
  }
}
```

flush() behavior and retry

| Outcome | Data visible | StreamRecordPack reusable | Action |
|---|---|---|---|
| Flush succeeds | Yes | Yes; reuse it to avoid repeated memory allocation | Append the next batch to the same pack and call pack.flush() again |
| Flush fails (IOException) | No | No; the pack cannot be reused | Discard the pack, create a new StreamRecordPack, append the records again, and retry |

The example retries a failed flush up to three times, waiting 500 ms between attempts, and prints the stack trace of each failed attempt.
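The reuse rules in the table above can be sketched without the SDK. BatchPack below is a hypothetical stand-in for StreamRecordPack. One pack is reused across batches as long as its flushes succeed; when a flush fails, the pack is replaced and the index of the failed batch is recorded. Retrying is left out to keep the focus on reuse; a caller could resubmit the failed batches with a retry loop like the one in the example.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for StreamRecordPack, reduced to the two calls used here.
interface BatchPack {
  void append(String record) throws IOException;

  String flush() throws IOException; // returns a trace ID on success
}

class BatchUploader {
  // Flushes each batch in order and returns the indexes of batches whose flush failed.
  // After a successful flush the same pack is reused for the next batch; after a failed
  // flush the pack is discarded and a new one is created. No retries are attempted.
  static List<Integer> uploadBatches(Supplier<BatchPack> newPack, List<List<String>> batches) {
    List<Integer> failed = new ArrayList<>();
    BatchPack pack = newPack.get();
    for (int b = 0; b < batches.size(); b++) {
      try {
        for (String record : batches.get(b)) {
          pack.append(record);
        }
        pack.flush();
      } catch (IOException e) {
        failed.add(b);
        pack = newPack.get(); // a pack whose flush failed is not reused
      }
    }
    return failed;
  }
}
```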