全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
MaxCompute

多线程上传示例

更新时间:2017-11-09 17:19:41

  1. import java.io.IOException;
  2. import java.util.ArrayList;
  3. import java.util.Date;
  4. import java.util.concurrent.Callable;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import com.aliyun.odps.Column;
  8. import com.aliyun.odps.Odps;
  9. import com.aliyun.odps.PartitionSpec;
  10. import com.aliyun.odps.TableSchema;
  11. import com.aliyun.odps.account.Account;
  12. import com.aliyun.odps.account.AliyunAccount;
  13. import com.aliyun.odps.data.Record;
  14. import com.aliyun.odps.data.RecordWriter;
  15. import com.aliyun.odps.tunnel.TableTunnel;
  16. import com.aliyun.odps.tunnel.TunnelException;
  17. import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
  18. class UploadThread implements Callable<Boolean> {
  19. private long id;
  20. private RecordWriter recordWriter;
  21. private Record record;
  22. private TableSchema tableSchema;
  23. public UploadThread(long id, RecordWriter recordWriter, Record record,
  24. TableSchema tableSchema) {
  25. this.id = id;
  26. this.recordWriter = recordWriter;
  27. this.record = record;
  28. this.tableSchema = tableSchema;
  29. }
  30. @Override
  31. public Boolean call() {
  32. for (int i = 0; i < tableSchema.getColumns().size(); i++) {
  33. Column column = tableSchema.getColumn(i);
  34. switch (column.getType()) {
  35. case BIGINT:
  36. record.setBigint(i, 1L);
  37. break;
  38. case BOOLEAN:
  39. record.setBoolean(i, true);
  40. break;
  41. case DATETIME:
  42. record.setDatetime(i, new Date());
  43. break;
  44. case DOUBLE:
  45. record.setDouble(i, 0.0);
  46. break;
  47. case STRING:
  48. record.setString(i, "sample");
  49. break;
  50. default:
  51. throw new RuntimeException("Unknown column type: "
  52. + column.getType());
  53. }
  54. }
  55. for (int i = 0; i < 10; i++) {
  56. try {
  57. recordWriter.write(record);
  58. } catch (IOException e) {
  59. recordWriter.close();
  60. e.printStackTrace();
  61. return false;
  62. }
  63. }
  64. recordWriter.close();
  65. return true;
  66. }
  67. }
  68. public class UploadThreadSample {
  69. private static String accessId = "<your access id>";
  70. private static String accessKey = "<your access Key>";
  71. private static String odpsUrl = "<http://service.odps.aliyun.com/api>";
  72. private static String tunnelUrl = "<http://dt.cn-shanghai.maxcompute.aliyun-inc.com>";
  73. //设置tunnelUrl,若需要走内网时必须设置,否则默认公网。此处给的是华东2经典网络Tunnel Endpoint,其他region可以参考文档《访问域名和数据中心》。
  74. private static String project = "<your project>";
  75. private static String table = "<your table name>";
  76. private static String partition = "<your partition spec>";
  77. private static int threadNum = 10;
  78. public static void main(String args[]) {
  79. Account account = new AliyunAccount(accessId, accessKey);
  80. Odps odps = new Odps(account);
  81. odps.setEndpoint(odpsUrl);
  82. odps.setDefaultProject(project);
  83. try {
  84. TableTunnel tunnel = new TableTunnel(odps);
  85. tunnel.setEndpoint(tunnelUrl);//tunnelUrl设置
  86. PartitionSpec partitionSpec = new PartitionSpec(partition);
  87. UploadSession uploadSession = tunnel.createUploadSession(project,
  88. table, partitionSpec);
  89. System.out.println("Session Status is : "
  90. + uploadSession.getStatus().toString());
  91. ExecutorService pool = Executors.newFixedThreadPool(threadNum);
  92. ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
  93. for (int i = 0; i < threadNum; i++) {
  94. RecordWriter recordWriter = uploadSession.openRecordWriter(i);
  95. Record record = uploadSession.newRecord();
  96. callers.add(new UploadThread(i, recordWriter, record,
  97. uploadSession.getSchema()));
  98. }
  99. pool.invokeAll(callers);
  100. pool.shutdown();
  101. Long[] blockList = new Long[threadNum];
  102. for (int i = 0; i < threadNum; i++)
  103. blockList[i] = Long.valueOf(i);
  104. uploadSession.commit(blockList);
  105. System.out.println("upload success!");
  106. } catch (TunnelException e) {
  107. e.printStackTrace();
  108. } catch (IOException e) {
  109. e.printStackTrace();
  110. } catch (InterruptedException e) {
  111. e.printStackTrace();
  112. }
  113. }
  114. }

注意:

对于 Tunnel Endpoint,支持指定或者不指定。

  • 如果指定,按照指定的 Endpoint 路由。

  • 如果不指定,默认为公网。

本文导读目录