Queue管理

更新时间:
复制为 MD 格式

介绍消息模型下 Queue 实例的管理接口及使用方式。

获取Queue实例

Queue 是单个消息队列的抽象,对应 TimelineStore 下某个 TimelineIdentifier 的所有消息。通过 TimelineStore 接口创建 TimelineQueue 实例。

TimelineIdentifier identifier = new TimelineIdentifier.Builder()
        .addField("timeline_id", "group_1")
        .build();

// 单 TimelineStore 下单 identifier 对应的消息队列(Queue)。
TimelineQueue timelineQueue = timelineStore.createTimelineQueue(identifier);

TimelineQueue 实例提供以下操作接口:

  • 写入:同步写入(store)、异步写入(storeAsync)、批量写入(batchStore

  • 更新:同步更新(update)、异步更新(updateAsync

  • 读取:单行读取(get)、范围读取(scan

  • 删除:单行删除(delete

Store

同步写入消息。提供两个接口,分别对应 SequenceId 的两种实现方式:自增列和手动设置。实现方式在 TimelineSchema 中配置。

timelineQueue.store(message); // 自增列实现的 SequenceId。
timelineQueue.store(sequenceId, message); // 手动设置 SequenceId。

StoreAsync

异步写入消息。定义 TimelineCallback 回调处理成功和失败两种情况,接口返回 Future<TimelineEntry>

TimelineCallback callback = new TimelineCallback() {
    @Override
    public void onCompleted(TimelineIdentifier i, TimelineMessage m, TimelineEntry t) {
        // do something when succeed.
    }

    @Override
    public void onFailed(TimelineIdentifier i, TimelineMessage m, Exception e) {
        // do something when failed.
    }
};

timelineQueue.storeAsync(message, callback); // 自增列实现的 SequenceId。
timelineQueue.storeAsync(sequenceId, message, callback); // 手动设置 SequenceId。

BatchStore

批量写入消息,支持无回调和有回调两种方式。

timelineQueue.batchStore(message); // 自增列实现的 SequenceId。
timelineQueue.batchStore(sequenceId, message); // 手动设置 SequenceId。

timelineQueue.batchStore(message, callback); // 自增列实现的 SequenceId。
timelineQueue.batchStore(sequenceId, message, callback); // 手动设置 SequenceId。

Get

通过 SequenceId 读取单行消息。消息不存在时不抛出异常,返回 null。

timelineQueue.get(sequenceId);

GetLatestTimelineEntry

读取最新一条消息。消息不存在时不抛出异常,返回 null。

timelineQueue.getLatestTimelineEntry();

GetLatestSequenceId

获取最新一条消息的 SequenceId。消息不存在时不抛出异常,返回 0。

timelineQueue.getLatestSequenceId();

Update

通过 SequenceId 同步更新消息内容。

TimelineMessage message = new TimelineMessage().setField("text", "Timeline is fine.");

//update message with new field
message.setField("text", "new value");
timelineQueue.update(sequenceId, message);

UpdateAsync

通过 SequenceId 异步更新消息。定义 TimelineCallback 回调处理成功和失败两种情况,接口返回 Future<TimelineEntry>

TimelineMessage oldMessage = new TimelineMessage().setField("text", "Timeline is fine.");
TimelineCallback callback = new TimelineCallback() {
    @Override
    public void onCompleted(TimelineIdentifier i, TimelineMessage m, TimelineEntry t) {
        // do something when succeed.
    }

    @Override
    public void onFailed(TimelineIdentifier i, TimelineMessage m, Exception e) {
        // do something when failed.
    }
};

TimelineMessage newMessage = oldMessage;
newMessage.setField("text", "new value");
timelineQueue.updateAsync(sequenceId, newMessage, callback);

Delete

通过 SequenceId 删除单行消息。

timelineQueue.delete(sequenceId);

Scan

通过 ScanParameter 正序或逆序范围读取 Queue 下的消息,返回 Iterator<TimelineEntry>,通过迭代器遍历结果。

ScanParameter scanParameter = new ScanParameter().scanBackward(Long.MAX_VALUE, 0);

timelineQueue.scan(scanParameter);