文档

接口说明

更新时间:

本文介绍Lindorm TSDB SDK常用接口的使用方法。

Lindorm TSDB SDK的接口主要分成管理接口、写入接口以及查询接口等3类。

管理接口

Lindorm时序引擎支持使用SQL进行DDL、DCL操作,这里管理接口相关的示例是直接通过提交SQL实现对应的DDL和DCL操作。在Lindorm TSDB SDK中,支持以下两个重载方法向Lindorm时序引擎提交SQL实现DDL和DCL操作。

// 直接提交SQL
Result execute(String sql);

// 在指定的数据库database下进行DDL等操作,比如在指定的database下创建表
Result execute(String database, String sql);

其中,Result对象表示SQL的执行结果,比如提交数据库的列表语句("SHOW DATABASES")后,可以通过Result获取到已经存在的数据库列表。Result对象有columns、metadata、rows等3个字段,其中,columns为结果的列名集合,metadata为对应列的数据类型集合,rows为结果的按行返回的集合。

public class Result {
    private List<String> columns;
    private List<String> metadata;
    private List<List<Object>> rows;
    
    ....
 }

数据库和表管理示例

数据库和表的DDL操作主要包括创建、删除、查看等。代码示例如下:

// 1.查看数据库列表
String showDatabase = "show databases";
Result result = lindormTSDBClient.execute(showDatabase);
System.out.println("before create, db list: " +  result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));

// 2.创建数据库,此处示例创建了名为demo的数据库
String createDatabase = "create database demo";
result = lindormTSDBClient.execute(createDatabase);
System.out.println("create database:" + result.isSuccessful());

// 3.查看数据库列表
result = lindormTSDBClient.execute(showDatabase);
System.out.println("after create, db list: " +  result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));

String database = "demo";

// 4.查看所有表
String showTables = "show tables";
result = lindormTSDBClient.execute(database, showTables);
System.out.println("before create, table list: " +  result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));

// 5.创建表
String createTable = "CREATE TABLE sensor (device_id VARCHAR TAG,region VARCHAR TAG,time BIGINT,temperature DOUBLE,humidity DOUBLE,PRIMARY KEY(device_id))";
result = lindormTSDBClient.execute(database, createTable);
System.out.println("create table: " + result.isSuccessful());

// 6.查看所有表,验证是否成功创建了表sensor
result = lindormTSDBClient.execute(database, showTables);
System.out.println("after create, table list: " +  result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));

// 7.描述表详情
String describeTable = "describe table sensor";
result = lindormTSDBClient.execute(database, describeTable);
System.out.println("------------ describe table -------------------");
List<String> columns = result.getColumns();
System.out.println("columns: " + columns);
List<String> metadata = result.getMetadata();
System.out.println("metadata: " + metadata);
List<List<Object>> rows = result.getRows();
for (int i = 0, size = rows.size(); i < size; i++) {
    List<Object> row = rows.get(i);
    System.out.println("column #" + i + " : " + row);
}

System.out.println("------------ describe table -------------------");
// 8.删除表
String dropTable = "drop table sensor";
result = lindormTSDBClient.execute(database, dropTable);
System.out.println("drop table: " + result.isSuccessful());

// 9.查看表列表,验证是否成功删除
result = lindormTSDBClient.execute(database, showTables);
System.out.println("after drop, table list: " +  result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));

// 10.删除指定数据库
String dropDatabase = "drop database demo";
result = lindormTSDBClient.execute(dropDatabase);
System.out.println("drop database:" + result.isSuccessful());

// 11.查看数据库列表,验证指定数据库是否删除成功
result = lindormTSDBClient.execute(showDatabase);
System.out.println("after drop, db list : " +  result.getRows().stream().map(e -> (String) e.get(0)).collect(Collectors.toList()));

运行结果示例:

before create, db list: [default]
create database:true
after create, db list: [default, demo]
before create, table list: []
create table: true
after create, table list: [sensor]
------------ describe table -------------------
columns: [columnName, typeName, columnKind]
metadata: [VARCHAR, VARCHAR, VARCHAR]
column #0 : [device_id, VARCHAR, TAG]
column #1 : [region, VARCHAR, TAG]
column #2 : [time, TIMESTAMP, TIMESTAMP]
column #3 : [temperature, DOUBLE, FIELD]
column #4 : [humidity, DOUBLE, FIELD]
------------ describe table -------------------
drop table: true
after drop, table list: []
drop database:true
after drop, db list : [default]
说明

