本文为您展示DataHub的 Java SDK的Shard操作。
Shard说明
Shard表示对一个Topic进行数据传输的并发通道,每个Shard会有对应的ID。每个Shard会有多种状态:
Opening:为启动中状态。
Active:为启动完成可服务状态。
每个Shard启用以后会占用一定的服务端资源,建议按需申请Shard数量。
列出Shard
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
代码示例
public static void listShard(String projectName, String topicName) {
try {
ListShardResult listShardResult = datahubClient.listShard(projectName, topicName);
if (listShardResult.getShards().size() > 0) {
for (ShardEntry entry : listShardResult.getShards()) {
System.out.println(entry.getShardId() + "\t"
+ entry.getState() + "\t"
+ entry.getLeftShardId() + "\t"
+ entry.getRightShardId());
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
分裂Shard
指定一个Topic中的一个状态为ACTIVE的Shard进行分裂,生成两个Shard,新Shard状态为ACTIVE,原Shard状态会变为CLOSED。CLOSED状态的Shard只可以读,不可以写,可以采用默认splitKey进行分裂,也可以指定splitKey进行分裂。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
shardId | String | The shard which to split. |
splitKey | String | The split key which is used to split shard. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
代码示例
public static void splitShard(String projectName, String topicName, String shardId) {
try {
shardId = "0";
SplitShardResult splitShardResult = datahubClient.splitShard(projectName, topicName, shardId);
for (ShardEntry entry : splitShardResult.getNewShards()) {
System.out.println(entry.getShardId());
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
合并Shard
合并一个Topic中两个处于ACTIVE状态的Shard,要求两个Shard的位置必须相邻。每个Shard相邻的两个Shard可以参考listShard的结果。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
shardId | String | The shard which will be merged. |
adjacentShardId | String | The adjacent shard of the specified shard. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
代码示例
public static void mergeShard() {
try {
String shardId = "7";
//adjacentShardId位置必须和shardId相邻,shard相邻信息可在listShard返回结果中查看
String adjacentShardId = "8";
MergeShardResult mergeShardResult = datahubClient.mergeShard(Constant.projectName, Constant.topicName, shardId, adjacentShardId);
System.out.println("merge successful");
System.out.println(mergeShardResult.getShardId());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
扩展Shard
shard扩展要求扩展的shard数量不得小于原有shard数量。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
shardCount | int | The num of shards to extend to. |
adjacentShardId | String | The adjacent shard of the specified shard. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
代码示例
public static void extendTopic(String projectName, String topicName, int shardCount) {
try {
ExtendShardResult extendShardResult = datahubClient.extendShard(projectName, topicName, shardCount);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}