本文介绍日志服务Aliyun Log Java Producer类库的使用方法。
前提条件
已完成Java SDK安装。更多信息,请参见安装Java SDK。背景信息
Aliyun Log Java Producer是为运行在大数据、高并发场景下的Java应用量身打造的高性能类库。相对于原始的API或SDK,使用该类库写日志数据能为您带来诸多优势,包括高性能、计算与I/O逻辑分离、资源可控制等。
特点
- 线程安全:Producer接口暴露的所有方法都是线程安全的。
- 异步发送:调用Producer的发送接口通常能够立即返回响应。Producer内部会缓存并合并待发送数据,然后批量发送以提高吞吐量。
- 自动重试:Producer会根据配置的最大重试次数和重试退避时间进行重试。
- 行为追溯:通过Callback或Future能获取当前数据是否发送成功的信息,也可以获得该数据每次被尝试发送的信息,有利于问题追溯和行为决策。
- 上下文还原:同一个Producer实例产生的日志在同一上下文中,在服务端可以查看某条日志前后相关的日志。
- 优雅关闭:保证close方法退出时,Producer缓存的所有数据都能被处理,同时您也能得到相应的通知。
优势
- 高性能
在海量数据、资源有限的前提下,写入端要达到目标吞吐量需要实现复杂的控制逻辑,包括多线程、缓存策略、批量发送等,另外还要充分考虑失败重试的场景。Producer实现了上述功能,在为您带来性能优势的同时简化了程序开发步骤。
- 异步非阻塞
在可用内存充足的前提下,Producer会对发往日志库的数据进行缓存,因此调用send方法时能够立即返回响应且不会阻塞,可达到计算与I/O逻辑分离的目的。之后,您可以通过返回的Future对象或传入的Callback获得数据发送的结果。
- 资源可控制
可以通过参数控制Producer用于缓存待发送数据的内存大小,同时还可以配置用于执行数据发送任务的线程数量。这样可避免Producer无限制地消耗资源,且可以让您根据实际情况平衡资源消耗和写入吞吐量。
安装Aliyun Log Java Producer
在Maven工程中使用日志服务Aliyun Log Java Producer,只需在pom.xml中加入相应依赖。Maven项目管理工具会自动下载相关JAR包。例如,在<dependencies>中加入如下内容:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log-producer</artifactId>
<version>0.3.10</version>
</dependency>
jar-with-dependency版本,可以解决Producer依赖的版本冲突问题。在<dependencies>中加入如下内容: <dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.35</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
Java代码示例
安装完成后,您就可以使用Producer类库编写ava代码。
- Callback本示例中,创建一个SampleProducerWithCallback.java文件,将生成的日志数据上传至日志服务。
- 示例代码
import com.aliyun.openservices.aliyun.log.producer.Callback; import com.aliyun.openservices.aliyun.log.producer.LogProducer; import com.aliyun.openservices.aliyun.log.producer.Producer; import com.aliyun.openservices.aliyun.log.producer.ProducerConfig; import com.aliyun.openservices.aliyun.log.producer.ProjectConfig; import com.aliyun.openservices.aliyun.log.producer.Result; import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException; import com.aliyun.openservices.log.common.LogItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; public class SampleProducerWithCallback { private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class); private static final ExecutorService threadPool = Executors.newFixedThreadPool(10); public static void main(String[] args) throws InterruptedException { final String project = "example-project"; final String logstore = "example-logstore"; String endpoint = "example-endpoint"; String accessKeyId = "your-accesskey-id"; String accessKeySecret = "your-access-key-secret"; ProducerConfig producerConfig = new ProducerConfig(); final Producer producer = new LogProducer(producerConfig); producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret)); int nTask = 100; // The number of logs that have finished (either successfully send, or failed). final AtomicLong completed = new AtomicLong(0); final CountDownLatch latch = new CountDownLatch(nTask); for (int i = 0; i < nTask; ++i) { threadPool.submit( new Runnable() { @Override public void run() { LogItem logItem = new LogItem(); logItem.PushBack("key1", "foo"); logItem.PushBack("key2", "bar"); try { producer.send( project, logstore, "your-topic", "your-source", logItem, new SampleCallback(project, logstore, logItem, completed)); } catch (InterruptedException e) { LOGGER.warn("The current thread has been interrupted during send logs."); } catch (Exception e) { LOGGER.error("Failed to send log, logItem={}, e=", logItem, e); } finally { latch.countDown(); } } }); } // 只有进程退出的时候,才需要考虑如下的逻辑。 latch.await(); threadPool.shutdown(); try { producer.close(); } catch (InterruptedException e) { LOGGER.warn("The current thread has been interrupted from close."); } catch (ProducerException e) { LOGGER.info("Failed to close producer, e=", e); } LOGGER.info("All log complete, completed={}", completed.get()); } private static final class SampleCallback implements Callback { private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class); private final String project; private final String logStore; private final LogItem logItem; private final AtomicLong completed; SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) { this.project = project; this.logStore = logStore; this.logItem = logItem; this.completed = completed; } @Override public void onCompletion(Result result) { try { if (result.isSuccessful()) { LOGGER.info("Send log successfully."); } else { LOGGER.error( "Failed to send log, project={}, logStore={}, logItem={}, result={}", project, logStore, logItem.ToJsonString(), result); } } finally { completed.getAndIncrement(); } } } }
- 预期结果
{"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"} {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"} {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"} {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"} ......
更多信息,请参见Aliyun Log Java Producer。
- 示例代码
- Future本示例中,创建一个SampleProducerWithFuture.java文件,将生成的日志数据上传至日志服务。
- 代码示例
import com.aliyun.openservices.aliyun.log.producer.LogProducer; import com.aliyun.openservices.aliyun.log.producer.Producer; import com.aliyun.openservices.aliyun.log.producer.ProducerConfig; import com.aliyun.openservices.aliyun.log.producer.ProjectConfig; import com.aliyun.openservices.aliyun.log.producer.Result; import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException; import com.aliyun.openservices.aliyun.log.producer.errors.ResultFailedException; import com.aliyun.openservices.log.common.LogItem; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class SampleProducerWithFuture { private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithFuture.class); private static final ExecutorService threadPool = Executors .newFixedThreadPool(Math.max(Runtime.getRuntime().availableProcessors(), 1)); public static void main(String[] args) throws InterruptedException { final String project = "example-project"; final String logstore = "example-logstore"; String endpoint = "example-endpoint"; String accessKeyId = "your-accesskey-id"; String accessKeySecret = "your-access-key-secret"; ProducerConfig producerConfig = new ProducerConfig(); final Producer producer = new LogProducer(producerConfig); producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret)); int n = 100; // The number of logs that have finished (either successfully send, or failed). final AtomicLong completed = new AtomicLong(0); for (int i = 0; i < n; ++i) { List<LogItem> logItems = new ArrayList<LogItem>(); for (int j = 0; j < 10; ++j) { LogItem logItem = new LogItem(); logItem.PushBack("key1", "foo" + j); logItem.PushBack("key2", "bar" + j); logItems.add(logItem); } try { ListenableFuture<Result> f = producer.send(project, logstore, logItems); Futures.addCallback( f, new SampleFutureCallback(project, logstore, logItems, completed), threadPool); } catch (InterruptedException e) { LOGGER.warn("The current thread has been interrupted during send logs."); } catch (Exception e) { LOGGER.error("Failed to send logs, e=", e); } } try { producer.close(); } catch (InterruptedException e) { LOGGER.warn("The current thread has been interrupted from close."); } catch (ProducerException e) { LOGGER.info("Failed to close producer, e=", e); } threadPool.shutdown(); while (!threadPool.isTerminated()) { threadPool.awaitTermination(100, TimeUnit.MILLISECONDS); } LOGGER.info("All log complete, completed={}", completed.get()); } private static final class SampleFutureCallback implements FutureCallback<Result> { private static final Logger LOGGER = LoggerFactory.getLogger(SampleFutureCallback.class); private final String project; private final String logStore; private final List<LogItem> logItems; private final AtomicLong completed; SampleFutureCallback( String project, String logStore, List<LogItem> logItems, AtomicLong completed) { this.project = project; this.logStore = logStore; this.logItems = logItems; this.completed = completed; } @Override public void onSuccess(@Nullable Result result) { LOGGER.info("Send logs successfully."); completed.getAndIncrement(); } @Override public void onFailure(Throwable t) { if (t instanceof ResultFailedException) { Result result = ((ResultFailedException) t).getResult(); LOGGER.error( "Failed to send logs, project={}, logStore={}, result={}", project, logStore, result); } else { LOGGER.error("Failed to send log, e=", t); } completed.getAndIncrement(); } } }
- 预期结果
{"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo0","key2":"bar0"} {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo1","key2":"bar1"} {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo2","key2":"bar2"} {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo3","key2":"bar3"} ......
- 代码示例
更多信息,请参见Aliyun Log Java Producer。
此外,日志服务提供基于Aliyun Log Java Producer的样例应用程序,便于您快速上手。更多信息,请参见Aliyun Log Producer Sample Application。
常见问题
写入数据次数是否存在限制?
日志服务读写数据的次数和大小均存在限制。更多信息,请参见数据读写。
为什么数据没有写入日志服务?
如果您发现数据没有写入日志服务,可通过如下步骤诊断问题。
- 检查您项目中引入的
aliyun-log-producer
、aliyun-log
、protobuf-java
Jar包的版本是否和文档中安装部分列出的Jar包版本一致,如果不一致请进行升级。 - Producer接口的send方法异步发送数据,无法及时获取返回的值。请通过Callback接口或返回的Future对象获取数据发送失败的原因。
- 如果您发现并没有回调Callback接口的onCompletion方法,请检查在您的程序退出之前是否有调用
producer.close()
方法。因为数据发送是由后台线程异步完成的,为了防止缓存在内存里的少量数据丢失,请务必在程序退出之前调用producer.close()
方法。 - Producer接口会把运行过程中的关键行为通过日志框架slf4j进行输出,您可以在程序中配置好相应的日志实现框架并打开DEBUG级别的日志。重点检查是否输出ERROR级别的日志。
- 如果通过上述步骤仍然没有解决,请提工单。
相关文档
- 在调用API接口过程中,若服务端返回结果中包含错误信息,则表示调用API接口失败。您可以参考API错误码对照表查找对应的解决方法。更多信息,请参见API错误处理对照表。
- 阿里云OpenAPI开发者门户提供调试、SDK、示例和配套文档。通过OpenAPI,您无需手动封装请求和签名操作,就可以快速对日志服务API进行调试。更多信息,请参见OpenAPI开发者门户。
- 为满足越来越多的自动化日志服务配置需求,日志服务提供命令行工具CLI(Command Line Interface)。更多信息,请参见日志服务命令行工具CLI。
- 更多示例代码,请参见Aliyun Log Java SDK on GitHub。