DataHub成本节省攻略

在DataHub发布的最新版本中,DataHub序列化相关的模块进行了一次重大升级,在性能、成本、资源使用方面都有较大的优化,同时DataHub技术升级所带来的成本红利会辐射到每个用户身上,根据我们实际的调研发现,大部分用户的使用成本都可以达到30%以上的降幅,部分用户使用成本降幅可以达到90%,本文将会详细介绍这次改动的具体内容,以及相关的最佳实践。

主要升级内容

1、支持zstd压缩

zstd是一种高性能压缩算法,由Facebook开发,并于2016年开源。zstd在压缩速度和压缩比两方面都有不俗的表现,非常契合datahub的使用场景,因此datahub在新版本中对zstd压缩算法做了支持,相较于DataHub目前支持的lz4和deflate压缩算法,整体效果会好很多。

2、序列化改造

DataHub因为在设计上是存在TUPLE这种强schema结构的,我们最初为了防止脏数据,在服务端校验了数据的有效性,这就导致了需要在服务端解析出来完整的数据,然后根据schema做个校验,如果类型不匹配,那么会返回错误。随着版本的迭代发展,我们我发现这样的做法并没有起到很有效的避免脏数据的效果(因为脏数据更多是业务层面的脏数据而不是数据类型),反倒是给服务端增加了巨大的cpu开销,同时读写延迟也更大。

历史经验告诉我们,读写操作在服务端感知用户的数据内容是一个相对冗余的操作,所以我们让数据使用一个大的buffer来交互,不再去感知真正的数据内容,真正需要用到数据内容的地方再解析出来(例如同步任务),写入时的数据校验全部推到客户端上来做,实际上客户端本来就有校验的,所以对客户端而言其实并没有增加开销,然后服务端通过crc校验来保证数据buffer的正确性,这样既节省了服务端的资源消耗,也提供了更好的性能。

这个就是我们引入的batch序列化,batch序列化本质上就是DataHub数据传输中数据的定义的一种组织方式,batch并不是特指某种序列化的方式,而是对序列化的数据做了一个二次封装,比如我一次发送100条数据,那我把这100条数据序列化后得到一个buffer,给这个buffer选择一个压缩算法得到压缩后的buffer,这个时候给这个压缩后的buffer添加一个header记录这个buffer的大小、数据条数、压缩算法、crc等信息,这个时候就是一条完整的batch的buffer。

yuque_diagram (6).jpg

这个batch的buffer发送到服务端后,因为客户端已经做了充分的数据有效性的校验,所以服务端只检验一下数据中的crc确认为有效buffer后,便可以直接落盘,省去了序列化反序列化、加解压以及校验的操作,服务端性能提升超过80%,因为是多条数据一起压缩的,所以压缩率也提高了,存储成本也降低了。

主要费用对比

为了验证batch所带来的实际效果,我们拿一些数据进行了对比测试

  • 测试数据为广告投放相关的数据,大约200列,数据中null比例大约20%~30%

  • 1000条数据一个batch

  • batch内部的序列化使用的是avro

  • lz4是之前版本默认的压缩算法,压缩使用zstd来替代lz4

测试结果如下表所示

数据原大小(Byte)

lz4压缩(Byte)

zstd压缩(Byte)

protobuf序列化

11,506,677

3,050,640

1,158,868

batch序列化

11,154,596

2,931,729

1,112,693

就以上面的测试结果为准,我们可以评估一下从费用角度来评估一下使用batch后可以节省多少,根据线上的运行情况来看,之前一般都是lz4的压缩居多,所以我们就假设由protobuf+lz4替换到了batch+zstd。费用就从存储流量两个角度来考虑,因为Datahub的收费项主要是这两个维度,其他主要是为了防止滥用而设置的惩罚性质的收费项,正常使用情况下可以直接忽略不计。

  • 从存储成本上来看,Datahub的protobuf序列化是没有存储压缩的(只是http传输环节压缩),如果替换为batch+zstd,那么存储会由11506KB,降为1112KB,也就是说,存储成本下降幅度达到约90%

  • 从流量成本上来看,Datahub的protobuf+lz4后的大小为3050KB,batch+zstd的大小为1112KB,也就是说,流量成本会降低约60%!

以上为样本数据测试结果,不同数据测试效果有差异,请您根据您业务实际情况进行测试。

如何使用

说明

只有开启了多version的才能使用batch

