使用Aliyun Log Java Producer写入日志数据

如果您在使用FlinkSparkStorm等大数据计算引擎时,需要将日志进行压缩、批量上传日志到日志服务、减少网络传输资源的占用,API或者SDK往往无法满足大数据场景对数据写入能力的要求,您可以使用Aliyun Log Java Producer,便捷高效地将数据上传到日志服务。

前提条件

您已完成以下操作:

  • 已安装日志服务Java SDK。具体操作,请参见安装Java SDK

什么是Aliyun Log Java Producer

Aliyun Log Java Producer是为运行在大数据、高并发场景下的Java应用量身打造的高性能类库。相对于原始的API或SDK,使用该类库写日志数据能为您带来诸多优势,包括高性能、计算与I/O逻辑分离、资源可控制等。Aliyun LOG Java Producer使用阿里云日志服务提供的顺序写入功能来保证日志的上传顺序。

Aliyun Log Java Producer实现原理请参见Aliyun LOG Java Producer

日志服务提供基于Aliyun Log Java Producer的样例应用程序,便于您快速上手。更多信息,请参见Aliyun Log Producer Sample Application

工作流程

image

特点

  • 线程安全:Producer接口暴露的所有方法都是线程安全的。

  • 异步发送:调用Producer的发送接口通常能够立即返回响应。Producer内部会缓存并合并待发送数据,然后批量发送以提高吞吐量。

  • 自动重试:Producer会根据配置的最大重试次数和重试退避时间进行重试。

  • 行为追溯:通过Callback或Future能获取当前数据是否发送成功的信息,也可以获得该数据每次被尝试发送的信息,有利于问题追溯和行为决策。

  • 上下文还原:同一个Producer实例产生的日志在同一上下文中,在服务端可以查看某条日志前后相关的日志。

  • 优雅关闭:保证close方法退出时,Producer缓存的所有数据都能被处理,同时您也能得到相应的通知。

应用场景

producer对比原始的API或SDK的优势如下:

  • 高性能

    在海量数据、资源有限的前提下,写入端要达到目标吞吐量需要实现复杂的控制逻辑,包括多线程、缓存策略、批量发送等,另外还要充分考虑失败重试的场景。Producer实现了上述功能,在为您带来性能优势的同时简化了程序开发步骤。

  • 异步非阻塞

    在可用内存充足的前提下,Producer会对发往日志库的数据进行缓存,因此调用send方法时能够立即返回响应且不会阻塞,可达到计算与I/O逻辑分离的目的。随后,您可以通过返回的Future对象或传入的Callback获得数据发送的结果。

  • 资源可控制

    可以通过参数控制Producer用于缓存待发送数据的内存大小,同时还可以配置用于执行数据发送任务的线程数量。这样可避免Producer无限制地消耗资源,且可以让您根据实际情况平衡资源消耗和写入吞吐量。

  • 定位问题简单

    如果日志数据发送失败,Producer除了返回状态码,还会返回一个String类型的异常信息,用于描述失败的原因和详细信息。例如,如果发送失败是因为网络连接超时,则返回的异常信息可能是“连接超时”;如果发送失败是因为服务器无响应,则返回的异常信息可能是“服务器无响应”。

使用限制

  • aliyun-log-producer底层调用PutLogs接口上传日志,每次可以写入的原始日志大小存在限制。更多信息,请参见数据读写

  • 日志服务的基础资源,包括创建Project个数、Logstore个数、Shard个数、LogtailConfig个数、机器组个数、单个LogItem大小、LogItem(Key)长度和LogItem(Value)长度等均存在限制。更多信息,请参见基础资源

  • 代码首次运行后,请在日志服务控制台开启日志库索引,等待一分钟后,进行查询。

  • 在控制台进行日志查询时,当单个字段值长度超过最大长度时,超出部分被截断,不参与分析。更多信息,请参考创建索引

费用说明

使用SDK产生的费用和使用控制台产生的费用一致。更多信息,请参见计费概述

步骤一:安装Aliyun Log Java Producer

在Maven工程中使用日志服务Aliyun Log Java Producer,只需在pom.xml中加入相应依赖。Maven项目管理工具会自动下载相关JAR包。例如,在<dependencies>中加入如下内容:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log-producer</artifactId>
    <version>0.3.22</version>
</dependency>

添加更新完后,如果提示Producer依赖的版本冲突,在<dependencies>中加入如下内容:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.114</version>
  <classifier>jar-with-dependencies</classifier>
</dependency>

步骤二:配置ProducerConfig

ProducerConfig用于配置发送策略,您可以根据不同的业务场景为参数指定不同的值,各参数含义如下表所示:

Config producerConfig = new ProducerConfig();
producerConfig.setTotalSizeInBytes(104857600);

