开放存储SDK示例-Java SDK

MaxCompute支持第三方引擎(如Spark on EMR、StarRocks、Presto、PAIHologres)通过SDK调用Storage API直接访问MaxCompute数据,本文为您介绍使用Java SDK访问MaxCompute的代码示例。

概述

使用Java SDK访问MaxCompute的主要接口如下。

主要接口

描述

TableReadSessionBuilder

用于创建一个MaxCompute读表会话。

TableBatchReadSession

表示一个从MaxCompute表中读取数据的会话。

SplitReader

用于读取数据会话包含的一个数据分片。

如果您使用Maven,可以从Maven中搜索odps-sdk-table-api获取不同版本的Java SDK,相关配置信息如下。

<dependency>
	<groupId>com.aliyun.odps</groupId>
	<artifactId>odps-sdk-table-api</artifactId>
	<version>0.48.8-public</version>
</dependency>

MaxCompute提供了开放存储相关接口,详情请参见odps-sdk-table-api

TableReadSessionBuilder

TableReadSessionBuilder接口用于创建一个MaxCompute读表会话,其中主要接口定义如下。更多详情,请参见Java-sdk-doc

接口定义

public class TableReadSessionBuilder {

    public TableReadSessionBuilder table(Table table);

    public TableReadSessionBuilder identifier(TableIdentifier identifier);

    public TableReadSessionBuilder requiredDataColumns(List<String> requiredDataColumns);

    public TableReadSessionBuilder requiredPartitionColumns(List<String> requiredPartitionColumns);

    public TableReadSessionBuilder requiredPartitions(List<PartitionSpec> requiredPartitions);

    public TableReadSessionBuilder requiredBucketIds(List<Integer> requiredBucketIds);

    public TableReadSessionBuilder withSplitOptions(SplitOptions splitOptions);

    public TableReadSessionBuilder withArrowOptions(ArrowOptions arrowOptions);

    public TableReadSessionBuilder withFilterPredicate(Predicate filterPredicate);

    public TableReadSessionBuilder withSettings(EnvironmentSettings settings);

    public TableReadSessionBuilder withSessionId(String sessionId);

    public TableBatchReadSession buildBatchReadSession();
}

接口说明

方法名称

说明

table(Table table)

将传入的参数Table,定义为当前会话中的目标表。

identifier(TableIdentifier identifier)

将传入的参数TableIdentifier ,定义为当前会话中的目标表。

requiredDataColumns(List<String> requiredDataColumns)

读取指定字段的数据,并确保返回的数据中的字段顺序与参数requiredDataColumns指定的字段顺序一致,适用于数据字段裁剪的场景。

说明

如果参数requiredDataColumns为空,则返回所有数据。

requiredDataColumns(List<String> requiredDataColumns)requiredDataColumns(List<String> requiredDataColumns)

读取指定表下指定分区的数据,适用于进行分区裁剪的场景。

说明

如果参数requiredPartitions为空,则会返回所有分区数据。

requiredBucketIds(List<Integer> requiredBucketIds)

读取指定的Bucket数据,仅对聚簇表生效,适用于进行Bucket裁剪场景。

说明

如果参数requiredBucketIds为空,则会返回所有Bucket数据。

withSplitOptions(SplitOptions splitOptions)

切分表数据,其中SplitOptions对象参数定义如下:

public class SplitOptions {

    public static SplitOptions.Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

      public SplitOptions.Builder SplitByByteSize(long splitByteSize);
  
      public SplitOptions.Builder SplitByRowOffset();
  
      public SplitOptions.Builder withCrossPartition(boolean crossPartition);
  
      public SplitOptions.Builder withMaxFileNum(int splitMaxFileNum);
  
      public SplitOptions build();
    }
}
  • SplitByByteSize(long splitByteSize):按照指定的参数splitByteSize切分数据,服务端返回的单个数据分片大小不超过splitByteSize(单位为:byte)。

    说明
    • 若您未使用SplitByByteSize(long splitByteSize)自定义切分数据值,系统将默认按照256*1024*1024(256 MB)进行切分数据。

    • 您自定义切分数据值时不能小于10*1024*1024(10 MB)。

  • SplitByRowOffset():按照行进行切分数据,允许客户端从指定的行索引读取数据。

  • withCrossPartition(boolean crossPartition):是否允许单个数据分片包含多个数据分区。crossPartition参数取值如下:

    • true(默认值):允许单个数据分片包含多个数据分区。

    • false:不允许。

  • withMaxFileNum(int splitMaxFileNum):在表文件数量较多的情况下,可以通过指定单个数据分片中最大包含的物理文件数量来产生更多的数据分片。

    说明

    默认不限制单个数据分片中包含的物理文件数量。

  • build():创建SplitOptions对象。

