Tunnel是MaxCompute提供的离线批量数据通道服务,主要提供大批量离线数据上传和下载,仅提供每次批量大于等于64MB数据的场景,小批量流式数据场景请使用DataHub实时数据通道以获得更好的性能和体验。

SDK上传最佳实践

import java.io.IOException;
 import java.util.Date;
 import com.aliyun.odps.Column;
 import com.aliyun.odps.Odps;
 import com.aliyun.odps.PartitionSpec;
 import com.aliyun.odps.TableSchema;
 import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.data.RecordWriter;
 import com.aliyun.odps.tunnel.TableTunnel;
 import com.aliyun.odps.tunnel.TunnelException;
 import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
 public class UploadSample {
         private static String accessId = "<your access id>";
         private static String accessKey = "<your access Key>";
         private static String odpsUrl = "http://service.odps.aliyun.com/api";
         private static String tunnelUrl = "http://dt.cn-shanghai.maxcompute.aliyun-inc.com";
                         //设置tunnelUrl,若需要走内网时必须设置,否则默认公网。此处给的是华东2经典网络Tunnel Endpoint,其他region可以参考文档《访问域名和数据中心》。
         private static String project = "<your project>";
         private static String table = "<your table name>";
         private static String partition = "<your partition spec>";
         public static void main(String args[]) {
                 // 准备工作,仅需做一次
                 Account account = new AliyunAccount(accessId, accessKey);
                 Odps odps = new Odps(account);
                 odps.setEndpoint(odpsUrl);
                 odps.setDefaultProject(project);
                 try {
                         TableTunnel tunnel = new TableTunnel(odps);
                         //tunnelUrl设置
                         tunnel.setEndpoint(tunnelUrl);
                         // 确定写入分区                     
                         PartitionSpec partitionSpec = new PartitionSpec(partition);
                         // 在服务端创建一个在本表本分区上有效期24小时的session,24小时内该session可以共计上传20000个Block数据 
                         // 创建Session的时耗为秒级,会在服务端使用部分资源、创建临时目录等,操作较重,因此强烈建议同一个分区数  
据尽可能复用Session上传。   
                         UploadSession uploadSession = tunnel.createUploadSession(project,
                                         table, partitionSpec);
                         System.out.println("Session Status is : "
                                         + uploadSession.getStatus().toString());
                         TableSchema schema = uploadSession.getSchema();
                          // 准备数据后打开Writer开始写入数据,准备数据后写入一个Block,每个Block仅能成功上传一次,不可重复上传,CloseWriter成功代表该Block上传完成,失败可以重新上传该Block,同一个Session下最多允许20000个BlockId,即0-19999,若超出请CommitSession并且再创建一个新Session使用,以此类推。
                          // 单个Block内写入数据过少会产生大量小文件 严重影响计算性能, 强烈建议每次写入64MB以上数据(100GB以内数据均可写入同一Block)
                          // 可通过数据的平均大小与记录数量大致计算总量即64MB < 平均记录大小*记录数 < 100GB
                          // maxBlockID服务端限制为20000,用户可以根据自己业务需求,每个Session使用一定数量的block例如100个,但是建议每个Session内使用block越多越好,因为创建Session是一个很重的操作 // 如果创建一个Session后仅仅上传少量数据,不仅会造成小文件、空目录等问题,还会严重影响上传整体性能(创建Session花费秒级,真正上传可能仅仅用了十几毫秒)
                          // 如果创建一个Session后仅仅上传少量数据,不仅会造成小文件、空目录等问题,还会严重影响上传整体性能(创建Session花费秒级,真正上传可能仅仅用了十几毫秒)
                          // 在该Block上创建一个Writer,writer创建后任意一段时间内,若某连续2分钟没有写入4KB以上数据,则会超时断开连接 
                          // 因此建议在创建writer前在内存中准备可以直接进行写入的数据
                         RecordWriter recordWriter = uploadSession.openRecordWriter(0);
                         Record record = uploadSession.newRecord();
                         for (int i = 0; i < schema.getColumns().size(); i++) {
                                 Column column = schema.getColumn(i);
                                 switch (column.getType()) {
                                 case BIGINT:
                                         record.setBigint(i, 1L);
                                         break;
                                 case BOOLEAN:
                                         record.setBoolean(i, true);
                                         break;
                                 case DATETIME:
                                         record.setDatetime(i, new Date());
                                         break;
                                 case DOUBLE:
                                         record.setDouble(i, 0.0);
                                         break;
                                 case STRING:
                                         record.setString(i, "sample");
                                         break;
                                 default:
                                         throw new RuntimeException("Unknown column type: "
                                                         + column.getType());
                                 }
                         }
                         for (int i = 0; i < 10; i++) {
                                  // Write数据至服务端,每写入8KB数据会进行一次网络传输
                                  // 若120s没有网络传输服务端将会关闭连接,届时该Writer将不可用,需要重新写入
                                 recordWriter.write(record);
                         }
                         recordWriter.close();
                         uploadSession.commit(new Long[]{0L});
                         System.out.println("upload success!");
                 } catch (TunnelException e) {
                         // 建议重试一定次数
                         e.printStackTrace();
                 } catch (IOException e) {
                         // 建议重试一定次数
                         e.printStackTrace();
                 }
         }
 }

