全部产品
MaxCompute

简单上传示例

更新时间:2017-06-07 13:26:11   分享:   

  1. import java.io.IOException;
  2. import java.util.Date;
  3. import com.aliyun.odps.Column;
  4. import com.aliyun.odps.Odps;
  5. import com.aliyun.odps.PartitionSpec;
  6. import com.aliyun.odps.TableSchema;
  7. import com.aliyun.odps.account.Account;
  8. import com.aliyun.odps.account.AliyunAccount;
  9. import com.aliyun.odps.data.Record;
  10. import com.aliyun.odps.data.RecordWriter;
  11. import com.aliyun.odps.tunnel.TableTunnel;
  12. import com.aliyun.odps.tunnel.TunnelException;
  13. import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
  14. public class UploadSample {
  15. private static String accessId = "<your access id>";
  16. private static String accessKey = "<your access Key>";
  17. private static String odpsUrl = "http://service.odps.aliyun.com/api";
  18. private static String project = "<your project>";
  19. private static String table = "<your table name>";
  20. private static String partition = "<your partition spec>";
  21. public static void main(String args[]) {
  22. Account account = new AliyunAccount(accessId, accessKey);
  23. Odps odps = new Odps(account);
  24. odps.setEndpoint(odpsUrl);
  25. odps.setDefaultProject(project);
  26. try {
  27. TableTunnel tunnel = new TableTunnel(odps);
  28. PartitionSpec partitionSpec = new PartitionSpec(partition);
  29. UploadSession uploadSession = tunnel.createUploadSession(project,
  30. table, partitionSpec);
  31. System.out.println("Session Status is : "
  32. + uploadSession.getStatus().toString());
  33. TableSchema schema = uploadSession.getSchema();
  34. // 准备数据后打开Writer开始写入数据,准备数据后写入一个Block
  35. // 单个Block内写入数据过少会产生大量小文件 严重影响计算性能, 强烈建议每次写入64MB以上数据(100GB以内数据均可写入同一Block)
  36. // 可通过数据的平均大小与记录数量大致计算总量即 64MB < 平均记录大小*记录数 < 100GB
  37. RecordWriter recordWriter = uploadSession.openRecordWriter(0);
  38. Record record = uploadSession.newRecord();
  39. for (int i = 0; i < schema.getColumns().size(); i++) {
  40. Column column = schema.getColumn(i);
  41. switch (column.getType()) {
  42. case BIGINT:
  43. record.setBigint(i, 1L);
  44. break;
  45. case BOOLEAN:
  46. record.setBoolean(i, true);
  47. break;
  48. case DATETIME:
  49. record.setDatetime(i, new Date());
  50. break;
  51. case DOUBLE:
  52. record.setDouble(i, 0.0);
  53. break;
  54. case STRING:
  55. record.setString(i, "sample");
  56. break;
  57. default:
  58. throw new RuntimeException("Unknown column type: "
  59. + column.getType());
  60. }
  61. }
  62. for (int i = 0; i < 10; i++) {
  63. // Write数据至服务端,每写入8KB数据会进行一次网络传输
  64. // 若120s没有网络传输服务端将会关闭连接,届时该Writer将不可用,需要重新写入
  65. recordWriter.write(record);
  66. }
  67. recordWriter.close();
  68. uploadSession.commit(new Long[]{0L});
  69. System.out.println("upload success!");
  70. } catch (TunnelException e) {
  71. e.printStackTrace();
  72. } catch (IOException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. }

构造器举例说明:PartitionSpec(String spec):通过字符串构造此类对象。参数:spec: 分区定义字符串,比如: pt='1',ds='2'。因此程序中应该这样配置:private static String partition = "pt='XXX',ds='XXX'";

本文导读目录
本文导读目录
以上内容是否对您有帮助?