使用Aliyun Log Java Producer写入日志数据

如果您在使用FlinkSparkStorm等大数据计算引擎时,需要将日志进行压缩、批量上传日志到日志服务、减少网络传输资源的占用,API或者SDK往往无法满足大数据场景对数据写入能力的要求,您可以使用Aliyun Log Java Producer,便捷高效地将数据上传到日志服务。

前提条件

您已完成以下操作:

  • 已安装日志服务Java SDK。具体操作,请参见安装Java SDK

什么是Aliyun Log Java Producer

Aliyun Log Java Producer是为运行在大数据、高并发场景下的Java应用量身打造的高性能类库。相对于原始的APISDK,使用该类库写日志数据能为您带来诸多优势,包括高性能、计算与I/O逻辑分离、资源可控制等。Aliyun LOG Java Producer使用阿里云日志服务提供的顺序写入功能来保证日志的上传顺序。

Aliyun Log Java Producer实现原理请参见Aliyun LOG Java Producer

日志服务提供基于Aliyun Log Java Producer的样例应用程序,便于您快速上手。更多信息,请参见Aliyun Log Producer Sample Application

工作流程

image

特点

  • 线程安全:Producer接口暴露的所有方法都是线程安全的。

  • 异步发送:调用Producer的发送接口通常能够立即返回响应。Producer内部会缓存并合并待发送数据,然后批量发送以提高吞吐量。

  • 自动重试:Producer会根据配置的最大重试次数和重试退避时间进行重试。

  • 行为追溯:通过CallbackFuture能获取当前数据是否发送成功的信息,也可以获得该数据每次被尝试发送的信息,有利于问题追溯和行为决策。

  • 上下文还原:同一个Producer实例产生的日志在同一上下文中,在服务端可以查看某条日志前后相关的日志。

  • 优雅关闭:保证close方法退出时,Producer缓存的所有数据都能被处理,同时您也能得到相应的通知。

应用场景

producer对比原始的APISDK的优势如下:

  • 高性能

    在海量数据、资源有限的前提下,写入端要达到目标吞吐量需要实现复杂的控制逻辑,包括多线程、缓存策略、批量发送等,另外还要充分考虑失败重试的场景。Producer实现了上述功能,在为您带来性能优势的同时简化了程序开发步骤。

  • 异步非阻塞

    在可用内存充足的前提下,Producer会对发往日志库的数据进行缓存,因此调用send方法时能够立即返回响应且不会阻塞,可达到计算与I/O逻辑分离的目的。随后,您可以通过返回的Future对象或传入的Callback获得数据发送的结果。

  • 资源可控制

    可以通过参数控制Producer用于缓存待发送数据的内存大小,同时还可以配置用于执行数据发送任务的线程数量。这样可避免Producer无限制地消耗资源,且可以让您根据实际情况平衡资源消耗和写入吞吐量。

  • 定位问题简单

    如果日志数据发送失败,Producer除了返回状态码,还会返回一个String类型的异常信息,用于描述失败的原因和详细信息。例如,如果发送失败是因为网络连接超时,则返回的异常信息可能是“连接超时”;如果发送失败是因为服务器无响应,则返回的异常信息可能是“服务器无响应”。

使用限制

  • aliyun-log-producer底层调用PutLogs接口上传日志,每次可以写入的原始日志大小存在限制。更多信息,请参见数据读写

  • 日志服务的基础资源,包括创建Project个数、Logstore个数、Shard个数、LogtailConfig个数、机器组个数、单个LogItem大小、LogItem(Key)长度和LogItem(Value)长度等均存在限制。更多信息,请参见基础资源

  • 代码首次运行后,请在日志服务控制台开启日志库索引,等待一分钟后,进行查询。

  • 在控制台进行日志查询时,当单个字段值长度超过最大长度时,超出部分被截断,不参与分析。更多信息,请参考创建索引

费用说明

使用SDK产生的费用和使用控制台产生的费用一致。更多信息,请参见计费概述

步骤一:安装Aliyun Log Java Producer

