Shard操作

本文为您展示DataHub的 Java SDKShard操作。

Shard说明

Shard表示对一个Topic进行数据传输的并发通道,每个Shard会有对应的ID。每个Shard会有多种状态:

  • Opening:为启动中状态。

  • Active:为启动完成可服务状态。

每个Shard启用以后会占用一定的服务端资源,建议按需申请Shard数量。

列出Shard

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

代码示例

public static void listShard(String projectName, String topicName) {
  try {
      ListShardResult listShardResult = datahubClient.listShard(projectName, topicName);
      if (listShardResult.getShards().size() > 0) {
          for (ShardEntry entry : listShardResult.getShards()) {
              System.out.println(entry.getShardId() + "\t"
                      + entry.getState() + "\t"
                      + entry.getLeftShardId() + "\t"
                      + entry.getRightShardId());
          }
      }
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

  }
}

分裂Shard

指定一个Topic中的一个状态为ACTIVEShard进行分裂,生成两个Shard,新Shard状态为ACTIVE,原Shard状态会变为CLOSED。CLOSED状态的Shard只可以读,不可以写,可以采用默认splitKey进行分裂,也可以指定splitKey进行分裂。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

shardId

String

The shard which to split.

splitKey

String

The split key which is used to split shard.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

代码示例

public static void splitShard(String projectName, String topicName, String shardId) {
    try {
        shardId = "0";
        SplitShardResult splitShardResult = datahubClient.splitShard(projectName, topicName, shardId);
        for (ShardEntry entry : splitShardResult.getNewShards()) {
            System.out.println(entry.getShardId());
        }
    }  catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

合并Shard

合并一个Topic中两个处于ACTIVE状态的Shard,要求两个Shard的位置必须相邻。每个Shard相邻的两个Shard可以参考listShard的结果。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

shardId

String

The shard which will be merged.

adjacentShardId

String

The adjacent shard of the specified shard.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

代码示例

public static void mergeShard() {
    try {
        String shardId = "7";
        //adjacentShardId位置必须和shardId相邻,shard相邻信息可在listShard返回结果中查看
        String adjacentShardId = "8";
        MergeShardResult mergeShardResult = datahubClient.mergeShard(Constant.projectName, Constant.topicName, shardId, adjacentShardId);
        System.out.println("merge successful");
        System.out.println(mergeShardResult.getShardId());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

扩展Shard

shard扩展要求扩展的shard数量不得小于原有shard数量。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

shardCount

int

The num of shards to extend to.

adjacentShardId

String

The adjacent shard of the specified shard.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

代码示例

    public static void extendTopic(String projectName, String topicName, int shardCount) {
        try {
            ExtendShardResult extendShardResult = datahubClient.extendShard(projectName, topicName, shardCount);

        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }

    }