Shard操作

Shard操作

Shard操作分为Shard水平扩展和Shard分裂合并两种模式,应用场景如下

  1. Shard水平扩展不允许合并Shard,分裂合并方式则允许

  2. 使用kafka方式消费Topic必须开启Shard水平扩展

  3. 开启Shard水平扩展后,key range无法使用, 所有Shard的BeginHashKey与EndHashKey是一样的,无法按HashKey和PartitionKey方式写入数据,需要自定义在应用层hash取模,并且需要注意扩容导致的写入shard发生变化

Shard水平扩展模式

DataHub支持Topic Shard水平扩展,创建Topic时开启Shard扩展模式即可

步骤一

开启Shard扩展模式步骤一

步骤二

点击图标(如下图所示),修改Shard数量

b2.1-2

步骤三

查看水平扩展后的Shard步骤三

shard分裂合并

DataHub 支持为Topic动态扩容/缩容,通过SplitShard/MergeShard来实现。

使用场景

DataHub具有服务弹性伸缩功能,用户可根据实时的流量调整Shard数量,来应对突发性的流量增长或达到节约资源的目的。例如在双11大促期间,大部分Topic数据流量会激增,平时的Shard数量可能完全无法满足这样的流量增长,此时可以对其中一些Shard进行Split操作,一变二,二变四,最大可扩容至256个Shard,按目前的流控限制足以达到1280MB/s的流量。在双11大促后,流量下降,多余的Shard会占用没有必要的quota,因此可以进行Merge操作,每两个Shard合并为一个,直到合适为止。

Shard属性

可以通过ListShard接口获取所有Shard的信息,每个Shard拥有如下属性, 样例:

{
    "ShardId": "string",
    "State": "string",
    "ClosedTime": uint64,
    "BeginHashKey": "string",
    "EndHashKey": "string",
    "ParentShardIds": [string,string,],
    "LeftShardId": "string",
    "RightShardId": "string"
}

SplitShard

指定一个128 bit的HashKey以及一个ShardID,通过SDK或者Console进行操作.SplitShard操作会将指定的Shard分裂为两个ChildShard,并且返回ChildShard的ID以及Key信息,同时Parent Shard会被置为CLOSED状态。例如,Split之前存在如下一个Shard:

ShardId:0 Status:ACTIVE BeginHashKey:00000000000000000000000000000000
    EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF

通过SDK进行Split操作:

String shardId = "0";
SplitShardRequest req = new SplitShardRequest(projectName, topicName, shardId, "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
SplitShardResult resp = client.splitShard(req);

最终将会变成如下3个Shard:

ShardId:0 Status:CLOSED BeginHashKey:00000000000000000000000000000000
                    EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
ShardId:1 Status:ACTIVE BeginHashKey:00000000000000000000000000000000
                    EndHashKey:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
ShardId:2 Status:ACTIVE BeginHashKey:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
                    EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF

MergeShard

指定两个相邻的ShardID,通过SDK或者Console进行操作.MergeShard操作会将指定的两个Shard合并为一个新的Shard,并且返回新Shard的ID以及Key信息,同时两个ParentShard会被置为CLOSED状态。例如,Merge之前存在如下两个Shard:

ShardId:0 Status:ACTIVE BeginHashKey:00000000000000000000000000000000
                    EndHashKey:7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF
ShardId:1 Status:ACTIVE BeginHashKey:7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF
                    EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF

通过SDK进行Merge操作:

String shardId = "0";
String adjacentShardId = "1";
MergeShardRequest req = new MergeShardRequest(projectName, topicName, shardId, adjacentShardId);
MergeShardResult resp = client.mergeShard(req);

最终将会变成如下3个Shard:

ShardId:0 Status:CLOSED BeginHashKey:00000000000000000000000000000000
                    EndHashKey:7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF
ShardId:1 Status:CLOSED BeginHashKey:7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF
                    EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
ShardId:2 Status:ACTIVE BeginHashKey:00000000000000000000000000000000
                    EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF

注意事项

当Shard进行Merge/Split后会被置为CLOSED状态,该状态可以继续消费读取数据,但是不可写入,也不可再次进行Merge/Split操作,当到达Topic的lifecycle后该Shard会被回收。如果配置了Connector,对应任务会在复制完该Shard数据后自动挂起,待该Shard回收后会自动删除任务。Topic在进行Merge/Split后新的Shard需要等待变为ACTIVE状态后方可正常使用,通常不会超过5秒。