多version示例:

  • 控制台开启示例:

    • 新建Topic时点击启动多Version 按钮开启

      image

  • SDK开启示例:

  public static void createTopicWithOption() {
    try {RecordSchema recordSchema = new RecordSchema() {{
        this.addField(new Field("field1", FieldType.STRING));
        this.addField(new Field("field2", FieldType.BIGINT));
      }};
      TopicOption option = new TopicOption();
       //开启多version  
      option.setEnableSchemaRegistry(true);
     
      option.setComment(Constant.TOPIC_COMMENT);
      option.setExpandMode(ExpandMode.ONLY_EXTEND);
      option.setLifeCycle(Constant.LIFE_CYCLE);
      option.setRecordType(RecordType.TUPLE);
      option.setRecordSchema(recordSchema);
      option.setShardCount(Constant.SHARD_COUNT);
      datahubClient.createTopic(Constant.PROJECT_NAME, Constant.TOPIC_NAME, option);
      LOGGER.info("create topic successful");
    } catch (ResourceAlreadyExistException e) {
      LOGGER.info("topic already exists, please check if it is consistent");
    } catch (ResourceNotFoundException e) {
      // project not found
      e.printStackTrace();
      throw e;
    } catch (DatahubClientException e) {
      // other error
      e.printStackTrace();
      throw e;
    }
  }

本文使用Java为例,介绍batch如何使用。

使用batch要求使用client-library 1.4及以上的版本,如果服务端已经支持,DataHub会默认使用batch,服务端没有支持(例如服务端还未升级到新版本或者专有云较老的版本),那么会自动回退到原来的序列化方式,客户端自适应,用户无需增加额外配置。下面已1.4.1版本为例给出一个简单示例。压缩算法也会自适应选择一个较优的方案,目前1.4版本以后会默认选择zstd算法。

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.25.3</version>
</dependency>
<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>datahub-client-library</artifactId>
  <version>1.4.3</version>
</dependency>

ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);

RecordSchema schema = producer.getTopicSchema();
List<RecordEntry> recordList = new ArrayList<>();
// 为了达到更好的性能,建议在RecordList中添加尽可能多的Record; 
// 尽可能使整个RecordList大小为512KB~1M
for (int i = 0; i < 1000; ++i) {
    RecordEntry record = new RecordEntry();
    TupleRecordData data = new TupleRecordData(schema);
    // 假设schema为 {"fields":[{"name":"f1", "type":"STRING"},{"name":"f2", "type":"BIGINT"}]}
    data.setField("f1", "value" + i);
    data.setField("f2", i);
    record.setRecordData(data);

    // 添加用户自定义属性,可选
    record.addAttribute("key1", "value1");
    recordList.add(record);
}

try {
      // 循环写入1000遍
      for (int i = 0; i < 1000; ++i) {
        try {
          String shardId = datahubProducer.send(recordList);
          LOGGER.info("Write shard {} success, record count:{}", shardId, recordList.size());
        } catch (DatahubClientException e) {
          if (!ExceptionChecker.isRetryableException(e)) {
            LOGGER.info("Write data fail", e);
            break;
          }
          // sleep重试
          Thread.sleep(1000);
        }
      }
    } finally {
      // 关闭producer相关资源
      datahubProducer.close();
    }

注意事项

  1. batch写入最大的优势需要充分攒批,如果客户端无法攒批,或者攒批的数据较少,可能带来的效果提升并不显著

  2. 为了让用户迁移更加方便,我们在各种读写方式之间做了兼容,保证用户中间状态可以更平滑的过渡,即batch写入的依旧可以使用原方式读取,原方式写入依旧可以使用batch读取。所以在写入端更新为batch写入之后,最好消费端也更新为batch,写入和消费不对应反倒会降低性能

  3. 目前只有Java的SDK支持了batch写入,其他语言的客户端和相关云产品(如Flink、数据集成等)仍在努力开发中,如果您想要尽快切换到batch写入,可以直接提需求给我们。

总结

本文介绍了DataHub新版本带来的改动,其中介绍了batch的原理和实现,以及使用batch后所带来的性能的提升和费用的减少。切换为batch后,对于DataHub而言,服务端的资源消耗会明显降低,同时,性能会明显提升,使用费用也会大幅降低,所以是一个服务侧和用户侧同时受益的改动。如果您在使用中有任何问题或者疑问,欢迎提工单联系我们或者加入用户群来提问,群号:33517130。