参数

类型

描述

totalSizeInBytes

整型

单个Producer实例能缓存的日志大小上限,默认为 100MB。

maxBlockMs

整型

如果Producer可用空间不足,调用者在send方法上的最大阻塞时间,默认为60秒。

如果超过这个时间后所需空间仍无法得到满足,send方法会抛出TimeoutException

如果将该值设为0,当所需空间无法得到满足时,send 方法会立即抛出 TimeoutException。

如果您希望send方法一直阻塞直到所需空间得到满足,可将该值设为负数。

ioThreadCount

整型

执行日志发送任务的线程池大小,默认为可用处理器个数。

batchSizeThresholdInBytes

整型

当一个ProducerBatch中缓存的日志大小大于等于 batchSizeThresholdInBytes 时,该batch将被发送,默认为512KB,最大可设置成 5MB。

batchCountThreshold

整型

当一个ProducerBatch中缓存的日志条数大于等于 batchCountThreshold时,该batch将被发送,默认4096,最大可设置成40960。

lingerMs

整型

一个ProducerBatch从创建到可发送的逗留时间,默认为2秒,最小可设置成100毫秒。

retries

整型

如果某个ProducerBatch首次发送失败,能够对其重试的次数,默认为10次。

如果retries小于等于 0,该ProducerBatch首次发送失败后将直接进入失败队列。

maxReservedAttempts

整型

每个ProducerBatch每次被尝试发送都对应着一个Attempt,此参数用来控制返回给用户的attempt个数,默认只保留最近的11次attempt信息。

该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。

baseRetryBackoffMs

整型

首次重试的退避时间,默认为100毫秒。

Producer采样指数退避算法,第N次重试的计划等待时间为 baseRetryBackoffMs * 2^(N-1)。

maxRetryBackoffMs

整型

重试的最大退避时间,默认为50秒。

adjustShardHash

布尔

如果调用send方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为true。

buckets

整型

当且仅当adjustShardHash为true时,该参数才生效。此时,producer会自动将shardHash重新分组,分组数量为buckets。

如果两条数据的shardHash不同,它们是无法合并到一起发送的,会降低producer吞吐量。将shardHash重新分组后,能让数据有更多地机会被批量发送。

该参数的取值范围是 [1, 256],且必须是2的整数次幂,默认为64。

步骤三:创建Producer

Producer 支持用户配置AK或STS token。如果使用STS token,需要定期创建新的ProjectConfig然后将其添加到ProjectConfigs里。

LogProducer是接口Producer的实现类,它接收唯一的参数producerConfig。当您准备好producerConfig后,可以按照下列方式创建producer实例。

Producer producer = new LogProducer(producerConfig);

创建producer的同时会创建一系列线程,这是一个相对昂贵的操作,因此建议一个应用共用一个producer实例。一个producer实例包含的线程如下表所示,其中N为该producer实例在当前进程中的编号,从 0 开始。另外,LogProducer提供的所有方法都是线程安全的,可以在多线程环境下安全执行。

线程名格式

数量

描述

aliyun-log-producer-<N>-mover

1

负责将满足发送条件的batch投递到发送线程池里。

aliyun-log-producer-<N>-io-thread

ioThreadCount

IOThreadPool中真正用于执行数据发送任务的线程。

aliyun-log-producer-<N>-success-batch-handler

1

用于处理发送成功的batch。

aliyun-log-producer-<N>-failure-batch-handler

1

用于处理发送失败的batch。

步骤四:配置日志项目

ProjectConfig包含目标Project的服务入口信息以及表征调用者身份的访问凭证。每个日志项目对应一个ProjectConfig对象。

可以按照如下方式创建实例。

