如果您在使用Flink、Spark、Storm等大数据计算引擎时,需要将日志进行压缩、批量上传日志到日志服务、减少网络传输资源的占用,API或者SDK往往无法满足大数据场景对数据写入能力的要求,您可以使用Aliyun Log Java Producer,便捷高效地将数据上传到日志服务。
前提条件
您已完成以下操作:
已安装日志服务Java SDK。具体操作,请参见安装Java SDK。
什么是Aliyun Log Java Producer
Aliyun Log Java Producer是为运行在大数据、高并发场景下的Java应用量身打造的高性能类库。相对于原始的API或SDK,使用该类库写日志数据能为您带来诸多优势,包括高性能、计算与I/O逻辑分离、资源可控制等。Aliyun LOG Java Producer使用阿里云日志服务提供的顺序写入功能来保证日志的上传顺序。
Aliyun Log Java Producer实现原理请参见Aliyun LOG Java Producer。
日志服务提供基于Aliyun Log Java Producer的样例应用程序,便于您快速上手。更多信息,请参见Aliyun Log Producer Sample Application。
工作流程
特点
线程安全:Producer接口暴露的所有方法都是线程安全的。
异步发送:调用Producer的发送接口通常能够立即返回响应。Producer内部会缓存并合并待发送数据,然后批量发送以提高吞吐量。
自动重试:Producer会根据配置的最大重试次数和重试退避时间进行重试。
行为追溯:通过Callback或Future能获取当前数据是否发送成功的信息,也可以获得该数据每次被尝试发送的信息,有利于问题追溯和行为决策。
上下文还原:同一个Producer实例产生的日志在同一上下文中,在服务端可以查看某条日志前后相关的日志。
优雅关闭:保证close方法退出时,Producer缓存的所有数据都能被处理,同时您也能得到相应的通知。
应用场景
producer对比原始的API或SDK的优势如下:
高性能
在海量数据、资源有限的前提下,写入端要达到目标吞吐量需要实现复杂的控制逻辑,包括多线程、缓存策略、批量发送等,另外还要充分考虑失败重试的场景。Producer实现了上述功能,在为您带来性能优势的同时简化了程序开发步骤。
异步非阻塞
在可用内存充足的前提下,Producer会对发往日志库的数据进行缓存,因此调用send方法时能够立即返回响应且不会阻塞,可达到计算与I/O逻辑分离的目的。随后,您可以通过返回的Future对象或传入的Callback获得数据发送的结果。
资源可控制
可以通过参数控制Producer用于缓存待发送数据的内存大小,同时还可以配置用于执行数据发送任务的线程数量。这样可避免Producer无限制地消耗资源,且可以让您根据实际情况平衡资源消耗和写入吞吐量。
定位问题简单
如果日志数据发送失败,Producer除了返回状态码,还会返回一个String类型的异常信息,用于描述失败的原因和详细信息。例如,如果发送失败是因为网络连接超时,则返回的异常信息可能是“连接超时”;如果发送失败是因为服务器无响应,则返回的异常信息可能是“服务器无响应”。
使用限制
费用说明
使用SDK产生的费用和使用控制台产生的费用一致。更多信息,请参见计费概述。
步骤一:安装Aliyun Log Java Producer
在Maven工程中使用日志服务Aliyun Log Java Producer,只需在pom.xml中加入相应依赖。Maven项目管理工具会自动下载相关JAR包。例如,在<dependencies>中加入如下内容:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log-producer</artifactId>
<version>0.3.22</version>
</dependency>
添加更新完后,如果提示Producer依赖的版本冲突,在<dependencies>中加入如下内容:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.114</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
步骤二:配置ProducerConfig
ProducerConfig用于配置发送策略,您可以根据不同的业务场景为参数指定不同的值,各参数含义如下表所示:
Config producerConfig = new ProducerConfig();
producerConfig.setTotalSizeInBytes(104857600);
参数 | 类型 | 描述 |
totalSizeInBytes | 整型 | 单个Producer实例能缓存的日志大小上限,默认为 100MB。 |
maxBlockMs | 整型 | 如果Producer可用空间不足,调用者在send方法上的最大阻塞时间,默认为60秒。 如果超过这个时间后所需空间仍无法得到满足,send方法会抛出TimeoutException。 如果将该值设为0,当所需空间无法得到满足时,send 方法会立即抛出 TimeoutException。 如果您希望send方法一直阻塞直到所需空间得到满足,可将该值设为负数。 |
ioThreadCount | 整型 | 执行日志发送任务的线程池大小,默认为可用处理器个数。 |
batchSizeThresholdInBytes | 整型 | 当一个ProducerBatch中缓存的日志大小大于等于 batchSizeThresholdInBytes 时,该batch将被发送,默认为512KB,最大可设置成 5MB。 |
batchCountThreshold | 整型 | 当一个ProducerBatch中缓存的日志条数大于等于 batchCountThreshold时,该batch将被发送,默认4096,最大可设置成40960。 |
lingerMs | 整型 | 一个ProducerBatch从创建到可发送的逗留时间,默认为2秒,最小可设置成100毫秒。 |
retries | 整型 | 如果某个ProducerBatch首次发送失败,能够对其重试的次数,默认为10次。 如果retries小于等于 0,该ProducerBatch首次发送失败后将直接进入失败队列。 |
maxReservedAttempts | 整型 | 每个ProducerBatch每次被尝试发送都对应着一个Attempt,此参数用来控制返回给用户的attempt个数,默认只保留最近的11次attempt信息。 该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。 |
baseRetryBackoffMs | 整型 | 首次重试的退避时间,默认为100毫秒。 Producer采样指数退避算法,第N次重试的计划等待时间为 baseRetryBackoffMs * 2^(N-1)。 |
maxRetryBackoffMs | 整型 | 重试的最大退避时间,默认为50秒。 |
adjustShardHash | 布尔 | 如果调用send方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为true。 |
buckets | 整型 | 当且仅当adjustShardHash为true时,该参数才生效。此时,producer会自动将shardHash重新分组,分组数量为buckets。 如果两条数据的shardHash不同,它们是无法合并到一起发送的,会降低producer吞吐量。将shardHash重新分组后,能让数据有更多地机会被批量发送。 该参数的取值范围是 [1, 256],且必须是2的整数次幂,默认为64。 |
步骤三:创建Producer
Producer 支持用户配置AK或STS token。如果使用STS token,需要定期创建新的ProjectConfig然后将其添加到ProjectConfigs里。
LogProducer是接口Producer的实现类,它接收唯一的参数producerConfig。当您准备好producerConfig后,可以按照下列方式创建producer实例。
Producer producer = new LogProducer(producerConfig);
创建producer的同时会创建一系列线程,这是一个相对昂贵的操作,因此建议一个应用共用一个producer实例。一个producer实例包含的线程如下表所示,其中N为该producer实例在当前进程中的编号,从 0 开始。另外,LogProducer提供的所有方法都是线程安全的,可以在多线程环境下安全执行。
线程名格式 | 数量 | 描述 |
aliyun-log-producer-<N>-mover | 1 | 负责将满足发送条件的batch投递到发送线程池里。 |
aliyun-log-producer-<N>-io-thread | ioThreadCount | IOThreadPool中真正用于执行数据发送任务的线程。 |
aliyun-log-producer-<N>-success-batch-handler | 1 | 用于处理发送成功的batch。 |
aliyun-log-producer-<N>-failure-batch-handler | 1 | 用于处理发送失败的batch。 |
步骤四:配置日志项目
ProjectConfig包含目标Project的服务入口信息以及表征调用者身份的访问凭证。每个日志项目对应一个ProjectConfig对象。
可以按照如下方式创建实例。
ProjectConfig project1 = new ProjectConfig("your-project-1", "cn-hangzhou.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
ProjectConfig project2 = new ProjectConfig("your-project-2", "cn-shanghai.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
producer.putProject(project1);
producer.putProject(project2);
步骤五:发送数据
创建Future或Callback
在使用Aliyun Log Java Producer发送日志数据时,需要指定一个回调函数来处理发送过程中的各种情况。当日志数据发送成功时,回调函数会被调用,并返回一个发送结果;当日志数据发送失败时,回调函数也会被调用,并传入一个异常对象。
如果获取结果后,应用的处理逻辑比较简单且不会造成producer阻塞,建议直接使用callback。否则,建议使用ListenableFuture,在单独的线程(池)中执行后续业务
方法的各个参数含义如下:
参数 | 描述 |
project | 待发送数据的目标 project。 |
logstore | 待发送数据的目标 logStore。 |
logTem | 待发送数据。 |
completed | Java提供的一个原子类型,用来确保所有日志发送完成(成功或者失败)。 |
发送数据
Producer接口提供多种发送方法,方法的各个参数含义如下。
参数 | 描述 | 是否必选 |
project | 目标Project。 | 是 |
logStore | 目标LogStore。 | 是 |
logItem | 要发送的日志/日志列表。 | 是 |
topic | 日志主题 | 否 说明 如果留空或没有指定,该字段将被赋予""。 |
source | 发送源。 | 否 说明 如果留空或没有指定,该字段将被赋予producer所在宿主机的 IP。 |
shardHash | 可为发送的日志设置自定义哈希,服务端将根据此哈希选择对应的日志库Shard分片写入日志。 | 否 说明 如果留空或没有指定,数据将被随机写入目标LogStore的某个shard中。 |
callback | 可设置一个回调函数。该回调函数将在日志被成功发送或者重试多次失败后被丢弃时调用。 | 否 |
常见异常
异常 | 说明 |
TimeoutException | 当Producer缓存的日志大小超过设定的内存上限时,且阻塞maxBlockMs毫秒后仍未获取到足够内存时,将抛出TimeoutException。 maxBlockMs 为-1时,阻塞没有时间上限,将永远不会抛出 TimeoutException。 |
IllegalStateException | 当Producer已经处于关闭状态(调用过close方法)时,再调用send 方法,会抛出IllegalStateException。 |
步骤六:获取发送数据
由于producer提供的所有发送方法都是异步的,需要通过返回的future或者传入的callback获取发送结果。
Future
Send 方法会返回一个ListenableFuture,它除了可以像普通future那样通过调用get方法阻塞获得发送结果外,还允许你注册回调方法(回调方法会在完成 future 设置后被调用)。以下代码片段展示了ListenableFuture的使用方法,用户需要为该future注册一个FutureCallback并将其投递到应用提供的线程池EXECUTOR_SERVICE中执行,完整样例请参见SampleProducerWithFuture.java。
import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class SampleProducerWithCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);
private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws InterruptedException {
final String project = "example-project";
final String logstore = "example-logstore";
String endpoint = "example-endpoint";
// 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
ProducerConfig producerConfig = new ProducerConfig();
final Producer producer = new LogProducer(producerConfig);
producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
int nTask = 100;
// The number of logs that have finished (either successfully send, or failed).
final AtomicLong completed = new AtomicLong(0);
final CountDownLatch latch = new CountDownLatch(nTask);
for (int i = 0; i < nTask; ++i) {
threadPool.submit(
new Runnable() {
@Override
public void run() {
//The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB.
LogItem logItem = new LogItem();
logItem.PushBack("key1", "foo");
logItem.PushBack("key2", "bar");
try {
producer.send(
project,
logstore,
"your-topic",
"your-source",
logItem,
new SampleCallback(project, logstore, logItem, completed));
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted during send logs.");
} catch (Exception e) {
LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
} finally {
latch.countDown();
}
}
});
}
// 只有进程退出的时候,才需要考虑如下的逻辑。
latch.await();
threadPool.shutdown();
try {
producer.close();
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted from close.");
} catch (ProducerException e) {
LOGGER.info("Failed to close producer, e=", e);
}
LOGGER.info("All log complete, completed={}", completed.get());
}
private static final class SampleCallback implements Callback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
private final String project;
private final String logStore;
private final LogItem logItem;
private final AtomicLong completed;
SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
this.project = project;
this.logStore = logStore;
this.logItem = logItem;
this.completed = completed;
}
@Override
public void onCompletion(Result result) {
try {
if (result.isSuccessful()) {
LOGGER.info("Send log successfully.");
} else {
LOGGER.error(
"Failed to send log, project={}, logStore={}, logItem={}, result={}",
project,
logStore,
logItem.ToJsonString(),
result);
}
} finally {
completed.getAndIncrement();
}
}
}
}
Callback
Callback由producer内部线程负责执行,并且只有在执行完毕后数据“占用”的空间才会释放。为了不阻塞producer造成整体吞吐量的下降,要避免在callback里执行耗时的操作。另外,在callback中调用send方法进行重试也是不建议的,您可以在ListenableFuture的callback中进行重试。完整样例请参见SampleProducerWithCallback.java。
import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class SampleProducerWithCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);
private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws InterruptedException {
final String project = "example-project";
final String logstore = "example-logstore";
String endpoint = "example-endpoint";
// 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
ProducerConfig producerConfig = new ProducerConfig();
final Producer producer = new LogProducer(producerConfig);
producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
int nTask = 100;
// The number of logs that have finished (either successfully send, or failed).
final AtomicLong completed = new AtomicLong(0);
final CountDownLatch latch = new CountDownLatch(nTask);
for (int i = 0; i < nTask; ++i) {
threadPool.submit(
new Runnable() {
@Override
public void run() {
//The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB.
LogItem logItem = new LogItem();
logItem.PushBack("key1", "foo");
logItem.PushBack("key2", "bar");
try {
producer.send(
project,
logstore,
"your-topic",
"your-source",
logItem,
new SampleCallback(project, logstore, logItem, completed));
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted during send logs.");
} catch (Exception e) {
LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
} finally {
latch.countDown();
}
}
});
}
// 只有进程退出的时候,才需要考虑如下的逻辑。
latch.await();
threadPool.shutdown();
try {
producer.close();
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted from close.");
} catch (ProducerException e) {
LOGGER.info("Failed to close producer, e=", e);
}
LOGGER.info("All log complete, completed={}", completed.get());
}
private static final class SampleCallback implements Callback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
private final String project;
private final String logStore;
private final LogItem logItem;
private final AtomicLong completed;
SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
this.project = project;
this.logStore = logStore;
this.logItem = logItem;
this.completed = completed;
}
@Override
public void onCompletion(Result result) {
try {
if (result.isSuccessful()) {
LOGGER.info("Send log successfully.");
} else {
LOGGER.error(
"Failed to send log, project={}, logStore={}, logItem={}, result={}",
project,
logStore,
logItem.ToJsonString(),
result);
}
} finally {
completed.getAndIncrement();
}
}
}
}
步骤七:关闭Producer
当您已经没有数据需要发送或者当前进程准备退出时,需要关闭Producer,目的是让Producer中缓存的数据全部被处理。目前,Producer提供安全关闭和有限关闭两种模式。
安全关闭
在大多数情况下,建议您使用安全关闭。安全关闭对应的方法是close()
,它会等到Producer中缓存的数据全部被处理、线程全部停止、注册的callback全部执行,返回future全部被设置后才会返回。
虽然要等到数据全部处理完成,但Producer被关闭后,缓存的batch会被立刻处理且不会被重试。因此,如果callback不被阻塞,close方法往往能在很短的时间内返回。
有限关闭
如果您的callback在执行过程中有可能阻塞,但您又希望close方法能在短时间内返回,可以使用有限关闭。有限关闭对应的方法是close(long timeoutMs)
,如果超过指定的timeoutMs后Producer仍未完全关闭,它会抛出IllegalStateException异常,这意味着缓存的数据可能还没来得及处理就被丢弃,用户注册的Callback也可能不会被执行。
常见问题
写入数据次数是否存在限制?
日志服务读写数据的次数和大小均存在限制。更多信息,请参见数据读写。
日志服务的基础资源,包括创建Project个数、Logstore个数、Shard个数、LogtailConfig个数、机器组个数、单个LogItem大小、LogItem(Key)长度和LogItem(Value)长度等均存在限制。更多信息,请参见基础资源。
为什么数据没有写入日志服务?
如果您发现数据没有写入日志服务,可通过如下步骤诊断问题。
检查您项目中引入的
aliyun-log-producer
、aliyun-log
、protobuf-java
Jar包的版本是否和文档中安装部分列出的Jar包版本一致,如果不一致请进行升级。Producer接口的send方法异步发送数据,无法及时获取返回的值。请通过Callback接口或返回的Future对象获取数据发送失败的原因。
如果您发现并没有回调Callback接口的onCompletion方法,请检查在您的程序退出之前是否有调用
producer.close()
方法。因为数据发送是由后台线程异步完成的,为了防止缓存在内存里的少量数据丢失,请务必在程序退出之前调用producer.close()
方法。Producer接口会把运行过程中的关键行为通过日志框架slf4j进行输出,您可以在程序中配置好相应的日志实现框架并打开DEBUG级别的日志。重点检查是否输出ERROR级别的日志。
如果通过上述步骤仍然没有解决,请提工单。
相关文档
在调用API接口过程中,若服务端返回结果中包含错误信息,则表示调用API接口失败。您可以参考API错误码对照表查找对应的解决方法。更多信息,请参见API错误处理对照表。
日志服务除自研的SDK外,还支持公共的阿里云SDK,关于阿里云SDK的使用方式,请参见日志服务_SDK中心-阿里云OpenAPI开发者门户。
为满足越来越多的自动化日志服务配置需求,日志服务提供命令行工具CLI(Command Line Interface)。更多信息,请参见日志服务命令行工具CLI。
更多示例代码,请参见Aliyun Log Java SDK on GitHub。