通过SDK写入时序数据

日志服务支持通过SDK写入时序数据,本文列举了Java、Golang和Python语言的SDK demo。

说明
  • 使用SDK写入时序数据时,需遵循时序数据格式

  • 尽可能使用Producer Library发送数据,目前支持Producer Library的语言有Java、Golang和C。使用其它语言时,可以将多条数据放到同一个LogGroup中发送,尽可能减少网络请求次数。

  • __time_nano__字段支持秒、毫秒、微秒和纳秒。

  • 本文中的SDK采用环境变量方式配置AccessKey。具体操作,请参见配置环境变量

Java SDK示例

安装步骤可参见安装Aliyun Log Java Producer

import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo that writes time-series (metric) samples to a Logstore through the Aliyun Log Java
 * Producer.
 *
 * <p>{@link #main} starts several sender threads, each of which submits a fixed number of
 * samples, waits until every sample has been accounted for (acknowledged by the producer
 * callback or rejected by {@code send}), and then logs a short summary.
 */
public class Main {
    private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
    private static final Random random = new Random();

    /**
     * Runs the demo: sends {@code sendThreadCount * times} samples and reports the elapsed time.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for the
     *     samples to complete or while closing the producer
     */
    public static void main(String[] args) throws InterruptedException {
        final String project = "";
        final String logstore = "";
        // Log Service endpoint. China (Hangzhou) is used as an example; use the endpoint of
        // your own region.
        final String endpoint = "https://cn-hangzhou.log.aliyuncs.com";
        // The AccessKey ID and AccessKey Secret are read from environment variables.
        final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        int sendThreadCount = 8;
        final int times = 10;
        LOGGER.info(
                "project={}, logstore={}, endpoint={}, sendThreadCount={}, times={}",
                project, logstore, endpoint, sendThreadCount, times);
        ExecutorService executorService = Executors.newFixedThreadPool(sendThreadCount);
        ProducerConfig producerConfig = new ProducerConfig();
        producerConfig.setBatchSizeThresholdInBytes(3 * 1024 * 1024);
        producerConfig.setBatchCountThreshold(40960);

        final Producer producer = new LogProducer(producerConfig);
        try {
            producer.putProjectConfig(
                    new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));

            final AtomicInteger completedCount = new AtomicInteger(0);
            LOGGER.info("Test started.");
            long t1 = System.currentTimeMillis();
            final Map<String, String> labels = new HashMap<String, String>();
            labels.put("test_k", "test_v");
            for (int i = 0; i < sendThreadCount; ++i) {
                executorService.submit(
                        new Runnable() {
                            @Override
                            public void run() {
                                for (int j = 0; j < times; ++j) {
                                    int r = random.nextInt(times);
                                    try {
                                        producer.send(project, logstore, generateTopic(r), generateSource(r),
                                                buildLogItem("test_metric", labels, j),
                                                new Callback() {
                                                    @Override
                                                    public void onCompletion(Result result) {
                                                        completedCount.incrementAndGet();
                                                        if (!result.isSuccessful()) {
                                                            LOGGER.error(
                                                                    "Failed to send log, project={}, logstore={}, result={}",
                                                                    project,
                                                                    logstore,
                                                                    result);
                                                        }
                                                    }
                                                });
                                    } catch (InterruptedException e) {
                                        // Stop this sender, keep the interrupt flag, and account
                                        // for the current and all remaining samples so the wait
                                        // loop in main() can still terminate.
                                        Thread.currentThread().interrupt();
                                        LOGGER.error("Failed to send log, e=", e);
                                        completedCount.addAndGet(times - j);
                                        return;
                                    } catch (Exception e) {
                                        // The callback is not expected to run for a send that
                                        // threw (confirm against the producer version in use),
                                        // so count the sample here; otherwise main() would wait
                                        // forever.
                                        LOGGER.error("Failed to send log, e=", e);
                                        completedCount.incrementAndGet();
                                    }
                                }
                            }
                        });
            }
            while (completedCount.get() < sendThreadCount * times) {
                Thread.sleep(100);
            }
            long t2 = System.currentTimeMillis();
            LOGGER.info("Test end.");
            LOGGER.info("======Summary======");
            LOGGER.info("Total count {}.", sendThreadCount * times);
            long timeCost = t2 - t1;
            LOGGER.info("Time cost {} millis", timeCost);
        } finally {
            // Release the producer and the (non-daemon) sender threads even when the wait above
            // is interrupted, so the JVM can exit.
            try {
                producer.close();
            } catch (ProducerException e) {
                LOGGER.error("Failed to close producer, e=", e);
            } finally {
                executorService.shutdown();
            }
        }
    }

    /** Returns one of five topic names, {@code topic-0} .. {@code topic-4}, chosen by {@code r}. */
    private static String generateTopic(int r) {
        return "topic-" + r % 5;
    }

    /** Returns one of ten source names, {@code source-0} .. {@code source-9}, chosen by {@code r}. */
    private static String generateSource(int r) {
        return "source-" + r % 10;
    }

    /**
     * Builds a {@link LogItem} that carries one time-series sample stamped with the current time.
     *
     * @param metricName the metric name, eg: http_requests_count
     * @param labels labels map, eg: {'idc': 'idc1', 'ip': '192.0.2.0', 'hostname': 'appserver1'}
     * @param value double value, eg: 1.234
     * @return the populated LogItem
     */
    public static LogItem buildLogItem(String metricName, Map<String, String> labels, double value) {
        String labelsKey = "__labels__";
        String timeKey = "__time_nano__";
        String valueKey = "__value__";
        String nameKey = "__name__";
        LogItem logItem = new LogItem();
        int timeInSec = (int) (System.currentTimeMillis() / 1000);
        logItem.SetTime(timeInSec);
        // Seconds followed by six zeros, i.e. the same instant written at microsecond resolution.
        logItem.PushBack(timeKey, timeInSec + "000000");
        logItem.PushBack(nameKey, metricName);
        logItem.PushBack(valueKey, String.valueOf(value));

        // Sort the labels lexicographically by key. Skip this step if your labels are already
        // sorted.
        TreeMap<String, String> sortedLabels = new TreeMap<String, String>(labels);
        StringBuilder labelsBuilder = new StringBuilder();

        // Serialize as key#$#value pairs separated by '|'.
        boolean hasPrev = false;
        for (Map.Entry<String, String> entry : sortedLabels.entrySet()) {
            if (hasPrev) {
                labelsBuilder.append("|");
            }
            hasPrev = true;
            labelsBuilder.append(entry.getKey());
            labelsBuilder.append("#$#");
            labelsBuilder.append(entry.getValue());
        }
        logItem.PushBack(labelsKey, labelsBuilder.toString());
        return logItem;
    }
}
            

