Tunnel SDK是MaxCompute提供的离线批量数据通道服务,主要提供大批量离线数据上传和下载。
典型的表数据上传流程
创建表,如果是分区表先创建分区。
创建TableTunnel。
创建UploadSession。
创建RecordWriter,写入Record。
提交上传。
示例
import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
public class UploadSample {
// 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户
// 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您也可以根据业务需要,保存到配置文件里
// 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
private static String odpsUrl = "http://service.odps.aliyun.com/api";
private static String tunnelUrl = "https://dt.cn-shanghai-intranet.maxcompute.aliyun-inc.com";
//默认情况下,使用公网进行传输。如果需要通过内网进行数据传输,必须设置tunnelUrl变量。
//此处取值为华东2云产品互联网络Tunnel Endpoint。
private static String project = "<your project>";
private static String table = "<your table name>";
private static String partition = "<your partition spec>";
public static void main(String args[]) {
// 该部分为准备工作,仅需执行一次。
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(odpsUrl);
odps.setDefaultProject(project);
try {
TableTunnel tunnel = new TableTunnel(odps);
// 设置tunnelUrl。
tunnel.setEndpoint(tunnelUrl);
// 确定写入分区。
PartitionSpec partitionSpec = new PartitionSpec(partition);
// 在服务端的本表本分区上创建一个有效期为24小时的Session。
// 24小时内,该Session共计可以上传20000个分区(Block)数据。
// 创建Session的耗时在秒级,且需要在服务端使用部分资源、创建临时目录等,需要消耗较多的资源。
// 建议同一个分区的数据尽可能通过复用Session的方式上传。
UploadSession uploadSession = tunnel.createUploadSession(project, table, partitionSpec);
System.out.println("Session Status is : " + uploadSession.getStatus().toString());
TableSchema schema = uploadSession.getSchema();
// 数据准备完成后,打开Writer开始写入数据,将数据写入一个Block。
// 每个Block仅能成功上传一次,不可重复上传。CloseWriter执行成功即代表该Block上传成功,如果失败可重新上传该Block。
// 同一个Session最多可以包含20000个BlockId(即0-19999)。如果超出20000个需要执行Commit Session并重新创新一个新的Session。
// 单个Block内如果写入的数据过少将产生大量小文件,严重影响计算性能。强烈建议每次写入64 MB以上的数据(同一Block支持写入100 GB以内的数据)。
// 如果创建一个Session后仅上传少量数据,不仅会造成小文件、空目录等问题,还会严重影响上传性能(创建Session耗时数秒,而真正数据上传可能仅花费十几毫秒)。
// Writer创建后任意一段时间内,如果任意连续两分钟内没有写入4 KB以上的数据,将会超时并自动断开连接。
// 建议在创建Writer前,在内存中准备好可以直接写入的数据。
RecordWriter recordWriter = uploadSession.openRecordWriter(0);
Record record = uploadSession.newRecord();
for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
switch (column.getType()) {
case BIGINT:
record.setBigint(i, 1L);
break;
case BOOLEAN:
record.setBoolean(i, true);
break;
case DATETIME:
record.setDatetime(i, new Date());
break;
case DOUBLE:
record.setDouble(i, 0.0);
break;
case STRING:
record.setString(i, "sample");
break;
default:
throw new RuntimeException("Unknown column type: " + column.getType());
}
}
for (int i = 0; i < 10; i++) {
// 数据写入服务端时,每写入8 KB的数据将进行一次网络传输。
// 如果120秒内无网络传输,服务端将自动关闭连接,该Writer将不可用,需要重新创建Writer执行数据写入。
recordWriter.write(record);
}
recordWriter.close();
uploadSession.commit(new Long[]{0L});
System.out.println("upload success!");
} catch (TunnelException e) {
// 建议重试一定次数。
e.printStackTrace();
} catch (IOException e) {
// 建议重试一定次数。
e.printStackTrace();
}
}
}
构造器举例说明:
PartitionSpec(String spec):通过字符串构造此类对象。
参数说明:
spec:分区定义字符串,例如pt=’1’,ds=’2’。
因此程序中应该配置如下:
private static String partition = “pt=’XXX’,ds=’XXX’”;
说明
文中使用的是华东2(上海)云产品互联网络Tunnel Endpoint,其他Region的Tunnel Endpoint设置可以参考文档Endpoint。
文档内容是否对您有帮助?