表格存储支持通过时序Writer将时序数据写入时序表,时序Writer还支持多表写入、写入状态统计、行级别回调和自定义配置功能。本文介绍使用时序Writer写入时序数据的使用流程。
背景信息
使用时序模型时,用户主要通过调用PutTimeseriesData接口来实现时序数据的写入,但是此方式存在如下使用限制:
需要手动控制并发。
不支持行级别的回调功能。
不支持性能参数和限制参数配置。
不支持一次向多个时序表中写入数据。
因此表格存储Java SDK提供了时序Writer用于高性能写入时序数据。时序Writer封装了PutTimeseriesData接口,实现了内部控制并发写入时序数据的功能,同时支持多表写入、写入状态统计、行级别回调和自定义配置功能。
功能特性
时序Writer提供了多表写入、批量写入、并发写入、写入状态统计、行级别回调和自定义配置功能。详细说明请参见下表。
功能 | 描述 |
多表写入 | 时序Writer可以实现向多个不同的数据表中写入数据。用户新增数据后先缓存至内存,再根据表名构造不同的请求。 |
批量写入 | 单行写入接口为 |
并发写入 | 通过数据分桶与分桶间的并发控制,提升数据的写入速率。 |
写入状态统计 | 支持总请求次数、总行数、总成功行数、总失败行数、单行请求数等指标的查询。 |
行级别回调 | 支持对每一行数据的写入结果设置回调函数。 |
自定义配置 | 支持对并发数、重试策略、队列大小等参数进行自定义配置。 |
注意事项
目前支持使用时序模型功能的地域有华东1(杭州)、华东2(上海)、华北2(北京)、华北3(张家口)、华北6(乌兰察布)、华南1(深圳)、中国香港、德国(法兰克福)、美国(弗吉尼亚)和新加坡。
如果使用过程中遇到问题,请通过钉钉加入用户群44327024(物联网存储 IoTstore 开发者交流群
)联系我们。
前提条件
已创建RAM用户,并授予RAM用户管理表格存储服务的权限(AliyunOTSFullAccess)。具体操作,请参见通过RAM Policy为RAM用户授权。
重要由于数据同步任务配置时需要填写访问密钥AccessKey(AK)信息来执行授权,为避免阿里云账号泄露AccessKey带来的安全风险,建议您通过RAM用户来完成授权和AccessKey的创建。
已获取AccessKey(包括AccessKey ID和AccessKey Secret),用于进行签名认证。具体操作,请参见获取AccessKey。
快速上手
使用时序Writer快速写入时序数据到时序表。
此处按照步骤详细介绍时序Writer的使用方式,完整代码请参见附录:完整代码。
步骤一:安装时序Writer
如果已经安装表格存储Java SDK,可跳过此操作。
在Maven工程中使用时序Writer,只需在pom.xml中加入表格存储 Java SDK依赖即可。以5.13.15版本为例,在<dependencies>内加入如下内容:
<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>tablestore</artifactId> <version>5.13.15</version> </dependency>
步骤二:创建时序Writer实例
创建时序Writer实例用于将时序数据写入时序表。
表格存储提供了三种构造时序Writer函数的方式,此处以通过传入credentials且使用默认配置为例介绍构造函数的方式。更多信息,请参见构造函数。
创建时序Writer实例时支持通过
TimeseriesWriterConfig()
工具类实现自定义参数。具体的自定义参数说明请参见配置TimeseriesWriterConfig。
endpoint = ""; accessKeyId = ""; accessKeySecret = ""; instanceName = ""; ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret); TimeseriesWriterConfig config = new TimeseriesWriterConfig(); DefaultTableStoreTimeseriesWriter writer = new DefaultTableStoreTimeseriesWriter( endpoint, credentials, instanceName, config, null);
步骤三:新增数据
构造一条时序数据,新增的时序数据将被缓存到本地。
Map<String, String> tags = new HashMap<String, String>(); tags.put("region", "hangzhou"); tags.put("os", "Ubuntu16.04"); TimeseriesKey timeseriesKey = new TimeseriesKey("cpu", "host_0", tags); TimeseriesRow row = new TimeseriesRow(timeseriesKey, System.currentTimeMillis() * 1000); //一条时序数据。 String tableName = "rowTableName"; // 时序数据所在的时序表。 TimeseriesTableRow rowInTable = new TimeseriesTableRow(row, tableName); //封装时序数据与时序表。 writer.addTimeseriesRowChange(rowInTable); // 新增的时序数据将被缓存到本地。
步骤四:写入时序数据
新增数据到本地缓存后,您可以调用flush函数手动将本地缓存数据写入时序表。
当某一分桶中的数据量达到maxBatchRowsCount(默认为200)设定值时,该分桶将会自动将本地缓存数据写入到时序表。
writer.flush(); // 将本地缓存数据写入时序表。
步骤五:关闭时序Writer
时序数据写入后,关闭时序Writer。时序Writer关闭前会将本地缓存中未写入的时序数据自动写入时序表。
writer.close();
接口说明
DefaultTableStoreTimeseriesWriter类是实现时序Writer的主工具类。
接口定义
DefaultTableStoreTimeseriesWriter类中的函数定义如下:
public class DefaultTableStoreTimeseriesWriter implements TableStoreTimeseriesWriter { //构造函数 public DefaultTableStoreTimeseriesWriter(...) // 单时间线新增数据。 void addTimeseriesRowChange(TimeseriesTableRow timeseriesTableRow) throws ClientException; // 单时间线新增数据。 Future<TimeseriesWriterResult> addTimeseriesRowChangeWithFuture(TimeseriesTableRow timeseriesTableRow) throws ClientException; // 尝试单时间线新增数据。 boolean tryAddTimeseriesRowChange(TimeseriesTableRow timeseriesTableRow) throws ClientException; // 多时间线新增数据。 void addTimeseriesRowChange(List<TimeseriesTableRow> timeseriesTableRow, List<TimeseriesRow> dirtySeriesRow) throws ClientException; // 多时间线新增数据。 Future<TimeseriesWriterResult> addTimeseriesRowChangeWithFuture(List<TimeseriesTableRow> timeseriesTableRows) throws ClientException; // 用户自定义行级别callback。 void setResultCallback(TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> callback); // 获取callback。 TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> getResultCallback(); // 获取时序writer配置。 TimeseriesWriterConfig getTimeseriesWriterConfig(); // 设置时序writer。 TimeseriesWriterHandleStatistics getTimeseriesWriterStatistics(); // 等待writer内存中数据全部落盘并写入数据。 void flush() throws ClientException; // 关闭时序表写入writer。 void close(); }
构造函数
DefaultTableStoreTimeseriesWriter支持三种构造函数,请根据实际应用场景选择构造函数的方式。
通过传入credentials且使用默认配置构建函数
通过传入credentials,时序Writer会在内部构建时序client,在释放时序Writer后无需其他操作。推荐使用此方式构造函数。
/** * 推荐使用的时序writer。 * * @param endpoint 实例的服务地址。 * @param credentials 认证信息,包含AccessKey,也支持使用临时访问凭证token。 * @param instanceName 实例名称。 * @param config 时序writer的配置。 * @param resultCallback 行级别回调。 */ public DefaultTableStoreTimeseriesWriter( String endpoint, ServiceCredentials credentials, String instanceName, TimeseriesWriterConfig config, TableStoreCallback<TimeseriesRow, TimeseriesRowResult> resultCallback)
通过传入credentials且使用自定义配置构建函数
通过传入credentials,时序Writer会在内部构建时序client,同时可以进行自定义配置,在释放时序Writer后无需其他操作。
/** * 推荐使用的时序writer。 * * @param endpoint 实例的服务地址。 * @param credentials 认证信息,含AccessKey,也支持临时访问凭证token。 * @param instanceName 实例名称。 * @param config 时序writer的配置。 * @param cc 客户端的配置。 * @param resultCallback 行级别回调。 */ public DefaultTableStoreTimeseriesWriter( String endpoint, ServiceCredentials credentials, String instanceName, TimeseriesWriterConfig config, ClientConfiguration cc, TableStoreCallback<TimeseriesRow, TimeseriesRowResult> resultCallback)
直接传入时序client构造函数
直接传入时序client,时序client的配置更灵活,但是需要在释放Writer后主动对otsclient进行shutdown。
重要在释放Writer后请务必主动对otsclient进行shutdown。
/** * 时序writer。 * * @param ots 异步时序表客户端实例。 * @param config 时序writer的配置。 * @param callback 行级别回调。 * @param executor 线程池。 */ public DefaultTableStoreTimeseriesWriter( AsyncTimeseriesClientInterface ots, TimeseriesWriterConfig config, TableStoreCallback<TimeseriesRow, TimeseriesRowResult> callback, Executor executor)
配置TimeseriesWriterConfig
时序Writer支持自定义配置参数,通过TimeseriesWriterConfig()
工具类实现自定义参数。
参数 | 类型 | 示例 | 说明 |
maxBatchRowsCount | Integer | 200 | 一次批量RPC请求导入的最大行数。默认值为200。取值范围为1~200。 |
maxBatchSize | Integer | 4*1024*1024 | 一次批量RPC请求导入的最大数据量。默认值为 |
concurrency | Integer | 64 | 一个时序Writer的最大请求并发数。默认值为64。 |
bucketCount | Integer | 4 | 时序数据缓存分桶数。默认值为4。 由于总缓存队列为 |
bufferSize | Integer | 1024 | 每个桶的队列长度。默认值为1024。 |
flushInterval | Integer | 10000 | flush数据的时间间隔。默认值为10000。单位为毫秒。 |
logInterval | Integer | 10000 | 记录日志的时间间隔。默认值为10000。单位为毫秒。 |
dispatchMode | String | HASH_PRIMARY_KEY | 多桶分发模式。取值范围如下:
|
writeMode | String | PARALLEL | 写入模式。取值范围如下:
|
callbackThreadCount | Integer | 9 | 内部Callback运行的线程池的最大线程数。默认值为 |
callbackThreadPoolQueueSize | Integer | 1024 | 内部Callback运行的线程池的队列大小。默认值为1024。只有调用Callback函数内部构建线程池时生效。 |
writerRetryStrategy | String | CERTAIN_ERROR_CODE_NOT_RETRY | 通过传入credentials方式构造函数时内部构建client使用的重试策略。取值范围如下:
|
clientMaxConnections | String | 300 | 通过传入credentials方式构造函数时,内部构建client使用的最大连接数配置。默认值为300。 |
allowDuplicatedRowInBatchRequest | Boolean | true | 批量请求是否允许对同一时间线进行写入。取值范围为true和false。默认值为true。 如果批量请求时不允许对同一时间线进行写入,请配置此参数为false。 |
附录:完整代码
完整代码主要包括了创建时序表、构造时序Writer、使用时序Writer写入10000条数据和关闭时序Writer四个操作。
使用时序Writer前,请确保已安装5.13.15版本及以上的表格存储Java SDK。
public class TimeseriesWriterSample { // 填写实例的服务地址。 private static String endpoint = "https://[instanceName].cn-hangzhou.ots.aliyuncs.com"; // 填写实例名称。 private static String instanceName = "instanceName"; // 填写阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和Access Secret)。 private static String accessKeyId = "XXXXXXXXXX"; private static String accessKeySecret = "XXXXXXXXXXXXXXXXXXXXXX"; // 填写时序表名称。 private static String tableName = "tableName"; private static AtomicLong succeedRows = new AtomicLong(); private static AtomicLong failedRows = new AtomicLong(); public static void main(String[] args) { TimeseriesWriterSample sample = new TimeseriesWriterSample(); /** * 使用Writer前确保表已存在 * 经过测试,时序表创建30秒后向其中写入数据不会出现异常情况 * writer会校验表的存在性 * */ sample.tryCreateTable(); /** * 初始化建议使用 * DefaultTableStoreTimeseriesWriter( * String endpoint, // 实例的服务地址、 * ServiceCredentials credentials, // 认证信息,包含AccessKey,也支持使用临时访问凭证token。 * String instanceName, // 实例名称。 * String tableName, // 时序表名称:一个时序writer仅针对一个时序表。 * TimeseriesWriterConfig config, // 时序writer的配置。 * TableStoreCallback<TimeseriesRow, TimeseriesRowResult> resultCallback) // 行级别回调。 * */ DefaultTableStoreTimeseriesWriter writer = sample.createTableStoreTimeseriesWriter(); /** * Future使用:批量写 * */ sample.writeTimeseriesRowWithFuture(writer); System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get()); System.out.println("Count by WriterStatics: " + writer.getTimeseriesWriterStatistics()); /** * 用户需要主动关闭Writer,内部等候所有队列数据写入后,自动关闭client与内部的线程池。 * */ writer.close(); } private void tryCreateTable() { TimeseriesClient ots = new TimeseriesClient(endpoint, accessKeyId, accessKeySecret, instanceName); TimeseriesTableMeta timeseriesTableMeta = new TimeseriesTableMeta(tableName); int timeToLive = -1; timeseriesTableMeta.setTimeseriesTableOptions(new TimeseriesTableOptions(timeToLive)); CreateTimeseriesTableRequest request = new CreateTimeseriesTableRequest(timeseriesTableMeta); try { CreateTimeseriesTableResponse res = ots.createTimeseriesTable(request); System.out.println("waiting for creating time series table ......"); TimeUnit.SECONDS.sleep(30); } catch (Exception e) { throw new ClientException(e); } finally { ots.shutdown(); } } private DefaultTableStoreTimeseriesWriter createTableStoreTimeseriesWriter() { TimeseriesWriterConfig config = new TimeseriesWriterConfig(); config.setWriteMode(TSWriteMode.PARALLEL); // 并行写(每个桶内并行写)。 config.setDispatchMode(TSDispatchMode.HASH_PRIMARY_KEY); // 基于主键哈希值做分桶,保证相同主键落在同一个桶内有序写入。 config.setBucketCount(4); // 分桶数,提升串行写并发。未达到机器瓶颈时,写入速率与分桶数正相关。 config.setCallbackThreadCount(16); // 设置Writer内部Callback运行的线程池线程。 /** * 用户自定义的行级别callback * 该示例通过成功、失败计数,简单展示回调能力 * */ TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> resultCallback = new TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult>() { @Override public void onCompleted(TimeseriesTableRow rowChange, TimeseriesRowResult cc) { succeedRows.incrementAndGet(); } @Override public void onFailed(TimeseriesTableRow rowChange, Exception ex) { failedRows.incrementAndGet(); } }; ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret); /** * 推荐使用内部构建的线程池与Client,方便用户使用,隔离初始化、释放的逻辑 * */ DefaultTableStoreTimeseriesWriter writer = new DefaultTableStoreTimeseriesWriter( endpoint, credentials, instanceName, config, resultCallback); return writer; } private void writeTimeseriesRowWithFuture(DefaultTableStoreTimeseriesWriter writer) { System.out.println("=========================================================[Start]"); System.out.print("Write Timeseries Row With Future, "); int rowsCount = 10000; int columnsCount = 10; AtomicLong rowIndex = new AtomicLong(-1); List<Future<TimeseriesWriterResult>> futures = new LinkedList<Future<TimeseriesWriterResult>>(); List<TimeseriesTableRow> rows = new ArrayList<TimeseriesTableRow>(); for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) { Map<String, String> tags = new HashMap<String, String>(); tags.put("region", "hangzhou"); tags.put("os", "Ubuntu16.04"); // 通过measurementName、dataSource和tags构建TimeseriesKey。 TimeseriesKey timeseriesKey = new TimeseriesKey("cpu", "host_" + index, tags); // 指定timeseriesKey和timeInUs创建timeseriesRow。 TimeseriesRow row = new TimeseriesRow(timeseriesKey, System.currentTimeMillis() * 1000 + index); // 增加数据值(field)。 for (int j = 0; j < columnsCount; j++) { row.addField("cpu_usage_" + j, ColumnValue.fromDouble(Math.random() * 100)); } row.addField("index", ColumnValue.fromLong(index)); TimeseriesTableRow rowInTable = new TimeseriesTableRow(row, tableName); rows.add(rowInTable); if (Math.random() > 0.9995 || index == rowsCount - 1) { Future<TimeseriesWriterResult> future = writer.addTimeseriesRowChangeWithFuture(rows); futures.add(future); rows.clear(); } } System.out.println("Write thread finished."); writer.flush(); printFutureResult(futures); System.out.println("=========================================================[Finish]"); } private void printFutureResult(List<Future<TimeseriesWriterResult>> futures) { int totalRow = 0; System.out.println("time series writer results as follow:"); for (int index = 0; index < futures.size(); index++) { try { TimeseriesWriterResult result = futures.get(index).get(); totalRow += result.getTotalCount(); System.out.println(String.format( "Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d", index, result.getFailedRows().size(), result.getSucceedRows().size(), result.getTotalCount(), totalRow)); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } }