介绍如何使用表格存储的主键增列功能优化高并发IM系统架构。
背景
在构建社交IM和朋友圈应用时,最基本的需求是将用户发送的消息和朋友圈的更新及时、准确地更新给该用户的好友。这需要为用户发送的每一条消息或者朋友圈更新设置一个序号或者ID,并且保证递增,这个机制可以确保所有的消息能够按照正确的顺序被接收端处理。
高并发的IM系统通常选择NoSQL数据库存储产品来存储消息,但常见的NoSQL产品没有提供自增列的功能,通常要借助外部组件来实现消息序号和ID的递增,使得整体的架构更加复杂,且影响整条链路的延时。
功能需求
- 支持用户一对一聊天
- 支持用户群组内聊天
- 支持同一个用户的多终端消息同步
现有架构
- 消息模型 消息模型的实现顺序如下。
- 发送方发送了一条消息后,消息会被客户端推送给后台系统。
- 后台系统会先存储消息。
- 存储成功后,会推送消息给接收方的客户端。
- 后台架构 后台架构主要分为两部分:逻辑层和存储层。
- 逻辑层包括应用服务器、队列服务和自增ID生成器,是整个后台架构的核心,处理消息的接收、推送、通知、群消息写复制等核心业务逻辑。
- 存储层主要是用来存储持久化消息数据和其他一些需要持久化的数据。
对于一对一聊天,发送方发送消息给应用服务器后,应用服务器将消息存储到接收方为主键的表中,同时通知应用服务器中的消息推送服务,消息推送服务会将上次推送给接收方的最后一条消息的ID作为起始主键,从存储系统中读取之后的所有消息,然后将消息推送给接收方。
对于群组内的聊天,逻辑会更加复杂,需要通过异步队列来完成消息的扩散写,即发到群组内的一条消息会给群组内的每个人都保存一份。
下图展示了省略掉存储层后的群消息发送过程。
使用扩散写而非扩散读,主要由于以下两点原因:- 群组内成员一般都不多,存储成本并不高,而且有压缩,成本更低。
- 消息扩散写到每个人的存储表(收件箱)后,为每个接收方推送消息时,只需要检查自己的收件箱即可,此时群聊和单聊的处理逻辑一样,实现简单。
- 存储系统 存储系统采用阿里云表格存储,表格存储具有以下优势:
- 表格存储写操作不仅支持单行写,也支持多行批量写,可满足大并发写数据需求。
- 表格存储支持按范围读,消息多时可翻页。
- 表格存储支持数据生命周期管理,可对过期数据进行自动清理,节省存储费用。
- 表格存储价格便宜,且稳定可靠。
- 表格存储读写性能极佳,对于聊天消息,延迟基本在毫秒,甚至微秒级别。
表格存储表结构的主键列部分请参见下表。主键顺序 主键名称 主键值 说明 1 partition_key md5(receive_id)前4位 分区键保证数据均匀分布 2 receive_id receive_id 接收方的用户ID 3 message_id message_id 消息ID 表格存储表结构包括主键列和属性列两部分。- 主键列
- 最多支持4个主键列,第一个主键为分区键。
- 通过分区键可以让数据和请求均衡分布、避免热点。由于最终读取消息时要按照接收方读取,所以此处可以使用接收方ID作为分区键。为了更加均衡,可以使用接收方ID的md5值的部分区域,例如前4个字符,这样就可以将数据均衡分布了。
- 第一个主键只用了部分接收方ID,为了能定位到接收方的消息,需要保存完整的接收方ID,所以可以将接收方ID作为第二个主键。
- 第三个主键可以是消息ID,由于需要查询最新的消息,这个值需要是单调自增的。
说明 主键列结构在使用过程中不能修改。 - 属性列
属性由多个属性列组成。每行的属性列个数没有限制,即每行的属性列可不同。一个属性列在某一行的值可为空。同一个属性列的值可以有多种数据类型。属性列可以保存消息内容和元数据等。
- 挑战
此架构虽可应用于高并发IM系统,但仍面临挑战。多个用户在一个队列中,队列串行执行,为了保证消息严格递增,执行过程中要持有锁,这个过程会有一个风险:如果发送给某个用户的消息量很大,这个用户所在的队列中消息会变多,就有可能堵塞其他用户的消息,导致同队列的其他用户消息出现延迟。
新架构
现有架构存在的挑战可通过使用表格存储的主键列自增功能轻松解决。
- 与原架构相比,新架构最明显的区别是减少了队列服务和自增ID生成器两个组件,架构更加简单。
- 应用服务器接收到消息后,直接将消息写入表格存储,对于主键自增列message_id,在写数据时不需要填写具体的值,只需要填充一个特定的占位符即可,此值会在表格存储系统内部自动生成。
- 新架构中自增操作是在表格存储系统内部处理的,就算多个应用服务器同时给表格存储中的同一个接收方写数据,表格存储内部也能保证这些消息是串行处理,每个消息都有一个独立的消息ID,且严格递增。使用主键列自增功能,就不再需要队列服务,这样也就彻底解决了原架构的问题。
- 原架构只能有一个队列处理同一个用户的消息,新架构可以多个队列并行处理,当一些用户的消息量突然变大时,也不会立即堵塞其他用户,而是将压力均匀分布给了所有队列。
- 使用主键自增列功能后,应用服务器可以直接写数据到表格存储,不再需要经过队列和获取消息ID,性能表现会更加优秀。
方案实现
本文使用Java SDK实现优化方案。
- 创建数据表 数据表结构的主键列部分请参见下表。
主键顺序 主键名称 主键值 说明 1 partition_key hash(receive_id)前4位 分区键,保证数据均匀分布,可以使用md5作为hash函数 2 receive_id receive_id 接收方的用户ID 3 message_id message_id 消息ID 上表中,第三列主键(PK)是message_id,这一列是主键自增列,建表时指定message_id列的属性为AUTO_INCREMENT,且类型为INTEGER。private static void createTable(SyncClient client) { TableMeta tableMeta = new TableMeta(“message_table”); //第一列为分区建。 tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("partition_key", PrimaryKeyType.STRING)); //第二列为接收方ID。 tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("receive_id", PrimaryKeyType.STRING)); //第三列为消息ID,自动自增列,类型为INTEGER,属性为PKO_AUTO_INCREMENT。 tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("message_id", PrimaryKeyType.INTEGER, PrimaryKeyOption.AUTO_INCREMENT)); int timeToLive = -1; //永不过期,也可以设置数据有效期,过期了会自动删除。 int maxVersions = 1; //只保存一个版本,目前支持多版本。 TableOptions tableOptions = new TableOptions(timeToLive, maxVersions); CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions); client.createTable(request); }
完成以上操作后即创建了一个第三列PK为自动自增的表。
- 写数据您可以使用PutRow和BatchWriteRow接口写数据,这两个接口都支持主键列自增功能。写数据时,第三列message_id是主键自增列,这一列不需要填值,只需要填入占位符即可。
private static void putRow(SyncClient client, String receive_id) { //构造主键。 PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder(); //第一列的值为hash(receive_id)前4位。 primaryKeyBuilder.addPrimaryKeyColumn(“partition_key”, PrimaryKeyValue.fromString(hash(receive_id).substring(4))); //第二列的值为接收方ID。 primaryKeyBuilder.addPrimaryKeyColumn(“receive_id”, PrimaryKeyValue.fromString(receive_id)); //第三列是消息ID,主键递增列,这个值是表格存储产生的,用户在这里不需要填入真实值,只需要一个占位符AUTO_INCREMENT即可。 primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.AUTO_INCREMENT); PrimaryKey primaryKey = primaryKeyBuilder.build(); RowPutChange rowPutChange = new RowPutChange("message_table", primaryKey); //此处设置返回类型为RT_PK,表示是在返回结果中包含PK列的值。如果不设置ReturnType,默认不返回。 rowPutChange.setReturnType(ReturnType.RT_PK); //加入属性列,消息内容。 rowPutChange.addColumn(new Column("content", ColumnValue.fromString(content))); //写数据到表格存储。 PutRowResponse response = client.putRow(new PutRowRequest(rowPutChange)); //打印出返回的PK列。 Row returnRow = response.getRow(); if (returnRow != null) { System.out.println("PrimaryKey:" + returnRow.getPrimaryKey().toString()); } //打印出消耗的CU。 CapacityUnit cu = response.getConsumedCapacity().getCapacityUnit(); System.out.println("Read CapacityUnit:" + cu.getReadCapacityUnit()); System.out.println("Write CapacityUnit:" + cu.getWriteCapacityUnit()); }
- 读数据您可以通过GetRange接口读取最近的消息。message_id这一列PK的起始位置是上一条消息的message_id+1,结束位置是INF_MAX,这样每次都可以读出最新的消息,然后发送给客户端。
private static void getRange(SyncClient client, String receive_id, String lastMessageId) { RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria(“message_table”); //设置起始主键。 PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder(); //第一列的值为hash(receive_id)前4位。 primaryKeyBuilder.addPrimaryKeyColumn(“partition_key”, PrimaryKeyValue.fromString(hash(receive_id).substring(4))); //第二列的值为接收方ID。 primaryKeyBuilder.addPrimaryKeyColumn(“receive_id”, PrimaryKeyValue.fromString(receive_id)); //第三列的值为消息ID,起始于上一条消息。 primaryKeyBuilder.addPrimaryKeyColumn(“message_id”, PrimaryKeyValue.fromLong(lastMessageId + 1)); rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build()); //设置结束主键。 primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder(); //第一列的值为hash(receive_id)前4位。 primaryKeyBuilder.addPrimaryKeyColumn(“partition_key”, PrimaryKeyValue.fromString(hash(receive_id).substring(4))); //第二列的值为接收方ID。 primaryKeyBuilder.addPrimaryKeyColumn(“receive_id”, PrimaryKeyValue.fromString(receive_id)); //第三列的值为消息ID。 primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.INF_MAX); rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build()); rangeRowQueryCriteria.setMaxVersions(1); System.out.println("GetRange的结果为:"); while (true) { GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria)); for (Row row : getRangeResponse.getRows()) { System.out.println(row); } //如果nextStartPrimaryKey不为null,则继续读取。 if (getRangeResponse.getNextStartPrimaryKey() != null) { rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey()); } else { break; } } }
技术支持
表格存储为您提供专业的免费的技术咨询服务,欢迎通过钉钉加入相应交流群。
为互联网应用、大数据、社交应用等开发者提供的最新技术交流群有36165029092(
表格存储技术交流群-3
)。说明表格存储用户群11789671(
表格存储技术交流群
)和23307953(表格存储技术交流群-2
)已满,暂时无法加入。为物联网和时序模型开发者提供的技术交流群有44327024(
物联网存储 IoTstore 开发者交流群
)。