全部产品
云市场

从采集到分析-TSDB For InfluxDB®让你的数据产生价值

更新时间:2019-04-26 14:19:45

前言

数据无处不在,价值无处不在。在时序数据库领域,TSDB For InfluxDB®作为一款数据存储分析的利刃,在生产和开发环境中得到了比较广泛的应用。目前,阿里云TSDB For InfluxDB®版本已上线公测,本文主要讲述如何将你的数据搬迁上云,让你的数据产生更大的价值。

了解我们-TSDB For InfluxDB®

TSDB For InfluxDB®是一款专门处理高写入和查询负载的时序数据库,用于存储大规模的时序数据并进行实时分析,包括来自DevOps监控、应用指标和IoT传感器上的数据。它的主要特点如下:

  • 专为时间序列数据量身订造高性能数据存储。TSM引擎提供数据高速读写和压缩等功能。
  • 简单高效的HTTP API写入和查询接口。
  • 针对时序数据,量身订造类似SQL的查询语言,轻松查询聚合数据。
  • 允许对tag建索引,实现快速有效的查询。
  • 数据保留策略(Retention policies)能够有效地使旧数据自动失效。

在阿里云购买成功(参考文档)之后,我们需要创建数据库帐户和密码,设置存储策略(参考文档)。通过实例详情页面可以查看访问TSDB For InfluxDB®的方式,目前提供两种HTTP访问地址:VPC网络(阿里云专有网络,相同VPC内的云产品实例可以相互访问)与公共网络(需购买时开通,内外部网络都可访问),均开通了HTTPS加密验证,注意接入时使用HTTPS通道

实例生产完成后可以在帐户管理中创建用户test和数据库test,并保证用户test对数据库test具有读写权限。

Telegraf-数据的搬运工

Telegraf是一个用Go语言编写的代理程序,可采集系统和服务的统计数据,并写入TSDB For InfluxDB®数据库。Telegraf具有内存占用小的特点,通过插件系统开发人员可轻松添加支持其他服务的扩展。Telegraf安装完成后,数据可以用HTTP方式写入TSDB For InfluxDB®。

urls选择实例详情中的公网或者VPC访问地址,修改上图标记中的写入的数据库和存储策略,存储策略在不填的情况下将会写入该数据库默认存储策略中。TSDB For InfluxDB®采用认证的方式写入,在填写username和password之前需要确保该帐号对数据库具有写权限。关于权限可以在实例管理->帐号管理中进行设置。完成这些操作之后,Telegraf采集的数据便可以写入到TSDB For InfluxDB®中。

启动Telegraf

Ubuntu, Debian, RedHat, CentOS启动

  1. sudo service telegraf start

Ubuntu 15.04+, Debian 8+, CentOS 7+, RHEL 7+启动

  1. sudo systemctl start telegraf

数据迁移-传输的高速通道

对于在自建环境或者其它平台中已经拥有InfluxDB实例的用户,如果考虑将业务迁移至TSDB For InfluxDB®服务中,我们打造了一键迁移的工具,具体实践可以参考迁移文档。注意迁移工具会自动在TSDB For InfluxDB®上创建选择迁移的数据库,迁移之前不需手动创建,否则会存在写冲突。
数据迁移之后,数据库运维工作放心交给系统后台,用户可以节省更多的时间写入新数据和对实例中的数据进行查询分析。

客户端写入-不一样的风格

InfluxDB开源社区提供了丰富的SDK,基本上涵盖了主流了编程语言,TSDB For InfluxDB®完全兼容各种客户端。下面将介绍几种主要的写入方式:

