Java API访问DLF

更新时间:
复制为 MD 格式

本文为您介绍如何通过Iceberg Java API访问DLF Iceberg REST Catalog,实现对Iceberg表的元数据管理和数据读写操作。

前提条件

  • 运行环境为JDK 11及以上版本。

  • 已获取阿里云AccessKey ID和AccessKey Secret。获取方法请参见创建AccessKey。

  • 已创建DLF数据目录。

Maven依赖

在项目的pom.xml文件中添加以下仓库和依赖。

<repositories>
    <!-- DLF Maven repository (hosts the dlf-iceberg-plugin artifact) -->
    <repository>
        <id>dlf-mvn-repo</id>
        <url>https://dlf-mvn-repo.oss-cn-shanghai.aliyuncs.com/mvn-repo/release/</url>
    </repository>
    <!-- Jindo Maven repository (hosts jindo-core / jindo-sdk) -->
    <repository>
        <id>jindodata</id>
        <url>https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>com.aliyun.jindodata</groupId>
        <artifactId>jindo-core</artifactId>
        <version>${jindo.version}</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun.jindodata</groupId>
        <artifactId>jindo-sdk</artifactId>
        <version>${jindo.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-core</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-aws-bundle</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-aws</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>dlf-iceberg-plugin-1.10.1</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.4</version>
    </dependency>
</dependencies>

使用示例

该示例演示了如何创建Catalog实例,并加载指定的Iceberg表,输出其存储位置、Schema、分区信息、表属性和快照历史等详细信息。运行前请将代码中的地域、数据目录名称、命名空间名称和表名称替换为您的实际值,并通过环境变量设置AccessKey ID和AccessKey Secret。

package com.aliyun;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Demo: connect to the DLF Iceberg REST Catalog, load a table, and print its
 * location, schema, partition spec, properties, and snapshot history.
 */
public class DlfIcebergCatalogDemo {

    public static void main(String[] args) {
        // Replace the values below with your actual configuration.
        String region = "cn-hangzhou";
        String warehouse = "dlf_test";
        String namespaceName = "default";       // Replace with your actual namespace name.
        String tableName = "iceberg_table";     // Replace with your actual table name.
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        // Fail fast with a clear message; otherwise missing credentials only
        // surface later as an opaque authentication error from the service.
        if (accessKeyId == null || accessKeyId.isEmpty()
                || accessKeySecret == null || accessKeySecret.isEmpty()) {
            System.err.println("Environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and "
                    + "ALIBABA_CLOUD_ACCESS_KEY_SECRET must be set.");
            return;
        }

        // RESTCatalog implements Closeable, so try-with-resources releases the
        // catalog reliably without a manual try/finally + close() dance.
        try (RESTCatalog catalog = createCatalog(region, warehouse, accessKeyId, accessKeySecret)) {
            // Load the target table and print its details.
            TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespaceName), tableName);
            Table table = catalog.loadTable(tableId);

            System.out.println("=== 表 [" + tableId + "] 详细信息 ===");
            System.out.println("Location: " + table.location());

            Schema schema = table.schema();
            System.out.println("Schema:");
            for (Types.NestedField column : schema.columns()) {
                System.out.println("  " + column.name() + " : " + column.type());
            }

            System.out.println("Partition: " + table.spec());
            System.out.println("Properties: " + table.properties());

            table.snapshots().forEach(snapshot ->
                    System.out.println("Snapshot ID: " + snapshot.snapshotId()
                            + ", Timestamp: " + snapshot.timestampMillis()));
        } catch (IOException e) {
            // Thrown only by close(); catalog operations themselves throw unchecked.
            System.err.println("Failed to close catalog: " + e.getMessage());
        }
    }

    /**
     * Creates and initializes a DLF Iceberg REST Catalog instance.
     *
     * @param region          region ID where the DLF service resides, e.g. cn-hangzhou
     * @param warehouse       DLF catalog (warehouse) name
     * @param accessKeyId     Alibaba Cloud AccessKey ID
     * @param accessKeySecret Alibaba Cloud AccessKey Secret
     * @return an initialized {@link RESTCatalog}
     */
    private static RESTCatalog createCatalog(String region,
                                             String warehouse,
                                             String accessKeyId,
                                             String accessKeySecret) {
        Map<String, String> props = new HashMap<>();
        // VPC-only endpoint; the region embedded in the URI must match
        // rest.signing-region below or request signing will fail.
        props.put("uri", "https://" + region + "-vpc.dlf.aliyuncs.com/iceberg");
        props.put("warehouse", warehouse);
        props.put("io-impl", "org.apache.iceberg.rest.DlfFileIO");
        // SigV4-compatible request signing with the Alibaba Cloud AccessKey pair.
        props.put("rest.auth.type", "sigv4");
        props.put("rest.auth.sigv4.delegate-auth-type", "none");
        props.put("rest.signing-region", region);
        props.put("rest.access-key-id", accessKeyId);
        props.put("rest.secret-access-key", accessKeySecret);
        props.put("rest.signing-name", "DlfNext");

        RESTCatalog catalog = new RESTCatalog();
        catalog.setConf(new Configuration());
        catalog.initialize(warehouse, props);
        return catalog;
    }
}
重要
  • 请勿将AccessKey ID和AccessKey Secret硬编码到代码中,建议通过环境变量进行管理。

  • uri中的地域需要与rest.signing-region保持一致。

  • 目前Iceberg REST仅支持通过VPC内网访问DLF服务,例如https://cn-hangzhou-vpc.dlf.aliyuncs.com/iceberg。各地域的接入地址请参见服务接入点