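This topic describes how to work with Kudu tables through the Java API.
Prerequisites
A cluster has been created with the Kudu service selected. For details, see Create a cluster.
Background information
Create a table
Call the createTable method to create a table. You must specify the table's schema and partitioning.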
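// Imports used by the examples in this topic.
import java.util.ArrayList;
import java.util.List;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;

public static void createTable(String masterAddress, String tableName) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    // try-with-resources closes the client when the method returns.
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        // Schema: ID is the INT32 primary key; score is a non-nullable INT32 column.
        List<ColumnSchema> columnSchemas = new ArrayList<>();
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("ID", Type.INT32).key(true).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("score", Type.INT32).nullable(false).build());
        Schema tableSchema = new Schema(columnSchemas);
        // Hash-partition on ID into 16 buckets and keep a single replica.
        CreateTableOptions options = new CreateTableOptions();
        List<String> partitionKey = new ArrayList<>();
        partitionKey.add("ID");
        options.addHashPartitions(partitionKey, 16);
        options.setNumReplicas(1);
        KuduTable personTable = client.createTable(tableName, tableSchema, options);
        System.out.println("Table " + personTable.getName());
    }
}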
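Note The example defines a table with two columns, ID and score. ID is the INT32 primary key column. score is an INT32 non-key column that cannot be null, because it is built with nullable(false). The table is hash-partitioned on the primary key column ID into 16 buckets, so the data is split across 16 independent tablets.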
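To make the effect of the 16 hash partitions more concrete, the following is a minimal sketch of how keys spread over a fixed number of buckets. It does not use the Kudu API: the class HashBucketSketch and its methods are hypothetical, and Math.floorMod is only a stand-in for Kudu's internal hash function, which is not reproduced here.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: a simple modulo stands in for Kudu's internal hash function. */
class HashBucketSketch {

    // Hypothetical helper: maps a key to one of numBuckets buckets.
    static int bucketFor(int id, int numBuckets) {
        return Math.floorMod(Integer.hashCode(id), numBuckets);
    }

    // Counts how many of the IDs 1..numRows land in each bucket.
    static Map<Integer, Integer> distribute(int numRows, int numBuckets) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int id = 1; id <= numRows; id++) {
            counts.merge(bucketFor(id, numBuckets), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints the number of IDs assigned to each of the 16 buckets.
        System.out.println(distribute(160, 16));
    }
}
```

In this simplified model, 160 consecutive IDs fill all 16 buckets evenly. A real hash function spreads keys less regularly, but the idea is the same: the bucket is determined by the key alone, so rows are not stored in primary key order across tablets.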
Alter a table
- Call the alterTable method to add a column named newCol to the table.
public static void alterTable(String masterAddress, String tableName) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        String newColumnName = "newCol";
        // Add a non-nullable STRING column whose default value is the empty string.
        AlterTableOptions addColumnOptions = new AlterTableOptions();
        addColumnOptions.addColumn(newColumnName, Type.STRING, "");
        AlterTableResponse response = client.alterTable(tableName, addColumnOptions);
        System.out.println("Add column " + response.getElapsedMillis());
        // Reopen the table and list its columns to confirm the change.
        KuduTable table = client.openTable(tableName);
        List<ColumnSchema> columnSchemas = table.getSchema().getColumns();
        for (int i = 0; i < columnSchemas.size(); ++i) {
            ColumnSchema columnSchema = columnSchemas.get(i);
            System.out.println("Column " + i + ":" + columnSchema.getName());
        }
    }
}
- Call the alterTable method to drop the column named newCol from the table.
public static void alterTable(String masterAddress, String tableName) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        String newColumnName = "newCol";
        // Drop the column by name.
        AlterTableOptions deleteColumnOption = new AlterTableOptions();
        deleteColumnOption.dropColumn(newColumnName);
        AlterTableResponse response = client.alterTable(tableName, deleteColumnOption);
        System.out.println("Delete column " + response.getElapsedMillis());
        // Reopen the table and list its columns to confirm the change.
        KuduTable table = client.openTable(tableName);
        List<ColumnSchema> columnSchemas = table.getSchema().getColumns();
        for (int i = 0; i < columnSchemas.size(); ++i) {
            ColumnSchema columnSchema = columnSchemas.get(i);
            System.out.println("Column " + i + ":" + columnSchema.getName());
        }
    }
}
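Write data
Create a session with the newSession method, write multiple rows to an existing Kudu table in a loop, and then check the result for errors.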
public static void insertRows(String masterAddress, String tableName, int numRows) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        KuduTable table = client.openTable(tableName);
        KuduSession session = client.newSession();
        // Flush writes in background threads. Without this call the session uses the
        // default AUTO_FLUSH_SYNC mode, in which row errors are returned by apply()
        // rather than collected as pending errors.
        session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
        for (int i = 0; i < numRows; ++i) {
            // Upsert inserts the row, or updates it if the primary key already exists.
            Upsert upsert = table.newUpsert();
            PartialRow row = upsert.getRow();
            row.addInt("ID", i + 1);
            row.addInt("score", i + 10);
            session.apply(upsert);
        }
        // Call session.close() to end the session and ensure the rows are
        // flushed and errors are returned.
        // You can also call session.flush() to do the same without ending the session.
        // In AUTO_FLUSH_BACKGROUND mode you must check the pending errors as shown
        // below, because write operations are flushed to Kudu in background threads.
        session.close();
        if (session.countPendingErrors() != 0) {
            RowErrorsAndOverflowStatus roStatus = session.getPendingErrors();
            RowError[] errs = roStatus.getRowErrors();
            int numErrs = Math.min(errs.length, 5);
            System.out.println("there were errors inserting rows to Kudu");
            System.out.println("the first few errors follow:");
            for (int i = 0; i < numErrs; i++) {
                System.out.println(errs[i]);
            }
            if (roStatus.isOverflowed()) {
                System.out.println("error buffer overflowed: some errors were discarded");
            }
            throw new RuntimeException("error inserting rows to Kudu");
        }
        System.out.println("Inserted " + numRows + " rows");
    }
}
Note In the example, numRows is the number of rows to write.
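The write example creates its operations with table.newUpsert(). An upsert inserts the row when the primary key is absent and updates it otherwise, so running the example twice with the same numRows does not fail on duplicate keys. The following sketch illustrates that behavior with an in-memory map. It is not the Kudu API: the class UpsertSketch and its methods are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: an in-memory stand-in for a table keyed by ID. */
class UpsertSketch {
    private final Map<Integer, Integer> rows = new HashMap<>();

    // Inserts the row if the ID is new, otherwise overwrites its score.
    void upsert(int id, int score) {
        rows.put(id, score);
    }

    // Mirrors the loop in the write example: ID = i + 1, score = i + 10.
    void upsertRows(int numRows) {
        for (int i = 0; i < numRows; i++) {
            upsert(i + 1, i + 10);
        }
    }

    int rowCount() {
        return rows.size();
    }

    Integer scoreOf(int id) {
        return rows.get(id);
    }

    public static void main(String[] args) {
        UpsertSketch table = new UpsertSketch();
        table.upsertRows(5);
        table.upsertRows(5); // Same keys again: rows are overwritten, not duplicated.
        System.out.println("rows: " + table.rowCount());
    }
}
```

After both calls the sketch still holds five rows, with IDs 1 to 5 and scores 10 to 14.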
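Read data
Use the newScannerBuilder method to build a scanner that reads the ID and score columns of the rows whose ID is in the range [0, numRows / 2), and then check the number of rows returned.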
public static void scanTable(String masterAddress, String tableName, int numRows) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        KuduTable table = client.openTable(tableName);
        Schema schema = table.getSchema();
        // Scan with two predicates on the ID column (lowerBound <= ID < upperBound),
        // returning the ID and score columns.
        List<String> projectColumns = new ArrayList<>(2);
        projectColumns.add("ID");
        projectColumns.add("score");
        int lowerBound = 0;
        KuduPredicate lowerPred = KuduPredicate.newComparisonPredicate(
            schema.getColumn("ID"),
            KuduPredicate.ComparisonOp.GREATER_EQUAL,
            lowerBound);
        int upperBound = numRows / 2;
        KuduPredicate upperPred = KuduPredicate.newComparisonPredicate(
            schema.getColumn("ID"),
            KuduPredicate.ComparisonOp.LESS,
            upperBound);
        KuduScanner scanner = client.newScannerBuilder(table)
            .setProjectedColumnNames(projectColumns)
            .addPredicate(lowerPred)
            .addPredicate(upperPred)
            .build();
        // Count the rows returned by the scan.
        // Note: scanning a hash-partitioned table will not return results in primary key order.
        int resultCount = 0;
        while (scanner.hasMoreRows()) {
            RowResultIterator results = scanner.nextRows();
            while (results.hasNext()) {
                results.next(); // Column values can be read from the returned RowResult.
                resultCount++;
            }
        }
        // insertRows writes IDs 1..numRows, so the matching IDs start at max(lowerBound, 1)
        // and end at upperBound - 1. The check assumes the table holds exactly those rows.
        int expectedResultCount = Math.max(0, upperBound - Math.max(lowerBound, 1));
        if (resultCount != expectedResultCount) {
            throw new RuntimeException("scan error: expected " + expectedResultCount +
                " results but got " + resultCount + " results");
        }
        System.out.println("Scanned some rows and checked the results");
    }
}
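The expected row count in the scan example depends on how the rows were written: insertRows writes IDs from 1 to numRows, while the scan selects IDs that are greater than or equal to 0 and less than numRows / 2. The following sketch replays that arithmetic without a Kudu cluster. The class ScanCountSketch and the method matchingRows are hypothetical names used only for illustration.

```java
import java.util.stream.IntStream;

/** Illustrative only: simulates the scan predicates over the IDs written by insertRows. */
class ScanCountSketch {

    // Counts the IDs in 1..numRows that satisfy lowerBound <= ID < upperBound.
    static long matchingRows(int numRows, int lowerBound, int upperBound) {
        return IntStream.rangeClosed(1, numRows)
                .filter(id -> id >= lowerBound && id < upperBound)
                .count();
    }

    public static void main(String[] args) {
        int numRows = 100;
        // Same bounds as the scan example: 0 <= ID < numRows / 2.
        System.out.println(matchingRows(numRows, 0, numRows / 2));
    }
}
```

With numRows set to 100, the IDs 1 to 49 match, which gives 49 rows. The count is one less than the width of the predicate range because no row has ID 0.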
Delete a table
Call the deleteTable method to delete an existing Kudu table.
public static void deleteTable(String masterAddress, String tableName) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        DeleteTableResponse response = client.deleteTable(tableName);
        System.out.println("Table delete " + response.getElapsedMillis());
    }
}