Maven工程中使用日志服务Aliyun Log Java Producer,只需在pom.xml中加入相应依赖。Maven项目管理工具会自动下载相关JAR包。例如,在<dependencies>中加入如下内容:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log-producer</artifactId>
    <version>0.3.22</version>
</dependency>

添加更新完后,如果提示Producer依赖的版本冲突,在<dependencies>中加入如下内容:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.114</version>
  <classifier>jar-with-dependencies</classifier>
</dependency>

步骤二:配置ProducerConfig

ProducerConfig用于配置发送策略,您可以根据不同的业务场景为参数指定不同的值,各参数含义如下表所示:

Config producerConfig = new ProducerConfig();
producerConfig.setTotalSizeInBytes(104857600);

参数

类型

描述

totalSizeInBytes

整型

单个Producer实例能缓存的日志大小上限,默认为 100MB。

maxBlockMs

整型

如果Producer可用空间不足,调用者在send方法上的最大阻塞时间,默认为60秒。

如果超过这个时间后所需空间仍无法得到满足,send方法会抛出TimeoutException

如果将该值设为0,当所需空间无法得到满足时,send 方法会立即抛出 TimeoutException。

如果您希望send方法一直阻塞直到所需空间得到满足,可将该值设为负数。

ioThreadCount

整型

执行日志发送任务的线程池大小,默认为可用处理器个数。

batchSizeThresholdInBytes

整型

当一个ProducerBatch中缓存的日志大小大于等于 batchSizeThresholdInBytes 时,该batch将被发送,默认为512KB,最大可设置成 5MB。

batchCountThreshold

整型

当一个ProducerBatch中缓存的日志条数大于等于 batchCountThreshold时,该batch将被发送,默认4096,最大可设置成40960。

lingerMs

整型

一个ProducerBatch从创建到可发送的逗留时间,默认为2秒,最小可设置成100毫秒。

retries

整型

如果某个ProducerBatch首次发送失败,能够对其重试的次数,默认为10次。

如果retries小于等于 0,该ProducerBatch首次发送失败后将直接进入失败队列。

maxReservedAttempts

整型

每个ProducerBatch每次被尝试发送都对应着一个Attempt,此参数用来控制返回给用户的attempt个数,默认只保留最近的11attempt信息。

该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。

baseRetryBackoffMs

整型

首次重试的退避时间,默认为100毫秒。

Producer采样指数退避算法,第N次重试的计划等待时间为 baseRetryBackoffMs * 2^(N-1)。

maxRetryBackoffMs

整型

重试的最大退避时间,默认为50秒。

adjustShardHash

布尔

如果调用send方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为true。

buckets

整型

当且仅当adjustShardHashtrue时,该参数才生效。此时,producer会自动将shardHash重新分组,分组数量为buckets。

如果两条数据的shardHash不同,它们是无法合并到一起发送的,会降低producer吞吐量。将shardHash重新分组后,能让数据有更多地机会被批量发送。

该参数的取值范围是 [1, 256],且必须是2的整数次幂,默认为64。

步骤三:创建Producer

Producer 支持用户配置AKSTS token。如果使用STS token,需要定期创建新的ProjectConfig然后将其添加到ProjectConfigs里。

LogProducer是接口Producer的实现类,它接收唯一的参数producerConfig。当您准备好producerConfig后,可以按照下列方式创建producer实例。

Producer producer = new LogProducer(producerConfig);

创建producer的同时会创建一系列线程,这是一个相对昂贵的操作,因此建议一个应用共用一个producer实例。一个producer实例包含的线程如下表所示,其中N为该producer实例在当前进程中的编号,从 0 开始。另外,LogProducer提供的所有方法都是线程安全的,可以在多线程环境下安全执行。

线程名格式

数量

描述

aliyun-log-producer-<N>-mover

1

负责将满足发送条件的batch投递到发送线程池里。

aliyun-log-producer-<N>-io-thread

ioThreadCount

IOThreadPool中真正用于执行数据发送任务的线程。

aliyun-log-producer-<N>-success-batch-handler

1

用于处理发送成功的batch。

aliyun-log-producer-<N>-failure-batch-handler

