Example of a simple streaming data upload

Use the StreamUploadSession and StreamRecordPack interfaces of the MaxCompute Tunnel SDK to stream table data into MaxCompute.

Prerequisites

Before you begin, ensure that you have:

  • A MaxCompute project with a target table

  • The MaxCompute Tunnel SDK added to your project dependencies

  • An AccessKey ID and AccessKey secret stored in the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET

  • The partition spec of the target partition (required only if the table is partitioned)
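If either environment variable is missing, System.getenv returns null, and the problem may only surface later, when the client first sends a request. The following is a minimal sketch of a fail-fast check you could run before building the Odps client. CredentialCheck and requireEnv are hypothetical names used for illustration; they are not part of the MaxCompute SDK.

```java
import java.util.Map;

// Hypothetical helper (not part of the MaxCompute SDK): fails fast when a
// required environment variable is missing or blank.
final class CredentialCheck {

  // Returns the value of `name` in `env`, or throws if it is unset or blank.
  static String requireEnv(Map<String, String> env, String name) {
    String value = env.get(name);
    if (value == null || value.isBlank()) {
      throw new IllegalStateException("Environment variable " + name + " is not set");
    }
    return value;
  }

  public static void main(String[] args) {
    Map<String, String> env = System.getenv();
    try {
      requireEnv(env, "ALIBABA_CLOUD_ACCESS_KEY_ID");
      requireEnv(env, "ALIBABA_CLOUD_ACCESS_KEY_SECRET");
      // Do not print the values themselves; they are secrets.
      System.out.println("AccessKey environment variables are set.");
    } catch (IllegalStateException e) {
      System.err.println(e.getMessage());
      System.exit(1);
    }
  }
}
```

Taking the map as a parameter instead of calling System.getenv inside the helper keeps it testable without changing the process environment.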

How streaming upload works

A streaming upload follows four steps:

  1. Create a TableTunnel instance and connect it to your MaxCompute project.

  2. Build a StreamUploadSession for the target table (and partition, if applicable).

  3. Create a StreamRecordPack, create a Record and set a value for each column according to its data type, and append the records to the pack.

  4. Call pack.flush() to commit the buffered records to MaxCompute.
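Steps 3 and 4 can repeat: one pack buffers a batch, flushes it, and then takes the next batch. The sketch below models only that loop, using a hypothetical RecordPack interface and an in-memory fake. It does not call the Tunnel SDK, it omits the IOException that a real flush can throw, and the assumption that a successful flush empties the buffer is a modeling choice for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchingSketch {

  // Hypothetical stand-in for a record pack; NOT the Tunnel SDK API.
  interface RecordPack<T> {
    void append(T record);
    String flush(); // returns an identifier for the committed batch
  }

  // In-memory fake: flush() moves the buffered records into `committed`
  // and empties the buffer, so the same pack can take the next batch.
  static final class InMemoryPack<T> implements RecordPack<T> {
    private final List<T> buffer = new ArrayList<>();
    final List<List<T>> committed = new ArrayList<>();

    public void append(T record) {
      buffer.add(record);
    }

    public String flush() {
      committed.add(new ArrayList<>(buffer));
      buffer.clear();
      return "batch-" + committed.size();
    }
  }

  // Steps 3 and 4 in a loop: append up to batchSize rows, flush, repeat with the same pack.
  static <T> List<String> uploadInBatches(RecordPack<T> pack, List<T> rows, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<String> traceIds = new ArrayList<>();
    for (int start = 0; start < rows.size(); start += batchSize) {
      int end = Math.min(start + batchSize, rows.size());
      for (T row : rows.subList(start, end)) {
        pack.append(row);
      }
      traceIds.add(pack.flush());
    }
    return traceIds;
  }

  public static void main(String[] args) {
    InMemoryPack<Integer> pack = new InMemoryPack<>();
    List<String> ids = uploadInBatches(pack, List.of(1, 2, 3, 4, 5, 6, 7), 3);
    System.out.println(ids);            // [batch-1, batch-2, batch-3]
    System.out.println(pack.committed); // [[1, 2, 3], [4, 5, 6], [7]]
  }
}
```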

Flush behavior:

Flush outcome | Reuse the StreamRecordPack?
Succeeds | Yes. Reuse the pack for the next batch.
Fails | No. Discard the pack and create a new one before retrying.

pack.flush() returns a traceId string that identifies the committed batch.
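The reuse rules above can be captured in a small wrapper that keeps the same pack after a successful flush and swaps in a new pack after a failed one, re-appending the batch before it retries. This is a sketch assuming hypothetical RecordPack, Uploader, and FlakyPack types, not SDK classes; in real code the pack factory would presumably wrap uploadSession.newRecordPack().

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class FreshPackRetry {

  // Hypothetical stand-in for a record pack; NOT the Tunnel SDK type.
  interface RecordPack<T> {
    void append(T record) throws IOException;
    String flush() throws IOException; // returns a trace ID for the committed batch
  }

  // Reuses one pack across successful flushes and replaces it after a failed flush.
  static final class Uploader<T> {
    private final Supplier<RecordPack<T>> packFactory;
    private final int maxAttempts;
    private final long sleepMillis;
    private RecordPack<T> pack;

    Uploader(Supplier<RecordPack<T>> packFactory, int maxAttempts, long sleepMillis) {
      this.packFactory = packFactory;
      this.maxAttempts = maxAttempts;
      this.sleepMillis = sleepMillis;
      this.pack = packFactory.get();
    }

    // Appends the batch, flushes it, and returns the trace ID of the committed batch.
    String uploadBatch(List<T> batch) throws IOException, InterruptedException {
      IOException last = null;
      for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
          for (T record : batch) {
            pack.append(record);
          }
          return pack.flush(); // success: keep this pack for the next batch
        } catch (IOException e) {
          last = e;
          pack = packFactory.get(); // failure: never reuse the failed pack
          if (attempt < maxAttempts) {
            Thread.sleep(sleepMillis);
          }
        }
      }
      throw new IOException("Flush failed after " + maxAttempts + " attempts", last);
    }
  }

  // Fake pack whose flush fails while the shared failure budget is positive.
  static final class FlakyPack implements RecordPack<String> {
    private final AtomicInteger failuresLeft;
    private final List<String> buffer = new ArrayList<>();

    FlakyPack(AtomicInteger failuresLeft) {
      this.failuresLeft = failuresLeft;
    }

    public void append(String record) {
      buffer.add(record);
    }

    public String flush() throws IOException {
      if (failuresLeft.getAndDecrement() > 0) {
        throw new IOException("simulated flush failure");
      }
      String traceId = "trace-" + buffer.size();
      buffer.clear();
      return traceId;
    }
  }

  public static void main(String[] args) throws Exception {
    AtomicInteger failuresLeft = new AtomicInteger(2); // the first two flushes fail
    AtomicInteger created = new AtomicInteger();
    Uploader<String> uploader = new Uploader<>(() -> {
      created.incrementAndGet();
      return new FlakyPack(failuresLeft);
    }, 3, 10);
    String traceId = uploader.uploadBatch(List.of("a", "b", "c"));
    System.out.println(traceId + ", packs created: " + created.get()); // trace-3, packs created: 3
  }
}
```

Creating the replacement pack right after a failure, rather than at the start of the next attempt, leaves the uploader holding a usable pack even when every attempt fails.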

Upload table data

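The following example appends 10 copies of one sample record to a pack, flushes them to a partition of the target table, and makes up to three flush attempts. After a failed flush, it discards the pack, creates a new one, and re-appends the records before retrying.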

import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;

public class StreamUploadSample {

  // Load credentials from environment variables — never hardcode AccessKey pairs in source code.
  // Use a RAM user with the minimum required permissions instead of an Alibaba Cloud account.
  private static String accessId  = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
  private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

  // Replace the placeholders below with your actual values.
  private static String odpsEndpoint   = "<endpoint>";            // MaxCompute project endpoint
  private static String tunnelEndpoint = "<tunnel_endpoint>";     // MaxCompute Tunnel endpoint (optional)
  private static String project        = "<your_project>";        // MaxCompute project name
  private static String table          = "<your_table_name>";     // Target table name
  private static String partition      = "<your_partition_spec>"; // Partition spec, e.g. pt=20240101

  public static void main(String[] args) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsEndpoint);
    odps.setDefaultProject(project);

    try {
      TableTunnel tunnel = new TableTunnel(odps);

      // Tunnel endpoint routing is automatic by default.
      // Specify an explicit endpoint only when the automatically routed network is unreachable:
      // tunnel.setEndpoint(tunnelEndpoint);

      PartitionSpec partitionSpec = new PartitionSpec(partition);

      // To create the partition automatically if it does not exist, add .setCreatePartition(true):
      // TableTunnel.StreamUploadSession uploadSession =
      //     tunnel.buildStreamUploadSession(project, table)
      //           .setPartitionSpec(partitionSpec)
      //           .setCreatePartition(true)
      //           .build();
      TableTunnel.StreamUploadSession uploadSession =
          tunnel.buildStreamUploadSession(project, table)
                .setPartitionSpec(partitionSpec)
                .build();

      TableSchema schema = uploadSession.getSchema();
      TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();
      Record record = uploadSession.newRecord();

      // Populate a sample value for each column based on its data type.
      for (int i = 0; i < schema.getColumns().size(); i++) {
        Column column = schema.getColumn(i);
        switch (column.getType()) {
          case BIGINT:
            record.setBigint(i, 1L);
            break;
          case BOOLEAN:
            record.setBoolean(i, true);
            break;
          case DATETIME:
            record.setDatetime(i, new Date());
            break;
          case DOUBLE:
            record.setDouble(i, 0.0);
            break;
          case STRING:
            record.setString(i, "sample");
            break;
          default:
            // Other column types (for example DECIMAL or DATE) are not handled in this sample.
            throw new RuntimeException("Unsupported column type: " + column.getType());
        }
      }

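      // Append 10 copies of the sample record to the pack.
      int recordCount = 10;
      for (int i = 0; i < recordCount; i++) {
        pack.append(record);
      }

      // Flush with up to 3 attempts.
      // On success: the data is committed and visible; the pack can be reused for the next batch.
      // On failure: do not reuse the pack. Create a new StreamRecordPack, re-append the batch, and retry.
      int maxAttempts = 3;
      boolean flushed = false;
      for (int attempt = 1; attempt <= maxAttempts && !flushed; attempt++) {
        try {
          String traceId = pack.flush();
          System.out.println("Flush succeeded. Trace ID: " + traceId);
          flushed = true;
        } catch (IOException e) {
          e.printStackTrace();
          if (attempt < maxAttempts) {
            Thread.sleep(500);
            pack = uploadSession.newRecordPack();
            for (int i = 0; i < recordCount; i++) {
              pack.append(record);
            }
          }
        }
      }

      System.out.println(flushed
          ? "Upload complete."
          : "Upload failed after " + maxAttempts + " attempts.");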
    } catch (TunnelException e) {
      e.printStackTrace();
    } catch (IOException | InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Parameters

Replace the following placeholders before running the example:

Placeholder | Description
<endpoint> | Endpoint of your MaxCompute project
<tunnel_endpoint> | Endpoint of MaxCompute Tunnel. Used only if you uncomment tunnel.setEndpoint(tunnelEndpoint); otherwise the Tunnel endpoint is routed automatically.
<your_project> | Name of your MaxCompute project
<your_table_name> | Name of the target table
<your_partition_spec> | Partition spec of the target partition, for example pt=20240101
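For a table with more than one partition level, the partition spec lists each level as key=value. The hypothetical helper below assumes the levels are comma-separated and given in the order of the table's partition columns (for example pt=20240101,region=hangzhou, where the values are illustrative). Check the PartitionSpec documentation for your SDK version before relying on this format.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of the Tunnel SDK): builds a partition spec string
// from partition levels. Assumption: levels are comma-separated and listed in the
// same order as the table's partition columns.
final class PartitionSpecString {

  static String build(Map<String, String> levels) {
    if (levels.isEmpty()) {
      throw new IllegalArgumentException("at least one partition level is required");
    }
    StringJoiner spec = new StringJoiner(",");
    for (Map.Entry<String, String> level : levels.entrySet()) {
      spec.add(level.getKey() + "=" + level.getValue());
    }
    return spec.toString();
  }

  public static void main(String[] args) {
    Map<String, String> levels = new LinkedHashMap<>(); // preserves insertion order
    levels.put("pt", "20240101");
    levels.put("region", "hangzhou");
    System.out.println(build(levels)); // pt=20240101,region=hangzhou
  }
}
```

A LinkedHashMap is used so that the levels come out in the order they were added; a HashMap would not guarantee the partition-column order.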