全部产品
MaxCompute

导入数据

更新时间:2017-07-30 10:33:31   分享:   

MaxCompute 提供多种数据导入导出方式:直接在客户端使用 Tunnel命令 或者通过 TUNNEL 提供的 SDK 自行编写 Java 工具,通过 Flume 及 Fluentd 插件方式导入,以及通过大数据开发套件对数据导入导出,详情请参见:数据同步简介

导出数据请参见: Tunnel 命令操作 中 Download 的相关命令。

Tunnel 命令导入数据

准备数据

假设您已准备本地文件 wc_example.txt,内容如下:

  1. I LOVE CHINA!
  2. MY NAME IS MAGGIE.I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL!

此处,您把该数据文件保存在 D:\odps\odps\bin 目录下。

创建 MaxCompute 表

您需要把上面的数据导入到 MaxCompute 的一张表中,所以需要创建一张表:

  1. CREATE TABLE wc_in (word string);

执行 tunnel 命令

输入表创建成功后,可以在 MaxCompute 客户端输入 tunnel 命令进行数据的导入,如下:

  1. tunnel upload D:\odps\odps\bin\wc_example.txt wc_in;

执行成功后,查看表 wc_in 的记录,如下:

1

注意:

  • 有关 Tunnel 命令的更多详细介绍,例如:如何将数据导入分区表,请参见: Tunnel 操作
  • 当表中含有多个列时,可以通过 -fd 参数指定列分隔符。

Tunnel SDK

关于如何利用 tunnel SDK 进行上传数据,下面也将通过场景介绍。