使用示例

// 1. 按照SplitSize切分数据,设置SplitSize256MB

SplitOptions splitOptionsByteSize = 
      SplitOptions.newBuilder().SplitByByteSize(256 * 1024L * 1024L).build()

// 2. 按照RowOffset切分数据

SplitOptions splitOptionsCount = 
      SplitOptions.newBuilder().SplitByRowOffset().build()

//3. 指定单个Split包含的最大文件数为1

SplitOptions splitOptionsCount = 
      SplitOptions.newBuilder().SplitByRowOffset().withMaxFileNum(1).build()

withArrowOptions(ArrowOptions arrowOptions)

指定Arrow数据选项,ArrowOptions定义如下:

public class ArrowOptions {
    
    public static Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

        public Builder withTimestampUnit(TimestampUnit unit);

        public Builder withDatetimeUnit(TimestampUnit unit);

        public ArrowOptions build();
    }

    public enum TimestampUnit {
        SECOND,
        MILLI,
        MICRO,
        NANO;
    }
}
  • TimestampUnit:用于指定TimestampDatetime数据类型的单位,取值如下:

    • SECOND:秒(s)。

    • MILLI:毫秒(ms)。

    • MICRO:微秒(μs)。

    • NANO:纳秒(ns)。

  • withTimestampUnit(TimestampUnit unit):指定Timestamp数据类型的单位,默认单位为NANO。

  • withDatetimeUnit(TimestampUnit unit):指定Datetime数据类型的单位,默认单位为MILLI。

使用示例

ArrowOptions options = ArrowOptions.newBuilder()
          .withDatetimeUnit(ArrowOptions.TimestampUnit.MILLI)
          .withTimestampUnit(ArrowOptions.TimestampUnit.NANO)
          .build()

withFilterPredicate(Predicate filterPredicate)

指定谓词下推(Predicate Pushdown)选项,其中Predicate定义如下:

// 1. 二元运算

public class BinaryPredicate extends Predicate {

  public enum Operator {
    /**
     * 二元运算操作符
     */
    EQUALS("="),
    NOT_EQUALS("!="),
    GREATER_THAN(">"),
    LESS_THAN("<"),
    GREATER_THAN_OR_EQUAL(">="),
    LESS_THAN_OR_EQUAL("<=");
   
  }

