搭建IM消息系统

更新时间:
复制 MD 格式

基于表格存储的 Timeline SDK,可以搭建支持单聊、群聊、未读消息统计和消息全文检索的 IM(即时通讯)消息系统。

方案概述

IM 消息系统是即时通讯应用的核心组件,需要具备以下能力。

  • 收发单聊与群聊消息。

  • 管理好友关系(双向好友、解除好友)。

  • 管理群组关系(创建群、加入群、退出群、查询群成员)。

  • 同步新消息并统计未读消息。

  • 全文检索消息内容并搜索群名。

传统方案通常采用 MySQL 存储消息数据,配合 Redis 管理在线状态和未读计数。随着用户量和消息量增长,MySQL 需要频繁分库分表,运维成本持续上升。

表格存储提供的 Timeline SDK 基于存储库(Store)和同步库(Sync)的双库写扩散模型设计,专为消息场景优化。存储库持久化会话消息,同步库即时分发新消息。配合宽表的自动水平扩展、多元索引的全文检索和二级索引的反向查询能力,可以搭建一套无需分库分表的 IM 消息系统。

快速体验

下载完整的 IM 消息系统示例代码,该项目包含初始化、用户管理、好友关系、群组管理、消息收发、未读统计和消息检索的实现。

  • 示例代码下载:im.demo.zip

  • 运行前在用户主目录创建配置文件 tablestoreConf.json,填入表格存储实例的 Endpoint、AccessKey ID、AccessKey Secret 和实例名称。

  • 依次运行 InitChartRoomExample(初始化表和索引)、ClientRequestExample(完整功能演示)和 ReleaseChartRoomExample(清理资源)。

方案设计

本方案使用 4 类数据表和 1 个二级索引支撑 IM 消息系统的核心功能。

类别

表名

主键设计

用途

用户数据

im_user_table

user_id (STRING)

存储用户属性(用户名、性别等),通过 user_id 唯一标识用户。

关系数据

im_user_relation_table

main_user (STRING) + sub_user (STRING)

存储好友关系。添加好友时使用 BatchWriteRow 写入双向两行数据,确保关系对称。

im_group_relation_table

group_id (STRING) + user_id (STRING)

存储群组成员关系。基于主表的 GetRange 接口可获取群内所有成员;配套二级索引 im_group_relation_global_index 支持反向查询用户加入的所有群组。

消息数据(Timeline SDK)

im_timeline_meta_table

timeline_id (STRING)

存储会话元数据(会话类型、群名称、创建时间等)。配合多元索引 im_timeline_meta_index 支持按群名全文搜索。

im_timeline_store_table

timeline_id (STRING)

消息存储库,持久化所有会话消息。配合多元索引 im_timeline_store_index 支持消息内容全文检索。

im_timeline_sync_table

timeline_id (STRING)

消息同步库,每个用户对应一个同步队列,基于写扩散模型即时分发新消息。

消息系统的核心设计是存储库和同步库的分离。

  • 存储库(Store):以会话 ID 为标识,持久存储该会话的完整消息记录。用户打开会话窗口时,从存储库倒序拉取历史消息。存储库数据量大、读写频率相对较低,建议使用容量型实例。

  • 同步库(Sync):以用户 ID 为标识,每个用户拥有独立的同步队列。发送消息时,系统将消息写扩散到接收方的同步队列。客户端通过维护 checkpoint(上次读取位置)增量拉取新消息,统计各会话的未读消息数。同步库要求低延迟实时读写,建议使用高性能型实例,可设置数据过期时间(一周或两周)。

方案实现

以下步骤基于表格存储 Java SDK 和 Timeline SDK 实现完整的 IM 消息系统。

说明

本方案使用 Java Timeline SDK 实现。Timeline SDK 封装了存储库、同步库和元数据库的操作,无需手动管理底层表结构,需在 Maven 依赖中添加 tablestore SDK 5.17.4 或以上版本。

步骤一:初始化 Timeline 和创建数据表

初始化 TimelineV2 对象,配置存储库、同步库和元数据库的表结构和索引。存储库的多元索引支持消息全文检索,元数据库的多元索引支持群名搜索。

// Initialize Timeline SDK with three libraries: meta, store, and sync
TimelineStoreFactory serviceFactory = new TimelineStoreFactoryImpl(syncClient);

// Timeline identifier schema (shared by all three libraries)
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
        .addStringField("timeline_id").build();

