全部产品
MaxCompute

多线程上传示例

更新时间:2017-06-07 13:26:11   分享:   

  1. import java.io.IOException;
  2. import java.util.ArrayList;
  3. import java.util.Date;
  4. import java.util.concurrent.Callable;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import com.aliyun.odps.Column;
  8. import com.aliyun.odps.Odps;
  9. import com.aliyun.odps.PartitionSpec;
  10. import com.aliyun.odps.TableSchema;
  11. import com.aliyun.odps.account.Account;
  12. import com.aliyun.odps.account.AliyunAccount;
  13. import com.aliyun.odps.data.Record;
  14. import com.aliyun.odps.data.RecordWriter;
  15. import com.aliyun.odps.tunnel.TableTunnel;
  16. import com.aliyun.odps.tunnel.TunnelException;
  17. import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
  18. class UploadThread implements Callable<Boolean> {
  19. private long id;
  20. private RecordWriter recordWriter;
  21. private Record record;
  22. private TableSchema tableSchema;
  23. public UploadThread(long id, RecordWriter recordWriter, Record record,
  24. TableSchema tableSchema) {
  25. this.id = id;
  26. this.recordWriter = recordWriter;
  27. this.record = record;
  28. this.tableSchema = tableSchema;
  29. }
  30. @Override
  31. public Boolean call() {
  32. for (int i = 0; i < tableSchema.getColumns().size(); i++) {
  33. Column column = tableSchema.getColumn(i);
  34. switch (column.getType()) {
  35. case BIGINT:
  36. record.setBigint(i, 1L);
  37. break;
  38. case BOOLEAN:
  39. record.setBoolean(i, true);
  40. break;
  41. case DATETIME:
  42. record.setDatetime(i, new Date());
  43. break;
  44. case DOUBLE:
  45. record.setDouble(i, 0.0);
  46. break;
  47. case STRING:
  48. record.setString(i, "sample");
  49. break;
  50. default:
  51. throw new RuntimeException("Unknown column type: "
  52. + column.getType());
  53. }
  54. }
  55. for (int i = 0; i < 10; i++) {
  56. try {
  57. recordWriter.write(record);
  58. } catch (IOException e) {
  59. recordWriter.close();
  60. e.printStackTrace();
  61. return false;
  62. }
  63. }
  64. recordWriter.close();
  65. return true;
  66. }
  67. }
  68. public class UploadThreadSample {
  69. private static String accessId = "<your access id>";
  70. private static String accessKey = "<your access Key>";
  71. private static String odpsUrl = "<http://service.odps.aliyun.com/api>";
  72. private static String project = "<your project>";
  73. private static String table = "<your table name>";
  74. private static String partition = "<your partition spec>";
  75. private static int threadNum = 10;
  76. public static void main(String args[]) {
  77. Account account = new AliyunAccount(accessId, accessKey);
  78. Odps odps = new Odps(account);
  79. odps.setEndpoint(odpsUrl);
  80. odps.setDefaultProject(project);
  81. try {
  82. TableTunnel tunnel = new TableTunnel(odps);
  83. PartitionSpec partitionSpec = new PartitionSpec(partition);
  84. UploadSession uploadSession = tunnel.createUploadSession(project,
  85. table, partitionSpec);
  86. System.out.println("Session Status is : "
  87. + uploadSession.getStatus().toString());
  88. ExecutorService pool = Executors.newFixedThreadPool(threadNum);
  89. ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
  90. for (int i = 0; i < threadNum; i++) {
  91. RecordWriter recordWriter = uploadSession.openRecordWriter(i);
  92. Record record = uploadSession.newRecord();
  93. callers.add(new UploadThread(i, recordWriter, record,
  94. uploadSession.getSchema()));
  95. }
  96. pool.invokeAll(callers);
  97. pool.shutdown();
  98. Long[] blockList = new Long[threadNum];
  99. for (int i = 0; i < threadNum; i++)
  100. blockList[i] = Long.valueOf(i);
  101. uploadSession.commit(blockList);
  102. System.out.println("upload success!");
  103. } catch (TunnelException e) {
  104. e.printStackTrace();
  105. } catch (IOException e) {
  106. e.printStackTrace();
  107. } catch (InterruptedException e) {
  108. e.printStackTrace();
  109. }
  110. }
  111. }

注意:对于tunnel endpoint,支持指定或者不指定。如果指定,按照指定的endpoint路由。如果不指定,支持自动路由。

本文导读目录
本文导读目录
以上内容是否对您有帮助?