DataHub成本节省攻略

本文介绍了DataHub新版本带来的改动,介绍batch的原理和实现,以及使用batch后所带来的性能的提升和费用的减少。切换为batch后,对于DataHub而言,服务端的资源消耗会明显降低,同时,性能会明显提升,使用费用也会大幅降低。

升级内容

支持zstd压缩

DataHub在新版本中对zstd压缩算法做了支持,相较于DataHub支持的lz4deflate压缩算法,效果卓越。

说明

zstd是一种高性能压缩算法,由Facebook开发,于2016年开源,zstd在压缩速度和压缩比两方面都有不俗的表现,非常契合DataHub的使用场景。

序列化改造

DataHub引入了batch序列化,batch序列化本质上就是DataHub对数据传输中数据的定义的一种组织方式,batch并不是特指某种序列化的方式,而是对序列化的数据做了二次封装。例如:一次发送100条数据,将100条数据序列化后得到一个buffer,给这个buffer选择一个压缩算法得到压缩后的buffer,这个时候给这个压缩后的buffer添加一个header记录这个buffer大小、数据条数、压缩算法、crc等信息,从而获得一条完整batch buffer。

解决的问题:

  • 可以有效避免业务层面的脏数据。

  • 减少服务端CPU开销,提高数据处理性能。

  • 较少同时读写延迟。

image
说明

batchbuffer发送到服务端后,因客户端已经做了充分的数据有效性的校验,所以服务端只需检验数据中的crc确认为有效buffer后,便可以直接落盘,省去了序列化、反序列化、加解压以及校验的操作,服务端性能提升超过80%,因为是多条数据一起压缩的,所以压缩率也提高了,存储成本也降低了。

费用对比

验证batch所带来的实际效果,进行以下测试,假设场景如下:

  • 测试数据为广告投放相关的数据,大约200列,数据中null比例大约20%~30%。

  • 1000条数据一个batch。

  • batch内部的序列化使用的是avro。

  • lz4是之前版本默认的压缩算法,压缩使用zstd来替代lz4。

测试结果如下:

数据源大小(Byte)

lz4压缩(Byte)

zstd压缩(Byte)

protobuf序列化

11,506,677

3,050,640

1,158,868

batch序列化

11,154,596

2,931,729

1,112,693

Datahub的收费项主要是这两个维度为存储与流量,其他主要是为了防止滥用而设置的惩罚性质的收费项。因此,以上测试结果从存储流量两个角度进行分析。

  • 从存储成本上来看,DataHubprotobuf序列化是没有存储压缩的(只是HTTP传输环节压缩),如果替换为batch+zstd,那么存储会由11506KB降为1112KB,也就是说,存储成本下降幅度达到约90%。

  • 从流量成本上来看,DataHubprotobuf+lz4后的大小为3050KB,batch+zstd的大小为1112KB,也就是说,流量成本会降低约60%。

重要

以上为样本数据测试结果,不同数据测试效果有差异,请您根据您业务实际情况进行测试。

使用batch

注意事项

  • batch写入最大的优势需要充分攒批,如果客户端无法攒批,或者攒批的数据较少,可能带来的效果提升并不显著。

  • 为了让用户迁移更加方便,DataHub在各种读写方式之间做了兼容,保证用户中间状态可以更平滑地过渡,即batch写入依旧可以使用原方式读取,原方式写入依旧可以使用batch读取。因此,在写入端更新为batch写入之后,最好消费端也更新为batch,写入和消费不对应反而会降低性能

前提条件

  • 需在Topic中已开启Version后才可使用。

  • DataHub版本须在client-library 1.4及以上的版本

  • 目前仅支持Java SDKbatch写入。

开启Version

控制台开启多Version

在控制台上无法修改已创建的Topic信息,所以若需使用batch,则需新建Topic启动多Version。新建Topic详情可参见:Topic操作

image

SDK开启多Version

  public static void createTopicWithOption() {
    try {RecordSchema recordSchema = new RecordSchema() {{
        this.addField(new Field("field1", FieldType.STRING));
        this.addField(new Field("field2", FieldType.BIGINT));
      }};
      TopicOption option = new TopicOption();
       //开启多version  
      option.setEnableSchemaRegistry(true);
     
      option.setComment(Constant.TOPIC_COMMENT);
      option.setExpandMode(ExpandMode.ONLY_EXTEND);
      option.setLifeCycle(Constant.LIFE_CYCLE);
      option.setRecordType(RecordType.TUPLE);
      option.setRecordSchema(recordSchema);
      option.setShardCount(Constant.SHARD_COUNT);
      datahubClient.createTopic(Constant.PROJECT_NAME, Constant.TOPIC_NAME, option);
      LOGGER.info("create topic successful");
    } catch (ResourceAlreadyExistException e) {
      LOGGER.info("topic already exists, please check if it is consistent");
    } catch (ResourceNotFoundException e) {
      // project not found
      e.printStackTrace();
      throw e;
    } catch (DatahubClientException e) {
      // other error
      e.printStackTrace();
      throw e;
    }
  }

配置batch

如果服务端已经支持batch,则DataHub会默认使用batch。如果服务端没有支持(例如服务端还未升级到新版本或者专有云较老的版本),那么会自动回退到原来的序列化方式,客户端自适应,用户无需增加额外配置。下面以1.4.1版本为例给出一个简单示例。压缩算法也会自适应选择一个较优的方案,目前1.4版本以后会默认选择zstd算法。

Maven依赖

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.25.3</version>
</dependency>
<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>datahub-client-library</artifactId>
  <version>1.4.3</version>
</dependency>

配置batch

ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);

RecordSchema schema = producer.getTopicSchema();
List<RecordEntry> recordList = new ArrayList<>();
// 为了达到更好的性能,建议在RecordList中添加尽可能多的Record; 
// 尽可能使整个RecordList大小为512KB~1M
for (int i = 0; i < 1000; ++i) {
    RecordEntry record = new RecordEntry();
    TupleRecordData data = new TupleRecordData(schema);
    // 假设schema为 {"fields":[{"name":"f1", "type":"STRING"},{"name":"f2", "type":"BIGINT"}]}
    data.setField("f1", "value" + i);
    data.setField("f2", i);
    record.setRecordData(data);

    // 添加用户自定义属性,可选
    record.addAttribute("key1", "value1");
    recordList.add(record);
}

try {
      // 循环写入1000遍
      for (int i = 0; i < 1000; ++i) {
        try {
          String shardId = datahubProducer.send(recordList);
          LOGGER.info("Write shard {} success, record count:{}", shardId, recordList.size());
        } catch (DatahubClientException e) {
          if (!ExceptionChecker.isRetryableException(e)) {
            LOGGER.info("Write data fail", e);
            break;
          }
          // sleep重试
          Thread.sleep(1000);
        }
      }
    } finally {
      // 关闭producer相关资源
      datahubProducer.close();
    }

后续支持

如果您在使用中有任何问题或者疑问,欢迎提工单咨询或加入用户群咨询,群号:33517130。