构造器举例说明:

PartitionSpec(String spec):通过字符串构造此类对象。

参数说明:

spec:分区定义字符串,比如pt=’1’,ds=’2’。

因此程序中应该配置如下:
private static String partition = “pt=’XXX’,ds=’XXX’”;
说明 文中给出的是华东2经典网络Tunnel Endpoint,其他region的Tunnel Endpoint设置可以参考文档访问域名和数据中心

常见问题

以下内容总结了MaxCompute Tunnel应用过程中的常见问题。

  • MaxCompute Tunnel是什么?

    Tunnel是MaxCompute的数据通道,用户可以通过Tunnel向MaxCompute中上传或者下载数据。目前Tunnel仅支持表(不包括视图View)数据的上传下载。

  • BlockId是否可以重复?

    同一个UploadSession里的blockId不能重复。也就是说,对于同一个UploadSession,用一个blockId打开RecordWriter,写入一批数据后,调用close,然后再commit完成后,写入成功后不可以重新再用该blockId打开另一个RecordWriter写入数据。 Block默认最多20000个,即0-19999。

  • Block大小是否存在限制?

    一个block大小上限 100GB,强烈建议大于64M的数据,每一个Block对应一个文件,小于64MB的文件统称为小文件,小文件过多将会影响使用性能。使用新版BufferedWriter可以更简单的进行上传功能避免小文件等问题Tunnel-SDK-BufferedWriter。

  • Session是否可以共享使用,存在生命周期吗?

    每个Session在服务端的生命周期为24小时,创建后24小时内均可使用,也可以跨进程/线程共享使用,但是必须保证同一个BlockId没有重复使用,分布式上传可以按照如下步骤:创建Session->数据量估算->分配Block(例如线程1使用0-100,线程2使用100-200)->准备数据->上传数据->Commit所有写入成功的Block。

  • Session创建后不使用是否对系统有消耗?

    每个Session在创建时会生成两个文件目录,如果大量创建而不使用,会导致临时目录增多,大量堆积时可能造成系统负担,请一定避免此类行为,尽量共享利用session。

  • 遇到Write/Read超时或IOException怎么处理?

    上传数据时,Writer每写入8KB数据会触发一次网络动作,如果120秒内没有网络动作,服务端将主动关闭连接,届时Writer将不可用,请重新打开一个新的Writer写入。

    建议使用Tunnel-SDK-BufferedWriter接口上传数据,该接口对用户屏蔽了blockId的细节,并且内部带有数据缓存区,会自动进行失败重试。

    下载数据时,Reader也有类似机制,若长时间没有网络IO会被断开连接,建议Read过程连续进行中间不穿插其他系统的接口。

  • MaxCompute Tunnel目前有哪些语言的SDK?

    MaxCompute Tunnel公共云目前提供Java版的SDK。

  • MaxCompute Tunnel 是否支持多个客户端同时上传同一张表?

    支持。

  • MaxCompute Tunnel适合批量上传还是流式上传?

    MaxCompute Tunnel用于批量上传,不适合流式上传,流式上传可以使用DataHub高速流式数据通道,毫秒级延时写入。

  • MaxCompute Tunnel上传数据时一定要先存在分区吗?

    是的,Tunnel不会自动创建分区。

  • Dship与MaxCompute Tunnel的关系?

    Dship是一个工具,通过MaxCompute Tunnel来上传下载。

  • Tunnel upload数据的行为是追加还是覆盖模式?

    追加的模式。

  • Tunnel路由功能是指?

    路由功能指的是Tunnel SDK通过设置MaxCompute获取Tunnel Endpoint的功能。因此,SDK可以只设置MaxCompute的Endpoint来正常工作。

  • 用MaxCompute Tunnel上传数据时,一个block的数据量大小多大比较合适?

    没有一个绝对最优的答案,要综合考虑网络情况,实时性要求,数据如何使用以及集群小文件等因素。一般,如果数量较大是持续上传的模式,可以在64M - 256M,如果是每天传一次的批量模式,可以设大一些到1G左右。

  • 使用MaxCompute Tunnel下载,总是提示timeout,怎么操作?

    一般是Endpoint错误,请检查Endpoint配置,简单的判断方法是通过telnet等方法检测网络连通性。

  • 通过MaxCompute Tunnel下载,抛出You have NO privilege ‘odps:Select‘ on {acs:odps:*:projects/XXX/tables/XXX}. project ‘XXX‘ is protected的异常,怎么操作?

    该project开启了数据保护功能,用户操作这是从一个项目的数据导向另一个项目,这需要该project的owner操作。

  • Tunnel上传抛出异常ErrorCode=FlowExceeded, ErrorMessage=Your flow quota is exceeded.**,怎么操作?
    Tunnel对请求的并发进行了控制,默认上传和下载的并发Quota为2000,任何相关的请求发出到结束过程中均会占用一个Quota单位。若出现类似错误,有如下几种建议的解决方案:
    1. sleep一下再重试。
    2. 将project的tunnel并发quota调大,需要联系管理员评估流量压力。
    3. 报告project owner调查谁占用了大量并发quota,控制一下。