ProjectConfig project1 = new ProjectConfig("your-project-1", "cn-hangzhou.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
ProjectConfig project2 = new ProjectConfig("your-project-2", "cn-shanghai.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
producer.putProject(project1);
producer.putProject(project2);

步骤五:发送数据

创建Future或Callback

在使用Aliyun Log Java Producer发送日志数据时,需要指定一个回调函数来处理发送过程中的各种情况。当日志数据发送成功时,回调函数会被调用,并返回一个发送结果;当日志数据发送失败时,回调函数也会被调用,并传入一个异常对象。

说明

如果获取结果后,应用的处理逻辑比较简单且不会造成producer阻塞,建议直接使用callback。否则,建议使用ListenableFuture,在单独的线程(池)中执行后续业务

方法的各个参数含义如下:

参数

描述

project

待发送数据的目标 project。

logstore

待发送数据的目标 logStore。

logTem

待发送数据。

completed

Java提供的一个原子类型,用来确保所有日志发送完成(成功或者失败)。

发送数据

Producer接口提供多种发送方法,方法的各个参数含义如下。

参数

描述

是否必选

project

目标Project。

logStore

目标LogStore。

logItem

要发送的日志/日志列表。

topic

日志主题

说明

如果留空或没有指定,该字段将被赋予""。

source

发送源。

说明

如果留空或没有指定,该字段将被赋予producer所在宿主机的 IP。

shardHash

可为发送的日志设置自定义哈希,服务端将根据此哈希选择对应的日志库Shard分片写入日志。

说明

如果留空或没有指定,数据将被随机写入目标LogStore的某个shard中。

callback

可设置一个回调函数。该回调函数将在日志被成功发送或者重试多次失败后被丢弃时调用。

常见异常

异常

说明

TimeoutException

当Producer缓存的日志大小超过设定的内存上限时,且阻塞maxBlockMs毫秒后仍未获取到足够内存时,将抛出TimeoutException。

maxBlockMs 为-1时,阻塞没有时间上限,将永远不会抛出 TimeoutException。

IllegalStateException

当Producer已经处于关闭状态(调用过close方法)时,再调用send 方法,会抛出IllegalStateException。

步骤六:获取发送数据

由于producer提供的所有发送方法都是异步的,需要通过返回的future或者传入的callback获取发送结果。

Future

Send 方法会返回一个ListenableFuture,它除了可以像普通future那样通过调用get方法阻塞获得发送结果外,还允许你注册回调方法(回调方法会在完成 future 设置后被调用)。以下代码片段展示了ListenableFuture的使用方法,用户需要为该future注册一个FutureCallback并将其投递到应用提供的线程池EXECUTOR_SERVICE中执行,完整样例请参见SampleProducerWithFuture.java

import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class SampleProducerWithCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);

    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws InterruptedException {
        final String project = "example-project";
        final String logstore = "example-logstore";
        String endpoint = "example-endpoint";
        // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        ProducerConfig producerConfig = new ProducerConfig();
        final Producer producer = new LogProducer(producerConfig);
        producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));

        int nTask = 100;
        // The number of logs that have finished (either successfully send, or failed).
        final AtomicLong completed = new AtomicLong(0);
        final CountDownLatch latch = new CountDownLatch(nTask);

        for (int i = 0; i < nTask; ++i) {
            threadPool.submit(
                    new Runnable() {
                        @Override
                        public void run() {
       //The maximum size of a LogItem (key) is 128 bytes.  The maximum size of a LogItem (value) is 1 MB.               
                            LogItem logItem = new LogItem();
                            logItem.PushBack("key1", "foo");
                            logItem.PushBack("key2", "bar");
                            try {
                                producer.send(
                                        project,
                                        logstore,
                                        "your-topic",
                                        "your-source",
                                        logItem,
                                        new SampleCallback(project, logstore, logItem, completed));
                            } catch (InterruptedException e) {
                                LOGGER.warn("The current thread has been interrupted during send logs.");
                            } catch (Exception e) {
                                LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                            } finally {
                                latch.countDown();
                            }
                        }
                    });
        }

        // 只有进程退出的时候,才需要考虑如下的逻辑。
        latch.await();
        threadPool.shutdown();
        try {
            producer.close();
        } catch (InterruptedException e) {
            LOGGER.warn("The current thread has been interrupted from close.");
        } catch (ProducerException e) {
            LOGGER.info("Failed to close producer, e=", e);
        }

        LOGGER.info("All log complete, completed={}", completed.get());
    }

    private static final class SampleCallback implements Callback {
        private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
        private final String project;
        private final String logStore;
        private final LogItem logItem;
        private final AtomicLong completed;

        SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
            this.project = project;
            this.logStore = logStore;
            this.logItem = logItem;
            this.completed = completed;
        }

        @Override
        public void onCompletion(Result result) {
            try {
                if (result.isSuccessful()) {
                    LOGGER.info("Send log successfully.");
                } else {
                    LOGGER.error(
                            "Failed to send log, project={}, logStore={}, logItem={}, result={}",
                            project,
                            logStore,
                            logItem.ToJsonString(),
                            result);
                }
            } finally {
                completed.getAndIncrement();
            }
        }
    }
}

Callback

Callback由producer内部线程负责执行,并且只有在执行完毕后数据“占用”的空间才会释放。为了不阻塞producer造成整体吞吐量的下降,要避免在callback里执行耗时的操作。另外,在callback中调用send方法进行重试也是不建议的,您可以在ListenableFuture的callback中进行重试。完整样例请参见SampleProducerWithCallback.java

