本文首先介绍了冷数据的特点和适应场景,接着对海量结构化数据的冷热分层进行了方案和架构的设计,最后通过Tablestore结合Delta Lake对冷热分层进行了生动的实战示例演示。通过冷热分层可以让计算和存储的资源得到充分利用,进而让业务能够用更低的成本承载更优质的服务。

背景信息

在海量大数据场景下,随着业务和数据量的不断增长,性能和成本的权衡变成了大数据系统设计面临的关键挑战。所以在架构设计之初,我们就需要把整套架构的成本考虑进来,这对应的就是数据的分层存储和存储计算引擎的选择。

Delta Lake是DataBricks公司推出的一种新型数据湖方案,围绕数据流入、数据组织管理、数据查询和数据流出,推出了一系列功能特性,同时提供了数据操作的ACID和CRUD。通过结合Delta Lake和上下游组件,可以搭建出一个便捷、易用、安全的数据湖架构。在数据湖架构设计中,通常我们会应用HTAP(Hybrid Transaction and Analytical Process)体系结构,通过合理的选择分层存储组件和计算引擎,既能支持海量数据分析和快速的事务更新写入,又能有效的进行冷热数据的分离进而降低成本。

更多介绍可参见结构化大数据分析平台设计面向海量数据的极致成本优化-云HBase的一体化冷热分离云上如何做冷热数据分离

冷数据介绍

数据按照实际访问的频率可以区分为热数据、温数据和冷数据,其中冷数据的数据量较大,很少被访问,甚至整个生命周期都可能不会被访问,只是为了满足业务合规或者特定场景需求在一定时间内保存。通常我们有两个方式来区分冷热数据:

  • 按照数据的创建时间:常见于交易类数据、时序监控、IM聊天等场景,通常数据写入初期用户的关注度较高且访问频繁,但随着时间的推移,旧的数据访问频率会越来越低,仅存在少量查询甚至完全不查询。
  • 按照访问热度:有些数据的访问频率并非按时间,例如某些大V的旧博客由于某些原因突然大量被访问到,这样的冷数据也会变成热数据。这个时候就不应该再按时间区分,需要根据具体的业务和数据分布规律来区分冷热。

本文主要讨论按数据的创建时间的冷热数据分层,而对于按访问热度的数据分层,通常可以采用业务打标或系统自识别等手段。

冷数据特点

从冷热数据的区分,可以看出,冷数据具备以下一些特点:
  • 数据量大:不同于热数据,冷数据通常需要保存较长时间甚至是所有时间的数据。
  • 成本敏感:数据量大且访问频率较低,不宜投入过多的成本。
  • 性能要求不高:这里是相对的概念,相比于一般的TP请求不需要查询在毫秒级别返回,冷数据的查询可以容忍到数十秒甚至更长时间才出结果,或者可以进行异步处理。
  • 业务场景较简单:对于冷数据基本都是批量的写入和删除,一般没有更新操作。在查询时,一般只需要读取指定条件的数据,且查询条件不会过于复杂。

冷热分层适用场景

针对冷数据的特点,挖掘出一些冷热分层适用的场景:

  • 时序类数据场景:时序类数据天然具备时间属性,数据量大,且几乎只做追加操作。时序数据无处不在,常见于监控类数据、交易类数据、物联网数据、环境监测等场景。
    • IM场景:如钉钉,用户一般会查阅最近的若干条聊天记录,历史的数据一般只有在有特殊需求的时候才会去查询。
    • 监控场景:如云监控,用户通常只会查看近期的监控,历史数据一般只有在调查问题或者制定报表时才会去查询。
    • 账单场景:如支付宝,我们通常只会查询最近几天或者一个月内的账单,超过一年以上的账单基本不会去查询。
    • 物联网场景:如IOT,通常设备近期上报的数据是热点数据,会经常被分析,而历史数据的分析频率都较低。
  • 归档类场景:对于重写轻读的数据,可以将数据定期归档到成本更低的存储组件或更高压缩比的存储介质中,达到降低成本的目的。

海量结构化数据Delta Lake架构

针对结构化冷热分层数据场景,表格存储(Tablestore)联合EMR团队推出了一个海量结构化数据的Delta Lake架构。针对冷热数据方案设计的几个问题,都进行了很好的解决。基于表格存储的通道服务,可以将原始数据利用CDC技术派生到多种存储组件中,例如,将原始数据派生到Delta Lake和Tablestore引擎自身的列存中,进而完成冷热数据的分离和异构。同时表格存储提供灵活的上游数据入口和TTL功能,您可以定制热数据的生命周期,将冷数据不断的实时投递到Delta Lake和列存中,达到降成本的目的。最后对于计算和查询层,Tablestore结合Spark可以完成对冷热数据的全增量一体的定制化计算,并可以最终将计算结果存入Tablestore的索引引擎中进行统一的查询。

Delta Lake

冷热分层示例

