全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
MaxCompute

简单上传示例

更新时间:2017-11-09 17:19:41

  1. import java.io.IOException;
  2. import java.util.Date;
  3. import com.aliyun.odps.Column;
  4. import com.aliyun.odps.Odps;
  5. import com.aliyun.odps.PartitionSpec;
  6. import com.aliyun.odps.TableSchema;
  7. import com.aliyun.odps.account.Account;
  8. import com.aliyun.odps.account.AliyunAccount;
  9. import com.aliyun.odps.data.Record;
  10. import com.aliyun.odps.data.RecordWriter;
  11. import com.aliyun.odps.tunnel.TableTunnel;
  12. import com.aliyun.odps.tunnel.TunnelException;
  13. import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
  14. public class UploadSample {
  15. private static String accessId = "<your access id>";
  16. private static String accessKey = "<your access Key>";
  17. private static String odpsUrl = "http://service.odps.aliyun.com/api";
  18. private static String tunnelUrl = "http://dt.cn-shanghai.maxcompute.aliyun-inc.com";
  19. //设置tunnelUrl,若需要走内网时必须设置,否则默认公网。此处给的是华东2经典网络Tunnel Endpoint,其他region可以参考文档《访问域名和数据中心》。
  20. private static String project = "<your project>";
  21. private static String table = "<your table name>";
  22. private static String partition = "<your partition spec>";
  23. public static void main(String args[]) {
  24. Account account = new AliyunAccount(accessId, accessKey);
  25. Odps odps = new Odps(account);
  26. odps.setEndpoint(odpsUrl);
  27. odps.setDefaultProject(project);
  28. try {
  29. TableTunnel tunnel = new TableTunnel(odps);
  30. tunnel.setEndpoint(tunnelUrl);//tunnelUrl设置
  31. PartitionSpec partitionSpec = new PartitionSpec(partition);
  32. UploadSession uploadSession = tunnel.createUploadSession(project,
  33. table, partitionSpec);
  34. System.out.println("Session Status is : "
  35. + uploadSession.getStatus().toString());
  36. TableSchema schema = uploadSession.getSchema();
  37. // 准备数据后打开Writer开始写入数据,准备数据后写入一个Block
  38. // 单个Block内写入数据过少会产生大量小文件 严重影响计算性能, 强烈建议每次写入64MB以上数据(100GB以内数据均可写入同一Block)
  39. // 可通过数据的平均大小与记录数量大致计算总量即 64MB < 平均记录大小*记录数 < 100GB
  40. RecordWriter recordWriter = uploadSession.openRecordWriter(0);
  41. Record record = uploadSession.newRecord();
  42. for (int i = 0; i < schema.getColumns().size(); i++) {
  43. Column column = schema.getColumn(i);
  44. switch (column.getType()) {
  45. case BIGINT:
  46. record.setBigint(i, 1L);
  47. break;
  48. case BOOLEAN:
  49. record.setBoolean(i, true);
  50. break;
  51. case DATETIME:
  52. record.setDatetime(i, new Date());
  53. break;
  54. case DOUBLE:
  55. record.setDouble(i, 0.0);
  56. break;
  57. case STRING:
  58. record.setString(i, "sample");
  59. break;
  60. default:
  61. throw new RuntimeException("Unknown column type: "
  62. + column.getType());
  63. }
  64. }
  65. for (int i = 0; i < 10; i++) {
  66. // Write数据至服务端,每写入8KB数据会进行一次网络传输
  67. // 若120s没有网络传输服务端将会关闭连接,届时该Writer将不可用,需要重新写入
  68. recordWriter.write(record);
  69. }
  70. recordWriter.close();
  71. uploadSession.commit(new Long[]{0L});
  72. System.out.println("upload success!");
  73. } catch (TunnelException e) {
  74. e.printStackTrace();
  75. } catch (IOException e) {
  76. e.printStackTrace();
  77. }
  78. }
  79. }

构造器举例说明:

PartitionSpec(String spec):通过字符串构造此类对象。

参数说明:

spec:分区定义字符串,比如:pt='1',ds='2'。

因此程序中应该配置如下:

private static String partition = "pt='XXX',ds='XXX'";

本文导读目录