本文介绍了DataHub新版本带来的改动,介绍batch的原理和实现,以及使用batch后所带来的性能的提升和费用的减少。切换为batch后,对于DataHub而言,服务端的资源消耗会明显降低,同时,性能会明显提升,使用费用也会大幅降低。
升级内容
支持zstd压缩
DataHub在新版本中对zstd压缩算法做了支持,相较于DataHub支持的lz4和deflate压缩算法,效果卓越。
zstd是一种高性能压缩算法,由Facebook开发,于2016年开源,zstd在压缩速度和压缩比两方面都有不俗的表现,非常契合DataHub的使用场景。
序列化改造
DataHub引入了batch序列化,batch序列化本质上就是DataHub对数据传输中数据的定义的一种组织方式,batch并不是特指某种序列化的方式,而是对序列化的数据做了二次封装。例如:一次发送100条数据,将100条数据序列化后得到一个buffer,给这个buffer选择一个压缩算法得到压缩后的buffer,这个时候给这个压缩后的buffer添加一个header记录这个buffer大小、数据条数、压缩算法、crc等信息,从而获得一条完整batch buffer。
解决的问题:
可以有效避免业务层面的脏数据。
减少服务端CPU开销,提高数据处理性能。
较少同时读写延迟。
batch的buffer发送到服务端后,因客户端已经做了充分的数据有效性的校验,所以服务端只需检验数据中的crc确认为有效buffer后,便可以直接落盘,省去了序列化、反序列化、加解压以及校验的操作,服务端性能提升超过80%,因为是多条数据一起压缩的,所以压缩率也提高了,存储成本也降低了。
费用对比
验证batch所带来的实际效果,进行以下测试,假设场景如下:
测试数据为广告投放相关的数据,大约200列,数据中null比例大约20%~30%。
1000条数据一个batch。
batch内部的序列化使用的是avro。
lz4是之前版本默认的压缩算法,压缩使用zstd来替代lz4。
测试结果如下:
数据源大小(Byte) | lz4压缩(Byte) | zstd压缩(Byte) | |
protobuf序列化 | 11,506,677 | 3,050,640 | 1,158,868 |
batch序列化 | 11,154,596 | 2,931,729 | 1,112,693 |
Datahub的收费项主要是这两个维度为存储与流量,其他主要是为了防止滥用而设置的惩罚性质的收费项。因此,以上测试结果从存储与流量两个角度进行分析。
从存储成本上来看,DataHub的protobuf序列化是没有存储压缩的(只是HTTP传输环节压缩),如果替换为batch+zstd,那么存储会由11506KB降为1112KB,也就是说,存储成本下降幅度达到约90%。
从流量成本上来看,DataHub的protobuf+lz4后的大小为3050KB,batch+zstd的大小为1112KB,也就是说,流量成本会降低约60%。
以上为样本数据测试结果,不同数据测试效果有差异,请您根据您业务实际情况进行测试。
使用batch
注意事项
batch写入最大的优势需要充分攒批,如果客户端无法攒批,或者攒批的数据较少,可能带来的效果提升并不显著。
为了让用户迁移更加方便,DataHub在各种读写方式之间做了兼容,保证用户中间状态可以更平滑地过渡,即batch写入依旧可以使用原方式读取,原方式写入依旧可以使用batch读取。因此,在写入端更新为batch写入之后,最好消费端也更新为batch,写入和消费不对应反而会降低性能。
前提条件
需在Topic中已开启多Version后才可使用。
DataHub版本须在client-library 1.4及以上的版本。
目前仅支持Java SDK的batch写入。
开启Version
控制台开启多Version
在控制台上无法修改已创建的Topic信息,所以若需使用batch,则需新建Topic启动多Version。新建Topic详情可参见:Topic操作。
SDK开启多Version
public static void createTopicWithOption() {
try {RecordSchema recordSchema = new RecordSchema() {{
this.addField(new Field("field1", FieldType.STRING));
this.addField(new Field("field2", FieldType.BIGINT));
}};
TopicOption option = new TopicOption();
//开启多version
option.setEnableSchemaRegistry(true);
option.setComment(Constant.TOPIC_COMMENT);
option.setExpandMode(ExpandMode.ONLY_EXTEND);
option.setLifeCycle(Constant.LIFE_CYCLE);
option.setRecordType(RecordType.TUPLE);
option.setRecordSchema(recordSchema);
option.setShardCount(Constant.SHARD_COUNT);
datahubClient.createTopic(Constant.PROJECT_NAME, Constant.TOPIC_NAME, option);
LOGGER.info("create topic successful");
} catch (ResourceAlreadyExistException e) {
LOGGER.info("topic already exists, please check if it is consistent");
} catch (ResourceNotFoundException e) {
// project not found
e.printStackTrace();
throw e;
} catch (DatahubClientException e) {
// other error
e.printStackTrace();
throw e;
}
}
配置batch
如果服务端已经支持batch,则DataHub会默认使用batch。如果服务端没有支持(例如服务端还未升级到新版本或者专有云较老的版本),那么会自动回退到原来的序列化方式,客户端自适应,用户无需增加额外配置。下面以1.4.1版本为例给出一个简单示例。压缩算法也会自适应选择一个较优的方案,目前1.4版本以后会默认选择zstd算法。
Maven依赖
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.25.3</version>
</dependency>
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>datahub-client-library</artifactId>
<version>1.4.3</version>
</dependency>
配置batch
ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);
RecordSchema schema = producer.getTopicSchema();
List<RecordEntry> recordList = new ArrayList<>();
// 为了达到更好的性能,建议在RecordList中添加尽可能多的Record;
// 尽可能使整个RecordList大小为512KB~1M
for (int i = 0; i < 1000; ++i) {
RecordEntry record = new RecordEntry();
TupleRecordData data = new TupleRecordData(schema);
// 假设schema为 {"fields":[{"name":"f1", "type":"STRING"},{"name":"f2", "type":"BIGINT"}]}
data.setField("f1", "value" + i);
data.setField("f2", i);
record.setRecordData(data);
// 添加用户自定义属性,可选
record.addAttribute("key1", "value1");
recordList.add(record);
}
try {
// 循环写入1000遍
for (int i = 0; i < 1000; ++i) {
try {
String shardId = datahubProducer.send(recordList);
LOGGER.info("Write shard {} success, record count:{}", shardId, recordList.size());
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Write data fail", e);
break;
}
// sleep重试
Thread.sleep(1000);
}
}
} finally {
// 关闭producer相关资源
datahubProducer.close();
}
后续支持
如果您在使用中有任何问题或者疑问,欢迎提工单咨询或加入用户群咨询,群号:33517130。