// Meta library: stores conversation metadata with search index for group name search
IndexSchema metaIndex = new IndexSchema();
metaIndex.setFieldSchemas(Arrays.asList(
        new FieldSchema("type", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("group_name", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord)
));
TimelineMetaSchema metaSchema = new TimelineMetaSchema("im_timeline_meta_table", idSchema)
        .withIndex("im_timeline_meta_index", metaIndex);
TimelineMetaStore metaStore = serviceFactory.createMetaStore(metaSchema);

// Store library: persists all conversation messages with search index for full-text search
IndexSchema storeIndex = new IndexSchema();
storeIndex.setFieldSchemas(Arrays.asList(
        new FieldSchema("timeline_id", FieldType.KEYWORD).setIndex(true),
        new FieldSchema("text", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
        new FieldSchema("sender", FieldType.KEYWORD).setIndex(true),
        new FieldSchema("type", FieldType.KEYWORD).setIndex(true)
));
TimelineSchema storeSchema = new TimelineSchema("im_timeline_store_table", idSchema)
        .withIndex("im_timeline_store_index", storeIndex);
TimelineStore storeTable = serviceFactory.createTimelineStore(storeSchema);

// Sync library: per-user inbox for message fan-out, 7-day TTL
TimelineSchema syncSchema = new TimelineSchema("im_timeline_sync_table", idSchema)
        .setTimeToLive(7 * 24 * 3600);
TimelineStore syncTable = serviceFactory.createTimelineStore(syncSchema);

// Create all tables and indexes
metaStore.prepareTables();
storeTable.prepareTables();
syncTable.prepareTables();

除 Timeline SDK 管理的 3 张表外,还需要手动创建用户表、好友关系表和群组关系表。群组关系表需要创建二级索引用于反向查询。

// Create user table
TableMeta userMeta = new TableMeta("im_user_table");
userMeta.addPrimaryKeyColumn("user_id", PrimaryKeyType.STRING);
syncClient.createTable(new CreateTableRequest(userMeta, new TableOptions(-1, 1)));

// Create user relation table (for friendship)
TableMeta relationMeta = new TableMeta("im_user_relation_table");
relationMeta.addPrimaryKeyColumn("main_user", PrimaryKeyType.STRING);
relationMeta.addPrimaryKeyColumn("sub_user", PrimaryKeyType.STRING);
syncClient.createTable(new CreateTableRequest(relationMeta, new TableOptions(-1, 1)));

// Create group relation table with global secondary index
TableMeta groupMeta = new TableMeta("im_group_relation_table");
groupMeta.addPrimaryKeyColumn("group_id", PrimaryKeyType.STRING);
groupMeta.addPrimaryKeyColumn("user_id", PrimaryKeyType.STRING);
CreateTableRequest groupRequest = new CreateTableRequest(groupMeta, new TableOptions(-1, 1));

// Global secondary index: reverse PK order (user_id, group_id) for "list my groups"
IndexMeta indexMeta = new IndexMeta("im_group_relation_global_index");
indexMeta.addPrimaryKeyColumn("user_id");
indexMeta.addPrimaryKeyColumn("group_id");
groupRequest.addIndex(indexMeta);
syncClient.createTable(groupRequest);

步骤二:创建用户

将用户信息写入用户表,user_id 与同步库中的 timeline_id 保持一致,便于消息写扩散时直接使用用户 ID 定位同步队列。

// Create a user
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString("user_1"))
        .build();

RowPutChange rowPutChange = new RowPutChange("im_user_table", primaryKey);
rowPutChange.addColumn("user_name", ColumnValue.fromString("Alice"));
rowPutChange.addColumn("sexuality", ColumnValue.fromString("FEMALE"));

syncClient.putRow(new PutRowRequest(rowPutChange));

步骤三:管理好友关系

好友关系是双向的。添加好友时需要使用 BatchWriteRow 同时写入两行数据(A→B 和 B→A),确保双方都能通过 GetRange 查询到对方。解除好友同样使用 BatchWriteRow 同时删除两行。

// Establish bidirectional friendship between userA and userB
String userA = "user_1";
String userB = "user_2";
String timelineId = "single_conv_1";  // Shared conversation timeline ID

// Row A -> B
PrimaryKey pkA = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userA))
        .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userB))
        .build();
RowPutChange changeA = new RowPutChange("im_user_relation_table", pkA);
changeA.addColumn("timeline_id", ColumnValue.fromString(timelineId));

// Row B -> A
PrimaryKey pkB = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userB))
        .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userA))
        .build();
RowPutChange changeB = new RowPutChange("im_user_relation_table", pkB);
changeB.addColumn("timeline_id", ColumnValue.fromString(timelineId));

// Batch write for atomicity
BatchWriteRowRequest request = new BatchWriteRowRequest();
request.addRowChange(changeA);
request.addRowChange(changeB);
syncClient.batchWriteRow(request);

