方案实现

在前文的方案与架构基础上,本文介绍如何使用表格存储的Timeline模型实现Feed流系统的存储与同步功能。

模型介绍

本文使用表格存储作为存储和同步系统,并主要使用Timeline模型、基于推模式进行同步。

表格存储消息(Timeline)模型是针对消息数据场景所设计的,能够满足消息数据场景对消息保序、海量消息存储、实时同步的业务需求,同时支持全文检索与多维度组合查询。更多信息,请参见模型介绍

image.png

一个Timeline模型存储有若干Timeline,单个TimelineMeta、Queue、Data组成,如上图所示。

  • Meta:元数据。根据Identifier可以唯一标识一个Timeline。

  • Queue:消息存储队列。根据SequenceId可以确认一条Message在队列中的位点信息。

  • Data:消息实体数据。根据Index可以对Message实现检索。

Feed流系统的存储与同步均可以基于Timeline模型快速实现。

资源初始化

存储库

消息存储库保存了用户所发送的所有信息,每个用户的发件箱对应一个Timeline。模型初始化如下:

  • Meta:Identifier设置为Timeline_id,由于各个用户ID唯一,因此可以取值为用户ID。

  • Queue:SequenceId可以设置为自增序列。

  • Data:配置Index的字段有sendsend_timetext

初始化的代码如下。

//将SyncClient作为参数,初始化StoreFactory。
TimelineStoreFactory serviceFactory = new TimelineStoreFactoryImpl(syncClient);

//构建Timeline表的Schema配置。
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
        .addStringField("timeline_id").build();

//配置Index的字段以及字段类型。
IndexSchema timelineIndex = new IndexSchema();
timelineIndex.addFieldSchema(new FieldSchema("sender", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true).setStore(true));
timelineIndex.addFieldSchema(new FieldSchema("send_time", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true).setStore(true));
timelineIndex.addFieldSchema(new FieldSchema("text", FieldType.TEXT).setIndex(true).setStore(true).setAnalyzer(FieldSchema.Analyzer.MaxWord));

//配置参数:包含Timeline表名、Index名、Index类型等。
TimelineSchema timelineSchema = new TimelineSchema("storeTableName", idSchema)
        .autoGenerateSeqId() //SequenceId 设置为自增列方式。
        .setTimeToLive(ttlForever)
        .withIndex("storeIndexName", timelineIndex);

//通过Store工厂创建并获取Timeline的管理Store。
TimelineStore timelineStoreTableInstance = serviceFactory.createTimelineStore(timelineSchema);

//建表。
timelineStoreTableInstance.prepareTables();

同步库

消息存储库保存了用户所发送的所有信息,每个用户的发件箱对应一个Timeline。模型初始化如下:

  • Meta:Identifier设置为Timeline_id,由于各个用户ID唯一,因此可以取值为用户ID。

  • Queue:SequenceId可以设置为时间戳,或采用自增序列。

  • Data:由于用户对于同步库的拉取较为频繁,检索较少,因此不配置Index字段;主要是用存储库进行检索。

初始化的代码如下。

TimelineStoreFactory serviceFactory = new TimelineStoreFactoryImpl(syncClient);

TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
        .addStringField("timeline_id").build();

TimelineSchema timelineSchema = new TimelineSchema("syncTableName", idSchema)
        .autoGenerateSeqId()
        .setTimeToLive(ttlForever);

TimelineStore timelineSyncTableInstance = serviceFactory.createTimelineStore(timelineSchema);

timelineSyncTableInstance.prepareTables();

在使用开源Redis方案时,为降低成本、减少开源Redis里面存储的数据量。一般会采取仅在Redis中存储Feed ID而不存储Feed内容。整体数据量大幅减少,但是在读取的时候需要先读Feed ID,然后在到存储系统里面去读取Feed内容,网络开销增长了一倍,而且是串行的,对用户的刷新延迟有较大影响。

而表格存储单表可存储十万亿行以上的数据,价格又低,轻松保存用户Feed流中的所有Feed数据。

元数据表

用户信息

采用表格存储的宽表模型,则用户信息表可以设计如下。建表方法,可参见表操作

主键-1

属性列-1

属性列-2

属性列-3

......

字段名

user_id

name

email

gender

other

备注

主键列,用于唯一确定一个用户

用户昵称,用户自定义属性

用户邮箱

用户性别,用户自定义属性

其他属性列。TablestoreFreeSchema类型的

用户关系

采用表格存储的宽表模型,则用户信息表可以设计如下。建表方法,可参见表操作

主键-1

主键-2

属性列

......

字段名

user_id

follow_user_id

timestamp

other

备注

用户ID

粉丝用户ID

关注时间

其他属性列

功能实现

一个Feed流系统主要有以下三个功能:

  • 关注:用户间可以彼此关注。

  • 发布Feed:用户可以发布新消息。

  • 获取Feed流:用户可以查看自己所关注的人的消息或查看某个用户发布的消息。

