在Kafka异步消息通讯场景中实现链路追踪

2.7.1.2探针版本开始,ARMS应用监控增加了Kafka插件,在基于Kafka进行异步消息通讯的场景中,也可以实现链路追踪。本文介绍支持链路追踪的Kafka版本要求,以及如何配置Kafka插件。

ARMS探针版本要求

2.7.1.2及以上版本的探针包含Kafka插件,升级探针的操作,请参见升级ARMS探针

Kafka版本要求

Kafka插件实现链路跟踪,依赖于Kafka消息协议对Header的支持,仅0.11.0版本以上的Kafka支持了Header,详情请参见Kafka官方文档,所以请确保Kafka的服务端和客户端都在0.11.0版本以上。

Spring-Kafka等客户端在原生Kafka客户端的基础上做了封装,如果您使用Spring-Kafka等经过了封装的客户端,请参考对应的版本文档,确保底层的Kafka客户端版本在0.11.0以上。

Kafka插件使用方式

消息发送端 (Producer)

消息发送端接入ARMS应用监控以后,需要在ARMS控制台开启Kafka消息链路信息透传才能实现链路追踪。请在ARMS控制台的应用设置 > 自定义配置页面,打开kafka发送消息自动透传上下文开关,具体的开启方式请参见自定义配置

重要

如果Kafka服务端的版本小于0.11.0,请勿开启此选项,否则会导致发送消息异常。

消息接收方 (Consumer)

使用Spring-Kafka

如果消息接收方使用Spring-Kafka,只要探针版本和Kafka客户端版本满足要求,无需任何额外操作即可支持链路追踪。

使用原生Kafka客户端

原生的Apache Kafka Consumer API采用Pull模式不间断的轮询来消费消息,ARMS应用监控的Kafka插件无法直接从Header中获取并处理链路信息,需要用户通过ARMS控制台指定Kafka消费方法。消费方法需要带有一个org.apache.kafka.clients.consumer.ConsumerRecordorg.apache.kafka.clients.consumer.ConsumerRecords类型的参数。

package arms.test.kafka;
 
 public class KafkaConsumeTest {
        public void testConsumer(){
            Properties props = new Properties();
            props.put("bootstrap.servers", "PLAINTEXT://XXXX");
            props.put("group.id",  UUID.randomUUID().toString());
            props.put("enable.auto.commit", "true");
            props.put("auto.offset.reset", "earliest");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
            kafkaConsumer.subscribe(Arrays.asList("test"));
            while (true) {
                ListAdapter consumer;
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                     handler(record);       
            }
         }

         public void handler(ConsumerRecord<String, String> record){
                LOGGER.info( Utils.string(record));
         }       
 }

请在ARMS控制台应用设置 > 自定义配置页面的自定义Kafka消费方法参数中,填入完整方法名arms.test.kafka.KafkaConsumeTest.handler,具体的配置方式请参见自定义配置。指定Kafka消费方法后,需要重启消息接收方的应用才能生效。

相关文档

完成链路追踪后,您可以基于已存储的全量链路明细数据,自由组合筛选条件与聚合维度进行实时分析。更多信息,请参见调用链分析