全部产品
表格存储

Stream介绍和API使用

更新时间:2017-07-27 15:57:25   分享:   

Stream

Table Store Stream 是一个数据通道,用于获取 Table Store 表中的增量数据。您可以使用 Table Store Stream API 来获取这些修改内容。增量数据的重要性不言而喻,有了增量数据,可以进行增量数据流的实时增量分析、数据增量同步等。

原理

表格存储作为一款分布式 NoSQL 数据库,当有写操作(包括 put,delete,update)传入时,会把对应的修改记录存放在表格存储的 commit log 中,同时数据库也会定期做 checkpoint,旧的 commit 日志会被清除。

开启 Stream后,日志文件会被保存。在设置的保存期限内,可以通过 Stream 提供的通道读取这些增量数据。

由于表格存储的分区特性,同一分区的操作会共享一个 commit log,所以获取增量数据也是按照分区维度获取。

开启 Stream 时,会生成一个当前日志偏移量(即 iterator)并记录下来。用户可以通过 GetShardIterator 接口获取当前分区的 iterator,在后续读取该分区增量数据时传入这个 iterator,Stream 通道就可以知道从哪一行日志记录开始返回增量内容。返回增量内容的同时,Stream 也会返回一个新的偏移量,用于后续的读取。整个过程可以类比我们分页读取数据,iterator 就是分页的偏移量。

例如,我们有顺序地生成一批数据库日志文件,如下所示:

stream4

当我们在文件 A,第 3 行开始开启 Stream,则这个 iterator 就可以用来标识文件 A 的第 3 行。读取数据时传入这个 iterator,就能读到从上图中第三个操作 pk3 开始的后续修改操作。

Stream API 也提供了接口关闭这个数据通道。当用户再次开启时,Stream 会根据本次开启时间的日志偏移量重新生成一个当前分区的 iterator,我们可以用这个 iterator 读取该分区当前时间点之后的增量数据。

由于写操作会发生在同一个主键上,为了确保数据的一致性,对同一个主键的写操作需要顺序读取。然而,在读取增量数据之前,我们并不知道在哪些主键上发生了修改,所以读取增量数据的接口按照分区 id 来划分。用户可以通过罗列当前表下的所有分区来读取整张表的增量数据。Stream 通道会保证同分区内的写操作是顺序返回的,即,只要在同一分区内按照顺序读取,就可以保证相同主键的数据的一致性。如果持续读取所有分区的 Stream 数据,也就确保了整张表的增量数据都被会被读取到。

用户可以在创建表的时候开启 Stream,或者通过 UpdateTable 来开启或者关闭 Stream。当有一个 putupdate 或者 delete 操作发生,一条修改记录会被写入 Stream,数据包含用户修改行的主键以及修改内容。

注意:

  • 每个修改记录在 Stream 中存在且仅存在一次。
  • 在每一个 shard 内,Stream 按照用户的实际操作顺序进行修改。但是不同 shard 的数据,不保证顺序。

示例

stream1

如上图所示,当前表有三个分区。图中的每一行表示一个分区,每一列表示当前分区的一次更新操作。每个分区维护自己的更新记录。我们可以通过 DescribeStream 接口得到分区的信息,然后顺序读取各自分区的数据。但系统会根据负载进行分区的分裂或者合并。分区的合并、分裂操作会生成新的分区,老的分区也将不会再产生新的增量数据。

stream2

上图中,分区 P2 发生了一次分裂,变成分区 P4 和分区 P5。我们可以并行读取分区 P4 和分区 P5 的数据,并不互相影响。但是在开始读取分区 P4 和分区 P5 之前,需要确保分区 P2 的增量数据已经全部读取完毕。

stream3

例如上图中,当开始读取分区 P4 的记录 R6 时,需要保证分区 P2 的 R5 已经被读取完毕,当 R5 被读完后,分区 P2 不会再生成新的数据。

Stream API

打开/关闭 Stream

用户可以在创建表的时候设置 Stream 是否开启,也可以通过 UpdateTable 来开启或者关闭 Stream。CreateTableUpdateTable 中新增了 StreamSpecification 参数,用来表示 Stream 的相关参数:

  • enable_stream:Stream 是否打开
  • expiration_time:Stream 数据的过期时间,较早的修改记录将会被删除。

读取修改记录

具体读取 Stream 数据的流程如下:

  1. 调用 ListStreams 获取当前表的 Stream 信息,例如 StreamID。具体请参见 ListStream API

  2. 调用 DescribeStream 获取当前 Stream 的数据分片信息,例如 shard 的列表,每个 shard 记录又包含父 shard 信息、shardID 信息等。具体请参见 DescribeStream API

  3. 获取 StreamID 和 shardID 后,通过 GetShardIterator 获取当前 shard 的读 iterator 值,这个值标记着读取该 shard 记录的起始位置。具体请参见 GetShardIterator API

  4. 调用 GetStreamRecord 来读取具体的修改记录,每次调用会返回新的 iterator,用于下次读取。具体请参见 GetStreamRecord API

注意:

  • 因为需要保证同一个主键下的操作要有序,在同一个 shard 下,Stream 做了这个保证。但是 shard 会出现分裂、合并等操作,所以我们在读取某个 shard 的数据时,确保他的父 shard 以及 parent_sibling 的数据已经被读取。
  • 当读到空的 NextShardIterator 的时候说明当前 shard 的增量数据已经全部读完,通常情况是该 shard 已经进入 inactive 的状态(分裂或者合并发生)。当一个 shard 已经被全部读完以后,我们可以重新调用 DescribeStream 获取新的 shard 信息。

SDK

为了方便用户使用 Stream API,TableStore 的 Java SDK 已经支持 Stream 接口,具体请参见 JAVA SDK 介绍

本文导读目录
本文导读目录
以上内容是否对您有帮助?