使用时序Writer写入时序数据

表格存储支持通过时序Writer将时序数据写入时序表,时序Writer还支持多表写入、写入状态统计、行级别回调和自定义配置功能。本文介绍使用时序Writer写入时序数据的使用流程。

背景信息

使用时序模型时,用户主要通过调用PutTimeseriesData接口来实现时序数据的写入,但是此方式存在如下使用限制:

  • 需要手动控制并发。

  • 不支持行级别的回调功能。

  • 不支持性能参数和限制参数配置。

  • 不支持一次向多个时序表中写入数据。

因此表格存储Java SDK提供了时序Writer用于高性能写入时序数据。时序Writer封装了PutTimeseriesData接口,实现了内部控制并发写入时序数据的功能,同时支持多表写入、写入状态统计、行级别回调和自定义配置功能。

功能特性

时序Writer提供了多表写入、批量写入、并发写入、写入状态统计、行级别回调和自定义配置功能。详细说明请参见下表。

功能

描述

多表写入

时序Writer可以实现向多个不同的数据表中写入数据。用户新增数据后先缓存至内存,再根据表名构造不同的请求。

批量写入

单行写入接口为addTimeseriesRowChange(TimeseriesRow),支持使用批量写入接口addTimeseriesRowChange(List<TimeseriesRow>)写入多行数据。

并发写入

通过数据分桶与分桶间的并发控制,提升数据的写入速率。

写入状态统计

支持总请求次数、总行数、总成功行数、总失败行数、单行请求数等指标的查询。

行级别回调

支持对每一行数据的写入结果设置回调函数。

自定义配置

支持对并发数、重试策略、队列大小等参数进行自定义配置。

注意事项

目前支持使用时序模型功能的地域有华东1(杭州)、华东2(上海)、华北2(北京)、华北3(张家口)、华北6(乌兰察布)、华南1(深圳)、中国香港、德国(法兰克福)、美国(弗吉尼亚)和新加坡。

如果使用过程中遇到问题,请通过钉钉加入用户群44327024(物联网存储 IoTstore 开发者交流群)联系我们。

前提条件

  • 已创建时序模型实例以及时序表,用于存放时序数据。具体操作,请参见创建时序模型实例创建时序表

  • 已创建RAM用户,并授予RAM用户管理表格存储服务的权限(AliyunOTSFullAccess)。具体操作,请参见通过RAM PolicyRAM用户授权

    重要

    由于数据同步任务配置时需要填写访问密钥AccessKey(AK)信息来执行授权,为避免阿里云账号泄露AccessKey带来的安全风险,建议您通过RAM用户来完成授权和AccessKey的创建。

  • 已获取AccessKey(包括AccessKey IDAccessKey Secret),用于进行签名认证。具体操作,请参见获取AccessKey

快速上手

使用时序Writer快速写入时序数据到时序表。

说明

此处按照步骤详细介绍时序Writer的使用方式,完整代码请参见附录:完整代码

步骤一:安装时序Writer

如果已经安装表格存储Java SDK,可跳过此操作。

Maven工程中使用时序Writer,只需在pom.xml中加入表格存储 Java SDK依赖即可。以5.13.15版本为例,在<dependencies>内加入如下内容:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.13.15</version>
</dependency>  

步骤二:创建时序Writer实例

创建时序Writer实例用于将时序数据写入时序表。

说明
  • 表格存储提供了三种构造时序Writer函数的方式,此处以通过传入credentials且使用默认配置为例介绍构造函数的方式。更多信息,请参见构造函数

  • 创建时序Writer实例时支持通过TimeseriesWriterConfig()工具类实现自定义参数。具体的自定义参数说明请参见配置TimeseriesWriterConfig

endpoint = "";
accessKeyId = "";
accessKeySecret = "";
instanceName = "";

ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
TimeseriesWriterConfig config = new TimeseriesWriterConfig();