1

用于处理发送失败的batch。

步骤四:配置日志项目

ProjectConfig包含目标Project的服务入口信息以及表征调用者身份的访问凭证。每个日志项目对应一个ProjectConfig对象。

可以按照如下方式创建实例。

ProjectConfig project1 = new ProjectConfig("your-project-1", "cn-hangzhou.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
ProjectConfig project2 = new ProjectConfig("your-project-2", "cn-shanghai.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
producer.putProject(project1);
producer.putProject(project2);

步骤五:发送数据

创建FutureCallback

在使用Aliyun Log Java Producer发送日志数据时,需要指定一个回调函数来处理发送过程中的各种情况。当日志数据发送成功时,回调函数会被调用,并返回一个发送结果;当日志数据发送失败时,回调函数也会被调用,并传入一个异常对象。

说明

如果获取结果后,应用的处理逻辑比较简单且不会造成producer阻塞,建议直接使用callback。否则,建议使用ListenableFuture,在单独的线程(池)中执行后续业务

方法的各个参数含义如下:

参数

描述

project

待发送数据的目标 project。

logstore

待发送数据的目标 logStore。

logTem

待发送数据。

completed

Java提供的一个原子类型,用来确保所有日志发送完成(成功或者失败)。

发送数据

Producer接口提供多种发送方法,方法的各个参数含义如下。

参数

描述

是否必选

project

目标Project。

logStore

目标LogStore。

logItem

要发送的日志/日志列表。

topic

日志主题

说明

如果留空或没有指定,该字段将被赋予""。

source

发送源。

说明

如果留空或没有指定,该字段将被赋予producer所在宿主机的 IP。

shardHash

可为发送的日志设置自定义哈希,服务端将根据此哈希选择对应的日志库Shard分片写入日志。

说明

如果留空或没有指定,数据将被随机写入目标LogStore的某个shard中。

callback

可设置一个回调函数。该回调函数将在日志被成功发送或者重试多次失败后被丢弃时调用。

常见异常

异常

说明

TimeoutException

Producer缓存的日志大小超过设定的内存上限时,且阻塞maxBlockMs毫秒后仍未获取到足够内存时,将抛出TimeoutException。

maxBlockMs 为-1时,阻塞没有时间上限,将永远不会抛出 TimeoutException。

IllegalStateException

Producer已经处于关闭状态(调用过close方法)时,再调用send 方法,会抛出IllegalStateException。

步骤六:获取发送数据

由于producer提供的所有发送方法都是异步的,需要通过返回的future或者传入的callback获取发送结果。

Future

Send 方法会返回一个ListenableFuture,它除了可以像普通future那样通过调用get方法阻塞获得发送结果外,还允许你注册回调方法(回调方法会在完成 future 设置后被调用)。以下代码片段展示了ListenableFuture的使用方法,用户需要为该future注册一个FutureCallback并将其投递到应用提供的线程池EXECUTOR_SERVICE中执行,完整样例请参见SampleProducerWithFuture.java

package com.aliyun.openservices.aliyun.log.producer.sample;

import com.aliyun.openservices.aliyun.log.producer.*;
import com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException;
import com.aliyun.openservices.aliyun.log.producer.errors.MaxBatchCountExceedException;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.aliyun.log.producer.errors.ResultFailedException;
import com.aliyun.openservices.aliyun.log.producer.errors.TimeoutException;
import com.aliyun.openservices.log.common.LogItem;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SampleProducerWithFuture {

  private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithFuture.class);

  private static final ExecutorService EXECUTOR_SERVICE = Executors
      .newFixedThreadPool(Math.max(Runtime.getRuntime().availableProcessors(), 1));

  public static void main(String[] args) throws InterruptedException {
    Producer producer = Utils.createProducer();
    int n = 100;
    int size = 20;

    // The number of logs that have finished (either successfully send, or failed)
    final AtomicLong completed = new AtomicLong(0);

    for (int i = 0; i < n; ++i) {
      List<LogItem> logItems = Utils.generateLogItems(size);
      try {
        String project = System.getenv("PROJECT");
        String logStore = System.getenv("LOG_STORE");
        ListenableFuture<Result> f = producer.send(project, logStore, logItems);
        Futures.addCallback(
            f, new SampleFutureCallback(project, logStore, logItems, completed), EXECUTOR_SERVICE);
      } catch (InterruptedException e) {
        LOGGER.warn("The current thread has been interrupted during send logs.");
      } catch (Exception e) {
        if (e instanceof MaxBatchCountExceedException) {
          LOGGER.error("The logs exceeds the maximum batch count, e={}", e);
        } else if (e instanceof LogSizeTooLargeException) {
          LOGGER.error("The size of log is larger than the maximum allowable size, e={}", e);
        } else if (e instanceof TimeoutException) {
          LOGGER.error("The time taken for allocating memory for the logs has surpassed., e={}", e);
        } else {
          LOGGER.error("Failed to send logs, e=", e);
        }
      }
    }

    Utils.doSomething();

    try {
      producer.close();
    } catch (InterruptedException e) {
      LOGGER.warn("The current thread has been interrupted from close.");
    } catch (ProducerException e) {
      LOGGER.info("Failed to close producer, e=", e);
    }

    EXECUTOR_SERVICE.shutdown();
    while (!EXECUTOR_SERVICE.isTerminated()) {
      EXECUTOR_SERVICE.awaitTermination(100, TimeUnit.MILLISECONDS);
    }
    LOGGER.info("All log complete, completed={}", completed.get());
  }

  private static final class SampleFutureCallback implements FutureCallback<Result> {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleFutureCallback.class);

    private final String project;

    private final String logStore;

    private final List<LogItem> logItems;

    private final AtomicLong completed;

    SampleFutureCallback(
        String project, String logStore, List<LogItem> logItems, AtomicLong completed) {
      this.project = project;
      this.logStore = logStore;
      this.logItems = logItems;
      this.completed = completed;
    }

    @Override
    public void onSuccess(@Nullable Result result) {
      LOGGER.info("Send logs successfully.");
      completed.getAndIncrement();
    }

    @Override
    public void onFailure(Throwable t) {
      if (t instanceof ResultFailedException) {
        Result result = ((ResultFailedException) t).getResult();
        LOGGER.error(
            "Failed to send logs, project={}, logStore={}, result={}", project, logStore, result);
      } else {
        LOGGER.error("Failed to send log, e=", t);
      }
      completed.getAndIncrement();
    }
  }
}

