全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 更多
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 智能硬件
日志服务

Storm消费

更新时间:2018-04-11 14:31:37

日志服务的LogHub提供了高效、可靠的日志通道功能,您可以通过Logtail、SDK等多种方式来实时收集日志数据。收集日志之后,可以通过Spark Stream、Storm 等各实时系统来消费写入到LogHub中的数据。

为了降低Storm用户消费LogHub的代价,日志服务提供了LogHub Storm Spout来实时读取LogHub的数据。

基本结构和流程

  • 上图中红色虚线框中就是LogHub Storm Spout,每个Storm Topology会有一组Spout,同组内的Spout共同负责读取Logstore中全部数据。不同Topology中的Spout相互不干扰。
  • 每个Topology需要选择唯一的LogHub Consume Group名字来相互标识,同一 Topology内的Spout通过 LogHub client lib 来完成负载均衡和自动failover。
  • Spout从LogHub中实时读取数据之后,发送至Topology中的Bolt节点,定期保存消费完成位置作为checkpoint到LogHub服务端。

使用限制

  • 为了防止滥用,每个Logstore最多支持 5 个Consumer Group,对于不再使用的 Consumer Group,可以使用Java SDK中的DeleteConsumerGroup接口进行删除。
  • Spout的个数最好和Shard个数相同,否则可能会导致单个Spout处理数据量过多而处理不过来。
  • 如果单个Shard 的数据量太大,超过一个Spout处理极限,则可以使用Shard split接口分裂Shard,来降低每个Shard的数据量。
  • 在Loghub Spout中,强制依赖Storm的ACK机制,用于确认Spout将消息正确发送至Bolt,所以在Bolt中一定要调用ACK进行确认。

使用样例

Spout 使用示例(用于构建 Topology)

  1. public static void main( String[] args )
  2. {
  3. String mode = "Local"; // 使用本地测试模式
  4. String conumser_group_name = ""; // 每个Topology 需要设定唯一的 consumer group 名字,不能为空,支持 [a-z][0-9] 和 '_','-',长度在 [3-63] 字符,只能以小写字母和数字开头结尾
  5. String project = ""; // 日志服务的Project
  6. String logstore = ""; // 日志服务的Logstore
  7. String endpoint = ""; // 日志服务访问域名
  8. String access_id = ""; // 用户 ak 信息
  9. String access_key = "";
  10. // 构建一个 Loghub Storm Spout 需要使用的配置
  11. LogHubSpoutConfig config = new LogHubSpoutConfig(conumser_group_name,
  12. endpoint, project, logstore, access_id,
  13. access_key, LogHubCursorPosition.END_CURSOR);
  14. TopologyBuilder builder = new TopologyBuilder();
  15. // 构建 loghub storm spout
  16. LogHubSpout spout = new LogHubSpout(config);
  17. // 在实际场景中,Spout的个数可以和Logstore Shard 个数相同
  18. builder.setSpout("spout", spout, 1);
  19. builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");
  20. Config conf = new Config();
  21. conf.setDebug(false);
  22. conf.setMaxSpoutPending(1);
  23. // 如果使用Kryo进行数据的序列化和反序列化,则需要显示设置 LogGroupData 的序列化方法 LogGroupDataSerializSerializer
  24. Config.registerSerialization(conf, LogGroupData.class, LogGroupDataSerializSerializer.class);
  25. if (mode.equals("Local")) {
  26. logger.info("Local mode...");
  27. LocalCluster cluster = new LocalCluster();
  28. cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());
  29. try {
  30. Thread.sleep(6000 * 1000); //waiting for several minutes
  31. } catch (InterruptedException e) {
  32. // TODO Auto-generated catch block
  33. e.printStackTrace();
  34. }
  35. cluster.killTopology("test-jstorm-spout");
  36. cluster.shutdown();
  37. } else if (mode.equals("Remote")) {
  38. logger.info("Remote mode...");
  39. conf.setNumWorkers(2);
  40. try {
  41. StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());
  42. } catch (AlreadyAliveException e) {
  43. // TODO Auto-generated catch block
  44. e.printStackTrace();
  45. } catch (InvalidTopologyException e) {
  46. // TODO Auto-generated catch block
  47. e.printStackTrace();
  48. }
  49. } else {
  50. logger.error("invalid mode: " + mode);
  51. }
  52. }
  53. }

Bolt 代码样例

以下为消费数据的Bolt代码样例,只打印每条日志的内容。

  1. public class SampleBolt extends BaseRichBolt {
  2. private static final long serialVersionUID = 4752656887774402264L;
  3. private static final Logger logger = Logger.getLogger(BaseBasicBolt.class);
  4. private OutputCollector mCollector;
  5. @Override
  6. public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
  7. OutputCollector collector) {
  8. mCollector = collector;
  9. }
  10. @Override
  11. public void execute(Tuple tuple) {
  12. String shardId = (String) tuple
  13. .getValueByField(LogHubSpout.FIELD_SHARD_ID);
  14. @SuppressWarnings("unchecked")
  15. List<LogGroupData> logGroupDatas = (ArrayList<LogGroupData>) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);
  16. for (LogGroupData groupData : logGroupDatas) {
  17. // 每个 LogGroup 由一条或多条日志组成
  18. LogGroup logGroup = groupData.GetLogGroup();
  19. for (Log log : logGroup.getLogsList()) {
  20. StringBuilder sb = new StringBuilder();
  21. // 每条日志,有一个时间字段, 以及多个 Key:Value 对,
  22. int log_time = log.getTime();
  23. sb.append("LogTime:").append(log_time);
  24. for (Content content : log.getContentsList()) {
  25. sb.append("\t").append(content.getKey()).append(":")
  26. .append(content.getValue());
  27. }
  28. logger.info(sb.toString());
  29. }
  30. }
  31. // 在LogHub spout中,强制依赖Storm的ACK机制,用于确认Spout将消息正确
  32. // 发送至Bolt,所以在Bolt中一定要调用ACK
  33. mCollector.ack(tuple);
  34. }
  35. @Override
  36. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  37. //do nothing
  38. }
  39. }

Maven

storm 1.0 之前版本(如 0.9.6),请使用:

  1. <dependency>
  2. <groupId>com.aliyun.openservices</groupId>
  3. <artifactId>loghub-storm-spout</artifactId>
  4. <version>0.6.5</version>
  5. </dependency>

storm 1.0 版本及以后,请使用:

  1. <dependency>
  2. <groupId>com.aliyun.openservices</groupId>
  3. <artifactId>loghub-storm-1.0-spout</artifactId>
  4. <version>0.1.2</version>
  5. </dependency>
本文导读目录