DefaultTableStoreTimeseriesWriter writer = new DefaultTableStoreTimeseriesWriter(
       endpoint, credentials, instanceName, config, null);

步骤三:新增数据

构造一条时序数据,新增的时序数据将被缓存到本地。

Map<String, String> tags = new HashMap<String, String>();
tags.put("region", "hangzhou");
tags.put("os", "Ubuntu16.04");
TimeseriesKey timeseriesKey = new TimeseriesKey("cpu", "host_0", tags);
TimeseriesRow row = new TimeseriesRow(timeseriesKey, System.currentTimeMillis() * 1000); //一条时序数据。
String tableName = "rowTableName"; // 时序数据所在的时序表。
TimeseriesTableRow rowInTable = new TimeseriesTableRow(row, tableName); //封装时序数据与时序表。
writer.addTimeseriesRowChange(rowInTable); // 新增的时序数据将被缓存到本地。

步骤四:写入时序数据

新增数据到本地缓存后,您可以调用flush函数手动将本地缓存数据写入时序表。

重要

当某一分桶中的数据量达到maxBatchRowsCount(默认为200)设定值时,该分桶将会自动将本地缓存数据写入到时序表。

writer.flush(); // 将本地缓存数据写入时序表。

步骤五:关闭时序Writer

时序数据写入后,关闭时序Writer。时序Writer关闭前会将本地缓存中未写入的时序数据自动写入时序表。

writer.close();

接口说明

DefaultTableStoreTimeseriesWriter类是实现时序Writer的主工具类。

接口定义

DefaultTableStoreTimeseriesWriter类中的函数定义如下:


public class DefaultTableStoreTimeseriesWriter implements TableStoreTimeseriesWriter {
    //构造函数
    public DefaultTableStoreTimeseriesWriter(...)
      
    // 单时间线新增数据。
    void addTimeseriesRowChange(TimeseriesTableRow timeseriesTableRow) throws ClientException;
  
    // 单时间线新增数据。
    Future<TimeseriesWriterResult> addTimeseriesRowChangeWithFuture(TimeseriesTableRow timeseriesTableRow) throws ClientException;
  
    // 尝试单时间线新增数据。
    boolean tryAddTimeseriesRowChange(TimeseriesTableRow timeseriesTableRow) throws ClientException;
  
    // 多时间线新增数据。
    void addTimeseriesRowChange(List<TimeseriesTableRow> timeseriesTableRow, List<TimeseriesRow> dirtySeriesRow) throws ClientException;
  
    // 多时间线新增数据。
    Future<TimeseriesWriterResult> addTimeseriesRowChangeWithFuture(List<TimeseriesTableRow> timeseriesTableRows) throws ClientException;
  
    // 用户自定义行级别callback。
    void setResultCallback(TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> callback);
  
    // 获取callback。
    TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> getResultCallback();
  
    // 获取时序writer配置。
    TimeseriesWriterConfig getTimeseriesWriterConfig();
  
    // 设置时序writer。
    TimeseriesWriterHandleStatistics getTimeseriesWriterStatistics();
  
    // 等待writer内存中数据全部落盘并写入数据。
    void flush() throws ClientException;
  
    // 关闭时序表写入writer。
    void close();
}

构造函数

