基于表格存储的 Timeline SDK,可以搭建支持单聊、群聊、未读消息统计和消息全文检索的 IM(即时通讯)消息系统。
方案概述
IM 消息系统是即时通讯应用的核心组件,需要具备以下能力。
收发单聊与群聊消息。
管理好友关系(双向好友、解除好友)。
管理群组关系(创建群、加入群、退出群、查询群成员)。
同步新消息并统计未读消息。
全文检索消息内容并搜索群名。
传统方案通常采用 MySQL 存储消息数据,配合 Redis 管理在线状态和未读计数。随着用户量和消息量增长,MySQL 需要频繁分库分表,运维成本持续上升。
表格存储提供的 Timeline SDK 基于存储库(Store)和同步库(Sync)的双库写扩散模型设计,专为消息场景优化。存储库持久化会话消息,同步库即时分发新消息。配合宽表的自动水平扩展、多元索引的全文检索和二级索引的反向查询能力,可以搭建一套无需分库分表的 IM 消息系统。
快速体验
下载完整的 IM 消息系统示例代码,该项目包含初始化、用户管理、好友关系、群组管理、消息收发、未读统计和消息检索的实现。
示例代码下载:im.demo.zip。
运行前在用户主目录创建配置文件
tablestoreConf.json,填入表格存储实例的 Endpoint、AccessKey ID、AccessKey Secret 和实例名称。依次运行
InitChartRoomExample(初始化表和索引)、ClientRequestExample(完整功能演示)和ReleaseChartRoomExample(清理资源)。
方案设计
本方案使用 4 类数据表和 1 个二级索引支撑 IM 消息系统的核心功能。
类别 | 表名 | 主键设计 | 用途 |
用户数据 | im_user_table | user_id (STRING) | 存储用户属性(用户名、性别等),通过 user_id 唯一标识用户。 |
关系数据 | im_user_relation_table | main_user (STRING) + sub_user (STRING) | 存储好友关系。添加好友时使用 BatchWriteRow 写入双向两行数据,确保关系对称。 |
im_group_relation_table | group_id (STRING) + user_id (STRING) | 存储群组成员关系。基于主表的 GetRange 接口可获取群内所有成员;配套二级索引 im_group_relation_global_index 支持反向查询用户加入的所有群组。 | |
消息数据(Timeline SDK) | im_timeline_meta_table | timeline_id (STRING) | 存储会话元数据(会话类型、群名称、创建时间等)。配合多元索引 im_timeline_meta_index 支持按群名全文搜索。 |
im_timeline_store_table | timeline_id (STRING) | 消息存储库,持久化所有会话消息。配合多元索引 im_timeline_store_index 支持消息内容全文检索。 | |
im_timeline_sync_table | timeline_id (STRING) | 消息同步库,每个用户对应一个同步队列,基于写扩散模型即时分发新消息。 |
消息系统的核心设计是存储库和同步库的分离。
存储库(Store):以会话 ID 为标识,持久存储该会话的完整消息记录。用户打开会话窗口时,从存储库倒序拉取历史消息。存储库数据量大、读写频率相对较低,建议使用容量型实例。
同步库(Sync):以用户 ID 为标识,每个用户拥有独立的同步队列。发送消息时,系统将消息写扩散到接收方的同步队列。客户端通过维护 checkpoint(上次读取位置)增量拉取新消息,统计各会话的未读消息数。同步库要求低延迟实时读写,建议使用高性能型实例,可设置数据过期时间(一周或两周)。
方案实现
以下步骤基于表格存储 Java SDK 和 Timeline SDK 实现完整的 IM 消息系统。
本方案使用 Java Timeline SDK 实现。Timeline SDK 封装了存储库、同步库和元数据库的操作,无需手动管理底层表结构,需在 Maven 依赖中添加 tablestore SDK 5.17.4 或以上版本。
步骤一:初始化 Timeline 和创建数据表
初始化 TimelineV2 对象,配置存储库、同步库和元数据库的表结构和索引。存储库的多元索引支持消息全文检索,元数据库的多元索引支持群名搜索。
// Initialize Timeline SDK with three libraries: meta, store, and sync
TimelineStoreFactory serviceFactory = new TimelineStoreFactoryImpl(syncClient);
// Timeline identifier schema (shared by all three libraries)
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
.addStringField("timeline_id").build();
// Meta library: stores conversation metadata with search index for group name search
IndexSchema metaIndex = new IndexSchema();
metaIndex.setFieldSchemas(Arrays.asList(
new FieldSchema("type", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
new FieldSchema("group_name", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord)
));
TimelineMetaSchema metaSchema = new TimelineMetaSchema("im_timeline_meta_table", idSchema)
.withIndex("im_timeline_meta_index", metaIndex);
TimelineMetaStore metaStore = serviceFactory.createMetaStore(metaSchema);
// Store library: persists all conversation messages with search index for full-text search
IndexSchema storeIndex = new IndexSchema();
storeIndex.setFieldSchemas(Arrays.asList(
new FieldSchema("timeline_id", FieldType.KEYWORD).setIndex(true),
new FieldSchema("text", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
new FieldSchema("sender", FieldType.KEYWORD).setIndex(true),
new FieldSchema("type", FieldType.KEYWORD).setIndex(true)
));
TimelineSchema storeSchema = new TimelineSchema("im_timeline_store_table", idSchema)
.withIndex("im_timeline_store_index", storeIndex);
TimelineStore storeTable = serviceFactory.createTimelineStore(storeSchema);
// Sync library: per-user inbox for message fan-out, 7-day TTL
TimelineSchema syncSchema = new TimelineSchema("im_timeline_sync_table", idSchema)
.setTimeToLive(7 * 24 * 3600);
TimelineStore syncTable = serviceFactory.createTimelineStore(syncSchema);
// Create all tables and indexes
metaStore.prepareTables();
storeTable.prepareTables();
syncTable.prepareTables();除 Timeline SDK 管理的 3 张表外,还需要手动创建用户表、好友关系表和群组关系表。群组关系表需要创建二级索引用于反向查询。
// Create user table
TableMeta userMeta = new TableMeta("im_user_table");
userMeta.addPrimaryKeyColumn("user_id", PrimaryKeyType.STRING);
syncClient.createTable(new CreateTableRequest(userMeta, new TableOptions(-1, 1)));
// Create user relation table (for friendship)
TableMeta relationMeta = new TableMeta("im_user_relation_table");
relationMeta.addPrimaryKeyColumn("main_user", PrimaryKeyType.STRING);
relationMeta.addPrimaryKeyColumn("sub_user", PrimaryKeyType.STRING);
syncClient.createTable(new CreateTableRequest(relationMeta, new TableOptions(-1, 1)));
// Create group relation table with global secondary index
TableMeta groupMeta = new TableMeta("im_group_relation_table");
groupMeta.addPrimaryKeyColumn("group_id", PrimaryKeyType.STRING);
groupMeta.addPrimaryKeyColumn("user_id", PrimaryKeyType.STRING);
CreateTableRequest groupRequest = new CreateTableRequest(groupMeta, new TableOptions(-1, 1));
// Global secondary index: reverse PK order (user_id, group_id) for "list my groups"
IndexMeta indexMeta = new IndexMeta("im_group_relation_global_index");
indexMeta.addPrimaryKeyColumn("user_id");
indexMeta.addPrimaryKeyColumn("group_id");
groupRequest.addIndex(indexMeta);
syncClient.createTable(groupRequest);步骤二:创建用户
将用户信息写入用户表,user_id 与同步库中的 timeline_id 保持一致,便于消息写扩散时直接使用用户 ID 定位同步队列。
// Create a user
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString("user_1"))
.build();
RowPutChange rowPutChange = new RowPutChange("im_user_table", primaryKey);
rowPutChange.addColumn("user_name", ColumnValue.fromString("Alice"));
rowPutChange.addColumn("sexuality", ColumnValue.fromString("FEMALE"));
syncClient.putRow(new PutRowRequest(rowPutChange));步骤三:管理好友关系
好友关系是双向的。添加好友时需要使用 BatchWriteRow 同时写入两行数据(A→B 和 B→A),确保双方都能通过 GetRange 查询到对方。解除好友同样使用 BatchWriteRow 同时删除两行。
// Establish bidirectional friendship between userA and userB
String userA = "user_1";
String userB = "user_2";
String timelineId = "single_conv_1"; // Shared conversation timeline ID
// Row A -> B
PrimaryKey pkA = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userA))
.addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userB))
.build();
RowPutChange changeA = new RowPutChange("im_user_relation_table", pkA);
changeA.addColumn("timeline_id", ColumnValue.fromString(timelineId));
// Row B -> A
PrimaryKey pkB = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userB))
.addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userA))
.build();
RowPutChange changeB = new RowPutChange("im_user_relation_table", pkB);
changeB.addColumn("timeline_id", ColumnValue.fromString(timelineId));
// Batch write for atomicity
BatchWriteRowRequest request = new BatchWriteRowRequest();
request.addRowChange(changeA);
request.addRowChange(changeB);
syncClient.batchWriteRow(request);查询好友列表时,以当前用户 ID 为 main_user,通过 GetRange 扫描获取所有 sub_user。
// List friends of a user
PrimaryKey start = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString("user_1"))
.addPrimaryKeyColumn("sub_user", PrimaryKeyValue.INF_MIN)
.build();
PrimaryKey end = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString("user_1"))
.addPrimaryKeyColumn("sub_user", PrimaryKeyValue.INF_MAX)
.build();
RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("im_user_relation_table");
criteria.setInclusiveStartPrimaryKey(start);
criteria.setExclusiveEndPrimaryKey(end);
criteria.setMaxVersions(1);
GetRangeResponse response = syncClient.getRange(new GetRangeRequest(criteria));
for (Row row : response.getRows()) {
String friendId = row.getPrimaryKey().getPrimaryKeyColumn("sub_user").getValue().asString();
System.out.println("Friend: " + friendId);
}步骤四:管理群组
创建群组时,在 Timeline 元数据库中写入群组信息(群名、类型、创建时间),然后将群成员写入群组关系表。
// Create a group conversation in the meta library
TimelineIdentifier identifier = new TimelineIdentifier.Builder()
.addField("timeline_id", "group_1")
.build();
TimelineMeta meta = new TimelineMeta(identifier)
.setField("type", "GROUP")
.setField("group_name", "Tablestore Discussion")
.setField("create_time", System.currentTimeMillis());
metaStore.insert(meta);
// Add members to the group relation table
String[] members = {"user_1", "user_2", "user_3"};
for (String userId : members) {
PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("group_id", PrimaryKeyValue.fromString("group_1"))
.addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString(userId))
.build();
syncClient.putRow(new PutRowRequest(new RowPutChange("im_group_relation_table", pk)));
}群组关系表的主键为 group_id + user_id,通过 GetRange 可获取群内所有成员。通过二级索引 im_group_relation_global_index(主键反转为 user_id + group_id),可以反向查询某用户加入的所有群组。
// List groups that a user has joined (via global secondary index)
PrimaryKey start = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString("user_1"))
.addPrimaryKeyColumn("group_id", PrimaryKeyValue.INF_MIN)
.build();
PrimaryKey end = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString("user_1"))
.addPrimaryKeyColumn("group_id", PrimaryKeyValue.INF_MAX)
.build();
RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("im_group_relation_global_index");
criteria.setInclusiveStartPrimaryKey(start);
criteria.setExclusiveEndPrimaryKey(end);
criteria.setMaxVersions(1);
GetRangeResponse response = syncClient.getRange(new GetRangeRequest(criteria));
for (Row row : response.getRows()) {
String groupId = row.getPrimaryKey().getPrimaryKeyColumn("group_id").getValue().asString();
System.out.println("Joined group: " + groupId);
}步骤五:发送消息
发送消息分为两步:先写入存储库持久化,再写扩散到接收方的同步库。单聊消息写扩散到对方的同步队列,群聊消息写扩散到所有群成员的同步队列。
// Send a single chat message
TimelineMessage message = new TimelineMessage()
.setField("text", "Hello, how are you?")
.setField("type", "SINGLE")
.setField("sender", "user_2")
.setField("send_time", System.currentTimeMillis())
.setField("conversation", "user_2");
// Step 1: Write to the store library (persistent storage)
TimelineIdentifier storeId = new TimelineIdentifier.Builder()
.addField("timeline_id", "single_conv_1")
.build();
storeTable.createTimelineQueue(storeId).store(message);
// Step 2: Fan-out to the receiver's sync library
TimelineIdentifier syncId = new TimelineIdentifier.Builder()
.addField("timeline_id", "user_1") // Receiver's user ID
.build();
syncTable.createTimelineQueue(syncId).store(message);// Send a group chat message (fan-out to all group members)
TimelineMessage groupMsg = new TimelineMessage()
.setField("text", "Who used Tablestore today?")
.setField("type", "GROUP")
.setField("sender", "user_1")
.setField("send_time", System.currentTimeMillis())
.setField("conversation", "group_1");
// Step 1: Write to the store library
TimelineIdentifier storeId = new TimelineIdentifier.Builder()
.addField("timeline_id", "group_1")
.build();
storeTable.createTimelineQueue(storeId).store(groupMsg);
// Step 2: Fan-out to each group member's sync library
// Get group members from the group relation table, then write to each member's sync queue
for (String memberId : groupMemberIds) {
TimelineIdentifier syncId = new TimelineIdentifier.Builder()
.addField("timeline_id", memberId)
.build();
syncTable.createTimelineQueue(syncId).store(groupMsg);
}步骤六:消息同步与未读统计
客户端维护一个 checkpoint(上次读取的 sequenceId),通过 ScanParameter 从 checkpoint 之后增量拉取同步库中的新消息,然后按会话分组统计未读消息数。
// Fetch new messages from the sync library
long syncCheckpoint = 0; // Last read position, maintained by the client
TimelineIdentifier identifier = new TimelineIdentifier.Builder()
.addField("timeline_id", "user_1")
.build();
ScanParameter parameter = new ScanParameter()
.scanForward(syncCheckpoint + 1)
.maxCount(30);
Iterator<TimelineEntry> iterator = syncTable.createTimelineQueue(identifier).scan(parameter);
// Group messages by conversation and count unread messages
Map<String, Integer> unreadCount = new HashMap<>();
while (iterator.hasNext()) {
TimelineEntry entry = iterator.next();
String conversation = entry.getMessage().getString("conversation");
unreadCount.merge(conversation, 1, Integer::sum);
// Update checkpoint to the latest sequenceId
syncCheckpoint = Math.max(syncCheckpoint, entry.getSequenceID());
}
for (Map.Entry<String, Integer> e : unreadCount.entrySet()) {
System.out.println("Conversation: " + e.getKey() + ", Unread: " + e.getValue());
}查看会话历史消息时,从存储库倒序拉取指定会话的消息列表。
// Fetch conversation history from the store library (reverse order)
TimelineIdentifier storeId = new TimelineIdentifier.Builder()
.addField("timeline_id", "group_1")
.build();
ScanParameter parameter = new ScanParameter()
.scanBackward(Long.MAX_VALUE - 1)
.maxCount(30);
Iterator<TimelineEntry> iterator = storeTable.createTimelineQueue(storeId).scan(parameter);
while (iterator.hasNext()) {
TimelineEntry entry = iterator.next();
String sender = entry.getMessage().getString("sender");
String text = entry.getMessage().getString("text");
System.out.println("[" + sender + "]: " + text);
}步骤七:消息检索
存储库配置了多元索引 im_timeline_store_index,支持按会话 ID 和消息内容全文检索。元数据库配置了多元索引 im_timeline_meta_index,支持按群名模糊搜索群组。
// Search messages in a specific conversation by content keyword
TermQuery termQuery = new TermQuery();
termQuery.setFieldName("timeline_id");
termQuery.setTerm(ColumnValue.fromString("group_1"));
MatchPhraseQuery matchQuery = new MatchPhraseQuery();
matchQuery.setFieldName("text");
matchQuery.setText("Tablestore");
BoolQuery boolQuery = new BoolQuery();
boolQuery.setMustQueries(Arrays.asList(termQuery, matchQuery));
SearchQuery searchQuery = new SearchQuery();
searchQuery.setQuery(boolQuery);
searchQuery.setLimit(30);
List<SearchResult.Entry<TimelineEntry>> results = storeTable.search(searchQuery).getEntries();
for (SearchResult.Entry<TimelineEntry> result : results) {
TimelineEntry entry = result.getData();
System.out.println("[" + entry.getMessage().getString("sender") + "]: "
+ entry.getMessage().getString("text"));
}// Search group conversations by group name
TermQuery typeQuery = new TermQuery();
typeQuery.setFieldName("type");
typeQuery.setTerm(ColumnValue.fromString("GROUP"));
MatchPhraseQuery nameQuery = new MatchPhraseQuery();
nameQuery.setFieldName("group_name");
nameQuery.setText("Tablestore");
BoolQuery boolQuery = new BoolQuery();
boolQuery.setMustQueries(Arrays.asList(typeQuery, nameQuery));
SearchQuery searchQuery = new SearchQuery();
searchQuery.setQuery(boolQuery);
searchQuery.setLimit(100);
List<SearchResult.Entry<TimelineMeta>> results = metaStore.search(searchQuery).getEntries();
for (SearchResult.Entry<TimelineMeta> result : results) {
TimelineMeta meta = result.getData();
System.out.println("Group: " + meta.getString("group_name"));
}资源清理
以下操作将删除所有 IM 相关的数据表和索引,数据不可恢复。确认不再需要这些数据后,再执行清理操作。
// Drop user and relation tables
syncClient.deleteTable(new DeleteTableRequest("im_user_table"));
syncClient.deleteTable(new DeleteTableRequest("im_user_relation_table"));
syncClient.deleteTable(new DeleteTableRequest("im_group_relation_table"));
// Drop Timeline SDK managed tables and indexes
metaStore.dropAllTables();
storeTable.dropAllTables();
syncTable.dropAllTables();