  public BinaryPredicate(Operator operator, Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate equals(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate notEquals(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate greaterThan(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate lessThan(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate greaterThanOrEqual(Serializable leftOperand,
                                                   Serializable rightOperand);

  public static BinaryPredicate lessThanOrEqual(Serializable leftOperand,
                                                Serializable rightOperand);
}

// 2. 一元运算
public class UnaryPredicate extends Predicate {

  public enum Operator {
    /**
     * 一元运算操作符
     */
    IS_NULL("is null"),
    NOT_NULL("is not null");
  }

  public static UnaryPredicate isNull(Serializable operand);
  public static UnaryPredicate notNull(Serializable operand);
}

### 3. IN and NOT IN
public class InPredicate extends Predicate {

  public enum Operator {
    /**
     * IN and NOT IN operators for set membership check
     */
    IN("in"),
    NOT_IN("not in");
  }

  public InPredicate(Operator operator, Serializable operand, List<Serializable> set);

  public static InPredicate in(Serializable operand, List<Serializable> set);

  public static InPredicate notIn(Serializable operand, List<Serializable> set);
}

// 4. 列名
public class Attribute extends Predicate {

  public Attribute(Object value);
    
  public static Attribute of(Object value);
}

// 5. 常量
public class Constant extends Predicate {

  public Constant(Object value);

  public static Constant of(Object value);
}

// 6. 组合运算
public class CompoundPredicate extends Predicate {

  public enum Operator {
    /**
     * 复合谓词运算符
     */
    AND("and"),
    OR("or"),
    NOT("not");
  }

  public CompoundPredicate(Operator logicalOperator, List<Predicate> predicates);

  public static CompoundPredicate and(Predicate... predicates);
  
  public static CompoundPredicate or(Predicate... predicates);

  public static CompoundPredicate not(Predicate predicates);

  public void addPredicate(Predicate predicate);
}

使用示例

// 1. c1 > 20000 and c2 < 100000
BinaryPredicate c1 = new BinaryPredicate(BinaryPredicate.Operator.GREATER_THAN, Attribute.of("c1"), Constant.of(20000));
BinaryPredicate c2 = new BinaryPredicate(BinaryPredicate.Operator.LESS_THAN, Attribute.of("c2"), Constant.of(100000));
CompoundPredicate predicate =
        new CompoundPredicate(CompoundPredicate.Operator.AND, ImmutableList.of(c1, c2));

// 2. c1 is not null
Predicate predicate = new UnaryPredicate(UnaryPredicate.Operator.NOT_NULL,  Attribute.of("c1"));

  
// 3. c1 in (1, 10001)
Predicate predicate =
        new InPredicate(InPredicate.Operator.IN,  Attribute.of("c1"), ImmutableList.of(Constant.of(1), Constant.of(10001)));

withSettings(EnvironmentSettings settings)

指定运行环境信息,EnvironmentSettings 接口定义如下:

public class EnvironmentSettings {

    public static Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

        public Builder withDefaultProject(String projectName);

        public Builder withDefaultSchema(String schema);

        public Builder withServiceEndpoint(String endPoint);

        public Builder withTunnelEndpoint(String tunnelEndPoint);

        public Builder withQuotaName(String quotaName);

        public Builder withCredentials(Credentials credentials);

        public Builder withRestOptions(RestOptions restOptions);

        public EnvironmentSettings build();
    }
}
  • withDefaultProject(String projectName):设置当前的项目名称。

    说明

    projectName参数为MaxCompute项目名称,您可以登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>项目管理,查看具体的MaxCompute项目名称。

  • withDefaultSchema(String schema):设置为当前默认的schema。

    说明

    schema参数为MaxCompute Schema名称,关于Schema详情,请参见Schema操作

  • withServiceEndpoint(String endPoint):设置当前服务链接地址Endpoint。

    说明

    各地域的Endpoint信息,请参见Endpoint

  • withTunnelEndpoint(String tunnelEndPoint):设置当前服务链接地址TunnelEndpoint。

    说明

    各地域的TunnelEndpoint信息,请参见Endpoint

  • withQuotaName(String quotaName):指定当前使用的Quota名称。

    访问MaxCompute支持独享数据传输服务资源组(包年包月)和开放存储(按量计费)两种资源,获取Quota名称的方式分别如下:

    • 独享数据传输服务资源组:登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>配额(Quota)管理,查看可使用的Quota列表。具体操作,请参见计算资源-Quota管理

    • 开放存储:登录MaxCompute控制台,在左侧导航栏选择租户管理>租户属性,开启开放存储,开放存储资源名称默认为pay-as-you-go

  • withCredentials(Credentials credentials):指定当前认证信息,其中Credentials定义如下:

    public class Credentials {
    
        public static Builder newBuilder() {
            return new Builder();
        }
    
        public static class Builder {
    
            public Builder withAccount(Account account);
    
            public Builder withAppAccount(AppAccount appAccount);
    
            public Builder withAppStsAccount(AppStsAccount appStsAccount);
    
            public Credentials build();
        }
    
    }
    • withAccount(Account account):指定Odps Account对象。

    • withAppAccount(AppAccount appAccount):指定Odps appAccount对象。

    • withAppStsAccount(AppStsAccount appStsAccount):指定Odps appStsAccount对象。

    • withRestOptions(RestOptions restOptions):指定当前访问网络的配置,RestOptions定义如下:

      public class RestOptions implements Serializable {
      
          public static Builder newBuilder() {
              return new RestOptions.Builder();
          }
      
          public static class Builder {
              public Builder witUserAgent(String userAgent);
              public Builder withConnectTimeout(int connectTimeout);
              public Builder withReadTimeout(int readTimeout);
              public RestOptions build();
          }
      }
      • witUserAgent(String userAgent):指定当前userAgent信息。

      • withConnectTimeout(int connectTimeout):指定当前底层网络建立超时时间,默认为10秒(s)。

      • withReadTimeout(int readTimeout):指定当前底层网络连接超时时间,120秒(s)。

withSessionId(String sessionId)

指定SessionID信息,用于重新加载已创建的会话。

buildBatchReadSession()

创建或获取读表会话。若提供入参SessionID,则根据SessionID返回已创建的Session;若未提供入参,将创建一个新的读表会话。

说明

创建操作开销较大,当文件数很多时,耗时会比较长。

TableBatchReadSession

TableBatchReadSession接口表示一个从MaxCompute表中读取数据的会话,主要接口定义如下。

接口定义

public interface TableBatchReadSession {

    String getId();

    TableIdentifier getTableIdentifier();

    SessionStatus getStatus();

    DataSchema readSchema();
    
    InputSplitAssigner getInputSplitAssigner() throws IOException;

    SplitReader<ArrayRecord> createRecordReader(InputSplit split, ReaderOptions options) throws IOException;

    SplitReader<VectorSchemaRoot> createArrowReader(InputSplit split, ReaderOptions options) throws IOException;    

}

接口说明

方法名称

说明

String getId()

获取当前会话ID,读取会话ID的默认超时时长为:24小时(h)。

getTableIdentifier()

获取当前会话下的表名称。

getStatus()

获取当前会话状态,状态值如下:

  • INIT:创建一个会话时设置的初始值。

  • NORMAL:创建会话成功。

  • CRITICAL:创建会话失败。

  • EXPIRED:会话超时。

readSchema()

获取当前会话的表结构信息,DataSchema定义如下:

public class DataSchema implements Serializable {
    
    List<Column> getColumns();

    List<String> getPartitionKeys();

    List<String> getColumnNames();

    List<TypeInfo> getColumnDataTypes();

    Optional<Column> getColumn(int columnIndex);

    Optional<Column> getColumn(String columnName);

}
  • getColumns():获取要读取的表和分区的Column信息。

  • getPartitionKeys():获取要读取的分区Column名称。

  • getColumnNames():获取要读取的表和分区的Column名称。

  • getColumnDataTypes():获取要读取的表和分区的Column信息。

  • getColumn(int columnIndex):根据索引获取Column对象,若索引超出当前Column的范围,则返回为空。

  • getColumn(String columnName):根据参数columnName获取Column对象,若当前表的Column不包含columnName,则返回为空。

getInputSplitAssigner()

获取当前会话的InputSplitAssigner。InputSplitAssigner接口定义了在当前读取会话中分配InputSplit实例的方法。每InputSplit代表一个数据分片,可由单个SplitReader处理。InputSplitAssigner定义如下:

public interface InputSplitAssigner {

    int getSplitsCount();

    long getTotalRowCount();

    InputSplit getSplit(int index);

    InputSplit getSplitByRowOffset(long startIndex, long numRecord);
}
  • getSplitsCount():获取会话包含的数据分片数量。

    说明

    SplitOptionsSplitByByteSize时,该接口返回值大于等于0。

  • getTotalRowCount():获取会话包含的数据行数。

    说明

    SplitOptionsSplitByByteSize时,该接口返回值大于等于0

  • getSplit(int index):根据指定的数据分片参数Index获取对应的InputSplit,参数index取值范围为:[0,SplitsCount-1]

  • getSplitByRowOffset(long startIndex, long numRecord):获取对应的InputSplit。参数说明如下:

    • startIndex:指定InputSplit读取的数据行起始索引,取值范围为[0,RecordCount-1]

    • numRecord:指定InputSplit读取的数据行数。

// 1. 若SplitOptionsSplitByByteSize

TableBatchReadSession scan = ...;
InputSplitAssigner assigner = scan.getInputSplitAssigner();
int splitCount = assigner.getSplitsCount();
for (int k = 0; k < splitCount; k++) {
    InputSplit split = assigner.getSplit(k);
    ...
}

// 2. 若SplitOptionsSplitByRowOffset
TableBatchReadSession scan = ...;
InputSplitAssigner assigner = scan.getInputSplitAssigner();
long rowCount = assigner.getTotalRowCount();
long recordsPerSplit = 10000;
for (long offset = 0; offset < numRecords; offset += recordsPerSplit) {
    recordsPerSplit = Math.min(recordsPerSplit, numRecords - offset);
    InputSplit split = assigner.getSplitByRowOffset(offset, recordsPerSplit);
    ...
}

createRecordReader(InputSplit split, ReaderOptions options)

构建SplitReader<ArrayRecord> 对象。其中ReaderOptions定义如下:

public class ReaderOptions {
    
    public static ReaderOptions.Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

        public Builder withMaxBatchRowCount(int maxBatchRowCount);

        public Builder withMaxBatchRawSize(long batchRawSize);

        public Builder withCompressionCodec(CompressionCodec codec);

        public Builder withBufferAllocator(BufferAllocator allocator);

        public Builder withReuseBatch(boolean reuseBatch);

        public Builder withSettings(EnvironmentSettings settings);

        public ReaderOptions build();
    }

}
  • withMaxBatchRowCount(int maxBatchRowCount):指定服务端返回的每批次数据中最大行数。参数maxBatchRowCount默认最大值为4096。

  • withMaxBatchRawSize(long batchRawSize):指定服务端返回的每个批次数据包含的最大原始字节数。

  • withCompressionCodec(CompressionCodec codec):指定数据压缩类型,目前只支持ZSTDLZ4_FRAME压缩类型。

    说明
    • 在直接传输大量未加压缩的Arrow数据时,由于网络带宽的限制,可能会显著增加数据传输时间。

    • 若未指定压缩类型,默认不进行数据压缩。

  • withBufferAllocator(BufferAllocator allocator):指定读取Arrow数据的内存分配器。

  • withReuseBatch(boolean reuseBatch):指定ArrowBatch内存是否可复用。reuseBatch参数取值如下:

    • true(默认值):ArrowBatch内存可复用。

    • false:ArrowBatch内存不可复用。

  • withSettings(EnvironmentSettings settings):指定运行环境信息。

createArrowReader(InputSplit split, ReaderOptions options)

构建SplitReader<VectorSchemaRoot>对象。

SplitReader

介绍SplitReader接口,此接口用于读取表数据。

接口定义

public interface SplitReader<T> {

    boolean hasNext() throws IOException;

    T get();

    Metrics currentMetricsValues();

    void close() throws IOException;
}

接口说明

方法名称

说明

hasNext()

确认是否还有更多数据项可读。如果还有下一个数据项可以读取,则返回true;否则,返回false。

get()

获取当前的数据项。调用此方法前应确保通过hasNext()方法确认有下一个元素。

currentMetricsValues()

获取SplitReader相关的指标。

close()

读取结束后,关闭连接。

使用示例

  1. 配置连接MaxCompute服务的环境

    // 阿里云账号或RAM用户的AccessKey IDAccessKey Secret
    // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户
    // 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您也可以根据业务需要,保存到配置文件里
    // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
    private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    //访问MaxCompute使用的Quota名称
    String quotaName = "<quotaName>";
    //MaxCompute项目名称
    String project = "<project>";
    //创建Odps对象来连接MaxCompute服务
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setDefaultProject(project);
    //MaxCompute服务的连接地址,当前仅支持使用阿里云VPC网络
    odps.setEndpoint(endpoint);
    Credentials credentials = Credentials.newBuilder().withAccount(odps.getAccount()).withAppAccount(odps.getAppAccount()).build();
    EnvironmentSettings settings = EnvironmentSettings.newBuilder().withCredentials(credentials).withServiceEndpoint(odps.getEndpoint()).withQuotaName(quotaName).build();
    说明

    获取独享数据传输服务资源组(包年包月)和开放存储(按量计费)两种资源的Quota名称的方式分别如下:

    • 独享数据传输服务资源组:登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>配额(Quota)管理,查看可使用的Quota列表。具体操作,请参见计算资源-Quota管理

    • 开放存储:登录MaxCompute控制台,在左侧导航栏选择租户管理>租户属性,开启开放存储,开放存储资源名称默认为pay-as-you-go

  2. 读表操作。

    1. 创建数据读取会话,读取MaxCompute数据。

      //MaxCompute项目对应的表名称
      String tableName = "<table.name>";
      //创建表数据读取会话
      TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder();
      TableBatchReadSession scan = scanBuilder.identifier(TableIdentifier.of(project, tableName)).withSettings(settings)
              .withSplitOptions(SplitOptions.newBuilder()
                      .SplitByByteSize(256 * 1024L * 1024L)
                      .withCrossPartition(false).build())
              .requiredDataColumns(Arrays.asList("timestamp"))
              .requiredPartitionColumns(Arrays.asList("pt1"))
              .buildBatchReadSession();
      说明

      在数据量较大、网络延迟或不稳定的情况下,可能会导致创建数据读取会话时间过长,从而自动切换到异步流程创建数据读取会话。

    2. 遍历每个切片中的MaxCompute数据,并使用Arrow读取器逐个读取每个切片中的数据并输出数据内容。

      //遍历所有输入切片数据,并使用Arrow读取器逐个读取每个切片中的数据批次,最后输出每批数据的内容
      InputSplitAssigner assigner = scan.getInputSplitAssigner();
      for (InputSplit split : assigner.getAllSplits()) {
          SplitReader<VectorSchemaRoot> reader =
                  scan.createArrowReader(split, ReaderOptions.newBuilder()
                          .withSettings(settings)
                          .withCompressionCodec(CompressionCodec.ZSTD)
                          .withReuseBatch(true)
                          .build());
      
          int rowCount = 0;
          List<VectorSchemaRoot> batchList = new ArrayList<>();
          while (reader.hasNext()) {
              VectorSchemaRoot data = reader.get();
              rowCount += data.getRowCount();
              System.out.println(data.contentToTSVString());
          }
          reader.close();
      }

相关文档

关于MaxCompute开放存储详情,请参见开放存储概述