查询好友列表时,以当前用户 ID 为 main_user,通过 GetRange 扫描获取所有 sub_user。

// List friends of a user
PrimaryKey start = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString("user_1"))
        .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.INF_MIN)
        .build();
PrimaryKey end = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString("user_1"))
        .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.INF_MAX)
        .build();

RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("im_user_relation_table");
criteria.setInclusiveStartPrimaryKey(start);
criteria.setExclusiveEndPrimaryKey(end);
criteria.setMaxVersions(1);

GetRangeResponse response = syncClient.getRange(new GetRangeRequest(criteria));
for (Row row : response.getRows()) {
    String friendId = row.getPrimaryKey().getPrimaryKeyColumn("sub_user").getValue().asString();
    System.out.println("Friend: " + friendId);
}

步骤四:管理群组

创建群组时,在 Timeline 元数据库中写入群组信息(群名、类型、创建时间),然后将群成员写入群组关系表。

// Create a group conversation in the meta library
TimelineIdentifier identifier = new TimelineIdentifier.Builder()
        .addField("timeline_id", "group_1")
        .build();
TimelineMeta meta = new TimelineMeta(identifier)
        .setField("type", "GROUP")
        .setField("group_name", "Tablestore Discussion")
        .setField("create_time", System.currentTimeMillis());
metaStore.insert(meta);

// Add members to the group relation table
String[] members = {"user_1", "user_2", "user_3"};
for (String userId : members) {
    PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("group_id", PrimaryKeyValue.fromString("group_1"))
            .addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString(userId))
            .build();
    syncClient.putRow(new PutRowRequest(new RowPutChange("im_group_relation_table", pk)));
}

群组关系表的主键为 group_id + user_id,通过 GetRange 可获取群内所有成员。通过二级索引 im_group_relation_global_index(主键反转为 user_id + group_id),可以反向查询某用户加入的所有群组。

// List groups that a user has joined (via global secondary index)
PrimaryKey start = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString("user_1"))
        .addPrimaryKeyColumn("group_id", PrimaryKeyValue.INF_MIN)
        .build();
PrimaryKey end = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString("user_1"))
        .addPrimaryKeyColumn("group_id", PrimaryKeyValue.INF_MAX)
        .build();

RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("im_group_relation_global_index");
criteria.setInclusiveStartPrimaryKey(start);
criteria.setExclusiveEndPrimaryKey(end);
criteria.setMaxVersions(1);

GetRangeResponse response = syncClient.getRange(new GetRangeRequest(criteria));
for (Row row : response.getRows()) {
    String groupId = row.getPrimaryKey().getPrimaryKeyColumn("group_id").getValue().asString();
    System.out.println("Joined group: " + groupId);
}

步骤五:发送消息

发送消息分为两步:先写入存储库持久化,再写扩散到接收方的同步库。单聊消息写扩散到对方的同步队列,群聊消息写扩散到所有群成员的同步队列。

// Send a single chat message
TimelineMessage message = new TimelineMessage()
        .setField("text", "Hello, how are you?")
        .setField("type", "SINGLE")
        .setField("sender", "user_2")
        .setField("send_time", System.currentTimeMillis())
        .setField("conversation", "user_2");

// Step 1: Write to the store library (persistent storage)
TimelineIdentifier storeId = new TimelineIdentifier.Builder()
        .addField("timeline_id", "single_conv_1")
        .build();
storeTable.createTimelineQueue(storeId).store(message);

// Step 2: Fan-out to the receiver's sync library
TimelineIdentifier syncId = new TimelineIdentifier.Builder()
        .addField("timeline_id", "user_1")  // Receiver's user ID
        .build();
syncTable.createTimelineQueue(syncId).store(message);
// Send a group chat message (fan-out to all group members)
TimelineMessage groupMsg = new TimelineMessage()
        .setField("text", "Who used Tablestore today?")
        .setField("type", "GROUP")
        .setField("sender", "user_1")
        .setField("send_time", System.currentTimeMillis())
        .setField("conversation", "group_1");

// Step 1: Write to the store library
TimelineIdentifier storeId = new TimelineIdentifier.Builder()
        .addField("timeline_id", "group_1")
        .build();
storeTable.createTimelineQueue(storeId).store(groupMsg);

// Step 2: Fan-out to each group member's sync library
// Get group members from the group relation table, then write to each member's sync queue
for (String memberId : groupMemberIds) {
    TimelineIdentifier syncId = new TimelineIdentifier.Builder()
            .addField("timeline_id", memberId)
            .build();
    syncTable.createTimelineQueue(syncId).store(groupMsg);
}

步骤六:消息同步与未读统计