Python SDK示例

更多信息,请参见安装日志服务Python SDK

# encoding: utf-8
import time
import os
from aliyun.log import *

def build_log_item(metric_name, labels, value):
    """
    Build a LogItem holding one time-series sample stamped with the current time.

    :param metric_name: the metric name, eg: http_requests_count
    :param labels: dict labels, eg: {'idc': 'idc1', 'ip': '192.0.2.0', 'hostname': 'appserver1'}
    :param value: double value, eg: 1.234
    :return: LogItem
    """
    # Labels are serialized as key#$#value pairs in ascending key order, joined by '|'.
    encoded_labels = '|'.join(
        name + '#$#' + val for name, val in sorted(labels.items())
    )

    # The same second-resolution timestamp is used for the log time and __time_nano__.
    timestamp = int(time.time())

    item = LogItem()
    item.set_time(timestamp)
    item.set_contents([
        ('__time_nano__', str(timestamp)),
        ('__name__', metric_name),
        ('__value__', str(value)),
        ('__labels__', encoded_labels),
    ])
    return item

def main():
    """Send three sample metric points to the configured Logstore and print the response."""
    # Log Service endpoint. China (Hangzhou) is used as an example; use the endpoint of your
    # own region.
    endpoint = 'cn-hangzhou.log.aliyuncs.com'
    # The AccessKey ID and AccessKey Secret are read from environment variables.
    access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
    access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
    project = ''
    logstore = ''

    client = LogClient(endpoint, access_key_id, access_key_secret)

    # (metric name, labels, value) for each sample; all samples go out in a single request.
    samples = [
        ('test1', {'k1': 'v1'}, 1),
        ('test2', {'k2': 'v2'}, 2),
        ('test3', {'k3': 'v3'}, 3),
    ]
    items = [build_log_item(name, labels, value) for name, labels, value in samples]

    request = PutLogsRequest(project=project, logstore=logstore, logitems=items)
    response = client.put_logs(request)
    response.log_print()