DefaultTableStoreTimeseriesWriter支持三种构造函数,请根据实际应用场景选择构造函数的方式。

  • 通过传入credentials且使用默认配置构建函数

    通过传入credentials,时序Writer会在内部构建时序client,在释放时序Writer后无需其他操作。推荐使用此方式构造函数。

    /**
     * 推荐使用的时序writer。
     *
     * @param endpoint 实例的服务地址。
     * @param credentials 认证信息,包含AccessKey,也支持使用临时访问凭证token。
     * @param instanceName 实例名称。
     * @param config 时序writer的配置。
     * @param resultCallback 行级别回调。
     */
    public DefaultTableStoreTimeseriesWriter(
            String endpoint,
            ServiceCredentials credentials,
            String instanceName,
            TimeseriesWriterConfig config,
            TableStoreCallback<TimeseriesRow, TimeseriesRowResult> resultCallback)
  • 通过传入credentials且使用自定义配置构建函数

    通过传入credentials,时序Writer会在内部构建时序client,同时可以进行自定义配置,在释放时序Writer后无需其他操作。

    /**
     * 推荐使用的时序writer。
     *
     * @param endpoint 实例的服务地址。
     * @param credentials 认证信息,含AccessKey,也支持临时访问凭证token。
     * @param instanceName 实例名称。
     * @param config 时序writer的配置。
     * @param cc 客户端的配置。
     * @param resultCallback 行级别回调。
     */
    public DefaultTableStoreTimeseriesWriter(
            String endpoint,
            ServiceCredentials credentials,
            String instanceName,
            TimeseriesWriterConfig config,
            ClientConfiguration cc,
            TableStoreCallback<TimeseriesRow, TimeseriesRowResult> resultCallback)
  • 直接传入时序client构造函数

    直接传入时序client,时序client的配置更灵活,但是需要在释放Writer后主动对otsclient进行shutdown。

    重要

    在释放Writer后请务必主动对otsclient进行shutdown。

    /**
     * 时序writer。
     *
     * @param ots 异步时序表客户端实例。
     * @param config 时序writer的配置。
     * @param callback 行级别回调。
     * @param executor 线程池。
     */
    public DefaultTableStoreTimeseriesWriter(
            AsyncTimeseriesClientInterface ots,
            TimeseriesWriterConfig config,
            TableStoreCallback<TimeseriesRow, TimeseriesRowResult> callback,
            Executor executor)

配置TimeseriesWriterConfig

时序Writer支持自定义配置参数,通过TimeseriesWriterConfig()工具类实现自定义参数。

参数

类型

示例

说明

maxBatchRowsCount

Integer

200

一次批量RPC请求导入的最大行数。默认值为200。取值范围为1~200。

maxBatchSize

Integer

4*1024*1024

一次批量RPC请求导入的最大数据量。默认值为4*1024*1024(即4 MB)。单位为字节。

concurrency

Integer

64

一个时序Writer的最大请求并发数。默认值为64。

bucketCount

Integer

4

时序数据缓存分桶数。默认值为4。

由于总缓存队列为bucketCount*bufferSize,如果分桶数量太少会导致并发能力下降,如果分桶数量太大会因资源抢占而写入速率下降,因此分桶数量一般建议取值48。

bufferSize

Integer

1024

每个桶的队列长度。默认值为1024。

flushInterval

Integer

10000

flush数据的时间间隔。默认值为10000。单位为毫秒。

logInterval

Integer

10000

记录日志的时间间隔。默认值为10000。单位为毫秒。

dispatchMode

String

HASH_PRIMARY_KEY

多桶分发模式。取值范围如下:

  • HASH_PRIMARY_KEY(默认):哈希完整主键分桶派发,可保证相同主键(度量名称、数据源、tags)的时序数据被写入同一个桶内,实现一个桶只对部分分区进行写入。一个桶内只包含部分分区的数据。

  • ROUND_ROBIN:循环遍历分桶派发。一个桶内可能包含所有分区的数据。

writeMode

String

PARALLEL

写入模式。取值范围如下:

  • SEQUENTIAL:串行写。不同桶间并发,同一个桶内串行请求。

  • PARALLEL(默认):并发写。不同桶间并发,同一个桶内也会并行请求。

callbackThreadCount

Integer

9

内部Callback运行的线程池的最大线程数。默认值为CPU核数+1。只有调用Callback函数内部构建线程池时生效。

callbackThreadPoolQueueSize

Integer

1024

内部Callback运行的线程池的队列大小。默认值为1024。只有调用Callback函数内部构建线程池时生效。

writerRetryStrategy

String