import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class SampleProducerWithCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);

    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws InterruptedException {
        final String project = "example-project";
        final String logstore = "example-logstore";
        String endpoint = "example-endpoint";
        // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        ProducerConfig producerConfig = new ProducerConfig();
        final Producer producer = new LogProducer(producerConfig);
        producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));

        int nTask = 100;
        // The number of logs that have finished (either successfully send, or failed).
        final AtomicLong completed = new AtomicLong(0);
        final CountDownLatch latch = new CountDownLatch(nTask);

        for (int i = 0; i < nTask; ++i) {
            threadPool.submit(
                    new Runnable() {
                        @Override
                        public void run() {
       //The maximum size of a LogItem (key) is 128 bytes.  The maximum size of a LogItem (value) is 1 MB.               
                            LogItem logItem = new LogItem();
                            logItem.PushBack("key1", "foo");
                            logItem.PushBack("key2", "bar");
                            try {
                                producer.send(
                                        project,
                                        logstore,
                                        "your-topic",
                                        "your-source",
                                        logItem,
                                        new SampleCallback(project, logstore, logItem, completed));
                            } catch (InterruptedException e) {
                                LOGGER.warn("The current thread has been interrupted during send logs.");
                            } catch (Exception e) {
                                LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                            } finally {
                                latch.countDown();
                            }
                        }
                    });
        }

        // 只有进程退出的时候,才需要考虑如下的逻辑。
        latch.await();
        threadPool.shutdown();
        try {
            producer.close();
        } catch (InterruptedException e) {
            LOGGER.warn("The current thread has been interrupted from close.");
        } catch (ProducerException e) {
            LOGGER.info("Failed to close producer, e=", e);
        }

        LOGGER.info("All log complete, completed={}", completed.get());
    }

    private static final class SampleCallback implements Callback {
        private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
        private final String project;
        private final String logStore;
        private final LogItem logItem;
        private final AtomicLong completed;

        SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
            this.project = project;
            this.logStore = logStore;
            this.logItem = logItem;
            this.completed = completed;
        }

        @Override
        public void onCompletion(Result result) {
            try {
                if (result.isSuccessful()) {
                    LOGGER.info("Send log successfully.");
                } else {
                    LOGGER.error(
                            "Failed to send log, project={}, logStore={}, logItem={}, result={}",
                            project,
                            logStore,
                            logItem.ToJsonString(),
                            result);
                }
            } finally {
                completed.getAndIncrement();
            }
        }
    }
}

步骤七:关闭Producer

当您已经没有数据需要发送或者当前进程准备退出时,需要关闭Producer,目的是让Producer中缓存的数据全部被处理。目前,Producer提供安全关闭和有限关闭两种模式。

安全关闭

在大多数情况下,建议您使用安全关闭。安全关闭对应的方法是close(),它会等到Producer中缓存的数据全部被处理、线程全部停止、注册的callback全部执行,返回future全部被设置后才会返回。

虽然要等到数据全部处理完成,但Producer被关闭后,缓存的batch会被立刻处理且不会被重试。因此,如果callback不被阻塞,close方法往往能在很短的时间内返回。

有限关闭

如果您的callback在执行过程中有可能阻塞,但您又希望close方法能在短时间内返回,可以使用有限关闭。有限关闭对应的方法是close(long timeoutMs),如果超过指定的timeoutMs后Producer仍未完全关闭,它会抛出IllegalStateException异常,这意味着缓存的数据可能还没来得及处理就被丢弃,用户注册的Callback也可能不会被执行。

常见问题

写入数据次数是否存在限制?

  • 日志服务读写数据的次数和大小均存在限制。更多信息,请参见数据读写

  • 日志服务的基础资源,包括创建Project个数、Logstore个数、Shard个数、LogtailConfig个数、机器组个数、单个LogItem大小、LogItem(Key)长度和LogItem(Value)长度等均存在限制。更多信息,请参见基础资源

为什么数据没有写入日志服务?

如果您发现数据没有写入日志服务,可通过如下步骤诊断问题。

  1. 检查您项目中引入的aliyun-log-produceraliyun-logprotobuf-java Jar包的版本是否和文档中安装部分列出的Jar包版本一致,如果不一致请进行升级。

  2. Producer接口的send方法异步发送数据,无法及时获取返回的值。请通过Callback接口或返回的Future对象获取数据发送失败的原因。

  3. 如果您发现并没有回调Callback接口的onCompletion方法,请检查在您的程序退出之前是否有调用producer.close()方法。因为数据发送是由后台线程异步完成的,为了防止缓存在内存里的少量数据丢失,请务必在程序退出之前调用producer.close()方法。

  4. Producer接口会把运行过程中的关键行为通过日志框架slf4j进行输出,您可以在程序中配置好相应的日志实现框架并打开DEBUG级别的日志。重点检查是否输出ERROR级别的日志。

  5. 如果通过上述步骤仍然没有解决,请提工单

相关文档