Go写入方式

  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "math/rand"
  6. "net/url"
  7. "time"
  8. client "github.com/influxdata/influxdb1-client"
  9. )
  10. func main() {
  11. host, err := url.Parse(fmt.Sprintf("https://%s:%d", "xxx.influxdata.rds.aliyuncs.com", 3242))
  12. if err != nil {
  13. log.Fatal(err)
  14. }
  15. config := client.Config{
  16. URL: *host,
  17. Username: "test",
  18. Password: "test",
  19. }
  20. con, err := client.NewClient(config)
  21. if err != nil {
  22. log.Fatal(err)
  23. }
  24. _, _, err = con.Ping()
  25. if err != nil {
  26. log.Fatal(err)
  27. }
  28. var (
  29. shapes = []string{"circle", "rectangle", "square", "triangle"}
  30. colors = []string{"red", "blue", "green"}
  31. sampleSize = 1000
  32. pts = make([]client.Point, sampleSize)
  33. )
  34. rand.Seed(42)
  35. for i := 0; i < sampleSize; i++ {
  36. pts[i] = client.Point{
  37. Measurement: "shapes",
  38. Tags: map[string]string{
  39. "color": colors[rand.Intn(len(colors))],
  40. "shape": shapes[rand.Intn(len(shapes))],
  41. },
  42. Fields: map[string]interface{}{
  43. "value": rand.Intn(sampleSize),
  44. },
  45. Time: time.Now(),
  46. Precision: "s",
  47. }
  48. }
  49. bps := client.BatchPoints{
  50. Points: pts,
  51. Database: "test",
  52. RetentionPolicy: "autogen",
  53. }
  54. _, err = con.Write(bps)
  55. if err != nil {
  56. log.Fatal(err)
  57. }
  58. }

HTTP连接配置中需要指明实例的公网或者VPC地址和数据库用户、密码(具体可以在实例管理中查询)。在写入前指定数据写入的数据库和保留策略(如果不指定,则写入默认的保留策略),数据可以按行或者Batch的方式写入。为了提高写入吞吐,推荐Batch的写入方式,Batch行数按数据大小可以在100~1000行左右。
Go环境安装见文档,SDK下载见地址,程序编译执行命令为go run main.go

JAVA写入方式

  1. package main;
  2. import java.util.concurrent.TimeUnit;
  3. import org.influxdb.InfluxDB;
  4. import org.influxdb.BatchOptions;
  5. import org.influxdb.InfluxDBFactory;
  6. import org.influxdb.dto.Point;
  7. public class StartMain {
  8. public static void main(String[] args) throws Exception {
  9. StartMain startMain = new StartMain();
  10. try {
  11. startMain.run();
  12. } catch (Exception e) {
  13. System.out.println(e);
  14. }
  15. }
  16. public void run() throws Exception {
  17. InfluxDB influxDB = InfluxDBFactory.connect("https://xxx.influxdata.rds.aliyuncs.com:3242", "test", "test");
  18. String dbName = "test";
  19. influxDB.setDatabase(dbName);
  20. String rpName = "autogen";
  21. influxDB.setRetentionPolicy(rpName);
  22. influxDB.enableBatch(BatchOptions.DEFAULTS);
  23. influxDB.write(Point.measurement("cpu")
  24. .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  25. .addField("idle", 90L)
  26. .addField("user", 9L)
  27. .addField("system", 1L)
  28. .build());
  29. influxDB.write(Point.measurement("disk")
  30. .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  31. .addField("used", 80L)
  32. .addField("free", 1L)
  33. .build());
  34. influxDB.close();
  35. }
  36. }

Java SDK下载见地址,在enableBatch中设置异常捕捉函数可以处理写入异常。

  1. influxDB.enableBatch(BatchOptions.DEFAULTS.exceptionHandler(
  2. (failedPoints, throwable) -> { /* custom error handling here */ })
  3. );

Influx CLI写入

TSDB for InfluxDB®支持命令行界面Influx CLI,连接时需指定ssl、host、port、username和password:

  1. influx -ssl -host xxx.influxdata.rds.aliyuncs.com -port 3242 -username admin -password admin

客户端上通过行写入协议可以将数据写入TSDB For InfluxDB®,如:

  1. > INSERT cpu,host=serverA,region=us_west value=0.64

Shell写入

  1. curl -i -XPOST -u test:test 'https://xxx.influxdata.rds.aliyuncs.com:3242/write?db=test' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'

Shell的具体用法可以参考文档

数据分析

目前TSDB For InfluxDB®支持丰富的类SQL查询,非常方便从业者使用。具体查询实践可以参考 TSDB For InfluxDB®文档。通过使用这些查询语句和函数,便可以挖掘出埋藏在数据里面的价值。

下面示例是查询每组color的平均值:

  1. > SELECT MEAN("value") FROM "shapes" GROUP BY "color"
  2. name: shapes
  3. tags: color=blue
  4. time mean
  5. ---- ----
  6. 0 216.25
  7. name: shapes
  8. tags: color=green
  9. time mean
  10. ---- ----
  11. 0 434.25
  12. name: shapes
  13. tags: color=red
  14. time mean
  15. ---- ----
  16. 0 540.25


InfluxDB® is a trademark registered by InfluxData, which is not affiliated with, and does not endorse, TSDB for InfluxDB®.