全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
日志服务

Producer Library

更新时间:2018-01-04 15:35:05

LogHub Producer Library 是针对 Java 应用程序高并发写LogHub类库,Producer Library 和 Consumer Library 是对LogHub的读写包装,降低数据收集与消费的门槛。

功能特点

  • 提供异步的发送接口,线程安全。
  • 可以添加多个Project的配置。
  • 用于发送的网络 I/O 线程数量可以配置。
  • merge成的包的日志数量以及大小都可以配置。
  • 内存使用可控,当内存使用达到用户配置的阈值时,Producer 的 send 接口会阻塞,直到有空闲的内存可用。

功能优势

  • 客户端日志不落盘:既数据产生后直接通过网络发往服务端。
  • 客户端高并发写入:例如一秒钟会有百次以上写操作。
  • 客户端计算与 I/O 逻辑分离:打印日志不影响计算耗时。

在以上场景中,Producer Library 会简化您程序开发的步骤,帮助您批量聚合写请求,通过异步的方式发往LogHub服务端。在整个过程中,您可以配置批量聚合的参数、服务端异常处理的逻辑等。

0c5e22da184eec0f93979cec8ff159394b1143e0

以上各种接入方式的对比:

接入方式 优点/缺点 针对场景
日志落盘 + Logtail 日志收集与打日志解耦,无需修改代码 常用场景
syslog + Logtail 性能较好(80MB/S),日志不落盘,需支持 syslog 协议 syslog 场景
SDK 直发 不落盘,直接发往服务端,需要处理好网络 IO 与程序 IO 之间的切换 日志不落盘
Producer Library 不落盘,异步合并发送服务端,吞吐量较好 日志不落盘,客户端 QPS 高

注意:目前 Producer Library 只支持 Java 版本。

配置步骤

Producer Library配置分为以下几个步骤:

  1. maven 工程中添加依赖

    1. <dependency>
    2. <groupId>com.google.protobuf</groupId>
    3. <artifactId>protobuf-java</artifactId>
    4. <version>2.5.0</version>
    5. </dependency>
    6. <dependency>
    7. <groupId>com.aliyun.openservices</groupId>
    8. <artifactId>aliyun-log</artifactId>
    9. <version>0.6.10</version>
    10. </dependency>
    11. <dependency>
    12. <groupId>com.aliyun.openservices</groupId>
    13. <artifactId>log-loghub-producer</artifactId>
    14. <version>0.1.8</version>
    15. </dependency>
  2. 程序中配置 ProducerConfig

    配置格式如下,参数取值见本文档中参数取值部分。

    1. public class ProducerConfig
    2. {
    3. public int packageTimeoutInMS = 3000;
    4. public int logsCountPerPackage = 4096;
    5. public int logsBytesPerPackage = 3 * 1024 * 1024;
    6. public int memPoolSizeInByte = 1000 * 1024 * 1024;
    7. public int maxIOThreadSizeInPool = 8;
    8. public int shardHashUpdateIntervalInMS = 10 * 60 * 1000;
    9. public int retryTimes = 3;
    10. }
  3. 继承 ILogCallback

    callback 主要用于日志发送结果的处理,结果包括发送成功和发生异常。您也可以选择不处理,这样就不需要继承 ILogCallback。

  4. 创建 producer 实例,调用 send 接口发数据

参数取值

参数 参数说明 取值
packageTimeoutInMS 指定被缓存日志的发送超时时间,如果缓存超时,则会被立即发送。 整数形式,单位为毫秒。
logsCountPerPackage 指定每个缓存的日志包中包含日志数量的最大值。 整数形式,取值为1~4096。
logsBytesPerPackage 指定每个缓存的日志包的大小上限。 整数形式,取值为1~3145728 ,单位为字节。
memPoolSizeInByte 指定单个Producer实例可以使用的内存的上限。 整数形式,单位为字节。
maxIOThreadSizeInPool 指定I/O线程池最大线程数量,主要用于发送数据到日志服务。 整数形式。
shardHashUpdateIntervalInMS 指定更新Shard的Hash区间的时间间隔,当指定shardhash的方式发送日志时,需要设置此参数。
后端merge线程会将映射到同一个Shard的数据merge在一起,而Shard关联的是一个Hash区间,Producer在处理时会将用户传入的Hash映射成Shard关联Hash区间的最小值。每一个Shard关联的Hash区间,Producer会定时从LogHub拉取。
整数形式。
retryTimes 指定发送失败时重试的次数,如果超过该值,就会将异常作为callback的参数,交由用户处理。 整数形式。