Callback

Callbackproducer内部线程负责执行,并且只有在执行完毕后数据“占用”的空间才会释放。为了不阻塞producer造成整体吞吐量的下降,要避免在callback里执行耗时的操作。另外,在callback中调用send方法进行重试也是不建议的,您可以在ListenableFuturecallback中进行重试。完整样例请参见SampleProducerWithCallback.java

package com.aliyun.openservices.aliyun.log.producer.sample;

import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.aliyun.log.producer.errors.TimeoutException;
import com.aliyun.openservices.log.common.LogItem;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SampleProducerWithCallback {

  private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);

  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);

  public static void main(String[] args) throws InterruptedException {
    final Producer producer = Utils.createProducer();

    int nTask = 100;

    // The monotonically increasing sequence number we will put in the data of each log
    final AtomicLong sequenceNumber = new AtomicLong(0);

    // The number of logs that have finished (either successfully send, or failed)
    final AtomicLong completed = new AtomicLong(0);

    final CountDownLatch latch = new CountDownLatch(nTask);

    for (int i = 0; i < nTask; ++i) {
      EXECUTOR_SERVICE.submit(
          new Runnable() {
            @Override
            public void run() {
              LogItem logItem = Utils.generateLogItem(sequenceNumber.getAndIncrement());
              try {
                String project = System.getenv("PROJECT");
                String logStore = System.getenv("LOG_STORE");
                producer.send(
                    project,
                    logStore,
                    Utils.getTopic(),
                    Utils.getSource(),
                    logItem,
                    new SampleCallback(project, logStore, logItem, completed));
              } catch (InterruptedException e) {
                LOGGER.warn("The current thread has been interrupted during send logs.");
              } catch (Exception e) {
                if (e instanceof LogSizeTooLargeException) {
                  LOGGER.error(
                      "The size of log is larger than the maximum allowable size, e={}", e);
                } else if (e instanceof TimeoutException) {
                  LOGGER.error(
                      "The time taken for allocating memory for the logs has surpassed., e={}", e);
                } else {
                  LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                }
              } finally {
                latch.countDown();
              }
            }
          });
    }
    latch.await();
    EXECUTOR_SERVICE.shutdown();

    Utils.doSomething();

    try {
      producer.close();
    } catch (InterruptedException e) {
      LOGGER.warn("The current thread has been interrupted from close.");
    } catch (ProducerException e) {
      LOGGER.info("Failed to close producer, e=", e);
    }

    LOGGER.info("All log complete, completed={}", completed.get());
  }

  private static final class SampleCallback implements Callback {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);

    private final String project;

    private final String logStore;

    private final LogItem logItem;

    private final AtomicLong completed;

    SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
      this.project = project;
      this.logStore = logStore;
      this.logItem = logItem;
      this.completed = completed;
    }

    @Override
    public void onCompletion(Result result) {
      try {
        if (result.isSuccessful()) {
          LOGGER.info("Send log successfully.");
        } else {
          LOGGER.error(
              "Failed to send log, project={}, logStore={}, logItem={}, result={}",
              project,
              logStore,
              logItem.ToJsonString(),
              result);
        }
      } finally {
        completed.getAndIncrement();
      }
    }
  }
}

