异步化IO多线程上传示例

本文为您介绍如何通过StreamRecordPack接口实现IO与业务逻辑异步化。

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;

/**
 * Base class for pool tasks that run until they are asked to stop.
 *
 * <p>Subclasses implement {@code call()} and are expected to check {@link #isStop()}
 * in their work loop; {@link #stop()} may be invoked from any other thread.
 */
abstract class StoppableThread implements Callable<Boolean> {
  /** Set to {@code true} once a stop has been requested. */
  private final AtomicBoolean stopRequested = new AtomicBoolean(false);

  /** Requests that the task finish its work loop. */
  public void stop() {
    stopRequested.set(true);
  }

  /**
   * Reports whether a stop has been requested.
   *
   * @return {@code true} after {@link #stop()} has been called, {@code false} before
   */
  public boolean isStop() {
    return stopRequested.get();
  }
}

/**
 * Worker that takes filled record packs from the flushing queue, uploads them with
 * {@code flush()}, and hands successfully flushed packs to the flushed queue so they
 * can be reused by writers.
 */
class FlushThread extends StoppableThread {
  /** Number of flush attempts made for one pack before it is given up. */
  private static final int MAX_FLUSH_ATTEMPTS = 3;
  /** Timeout for queue operations, so the stop flag is re-checked regularly. */
  private static final long QUEUE_TIMEOUT_MS = 1000;
  /** Pause between two flush attempts of the same pack. */
  private static final long RETRY_DELAY_MS = 500;

  private final BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue;
  private final BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue;

  /**
   * @param flushingQueue source of packs that are ready to be flushed
   * @param flushedQueue destination for packs that were flushed and can be reused
   */
  public FlushThread(BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue,
                       BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue) {
    this.flushingQueue = flushingQueue;
    this.flushedQueue = flushedQueue;
  }

