在前文的方案与架构基础上,本文介绍如何使用表格存储的Timeline模型实现Feed流系统的存储与同步功能。
模型介绍
本文使用表格存储作为存储和同步系统,并主要使用Timeline模型、基于推模式进行同步。
表格存储消息(Timeline)模型是针对消息数据场景所设计的,能够满足消息数据场景对消息保序、海量消息存储、实时同步的业务需求,同时支持全文检索与多维度组合查询。更多信息,请参见模型介绍。
一个Timeline模型存储有若干Timeline,单个Timeline由Meta、Queue、Data组成,如上图所示。
Meta:元数据。根据Identifier可以唯一标识一个Timeline。
Queue:消息存储队列。根据SequenceId可以确认一条Message在队列中的位点信息。
Data:消息实体数据。根据Index可以对Message实现检索。
Feed流系统的存储与同步均可以基于Timeline模型快速实现。
资源初始化
存储库
消息存储库保存了用户所发送的所有信息,每个用户的发件箱对应一个Timeline。模型初始化如下:
Meta:Identifier设置为
Timeline_id
,由于各个用户ID唯一,因此可以取值为用户ID。Queue:SequenceId可以设置为自增序列。
Data:配置Index的字段有
send
、send_time
、text
。
初始化的代码如下。
//将SyncClient作为参数,初始化StoreFactory。
TimelineStoreFactory serviceFactory = new TimelineStoreFactoryImpl(syncClient);
//构建Timeline表的Schema配置。
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
.addStringField("timeline_id").build();
//配置Index的字段以及字段类型。
IndexSchema timelineIndex = new IndexSchema();
timelineIndex.addFieldSchema(new FieldSchema("sender", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true).setStore(true));
timelineIndex.addFieldSchema(new FieldSchema("send_time", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true).setStore(true));
timelineIndex.addFieldSchema(new FieldSchema("text", FieldType.TEXT).setIndex(true).setStore(true).setAnalyzer(FieldSchema.Analyzer.MaxWord));
//配置参数:包含Timeline表名、Index名、Index类型等。
TimelineSchema timelineSchema = new TimelineSchema("storeTableName", idSchema)
.autoGenerateSeqId() //SequenceId 设置为自增列方式。
.setTimeToLive(ttlForever)
.withIndex("storeIndexName", timelineIndex);
//通过Store工厂创建并获取Timeline的管理Store。
TimelineStore timelineStoreTableInstance = serviceFactory.createTimelineStore(timelineSchema);
//建表。
timelineStoreTableInstance.prepareTables();
同步库
消息存储库保存了用户所发送的所有信息,每个用户的发件箱对应一个Timeline。模型初始化如下:
Meta:Identifier设置为
Timeline_id
,由于各个用户ID唯一,因此可以取值为用户ID。Queue:SequenceId可以设置为时间戳,或采用自增序列。
Data:由于用户对于同步库的拉取较为频繁,检索较少,因此不配置
Index
字段;主要是用存储库进行检索。
初始化的代码如下。
TimelineStoreFactory serviceFactory = new TimelineStoreFactoryImpl(syncClient);
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
.addStringField("timeline_id").build();
TimelineSchema timelineSchema = new TimelineSchema("syncTableName", idSchema)
.autoGenerateSeqId()
.setTimeToLive(ttlForever);
TimelineStore timelineSyncTableInstance = serviceFactory.createTimelineStore(timelineSchema);
timelineSyncTableInstance.prepareTables();
在使用开源Redis方案时,为降低成本、减少开源Redis里面存储的数据量。一般会采取仅在Redis中存储Feed ID而不存储Feed内容。整体数据量大幅减少,但是在读取的时候需要先读Feed ID,然后在到存储系统里面去读取Feed内容,网络开销增长了一倍,而且是串行的,对用户的刷新延迟有较大影响。
而表格存储单表可存储十万亿行以上的数据,价格又低,轻松保存用户Feed流中的所有Feed数据。
元数据表
用户信息
采用表格存储的宽表模型,则用户信息表可以设计如下。建表方法,可参见表操作。
主键-1 | 属性列-1 | 属性列-2 | 属性列-3 | ...... | |
字段名 | user_id | name | gender | other | |
备注 | 主键列,用于唯一确定一个用户 | 用户昵称,用户自定义属性 | 用户邮箱 | 用户性别,用户自定义属性 | 其他属性列。Tablestore是FreeSchema类型的 |
用户关系
采用表格存储的宽表模型,则用户信息表可以设计如下。建表方法,可参见表操作。
主键-1 | 主键-2 | 属性列 | ...... | |
字段名 | user_id | follow_user_id | timestamp | other |
备注 | 用户ID | 粉丝用户ID | 关注时间 | 其他属性列 |
功能实现
一个Feed流系统主要有以下三个功能:
关注:用户间可以彼此关注。
发布Feed:用户可以发布新消息。
获取Feed流:用户可以查看自己所关注的人的消息或查看某个用户发布的消息。
发布Feed
将自己的Feed消息写入个人页Timeline(发件箱)。
获取自己的粉丝列表。
将自己的Feed消息写给自己的粉丝,如果有100个粉丝,那么就要写给100个用户,包括Feed内容和Feed ID。
发布Feed的流程结束。
public void sendPost(String userId, Message message, List<String> followers) {
{
//构建Identifier。
TimelineIdentifier identifier = new TimelineIdentifier.Builder()
.addField("timeline_id", message.getTimelineId())
.build();
//同步存储消息。
store.createTimelineQueue(identifier).store(message.getTimelineMessage());
}
{
for (String friend : followers) {
TimelineIdentifier identifier = timelineIdToTimelineIdentifier(friend);
sync.createTimelineQueue(identifier).store(message.getTimelineMessage());
}
}
}
关注
使用putRow
接口直接写入一行数据(关注人,粉丝)到用户关系表即可。
public void follow(String userA, String userB) {
//构造主键。
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userA))
.addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userB))
.build();
//设置数据表名称。
RowPutChange rowPutChange = new RowPutChange("userRelationTableName", primaryKey);
syncClient.putRow(new PutRowRequest(rowPutChange));
}
获取Feed流
从客户端获取上次读取到的最新消息的ID:lastSequenceId。使用scan接口读取最新的消息,起始位置是lastSequenceId。
通过scan读取自己的收件箱:范围起始位置是上次读取到的Feed的ID;scan读取到一定条数后返回。
按时间排序,返回给用户。
用户若持续刷新Feed流,则重复执行步骤1即可。
public List<Post> fetchFeed(String userId, long lastSequenceId) {
TimelineIdentifier identifier = timelineIdToTimelineIdentifier(userId);
//构建Scan参数。
ScanParameter parameter = new ScanParameter()
.scanForward(lastSequenceId)
.maxCount(30);
//根据Scan参数范围读取单个Queue下的消息。
Iterator<TimelineEntry> iterator = sync.createTimelineQueue(identifier).scan(parameter);
List<Post> posts = new LinkedList<>();
int counter = 0;
while (iterator.hasNext() && counter++ <= 30) { //单次读取30条Feed
Message message = new Message(identifier.toString(), iterator.next());
TimelineMessage timelineMessage = message.getTimelineMessage();
Post post = new Post() //提取关键信息。
.setName(timelineMessage.getString("sender"))
.setTime(timelineMessage.getLong("send_time"))
.setContent(timelineMessage.getString("text"))
.setSequenceId(message.getSequenceId());
posts.add(post);
}
return posts;
}
如果需要访问某个用户页面,查看该用户的Feed,根据用户ID访问对应存储库即可。具体步骤与上述类似。
快速体验
通过ROS一键部署,您可以快速体验本文介绍的Feed流。
一键部署会自动创建ECS资源、表格存储资源等,预计产生0.1元的费用(假设您选择1 vCPU、1 GiB规格的ECS示例运行0.5小时)。建议您完成体验后及时释放资源。
打开一键配置模板链接前往ROS控制台,系统自动跳转至已加载模板的配置模板参数页面。
根据页面提示,完成参数配置,然后单击创建。
当资源栈信息页签下的状态由创建中变为创建成功时,表示Feed流系统一键搭建完成。
单击输出页签,单击输出的URL连接,打开如下页面。您可以在此查看Feed流系统接口,并输入参数快速体验。
您也可以下载以上项目的样例代码,在本地运行、体验。使用前请确保您已经完成了以下操作:
已开通表格存储服务并创建实例
获取AccessKey并修改环境变量
修改样例配置YAML文件
初始化
当首次启动项目时,会自动完成相应的资源初始化和建表。初始化后可在表格存储控制台查看表格如下:
创建用户并关注
您可在接口页面中快速体验,或在本地使用接口/user/register
。输入用户ID等信息后执行接口,在表格存储控制台可查看到用户信息表如下:
使用接口/user/follow
,输入关注和被关注用户ID后执行接口,在表格存储控制台可查看到用户关系表如下:
发布Feed
使用接口/feed/post
,输入操作用户ID和Feed内容后执行接口,在表格存储控制台可查看到存储表如下:
同时可查看到同步表中,粉丝的Timeline下也有相应的Feed内容。
读取Feed流
使用接口/feed/get
,输入操作用户ID和上次读取的消息ID后执行接口,返回Feed流内容如下:
第一次查询可设置lastSequenceId为0,后续根据实际情况传入。
查看某用户Feed
使用接口/feed/getUserPosts
,输入需要查看的用户ID后执行接口,返回该用户Feed流内容如下: