Write time series data using Timeseries Writer
Tablestore provides Timeseries Writer for writing time series data to time series tables. Timeseries Writer supports features such as writing to multiple tables, collecting write status statistics, using row-level callbacks, and custom configurations. This topic describes how to use Timeseries Writer to write time series data.
Background information
When you use the time series model, you typically write time series data by calling the PutTimeseriesData operation. However, this method has the following limitations:
You must manually control concurrency.
It does not support row-level callback functions.
It does not support configuring performance and limit parameters.
It does not support writing data to multiple time series tables at once.
To address these limitations, the Tablestore Java SDK provides Timeseries Writer for high-performance data writing. Timeseries Writer encapsulates the PutTimeseriesData operation, internally controls concurrent data writing, and supports features such as writing to multiple tables, collecting write status statistics, row-level callbacks, and custom configurations.
Features
Timeseries Writer provides features such as writing to multiple tables, batch writing, concurrent writing, write status statistics, row-level callbacks, and custom configurations. For more information, see the following table.
Feature | Description |
Write to multiple tables | Timeseries Writer can write data to multiple tables. After you add data, the data is first cached in memory and then used to construct different requests based on the table name. |
Batch write | The operation for writing a single row is |
Concurrent write | Improves the data write speed through data bucketing and concurrency control between buckets. |
Write status statistics | Supports querying metrics such as the total number of requests, total number of rows, total number of successful rows, total number of failed rows, and number of single-row requests. |
Row-level callback | Supports setting a callback function for the write result of each row of data. |
Custom configuration | Supports custom configuration for parameters such as concurrency, retry policy, and queue size. |
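The multi-table behavior described in the table (rows are first cached in memory and then used to construct different requests based on the table name) can be pictured with a short, JDK-only sketch. Here, cached rows are grouped by table name, and each group becomes the payload of one request. `CachedRow` and `groupByTable` are hypothetical illustrations, not Tablestore SDK APIs, and the real writer's internal grouping may differ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of grouping cached rows by table; not part of the Tablestore SDK.
final class TableGrouping {

    // A cached row that remembers which time series table it targets.
    static final class CachedRow {
        final String tableName;
        final String seriesKey;

        CachedRow(String tableName, String seriesKey) {
            this.tableName = tableName;
            this.seriesKey = seriesKey;
        }
    }

    // Groups cached rows by table name, preserving first-seen order,
    // so that each group can be sent as one request to its own table.
    static Map<String, List<CachedRow>> groupByTable(List<CachedRow> cache) {
        Map<String, List<CachedRow>> requests = new LinkedHashMap<>();
        for (CachedRow row : cache) {
            requests.computeIfAbsent(row.tableName, t -> new ArrayList<>()).add(row);
        }
        return requests;
    }

    public static void main(String[] args) {
        List<CachedRow> cache = List.of(
                new CachedRow("cpu_table", "host_0"),
                new CachedRow("mem_table", "host_0"),
                new CachedRow("cpu_table", "host_1"));
        // Prints one line per table with the number of rows in its request.
        groupByTable(cache).forEach((table, rows) ->
                System.out.println(table + " -> " + rows.size() + " row(s)"));
    }
}
```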
Usage notes
The TimeSeries model is supported in the following regions: China (Hangzhou), China (Shanghai), China (Beijing), China (Zhangjiakou), China (Ulanqab), China (Shenzhen), China (Chengdu), China (Hong Kong), Japan (Tokyo), Malaysia (Kuala Lumpur), Germany (Frankfurt), Indonesia (Jakarta), UK (London), US (Silicon Valley), US (Virginia), and Singapore.
If you encounter problems when you use this feature, join the DingTalk user group 44327024 (IoTstore Developer Exchange Group) to contact us.
Prerequisites
A time series instance and a time series table are created to store time series data. For more information, see Create a time series instance and Create a time series table.
A Resource Access Management (RAM) user is created, and the AliyunOTSFullAccess permission is granted to the RAM user to manage Tablestore. For more information, see Grant permissions to a RAM user using a RAM policy.
Important: Timeseries Writer requires an AccessKey pair for authorization. To avoid security risks if the AccessKey pair of your Alibaba Cloud account is leaked, use an AccessKey pair that belongs to a RAM user.
An AccessKey pair, which includes an AccessKey ID and an AccessKey secret, is obtained for signature authentication. For more information, see Obtain an AccessKey pair.
Quick start
You can use Timeseries Writer to quickly write time series data to a time series table.
This section provides a detailed step-by-step guide on how to use Timeseries Writer. For the complete code, see Appendix: Complete code.
Step 1: Install Timeseries Writer
If you have already installed the Tablestore Java SDK, you can skip this step.
To use Timeseries Writer in a Maven project, add the Tablestore Java SDK dependency to the pom.xml file. The following example shows how to add the dependency for version 5.13.15 inside the <dependencies> tag:
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.13.15</version>
</dependency>
Step 2: Create a Timeseries Writer instance
Create a Timeseries Writer instance to write time series data to a time series table.
Tablestore provides three ways to create a Timeseries Writer instance. This example shows how to create the instance by passing credentials and using default configurations. For more information, see Constructors.
When you create a Timeseries Writer instance, you can use the TimeseriesWriterConfig class to customize parameters. For more information about the custom parameters, see Configure TimeseriesWriterConfig.
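// Replace the empty strings with your instance endpoint, AccessKey pair, and instance name.
String endpoint = "";
String accessKeyId = "";
String accessKeySecret = "";
String instanceName = "";
ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
// Use the default Timeseries Writer configuration.
TimeseriesWriterConfig config = new TimeseriesWriterConfig();
// The last argument is the row-level callback. null means that no callback is set.
DefaultTableStoreTimeseriesWriter writer = new DefaultTableStoreTimeseriesWriter(
        endpoint, credentials, instanceName, config, null);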
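The snippet above leaves the endpoint and AccessKey pair as empty placeholders, and the appendix hard-codes them. A common way to keep credentials out of source code is to read them from environment variables. The following sketch assumes hypothetical variable names (`TABLESTORE_ENDPOINT`, `TABLESTORE_ACCESS_KEY_ID`, `TABLESTORE_ACCESS_KEY_SECRET`, and `TABLESTORE_INSTANCE_NAME`). These names are not defined by Tablestore, and `WriterSettings` is a hypothetical helper.

```java
import java.util.Map;

// Hypothetical helper: the environment variable names below are assumptions,
// not names defined by Tablestore.
final class WriterSettings {
    final String endpoint;
    final String accessKeyId;
    final String accessKeySecret;
    final String instanceName;

    private WriterSettings(String endpoint, String accessKeyId,
                           String accessKeySecret, String instanceName) {
        this.endpoint = endpoint;
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.instanceName = instanceName;
    }

    // Returns the value of a required variable, failing fast if it is missing or blank.
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    // Reads all settings from an environment map. Pass System.getenv() in production
    // and a plain Map in tests.
    static WriterSettings fromEnv(Map<String, String> env) {
        return new WriterSettings(
                require(env, "TABLESTORE_ENDPOINT"),
                require(env, "TABLESTORE_ACCESS_KEY_ID"),
                require(env, "TABLESTORE_ACCESS_KEY_SECRET"),
                require(env, "TABLESTORE_INSTANCE_NAME"));
    }
}
```

For example, `WriterSettings s = WriterSettings.fromEnv(System.getenv());` followed by `new DefaultCredentials(s.accessKeyId, s.accessKeySecret)` can replace the empty placeholder strings. Failing fast on a missing variable surfaces configuration mistakes before the writer starts.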
Step 3: Add data
Construct a row of time series data. The new time series data is cached locally.
Map<String, String> tags = new HashMap<String, String>();
tags.put("region", "hangzhou");
tags.put("os", "Ubuntu16.04");
// Identify the time series by measurement name, data source, and tags.
TimeseriesKey timeseriesKey = new TimeseriesKey("cpu", "host_0", tags);
// A row of time series data. The timestamp is in microseconds.
TimeseriesRow row = new TimeseriesRow(timeseriesKey, System.currentTimeMillis() * 1000);
// Add a data value (field) to the row.
row.addField("cpu_usage", ColumnValue.fromDouble(10.0));
// The time series table that stores the time series data.
String tableName = "rowTableName";
// Encapsulate the time series data and the time series table.
TimeseriesTableRow rowInTable = new TimeseriesTableRow(row, tableName);
// The new time series data is cached locally.
writer.addTimeseriesRowChange(rowInTable);
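The `TimeseriesRow` constructor in this step receives the timestamp in microseconds (the complete code in the appendix calls this value `timeInUs`), which is why `System.currentTimeMillis()` is multiplied by 1000. The following JDK-only sketch shows two ways to produce a microsecond timestamp. `MicrosClock` is a hypothetical helper, not part of the SDK.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for building microsecond timestamps.
final class MicrosClock {

    // Converts epoch milliseconds to epoch microseconds (same as millis * 1000).
    static long millisToMicros(long epochMillis) {
        return TimeUnit.MILLISECONDS.toMicros(epochMillis);
    }

    // Converts an Instant to epoch microseconds, keeping sub-millisecond
    // precision when the platform clock provides it.
    static long instantToMicros(Instant instant) {
        return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
                + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
    }

    public static void main(String[] args) {
        System.out.println(millisToMicros(System.currentTimeMillis()));
        System.out.println(instantToMicros(Instant.now()));
    }
}
```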
Step 4: Write the time series data
After you add data to the local cache, you can call the flush() method to manually write the cached data to the time series table.
Timeseries Writer also writes locally cached data to the time series table automatically when the number of rows cached in a bucket reaches maxBatchRowsCount (default: 200). In addition, cached data is flushed at the interval specified by flushInterval (default: 10000 ms).
writer.flush(); // Write the locally cached data to the time series table.
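The size-based trigger described above can be modeled as a per-bucket buffer that sends a batch as soon as it holds `maxBatchRowsCount` rows, while `flush()` sends whatever is left. This is a simplified, single-threaded sketch under that assumption. The real writer also flushes on `flushInterval` and sends batches concurrently. `BatchingBucket` is a hypothetical class, and the `sender` callback stands in for one batch RPC request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded model of one bucket; not the Tablestore implementation.
final class BatchingBucket<T> {
    private final int maxBatchRowsCount;
    private final Consumer<List<T>> sender; // Stands in for one batch RPC request.
    private List<T> pending = new ArrayList<>();

    BatchingBucket(int maxBatchRowsCount, Consumer<List<T>> sender) {
        if (maxBatchRowsCount < 1) {
            throw new IllegalArgumentException("maxBatchRowsCount must be >= 1");
        }
        this.maxBatchRowsCount = maxBatchRowsCount;
        this.sender = sender;
    }

    // Caches a row and sends a full batch automatically when the threshold is reached.
    void add(T row) {
        pending.add(row);
        if (pending.size() >= maxBatchRowsCount) {
            flush();
        }
    }

    // Sends any cached rows, even if the batch is not full.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = pending;
        pending = new ArrayList<>(); // Hand off the old list and start a fresh one.
        sender.accept(batch);
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchingBucket<String> bucket = new BatchingBucket<>(200, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 450; i++) {
            bucket.add("row-" + i);
        }
        bucket.flush(); // Sends the remaining 50 rows.
        System.out.println(batchSizes); // prints [200, 200, 50]
    }
}
```

Two full batches of 200 are sent automatically, and the explicit `flush()` sends the remaining 50 rows. The same remaining rows would otherwise wait for the next size or interval trigger.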
Step 5: Close Timeseries Writer
After you finish writing data, close Timeseries Writer. When Timeseries Writer is closed, it automatically writes any remaining data from the local cache to the time series table.
writer.close();
API reference
The DefaultTableStoreTimeseriesWriter class is the main utility class for implementing Timeseries Writer.
API definition
The functions in the DefaultTableStoreTimeseriesWriter class are defined as follows:
public class DefaultTableStoreTimeseriesWriter implements TableStoreTimeseriesWriter {
    // Constructors. For more information, see Constructors.
    public DefaultTableStoreTimeseriesWriter(...)

    // Add a single row of time series data.
    void addTimeseriesRowChange(TimeseriesTableRow timeseriesTableRow) throws ClientException;

    // Add a single row of time series data and return a Future for its write result.
    Future<TimeseriesWriterResult> addTimeseriesRowChangeWithFuture(TimeseriesTableRow timeseriesTableRow) throws ClientException;

    // Try to add a single row of time series data. The return value indicates whether the row was added.
    boolean tryAddTimeseriesRowChange(TimeseriesTableRow timeseriesTableRow) throws ClientException;

    // Add multiple rows of time series data.
    void addTimeseriesRowChange(List<TimeseriesTableRow> timeseriesTableRow, List<TimeseriesRow> dirtySeriesRow) throws ClientException;

    // Add multiple rows of time series data and return a Future for their write results.
    Future<TimeseriesWriterResult> addTimeseriesRowChangeWithFuture(List<TimeseriesTableRow> timeseriesTableRows) throws ClientException;

    // Set a user-defined row-level callback.
    void setResultCallback(TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> callback);

    // Get the row-level callback.
    TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> getResultCallback();

    // Get the Timeseries Writer configuration.
    TimeseriesWriterConfig getTimeseriesWriterConfig();

    // Get the Timeseries Writer statistics.
    TimeseriesWriterHandleStatistics getTimeseriesWriterStatistics();

    // Wait until all data in the writer's memory is flushed and written.
    void flush() throws ClientException;

    // Close the writer. Any remaining cached data is written before the writer closes.
    void close();
}
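`setResultCallback` takes a `TableStoreCallback` whose `onCompleted` and `onFailed` methods are invoked for each row, as the complete code in the appendix shows. Because these callbacks run on the writer's internal callback thread pool (see callbackThreadCount), any state they update should be thread-safe. The following JDK-only sketch reproduces that pattern. `RowCallback` is a hypothetical stand-in for `TableStoreCallback`, and the thread pool only simulates concurrent callback delivery.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for TableStoreCallback<Req, Res>; not the SDK interface.
interface RowCallback<Req, Res> {
    void onCompleted(Req request, Res result);
    void onFailed(Req request, Exception ex);
}

// Counts per-row outcomes with atomic counters so that concurrent callbacks do not lose updates.
final class CountingCallback implements RowCallback<String, String> {
    final AtomicLong succeeded = new AtomicLong();
    final AtomicLong failed = new AtomicLong();

    @Override
    public void onCompleted(String row, String result) {
        succeeded.incrementAndGet();
    }

    @Override
    public void onFailed(String row, Exception ex) {
        failed.incrementAndGet();
    }

    // Simulates a thread pool delivering callbacks for `rows` rows, where every tenth row fails.
    static CountingCallback simulate(int rows, int threads) {
        CountingCallback callback = new CountingCallback();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < rows; i++) {
            final String row = "row-" + i;
            final boolean fail = i % 10 == 0;
            pool.execute(() -> {
                if (fail) {
                    callback.onFailed(row, new RuntimeException("simulated failure"));
                } else {
                    callback.onCompleted(row, "ok");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Callbacks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for callbacks", e);
        }
        return callback;
    }

    public static void main(String[] args) {
        CountingCallback callback = simulate(1000, 8);
        // prints succeeded=900, failed=100
        System.out.println("succeeded=" + callback.succeeded.get() + ", failed=" + callback.failed.get());
    }
}
```

This mirrors the appendix, which uses `AtomicLong` counters in its callback for the same reason.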
Constructors
DefaultTableStoreTimeseriesWriter supports three constructors. You can select a constructor based on your scenario.
Create an instance by passing credentials and using default configurations
When you use this constructor, you pass credentials to Timeseries Writer, which then internally creates a time series client. No other operations are required after you close Timeseries Writer. We recommend that you use this constructor.
/**
 * The recommended Timeseries Writer.
 *
 * @param endpoint       The endpoint of the instance.
 * @param credentials    The authentication information, which includes the AccessKey pair. You can also use a temporary access credential token.
 * @param instanceName   The name of the instance.
 * @param config         The configuration of the Timeseries Writer.
 * @param resultCallback The row-level callback.
 */
public DefaultTableStoreTimeseriesWriter(
        String endpoint,
        ServiceCredentials credentials,
        String instanceName,
        TimeseriesWriterConfig config,
        TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> resultCallback)
Create an instance by passing credentials and using custom configurations
When you use this constructor, you pass credentials to Timeseries Writer, which internally creates a time series client. You can also customize that client by passing a ClientConfiguration object (cc). No other operations are required after you close Timeseries Writer.
/**
 * The recommended Timeseries Writer.
 *
 * @param endpoint       The endpoint of the instance.
 * @param credentials    The authentication information, which includes the AccessKey pair. You can also use a temporary access credential token.
 * @param instanceName   The name of the instance.
 * @param config         The configuration of the Timeseries Writer.
 * @param cc             The configuration of the client.
 * @param resultCallback The row-level callback.
 */
public DefaultTableStoreTimeseriesWriter(
        String endpoint,
        ServiceCredentials credentials,
        String instanceName,
        TimeseriesWriterConfig config,
        ClientConfiguration cc,
        TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> resultCallback)
Create an instance by directly passing a time series client
Passing a time series client directly gives you more flexibility in configuring the client.
Important: After you close the writer, you must explicitly shut down the time series client (ots) that you passed in.
/**
 * The Timeseries Writer.
 *
 * @param ots      The asynchronous time series table client instance.
 * @param config   The configuration of the Timeseries Writer.
 * @param callback The row-level callback.
 * @param executor The thread pool.
 */
public DefaultTableStoreTimeseriesWriter(
        AsyncTimeseriesClientInterface ots,
        TimeseriesWriterConfig config,
        TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> callback,
        Executor executor)
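When you pass your own client, the order of cleanup matters. Close the writer first so that it can write its remaining cached data through the client, and then shut down the client. Java's try-with-resources closes resources in the reverse order of their declaration, so declaring the client first gives you this order automatically. This sketch does not assume that the SDK classes implement `AutoCloseable`. `ClientHandle` and `WriterHandle` are hypothetical wrappers, and the log list only records the order of calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrappers that adapt writer.close() and client shutdown to AutoCloseable.
final class CloseOrderDemo {

    static final class ClientHandle implements AutoCloseable {
        private final List<String> log;

        ClientHandle(List<String> log) {
            this.log = log;
        }

        @Override
        public void close() {
            log.add("client.shutdown"); // In real code: shut down the client that you created.
        }
    }

    static final class WriterHandle implements AutoCloseable {
        private final List<String> log;

        WriterHandle(ClientHandle client, List<String> log) {
            this.log = log;
        }

        @Override
        public void close() {
            log.add("writer.close"); // In real code: writer.close(), which writes remaining cached data.
        }
    }

    // Declares the client first and the writer second, so the writer is closed first.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (ClientHandle client = new ClientHandle(log);
             WriterHandle writer = new WriterHandle(client, log)) {
            log.add("write rows");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [write rows, writer.close, client.shutdown]
    }
}
```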
Configure TimeseriesWriterConfig
Timeseries Writer supports custom configuration parameters, which you can set by using the TimeseriesWriterConfig class.
Parameter | Type | Example | Description |
maxBatchRowsCount | Integer | 200 | The maximum number of rows that can be imported in a single batch RPC request. The default value is 200. The value must be in the range of 1 to 200. |
maxBatchSize | Integer | 4*1024*1024 | The maximum data volume that can be imported in a single batch RPC request. The default value is |
concurrency | Integer | 64 | The maximum number of concurrent requests for a Timeseries Writer. The default value is 64. |
bucketCount | Integer | 4 | The number of buckets for caching time series data. The default value is 4. The total cache queue is |
bufferSize | Integer | 1024 | The queue length of each bucket. The default value is 1024. |
flushInterval | Integer | 10000 | The interval for flushing data. The default value is 10000. The unit is millisecond. |
logInterval | Integer | 10000 | The interval for recording logs. The default value is 10000. The unit is millisecond. |
dispatchMode | String | HASH_PRIMARY_KEY | The dispatch mode for multiple buckets. Valid values:
|
writeMode | String | PARALLEL | The write mode. Valid values:
|
callbackThreadCount | Integer | 9 | The maximum number of threads in the thread pool for the internal callback. The default value is |
callbackThreadPoolQueueSize | Integer | 1024 | The queue size of the thread pool for the internal callback. The default value is 1024. This parameter takes effect only when Timeseries Writer constructs the callback thread pool internally. |
writerRetryStrategy | String | CERTAIN_ERROR_CODE_NOT_RETRY | The retry policy of the client that Timeseries Writer constructs internally when you create the writer by passing credentials. Valid values:
|
clientMaxConnections | Integer | 300 | The maximum number of connections of the client that Timeseries Writer constructs internally when you create the writer by passing credentials. The default value is 300. |
allowDuplicatedRowInBatchRequest | Boolean | true | Specifies whether a batch request can write to the same time series. Valid values are true and false. The default value is true. If you do not want a batch request to write to the same time series, set this parameter to false. |
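When dispatchMode is set to HASH_PRIMARY_KEY, rows are assigned to buckets by a hash of their primary key. The appendix notes that this writes data with the same primary key to the same bucket, in order. The following is a minimal sketch of such hash dispatch. It assumes that the key is a string built from the measurement name, data source, and sorted tags. The hash function that Tablestore actually uses is not documented here, and `HashDispatch` is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of hash-based bucket dispatch; not the SDK's hash function.
final class HashDispatch {

    // Builds a stable key from the parts of a time series identifier.
    // TreeMap sorts the tags so that the same tag set always yields the same key.
    static String seriesKey(String measurement, String dataSource, Map<String, String> tags) {
        return measurement + "|" + dataSource + "|" + new TreeMap<>(tags);
    }

    // Maps a key to a bucket in [0, bucketCount). floorMod avoids negative
    // indexes for keys whose hashCode() is negative.
    static int bucketOf(String key, int bucketCount) {
        return Math.floorMod(key.hashCode(), bucketCount);
    }

    public static void main(String[] args) {
        int bucketCount = 4;
        for (int i = 0; i < 3; i++) {
            String key = seriesKey("cpu", "host_" + i, Map.of("region", "hangzhou", "os", "Ubuntu16.04"));
            System.out.println(key + " -> bucket " + bucketOf(key, bucketCount));
        }
    }
}
```

Because the bucket depends only on the key, every row of one time series goes through the same bucket. Rows of different time series spread across buckets, which is how bucketCount increases concurrency.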
Appendix: Complete code
The complete code demonstrates four operations: creating a time series table, creating a Timeseries Writer instance, using the Timeseries Writer to write 10,000 rows of data, and closing the Timeseries Writer.
Before you use Timeseries Writer, make sure that you have installed Tablestore Java SDK 5.13.15 or later.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
// Also import the Tablestore SDK classes used below (for example, TimeseriesClient,
// DefaultTableStoreTimeseriesWriter, TimeseriesWriterConfig, TimeseriesKey, and TimeseriesRow)
// from the tablestore artifact, version 5.13.15 or later.

public class TimeseriesWriterSample {
    // The endpoint of your instance.
    private static String endpoint = "https://[instanceName].cn-hangzhou.ots.aliyuncs.com";
    // The name of your instance.
    private static String instanceName = "instanceName";
    // The AccessKey pair (AccessKey ID and AccessKey secret) of your Alibaba Cloud account or RAM user.
    private static String accessKeyId = "XXXXXXXXXX";
    private static String accessKeySecret = "XXXXXXXXXXXXXXXXXXXXXX";
    // The name of the time series table.
    private static String tableName = "tableName";

    // Counters updated by the row-level callback.
    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) {
        TimeseriesWriterSample sample = new TimeseriesWriterSample();

        /*
         * Make sure that the table exists before you use the writer.
         * Tests show that no exceptions occur if you write data to a time series table 30 seconds after it is created.
         * The writer verifies that the table exists.
         */
        sample.tryCreateTable();

        /*
         * For initialization, we recommend that you use:
         * DefaultTableStoreTimeseriesWriter(
         *     String endpoint,                // The endpoint of the instance.
         *     ServiceCredentials credentials, // The authentication information, which includes the AccessKey pair. You can also use a temporary access credential token.
         *     String instanceName,            // The name of the instance.
         *     TimeseriesWriterConfig config,  // The configuration of the Timeseries Writer.
         *     TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> resultCallback) // The row-level callback.
         * The table name is specified for each row in TimeseriesTableRow, so one writer can write to multiple tables.
         */
        DefaultTableStoreTimeseriesWriter writer = sample.createTableStoreTimeseriesWriter();

        // Use Future for batch writing.
        sample.writeTimeseriesRowWithFuture(writer);

        System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
        System.out.println("Count by WriterStatistics: " + writer.getTimeseriesWriterStatistics());

        /*
         * You must explicitly close the writer. The system waits for all data in the queues to be written,
         * and then automatically closes the client and the internal thread pool.
         */
        writer.close();
    }

    private void tryCreateTable() {
        TimeseriesClient ots = new TimeseriesClient(endpoint, accessKeyId, accessKeySecret, instanceName);
        TimeseriesTableMeta timeseriesTableMeta = new TimeseriesTableMeta(tableName);
        int timeToLive = -1; // -1 means that the data never expires.
        timeseriesTableMeta.setTimeseriesTableOptions(new TimeseriesTableOptions(timeToLive));
        CreateTimeseriesTableRequest request = new CreateTimeseriesTableRequest(timeseriesTableMeta);
        try {
            ots.createTimeseriesTable(request);
            System.out.println("waiting for creating time series table ......");
            TimeUnit.SECONDS.sleep(30);
        } catch (Exception e) {
            throw new ClientException(e);
        } finally {
            ots.shutdown();
        }
    }

    private DefaultTableStoreTimeseriesWriter createTableStoreTimeseriesWriter() {
        TimeseriesWriterConfig config = new TimeseriesWriterConfig();
        // Parallel write (parallel writing within each bucket).
        config.setWriteMode(TSWriteMode.PARALLEL);
        // Bucketing based on the primary key hash ensures that data with the same primary key is written to the same bucket in order.
        config.setDispatchMode(TSDispatchMode.HASH_PRIMARY_KEY);
        // The number of buckets. More buckets improve concurrency for sequential writes.
        // The write speed increases with the number of buckets until the machine becomes the bottleneck.
        config.setBucketCount(4);
        // The number of threads in the thread pool for the writer's internal callback.
        config.setCallbackThreadCount(16);

        /*
         * A user-defined row-level callback.
         * This example uses success and failure counters to demonstrate the callback capability.
         */
        TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> resultCallback =
                new TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult>() {
                    @Override
                    public void onCompleted(TimeseriesTableRow rowChange, TimeseriesRowResult cc) {
                        succeedRows.incrementAndGet();
                    }

                    @Override
                    public void onFailed(TimeseriesTableRow rowChange, Exception ex) {
                        failedRows.incrementAndGet();
                    }
                };

        ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);

        /*
         * We recommend that you use the internally built thread pool and client.
         * This simplifies usage and isolates the initialization and release logic.
         */
        DefaultTableStoreTimeseriesWriter writer = new DefaultTableStoreTimeseriesWriter(
                endpoint, credentials, instanceName, config, resultCallback);
        return writer;
    }

    private void writeTimeseriesRowWithFuture(DefaultTableStoreTimeseriesWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.print("Write Timeseries Row With Future, ");
        int rowsCount = 10000;
        int columnsCount = 10;
        AtomicLong rowIndex = new AtomicLong(-1);
        List<Future<TimeseriesWriterResult>> futures = new LinkedList<Future<TimeseriesWriterResult>>();
        List<TimeseriesTableRow> rows = new ArrayList<TimeseriesTableRow>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
            Map<String, String> tags = new HashMap<String, String>();
            tags.put("region", "hangzhou");
            tags.put("os", "Ubuntu16.04");
            // Construct a TimeseriesKey from the measurementName, dataSource, and tags.
            TimeseriesKey timeseriesKey = new TimeseriesKey("cpu", "host_" + index, tags);
            // Create a TimeseriesRow by specifying the timeseriesKey and timeInUs (the timestamp in microseconds).
            TimeseriesRow row = new TimeseriesRow(timeseriesKey, System.currentTimeMillis() * 1000 + index);
            // Add data values (fields).
            for (int j = 0; j < columnsCount; j++) {
                row.addField("cpu_usage_" + j, ColumnValue.fromDouble(Math.random() * 100));
            }
            row.addField("index", ColumnValue.fromLong(index));
            TimeseriesTableRow rowInTable = new TimeseriesTableRow(row, tableName);
            rows.add(rowInTable);
            // Submit a batch at random points, and always submit the last batch.
            if (Math.random() > 0.9995 || index == rowsCount - 1) {
                Future<TimeseriesWriterResult> future = writer.addTimeseriesRowChangeWithFuture(rows);
                futures.add(future);
                // Start a new list instead of clearing the list that was just submitted.
                rows = new ArrayList<TimeseriesTableRow>();
            }
        }
        System.out.println("Write thread finished.");
        writer.flush();
        printFutureResult(futures);
        System.out.println("=========================================================[Finish]");
    }

    private void printFutureResult(List<Future<TimeseriesWriterResult>> futures) {
        int totalRow = 0;
        System.out.println("time series writer results as follows:");
        for (int index = 0; index < futures.size(); index++) {
            try {
                TimeseriesWriterResult result = futures.get(index).get();
                totalRow += result.getTotalCount();
                System.out.println(String.format(
                        "Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
                        index, result.getFailedRows().size(), result.getSucceedRows().size(),
                        result.getTotalCount(), totalRow));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting for the remaining futures.
                Thread.currentThread().interrupt();
                return;
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}