MaxCompute

Multi-threaded download example

Last updated: 2017-11-09 17:19:41

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;

// Reads one slice of the download session and returns the number of records it read.
class DownloadThread implements Callable<Long> {
    private final int id;
    private final RecordReader recordReader;
    private final TableSchema tableSchema;

    public DownloadThread(int id, RecordReader recordReader, TableSchema tableSchema) {
        this.id = id;
        this.recordReader = recordReader;
        this.tableSchema = tableSchema;
    }

    @Override
    public Long call() {
        long recordNum = 0L;
        try {
            Record record;
            while ((record = recordReader.read()) != null) {
                recordNum++;
                consumeRecord(id, record, tableSchema);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Always release the reader, even if read() failed part-way.
            try {
                recordReader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return recordNum;
    }

    // Prints one record as a tab-separated line prefixed with the thread id.
    // The whole line is built first and printed with a single println call,
    // so output from different threads does not interleave within a line.
    private static void consumeRecord(int id, Record record, TableSchema schema) {
        int columnCount = schema.getColumns().size();
        StringBuilder line = new StringBuilder("Thread ").append(id).append('\t');
        for (int i = 0; i < columnCount; i++) {
            Column column = schema.getColumn(i);
            String colValue;
            switch (column.getType()) {
                case BIGINT: {
                    Long v = record.getBigint(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case BOOLEAN: {
                    Boolean v = record.getBoolean(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case DATETIME: {
                    Date v = record.getDatetime(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case DOUBLE: {
                    Double v = record.getDouble(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case STRING:
                    colValue = record.getString(i);
                    break;
                default:
                    throw new RuntimeException("Unknown column type: " + column.getType());
            }
            line.append(colValue == null ? "null" : colValue);
            if (i != columnCount - 1) {
                line.append('\t'); // separator between columns, none after the last one
            }
        }
        System.out.println(line);
    }
}

public class DownloadThreadSample {
    private static String accessId = "<your access id>";
    private static String accessKey = "<your access Key>";
    private static String odpsUrl = "http://service.odps.aliyun.com/api";
    // Tunnel endpoint. It must be set when downloading over the internal network;
    // otherwise the public endpoint is used by default. The value below is the
    // classic-network Tunnel endpoint for China East 2 (Shanghai); for other regions,
    // see the document "Endpoints and data centers".
    private static String tunnelUrl = "http://dt.cn-shanghai.maxcompute.aliyun-inc.com";
    private static String project = "<your project>";
    private static String table = "<your table name>";
    private static String partition = "<your partition spec>";
    private static int threadNum = 10;

    public static void main(String[] args) {
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsUrl);
        odps.setDefaultProject(project);

        TableTunnel tunnel = new TableTunnel(odps);
        tunnel.setEndpoint(tunnelUrl); // set the Tunnel endpoint explicitly
        PartitionSpec partitionSpec = new PartitionSpec(partition);

        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        try {
            DownloadSession downloadSession =
                    tunnel.createDownloadSession(project, table, partitionSpec);
            System.out.println("Session status: " + downloadSession.getStatus().toString());
            long count = downloadSession.getRecordCount();
            System.out.println("Records in session: " + count);
            if (count == 0) {
                return; // nothing to download; the finally block still shuts down the pool
            }

            // Split [0, count) into contiguous slices, one per thread. Use at most
            // one thread per record so that no slice is empty; the last slice
            // also takes the remainder of count / parts.
            int parts = (int) Math.min(threadNum, count);
            long step = count / parts;
            List<Callable<Long>> callers = new ArrayList<Callable<Long>>();
            for (int i = 0; i < parts - 1; i++) {
                RecordReader recordReader = downloadSession.openRecordReader(step * i, step);
                callers.add(new DownloadThread(i, recordReader, downloadSession.getSchema()));
            }
            RecordReader recordReader = downloadSession.openRecordReader(
                    step * (parts - 1), count - (parts - 1) * step);
            callers.add(new DownloadThread(parts - 1, recordReader, downloadSession.getSchema()));

            // Run all slices and add up the per-thread record counts.
            long downloadNum = 0L;
            List<Future<Long>> recordNum = pool.invokeAll(callers);
            for (Future<Long> num : recordNum) {
                downloadNum += num.get();
            }
            System.out.println("Records downloaded: " + downloadNum);
        } catch (TunnelException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
}
```
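The key step in the sample is dividing the session's records among threads: each `DownloadThread` gets its own reader from `openRecordReader(start, count)` for one contiguous slice, every slice except the last holds `count / threadNum` records, and the last slice also takes the remainder. The following is a minimal, SDK-free sketch of that arithmetic, assuming records are numbered `0` to `count - 1`. The class `RangeSplitSketch`, its `Range` type and the counting loop that stands in for a record reader are hypothetical illustrations, not part of the MaxCompute SDK. The sketch also caps the number of slices at `count` so that no slice is empty when there are fewer records than threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class RangeSplitSketch {
    /** Hypothetical slice [start, start + length) of the session's records. */
    static final class Range {
        final long start;
        final long length;

        Range(long start, long length) {
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return "[" + start + ", +" + length + ")";
        }
    }

    /**
     * Splits count records into at most threadNum contiguous, non-empty slices.
     * Every slice gets count / n records; the last one also takes the remainder.
     */
    static List<Range> split(long count, int threadNum) {
        if (count < 0 || threadNum <= 0) {
            throw new IllegalArgumentException("need count >= 0 and threadNum > 0");
        }
        List<Range> ranges = new ArrayList<>();
        if (count == 0) {
            return ranges;
        }
        int n = (int) Math.min(threadNum, count); // never more slices than records
        long step = count / n;
        for (int i = 0; i < n - 1; i++) {
            ranges.add(new Range(step * i, step));
        }
        ranges.add(new Range(step * (n - 1), count - step * (n - 1)));
        return ranges;
    }

    /** Counts records slice by slice in parallel and sums the per-task results. */
    static long countInParallel(long count, int threadNum)
            throws InterruptedException, ExecutionException {
        List<Callable<Long>> tasks = new ArrayList<>();
        for (Range r : split(count, threadNum)) {
            // Stand-in for openRecordReader(r.start, r.length) plus a read loop.
            tasks.add(() -> {
                long read = 0;
                for (long i = r.start; i < r.start + r.length; i++) {
                    read++;
                }
                return read;
            });
        }
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        try {
            long total = 0;
            for (Future<Long> f : pool.invokeAll(tasks)) {
                total += f.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(split(103, 10));
        System.out.println(countInParallel(103, 10));
    }
}
```

For example, `split(103, 10)` yields nine slices of 10 records followed by `[90, +13)`, and `countInParallel(103, 10)` returns 103, so every record index is covered exactly once.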

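Note:

The Tunnel endpoint can either be specified or left unspecified.

  • If it is specified, data is downloaded through the specified endpoint.

  • If it is not specified, the public-network endpoint is used by default.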
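In practice, whether to set the Tunnel endpoint is usually driven by configuration. A minimal sketch, assuming the endpoint comes from an optional environment variable; `TunnelEndpointSketch`, `resolve` and the variable name `ODPS_TUNNEL_ENDPOINT` are hypothetical. A non-blank value would be passed to `tunnel.setEndpoint(...)`, otherwise the call is skipped and the default public endpoint applies.

```java
import java.util.Optional;

final class TunnelEndpointSketch {
    /**
     * Hypothetical helper: returns the endpoint to set explicitly, or empty
     * to leave it unset so the default public endpoint is used.
     */
    static Optional<String> resolve(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(configured.trim());
    }

    public static void main(String[] args) {
        // ODPS_TUNNEL_ENDPOINT is a hypothetical name, not defined by MaxCompute.
        Optional<String> endpoint = resolve(System.getenv("ODPS_TUNNEL_ENDPOINT"));
        // In the sample program this would be: endpoint.ifPresent(tunnel::setEndpoint);
        System.out.println(endpoint
                .map(e -> "Using Tunnel endpoint " + e)
                .orElse("No Tunnel endpoint set; using the default public endpoint"));
    }
}
```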
