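This topic describes how to use the Iceberg Java API to access the DLF Iceberg REST Catalog so that you can manage Iceberg table metadata and read and write table data.
Prerequisites
The runtime environment is JDK 11 or later.
You have obtained an Alibaba Cloud AccessKey ID and AccessKey Secret. For details, see Create an AccessKey.
You have created a DLF data catalog.
Maven dependencies
Add the following repositories and dependencies to your project's pom.xml file. The jindo dependencies reference a ${jindo.version} property, so define that property in your pom.xml or replace it with the jindo version you use.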
<repositories>
    <!-- DLF Maven repository -->
    <repository>
        <id>dlf-mvn-repo</id>
        <url>https://dlf-mvn-repo.oss-cn-shanghai.aliyuncs.com/mvn-repo/release/</url>
    </repository>
    <!-- jindo Maven repository -->
    <repository>
        <id>jindodata</id>
        <url>https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>com.aliyun.jindodata</groupId>
        <artifactId>jindo-core</artifactId>
        <version>${jindo.version}</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun.jindodata</groupId>
        <artifactId>jindo-sdk</artifactId>
        <version>${jindo.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-core</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-aws-bundle</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-aws</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>dlf-iceberg-plugin-1.10.1</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.4</version>
    </dependency>
</dependencies>
Usage example
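This example creates a Catalog instance, loads a table, and prints its location, schema, partition spec, properties, and snapshots. Other common operations, such as listing namespaces and listing tables, use the same Catalog instance. Before running the example, set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables, and replace the region, data catalog name, namespace, and table name in the code with your actual values.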
package com.aliyun;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DlfIcebergCatalogDemo {

    public static void main(String[] args) {
        // Replace these values with your actual configuration.
        String region = "cn-hangzhou";
        String warehouse = "dlf_test";
        String namespaceName = "default"; // Replace with the actual namespace name.
        String tableName = "iceberg_table"; // Replace with the actual table name.
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        // Create and initialize the Catalog.
        RESTCatalog catalog = createCatalog(region, warehouse, accessKeyId, accessKeySecret);
        try {
            // Load the specified table and print its details.
            TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespaceName), tableName);
            Table table = catalog.loadTable(tableId);
            System.out.println("=== Table [" + tableId + "] details ===");
            System.out.println("Location: " + table.location());
            Schema schema = table.schema();
            System.out.println("Schema:");
            for (Types.NestedField column : schema.columns()) {
                System.out.println(" " + column.name() + " : " + column.type());
            }
            System.out.println("Partition: " + table.spec());
            System.out.println("Properties: " + table.properties());
            table.snapshots().forEach(snapshot ->
                    System.out.println("Snapshot ID: " + snapshot.snapshotId()
                            + ", Timestamp: " + snapshot.timestampMillis()));
        } finally {
            // Close the Catalog to release resources.
            try {
                catalog.close();
            } catch (IOException e) {
                System.err.println("Failed to close catalog: " + e.getMessage());
            }
        }
    }

    /**
     * Creates and initializes a DLF Iceberg REST Catalog instance.
     *
     * @param region          ID of the region where the DLF service resides, for example cn-hangzhou.
     * @param warehouse       Name of the DLF data catalog.
     * @param accessKeyId     AccessKey ID of the Alibaba Cloud account.
     * @param accessKeySecret AccessKey Secret of the Alibaba Cloud account.
     * @return The initialized RESTCatalog instance.
     */
    private static RESTCatalog createCatalog(String region,
                                             String warehouse,
                                             String accessKeyId,
                                             String accessKeySecret) {
        Map<String, String> props = new HashMap<>();
        props.put("uri", "https://" + region + "-vpc.dlf.aliyuncs.com/iceberg");
        props.put("warehouse", warehouse);
        props.put("io-impl", "org.apache.iceberg.rest.DlfFileIO");
        props.put("rest.auth.type", "sigv4");
        props.put("rest.auth.sigv4.delegate-auth-type", "none");
        props.put("rest.signing-region", region);
        props.put("rest.access-key-id", accessKeyId);
        props.put("rest.secret-access-key", accessKeySecret);
        props.put("rest.signing-name", "DlfNext");

        RESTCatalog catalog = new RESTCatalog();
        Configuration conf = new Configuration();
        catalog.setConf(conf);
        catalog.initialize(warehouse, props);
        return catalog;
    }
}
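The example prints each snapshot's timestamp as raw epoch milliseconds, which is hard to read at a glance. The following sketch, which uses only the JDK, shows one possible way to render such a value as an ISO-8601 string. The class name SnapshotTimeFormatter and the sample value are illustrative assumptions and are not part of the DLF or Iceberg APIs.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class SnapshotTimeFormatter {

    /** Converts epoch milliseconds to an ISO-8601 date-time string with offset in the given zone. */
    public static String format(long epochMillis, ZoneId zone) {
        return DateTimeFormatter.ISO_OFFSET_DATE_TIME
                .format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        // Hypothetical sample value; in the demo this would come from snapshot.timestampMillis().
        long sampleMillis = 1700000000000L;
        // Prints 2023-11-15T06:13:20+08:00
        System.out.println(format(sampleMillis, ZoneId.of("Asia/Shanghai")));
    }
}
```

In the demo, the lambda passed to table.snapshots().forEach could call format(snapshot.timestampMillis(), zone) in place of printing the raw value.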
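Important
Do not hard-code the AccessKey ID and AccessKey Secret in your code. Manage them through environment variables instead.
The region in uri must match rest.signing-region. Iceberg REST currently supports access to the DLF service only over the VPC internal network, for example https://cn-hangzhou-vpc.dlf.aliyuncs.com/iceberg. For the endpoint of each region, see Service endpoints.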
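Both notes can be enforced in code rather than by convention. The following is a minimal sketch, using only the JDK, that derives uri and rest.signing-region from a single region argument so they cannot drift apart, and fails fast when a credential variable is missing. The class DlfCatalogProps and its methods are hypothetical helpers introduced for illustration. The property keys and values are copied from the example above, and the URI pattern is assumed to hold for every region, so verify it against the service endpoint list.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class DlfCatalogProps {

    /** Builds the VPC endpoint for a region, following the URI pattern used in the example. */
    public static String vpcUri(String region) {
        return "https://" + region + "-vpc.dlf.aliyuncs.com/iceberg";
    }

    /** Looks up a required variable and fails fast when it is missing or blank. */
    public static String require(Function<String, String> env, String name) {
        String value = env.apply(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    /** Builds the catalog properties; uri and rest.signing-region share the same region value. */
    public static Map<String, String> build(String region,
                                            String warehouse,
                                            Function<String, String> env) {
        Map<String, String> props = new HashMap<>();
        props.put("uri", vpcUri(region));
        props.put("warehouse", warehouse);
        props.put("io-impl", "org.apache.iceberg.rest.DlfFileIO");
        props.put("rest.auth.type", "sigv4");
        props.put("rest.auth.sigv4.delegate-auth-type", "none");
        props.put("rest.signing-region", region);
        props.put("rest.access-key-id", require(env, "ALIBABA_CLOUD_ACCESS_KEY_ID"));
        props.put("rest.secret-access-key", require(env, "ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        props.put("rest.signing-name", "DlfNext");
        return props;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the environment so the sketch runs anywhere;
        // real code would pass System::getenv instead.
        Map<String, String> fakeEnv = new HashMap<>();
        fakeEnv.put("ALIBABA_CLOUD_ACCESS_KEY_ID", "example-id");
        fakeEnv.put("ALIBABA_CLOUD_ACCESS_KEY_SECRET", "example-secret");

        Map<String, String> props = build("cn-hangzhou", "dlf_test", fakeEnv::get);
        System.out.println(props.get("uri"));
    }
}
```

The resulting map can be passed to catalog.initialize(warehouse, props) as in createCatalog. Taking the lookup as a Function keeps the helper testable without touching the real process environment.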