对于这里没有列举的其他管理相关的操作,比如连续查询的创建、删除等DDL操作,都可以根据Lindorm时序引擎的对应的SQL语法说明构造对应的SQL语句来实现。关于Lindorm时序引擎的SQL语句说明,可以参考SQL语法参考

写入接口

Lindom TSDB SDK的写入接口为多个相同名称但参数不同的write方法,但根据处理写入结果方式,可以分成两类,分别为使用CompletableFuture<WriteResult>处理写入结果的接口和使用回调函数Callback处理异步写入结果的接口。

默认情况下,LindormTSDBClient为提高写入性能,使用异步攒批的方式进行数据写入。若需要同步写入,只需要调用下write方法返回的CompletableFuture<WriteResult>的join方法即可。

写入记录

Lindorm TSDB SDK使用Record对象表示表中的一行写入记录。Record需要指定表名、时间戳、标签和量测值。其中,时间戳和标签将用于构建索引。

默认构建Record会进行字符合法性校验,可以通过在build中添加参数'false'关闭校验。

Record record = Record
    // 指定表名
    .table("sensor")
    // 指定时间戳,单位为毫秒
    .time(currentTime)
    // 指定标签
    .tag("device_id", "F07A1260")
    .tag("region", "north-cn")
    // 指定量测值
    .addField("temperature", 12.1)
    .addField("humidity", 45.0)
    .build();

使用CompletableFuture处理写入结果的接口

  • 每次只提交一行记录。

// 写入默认数据库 
CompletableFuture<WriteResult> future = lindormTSDBClient.write(record);

// 写入指定的数据库
String database = "demo";
CompletableFuture<WriteResult> future = lindormTSDBClient.write(database, record);
  • 每次提交多行记录(推荐)。这种方式提交方式,可减少提交时的SDK异步队列锁竞争,效率较高。

List<Record> records;
// 写入默认数据库
CompletableFuture<WriteResult> future = lindormTSDBClient.write(records);

// 写入指定的数据库
String database = "demo";
CompletableFuture<WriteResult> future = lindormTSDBClient.write(database, records);
  • 处理CompletableFuture<WriteResult>完成时的结果。这里仅是一个示例,业务上可以结合实际情况,调用CompletableFuture的其他方法进行结果处理。

CompletableFuture<WriteResult> future = lindormTSDBClient.write(records);
// 处理异步写入结果
future.whenComplete((r, ex) -> {
    if (ex != null) { // 发送异常,一般都是写入失败
        System.out.println("Failed to write.");
        Throwable throwable = ExceptionUtils.getRootCause(ex);
        if (throwable instanceof LindormTSDBException) {
            LindormTSDBException e = (LindormTSDBException) throwable;
            System.out.println("Caught an LindormTSDBException, which means your request made it to Lindorm TSDB, "
                               + "but was rejected with an error response for some reason.");
            System.out.println("Error Code: " + e.getCode());
            System.out.println("SQL State:  " + e.getSqlstate());
            System.out.println("Error Message: " + e.getMessage());
        }  else {
            throwable.printStackTrace();
        }
    } else  { // 一般都是写入成功
        if (r.isSuccessful()) {
            System.out.println("Write successfully.");
        } else {
            System.out.println("Write failure.");
        }
    }
});
重要

不要在CompletableFuture的whenComplete等方法中做复杂耗时的计算,否则可能会阻塞写入;若确实需要做复杂的耗时计算,请将计算逻辑提交到其他独立现场池中。关于错误码的取值和含义,请参见常见错误码参考

使用回调函数Callback处理异步写入结果的接口

  • 写入回调接口Callback的定义如下所示。onCompletion方法上的result表示写入结果,records表示这个回调函数关联的写入记录,异常e表示写入失败时的异常。

public interface Callback {
    void onCompletion(WriteResult result, List<Record> records, Throwable e);
}