场景描述:上传数据到 MaxCompute,其中,项目空间为“odps_public_dev”,表名为“tunnel_sample_test”,分区为“pt=20150801,dt=hangzhou”。 操作步骤如下:

  1. 创建表,添加分区:
    1. CREATE TABLE IF NOT EXISTS tunnel_sample_test(
    2. id STRING,
    3. name STRING)
    4. PARTITIONED BY (pt STRING, dt STRING); --创建表
    5. ALTER TABLE tunnel_sample_test
    6. ADD IF NOT EXISTS PARTITION (pt='20150801',dt='hangzhou'); --添加分区
  2. 创建 UploadSample 的工程目录结构,如下:

    1. |---pom.xml
    2. |---src
    3. |---main
    4. |---java
    5. |---com
    6. |---aliyun
    7. |---odps
    8. |---tunnel
    9. |---example
    10. |---UploadSample.java

    UploadSample : tunnel 源文件;pom.xml : maven 工程文件。

  3. 编写 UploadSample 程序;

    程序如下:

    1. package com.aliyun.odps.tunnel.example;
    2. import java.io.IOException;
    3. import java.util.Date;
    4. import com.aliyun.odps.Column;
    5. import com.aliyun.odps.Odps;
    6. import com.aliyun.odps.PartitionSpec;
    7. import com.aliyun.odps.TableSchema;
    8. import com.aliyun.odps.account.Account;
    9. import com.aliyun.odps.account.AliyunAccount;
    10. import com.aliyun.odps.data.Record;
    11. import com.aliyun.odps.data.RecordWriter;
    12. import com.aliyun.odps.tunnel.TableTunnel;
    13. import com.aliyun.odps.tunnel.TunnelException;
    14. import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
    15. public class UploadSample {
    16. private static String accessId = "####";
    17. private static String accessKey = "####";
    18. private static String tunnelUrl = "http://dt.odps.aliyun.com";
    19. private static String odpsUrl = "http://service.odps.aliyun.com/api";
    20. private static String project = "odps_public_dev";
    21. private static String table = "tunnel_sample_test";
    22. private static String partition = "pt=20150801,dt=hangzhou";
    23. public static void main(String args[]) {
    24. Account account = new AliyunAccount(accessId, accessKey);
    25. Odps odps = new Odps(account);
    26. odps.setEndpoint(odpsUrl);
    27. odps.setDefaultProject(project);
    28. try {
    29. TableTunnel tunnel = new TableTunnel(odps);
    30. tunnel.setEndpoint(tunnelUrl);
    31. PartitionSpec partitionSpec = new PartitionSpec(partition);
    32. UploadSession uploadSession = tunnel.createUploadSession(project,
    33. table, partitionSpec);
    34. System.out.println("Session Status is : "
    35. + uploadSession.getStatus().toString());
    36. TableSchema schema = uploadSession.getSchema();
    37. RecordWriter recordWriter = uploadSession.openRecordWriter(0);
    38. Record record = uploadSession.newRecord();
    39. for (int i = 0; i < schema.getColumns().size(); i++) {
    40. Column column = schema.getColumn(i);
    41. switch (column.getType()) {
    42. case BIGINT:
    43. record.setBigint(i, 1L);
    44. break;
    45. case BOOLEAN:
    46. record.setBoolean(i, true);
    47. break;
    48. case DATETIME:
    49. record.setDatetime(i, new Date());
    50. break;
    51. case DOUBLE:
    52. record.setDouble(i, 0.0);
    53. break;
    54. case STRING:
    55. record.setString(i, "sample");
    56. break;
    57. default:
    58. throw new RuntimeException("Unknown column type: "
    59. + column.getType());
    60. }
    61. }
    62. for (int i = 0; i < 10; i++) {
    63. recordWriter.write(record);
    64. }
    65. recordWriter.close();
    66. uploadSession.commit(new Long[]{0L});
    67. System.out.println("upload success!");
    68. } catch (TunnelException e) {
    69. e.printStackTrace();
    70. } catch (IOException e) {
    71. e.printStackTrace();
    72. }
    73. }
    74. }

    备注:这里省略了 accessId 和 accesskey 的配置,实际运行时请换上您自己的 accessId 以及 accessKey 。

  4. 配置 pom.xml 文件;

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <modelVersion>4.0.0</modelVersion>
    6. <groupId>com.aliyun.odps.tunnel.example</groupId>
    7. <artifactId>UploadSample</artifactId>
    8. <version>1.0-SNAPSHOT</version>
    9. <dependencies>
    10. <dependency>
    11. <groupId>com.aliyun.odps</groupId>
    12. <artifactId>odps-sdk-core</artifactId>
    13. <version>0.20.7-public</version>
    14. </dependency>
    15. </dependencies>
    16. <repositories>
    17. <repository>
    18. <id>alibaba</id>
    19. <name>alibaba Repository</name>
    20. <url>http://mvnrepo.alibaba-inc.com/nexus/content/groups/public/</url>
    21. </repository>
    22. </repositories>
    23. </project>
  5. 编译与运行;

    编译 UploadSample 工程:

    1. mvn package

    运行 UploadSample 程序,这里使用 eclipse 导入 maven project:

    • 右击 java 工程 并单击 Import->Maven->Existing Maven Projects 设置如下:

    • 右击 UploadSample.java 并单击 Run As->Run Configurations,如下所示:

    • 单击 Run 运行成功,控制台显示:
      1. Session Status is : NORMAL
      2. upload success!
  6. 查看运行结果;

    在客户端输入:

    1. select * from tunnel_sample_test;

    显示结果如下:

    1. +----+------+----+----+
    2. | id | name | pt | dt |
    3. +----+------+----+----+
    4. | sample | sample | 20150801 | hangzhou |
    5. | sample | sample | 20150801 | hangzhou |
    6. | sample | sample | 20150801 | hangzhou |
    7. | sample | sample | 20150801 | hangzhou |
    8. | sample | sample | 20150801 | hangzhou |
    9. | sample | sample | 20150801 | hangzhou |
    10. | sample | sample | 20150801 | hangzhou |
    11. | sample | sample | 20150801 | hangzhou |
    12. | sample | sample | 20150801 | hangzhou |
    13. | sample | sample | 20150801 | hangzhou |
    14. +----+------+----+----+

    备注:

    • Tunnel 作为 MaxCompute 中一个独立的服务,有专属的访问端口提供给大家 。当用户在阿里云内网环境中,使用 Tunnel 内网连接下载数据时,MaxCompute 不会将该操作产生的流量计入计费。此外内网地址仅对上海域的云产品有效。
    • MaxCompute 阿里云内网地址:http://odps-ext.aliyun-inc.com/api
    • MaxCompute 公网地址:http://service.odps.aliyun.com/api

其他方式导入

除了通过客户端及 Tunnel Java SDK 导入数据,阿里云数加数据集成、开源的Sqoop、Fluentd、Flume、LogStash等工具都可以进行数据导入到MaxCompute,具体介绍请参见:数据上传下载-工具介绍

本文导读目录
本文导读目录
以上内容是否对您有帮助?