MaxCompute提供多种数据导入导出方式,如下所示:
  • 直接在客户端使用Tunnel命令
  • 通过MaxCompute Studio工具可视化方式实现本地数据文件导入导出,详情请参见导入导出数据
  • 通过Tunnel提供的SDK自行编写Java工具。
  • 通过Flume及Fluentd插件方式导入。
  • 通过DataWorks对数据导入和导出,详情请参见数据集成概述

导出数据请参见Tunnel命令操作中Download的相关命令。

Tunnel命令导入数据

  1. 准备数据
    假设您已准备本地文件wc_example.txt,本地存放路径为D:\odps\odps\bin,内容如下:
    I LOVE CHINA!MY NAME IS MAGGIE.
    I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL!
  2. 创建MaxCompute表
    您需要把上面的数据导入到MaxCompute的一张表中,所以需要创建MaxCompute表。
    CREATE TABLE wc_in (word string);
  3. 执行tunnel命令
    输入表创建成功后,可以在MaxCompute客户端输入Tunnel命令进行数据的导入,如下所示:
    tunnel upload D:\odps\odps\bin\wc_example.txt wc_in;
  4. 执行成功后,查看表wc_in的记录。

    说明
    • 有关Tunnel命令的更多详细介绍,例如如何将数据导入分区表等,请参见Tunnel操作
    • 当表中含有多个列时,可以通过-fd参数指定列分隔符。

MaxCompute Studio导入数据

在使用MaxCompute Studio导入数据前,您需要先安装MaxCompute Studio,并配置项目空间链接

  1. 准备数据。
    假设您已准备本地文件wc_example.txt,本地存放路径为D:\odps\odps\bin,内容如下:
    I LOVE CHINA!MY NAME IS MAGGIE.
    I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL!
  2. 创建MaxCompute表。
    您需要把上面的数据导入到MaxCompute的一张表中,所以需要创建MaxCompute表。右键单击项目的 tables&views列表:

    执行成功,则建表成功。
  3. 上传数据文件。
    右键单击tables&views列表中新建的表wc_in
    说明
    如果新建表未显示,请单击刷新


Tunnel SDK

下文将通过场景示例,为您介绍如何利用Tunnel SDK上传数据。

场景描述

上传数据到MaxCompute,其中,项目空间为odps_public_dev,表名为tunnel_sample_test,分区为pt=20150801,dt=hangzhou。

