本文为您介绍流式数据通道SDK接口的使用方法。
使用说明
您可以基于MaxCompute Studio通过Java SDK使用MaxCompute流式数据通道服务。
您可以使用以下配置在MaxCompute Studio上添加指定版本的pom依赖。
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>0.36.4-public</version>
</dependency>
接口介绍
不同版本的SDK在使用上有差别,详情请参见SDK Java Doc。
接口 | 描述 |
TableTunnel | 访问MaxCompute Tunnel服务的入口类。您可以通过外网或阿里云内网环境对MaxCompute及其Tunnel进行访问。 |
TableTunnel.StreamUploadSession | 上传数据会话。 |
TableTunnel.StreamRecordPack | 数据缓存会话。 |
StreamUploadSession
public interface StreamUploadSession {
/**
* 设置P2P模式。
* @param mode
*/
public void setP2pMode(boolean mode);
/**
* 获取Session ID。
* @return Session ID
*/
public String getId();
/**
* 获取表结构。
*/
public TableSchema getSchema();
/**
* 创建一个无压缩{@Link StreamRecordPack}对象。
* @return StreamRecordPack对象。
*/
public StreamRecordPack newRecordPack() throws IOException;
/**
* 创建一个压缩{@Link StreamRecordPack}对象。
* @param compressOption 数据传输压缩选项。
* @return StreamRecordPack对象。
*/
public StreamRecordPack newRecordPack(CompressOption compressOption) throws IOException, TunnelException;
/**
* 创建一个{@Link Record}对象。
* @return Record对象。
*/
public Record newRecord();
}
StreamRecordPack
public interface StreamRecordPack {
/**
* 追加一条记录。
* @param record
*/
public void append(Record record) throws IOException;
/**
* @return 返回当前pack存储的记录数。
*/
public long getRecordCount();
/**
* 注意:由于在写到内存缓冲区前,数据会经过多层缓冲区。
* 因此这个值的变化并不是连续的,有可能出现追加数据后,getDataSize不变的场景。
* @return 返回当前pack存储数据的大小。
*/
public long getDataSize();
/**
* 数据发送到服务端。
* pack对象在flush成功以后可以复用。
* @return traceId。
* @throws IOException。
*/
public String flush() throws IOException;
/**
* 数据发送到服务端。
* pack对象在flush成功以后可以复用。
* @param flushOption 设置write参数 {@link FlushOption}。
* @return flush result。
* @throws IOException。
*/
public FlushResult flush(FlushOption flushOption) throws IOException;
}
使用建议
在使用上述接口时,您可以借鉴如下实践结果实现性能最大化:
未开启压缩场景下,控制每个RecordPack的数据量超过4 MB可以达到最佳性能。
开启压缩场景下,控制每个RecordPack的数据量超过1 MB可以达到最佳性能。
文档内容是否对您有帮助?