
Writing log data with Aliyun Log Java Producer


This topic describes how to use the Aliyun Log Java Producer library of Log Service.

Prerequisites

  • A RAM user has been created and granted the required permissions. For more information, see Create a RAM user and grant permissions.

  • The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables.

    Important
    • The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.

    • We strongly recommend that you do not save the AccessKey ID and AccessKey Secret in your project code. Otherwise, the AccessKey pair may be leaked, which threatens the security of all resources in your account.

  • The Java SDK is installed. For more information, see Install the Java SDK.

Background information

Aliyun Log Java Producer is a high-performance library tailored for Java applications that run in big-data, high-concurrency scenarios. Compared with the raw API or SDK, writing log data through this library brings several benefits, including high performance, separation of computation from I/O, and controllable resource usage.

Features

  • Thread safety: all methods exposed by the Producer interface are thread-safe.

  • Asynchronous sending: calls to the Producer's send methods usually return immediately. The Producer buffers and merges pending data internally, then sends it in batches to improve throughput.

  • Automatic retries: the Producer retries failed sends according to the configured maximum retry count and retry backoff time.

  • Traceability: through a Callback or Future, you can find out whether a record was sent successfully and inspect each attempt to send it, which helps with troubleshooting and follow-up decisions.

  • Context restoration: logs produced by the same Producer instance share a context, so on the server side you can view the logs that surround a given entry.

  • Graceful shutdown: when the close method returns, all data buffered by the Producer is guaranteed to have been handled, and you receive the corresponding notifications.

Advantages

  • High performance

    To reach the target throughput with massive data and limited resources, the writing side must implement complex control logic, including multithreading, caching strategies, and batching, and must also handle retries on failure. The Producer implements all of this, giving you the performance benefits while simplifying development.

  • Asynchronous and non-blocking

    As long as sufficient memory is available, the Producer buffers the data destined for the Logstore, so calls to the send method return immediately without blocking, separating computation from I/O. You can later obtain the send result through the returned Future object or the Callback you passed in.

  • Controllable resource usage

    You can use parameters to control the amount of memory the Producer uses to buffer pending data, and to configure the number of threads used to perform send tasks. This prevents the Producer from consuming resources without limit and lets you balance resource consumption against write throughput for your scenario.
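The resource and retry controls described above are exposed as setters on ProducerConfig. The following is a minimal tuning sketch; the setter names come from the aliyun-log-producer library, but the values are purely illustrative, not recommendations — check the library's README for defaults and valid ranges before adopting any of them.

```java
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;

ProducerConfig producerConfig = new ProducerConfig();
// Upper bound on memory used to buffer logs that have not been sent yet.
producerConfig.setTotalSizeInBytes(100 * 1024 * 1024);
// Number of threads that perform the actual network sends.
producerConfig.setIoThreadCount(8);
// Flush a batch once it reaches this size in bytes ...
producerConfig.setBatchSizeThresholdInBytes(512 * 1024);
// ... or once its oldest record has waited this long (milliseconds).
producerConfig.setLingerMs(2000);
// Maximum number of automatic retries for a failed batch.
producerConfig.setRetries(10);
// How long send() may block when the buffer is full before throwing.
producerConfig.setMaxBlockMs(60 * 1000);
```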

Install Aliyun Log Java Producer

To use Aliyun Log Java Producer in a Maven project, add the corresponding dependency to pom.xml. Maven then downloads the required JAR packages automatically. For example, add the following content to <dependencies>:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log-producer</artifactId>
    <version>0.3.10</version>
</dependency>

To resolve version conflicts among the Producer's dependencies, you can use the jar-with-dependencies variant. Add the following content to <dependencies>:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.35</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

Java code examples

After installation, you can write Java code that uses the Producer library.

Note
  • aliyun-log-producer uploads logs by calling the PutLogs operation, which limits the size of raw log data that can be written per request. For more information, see Data read and write and PutLogs.

  • The basic resources of Log Service, including the numbers of Projects, Logstores, Shards, Logtail configurations, and machine groups that you can create, as well as the size of a single LogItem and the lengths of LogItem keys and values, are subject to limits. For more information, see Basic resources.

  • After you run the code for the first time, enable indexing for the Logstore in the Log Service console, wait about one minute, and then run queries.

  • When you query logs in the console, if the value of a single field exceeds the maximum length, the excess part is truncated and excluded from analysis. For more information, see Create indexes.

  • Callback

    In this example, you create a file named SampleProducerWithCallback.java that uploads the generated log data to Log Service.

    • Sample code

      import com.aliyun.openservices.aliyun.log.producer.Callback;
      import com.aliyun.openservices.aliyun.log.producer.LogProducer;
      import com.aliyun.openservices.aliyun.log.producer.Producer;
      import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
      import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
      import com.aliyun.openservices.aliyun.log.producer.Result;
      import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
      import com.aliyun.openservices.log.common.LogItem;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.atomic.AtomicLong;
      
      public class SampleProducerWithCallback {
      
          private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);
      
          private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
      
          public static void main(String[] args) throws InterruptedException {
              final String project = "example-project";
              final String logstore = "example-logstore";
              String endpoint = "example-endpoint";
              // In this example, the AccessKey ID and AccessKey Secret are read from environment variables.
              String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
              String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
      
              ProducerConfig producerConfig = new ProducerConfig();
              final Producer producer = new LogProducer(producerConfig);
              producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
      
              int nTask = 100;
              // The number of logs that have finished (either sent successfully or failed).
              final AtomicLong completed = new AtomicLong(0);
              final CountDownLatch latch = new CountDownLatch(nTask);
      
              for (int i = 0; i < nTask; ++i) {
                  threadPool.submit(
                          new Runnable() {
                              @Override
                              public void run() {
                                   // The maximum size of a LogItem key is 128 bytes; the maximum size of a LogItem value is 1 MB.
                                  LogItem logItem = new LogItem();
                                  logItem.PushBack("key1", "foo");
                                  logItem.PushBack("key2", "bar");
                                  try {
                                      producer.send(
                                              project,
                                              logstore,
                                              "your-topic",
                                              "your-source",
                                              logItem,
                                              new SampleCallback(project, logstore, logItem, completed));
                                  } catch (InterruptedException e) {
                                       LOGGER.warn("The current thread has been interrupted while sending logs.");
                                  } catch (Exception e) {
                                      LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                                  } finally {
                                      latch.countDown();
                                  }
                              }
                          });
              }
      
              // Run the following shutdown logic only when the process is about to exit.
              latch.await();
              threadPool.shutdown();
              try {
                  producer.close();
              } catch (InterruptedException e) {
                  LOGGER.warn("The current thread has been interrupted while closing the producer.");
              } catch (ProducerException e) {
                  LOGGER.info("Failed to close producer, e=", e);
              }
      
              LOGGER.info("All logs completed, completed={}", completed.get());
          }
      
          private static final class SampleCallback implements Callback {
              private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
              private final String project;
              private final String logStore;
              private final LogItem logItem;
              private final AtomicLong completed;
      
              SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
                  this.project = project;
                  this.logStore = logStore;
                  this.logItem = logItem;
                  this.completed = completed;
              }
      
              @Override
              public void onCompletion(Result result) {
                  try {
                      if (result.isSuccessful()) {
                          LOGGER.info("Send log successfully.");
                      } else {
                          LOGGER.error(
                                  "Failed to send log, project={}, logStore={}, logItem={}, result={}",
                                  project,
                                  logStore,
                                  logItem.ToJsonString(),
                                  result);
                      }
                  } finally {
                      completed.getAndIncrement();
                  }
              }
          }
      }
    • Expected results

      {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"}
      {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"}
      {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"}
      {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"}
      ......

    For more information, see Aliyun Log Java Producer.

  • Future

    In this example, you create a file named SampleProducerWithFuture.java that uploads the generated log data to Log Service.

    • Sample code

      import com.aliyun.openservices.aliyun.log.producer.LogProducer;
      import com.aliyun.openservices.aliyun.log.producer.Producer;
      import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
      import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
      import com.aliyun.openservices.aliyun.log.producer.Result;
      import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
      import com.aliyun.openservices.aliyun.log.producer.errors.ResultFailedException;
      import com.aliyun.openservices.log.common.LogItem;
      import com.google.common.util.concurrent.FutureCallback;
      import com.google.common.util.concurrent.Futures;
      import com.google.common.util.concurrent.ListenableFuture;
      import org.checkerframework.checker.nullness.qual.Nullable;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicLong;
      
      public class SampleProducerWithFuture {
      
          private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithFuture.class);
      
          private static final ExecutorService threadPool = Executors
                  .newFixedThreadPool(Math.max(Runtime.getRuntime().availableProcessors(), 1));
      
          public static void main(String[] args) throws InterruptedException {
              final String project = "example-project";
              final String logstore = "example-logstore";
              String endpoint = "example-endpoint";
              // In this example, the AccessKey ID and AccessKey Secret are read from environment variables.
              String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
              String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
      
              ProducerConfig producerConfig = new ProducerConfig();
              final Producer producer = new LogProducer(producerConfig);
              producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
      
              int n = 100;
              // The number of logs that have finished (either sent successfully or failed).
              final AtomicLong completed = new AtomicLong(0);
      
              for (int i = 0; i < n; ++i) {
                  List<LogItem> logItems = new ArrayList<LogItem>();
                  for (int j = 0; j < 10; ++j) {
                      // The maximum size of a LogItem key is 128 bytes; the maximum size of a LogItem value is 1 MB.
                      LogItem logItem = new LogItem();
                      logItem.PushBack("key1", "foo" + j);
                      logItem.PushBack("key2", "bar" + j);
                      logItems.add(logItem);
                  }
                  try {
                      ListenableFuture<Result> f = producer.send(project, logstore, logItems);
                      Futures.addCallback(
                              f, new SampleFutureCallback(project, logstore, logItems, completed), threadPool);
                  } catch (InterruptedException e) {
                      LOGGER.warn("The current thread has been interrupted while sending logs.");
                  } catch (Exception e) {
                      LOGGER.error("Failed to send logs, e=", e);
                  }
              }
      
              try {
                  producer.close();
              } catch (InterruptedException e) {
                  LOGGER.warn("The current thread has been interrupted while closing the producer.");
              } catch (ProducerException e) {
                  LOGGER.info("Failed to close producer, e=", e);
              }
      
              threadPool.shutdown();
              while (!threadPool.isTerminated()) {
                  threadPool.awaitTermination(100, TimeUnit.MILLISECONDS);
              }
              LOGGER.info("All logs completed, completed={}", completed.get());
          }
      
          private static final class SampleFutureCallback implements FutureCallback<Result> {
      
              private static final Logger LOGGER = LoggerFactory.getLogger(SampleFutureCallback.class);
      
              private final String project;
              private final String logStore;
              private final List<LogItem> logItems;
              private final AtomicLong completed;
      
              SampleFutureCallback(
                      String project, String logStore, List<LogItem> logItems, AtomicLong completed) {
                  this.project = project;
                  this.logStore = logStore;
                  this.logItems = logItems;
                  this.completed = completed;
              }
      
              @Override
              public void onSuccess(@Nullable Result result) {
                  LOGGER.info("Send logs successfully.");
                  completed.getAndIncrement();
              }
      
              @Override
              public void onFailure(Throwable t) {
                  if (t instanceof ResultFailedException) {
                      Result result = ((ResultFailedException) t).getResult();
                      LOGGER.error(
                              "Failed to send logs, project={}, logStore={}, result={}", project, logStore, result);
                  } else {
                      LOGGER.error("Failed to send log, e=", t);
                  }
                  completed.getAndIncrement();
              }
          }
      }
    • Expected results

      {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo0","key2":"bar0"}
      {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo1","key2":"bar1"}
      {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo2","key2":"bar2"}
      {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo3","key2":"bar3"}
      ......

For more information, see Aliyun Log Java Producer.

In addition, Log Service provides a sample application based on Aliyun Log Java Producer to help you get started quickly. For more information, see Aliyun Log Producer Sample Application.

FAQ

Is there a limit on the number of data writes?

  • Log Service limits both the frequency and the size of data reads and writes. For more information, see Data read and write.

  • The basic resources of Log Service, including the numbers of Projects, Logstores, Shards, Logtail configurations, and machine groups that you can create, as well as the size of a single LogItem and the lengths of LogItem keys and values, are subject to limits. For more information, see Basic resources.

Why is data not written to Log Service?

If you find that data is not being written to Log Service, diagnose the problem as follows.

  1. Check whether the versions of the aliyun-log-producer, aliyun-log, and protobuf-java JAR packages in your project match the versions listed in the installation section of this topic. If they do not, upgrade them.

  2. The send method of the Producer interface sends data asynchronously, so its return value alone does not tell you whether the write succeeded. Use the Callback interface or the returned Future object to obtain the reason a send failed.

  3. If the onCompletion method of your Callback is never invoked, check whether your program calls producer.close() before it exits. Data is sent asynchronously by background threads, so to avoid losing the small amount of data still buffered in memory, always call producer.close() before the program exits.

  4. The Producer logs its key behavior through the slf4j logging facade. Configure an slf4j implementation in your program and enable DEBUG-level logging, then check in particular for ERROR-level output.

  5. If the problem persists after the preceding steps, submit a ticket.
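For step 4 above, if you use logback-classic as the slf4j implementation, a minimal logback.xml that surfaces the Producer's DEBUG output could look like the following sketch. The logger name here is an assumption based on the library's package prefix; adjust it to match the package names you actually see in the log output.

```xml
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>
  <!-- DEBUG for the producer's packages only; everything else stays at INFO. -->
  <logger name="com.aliyun.openservices.aliyun.log.producer" level="DEBUG"/>
  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>
```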

Related documentation

  • If the response returned by an API operation contains error information, the call failed. You can look up the error code to find the corresponding solution. For more information, see API error codes.

  • The Alibaba Cloud OpenAPI developer portal provides debugging tools, SDKs, samples, and accompanying documentation. With OpenAPI, you can debug Log Service API operations quickly, without manually constructing requests or signatures. For more information, see OpenAPI developer portal.

  • To meet the growing demand for automated configuration of Log Service, Log Service provides a command-line interface (CLI). For more information, see Log Service CLI.

  • For more sample code, see Aliyun Log Java SDK on GitHub.
