Build an IM system

Build an IM system that supports one-on-one chats, group chats, unread message counts, and full-text message search by using the Tablestore Timeline SDK.

Background and key capabilities

An IM system is the core component of any instant messaging application. A typical IM system requires the following capabilities.

  • Send and receive one-on-one and group chat messages.

  • Manage friend relationships, including adding and removing friends.

  • Manage groups, including creating groups, joining groups, leaving groups, and querying group members.

  • Synchronize new messages and track unread message counts.

  • Search message content by keyword and search groups by name.

Traditional approaches typically store messages in MySQL and manage online status and unread counts in Redis. As user and message volumes grow, MySQL requires frequent sharding, which steadily increases O&M costs.

The Tablestore Timeline SDK uses a fan-out-on-write model built on two libraries: a store library and a sync library. The store library persists all conversation messages, and the sync library distributes new messages to recipients in real time. Wide-column tables scale horizontally on their own, search indexes provide full-text search, and secondary indexes provide reverse queries. Together with the Timeline model, these features let you build an IM system that requires no manual sharding.

Data table design

This solution uses six data tables, one secondary index, and two search indexes to support the core IM functions.

User data

  • im_user_table (primary key: user_id STRING): Stores user attributes such as username and gender. Each user is uniquely identified by user_id.

Relationship data

  • im_user_relation_table (primary key: main_user STRING + sub_user STRING): Stores friend relationships. When a friend is added, BatchWriteRow writes two rows, one in each direction, to keep the relationship symmetric.

  • im_group_relation_table (primary key: group_id STRING + user_id STRING): Stores group membership. GetRange on the primary table lists all members in a group. The secondary index im_group_relation_global_index supports reverse queries to list all groups that a user has joined.

Message data (Timeline SDK)

  • im_timeline_meta_table (primary key: timeline_id STRING): Stores conversation metadata such as conversation type, group name, and creation time. The search index im_timeline_meta_index enables full-text search by group name.

  • im_timeline_store_table (primary key: timeline_id STRING): The message store library, which persists all conversation messages. The search index im_timeline_store_index enables full-text search on message content.

  • im_timeline_sync_table (primary key: timeline_id STRING): The message sync library, which keeps an independent sync queue for each user. New messages are distributed to recipients through the fan-out-on-write model.

The core design principle is the separation of the store library and the sync library.

  • Store library: Keyed by conversation ID, the store library persists the complete message history of each conversation. When a user opens a conversation, the client reads messages from the store library in reverse chronological order. The store library holds large volumes of data but is read and written relatively infrequently, so use a storage-optimized instance for it.

  • Sync library: Keyed by user ID, the sync library maintains an independent sync queue for each user. When a message is sent, the system fans it out to the sync queue of each recipient. The client keeps a checkpoint (the last-read position), pulls new messages incrementally from that point, and counts unread messages per conversation. The sync library needs low-latency, real-time reads and writes, so use a high-performance instance for it. A message only needs to stay in a sync queue until the recipient pulls it, and the full history remains in the store library. You can therefore set a data expiration period on the sync library, such as one or two weeks.
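The following minimal in-memory sketch shows how the two libraries divide the work: fan-out on write, followed by a checkpoint-based pull. It does not use the Timeline SDK. The class FanOutModel, its send and pull methods, and the rule that sequence IDs start at 1 and grow by one per queue are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the store/sync split; it does not call the Timeline SDK.
final class FanOutModel {
    // One row in a queue: a per-queue sequence ID, the conversation it belongs to, and the text.
    static final class Entry {
        final long seq;
        final String conversation;
        final String text;

        Entry(long seq, String conversation, String text) {
            this.seq = seq;
            this.conversation = conversation;
            this.text = text;
        }
    }

    // Store library: one queue per conversation ID.
    final Map<String, List<Entry>> store = new HashMap<>();
    // Sync library: one queue per user ID.
    final Map<String, List<Entry>> sync = new HashMap<>();

    // Appends to a queue and assigns the next sequence ID of that queue (1, 2, 3, ...).
    private static void append(Map<String, List<Entry>> library, String queueId,
                               String conversation, String text) {
        List<Entry> queue = library.computeIfAbsent(queueId, k -> new ArrayList<>());
        queue.add(new Entry(queue.size() + 1, conversation, text));
    }

    // Fan-out on write: persist the message once, then copy it into every recipient's sync queue.
    void send(String conversationId, String text, List<String> recipients) {
        append(store, conversationId, conversationId, text);
        for (String user : recipients) {
            append(sync, user, conversationId, text);
        }
    }

    // Incremental pull: return only the entries after the client's checkpoint.
    List<Entry> pull(String user, long checkpoint) {
        List<Entry> fresh = new ArrayList<>();
        for (Entry e : sync.getOrDefault(user, new ArrayList<>())) {
            if (e.seq > checkpoint) {
                fresh.add(e);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        FanOutModel im = new FanOutModel();
        im.send("group_1", "Who used Tablestore today?", List.of("user_1", "user_2", "user_3"));
        im.send("single_conv_1", "Hello, how are you?", List.of("user_1"));
        System.out.println(im.pull("user_1", 0).size()); // 2
        System.out.println(im.pull("user_1", 1).size()); // 1
        System.out.println(im.store.get("group_1").size()); // 1
    }
}
```

Each message is written once to the store library and once per recipient to the sync library. Write volume therefore grows with group size, while each user's pull remains a single read of one queue.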

Quick start

Download the complete IM system sample code. The project covers initialization, user management, friend relationships, group management, message sending and receiving, unread message counts, and message search.

  • Sample code download: im.demo.zip.

  • Before you run the sample, create a configuration file named tablestoreConf.json in your home directory. Specify the Tablestore instance endpoint, AccessKey ID, AccessKey secret, and instance name.

  • Run InitChartRoomExample to initialize the tables and indexes, ClientRequestExample to demonstrate all features, and ReleaseChartRoomExample to clean up the resources.

Implementation

The following steps use the Tablestore Java SDK and the Timeline SDK to build a complete IM system.

Note

This solution uses the Java Timeline SDK. The Timeline SDK encapsulates operations on the store library, sync library, and meta library. You do not need to manually manage the underlying table schema. Add the Tablestore SDK 5.17.4 or later as a Maven dependency.

Step 1: Initialize Timeline and create data tables

Initialize a TimelineV2 object and configure the table schema and indexes for the store library, sync library, and meta library. The search index on the store library enables full-text message search, and the search index on the meta library enables group name search.

// Initialize the Timeline SDK with three libraries: meta, store, and sync
// syncClient is an initialized SyncClient connected to your Tablestore instance
TimelineStoreFactory serviceFactory = new TimelineStoreFactoryImpl(syncClient);

// Timeline identifier schema (shared by all three libraries)
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
        .addStringField("timeline_id").build();

// Meta library: stores conversation metadata with search index for group name search
IndexSchema metaIndex = new IndexSchema();
metaIndex.setFieldSchemas(Arrays.asList(
        new FieldSchema("type", FieldType.KEYWORD).setIndex(true).setEnableSortAndAgg(true),
        new FieldSchema("group_name", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord)
));
TimelineMetaSchema metaSchema = new TimelineMetaSchema("im_timeline_meta_table", idSchema)
        .withIndex("im_timeline_meta_index", metaIndex);
TimelineMetaStore metaStore = serviceFactory.createMetaStore(metaSchema);

// Store library: persists all conversation messages with search index for full-text search
IndexSchema storeIndex = new IndexSchema();
storeIndex.setFieldSchemas(Arrays.asList(
        new FieldSchema("timeline_id", FieldType.KEYWORD).setIndex(true),
        new FieldSchema("text", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
        new FieldSchema("sender", FieldType.KEYWORD).setIndex(true),
        new FieldSchema("type", FieldType.KEYWORD).setIndex(true)
));
TimelineSchema storeSchema = new TimelineSchema("im_timeline_store_table", idSchema)
        .withIndex("im_timeline_store_index", storeIndex);
TimelineStore storeTable = serviceFactory.createTimelineStore(storeSchema);

// Sync library: per-user inbox for message fan-out, 7-day TTL
TimelineSchema syncSchema = new TimelineSchema("im_timeline_sync_table", idSchema)
        .setTimeToLive(7 * 24 * 3600);
TimelineStore syncTable = serviceFactory.createTimelineStore(syncSchema);

// Create all tables and indexes
metaStore.prepareTables();
storeTable.prepareTables();
syncTable.prepareTables();

In addition to the three tables managed by the Timeline SDK, you must manually create the user table, friend relationship table, and group relationship table. The group relationship table requires a secondary index for reverse queries.

// Create user table
// TableOptions(-1, 1): data never expires (TTL = -1) and each column keeps one version
TableMeta userMeta = new TableMeta("im_user_table");
userMeta.addPrimaryKeyColumn("user_id", PrimaryKeyType.STRING);
syncClient.createTable(new CreateTableRequest(userMeta, new TableOptions(-1, 1)));

// Create user relation table (for friendship)
TableMeta relationMeta = new TableMeta("im_user_relation_table");
relationMeta.addPrimaryKeyColumn("main_user", PrimaryKeyType.STRING);
relationMeta.addPrimaryKeyColumn("sub_user", PrimaryKeyType.STRING);
syncClient.createTable(new CreateTableRequest(relationMeta, new TableOptions(-1, 1)));

// Create group relation table with global secondary index
// A table that has a secondary index must keep max versions at 1
TableMeta groupMeta = new TableMeta("im_group_relation_table");
groupMeta.addPrimaryKeyColumn("group_id", PrimaryKeyType.STRING);
groupMeta.addPrimaryKeyColumn("user_id", PrimaryKeyType.STRING);
CreateTableRequest groupRequest = new CreateTableRequest(groupMeta, new TableOptions(-1, 1));

// Global secondary index: reverse PK order (user_id, group_id) for "list my groups"
IndexMeta indexMeta = new IndexMeta("im_group_relation_global_index");
indexMeta.addPrimaryKeyColumn("user_id");
indexMeta.addPrimaryKeyColumn("group_id");
groupRequest.addIndex(indexMeta);
syncClient.createTable(groupRequest);

Step 2: Create a user

Write user information to the user table. Each user_id also serves as the timeline_id of that user's sync queue. During message fan-out, the system can therefore locate a recipient's sync queue directly from the user ID.

// Create a user
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString("user_1"))
        .build();

RowPutChange rowPutChange = new RowPutChange("im_user_table", primaryKey);
rowPutChange.addColumn("user_name", ColumnValue.fromString("Alice"));
rowPutChange.addColumn("sexuality", ColumnValue.fromString("FEMALE"));

syncClient.putRow(new PutRowRequest(rowPutChange));

Step 3: Manage friend relationships

Friend relationships are bidirectional. When you add a friend, use BatchWriteRow to write two rows (A to B and B to A) in a single request so that each user can find the other through GetRange. BatchWriteRow is not atomic: each row succeeds or fails independently, so check the response and retry any failed rows. To remove a friend, use BatchWriteRow to delete both rows.

// Establish bidirectional friendship between userA and userB
String userA = "user_1";
String userB = "user_2";
String timelineId = "single_conv_1";  // Shared conversation timeline ID

// Row A -> B
PrimaryKey pkA = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userA))
        .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userB))
        .build();
RowPutChange changeA = new RowPutChange("im_user_relation_table", pkA);
changeA.addColumn("timeline_id", ColumnValue.fromString(timelineId));

// Row B -> A
PrimaryKey pkB = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userB))
        .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userA))
        .build();
RowPutChange changeB = new RowPutChange("im_user_relation_table", pkB);
changeB.addColumn("timeline_id", ColumnValue.fromString(timelineId));

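// BatchWriteRow sends both rows in one request, but it is not atomic:
// each row succeeds or fails independently
BatchWriteRowRequest request = new BatchWriteRowRequest();
request.addRowChange(changeA);
request.addRowChange(changeB);
BatchWriteRowResponse batchResponse = syncClient.batchWriteRow(request);
if (!batchResponse.isAllSucceed()) {
    // Retry only the failed rows so that both directions of the friendship exist
    syncClient.batchWriteRow(request.createRequestForRetry(batchResponse.getFailedRows()));
}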

To list a user's friends, use the current user's ID as main_user and scan all sub_user values through GetRange.

// List friends of a user
PrimaryKey start = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString("user_1"))
        .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.INF_MIN)
        .build();
PrimaryKey end = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString("user_1"))
        .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.INF_MAX)
        .build();

RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("im_user_relation_table");
criteria.setInclusiveStartPrimaryKey(start);
criteria.setExclusiveEndPrimaryKey(end);
criteria.setMaxVersions(1);

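// A single GetRange call returns at most 5,000 rows or 4 MB of data. If
// response.getNextStartPrimaryKey() is not null, call GetRange again from that key.
GetRangeResponse response = syncClient.getRange(new GetRangeRequest(criteria));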
for (Row row : response.getRows()) {
    String friendId = row.getPrimaryKey().getPrimaryKeyColumn("sub_user").getValue().asString();
    System.out.println("Friend: " + friendId);
}
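The listing above works because the table is sorted by the full primary key, which keeps all rows for one main_user contiguous. The following in-memory sketch models that ordering with a sorted map. FriendTable is a hypothetical class and not part of any SDK. Adding a friend writes both directed rows, and listing friends is a range scan over one main_user prefix, the counterpart of the INF_MIN and INF_MAX bounds. The composite key is encoded as one string joined by a NUL separator character, which assumes that user IDs never contain that character.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of im_user_relation_table (not the Tablestore SDK).
final class FriendTable {
    // Separator that sorts before every other character, so the rows of "user_1"
    // never interleave with the rows of "user_10". Assumes IDs never contain it.
    private static final char SEP = '\u0000';
    // Composite key (main_user, sub_user) -> timeline_id attribute, kept in key order.
    private final NavigableMap<String, String> rows = new TreeMap<>();

    private static String key(String mainUser, String subUser) {
        return mainUser + SEP + subUser;
    }

    // Two directed rows that share one conversation timeline.
    void addFriend(String a, String b, String timelineId) {
        rows.put(key(a, b), timelineId);
        rows.put(key(b, a), timelineId);
    }

    // Removing a friendship deletes both directed rows.
    void removeFriend(String a, String b) {
        rows.remove(key(a, b));
        rows.remove(key(b, a));
    }

    // Counterpart of GetRange from (user, INF_MIN) inclusive to (user, INF_MAX) exclusive.
    List<String> listFriends(String user) {
        String from = user + SEP;               // smallest key with this main_user
        String to = user + (char) (SEP + 1);    // first key past this main_user
        List<String> friends = new ArrayList<>();
        for (String k : rows.subMap(from, true, to, false).keySet()) {
            friends.add(k.substring(from.length()));
        }
        return friends;
    }

    public static void main(String[] args) {
        FriendTable table = new FriendTable();
        table.addFriend("user_1", "user_2", "single_conv_1");
        table.addFriend("user_1", "user_3", "single_conv_2");
        table.addFriend("user_10", "user_2", "single_conv_3");
        System.out.println(table.listFriends("user_1")); // [user_2, user_3]
        System.out.println(table.listFriends("user_2")); // [user_1, user_10]
    }
}
```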

Step 4: Manage groups

To create a group, write the group information (group name, type, and creation time) to the Timeline meta library, and then add the group members to the group relationship table.

// Create a group conversation in the meta library
TimelineIdentifier identifier = new TimelineIdentifier.Builder()
        .addField("timeline_id", "group_1")
        .build();
TimelineMeta meta = new TimelineMeta(identifier)
        .setField("type", "GROUP")
        .setField("group_name", "Tablestore Discussion")
        .setField("create_time", System.currentTimeMillis());
metaStore.insert(meta);

// Add members to the group relation table
String[] members = {"user_1", "user_2", "user_3"};
for (String userId : members) {
    PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("group_id", PrimaryKeyValue.fromString("group_1"))
            .addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString(userId))
            .build();
    syncClient.putRow(new PutRowRequest(new RowPutChange("im_group_relation_table", pk)));
}

The primary key of the group relationship table is group_id + user_id. GetRange lists all members in a group. The secondary index im_group_relation_global_index reverses the primary key order to user_id + group_id, which enables reverse queries to list all groups that a user has joined.

// List groups that a user has joined (via global secondary index)
PrimaryKey start = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString("user_1"))
        .addPrimaryKeyColumn("group_id", PrimaryKeyValue.INF_MIN)
        .build();
PrimaryKey end = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString("user_1"))
        .addPrimaryKeyColumn("group_id", PrimaryKeyValue.INF_MAX)
        .build();

RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("im_group_relation_global_index");
criteria.setInclusiveStartPrimaryKey(start);
criteria.setExclusiveEndPrimaryKey(end);
criteria.setMaxVersions(1);

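// A single GetRange call returns at most 5,000 rows or 4 MB of data. If
// response.getNextStartPrimaryKey() is not null, call GetRange again from that key.
GetRangeResponse response = syncClient.getRange(new GetRangeRequest(criteria));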
for (Row row : response.getRows()) {
    String groupId = row.getPrimaryKey().getPrimaryKeyColumn("group_id").getValue().asString();
    System.out.println("Joined group: " + groupId);
}
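The same sorted-key idea explains the reverse query. In the following sketch, one sorted set holds keys in the primary table's (group_id, user_id) order and another holds them in the index's (user_id, group_id) order. Each question then becomes a prefix scan on the set whose leading column matches it. GroupMembership is an illustrative class, not an SDK type. In Tablestore, the service maintains the secondary index for you. The sketch writes both copies explicitly only to show the two key orders side by side.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical in-memory model of im_group_relation_table and its global secondary index.
final class GroupMembership {
    // Sorts before every other character; assumes IDs never contain it.
    private static final char SEP = '\u0000';
    // Primary table rows, ordered by (group_id, user_id).
    private final NavigableSet<String> table = new TreeSet<>();
    // Index rows, ordered by (user_id, group_id).
    private final NavigableSet<String> index = new TreeSet<>();

    void join(String groupId, String userId) {
        table.add(groupId + SEP + userId);
        index.add(userId + SEP + groupId);
    }

    void leave(String groupId, String userId) {
        table.remove(groupId + SEP + userId);
        index.remove(userId + SEP + groupId);
    }

    // Prefix range scan: every key whose first primary key column equals `first`.
    private static List<String> scan(NavigableSet<String> keys, String first) {
        String from = first + SEP;
        String to = first + (char) (SEP + 1);
        List<String> second = new ArrayList<>();
        for (String key : keys.subSet(from, true, to, false)) {
            second.add(key.substring(from.length()));
        }
        return second;
    }

    // GetRange on the primary table: all members of a group.
    List<String> members(String groupId) {
        return scan(table, groupId);
    }

    // GetRange on the index: all groups that a user has joined.
    List<String> groupsOf(String userId) {
        return scan(index, userId);
    }

    public static void main(String[] args) {
        GroupMembership g = new GroupMembership();
        for (String user : List.of("user_1", "user_2", "user_3")) {
            g.join("group_1", user);
        }
        g.join("group_2", "user_1");
        System.out.println(g.members("group_1")); // [user_1, user_2, user_3]
        System.out.println(g.groupsOf("user_1")); // [group_1, group_2]
    }
}
```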

Step 5: Send messages

To send a message, first write it to the store library for persistent storage, then fan it out to the recipients' sync queues. For a one-on-one chat, the message goes to the other user's sync queue. For a group chat, it goes to the sync queue of every group member.

// Send a one-on-one chat message
TimelineMessage message = new TimelineMessage()
        .setField("text", "Hello, how are you?")
        .setField("type", "SINGLE")
        .setField("sender", "user_2")
        .setField("send_time", System.currentTimeMillis())
        .setField("conversation", "user_2");  // Conversation as seen by the receiver: the sender's user ID

// Step 1: Write to the store library (persistent storage)
TimelineIdentifier storeId = new TimelineIdentifier.Builder()
        .addField("timeline_id", "single_conv_1")
        .build();
storeTable.createTimelineQueue(storeId).store(message);

// Step 2: Fan out to the receiver's sync library
TimelineIdentifier syncId = new TimelineIdentifier.Builder()
        .addField("timeline_id", "user_1")  // Receiver's user ID
        .build();
syncTable.createTimelineQueue(syncId).store(message);

// Send a group chat message (fan out to all group members)
TimelineMessage groupMsg = new TimelineMessage()
        .setField("text", "Who used Tablestore today?")
        .setField("type", "GROUP")
        .setField("sender", "user_1")
        .setField("send_time", System.currentTimeMillis())
        .setField("conversation", "group_1");

// Step 1: Write to the store library
TimelineIdentifier storeId = new TimelineIdentifier.Builder()
        .addField("timeline_id", "group_1")
        .build();
storeTable.createTimelineQueue(storeId).store(groupMsg);

// Step 2: Fan out to each group member's sync library
// groupMemberIds holds the member IDs of group_1, read from im_group_relation_table with GetRange
for (String memberId : groupMemberIds) {
    TimelineIdentifier syncId = new TimelineIdentifier.Builder()
            .addField("timeline_id", memberId)
            .build();
    syncTable.createTimelineQueue(syncId).store(groupMsg);
}
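The loop above writes to one sync queue at a time, so the time to send a message to a large group grows with the member count. One option is to issue the per-member writes in parallel and collect the members whose writes failed so they can be retried later. The following sketch shows this approach under stated assumptions. SyncWriter and fanOut are hypothetical names. In real code the writer would wrap syncTable.createTimelineQueue(syncId).store(groupMsg), which the sketch assumes is safe to call from several threads. A production system would also reuse a long-lived thread pool instead of creating one per message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of parallel fan-out; SyncWriter stands in for a real sync-queue write.
final class GroupFanOut {
    // In real code this might wrap syncTable.createTimelineQueue(<member identifier>).store(message).
    interface SyncWriter {
        void store(String memberId, String message) throws Exception;
    }

    // Writes the message to every member's sync queue in parallel and returns the
    // IDs of members whose write failed, so the caller can retry just those.
    static List<String> fanOut(List<String> memberIds, String message, SyncWriter writer, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (String memberId : memberIds) {
                futures.add(pool.submit(() -> {
                    writer.store(memberId, message);
                    return null;  // returning a value makes this a Callable, which may throw
                }));
            }
            List<String> failed = new ArrayList<>();
            for (int i = 0; i < futures.size(); i++) {
                try {
                    futures.get(i).get();
                } catch (ExecutionException e) {
                    failed.add(memberIds.get(i));        // the write threw an exception
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // preserve the interrupt for the caller
                    failed.add(memberIds.get(i));        // outcome unknown: treat as failed
                }
            }
            return failed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, String> inboxes = new ConcurrentHashMap<>();
        List<String> failed = fanOut(List.of("user_1", "user_2", "user_3"), "Who used Tablestore today?",
                (memberId, msg) -> {
                    if (memberId.equals("user_3")) {
                        throw new Exception("simulated write failure");
                    }
                    inboxes.put(memberId, msg);
                }, 2);
        System.out.println("delivered=" + inboxes.size() + " failed=" + failed); // delivered=2 failed=[user_3]
    }
}
```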

Step 6: Synchronize messages and count unread messages

The client maintains a checkpoint (the sequenceId of the last message read). Use ScanParameter to incrementally pull new messages from the sync library starting from the checkpoint, then group the messages by conversation to count unread messages.

// Fetch new messages from the sync library
long syncCheckpoint = 0;  // Last read position, maintained by the client

TimelineIdentifier identifier = new TimelineIdentifier.Builder()
        .addField("timeline_id", "user_1")
        .build();

ScanParameter parameter = new ScanParameter()
        .scanForward(syncCheckpoint + 1)
        .maxCount(30);

Iterator<TimelineEntry> iterator = syncTable.createTimelineQueue(identifier).scan(parameter);

// Group messages by conversation and count unread messages
Map<String, Integer> unreadCount = new HashMap<>();
while (iterator.hasNext()) {
    TimelineEntry entry = iterator.next();
    String conversation = entry.getMessage().getString("conversation");
    unreadCount.merge(conversation, 1, Integer::sum);

    // Update checkpoint to the latest sequenceId
    syncCheckpoint = Math.max(syncCheckpoint, entry.getSequenceID());
}

for (Map.Entry<String, Integer> e : unreadCount.entrySet()) {
    System.out.println("Conversation: " + e.getKey() + ", Unread: " + e.getValue());
}
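The loop above reads a single page of up to 30 entries. Assuming maxCount caps the number of entries that one scan returns, a client with more pending messages would under-count unread messages. The following sketch repeats the equivalent of scanForward(checkpoint + 1).maxCount(pageSize) until a page comes back short, and advances the checkpoint after each page. UnreadCounter and its in-memory queue (sequence ID mapped to conversation ID) are illustrations, not SDK types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of draining a sync queue page by page; not the Timeline SDK API.
final class UnreadCounter {
    // Unread counts per conversation plus the checkpoint to save after this pull.
    static final class Result {
        final Map<String, Integer> unread = new LinkedHashMap<>();
        long checkpoint;
    }

    // Stand-in for one scanForward(checkpoint + 1).maxCount(pageSize) call:
    // up to pageSize entries whose sequence ID is greater than the checkpoint.
    private static List<Map.Entry<Long, String>> page(NavigableMap<Long, String> queue,
                                                       long checkpoint, int pageSize) {
        List<Map.Entry<Long, String>> batch = new ArrayList<>();
        for (Map.Entry<Long, String> e : queue.tailMap(checkpoint, false).entrySet()) {
            if (batch.size() == pageSize) {
                break;
            }
            batch.add(e);
        }
        return batch;
    }

    // queue maps sequence ID -> conversation ID, like one user's sync queue.
    static Result drain(NavigableMap<Long, String> queue, long checkpoint, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        Result result = new Result();
        result.checkpoint = checkpoint;
        while (true) {
            List<Map.Entry<Long, String>> batch = page(queue, result.checkpoint, pageSize);
            for (Map.Entry<Long, String> e : batch) {
                result.unread.merge(e.getValue(), 1, Integer::sum);
                result.checkpoint = Math.max(result.checkpoint, e.getKey());
            }
            if (batch.size() < pageSize) {
                break;  // a short page means the client has caught up
            }
        }
        return result;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> queue = new TreeMap<>();
        queue.put(1L, "group_1");
        queue.put(2L, "single_conv_1");
        queue.put(3L, "group_1");
        queue.put(4L, "group_1");
        queue.put(5L, "single_conv_1");
        Result r = drain(queue, 0, 2);
        System.out.println(r.unread + " checkpoint=" + r.checkpoint); // {group_1=3, single_conv_1=2} checkpoint=5
    }
}
```

Save the returned checkpoint only after the messages have reached the user interface. Otherwise, a crash between pulling and displaying could silently skip messages.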

To view the message history of a conversation, pull messages from the store library in reverse order.

// Fetch conversation history from the store library (reverse order)
TimelineIdentifier storeId = new TimelineIdentifier.Builder()
        .addField("timeline_id", "group_1")
        .build();

// Scan backward from the largest sequence ID so that the newest messages come first
ScanParameter parameter = new ScanParameter()
        .scanBackward(Long.MAX_VALUE - 1)
        .maxCount(30);

Iterator<TimelineEntry> iterator = storeTable.createTimelineQueue(storeId).scan(parameter);
while (iterator.hasNext()) {
    TimelineEntry entry = iterator.next();
    String sender = entry.getMessage().getString("sender");
    String text = entry.getMessage().getString("text");
    System.out.println("[" + sender + "]: " + text);
}
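The scan above returns the newest page of a conversation. To load older messages, continue the backward scan from just below the oldest sequence ID already shown. The following in-memory sketch shows that cursor logic with a sorted map. HistoryPager is a hypothetical class and not an SDK type. With the Timeline SDK, the next request would presumably be scanBackward(oldestSeq - 1), assuming the starting point of scanBackward is inclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of "load older messages" paging over one conversation's store queue.
final class HistoryPager {
    // One request: up to pageSize sequence IDs strictly below `before`, newest first.
    static List<Long> olderThan(NavigableMap<Long, String> store, long before, int pageSize) {
        List<Long> page = new ArrayList<>();
        for (Long seq : store.headMap(before, false).descendingKeySet()) {
            if (page.size() == pageSize) {
                break;
            }
            page.add(seq);
        }
        return page;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> store = new TreeMap<>();
        for (long seq = 1; seq <= 5; seq++) {
            store.put(seq, "message " + seq);
        }
        long cursor = Long.MAX_VALUE;  // first page: start from the newest message
        while (true) {
            List<Long> page = olderThan(store, cursor, 2);
            if (page.isEmpty()) {
                break;
            }
            for (Long seq : page) {
                System.out.println(store.get(seq));
            }
            cursor = page.get(page.size() - 1);  // oldest sequence ID shown so far
        }
    }
}
```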

Step 7: Search messages

The store library uses the search index im_timeline_store_index to support full-text search on message content within a conversation. The meta library uses the search index im_timeline_meta_index to support full-text search by group name.

// Search messages in a specific conversation by content keyword
TermQuery termQuery = new TermQuery();
termQuery.setFieldName("timeline_id");
termQuery.setTerm(ColumnValue.fromString("group_1"));

MatchPhraseQuery matchQuery = new MatchPhraseQuery();
matchQuery.setFieldName("text");
matchQuery.setText("Tablestore");

BoolQuery boolQuery = new BoolQuery();
boolQuery.setMustQueries(Arrays.asList(termQuery, matchQuery));

SearchQuery searchQuery = new SearchQuery();
searchQuery.setQuery(boolQuery);
searchQuery.setLimit(30);

List<SearchResult.Entry<TimelineEntry>> results = storeTable.search(searchQuery).getEntries();
for (SearchResult.Entry<TimelineEntry> result : results) {
    TimelineEntry entry = result.getData();
    System.out.println("[" + entry.getMessage().getString("sender") + "]: "
            + entry.getMessage().getString("text"));
}

// Search group conversations by group name
TermQuery typeQuery = new TermQuery();
typeQuery.setFieldName("type");
typeQuery.setTerm(ColumnValue.fromString("GROUP"));

MatchPhraseQuery nameQuery = new MatchPhraseQuery();
nameQuery.setFieldName("group_name");
nameQuery.setText("Tablestore");

BoolQuery boolQuery = new BoolQuery();
boolQuery.setMustQueries(Arrays.asList(typeQuery, nameQuery));

SearchQuery searchQuery = new SearchQuery();
searchQuery.setQuery(boolQuery);
searchQuery.setLimit(100);

List<SearchResult.Entry<TimelineMeta>> results = metaStore.search(searchQuery).getEntries();
for (SearchResult.Entry<TimelineMeta> result : results) {
    TimelineMeta meta = result.getData();
    System.out.println("Group: " + meta.getString("group_name"));
}
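Both queries combine their conditions with setMustQueries, so a row is returned only if every clause matches. The following in-memory sketch illustrates only that AND semantics and the result limit. MessageFilter is a hypothetical class. It is not how a search index evaluates queries: the real MatchPhraseQuery matches tokens produced by the MaxWord analyzer, whereas the sketch uses a case-insensitive substring test as a rough stand-in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical in-memory analog of a bool query with two must clauses; not a search index.
final class MessageFilter {
    static final class Message {
        final String timelineId;
        final String sender;
        final String text;

        Message(String timelineId, String sender, String text) {
            this.timelineId = timelineId;
            this.sender = sender;
            this.text = text;
        }
    }

    // Returns at most `limit` messages for which every must clause holds.
    static List<Message> search(List<Message> messages, String timelineId, String phrase, int limit) {
        String needle = phrase.toLowerCase(Locale.ROOT);
        List<Message> hits = new ArrayList<>();
        for (Message m : messages) {
            if (hits.size() >= limit) {
                break;  // plays the role of SearchQuery.setLimit
            }
            boolean termMatches = m.timelineId.equals(timelineId);  // like TermQuery: exact value
            // Rough stand-in for MatchPhraseQuery: a case-insensitive substring test.
            boolean phraseMatches = m.text.toLowerCase(Locale.ROOT).contains(needle);
            if (termMatches && phraseMatches) {  // must clauses are combined with AND
                hits.add(m);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        List<Message> messages = List.of(
                new Message("group_1", "user_1", "Who used Tablestore today?"),
                new Message("group_1", "user_2", "Lunch at noon?"),
                new Message("group_2", "user_3", "Tablestore tips"));
        for (Message m : search(messages, "group_1", "Tablestore", 30)) {
            System.out.println("[" + m.sender + "]: " + m.text); // [user_1]: Who used Tablestore today?
        }
    }
}
```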

Clean up resources

Important

The following operations delete all IM-related data tables and indexes. The data cannot be recovered. Make sure that you no longer need the data before you proceed.

// Drop user and relation tables
syncClient.deleteTable(new DeleteTableRequest("im_user_table"));
syncClient.deleteTable(new DeleteTableRequest("im_user_relation_table"));
syncClient.deleteTable(new DeleteTableRequest("im_group_relation_table"));

// Drop Timeline SDK managed tables and indexes
metaStore.dropAllTables();
storeTable.dropAllTables();
syncTable.dropAllTables();