MaxCompute支持第三方引擎(如Spark on EMR、StarRocks、Presto、PAI和Hologres)通过SDK调用Storage API直接访问MaxCompute数据,本文为您介绍使用Java SDK访问MaxCompute的代码示例。
概述
使用Java SDK访问MaxCompute的主要接口如下。
主要接口 | 描述 |
用于创建一个MaxCompute读表会话。 | |
表示一个从MaxCompute表中读取数据的会话。 | |
用于读取数据会话包含的一个数据分片。 |
如果您使用Maven,可以从Maven库中搜索odps-sdk-table-api获取不同版本的Java SDK,相关配置信息如下。
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-table-api</artifactId>
<version>0.48.8-public</version>
</dependency>
MaxCompute提供了开放存储相关接口,详情请参见odps-sdk-table-api。
TableReadSessionBuilder
TableReadSessionBuilder接口用于创建一个MaxCompute读表会话,其中主要接口定义如下。更多详情,请参见Java-sdk-doc。
接口定义
public class TableReadSessionBuilder {
public TableReadSessionBuilder table(Table table);
public TableReadSessionBuilder identifier(TableIdentifier identifier);
public TableReadSessionBuilder requiredDataColumns(List<String> requiredDataColumns);
public TableReadSessionBuilder requiredPartitionColumns(List<String> requiredPartitionColumns);
public TableReadSessionBuilder requiredPartitions(List<PartitionSpec> requiredPartitions);
public TableReadSessionBuilder requiredBucketIds(List<Integer> requiredBucketIds);
public TableReadSessionBuilder withSplitOptions(SplitOptions splitOptions);
public TableReadSessionBuilder withArrowOptions(ArrowOptions arrowOptions);
public TableReadSessionBuilder withFilterPredicate(Predicate filterPredicate);
public TableReadSessionBuilder withSettings(EnvironmentSettings settings);
public TableReadSessionBuilder withSessionId(String sessionId);
public TableBatchReadSession buildBatchReadSession();
}
接口说明
方法名称 | 说明 |
| 将传入的参数Table,定义为当前会话中的目标表。 |
| 将传入的参数TableIdentifier ,定义为当前会话中的目标表。 |
| 读取指定字段的数据,并确保返回的数据中的字段顺序与参数 说明 如果参数 |
| 读取指定表下指定分区的数据,适用于进行分区裁剪的场景。 说明 如果参数 |
| 读取指定的Bucket数据,仅对聚簇表生效,适用于进行Bucket裁剪场景。 说明 如果参数 |
| 切分表数据,其中SplitOptions对象参数定义如下:
使用示例
|
| 指定Arrow数据选项,
使用示例
|
| 指定谓词下推(Predicate Pushdown)选项,其中Predicate定义如下:
使用示例
|
| 指定运行环境信息,EnvironmentSettings 接口定义如下:
|
| 指定SessionID信息,用于重新加载已创建的会话。 |
| 创建或获取读表会话。若提供入参SessionID,则根据SessionID返回已创建的Session;若未提供入参,将创建一个新的读表会话。 说明 创建操作开销较大,当文件数很多时,耗时会比较长。 |
TableBatchReadSession
TableBatchReadSession接口表示一个从MaxCompute表中读取数据的会话,主要接口定义如下。
接口定义
public interface TableBatchReadSession {
String getId();
TableIdentifier getTableIdentifier();
SessionStatus getStatus();
DataSchema readSchema();
InputSplitAssigner getInputSplitAssigner() throws IOException;
SplitReader<ArrayRecord> createRecordReader(InputSplit split, ReaderOptions options) throws IOException;
SplitReader<VectorSchemaRoot> createArrowReader(InputSplit split, ReaderOptions options) throws IOException;
}
接口说明
方法名称 | 说明 |
| 获取当前会话ID,读取会话ID的默认超时时长为:24小时(h)。 |
| 获取当前会话下的表名称。 |
| 获取当前会话状态,状态值如下:
|
| 获取当前会话的表结构信息,DataSchema定义如下:
|
| 获取当前会话的InputSplitAssigner。InputSplitAssigner接口定义了在当前读取会话中分配InputSplit实例的方法。每InputSplit代表一个数据分片,可由单个SplitReader处理。InputSplitAssigner定义如下:
|
| 构建
|
| 构建 |
SplitReader
介绍SplitReader接口,此接口用于读取表数据。
接口定义
public interface SplitReader<T> {
boolean hasNext() throws IOException;
T get();
Metrics currentMetricsValues();
void close() throws IOException;
}
接口说明
方法名称 | 说明 |
| 确认是否还有更多数据项可读。如果还有下一个数据项可以读取,则返回true;否则,返回false。 |
| 获取当前的数据项。调用此方法前应确保通过 |
| 获取SplitReader相关的指标。 |
| 读取结束后,关闭连接。 |
使用示例
配置连接MaxCompute服务的环境。
// 阿里云账号或RAM用户的AccessKey ID和AccessKey Secret // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户 // 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您也可以根据业务需要,保存到配置文件里 // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险 private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); //访问MaxCompute使用的Quota名称 String quotaName = "<quotaName>"; //MaxCompute项目名称 String project = "<project>"; //创建Odps对象来连接MaxCompute服务 Account account = new AliyunAccount(accessId, accessKey); Odps odps = new Odps(account); odps.setDefaultProject(project); //MaxCompute服务的连接地址,当前仅支持使用阿里云VPC网络 odps.setEndpoint(endpoint); Credentials credentials = Credentials.newBuilder().withAccount(odps.getAccount()).withAppAccount(odps.getAppAccount()).build(); EnvironmentSettings settings = EnvironmentSettings.newBuilder().withCredentials(credentials).withServiceEndpoint(odps.getEndpoint()).build();
说明获取独享数据传输服务资源组(包年包月)和开放存储(按量计费)两种资源的Quota名称的方式分别如下:
独享数据传输服务资源组:登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>配额(Quota)管理,查看可使用的Quota列表。具体操作,请参见计算资源-Quota管理。
开放存储:登录MaxCompute控制台,在左侧导航栏选择租户管理>租户属性,开启开放存储,开放存储资源名称默认为
pay-as-you-go
。
读表操作。
创建数据读取会话,读取MaxCompute数据。
//MaxCompute项目对应的表名称 String tableName = "<table.name>"; //创建表数据读取会话 TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder(); TableBatchReadSession scan = scanBuilder.identifier(TableIdentifier.of(project, tableName)).withSettings(settings) .withSplitOptions(SplitOptions.newBuilder() .SplitByByteSize(256 * 1024L * 1024L) .withCrossPartition(false).build()) .requiredDataColumns(Arrays.asList("timestamp")) .requiredPartitionColumns(Arrays.asList("pt1")) .buildBatchReadSession();
说明在数据量较大、网络延迟或不稳定的情况下,可能会导致创建数据读取会话时间过长,从而自动切换到异步流程创建数据读取会话。
遍历每个切片中的MaxCompute数据,并使用Arrow读取器逐个读取每个切片中的数据并输出数据内容。
//遍历所有输入切片数据,并使用Arrow读取器逐个读取每个切片中的数据批次,最后输出每批数据的内容 InputSplitAssigner assigner = scan.getInputSplitAssigner(); for (InputSplit split : assigner.getAllSplits()) { SplitReader<VectorSchemaRoot> reader = scan.createArrowReader(split, ReaderOptions.newBuilder() .withSettings(settings) .withCompressionCodec(CompressionCodec.ZSTD) .withReuseBatch(true) .build()); int rowCount = 0; List<VectorSchemaRoot> batchList = new ArrayList<>(); while (reader.hasNext()) { VectorSchemaRoot data = reader.get(); rowCount += data.getRowCount(); System.out.println(data.contentToTSVString()); } reader.close(); }
相关文档
关于MaxCompute开放存储详情,请参见开放存储概述。