使用消息模型时,您需要完成Factory、MetaStoreTimelineStore的初始化。本文介绍如何完成初始化的配置和操作。

前提条件

初始化Factory

SyncClient作为参数,初始化StoreFactory。通过Store工厂创建Meta数据和Timeline数据的管理Store。

实现错误重试需要依赖SyncClient的重试策略,您可以通过配置SyncClient实现重试。如果有特殊需求,可自定义策略(只需实现RetryStrategy接口)。

/**
 * 重试策略配置。
 * Code: configuration.setRetryStrategy(new DefaultRetryStrategy());
 **/
ClientConfiguration configuration = new ClientConfiguration();

SyncClient client = new SyncClient(
        "http://instanceName.cn-shanghai.ots.aliyuncs.com",
        System.getenv("OTS_AK_ENV"),
        System.getenv("OTS_SK_ENV"),
        "instanceName", configuration);

TimelineStoreFactory serviceFactory = new TimelineStoreFactoryImpl(client);

初始化MetaStore

构建Meta表的Schema,包含Identifier、MetaIndex等参数,通过Store工厂创建并获取Meta的管理Store,配置参数包含Meta表名、索引名、主键字段、索引类型等。

TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
        .addStringField("timeline_id").build();

IndexSchema metaIndex = new IndexSchema();
metaIndex.setFieldSchemas(Arrays.asList(//配置索引字段以及字段类型。
        new FieldSchema("group_name", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
        new FieldSchema("create_time", FieldType.LONG).setIndex(true)
));

TimelineMetaSchema metaSchema = new TimelineMetaSchema("groupMeta", idSchema)
        .withIndex("metaIndex", metaIndex); //配置索引。

TimelineMetaStore timelineMetaStore = serviceFactory.createMetaStore(metaSchema);
  • 建表

    根据metaSchema的参数创建表。如果metaSchema中配置了索引,建表成功后会创建索引。

    timelineMetaStore.prepareTables();
  • 删表

    当表存在索引时,在删除表前,系统会先删除索引,再删除Store相应表。

    timelineMetaStore.dropAllTables();

初始化TimelineStore

构建timeline表的Schema配置,包含Identifier、TimelineIndex等参数,通过Store工厂创建并获取Timeline的管理Store;配置参数包含Timeline表名、索引名、主键字段、索引类型等。

消息的批量写入,基于TablestoreDefaultTableStoreWriter提升并发,用户可以根据自己需求设置线程池数目。

TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
        .addStringField("timeline_id").build();

IndexSchema timelineIndex = new IndexSchema();
timelineIndex.setFieldSchemas(Arrays.asList(//配置索引的字段以及字段类型。
        new FieldSchema("text", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
        new FieldSchema("receivers", FieldType.KEYWORD).setIndex(true).setIsArray(true)
));

TimelineSchema timelineSchema = new TimelineSchema("timeline", idSchema)
        .autoGenerateSeqId() //SequenceId 设置为自增列方式。
        .setCallbackExecuteThreads(5) //设置Writer初始线程数为5。
        .withIndex("metaIndex", timelineIndex); //设置索引。

TimelineStore timelineStore = serviceFactory.createTimelineStore(timelineSchema);
  • 建表

    根据TimelineSchema的参数创建表。如果TimelineSchema中配置了索引,建表成功后会创建索引。

    timelineStore.prepareTables();
  • 删表

    当表存在索引时,在删除表前,系统会先删除索引,再删除Store相应的表。

    timelineStore.dropAllTables();