image

发布Feed

  1. 将自己的Feed消息写入个人页Timeline(发件箱)。

  2. 获取自己的粉丝列表。

  3. 将自己的Feed消息写给自己的粉丝,如果有100个粉丝,那么就要写给100个用户,包括Feed内容和Feed ID。

  4. 发布Feed的流程结束。

public void sendPost(String userId, Message message, List<String> followers) {
    {
        //构建Identifier。
      	TimelineIdentifier identifier = new TimelineIdentifier.Builder()
                .addField("timeline_id", message.getTimelineId())
                .build();
				
      	//同步存储消息。
        store.createTimelineQueue(identifier).store(message.getTimelineMessage());
    }

    {
        for (String friend : followers) {
            TimelineIdentifier identifier = timelineIdToTimelineIdentifier(friend);

            sync.createTimelineQueue(identifier).store(message.getTimelineMessage());
        }
    }
}

关注

使用putRow接口直接写入一行数据(关注人,粉丝)到用户关系表即可。

public void follow(String userA, String userB) {
  	//构造主键。
    PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userA))
            .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userB))
            .build();
		
  	//设置数据表名称。
    RowPutChange rowPutChange = new RowPutChange("userRelationTableName", primaryKey);

    syncClient.putRow(new PutRowRequest(rowPutChange));
}

获取Feed

从客户端获取上次读取到的最新消息的ID:lastSequenceId。使用scan接口读取最新的消息,起始位置是lastSequenceId。

  1. 通过scan读取自己的收件箱:范围起始位置是上次读取到的FeedID;scan读取到一定条数后返回。

  2. 按时间排序,返回给用户。

  3. 用户若持续刷新Feed流,则重复执行步骤1即可。

public List<Post> fetchFeed(String userId, long lastSequenceId) {
    TimelineIdentifier identifier = timelineIdToTimelineIdentifier(userId);
		
  	//构建Scan参数。
    ScanParameter parameter = new ScanParameter()
            .scanForward(lastSequenceId)
            .maxCount(30);
		
  	//根据Scan参数范围读取单个Queue下的消息。
    Iterator<TimelineEntry> iterator = sync.createTimelineQueue(identifier).scan(parameter);

    List<Post> posts = new LinkedList<>();
    int counter = 0;
    while (iterator.hasNext() && counter++ <= 30) { //单次读取30条Feed
        Message message = new Message(identifier.toString(), iterator.next());
        TimelineMessage timelineMessage = message.getTimelineMessage();

        Post post = new Post() //提取关键信息。
                .setName(timelineMessage.getString("sender"))
                .setTime(timelineMessage.getLong("send_time"))
                .setContent(timelineMessage.getString("text"))
                .setSequenceId(message.getSequenceId());

        posts.add(post);
    }

    return posts;
}

如果需要访问某个用户页面,查看该用户的Feed,根据用户ID访问对应存储库即可。具体步骤与上述类似。

快速体验

通过ROS一键部署,您可以快速体验本文介绍的Feed流。

重要

一键部署会自动创建ECS资源、表格存储资源等,预计产生0.1元的费用(假设您选择1 vCPU、1 GiB规格的ECS示例运行0.5小时)。建议您完成体验后及时释放资源。

  1. 打开一键配置模板链接前往ROS控制台,系统自动跳转至已加载模板的配置模板参数页面。

  2. 根据页面提示,完成参数配置,然后单击创建

    资源栈信息页签下的状态由创建中变为创建成功时,表示Feed流系统一键搭建完成。

  3. 单击输出页签,单击输出的URL连接,打开如下页面。您可以在此查看Feed流系统接口,并输入参数快速体验。

image.png

您也可以下载以上项目的样例代码,在本地运行、体验。使用前请确保您已经完成了以下操作:

  • 已开通表格存储服务并创建实例

  • 获取AccessKey并修改环境变量

  • 修改样例配置YAML文件

初始化

当首次启动项目时,会自动完成相应的资源初始化和建表。初始化后可在表格存储控制台查看表格如下:

image.png

创建用户并关注

您可在接口页面中快速体验,或在本地使用接口/user/register。输入用户ID等信息后执行接口,在表格存储控制台可查看到用户信息表如下:

image.png

使用接口/user/follow,输入关注和被关注用户ID后执行接口,在表格存储控制台可查看到用户关系表如下:

image.png

发布Feed

使用接口/feed/post,输入操作用户IDFeed内容后执行接口,在表格存储控制台可查看到存储表如下:

image.png

同时可查看到同步表中,粉丝的Timeline下也有相应的Feed内容。

读取Feed

使用接口/feed/get,输入操作用户ID和上次读取的消息ID后执行接口,返回Feed流内容如下:

image.png

第一次查询可设置lastSequenceId0,后续根据实际情况传入。

查看某用户Feed

使用接口/feed/getUserPosts,输入需要查看的用户ID后执行接口,返回该用户Feed流内容如下:

image.png