时序数据存储是一种高性能、低成本、稳定可靠的在线时序数据库服务,具有高效读写、高压缩比存储等优点。针对物联网设备数据采集场景,时序数据存储能解决由于设备采集点数量巨大、数据采集频率高造成的存储成本高、写入和查询分析效率低的问题。
如果您购买了企业版实例,可以享受时序数据存储服务。公共实例不提供此服务。关于如何购买企业版实例,请参见购买企业版实例。
什么是时序数据
时序数据存储将一个metric与一组tag的组合称为一条时间线,在一条时间线下面,连续时间点的采样数据则为时序数据。
例如:下图中有3条时间线,{“metric”: “cpu”,“tags”: “site”: “et2”, “ip”: “1.1.1.1”}
这个metric与tag的组合为一条时间线,在同一条时间线下面存储连续的时序数据(timestamp,value)
。
时序数据存储功能
时序数据存储提供以下功能:
- 时序数据高效读写:
- 数据写入:支持通过配置数据流转规则,将数据转发到实例内的时序数据存储进行数据写入;也支持通过SDK写入数据。最高可以支持每秒千万数据点的写入。
- 数据查询:支持通过SDK使用SQL,或通过JDBC连接使用TSQL进行数据的查询操作。百万数据点的读取,响应时间小于5秒。
- 数据管理:
您可以通过控制台设置数据的有效期。开启并设置数据时效后,系统将过期数据及时标记为失效,并自动在特定时间清除。
- 高压缩比存储:
单个数据点的平均使用存储空间仅需要1~2个字节,相较于常规存储降低90%存储使用空间。高压缩比存储同时能加快数据写入的速度。
- 数据安全:
为充分保证数据的可用性,时序数据存储默认采取三副本策略。
开通时序数据存储
时序数据存储随您购买的企业版实例开通。具体操作,请参见购买企业版实例。
需要选择时序数据存储的规格,包括以下项目:
项目 |
说明 |
时序数据写入TPS |
每秒钟写入时序数据存储的最大数据条数,达到上限后会写入失败。
时序数据写入TPS选择5千条/秒时,不支持TSQL。
|
时序数据存储空间 |
时序数据达到存储空间上限后会写入失败。
默认时序数据时效为永久,购买后时序数据时效可配置,请参见管理时序数据存储。
|
说明 当实例被释放时,时序数据存储也将同时被释放。
管理时序数据存储
开通时序数据存储后,您可以设置时序数据时效,查看已使用的存储空间、时间线数,查看用户名、密码、连接地址,重新创建时序存储用户。
- 登录物联网平台控制台。
- 在实例概览页面,找到对应的实例,单击实例进入实例详情页面。
- 在实例详情页面下方,查看实时时序存储写入TPS、实时时序存储空间数据,及对应的监控图表,设置相关报警规则。
单击相应区域框中的报警配置,在云监控控制台,设置阈值报警规则。具体操作,请参见创建阈值报警规则。
- 在实例详情页面,单击右上角的查看开发配置,查看下方的时序存储配置。
- 查看时序存储用户名和密码。
单击查看,短信校验码验证通过后显示密码。
用户名、密码、连接地址用于查询时序存储中的数据,购买企业版实例时,会自动生成。
- 重新创建时序存储用户。
注意 重新创建时序存储用户后,旧的用户名和密码将失效,会导致正在进行的时序数据查询失败,请及时更新SDK使用的用户名和密码。
单击重新创建,输入新的用户名、密码,短信校验码验证通过后,创建成功。
- 设置时序数据存储时效。
购买企业版实例时,时效配置默认关闭。您可打开存储时效开关,设置保留数据天数。取值范围为1~365天。
通过云产品流转写入数据
您可以通过配置数据流转规则,将数据转发到实例内的时序数据存储。
说明
- 在设置转发之前,请参见设置数据流转规则,创建数据转发规则和编写处理数据的SQL。
- 只支持JSON格式数据转发。
- 转发的消息中,除了配置的timestamp、tag值字段外,其他字段都将作为metric写入时序数据存储。metric的数据类型支持数值型、字符串,其他类型会导致写入失败。
- 登录物联网平台控制台。
- 在实例概览页面,找到对应的实例,单击实例进入实例详情页面。
- 在左侧导航栏,选择。
- 单击规则对应的查看,进入数据流转规则页。
- 单击转发数据一栏对应的添加操作。
- 在添加操作对话框中,选择操作为存储到实例内的时序数据存储中。按照界面提示,设置其他信息,单击确认。
参数 |
描述 |
选择操作 |
选择存储到实例内的时序数据存储中。
|
地域 |
固定为您的物联网平台实例所在地域。 |
metric数据类型 |
选择metric的数据类型。支持数值型和字符串。
更多信息,可单击帮助按钮 查看。
|
timestamp |
时间戳。支持:
- 使用转义符
${} 表达式,例如${time} ,表示取值为数据源Topic消息中time字段对应的值。
- 使用数据流转函数
timestamp() ,表示取值为数据流转服务器的时间戳。
- 输入值,必须为Unix时间戳,例如1404955893000。
|
tag名称 |
设置标记数据的标签名。支持中文汉字、英文字母、数字、和特殊字符,包括:半角冒号(:)、逗号(,)、英文句号(.)、单引号(')、正斜线(/)、短划线(-)、下划线(_)、圆括号(())、方括号([])。 |
tag值 |
设置标签值。支持:
- 使用转义符
${} 表达式。例如,数据源Topic的消息结构中,包含一个位置属性,标识符为city,则可以指定标签值为${city} ,表示消息中city字段对应的值。建议使用此方式。
- 使用数据流转函数规定的一些函数,例如
deviceName() ,表示标签值为设备名称。支持的函数,请参见函数列表。
- 输入常量,例如beijing。支持输入中文汉字、英文字母、数字、和特殊字符包括:半角冒号(:)、逗号(,)、英文句号(.)、单引号(')、正斜线(/)、短划线(-)、下划线(_)、圆括号(())、方括号([])。
说明
- 最多可添加8个tag键值对。
- 需保证时序数据存储能够获取到配置的tag键值对,如果获取不到任意一个tag键值对,会导致写入数据库失败。
|
- 回到云产品流转页,单击规则对应的启动按钮启动规则。
下面给出一个数据流转示例。
示例规则的SQL:
SELECT time,city,power,distance, FROM "/alprodu****/myDevice/user/update";
规则引擎根据SQL处理数据和写入数据到时序数据存储如下。
- 根据该SQL,规则引擎从Topic
/alprodu****/myDevice/user/update
的消息中,筛选出time、city、power和distance字段内容,作为转发的消息内容。
通过以上SQL处理后的转发消息内容示例如下:
{
"time": 1513677897,
"city": "beijing",
"distance": 8545,
"power": 93.0
}
- 根据已配置的数据流转操作,规则引擎向时序数据存储中写入两条数据。
示例中写入时序数据存储的数据如下:
-
数据: timestamp:1513677897, [metric:power value:93.0]
tag: cityName=beijing
-
数据: timestamp:1513677897, [metric:distance value:8545]
tag: cityName=beijing
通过SDK写入数据
SDK使用Point类表示一个时间点。一个 Point对象表示一个时间序列(时间线)某个时刻上的数据。
- 获取SDK。
使用Maven做项目构建工具,在pom.xml文件的<dependencies>标签中添加hitsdb-client依赖。代码如下:
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>hitsdb-client</artifactId>
<version>0.2.7</version>
</dependency>
- 配置客户端。
客户端的所有配置均由TSDBConfig类进行配置。示例代码如下:
//实例详情页面中的时序存储SQL连接地址、端口。
String connectString = "XXX";
int port = XXX;
//实例详情页面中的时序数据存储用户名、密码。
String username = "XXX";
String password = "XXXX";
TSDBConfig config = TSDBConfig.address(connectString, port)
.basicAuth(username, password)
// 网络连接池大小,默认为64。
.httpConnectionPool(64)
// HTTP 等待时间,单位为秒,默认为90秒。
.httpConnectTimeout(90)
// IO 线程数,默认为1。
.ioThreadCount(1)
.config();
- 构造时间点。
Point(时间点)有多种构造方法,形式比较多样化。下面提供三种构造时间点的方法示例。
示例一
构建一个时间点。用单位为秒的时间戳表示时间,指定Point数据的Metric与多个Tag。
// 以'秒'为时间戳。
int timestamp = (int)(System.currentTimeMillis()/1000);
// 构造Point。
Point point = Point.metric("test1")
.tag("tagk1", "tagv1")
.tag("tagk2", "tagv2")
.tag("tagk3", "tagv3")
.timestamp(timestamp).value(123.456)
.build();
示例二
构建一个时间点。用单位为毫秒的时间戳表示时间,指定Point数据的Metric,Tag使用Map形式的键值对。
// 也可以毫秒为时间戳。
long timestamp = System.currentTimeMillis();
// 使用HashMap表示Tags。
Map<String,String> tagsMap = new HashMap<String,String>();
tagsMap.put("tagk1", "tagv1");
tagsMap.put("tagk2", "tagv2");
// 构造Point。
Point point = Point.metric("test1")
.tag(tagsMap)
.value(timestamp,123.456)
.build();
示例三
构建一个时间点。使用java.util.Date表示时间。
// 使用java.util.Date表示时间。
Point point = Point.metric("test1")
.tag("tagk1", "tagv1")
.value(new Date,123.456)
.build();
- 写入数据。
SDK有两种写数据的方式:同步阻塞的写数据、异步非阻塞的写数据。
- 异步非阻塞的写数据
异步写数据的方式比较简单,只要有Point就可以提交数据。Client会自动帮助您批量地提交数据,不需要您手动打包。如果没有特殊需求,推荐您使用异步非阻塞的写数据的方式。示例代码:
// 构建Point。
Point point = Point.metric("test1")
.tag("tagk1", "tagv1")
.value(timestamp, Math.random())
.build();
// 直接提交数据。
tsdb.put(point);
当您使用异步写数据的方式时,若需要获取Put接口的响应结果,则需要设置回调接口。示例代码:
final AtomicLong num = new AtomicLong();
// 回调对象。
BatchPutCallback cb = new BatchPutCallback() {
@Override
public void response(String address, List<Point> input, Result output) {
long afterNum = num.addAndGet(input.size());
System.out.println("成功处理" + input.size() + ",已处理" + afterNum);
}
@Override
public void failed(String address, List<Point> input, Exception ex) {
ex.printStackTrace();
long afterNum = num.addAndGet(input.size());
System.out.println("失败处理" + input.size() + ",已处理" + afterNum);
}
};
HiTSDBConfig config = HiTSDBConfig
.address("example.hitsdb.com", 8242)
.listenBatchPut(cb) // 设置回调接口。
.config();
tsdb = HiTSDBClientFactory.connect(config);
SDK提供了以下三种类型的PutCallback接口来根据需要选择返回的数据内容:
- BatchPutCallback:即调用
POST /api/put
。
- BatchPutSummaryCallback:即调用
POST /api/put?summary=true
。
- BatchPutDetailsCallback:即调用
POST /api/put?details=true
。
- 同步阻塞的写数据
假设我们现在需要构建500个时间点提交给时序数据存储,示例代码:
List<Point> points = new ArrayList<Point>();
// 构建Point。
for(int i = 0; i<500; i++) {
long timestamp = System.currentTimeMillis();
Point point = Point.metric("test1")
.tag("tagk1", "tagv1")
.value(timestamp, Math.random())
.build();
// 手动打包数据。
points.add(point);
}
// 手动打包后提交数据。
tsdb.putSync(points)
说明 出于写入性能的考虑,同步写的方式一般需要您手动将数据点打包成一批数据,并且建议一批数据包含500~1000个数据点。此外,也建议多个线程并发进行提交。
您可以根据需要选择返回的数据内容:
- 返回空对象,无内容。本质是调用
POST /api/put
。
Result result = tsdb.putSync(ps);
- 返回提交概述,包含成功数和返回数。本质是调用
POST /api/put?summary=true
。
SummaryResult summaryResult = tsdb.putSync(ps,SummaryResult.class);
- 返回提交概述。包含成功数、返回数和失败原因。本质是调用
POST /api/put?details=true
。
DetailsResult detailsResult = tsdb.putSync(ps,DetailsResult.class);
通过SDK使用SQL查询数据
您可以通过SDK使用SQL查询已写入实例内时序数据存储的数据。
- 获取SDK。
使用Maven做项目构建工具,在pom.xml文件的<dependencies>标签中添加hitsdb-client依赖。代码如下:
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>hitsdb-client</artifactId>
<version>0.2.7</version>
</dependency>
- 配置客户端。
客户端的所有配置均由TSDBConfig类进行配置。示例代码如下:
//实例详情页面中的时序存储SQL连接地址、端口。
String connectString = "XXX";
int port = XXX;
//实例详情页面中的时序数据存储用户名、密码。
String username = "XXX";
String password = "XXXX";
TSDBConfig config = TSDBConfig.address(connectString, port)
.basicAuth(username, password)
// 网络连接池大小,默认为64。
.httpConnectionPool(64)
// HTTP 等待时间,单位为秒,默认为90秒。
.httpConnectTimeout(90)
// IO 线程数,默认为1。
.ioThreadCount(1)
.config();
- 查询数据。
示例代码如下:
TSDB tsdbClient = TSDBClientFactory.connect(config);
//按标签筛选数据。
Map<String, String> tags = new HashMap<>();
String metric = "XXX";
long now = System.currentTimeMillis();
LastPointQuery query = LastPointQuery.builder()
.timestamp(now)
.backScan(-1)
.msResolution(true)
.sub(LastPointSubQuery.builder(metric, tags).build()).build();
List<LastDataValue> lastDataValues = tsdbClient.queryLast(query);
System.out.println(lastDataValues);
说明 SDK不能用于将数据写入实例内的时序数据存储。
通过JDBC连接使用TSQL查询数据
您可以通过JDBC连接使用TSQL进行数据的查询操作。
说明 时序数据写入TPS为5千条/秒时,不支持TSQL。
- 引入Driver依赖。
运行时要求:
- Java 1.8 Runtime。
- 配置实例JBDC访问,获取JDBC URL。
TSQL的JDBC driver的依赖已经发布到Maven仓库,这里以Maven来管理项目(tsql_jdbc_app)为例,把下面的内容加入pom.xml。获得一个包括所有依赖.jar文件,以及应用程序.class文件的JAR包。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.tsdb.tsql</groupId>
<artifactId>tsql_jdbc_app</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.drill.exec/drill-jdbc -->
<dependency>
<groupId>org.apache.drill.exec</groupId>
<artifactId>drill-jdbc-all</artifactId>
<version>1.15.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.alibaba.tsdb.tsql.TsqlJdbcSampleApp</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- 在Java应用项目下,创建一个package com.alibaba.tsdb.tsql文件,并创建一个Java源文件TsqlJdbcSampleApp。
在以下示例中,根据您的实际情况,修改:
- host:实例详情页面中的时序存储TSQL连接地址。
- port:实例详情页面中的时序存储TSQL端口。
- sql: 您所需要执行的TSQL查询语句。
package com.alibaba.tsdb.tsql;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class TsqlJdbcSampleApp {
public static void main(String[] args) throws Exception {
Connection connection = null;
Statement stmt = null;
try {
// step 1: Register JDBC driver
Class.forName("org.apache.drill.jdbc.Driver");
// hostname or address of TSDB instance.
String host = "ts-uf64t3199j58j8251.tsql.hitsdb.rds.aliyuncs.com";
// port for TSQL JDBC service
int port = 3306;
String jdbcUrl = String.format("jdbc:drill:drillbit=%s:%s", host, port);
// step 2: Open connection
System.out.println("Connecting to database @ " + jdbcUrl + " ...");
connection = DriverManager.getConnection(jdbcUrl);
// step 3: Create a statement
System.out.println("Creating statement ...");
stmt = connection.createStatement();
// step 4: Execute a query using the statement.
String sql = "select hostname, `timestamp`, `value` " +
"from tsdb.`cpu.usage_system` " +
"where `timestamp` between '2019-03-01' and '2019-03-01 00:05:00'";
ResultSet rs = stmt.executeQuery(sql);
// step 5: Extract data from ResultSet.
int row = 0;
System.out.println("hostname\ttimestamp\tvalue");
System.out.println("-----------------------------------------------------");
while (rs.next()) {
row++;
System.out.println(rs.getString("hostname") + "\t" + rs.getTimestamp("timestamp") + "\t" +rs.getDouble("value"));
}
System.out.println("-----------------------------------------------------");
System.out.println( row + "rows returned");
} catch(SQLException se){
//Handle errors for JDBC
se.printStackTrace();
}catch(Exception e){
//Handle errors for Class.forName
e.printStackTrace();
}finally{
//finally block used to close resources
try{
if(stmt!=null)
stmt.close();
}catch(SQLException se2){
}// nothing we can do
try{
if(connection!=null)
connection.close();
}catch(SQLException se){
se.printStackTrace();
}//end finally try
}//end try
System.out.println("Goodbye!");
}
}
- 编译与执行。
在项目根目录下,执行下面的Maven命令:
maven clean install
执行完成后,您将在项目目录/targets下获得可执行程序tsql_jdbc_app-1.0-SNAPSHOT-jar-with-dependencies.jar。
执行该应用程序:
java -jar target/tsql_jdbc_app-1.0-SNAPSHOT-jar-with-dependencies.jar
JDBC协议使用限制
TSQL的JDBC协议存在功能限制。在使用TSQL JDBC协议前请对照检查,具体限制和说明如下:
- TSQL目前仅支持时序数据查询和时序元数据查询,不支持数据写入,修改和删除。
- 时序数据存储没有事务(Transaction)的支持。
具体JDBC接口的限制说明如下:
接口 |
方法 |
TSDB JDBC支持情况 |
Connection |
setAutoCommit(boolean) |
仅允许true作为传入参数。 |
Connection |
getAutoCommit() |
返回true。 |
Connection |
commit() |
调用会引发异常:SQLFeatureNotSupportedException。 |
Connection |
rollback() |
调用会引发异常: SQLFeatureNotSupportedException。 |
Connection |
setTransactionIsolation(int level) |
仅允许TRANSACTION_NONE。 |
Connection |
getTransactionIsolation() |
仅允许TRANSACTION_NONE。 |
Connection |
setSavePoint() |
调用会引发异常 SQLFeatureNotSupportedException。 |
Connection |
setSavePoint(String name) |
调用会引发异常 SQLFeatureNotSupportedException。 |
Connection |
rollback(Savepoint savepoint) |
调用会引发异常 SQLFeatureNotSupportedException。 |
Connection |
releaseSavePoint(Savepoint savepoint) |
调用会引发异常 SQLFeatureNotSupportedException。 |
Connection |
setNetworkTimeout() |
调用会引发异常 SQLFeatureNotSupportedException。 |
Connection |
getNetworkTimeout() |
调用会引发异常 SQLFeatureNotSupportedException。 |