CERTAIN_ERROR_CODE_NOT_RETRY

通过传入credentials方式构造函数时内部构建client使用的重试策略。取值范围如下:

  • CERTAIN_ERROR_CODE_RETRY:指定需要重试的错误码集合。如果产生的不是指定错误码,则均不做重试。

    当产生OTSInternalServerError、OTSRequestTimeout、OTSPartitionUnavailable、OTSTableNotReady、OTSRowOperationConflict、OTSTimeout、OTSServerUnavailableOTSServerBusy错误码时,系统才会重试。

  • CERTAIN_ERROR_CODE_NOT_RETRY(默认):指定不需要重试的错误码集合。如果产生的不是指定的错误码,则均做重试。

    当产生OTSParameterInvalid、OTSConditionCheckFail、OTSRequestBodyTooLarge,OTSInvalidPK、OTSOutOfColumnCountLimitOTSOutOfRowSizeLimit错误码时,系统均不会重试。

clientMaxConnections

String

300

通过传入credentials方式构造函数时,内部构建client使用的最大连接数配置。默认值为300。

allowDuplicatedRowInBatchRequest

Boolean

true

批量请求是否允许对同一时间线进行写入。取值范围为truefalse。默认值为true。

如果批量请求时不允许对同一时间线进行写入,请配置此参数为false。

附录:完整代码

完整代码主要包括了创建时序表、构造时序Writer、使用时序Writer写入10000条数据和关闭时序Writer四个操作。

重要

使用时序Writer前,请确保已安装5.13.15版本及以上的表格存储Java SDK。