本示例结合Tablestore和Delta Lake进行数据湖冷热分层。

  1. 数据源说明。
    数据源是一张简单的原始订单表OrderSource,表有两个主键UserId(用户ID)和OrderId(订单ID),两个属性列price(价格)和timestamp(订单时间)。使用Tablestore SDK的BatchWrite接口进行订单数据的写入,订单的时间戳的时间范围为最近的90天(本文的模拟时间范围为2020-02-26~2020-05-26),共计写入3112400条。OrderSource

    在模拟订单写入时,对应Tablestore表中的属性列的版本号也会被设置为相应的时间戳,这样通过配置表上的TTL属性,当写入数据的保留时长超过设置的TTL后,系统会自动清理对应版本号的数据。

  2. 实时流式投递。
    1. 创建数据源表,同时在表格存储控制台上创建增量通道,利用通道提供的CDC(日志变更捕获)技术将新增的主表数据不断同步到Delta中,创建得到的通道ID将会用于后续的SQL配置。
      通道列表
    2. 在EMR集群的Header机器上启动streaming-sql交互式命令行。
      streaming-sql --master yarn --use-emr-datasource --num-executors 16 --executor-memory 4g --executor-cores 4
      // 源表和目的表
      // 1. 创建源表
      DROP TABLE IF EXISTS order_source;
      CREATE TABLE order_source
      USING tablestore
      OPTIONS(
      endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
      access.key.id="",
      access.key.secret="",
      instance.name="vehicle-test",
      table.name="OrderSource",
      catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"},"price": {"col": "price", "type": "double"}, "timestamp": {"col": "timestamp", "type": "long"}}}',
      );
      
      
      // 2. 创建Delta Lake Sink: delta_orders
      DROP TABLE IF EXISTS delta_orders;
      CREATE TABLE delta_orders(
      UserId string,
      OrderId string,
      price double,
      timestamp long
      )
      USING delta
      LOCATION '/delta/orders';
      
      // 3. 在源表上创建增量SCAN视图
      CREATE SCAN incremental_orders ON order_source USING STREAM 
      OPTIONS(
      tunnel.id="324c6bee-b10d-4265-9858-b829a1b71b4b", 
      maxoffsetsperchannel="10000");
      
      // 4. 启动Stream作业,将Tablestore CDC数据实时同步到Delta Lake中。
      CREATE STREAM orders_job
      OPTIONS (
      checkpointLocation='/delta/orders_checkpoint',
      triggerIntervalMs='3000'
      )
      MERGE INTO delta_orders
      USING incremental_orders AS delta_source
      ON delta_orders.UserId=delta_source.UserId AND delta_orders.OrderId=delta_source.OrderId
      WHEN MATCHED AND delta_source.__ots_record_type__='DELETE' THEN
      DELETE
      WHEN MATCHED AND delta_source.__ots_record_type__='UPDATE' THEN
      UPDATE SET UserId=delta_source.UserId, OrderId=delta_source.OrderId, price=delta_source.price, timestamp=delta_source.timestamp
      WHEN NOT MATCHED AND delta_source.__ots_record_type__='PUT' THEN
      INSERT (UserId, OrderId, price, timestamp) values (delta_source.UserId, delta_source.OrderId, delta_source.price, delta_source.timestamp);
      • Tablestore源表创建:创建order_source源表,其中OPTIONS参数中catalog为表字段的Schema定义(本例中对应UserId、OrderId、price和timestamp四列)。
      • Delta Lake Sink表创建:创建写完Delta的delta_orders目的表,LOCATION中指定的是Delta文件存储的位置。
      • Tablestore源表上创建增量SCAN视图:本例创建incremental_orders的流式视图,其中OPTIONS参数中tunnel.id为第1步创建的增量通道ID,maxoffsetsperchannel为通道每个分区(每个Spark微批)写的最大数据量。
      • 启动Stream作业进行实时投递:本例中会根据Tablestore的主键列(UserId和OrderId)主键进行聚合,同时根据CDC日志的操作类型(PUT,UPDATE,DELETE)转化为对应的Delta操作。特别说明的是 __ots_record_type__ 是Tablestore流式Source提供的预定义列,表示的是行操作类型。
  3. 查询冷热数据。
    在实际的设计中,我们一般会把热数据保存在Tablestore表中进行高效的TP查询,冷数据或全量数据保存在Delta中。通过配置Tablestore表的生命周期(TTL),我们可以灵活的对热数据量进行控制。
    1. 在配置主表的TTL之前,对源表(order_source)和目的表(delta_orders)进行一些查询,此时两边的查询结果保持一致。
      Stream
    2. 配置Tablestore的TTL为最近30天。
      这样Tablestore中的热数据只有最近30天的数据,而Delta中依旧保留的是全量数据,进而达到冷热分层的目的。TTL
    3. 展示一些分层之后的简单查询,具体的查询路由需要结合业务逻辑进行一些路由选择。分层之后热数据总条数为1017004条,冷数据(全量数据)保持不变依旧为3112400条。
      Order