全部产品
MaxCompute

多线程下载示例

更新时间:2017-06-07 13:26:11

  1. import java.io.IOException;
  2. import java.util.ArrayList;
  3. import java.util.Date;
  4. import java.util.List;
  5. import java.util.concurrent.Callable;
  6. import java.util.concurrent.ExecutionException;
  7. import java.util.concurrent.ExecutorService;
  8. import java.util.concurrent.Executors;
  9. import java.util.concurrent.Future;
  10. import com.aliyun.odps.Column;
  11. import com.aliyun.odps.Odps;
  12. import com.aliyun.odps.PartitionSpec;
  13. import com.aliyun.odps.TableSchema;
  14. import com.aliyun.odps.account.Account;
  15. import com.aliyun.odps.account.AliyunAccount;
  16. import com.aliyun.odps.data.Record;
  17. import com.aliyun.odps.data.RecordReader;
  18. import com.aliyun.odps.tunnel.TableTunnel;
  19. import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
  20. import com.aliyun.odps.tunnel.TunnelException;
  21. class DownloadThread implements Callable<Long> {
  22. private long id;
  23. private RecordReader recordReader;
  24. private TableSchema tableSchema;
  25. public DownloadThread(int id,
  26. RecordReader recordReader, TableSchema tableSchema) {
  27. this.id = id;
  28. this.recordReader = recordReader;
  29. this.tableSchema = tableSchema;
  30. }
  31. @Override
  32. public Long call() {
  33. Long recordNum = 0L;
  34. try {
  35. Record record;
  36. while ((record = recordReader.read()) != null) {
  37. recordNum++;
  38. System.out.print("Thread " + id + "\t");
  39. consumeRecord(record, tableSchema);
  40. }
  41. recordReader.close();
  42. } catch (IOException e) {
  43. e.printStackTrace();
  44. }
  45. return recordNum;
  46. }
  47. private static void consumeRecord(Record record, TableSchema schema) {
  48. for (int i = 0; i < schema.getColumns().size(); i++) {
  49. Column column = schema.getColumn(i);
  50. String colValue = null;
  51. switch (column.getType()) {
  52. case BIGINT: {
  53. Long v = record.getBigint(i);
  54. colValue = v == null ? null : v.toString();
  55. break;
  56. }
  57. case BOOLEAN: {
  58. Boolean v = record.getBoolean(i);
  59. colValue = v == null ? null : v.toString();
  60. break;
  61. }
  62. case DATETIME: {
  63. Date v = record.getDatetime(i);
  64. colValue = v == null ? null : v.toString();
  65. break;
  66. }
  67. case DOUBLE: {
  68. Double v = record.getDouble(i);
  69. colValue = v == null ? null : v.toString();
  70. break;
  71. }
  72. case STRING: {
  73. String v = record.getString(i);
  74. colValue = v == null ? null : v.toString();
  75. break;
  76. }
  77. default:
  78. throw new RuntimeException("Unknown column type: "
  79. + column.getType());
  80. }
  81. System.out.print(colValue == null ? "null" : colValue);
  82. if (i != schema.getColumns().size())
  83. System.out.print("\t");
  84. }
  85. System.out.println();
  86. }
  87. }
  88. public class DownloadThreadSample {
  89. private static String accessId = "<your access id>";
  90. private static String accessKey = "<your access Key>";
  91. private static String odpsUrl = "http://service.odps.aliyun.com/api";
  92. private static String project = "<your project>";
  93. private static String table = "<your table name>";
  94. private static String partition = "<your partition spec>";
  95. private static int threadNum = 10;
  96. public static void main(String args[]) {
  97. Account account = new AliyunAccount(accessId, accessKey);
  98. Odps odps = new Odps(account);
  99. odps.setEndpoint(odpsUrl);
  100. odps.setDefaultProject(project);
  101. TableTunnel tunnel = new TableTunnel(odps);
  102. PartitionSpec partitionSpec = new PartitionSpec(partition);
  103. DownloadSession downloadSession;
  104. try {
  105. downloadSession = tunnel.createDownloadSession(project, table,
  106. partitionSpec);
  107. System.out.println("Session Status is : "
  108. + downloadSession.getStatus().toString());
  109. long count = downloadSession.getRecordCount();
  110. System.out.println("RecordCount is: " + count);
  111. ExecutorService pool = Executors.newFixedThreadPool(threadNum);
  112. ArrayList<Callable<Long>> callers = new ArrayList<Callable<Long>>();
  113. long start = 0;
  114. long step = count / threadNum;
  115. for (int i = 0; i < threadNum - 1; i++) {
  116. RecordReader recordReader = downloadSession.openRecordReader(
  117. step * i, step);
  118. callers.add(new DownloadThread( i, recordReader, downloadSession.getSchema()));
  119. }
  120. RecordReader recordReader = downloadSession.openRecordReader(step * (threadNum - 1), count
  121. - ((threadNum - 1) * step));
  122. callers.add(new DownloadThread( threadNum - 1, recordReader, downloadSession.getSchema()));
  123. Long downloadNum = 0L;
  124. List<Future<Long>> recordNum = pool.invokeAll(callers);
  125. for (Future<Long> num : recordNum)
  126. downloadNum += num.get();
  127. System.out.println("Record Count is: " + downloadNum);
  128. pool.shutdown();
  129. } catch (TunnelException e) {
  130. e.printStackTrace();
  131. } catch (IOException e) {
  132. e.printStackTrace();
  133. } catch (InterruptedException e) {
  134. e.printStackTrace();
  135. } catch (ExecutionException e) {
  136. e.printStackTrace();
  137. }
  138. }
  139. }

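注意:Tunnel endpoint 可以指定,也可以不指定。如果指定,将通过指定的 endpoint 下载;如果不指定,将通过自动路由选择 endpoint 下载。例如,可以在创建 TableTunnel 后调用 tunnel.setEndpoint("<your tunnel endpoint>") 来指定 endpoint;上述示例未指定,因此使用自动路由。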

本文导读目录
以上内容是否对您有帮助?