下面是使用Callback接口处理写入结果的一个示例。

Callback callback =  new Callback() {
    @Override
    public void onCompletion(WriteResult result, List<Record> list,
                             Throwable throwable) {
        if (throwable != null) { // 写入失败
            if (throwable instanceof LindormTSDBException) {
                LindormTSDBException ex = (LindormTSDBException) throwable;
                System.out.println("errorCode: " + ex.getCode());
                System.out.println("sqlstate: " + ex.getSqlstate());
                System.out.println("message: " + ex.getMessage());
            } else {
                // 其他错误
                throwable.printStackTrace();
            }
        } else {
            if (result.isSuccessful()) {
                System.out.println("Write successfully.");
            } else {
                System.out.println("Write failure.");
            }
        }
    }
};
重要

不要在Callback的onCompletion中做复杂耗时的计算,否则可能会阻塞写入;若确实需要做复杂的耗时计算,请将计算逻辑提交到其他独立现场池中。关于错误码的取值和含义,请参见常见错误码参考

  • 每次提交一行记录。

// 写入默认数据库 
lindormTSDBClient.write(record, callback);

// 写入指定的数据库
String database = "demo";
lindormTSDBClient.write(database, record, callback);
  • 每次提交多行记录(推荐)。这种方式提交方式,可减少提交时的SDK异步队列锁竞争,效率较高。

List<Record> records;
// 写入默认数据库
lindormTSDBClient.write(records, callback);

// 写入指定的数据库
String database = "demo";
lindormTSDBClient.write(database, records, callback);

查询接口

Lindorm TSDB SDK的查询接口是通过SQL查询数据的,关于SQL语句说明可以参考SQL语法参考

Lindorm TSDB SDK的查询接口如下所示,其中,入参有要查询的数据库database, 查询语句sql,查询结果每批返回的行数大小chunkSize。

ResultSet query(String database, String sql, int chunkSize);

Lindorm TSDB SDK 使用ResultSet接口表示SQL查询结果,其定义如下所示。

public interface ResultSet extends Closeable {

    QueryResult next();

    void close();
}

在处理查询结果时,可以循环调用ResultSet的next方法获取查询结果,当next方法返回的QueryResult对象为null时,表示已经读取完全部查询结果。 当查询结束时,调用ResultSet的close方法释放对应的IO资源。

QueryResult表示每批返回的查询结果,该对象有columns、metadata、rows等3个字段,其中,columns为查询结果的查询的列名集合,metadata为对应列的数据类型集合,rows为查询结果的按行返回的集合。其中,关于Lindorm时序引擎查询支持的数据类型集合,请参考数据类型。下面是处理查询结果ResultSet的示例。

String sql = "select * from sensor";
int chunkSize = 100;
ResultSet resultSet = lindormTSDBClient.query("demo", sql, chunkSize);

// 处理查询结果
try {
    QueryResult result = null;
    // 当resultSet的next()方法返回为null,表示已经读取完所有的查询结果
    while ((result = resultSet.next()) != null) {
        List<String> columns = result.getColumns();
        System.out.println("columns: " + columns);
        List<String> metadata = result.getMetadata();
        System.out.println("metadata: " + metadata);
        List<List<Object>> rows = result.getRows();
        for (int i = 0, size = rows.size(); i < size; i++) {
            List<Object> row = rows.get(i);
            System.out.println("row #" + i + " : " + row);
        }
    }
} finally {
    // 查询结束后,需确保调用ResultSet的close方法,以释放IO资源
    resultSet.close();
}
重要

不管查询是否成功,在查询结束时,都需显式调用ResultSet的close方法,以释放IO资源,否则会造成网络连接泄露。

另外,Lindorm TSDB SDK也提供了几个查询接口重载实现,如下所示,业务上可以根据实际情况选择使用不同实现。

// SQL查询语句
String sql = "xxxx";

// 1.使用SQL语句,查询默认数据库, 默认每批返回1000行数据
ResultSet resultSet = lindormTSDBClient.query(sql);

String database = "demo";
// 2.使用SQL语句,查询指定数据库,默认每批返回1000行数据
ResultSet resultSet = lindormTSDBClient.query(database, sql);