操作步骤
  1. 创建表,添加分区,SQL语句如下所示:
    CREATE TABLE IF NOT EXISTS tunnel_sample_test( 
    id STRING, 
    name STRING)
    PARTITIONED BY (pt STRING, dt STRING); --创建表
    ALTER TABLE tunnel_sample_test 
    ADD IF NOT EXISTS PARTITION (pt='20150801',dt='hangzhou'); --添加分区
  2. 创建UploadSample的工程目录结构,如下所示:
    |---pom.xml
    |---src 
        |---main 
            |---java 
                |---com 
                    |---aliyun 
                        |---odps 
                            |---tunnel 
                                |---example 
                                    |---UploadSample.java
    目录说明:
    • pom.xml:maven工程文件。
    • UploadSample:Tunnel源文件。
  3. 编写UploadSample程序。代码如下所示:
    
    package com.aliyun.odps.tunnel.example;
    import java.io.IOException;
    import java.util.Date;
    
    import com.aliyun.odps.Column;
    import com.aliyun.odps.Odps;
    import com.aliyun.odps.PartitionSpec;
    import com.aliyun.odps.TableSchema;
    import com.aliyun.odps.account.Account;
    import com.aliyun.odps.account.AliyunAccount;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.data.RecordWriter;
    import com.aliyun.odps.tunnel.TableTunnel;
    import com.aliyun.odps.tunnel.TunnelException;
    import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
    
    public class UploadSample {
      private static String accessId = "####";
      private static String accessKey = "####";
      private static String tunnelUrl = "http://dt.odps.aliyun.com";
    
      private static String odpsUrl = "http://service.odps.aliyun.com/api";
    
      private static String project = "odps_public_dev";
      private static String table = "tunnel_sample_test";
      private static String partition = "pt=20150801,dt=hangzhou";
    
    public static void main(String args[]) {
      Account account = new AliyunAccount(accessId, accessKey);
      Odps odps = new Odps(account);
      odps.setEndpoint(odpsUrl);
      odps.setDefaultProject(project);
      try {
            TableTunnel tunnel = new TableTunnel(odps);
            tunnel.setEndpoint(tunnelUrl);
            PartitionSpec partitionSpec = new PartitionSpec(partition);
            UploadSession uploadSession = tunnel.createUploadSession(project,
            table, partitionSpec);
    
    
            System.out.println("Session Status is : "
            + uploadSession.getStatus().toString());
    
    
            TableSchema schema = uploadSession.getSchema();
            RecordWriter recordWriter = uploadSession.openRecordWriter(0);
            Record record = uploadSession.newRecord();
              for (int i = 0; i < schema.getColumns().size(); i++) {
                Column column = schema.getColumn(i);
                switch (column.getType()) {
                    case BIGINT:
                record.setBigint(i, 1L);
                break;
                case BOOLEAN:
                record.setBoolean(i, true);
                break;
                case DATETIME:
                record.setDatetime(i, new Date());
                break;
                case DOUBLE:
                record.setDouble(i, 0.0);
                break;
                case STRING:
                record.setString(i, "sample");
                break;
                default:
                throw new RuntimeException("Unknown column type: "
                + column.getType());
                }
              }
              for (int i = 0; i < 10; i++) {
                recordWriter.write(record);
              }
                recordWriter.close();
                uploadSession.commit(new Long[]{0L});
                System.out.println("upload success!");
    
          } catch (TunnelException e) {
              e.printStackTrace();
          } catch (IOException e) {
              e.printStackTrace();
        }
      }
    }
    说明
    此处省略了AccessKeyID和AccessKeySecret的配置,实际运行时请换上您自己的相关信息。
  4. 配置pom.xml文件。如下所示:
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun.odps.tunnel.example</groupId>
    <artifactId>UploadSample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
      <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-sdk-core</artifactId>
        <version>0.20.7-public</version>
      </dependency>
    </dependencies>
    <repositories>
      <repository>
      <id>alibaba</id>
      <name>alibaba Repository</name>
      <url>http://mvnrepo.alibaba-inc.com/nexus/content/groups/public/</url>
      </repository>
    </repositories>
    </project>
    
    
  5. 编译与运行。
    编译UploadSample工程,如下所示:
    mvn package
    运行UploadSample程序,此处使用eclipse导入maven project。
    1. 右键单击java工程并导航至Import > Maven > Existing Maven Projects, 设置如下:

    2. 右键单击UploadSample.java 并单击Run As > Run Configurations,如下所示:

    3. 单击Run 运行成功,控制台显示如下:
      Session Status is : NORMAL
      upload success!
  6. 查看运行结果。
    您在客户端输入如下语句,即可查看运行结果。
    select * from tunnel_sample_test;
    显示结果如下:
    +----+------+----+----+
    | id | name | pt | dt |
    +----+------+----+----+
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    | sample | sample | 20150801 | hangzhou |
    +----+------+----+----+
    说明
    • Tunnel作为MaxCompute中一个独立的服务,有专属的访问端口提供给大家。当您在阿里云内网环境中,使用Tunnel内网连接下载数据时,MaxCompute不会将该操作产生的流量计入计费。此外内网地址仅对上海域的云产品有效。
    • MaxCompute阿里云内网和公网访问地址请参见访问域名和数据中心

其他导入方式

除了通过客户端及Tunnel Java SDK导入数据,阿里云数加数据集成、开源的Sqoop、Fluentd、Flume、LogStash 等工具都可以进行数据导入到MaxCompute,详情请参见数据上传下载-工具介绍