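This topic describes how to use the Java API to perform operations on Kudu tables.

Prerequisites

A cluster is created, and the Kudu service is selected for the cluster. For more information, see Create a cluster.

Background information

The following operations can be performed on Kudu tables:

Create a table

Call the createTable method to create a table. You must specify the schema and the partitioning of the table.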

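import java.util.ArrayList;
import java.util.List;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduTable;

public static void createTable(String masterAddress, String tableName) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    // try-with-resources closes the client when the method returns.
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        // Schema: ID is the INT32 primary key; score is a non-nullable INT32 column.
        List<ColumnSchema> columnSchemas = new ArrayList<>();
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("ID", Type.INT32).key(true).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("score", Type.INT32).nullable(false).build());
        Schema tableSchema = new Schema(columnSchemas);

        // Hash-partition on ID into 16 buckets (16 tablets), with one replica per tablet.
        CreateTableOptions options = new CreateTableOptions();
        List<String> partitionKey = new ArrayList<>();
        partitionKey.add("ID");
        options.addHashPartitions(partitionKey, 16);
        options.setNumReplicas(1);
        KuduTable personTable = client.createTable(tableName, tableSchema, options);
        System.out.println("Created table " + personTable.getName());
    }
}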
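Note: The sample code defines a table with two columns, ID and score. ID is the INT32 primary key column, and score is an INT32 non-primary-key column that cannot be null. The table is hash-partitioned on ID into 16 buckets, which means that its data is split into 16 independent tablets. setNumReplicas(1) keeps only one replica of each tablet.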
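
To make hash partitioning concrete, the following is a conceptual sketch in plain Java (no Kudu dependency) of how integer keys are spread across 16 buckets. The class HashBucketModel and its mixing function are hypothetical: Kudu uses its own internal hash function over the encoded primary key, so real bucket assignments differ. Only the principle carries over: the same key always lands in the same bucket, and consecutive keys are scattered across all buckets rather than grouped in one.

```java
// Conceptual model only (hypothetical): NOT Kudu's hash function.
// It illustrates how hash partitioning maps each key to one of N buckets.
final class HashBucketModel {
    // Stand-in 32-bit mixer (the fmix32 finalizer from MurmurHash3).
    static int mix(int key) {
        int h = key;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Maps a key to a bucket in [0, numBuckets); floorMod keeps the result non-negative.
    static int bucketOf(int key, int numBuckets) {
        return Math.floorMod(mix(key), numBuckets);
    }

    // Counts how many of the keys 1..numRows fall into each bucket.
    static int[] distribute(int numRows, int numBuckets) {
        int[] counts = new int[numBuckets];
        for (int id = 1; id <= numRows; id++) {
            counts[bucketOf(id, numBuckets)]++;
        }
        return counts;
    }
}
```

For example, HashBucketModel.distribute(1000, 16) returns the number of IDs from 1 to 1000 that land in each of the 16 buckets; spreading rows this way is what lets the 16 tablets share the data.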

Alter a table

  • Call the alterTable method to add a column named newCol to the table.
    import org.apache.kudu.client.AlterTableOptions;
    import org.apache.kudu.client.AlterTableResponse;

    public static void addColumn(String masterAddress, String tableName) throws KuduException {
        System.out.println("Connecting to " + masterAddress);
        try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
            String newColumnName = "newCol";

            // Add a STRING column whose default value is the empty string.
            AlterTableOptions addColumnOptions = new AlterTableOptions();
            addColumnOptions.addColumn(newColumnName, Type.STRING, "");

            AlterTableResponse response = client.alterTable(tableName, addColumnOptions);
            System.out.println("Added column in " + response.getElapsedMillis() + " ms");

            // Reopen the table and print its columns to confirm the change.
            KuduTable table = client.openTable(tableName);
            List<ColumnSchema> columnSchemas = table.getSchema().getColumns();
            for (int i = 0; i < columnSchemas.size(); ++i) {
                ColumnSchema columnSchema = columnSchemas.get(i);
                System.out.println("Column " + i + ": " + columnSchema.getName());
            }
        }
    }

  • Call the alterTable method to drop the column named newCol from the table.
    import org.apache.kudu.client.AlterTableOptions;
    import org.apache.kudu.client.AlterTableResponse;

    public static void dropColumn(String masterAddress, String tableName) throws KuduException {
        System.out.println("Connecting to " + masterAddress);
        try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
            String columnName = "newCol";

            // Drop the column by name.
            AlterTableOptions dropColumnOptions = new AlterTableOptions();
            dropColumnOptions.dropColumn(columnName);
            AlterTableResponse response = client.alterTable(tableName, dropColumnOptions);
            System.out.println("Dropped column in " + response.getElapsedMillis() + " ms");

            // Reopen the table and print its remaining columns.
            KuduTable table = client.openTable(tableName);
            List<ColumnSchema> columnSchemas = table.getSchema().getColumns();
            for (int i = 0; i < columnSchemas.size(); ++i) {
                ColumnSchema columnSchema = columnSchemas.get(i);
                System.out.println("Column " + i + ": " + columnSchema.getName());
            }
        }
    }
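
The effect of the two alterTable calls on existing data can be sketched with an in-memory model. AlterTableModel below is hypothetical and does not use the Kudu client; it assumes that rows written before addColumn read back the default value passed to it (here ""), and that a dropped column disappears from every row.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of schema changes; not the Kudu client API.
final class AlterTableModel {
    // Column name -> default value, in schema order (ID and score have no default).
    private final Map<String, Object> columns = new LinkedHashMap<>();
    // Each row maps column name -> value.
    private final List<Map<String, Object>> rows = new ArrayList<>();

    AlterTableModel() {
        columns.put("ID", null);
        columns.put("score", null);
    }

    void insert(int id, int score) {
        Map<String, Object> row = new LinkedHashMap<>(columns); // start from defaults
        row.put("ID", id);
        row.put("score", score);
        rows.add(row);
    }

    // Mirrors addColumn(name, type, default): existing rows read back the default.
    void addColumn(String name, Object defaultValue) {
        columns.put(name, defaultValue);
        for (Map<String, Object> row : rows) {
            row.put(name, defaultValue);
        }
    }

    // Mirrors dropColumn(name): the column is removed from the schema and every row.
    void dropColumn(String name) {
        columns.remove(name);
        for (Map<String, Object> row : rows) {
            row.remove(name);
        }
    }

    List<String> columnNames() {
        return new ArrayList<>(columns.keySet());
    }

    Object valueOf(int rowIndex, String column) {
        return rows.get(rowIndex).get(column);
    }
}
```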
                        

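Write data

Call the newSession method to open a session, apply an upsert operation for each row in a loop to write multiple rows to the existing Kudu table, and then check for write errors. An upsert inserts a row if its primary key does not exist yet and otherwise updates the existing row.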

import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.RowErrorsAndOverflowStatus;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Upsert;

public static void insertRows(String masterAddress, String tableName, int numRows) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        KuduTable table = client.openTable(tableName);
        KuduSession session = client.newSession();
        // Buffer writes and flush them from background threads; row errors are
        // collected by the session and checked after the flush below.
        session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
        for (int i = 0; i < numRows; ++i) {
            // Upsert: insert the row, or overwrite it if the key already exists.
            Upsert upsert = table.newUpsert();
            PartialRow row = upsert.getRow();
            row.addInt("ID", i + 1);      // IDs are 1..numRows
            row.addInt("score", i + 10);
            session.apply(upsert);
        }

        // Call session.close() to end the session and ensure the rows are
        // flushed and errors are returned.
        // You can also call session.flush() to do the same without ending the session.
        // In AUTO_FLUSH_BACKGROUND mode (the mode recommended for most workloads),
        // you must check the pending errors as shown below, because write
        // operations are flushed to Kudu in background threads.
        session.close();
        if (session.countPendingErrors() != 0) {
            RowErrorsAndOverflowStatus roStatus = session.getPendingErrors();
            RowError[] errs = roStatus.getRowErrors();
            int numErrs = Math.min(errs.length, 5);
            System.out.println("There were errors inserting rows to Kudu.");
            System.out.println("The first few errors follow:");
            for (int i = 0; i < numErrs; i++) {
                System.out.println(errs[i]);
            }
            if (roStatus.isOverflowed()) {
                System.out.println("Error buffer overflowed: some errors were discarded.");
            }
            throw new RuntimeException("error inserting rows to Kudu");
        }
        System.out.println("Inserted " + numRows + " rows");
    }
}
Note: In the sample code, numRows specifies the number of rows to write.
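
insertRows uses Upsert rather than Insert, which matters if the method runs more than once. The following in-memory sketch (hypothetical class UpsertModel, not the Kudu API) contrasts the two: an insert whose primary key already exists is rejected, which Kudu reports as a row error, while an upsert overwrites the existing row. As a result, repeating insertRows with the same numRows still leaves numRows rows.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model keyed on the primary key ID; not the Kudu client API.
final class UpsertModel {
    // Primary key ID -> score.
    private final Map<Integer, Integer> table = new TreeMap<>();

    // Insert: rejected (returns false) when the key already exists.
    boolean insert(int id, int score) {
        if (table.containsKey(id)) {
            return false;
        }
        table.put(id, score);
        return true;
    }

    // Upsert: inserts a new row, or overwrites the row with the same key.
    void upsert(int id, int score) {
        table.put(id, score);
    }

    int size() {
        return table.size();
    }

    Integer scoreOf(int id) {
        return table.get(id);
    }
}
```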

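Read data

Call the newScannerBuilder method to build a scanner that reads the ID and score columns of the rows whose ID is in the range [0, numRows/2), and then check that the expected number of rows is returned.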

import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;

public static void scanTable(String masterAddress, String tableName, int numRows) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        KuduTable table = client.openTable(tableName);
        Schema schema = table.getSchema();

        // Project the ID and score columns and keep only the rows with
        // lowerBound <= ID < upperBound.
        List<String> projectColumns = new ArrayList<>(2);
        projectColumns.add("ID");
        projectColumns.add("score");
        int lowerBound = 0;
        KuduPredicate lowerPred = KuduPredicate.newComparisonPredicate(
                schema.getColumn("ID"),
                KuduPredicate.ComparisonOp.GREATER_EQUAL,
                lowerBound);
        int upperBound = numRows / 2;
        KuduPredicate upperPred = KuduPredicate.newComparisonPredicate(
                schema.getColumn("ID"),
                KuduPredicate.ComparisonOp.LESS,
                upperBound);
        KuduScanner scanner = client.newScannerBuilder(table)
                .setProjectedColumnNames(projectColumns)
                .addPredicate(lowerPred)
                .addPredicate(upperPred)
                .build();

        // Read and count the returned rows.
        // Note: scanning a hash-partitioned table does not return results in primary key order.
        int resultCount = 0;
        while (scanner.hasMoreRows()) {
            RowResultIterator results = scanner.nextRows();
            while (results.hasNext()) {
                RowResult result = results.next();
                System.out.println("ID=" + result.getInt("ID") + ", score=" + result.getInt("score"));
                resultCount++;
            }
        }
        scanner.close();

        // insertRows writes IDs 1..numRows, so ID 0 never matches; hence the -1.
        // Clamp at 0 so that numRows < 2 (an empty range) is handled.
        int expectedResultCount = Math.max(0, upperBound - lowerBound - 1);
        if (resultCount != expectedResultCount) {
            throw new RuntimeException("scan error: expected " + expectedResultCount +
                    " results but got " + resultCount + " results");
        }
        System.out.println("Scanned some rows and checked the results");
    }
}
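
The expected row count in scanTable relies on the IDs written by insertRows starting at 1, so ID 0 in the range [0, numRows/2) never exists. The following plain-Java sketch (hypothetical class ScanCountCheck) checks that reasoning by brute force. It also shows why the count must be clamped at 0: for numRows of 0 or 1, upperBound is 0, the range is empty, and the unclamped expression upperBound - lowerBound - 1 would give -1.

```java
// Hypothetical helper that checks the expected-count reasoning used in scanTable.
// Assumes rows were written by insertRows, i.e. IDs 1..numRows, and lowerBound = 0.
final class ScanCountCheck {
    // Brute force: count IDs in 1..numRows with lowerBound <= ID < upperBound.
    static int bruteForce(int numRows, int lowerBound, int upperBound) {
        int count = 0;
        for (int id = 1; id <= numRows; id++) {
            if (id >= lowerBound && id < upperBound) {
                count++;
            }
        }
        return count;
    }

    // Closed form for lowerBound = 0: IDs 1..upperBound-1, clamped at 0.
    static int formula(int upperBound) {
        return Math.max(0, upperBound - 1);
    }
}
```

For example, with numRows = 10 the scan range is [0, 5) and the matching IDs are 1 to 4, so both methods give 4.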

Delete a table

Call the deleteTable method to delete an existing Kudu table.

import org.apache.kudu.client.DeleteTableResponse;

public static void deleteTable(String masterAddress, String tableName) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        DeleteTableResponse response = client.deleteTable(tableName);
        System.out.println("Deleted table in " + response.getElapsedMillis() + " ms");
    }
}