public class TimeseriesWriterSample {
    // 填写实例的服务地址。
    private static String endpoint = "https://[instanceName].cn-hangzhou.ots.aliyuncs.com";
    // 填写实例名称。
    private static String instanceName = "instanceName";
    // 填写阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和Access Secret)。
    private static String accessKeyId = "XXXXXXXXXX";
    private static String accessKeySecret = "XXXXXXXXXXXXXXXXXXXXXX";
    // 填写时序表名称。
    private static String tableName = "tableName";
    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) {
        TimeseriesWriterSample sample = new TimeseriesWriterSample();

        /**
         * 使用Writer前确保表已存在
         * 经过测试,时序表创建30秒后向其中写入数据不会出现异常情况
         * writer会校验表的存在性
         * */
        sample.tryCreateTable();

        /**
         * 初始化建议使用
         * DefaultTableStoreTimeseriesWriter(
         *             String endpoint,                                                             // 实例的服务地址、
         *             ServiceCredentials credentials,                                              // 认证信息,包含AccessKey,也支持使用临时访问凭证token。
         *             String instanceName,                                                         // 实例名称。
         *             String tableName,                                                            // 时序表名称:一个时序writer仅针对一个时序表。
         *             TimeseriesWriterConfig config,                                               // 时序writer的配置。
         *             TableStoreCallback<TimeseriesRow, TimeseriesRowResult> resultCallback)       // 行级别回调。
         * */
        DefaultTableStoreTimeseriesWriter writer = sample.createTableStoreTimeseriesWriter();

        /**
         * Future使用:批量写
         * */
        sample.writeTimeseriesRowWithFuture(writer);

        System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
        System.out.println("Count by WriterStatics: " + writer.getTimeseriesWriterStatistics());

        /**
         * 用户需要主动关闭Writer,内部等候所有队列数据写入后,自动关闭client与内部的线程池。
         * */
        writer.close();
    }

    private void tryCreateTable() {
        TimeseriesClient ots = new TimeseriesClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        TimeseriesTableMeta timeseriesTableMeta = new TimeseriesTableMeta(tableName);
        int timeToLive = -1;
        timeseriesTableMeta.setTimeseriesTableOptions(new TimeseriesTableOptions(timeToLive));
        CreateTimeseriesTableRequest request = new CreateTimeseriesTableRequest(timeseriesTableMeta);

        try {
            CreateTimeseriesTableResponse res = ots.createTimeseriesTable(request);
            System.out.println("waiting for creating time series table ......");
            TimeUnit.SECONDS.sleep(30);
        } catch (Exception e) {
            throw new ClientException(e);
        } finally {
            ots.shutdown();
        }
    }

    private DefaultTableStoreTimeseriesWriter createTableStoreTimeseriesWriter() {

        TimeseriesWriterConfig config = new TimeseriesWriterConfig();
        config.setWriteMode(TSWriteMode.PARALLEL);                         // 并行写(每个桶内并行写)。
        config.setDispatchMode(TSDispatchMode.HASH_PRIMARY_KEY);           // 基于主键哈希值做分桶,保证相同主键落在同一个桶内有序写入。
        config.setBucketCount(4);                                          // 分桶数,提升串行写并发。未达到机器瓶颈时,写入速率与分桶数正相关。
        config.setCallbackThreadCount(16);                                 // 设置Writer内部Callback运行的线程池线程。
        /**
         * 用户自定义的行级别callback
         * 该示例通过成功、失败计数,简单展示回调能力
         * */
        TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> resultCallback = new TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult>() {
            @Override
            public void onCompleted(TimeseriesTableRow rowChange, TimeseriesRowResult cc) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(TimeseriesTableRow rowChange, Exception ex) {
                failedRows.incrementAndGet();
            }
        };

        ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);

        /**
         * 推荐使用内部构建的线程池与Client,方便用户使用,隔离初始化、释放的逻辑
         * */
        DefaultTableStoreTimeseriesWriter writer = new DefaultTableStoreTimeseriesWriter(
                endpoint, credentials, instanceName, config, resultCallback);

        return writer;
    }

    private void writeTimeseriesRowWithFuture(DefaultTableStoreTimeseriesWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.print("Write Timeseries Row With Future, ");

        int rowsCount = 10000;
        int columnsCount = 10;
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<TimeseriesWriterResult>> futures = new LinkedList<Future<TimeseriesWriterResult>>();
        List<TimeseriesTableRow> rows = new ArrayList<TimeseriesTableRow>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
            Map<String, String> tags = new HashMap<String, String>();
            tags.put("region", "hangzhou");
            tags.put("os", "Ubuntu16.04");
            // 通过measurementName、dataSource和tags构建TimeseriesKey。
            TimeseriesKey timeseriesKey = new TimeseriesKey("cpu", "host_" + index, tags);
            // 指定timeseriesKey和timeInUs创建timeseriesRow。
            TimeseriesRow row = new TimeseriesRow(timeseriesKey, System.currentTimeMillis() * 1000 + index);
            // 增加数据值(field)。
            for (int j = 0; j < columnsCount; j++) {
                row.addField("cpu_usage_" + j, ColumnValue.fromDouble(Math.random() * 100));
            }
            row.addField("index", ColumnValue.fromLong(index));
            TimeseriesTableRow rowInTable = new TimeseriesTableRow(row, tableName);
            rows.add(rowInTable);
            if (Math.random() > 0.9995 || index == rowsCount - 1) {
                Future<TimeseriesWriterResult> future = writer.addTimeseriesRowChangeWithFuture(rows);
                futures.add(future);
                rows.clear();
            }
        }
        System.out.println("Write thread finished.");
        writer.flush();
        printFutureResult(futures);
        System.out.println("=========================================================[Finish]");

    }

    private void printFutureResult(List<Future<TimeseriesWriterResult>> futures) {
        int totalRow = 0;
        System.out.println("time series writer results as follow:");
        for (int index = 0; index < futures.size(); index++) {
            try {
                TimeseriesWriterResult result = futures.get(index).get();
                totalRow += result.getTotalCount();
                System.out.println(String.format(
                        "Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
                        index, result.getFailedRows().size(), result.getSucceedRows().size(),
                        result.getTotalCount(), totalRow));

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

    }

}