本文为您介绍流式数据通道SDK接口的使用方法。

使用说明

您可以基于MaxCompute Studio通过Java SDK使用MaxCompute流式数据通道服务。

您可以使用以下配置在MaxCompute Studio上添加指定版本的pom依赖。
<dependency>
  <groupId>com.aliyun.odps</groupId>
  <artifactId>odps-sdk-core</artifactId>
  <version>0.36.4-public</version>
</dependency>

接口介绍

不同版本的SDK在使用上有差别,详情请参见SDK Java Doc

接口 描述
TableTunnel 访问MaxCompute Tunnel服务的入口类。您可以通过外网或阿里云内网环境对MaxCompute及其Tunnel进行访问。
TableTunnel.StreamUploadSession 上传数据会话。
TableTunnel.StreamRecordPack 数据缓存会话。

StreamUploadSession

public interface StreamUploadSession {

    /**
     * 设置P2P模式。
     * @param mode
     */
    public void setP2pMode(boolean mode);

    /**
     * 获取Session ID。
     * @return Session ID
     */
    public String getId();

    /**
     * 获取表结构。
     */
    public TableSchema getSchema();

    /**
     * 创建一个无压缩{@Link StreamRecordPack}对象。
     * @return StreamRecordPack对象。
     */
    public StreamRecordPack newRecordPack() throws IOException;

    /**
     * 创建一个压缩{@Link StreamRecordPack}对象。
     * @param compressOption 数据传输压缩选项。
     * @return StreamRecordPack对象。
     */
    public StreamRecordPack newRecordPack(CompressOption compressOption) throws IOException, TunnelException;

    /**
     * 创建一个{@Link Record}对象。
     * @return Record对象。
     */
    public Record newRecord();

  }

StreamRecordPack

public interface StreamRecordPack {

    /**
     * 追加一条记录。
     * @param record
     */
    public void append(Record record) throws IOException;

    /**
     * @return 返回当前pack存储的记录数。
     */
    public long getRecordCount();

    /**
     * 注意:由于在写到内存缓冲区前,数据会经过多层缓冲区。
     * 因此这个值的变化并不是连续的,有可能出现最佳数据后,getDataSize不变的场景。
     * @return 返回当前pack存储数据的大小。
     */
    public long getDataSize();

    /**
     * 数据发送到服务端。
     * pack对象在flush成功以后可以复用。
     * @return traceId。
     * @throws IOException。
       */
    public String flush() throws IOException;

    /**
     * 数据发送到服务端。
     * pack对象在flush成功以后可以复用。
     * @param flushOption 设置write参数 {@link FlushOption}。
     * @return flush result。
     * @throws IOException。
     */
    public FlushResult flush(FlushOption flushOption) throws IOException;
  }

使用建议

在使用上述接口时,您可以借鉴如下实践结果实现性能最大化:
  • 未开启压缩场景下,控制每个RecordPack的数据量超过4 MB可以达到最佳性能。
  • 开启压缩场景下,控制每个RecordPack的数据量超过1 MB可以达到最佳性能。