# Run the demo only when this file is executed as a script.
if __name__ == '__main__':
    main()

Go SDK示例

更多信息,请参见安装Go SDK

package main

import (
   "fmt"
   "os"
   "os/signal"
   "sort"
   "strconv"
   "sync"
   "syscall"
   "time"

   sls "github.com/aliyun/aliyun-log-go-sdk"
   "github.com/aliyun/aliyun-log-go-sdk/producer"
   "github.com/golang/protobuf/proto"
)

// buildLogItem converts one metric sample into an *sls.Log stamped with the current time.
//
// metricName is written to __name__, value to __value__, and labels to __labels__ as
// key#$#value pairs sorted by key and joined with "|". The current Unix time in seconds is
// used both as the log time and as the __time_nano__ field.
func buildLogItem(metricName string, labels map[string]string, value float64) *sls.Log {
   now := uint32(time.Now().Unix())

   // Map iteration order is random, so sort the keys to get a stable label string.
   keys := make([]string, 0, len(labels))
   for k := range labels {
      keys = append(keys, k)
   }
   sort.Strings(keys)

   labelsStr := ""
   for i, k := range keys {
      if i > 0 {
         labelsStr += "|"
      }
      labelsStr += k + "#$#" + labels[k]
   }

   // Precision -1 emits the shortest decimal string that round-trips the float64, so small
   // values such as 1e-7 are not rounded away to "0.000000" as a fixed precision of 6 would do.
   valueStr := strconv.FormatFloat(value, 'f', -1, 64)

   return &sls.Log{
      Time: proto.Uint32(now),
      Contents: []*sls.LogContent{
         {Key: proto.String("__time_nano__"), Value: proto.String(strconv.FormatInt(int64(now), 10))},
         {Key: proto.String("__name__"), Value: proto.String(metricName)},
         {Key: proto.String("__value__"), Value: proto.String(valueStr)},
         {Key: proto.String("__labels__"), Value: proto.String(labelsStr)},
      },
   }
}

// main starts a producer, sends 10 x 1000 metric samples from concurrent goroutines, and then
// waits for an interrupt or termination signal before closing the producer.
func main() {
   project := ""
   logstore := ""

   producerConfig := producer.GetDefaultProducerConfig()
   // Log Service endpoint. China (Hangzhou) is used as an example; use the endpoint of your
   // own region.
   producerConfig.Endpoint = "https://cn-hangzhou.log.aliyuncs.com"
   // The AccessKey ID and AccessKey Secret are read from environment variables.
   producerConfig.AccessKeyID = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
   producerConfig.AccessKeySecret = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
   producerInstance, err := producer.NewProducer(producerConfig)
   if err != nil {
      panic(err)
   }

   // signal.Notify does not block when delivering, so the channel must be buffered or a signal
   // that arrives while nobody is receiving is dropped. Only shutdown signals are subscribed:
   // subscribing to every signal would also relay unrelated ones and trigger a spurious
   // shutdown.
   ch := make(chan os.Signal, 1)
   signal.Notify(ch, os.Interrupt, syscall.SIGTERM)

   producerInstance.Start()
   var m sync.WaitGroup
   for i := 0; i < 10; i++ {
      m.Add(1)
      go func() {
         defer m.Done()
         for j := 0; j < 1000; j++ {
            // Build the *sls.Log directly in the time-series format and hand it to the producer.
            log := buildLogItem("test_metric", map[string]string{"test_k": "test_v"}, float64(j))
            err := producerInstance.SendLog(project, logstore, "topic", "127.0.0.1", log)
            if err != nil {
               fmt.Println(err)
            }
         }
      }()
   }
   m.Wait()
   fmt.Println("Send completion")
   // Block until a shutdown signal arrives, then close the producer (argument 60000 as in the
   // original sample).
   if _, ok := <-ch; ok {
      fmt.Println("Get the shutdown signal and start to shut down")
      producerInstance.Close(60000)
   }
}