通道服务是基于表格存储数据接口之上的全增量一体化服务,您可以简单地实现对表中历史存量和新增数据的消费处理。

Tunnel Client 为通道服务的自动化数据消费框架,Tunnel Client 通过每一轮的定时心跳探测(Heartbeat)来进行活跃 Channel 的探测,Channel 和 ChannelConnect 状态的更新,数据处理任务的初始化、运行和结束等。

Tunnel Client 可以解决全量和增量数据处理时的常见问题,例如,如何做负载均衡、故障恢复、Checkpoint、分区信息同步确保分区信息消费顺序等。使用 Tunnel Client 后,您只需要关心每条记录的处理逻辑即可。

下文将介绍 Tunnel Client 的自动化数据处理流程、自动化的负载均衡以及自动化的容错处理。更多的细节请参见 Tunnel Client 的源码,Tunnel Client 的源码已经开放在Github上。

自动化数据处理流程

Tunnel Client 通过每一轮的定时心跳探测(Heartbeat)来进行活跃 Channel 的探测,Channel 和 ChannelConnect 状态的更新,数据处理任务的初始化、运行和结束等,这里仅介绍大致的数据处理逻辑,更多的细节可以参阅源码。

  1. Tunnel Client 资源的初始化
    1. 将 Tunnel Client 状态由 Ready 置为 Started。
    2. 根据 TunnelWorkerConfig 里的 HeartbeatTimeout 和 ClientTag (客户端标识)等配置进行 ConnectTunnel 操作,并和 Tunnel 服务端进行联通,以获取当前 Tunnel Client 对应的 ClientId。
    3. 初始化 ChannelDialer (用于新建ChannelConnect), 每一个ChannelConnect都会和一个Channel一一对应,ChannelConnect上会记录有数据消费的位点。
    4. 根据用户传入的处理数据的 Callback 和 TunnelWorkerConfig 中CheckpointInterval (向服务端记数据位点的间隔)包装出一个带自动记Checkpoint功能的数据处理器。)
    5. 初始化TunnelStateMachine(会进行Channel状态机的自动化处理)。
  2. 固定间隔进行 Heartbeat

    心跳的间隔由 TunnelWorkerConfig 里的 heartbeatIntervalInSec 参数决定。

    1. 进行 heartbeat 请求,从 Tunnel 服务端获取最新可用的 Channel 列表,Channel 中会包含有 ChannelId,Channel 的版本和 Channel 的状态信息。
    2. 将服务端获取到的 Channel 列表和本地内存中的 Channel 列表进行 Merge,然后进行 ChannelConnect 的新建和 update,规则大致如下:
      • Merge:基于本轮从服务端获取的最新Channel列表,对于相同ChannelId,认定版本号更大的为最新状态,直接进行覆盖,若未出现的Channel,则直接插入。
      • 新建 ChannelConnect:若此 Channel 未新建有其对应的 ChannelConnect,则会新建一个 WAIT 状态的 ChannelConnect,若对应的 Channel 状态为 OPEN 状态,则同时会启动该 ChannelConnect 上处理数据的循环流水线任务(ReadRecords&&ProcessRecords),处理详细的细节可以参见源码里的ProcessDataPipeline 类。
      • Update 已有 ChannelConnect:Merge 完成后,若 Channel 对应的ChannelConnect 存在,则根据相同 ChannelId 的 Channel 状态来更新ChannelConnect 的状态,比如 Channel 为 Close 状态也需要将 ChannelConnect 的状态置为 Closed,进而终止处理任务的流水线任务,详细的细节可以参见源码中的ChannelConnect.notifyStatus 方法。
  3. Channel 状态自动机说明

    在心跳模式下,Tunnel 服务端会根据保持心跳的 Tunnel Client 数量,调度可以消费的分区到不同 client 上,以达到负载均衡的目的。Tunnel 服务端通过以下 channel 状态机来驱动每个 channel 的消费以及进行负载均衡。



    Tunnel 服务端和 client 通过一个心跳和 channel 版本号更新机制进行状态变换通信。
    1. 每个 channel 最初均处于 Wait 状态。
    2. 增量类型 channel 需要等待父分区上 channel 消费完毕转为 Terminated 之后才可以转为可消费状态 Open。
    3. Open 状态的分区会调度到各个 client上。
    4. 在需要负载均衡时,Tunnel 服务端和 client 有一个 channel 状态 Open->Closing->Closed 的调度协议,client 在消费完一个全量 channel split 或者发生了分裂的增量 channel 后,会将 channel 汇报为 Terminated。

自动化的负载均衡和良好的水平扩展性

  • 运行多个 Tunnel Client 对同一个 Tunnel 进行消费时(TunnelId 相同),在 Tunnel Client 执行 Heartbeat 时,Tunnel 服务端会自动对 Channel 资源进行重分配,让活跃的Channel 尽可能的均摊到每一个 Tunnel Client 上,达到对资源进行负载均衡的目的。
  • 在水平扩展性方面,用户可以很容易的通过增加 Tunnel Client 的数量来完成,Tunnel Client 可以在同一个机器或者不同机器上。

自动化的资源清理和容错处理

  • 资源清理: 当客户端(Tunnel Client)没有被正常 shutdown 时(比如异常退出或者手动结束),我们会自动帮用户进行资源的回收,包括释放线程池、自动调用用户在Channel 上注册的 shutdown 方法、关闭 Tunnel 连接等。
  • 容错处理: 当客户端出现 Heartbeat 超时等非参数类错误时,表格存储会自动帮用户 Renew Connect,以保证数据消费可以稳定的进行持续同步。