流计算实现细节

了解对接Structured Streaming的微批模式的过程,以及表格存储对接Spark Structured Streaming的详细接入流程。

背景信息

在对接Spark Structured Streaming的微批模式时,以Spark DataSource v1接口为例说明过程。

  1. 调用GetOffset方法获取当前批次可以读取到的最大Offset(EndOffset)。

  2. 调用GetBatch方法获取当前批次的StartOffset(上一个批次的EndOffset)到EndOffset间的数据并进行转换。

  3. 执行自定义的Spark计算逻辑。

在此种微批模式对接中,上游数据库的流式接口需要提供灵活且准确的Seek功能,能够实时获取到每个分区的起始游标或结束游标等,从而进行Spark微批模式中Offset的预估。

在分布式NoSQL数据库中,对于基于数据捕获变更(CDC)技术的流式接口,该接口是一种持续流的模式,通常不具备灵活的Seek接口,对接Structured Streaming的微批接口有较大的难度。如果在GetOffset阶段进行提前的数据拉取,可以获取到预期的EndOffset,但是需要提前进行一次额外的RDD的计算并将其进行持久化缓存,大大降低了Source的性能。

接入流程

下图为表格存储对接Structured Streaming方案的UML时序图。

fig_newuml

图中MicroBatchExecutorSpark的微批处理框架,SourceStructured Streaming抽象的接口类,SourceRDDSparkRDD抽象类,TablestoreClient为表格存储的客户端。实线表示详细操作,虚线表示请求成功返回。

从整体上看,时序流程是GetOffset、 CommitGetBatch三个步骤的循环执行,分别对应于Spark Streaming的一个Batch执行。

详细接入流程步骤说明如下:

  1. 调用GetOffset方法获取当前批次可以读取到的最大Offset(EndOffset)。

    1. MicroBatchExecutor调用SourceGetOffset方法,获取当前批次的最大可达的Offset,即EndOffset。

    2. Source中生成一个随机的UUID字符串,UUID会与Offset一一对应。

    3. (可选)获取表格存储的通道信息。

      说明

      只有首次调用SourceGetOffset方法时,才执行此步骤;否则系统会跳过此步骤。

      1. 从表格存储的Tunnel服务端获取当前所有Channel(分区)的消费位点(Checkpoint)。

      2. 将获取到所有Channel的消费位点持久化到Meta表中,建立UUIDcheckpoints的映射关系。

    4. UUID包装成Offset(OffsetUUID可以互相转换),并返回给MicroBatchExecutor作为当前批次的EndOffset。

  2. (可选)将checkpoints持久化到Tunnel服务端。

    说明

    只有当前批次的BatchId大于0时,才执行此步骤;否则系统会跳过此步骤

    1. MicroBatchExecutor调用SourceCommit逻辑。

    2. 将上一个批次的EndOffset(当前批次的StartOffset)对应的checkpoints持久化到Tunnel服务端。

    说明

    正常情况下Commit逻辑无需处理额外内容,此操作的目的如下:

    • 持久化到Tunnel服务端后,可以实时显示消费进度。

    • Tunnel为了实现数据保序有父子分区关系,需要由客户端将父分区已消费结束checkpoint返回服务端,子分区才能加载。

  3. 调用GetBatch方法获取当前批次的StartOffset(上一个批次的EndOffset)到EndOffset间的数据并进行转换。

    1. MicroBatchExecutor根据当前批次的StartOffsetGetOffset返回的EndOffset调用Source中的GetBatch操作,以期获得当前批次的数据,供计算逻辑使用。

    2. 根据StartOffset对应的UUIDMeta表中获取Tunnel对应的Channel列表的实时消费位点。

    3. Tunnel服务端定时获取Channel的消费位点。

      由于表的分区变化可能会存在新的分区(例如子分区),所以需要定时获取Channel的消费位点。

    4. 合并从Meta表中获取的实时消费位点和从Tunnel服务端定时获取消费位点checkpoints,得到当前所有Channel的消费位点。

    5. 根据最新的checkpointsEndOffset对应的UUID等信息来构建SourceRDD。

      RDDSpark中最基本的数据抽象,它代表一个不可变、可分区、元素可并行计算的集合。

    6. SourceRDD中将通道的channelRDDpartition进行绑定,因此每个Channel都会在Spark的执行节点上分布式的进行数据并行转换和处理。

  4. 执行自定义的Spark计算逻辑。

    1. 执行每个RDD partition上的计算逻辑,此处会进行channel上的数据读取,并更新内存中的每个channel的消费位点。

    2. 每个RDD partition在本批次执行完成后(例如读到指定条数或无新数据),将channel对应的最新消费位点持久化到Meta表对应UUID的行中,即填充EndOffset对应UUIDcheckpoints,每个批次结束后,新的批次的StartOffset对应的checkpoints都能够在Meta表中查询到。

    3. 待所有RDD partition的计算逻辑完成后,返回当前批次对应Offset范围内的数据给MicroBatchExecutor。

  5. 当前批次的所有逻辑计算完成后,递增BatchId,从步骤1重新开始,循环执行。