使用实例

main:

  1. public class ProducerSample {
  2. public static String RandomString(int length) {
  3. String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
  4. Random random = new Random();
  5. StringBuffer buf = new StringBuffer();
  6. for (int i = 0; i < length; i++) {
  7. int num = random.nextInt(62);
  8. buf.append(str.charAt(num));
  9. }
  10. return buf.toString();
  11. }
  12. public static void main(String args[]) throws InterruptedException {
  13. ProducerConfig producerConfig = new ProducerConfig();
  14. //使用默认配置创建 producer 实例
  15. final LogProducer producer = new LogProducer(producerConfig);
  16. // 添加多个 project 配置
  17. producer.setProjectConfig(new ProjectConfig("your project 1",
  18. "endpoint", "your accesskey id", "your accesskey"));
  19. producer.setProjectConfig(new ProjectConfig("your project 2",
  20. "endpoint", "your accesskey id", "your accesskey",
  21. "your sts token"));
  22. // 更新 project 1 的配置
  23. producer.setProjectConfig(new ProjectConfig("your project 1",
  24. "endpoint", "your new accesskey id", "your new accesskey"));
  25. // 删除 project 2 的配置
  26. producer.removeProjectConfig("your project 2");
  27. // 生成日志集合,用于测试
  28. final Vector<Vector<LogItem>> logGroups = new Vector<Vector<LogItem>>();
  29. for (int i = 0; i < 100000; ++i) {
  30. Vector<LogItem> tmpLogGroup = new Vector<LogItem>();
  31. LogItem logItem = new LogItem((int) (new Date().getTime() / 1000));
  32. logItem.PushBack("level", "info" + System.currentTimeMillis());
  33. logItem.PushBack("message", "test producer send perf "
  34. + RandomString(50));
  35. logItem.PushBack("method", "SenderToServer " + RandomString(10));
  36. tmpLogGroup.add(logItem);
  37. logGroups.add(tmpLogGroup);
  38. }
  39. // 并发调用 send 发送日志
  40. Random random = new Random();
  41. for (int j = 0; j < 100000; ++j) {
  42. int rand = random.nextInt(99999);
  43. producer.send("project 1", "logstore 1", "topic", "source ip", logGroups.get(rand), new CallbackSample("project 1", "logstore 1", "topic", "source ip", null, logGroups.get(rand), producer));
  44. }
  45. //主动刷新缓存起来的还没有被发送的日志
  46. producer.flush();
  47. // 等待数据发送完毕
  48. Thread.sleep(2 * producerConfig.packageTimeoutInMS);
  49. //关闭后台 io 线程,close 会将调用时刻内存中缓存的数据发送出去
  50. producer.close();
  51. }
  52. }

callback:

  1. public class CallbackSample extends ILogCallback {
  2. //保存要发送的数据,当时发生异常时,进行重试
  3. public String project;
  4. public String logstore;
  5. public String topic;
  6. public String shardHash;
  7. public String source;
  8. public Vector<LogItem> items;
  9. public LogProducer producer;
  10. public int retryTimes = 0;
  11. public CallbackSample(String project, String logstore, String topic,
  12. String shardHash, String source, Vector<LogItem> items, LogProducer producer) {
  13. super();
  14. this.project = project;
  15. this.logstore = logstore;
  16. this.topic = topic;
  17. this.shardHash = shardHash;
  18. this.source = source;
  19. this.items = items;
  20. this.producer = producer;
  21. }
  22. public void onCompletion(PutLogsResponse response, LogException e) {
  23. if (e != null) {
  24. // 打印异常
  25. System.out.println(e.GetErrorCode() + ", " + e.GetErrorMessage() + ", " + e.GetRequestId());
  26. //最多重试三次
  27. if(retryTimes++ < 3)
  28. {
  29. producer.send(project, logstore, topic, source, shardHash, items, this);
  30. }
  31. }
  32. else{
  33. System.out.println("send success, request id: " + response.GetRequestId());
  34. }
  35. }
  36. }
本文导读目录