前提条件
已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。
已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见配置环境变量。
已安装Java SDK。具体操作,请参见安装Java SDK。
背景信息
Aliyun Log Java Producer是为运行在大数据、高并发场景下的Java应用量身打造的高性能类库。相对于原始的API或SDK,使用该类库写日志数据能为您带来诸多优势,包括高性能、计算与I/O逻辑分离、资源可控制等。
特点
线程安全:Producer接口暴露的所有方法都是线程安全的。
异步发送:调用Producer的发送接口通常能够立即返回响应。Producer内部会缓存并合并待发送数据,然后批量发送以提高吞吐量。
自动重试:Producer会根据配置的最大重试次数和重试退避时间进行重试。
行为追溯:通过Callback或Future能获取当前数据是否发送成功的信息,也可以获得该数据每次被尝试发送的信息,有利于问题追溯和行为决策。
上下文还原:同一个Producer实例产生的日志在同一上下文中,在服务端可以查看某条日志前后相关的日志。
优雅关闭:保证close方法退出时,Producer缓存的所有数据都能被处理,同时您也能得到相应的通知。
优势
高性能
在海量数据、资源有限的前提下,写入端要达到目标吞吐量需要实现复杂的控制逻辑,包括多线程、缓存策略、批量发送等,另外还要充分考虑失败重试的场景。Producer实现了上述功能,在为您带来性能优势的同时简化了程序开发步骤。
异步非阻塞
在可用内存充足的前提下,Producer会对发往日志库的数据进行缓存,因此调用send方法时能够立即返回响应且不会阻塞,可达到计算与I/O逻辑分离的目的。之后,您可以通过返回的Future对象或传入的Callback获得数据发送的结果。
资源可控制
可以通过参数控制Producer用于缓存待发送数据的内存大小,同时还可以配置用于执行数据发送任务的线程数量。这样可避免Producer无限制地消耗资源,且可以让您根据实际情况平衡资源消耗和写入吞吐量。
安装Aliyun Log Java Producer
在Maven工程中使用日志服务Aliyun Log Java Producer,只需在pom.xml中加入相应依赖。Maven项目管理工具会自动下载相关JAR包。例如,在<dependencies>中加入如下内容:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log-producer</artifactId>
<version>0.3.10</version>
</dependency>
jar-with-dependency版本,可以解决Producer依赖的版本冲突问题。在<dependencies>中加入如下内容:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.35</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
Java代码示例
安装完成后,您就可以使用Producer类库编写Java代码。
说明 aliyun-log-producer底层调用PutLogs接口上传日志,每次可以写入的原始日志大小存在限制。更多信息,请参见数据读写和PutLogs。
日志服务的基础资源,包括创建Project个数、Logstore个数、Shard个数、LogtaiConfig个数、机器组个数、单个LogItem大小、LogItem(Key)长度和LogItem(Value)长度等均存在限制。更多信息,请参见基础资源。
代码首次运行后,请在日志服务控制台开启日志库索引,等待一分钟后,进行查询。
在控制台进行日志查询时,当单个字段值长度超过最大长度时,超出部分被截断,不参与分析。更多信息,请参考创建索引。
Callback
本示例中,创建一个SampleProducerWithCallback.java文件,将生成的日志数据上传至日志服务。
示例代码
import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class SampleProducerWithCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);
private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws InterruptedException {
final String project = "example-project";
final String logstore = "example-logstore";
String endpoint = "example-endpoint";
// 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
ProducerConfig producerConfig = new ProducerConfig();
final Producer producer = new LogProducer(producerConfig);
producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
int nTask = 100;
// The number of logs that have finished (either successfully send, or failed).
final AtomicLong completed = new AtomicLong(0);
final CountDownLatch latch = new CountDownLatch(nTask);
for (int i = 0; i < nTask; ++i) {
threadPool.submit(
new Runnable() {
@Override
public void run() {
//The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB.
LogItem logItem = new LogItem();
logItem.PushBack("key1", "foo");
logItem.PushBack("key2", "bar");
try {
producer.send(
project,
logstore,
"your-topic",
"your-source",
logItem,
new SampleCallback(project, logstore, logItem, completed));
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted during send logs.");
} catch (Exception e) {
LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
} finally {
latch.countDown();
}
}
});
}
// 只有进程退出的时候,才需要考虑如下的逻辑。
latch.await();
threadPool.shutdown();
try {
producer.close();
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted from close.");
} catch (ProducerException e) {
LOGGER.info("Failed to close producer, e=", e);
}
LOGGER.info("All log complete, completed={}", completed.get());
}
private static final class SampleCallback implements Callback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
private final String project;
private final String logStore;
private final LogItem logItem;
private final AtomicLong completed;
SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
this.project = project;
this.logStore = logStore;
this.logItem = logItem;
this.completed = completed;
}
@Override
public void onCompletion(Result result) {
try {
if (result.isSuccessful()) {
LOGGER.info("Send log successfully.");
} else {
LOGGER.error(
"Failed to send log, project={}, logStore={}, logItem={}, result={}",
project,
logStore,
logItem.ToJsonString(),
result);
}
} finally {
completed.getAndIncrement();
}
}
}
}
预期结果
{"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"}
{"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"}
{"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"}
{"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"}
......
更多信息,请参见Aliyun Log Java Producer。
Future
本示例中,创建一个SampleProducerWithFuture.java文件,将生成的日志数据上传至日志服务。
代码示例
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.aliyun.log.producer.errors.ResultFailedException;
import com.aliyun.openservices.log.common.LogItem;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class SampleProducerWithFuture {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithFuture.class);
private static final ExecutorService threadPool = Executors
.newFixedThreadPool(Math.max(Runtime.getRuntime().availableProcessors(), 1));
public static void main(String[] args) throws InterruptedException {
final String project = "example-project";
final String logstore = "example-logstore";
String endpoint = "example-endpoint";
// 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
ProducerConfig producerConfig = new ProducerConfig();
final Producer producer = new LogProducer(producerConfig);
producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
int n = 100;
// The number of logs that have finished (either successfully send, or failed).
final AtomicLong completed = new AtomicLong(0);
for (int i = 0; i < n; ++i) {
List<LogItem> logItems = new ArrayList<LogItem>();
for (int j = 0; j < 10; ++j) {
//The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB.
LogItem logItem = new LogItem();
logItem.PushBack("key1", "foo" + j);
logItem.PushBack("key2", "bar" + j);
logItems.add(logItem);
}
try {
ListenableFuture<Result> f = producer.send(project, logstore, logItems);
Futures.addCallback(
f, new SampleFutureCallback(project, logstore, logItems, completed), threadPool);
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted during send logs.");
} catch (Exception e) {
LOGGER.error("Failed to send logs, e=", e);
}
}
try {
producer.close();
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted from close.");
} catch (ProducerException e) {
LOGGER.info("Failed to close producer, e=", e);
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
threadPool.awaitTermination(100, TimeUnit.MILLISECONDS);
}
LOGGER.info("All log complete, completed={}", completed.get());
}
private static final class SampleFutureCallback implements FutureCallback<Result> {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleFutureCallback.class);
private final String project;
private final String logStore;
private final List<LogItem> logItems;
private final AtomicLong completed;
SampleFutureCallback(
String project, String logStore, List<LogItem> logItems, AtomicLong completed) {
this.project = project;
this.logStore = logStore;
this.logItems = logItems;
this.completed = completed;
}
@Override
public void onSuccess(@Nullable Result result) {
LOGGER.info("Send logs successfully.");
completed.getAndIncrement();
}
@Override
public void onFailure(Throwable t) {
if (t instanceof ResultFailedException) {
Result result = ((ResultFailedException) t).getResult();
LOGGER.error(
"Failed to send logs, project={}, logStore={}, result={}", project, logStore, result);
} else {
LOGGER.error("Failed to send log, e=", t);
}
completed.getAndIncrement();
}
}
}
预期结果
{"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo0","key2":"bar0"}
{"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo1","key2":"bar1"}
{"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo2","key2":"bar2"}
{"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo3","key2":"bar3"}
......
更多信息,请参见Aliyun Log Java Producer。
此外,日志服务提供基于Aliyun Log Java Producer的样例应用程序,便于您快速上手。更多信息,请参见Aliyun Log Producer Sample Application。
常见问题
写入数据次数是否存在限制?
为什么数据没有写入日志服务?
如果您发现数据没有写入日志服务,可通过如下步骤诊断问题。
检查您项目中引入的aliyun-log-producer
、aliyun-log
、protobuf-java
Jar包的版本是否和文档中安装部分列出的Jar包版本一致,如果不一致请进行升级。
Producer接口的send方法异步发送数据,无法及时获取返回的值。请通过Callback接口或返回的Future对象获取数据发送失败的原因。
如果您发现并没有回调Callback接口的onCompletion方法,请检查在您的程序退出之前是否有调用producer.close()
方法。因为数据发送是由后台线程异步完成的,为了防止缓存在内存里的少量数据丢失,请务必在程序退出之前调用producer.close()
方法。
Producer接口会把运行过程中的关键行为通过日志框架slf4j进行输出,您可以在程序中配置好相应的日志实现框架并打开DEBUG级别的日志。重点检查是否输出ERROR级别的日志。
如果通过上述步骤仍然没有解决,请提工单。