  /**
   * Flushes packs from the flushing queue until {@link #stop()} is called.
   *
   * @return {@code true} if every pack taken from the queue was flushed; {@code false} if at
   *     least one pack was abandoned after {@value #MAX_FLUSH_ATTEMPTS} failed attempts
   * @throws Exception if the thread is interrupted while waiting on a queue or between retries
   */
  @Override
  public Boolean call() throws Exception {
    boolean allFlushed = true;
    while (!isStop()) {
      TableTunnel.StreamRecordPack pack =
          flushingQueue.poll(QUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
      if (pack == null) {
        // Nothing to flush within the timeout; loop to re-check the stop flag.
        continue;
      }

      boolean flushed = false;
      for (int attempt = 1; attempt <= MAX_FLUSH_ATTEMPTS && !flushed; attempt++) {
        try {
          // Notes carried over from the original sample:
          // - A successful flush means the data was written and is immediately visible.
          // - After a successful flush the pack can be reused, which avoids repeated
          //   allocation and the resulting garbage collection.
          // - A failed flush can simply be retried.
          // - After a failed flush the pack must not be reused for new data; a new
          //   StreamRecordPack has to be created instead. That is why the pack is only
          //   offered to flushedQueue on the success path.
          String traceId = pack.flush();
          flushedQueue.offer(pack, QUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
          System.out.println("flush success:" + traceId);
          flushed = true;
        } catch (IOException e) {
          e.printStackTrace();
          if (attempt < MAX_FLUSH_ATTEMPTS) {
            // Back off only when another attempt will actually follow.
            Thread.sleep(RETRY_DELAY_MS);
          }
        }
      }

      if (!flushed) {
        // Make the data loss visible instead of dropping the pack silently.
        allFlushed = false;
        System.err.println("flush failed after " + MAX_FLUSH_ATTEMPTS
            + " attempts, pack discarded");
      }
    }
    return allFlushed;
  }
}

/**
 * Worker that fills record packs with a sample record and queues them for flushing.
 *
 * <p>Packs are taken from the flushed queue when one is available and created from the
 * upload session otherwise.
 */
class WriteThread extends StoppableThread {
  /** Number of records appended to a pack before it is queued for flushing. */
  private static final int RECORDS_PER_PACK = 10;
  /** Timeout for one attempt to enqueue a pack, so the stop flag is re-checked regularly. */
  private static final long OFFER_TIMEOUT_MS = 1000;

  private final BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue;
  private final BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue;
  private final String project;
  private final String table;
  private final String partition;
  private final TableTunnel tunnel;

  /**
   * @param flushingQueue destination for filled packs awaiting flush
   * @param flushedQueue source of already flushed packs that can be reused
   * @param project name of the project containing the target table
   * @param table name of the target table
   * @param partition partition spec string of the target partition
   * @param tunnel tunnel used to build the stream upload session
   */
  public WriteThread(BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue,
                     BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue,
                     String project,
                     String table,
                     String partition,
                     TableTunnel tunnel) {
    this.flushingQueue = flushingQueue;
    this.flushedQueue = flushedQueue;
    this.project = project;
    this.table = table;
    this.partition = partition;
    this.tunnel = tunnel;
  }

  /**
   * Builds an upload session, then keeps filling and queueing packs until {@link #stop()}
   * is called.
   *
   * @return {@code true} if the writer ran without errors; {@code false} if the session could
   *     not be set up, an append failed, or the thread was interrupted
   */
  @Override
  public Boolean call() {
    boolean succeeded = true;
    try {
      PartitionSpec partitionSpec = new PartitionSpec(partition);
      TableTunnel.StreamUploadSession uploadSession =
          tunnel.buildStreamUploadSession(project, table).setPartitionSpec(partitionSpec).build();
      // The same record instance is appended repeatedly; only its column values matter.
      Record record = buildSampleRecord(uploadSession);

      while (!isStop()) {
        TableTunnel.StreamRecordPack pack = flushedQueue.poll();
        if (pack == null) {
          // No flushed pack is available for reuse, so allocate a fresh one.
          pack = uploadSession.newRecordPack();
        }
        try {
          for (int i = 0; i < RECORDS_PER_PACK; i++) {
            pack.append(record);
          }

          while (!isStop() && !flushingQueue.offer(pack, OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
            System.out.println("flushing Queue full");
          }
        } catch (IOException e) {
          // The pack is abandoned (never queued); keep producing with another pack.
          e.printStackTrace();
          succeeded = false;
        } catch (InterruptedException e) {
          // Restore the flag and stop: an interrupt is a request to terminate.
          Thread.currentThread().interrupt();
          e.printStackTrace();
          return false;
        }
      }
    } catch (TunnelException e) {
      e.printStackTrace();
      succeeded = false;
    } catch (IOException e) {
      e.printStackTrace();
      succeeded = false;
    }
    return succeeded;
  }

  /**
   * Creates a record for the session's schema with a fixed sample value in every column.
   *
   * @param uploadSession session that supplies the schema and the empty record
   * @return the populated record
   * @throws RuntimeException if a column has a type other than BIGINT, BOOLEAN, DATETIME,
   *     DOUBLE or STRING
   */
  private static Record buildSampleRecord(TableTunnel.StreamUploadSession uploadSession)
      throws TunnelException, IOException {
    TableSchema schema = uploadSession.getSchema();
    Record record = uploadSession.newRecord();
    int columnCount = schema.getColumns().size();
    for (int i = 0; i < columnCount; i++) {
      Column column = schema.getColumn(i);
      switch (column.getType()) {
        case BIGINT:
          record.setBigint(i, 1L);
          break;
        case BOOLEAN:
          record.setBoolean(i, true);
          break;
        case DATETIME:
          record.setDatetime(i, new Date());
          break;
        case DOUBLE:
          record.setDouble(i, 0.0);
          break;
        case STRING:
          record.setString(i, "sample");
          break;
        default:
          throw new RuntimeException("Unknown column type: "
                  + column.getType());
      }
    }
    return record;
  }
}
/**
 * Entry point of the sample: starts writer tasks that fill record packs and flusher tasks
 * that upload them, lets them run for a fixed time, and then shuts everything down in order.
 */
public class StreamUploadAsyncIOSample {
  // An Alibaba Cloud account AccessKey has access to all APIs, which is high risk. It is
  // strongly recommended to create and use a RAM user for API access and daily operations;
  // RAM users are created in the RAM console.
  // This sample reads the AccessKey ID and AccessKey Secret from environment variables. They
  // can also be kept in a configuration file if that suits the application better.
  // Do not hard-code the AccessKey ID and AccessKey Secret: that risks leaking the credentials.
  private static final String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
  private static final String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
  // Endpoint of the MaxCompute project.
  private static final String odpsEndpoint = "<endpoint>";
  // Tunnel endpoint of the MaxCompute project (only used if the setEndpoint call in main is
  // uncommented).
  private static final String tunnelEndpoint = "<tunnel_endpoint>";
  // Name of the MaxCompute project.
  private static final String project = "<your_project>";
  // Name of the table in the MaxCompute project.
  private static final String table = "<your_table_name>";
  // Partition spec of the target table.
  private static final String partition = "<your_partition_spec>";
  private static final int writeThreadNum = 10;
  private static final int flushThreadNum = 10;
  private static final int flushQueueSize = 100;
  // How long the writers produce data before the sample shuts down.
  private static final long runDurationMs = 60 * 1000L;
  // Upper bound on waiting for queued packs to be flushed once the writers have stopped.
  private static final long drainTimeoutMs = 30 * 1000L;
  // Interval between two checks of the flushing queue while draining.
  private static final long drainPollMs = 100L;
  private static final BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue =
      new LinkedBlockingDeque<>(flushQueueSize);
  private static final BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue =
      new LinkedBlockingDeque<>();

  /**
   * Runs the sample upload.
   *
   * @param args unused
   */
  public static void main(String args[]) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsEndpoint);
    odps.setDefaultProject(project);
    TableTunnel tunnel = new TableTunnel(odps);
    // Uncomment to set the Tunnel endpoint explicitly.
    // tunnel.setEndpoint(tunnelEndpoint);

    // One pool thread per task, so every writer and flusher runs concurrently.
    ExecutorService pool = Executors.newFixedThreadPool(writeThreadNum + flushThreadNum);
    ArrayList<StoppableThread> writers = new ArrayList<StoppableThread>();
    ArrayList<StoppableThread> flushers = new ArrayList<StoppableThread>();
    ArrayList<Future<Boolean>> writerResults = new ArrayList<Future<Boolean>>();
    ArrayList<Future<Boolean>> flusherResults = new ArrayList<Future<Boolean>>();
    try {
      // submit() returns immediately. invokeAll() must not be used here: it blocks until
      // every task has completed, and these tasks only complete after stop() is called.
      for (int i = 0; i < writeThreadNum; i++) {
        StoppableThread writer =
            new WriteThread(flushingQueue, flushedQueue, project, table, partition, tunnel);
        writers.add(writer);
        writerResults.add(pool.submit(writer));
      }
      for (int i = 0; i < flushThreadNum; i++) {
        StoppableThread flusher = new FlushThread(flushingQueue, flushedQueue);
        flushers.add(flusher);
        flusherResults.add(pool.submit(flusher));
      }

      Thread.sleep(runDurationMs);

      // Stop producers first and wait for them, so no new pack arrives while draining.
      stopAll(writers);
      boolean success = allSucceeded(writerResults);

      // Give the flushers time to upload what is still queued before stopping them.
      long deadline = System.currentTimeMillis() + drainTimeoutMs;
      while (!flushingQueue.isEmpty() && System.currentTimeMillis() < deadline) {
        Thread.sleep(drainPollMs);
      }
      if (!flushingQueue.isEmpty()) {
        success = false;
        System.err.println("flushing queue not drained, " + flushingQueue.size()
            + " pack(s) left");
      }

      stopAll(flushers);
      success = allSucceeded(flusherResults) && success;

      pool.shutdown();
      if (success) {
        System.out.println("upload success!");
      } else {
        System.err.println("upload finished with errors");
      }
    } catch (InterruptedException e) {
      // Restore the flag, ask all tasks to finish and interrupt any that are blocked.
      Thread.currentThread().interrupt();
      stopAll(writers);
      stopAll(flushers);
      pool.shutdownNow();
      e.printStackTrace();
    }
  }

  /** Requests a stop on every given task. */
  private static void stopAll(Iterable<? extends StoppableThread> tasks) {
    for (StoppableThread task : tasks) {
      task.stop();
    }
  }

  /**
   * Waits for the given tasks and reports whether all of them finished without failure.
   *
   * @return {@code false} if any task threw or explicitly returned {@code false}; a
   *     {@code null} result is not treated as a failure
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  private static boolean allSucceeded(Iterable<Future<Boolean>> results)
      throws InterruptedException {
    boolean ok = true;
    for (Future<Boolean> result : results) {
      try {
        if (Boolean.FALSE.equals(result.get())) {
          ok = false;
        }
      } catch (ExecutionException e) {
        e.printStackTrace();
        ok = false;
      }
    }
    return ok;
  }
}