步骤七:关闭Producer

当您已经没有数据需要发送或者当前进程准备退出时,需要关闭Producer,目的是让Producer中缓存的数据全部被处理。目前,Producer提供安全关闭和有限关闭两种模式。

安全关闭

在大多数情况下,建议您使用安全关闭。安全关闭对应的方法是close(),它会等到Producer中缓存的数据全部被处理、线程全部停止、注册的callback全部执行,返回future全部被设置后才会返回。

虽然要等到数据全部处理完成,但Producer被关闭后,缓存的batch会被立刻处理且不会被重试。因此,如果callback不被阻塞,close方法往往能在很短的时间内返回。

有限关闭

如果您的callback在执行过程中有可能阻塞,但您又希望close方法能在短时间内返回,可以使用有限关闭。有限关闭对应的方法是close(long timeoutMs),如果超过指定的timeoutMsProducer仍未完全关闭,它会抛出IllegalStateException异常,这意味着缓存的数据可能还没来得及处理就被丢弃,用户注册的Callback也可能不会被执行。

常见问题

写入数据次数是否存在限制?

  • 日志服务读写数据的次数和大小均存在限制。更多信息,请参见数据读写

  • 日志服务的基础资源,包括创建Project个数、Logstore个数、Shard个数、LogtailConfig个数、机器组个数、单个LogItem大小、LogItem(Key)长度和LogItem(Value)长度等均存在限制。更多信息,请参见基础资源

为什么数据没有写入日志服务?

如果您发现数据没有写入日志服务,可通过如下步骤诊断问题。

  1. 检查您项目中引入的aliyun-log-produceraliyun-logprotobuf-java Jar包的版本是否和文档中安装部分列出的Jar包版本一致,如果不一致请进行升级。

  2. Producer接口的send方法异步发送数据,无法及时获取返回的值。请通过Callback接口或返回的Future对象获取数据发送失败的原因。

  3. 如果您发现并没有回调Callback接口的onCompletion方法,请检查在您的程序退出之前是否有调用producer.close()方法。因为数据发送是由后台线程异步完成的,为了防止缓存在内存里的少量数据丢失,请务必在程序退出之前调用producer.close()方法。

  4. Producer接口会把运行过程中的关键行为通过日志框架slf4j进行输出,您可以在程序中配置好相应的日志实现框架并打开DEBUG级别的日志。重点检查是否输出ERROR级别的日志。

  5. 如果通过上述步骤仍然没有解决,请提工单

相关文档