客户端维护一个 checkpoint(上次读取的 sequenceId),通过 ScanParameter 从 checkpoint 之后增量拉取同步库中的新消息,然后按会话分组统计未读消息数。

// Fetch new messages from the sync library
long syncCheckpoint = 0;  // Last read position, maintained by the client

TimelineIdentifier identifier = new TimelineIdentifier.Builder()
        .addField("timeline_id", "user_1")
        .build();

ScanParameter parameter = new ScanParameter()
        .scanForward(syncCheckpoint + 1)
        .maxCount(30);

Iterator<TimelineEntry> iterator = syncTable.createTimelineQueue(identifier).scan(parameter);

// Group messages by conversation and count unread messages
Map<String, Integer> unreadCount = new HashMap<>();
while (iterator.hasNext()) {
    TimelineEntry entry = iterator.next();
    String conversation = entry.getMessage().getString("conversation");
    unreadCount.merge(conversation, 1, Integer::sum);

    // Update checkpoint to the latest sequenceId
    syncCheckpoint = Math.max(syncCheckpoint, entry.getSequenceID());
}

for (Map.Entry<String, Integer> e : unreadCount.entrySet()) {
    System.out.println("Conversation: " + e.getKey() + ", Unread: " + e.getValue());
}

查看会话历史消息时,从存储库倒序拉取指定会话的消息列表。

// Fetch conversation history from the store library (reverse order)
TimelineIdentifier storeId = new TimelineIdentifier.Builder()
        .addField("timeline_id", "group_1")
        .build();

ScanParameter parameter = new ScanParameter()
        .scanBackward(Long.MAX_VALUE - 1)
        .maxCount(30);

Iterator<TimelineEntry> iterator = storeTable.createTimelineQueue(storeId).scan(parameter);
while (iterator.hasNext()) {
    TimelineEntry entry = iterator.next();
    String sender = entry.getMessage().getString("sender");
    String text = entry.getMessage().getString("text");
    System.out.println("[" + sender + "]: " + text);
}

步骤七:消息检索

存储库配置了多元索引 im_timeline_store_index,支持按会话 ID 和消息内容全文检索。元数据库配置了多元索引 im_timeline_meta_index,支持按群名模糊搜索群组。

// Search messages in a specific conversation by content keyword
TermQuery termQuery = new TermQuery();
termQuery.setFieldName("timeline_id");
termQuery.setTerm(ColumnValue.fromString("group_1"));

MatchPhraseQuery matchQuery = new MatchPhraseQuery();
matchQuery.setFieldName("text");
matchQuery.setText("Tablestore");

BoolQuery boolQuery = new BoolQuery();
boolQuery.setMustQueries(Arrays.asList(termQuery, matchQuery));

SearchQuery searchQuery = new SearchQuery();
searchQuery.setQuery(boolQuery);
searchQuery.setLimit(30);

List<SearchResult.Entry<TimelineEntry>> results = storeTable.search(searchQuery).getEntries();
for (SearchResult.Entry<TimelineEntry> result : results) {
    TimelineEntry entry = result.getData();
    System.out.println("[" + entry.getMessage().getString("sender") + "]: "
            + entry.getMessage().getString("text"));
}
// Search group conversations by group name
TermQuery typeQuery = new TermQuery();
typeQuery.setFieldName("type");
typeQuery.setTerm(ColumnValue.fromString("GROUP"));

MatchPhraseQuery nameQuery = new MatchPhraseQuery();
nameQuery.setFieldName("group_name");
nameQuery.setText("Tablestore");

BoolQuery boolQuery = new BoolQuery();
boolQuery.setMustQueries(Arrays.asList(typeQuery, nameQuery));

SearchQuery searchQuery = new SearchQuery();
searchQuery.setQuery(boolQuery);
searchQuery.setLimit(100);

List<SearchResult.Entry<TimelineMeta>> results = metaStore.search(searchQuery).getEntries();
for (SearchResult.Entry<TimelineMeta> result : results) {
    TimelineMeta meta = result.getData();
    System.out.println("Group: " + meta.getString("group_name"));
}

资源清理

重要

以下操作将删除所有 IM 相关的数据表和索引,数据不可恢复。确认不再需要这些数据后,再执行清理操作。

// Drop user and relation tables
syncClient.deleteTable(new DeleteTableRequest("im_user_table"));
syncClient.deleteTable(new DeleteTableRequest("im_user_relation_table"));
syncClient.deleteTable(new DeleteTableRequest("im_group_relation_table"));

// Drop Timeline SDK managed tables and indexes
metaStore.dropAllTables();
storeTable.dropAllTables();
syncTable.dropAllTables();