使用Go自定义扩展完善Kafka消息处理流程

Go Agent在处理例如github.com/segmentio/kafka-go时,无法自动将消费消息时的自定义处理流程与消息的发布、消费流程进行串联。这会导致在ARMS控制台的调用链分析中出现Trace中断的问题,给问题排查和性能分析带来困难。本文介绍如何使用Go 自定义扩展来处理这类问题。

场景描述

demo应用在通过Go Agent编译后,可以在调用链分析中查看到如下两条Trace:

调用链中断示意图

上图展示了Kafka消息消费的链路,消息自定义处理流程中的HTTP请求没有跟消息消费串联在一起,而是单独作为另一条Trace。

下游调用独立链路示意图

上图展示了本应由消费触发的HTTP调用,形成了一条新的、无上游关联的链路。而预期是HTTP请求接在消息消费后。问题原因就是Agent中的埋点只有SDK函数,无法串联用户代码中业务逻辑。

解决方案

Agent 会在收到的消息Header中添加Trace的上下文信息,通过Go 自定义扩展给用户消费消息的函数进行埋点,设置函数入口与出口,来恢复这个上下文信息,创建新的Span,通过这个过渡Span来实现消息获取和消息消费的业务流程串联,实现端到端的全链路追踪。

  1. 函数入口(OnEnter):从Kafka消息Header中提取traceparent,恢复Trace上下文,并基于此上下文创建一个新的Span,代表整个业务处理过程。

  2. 函数出口(OnExit):结束这个新创建的Span。

这样,您业务函数内部所有被ARMS自动监控的调用,都会自然地成为这个新建Span的子节点,从而实现链路的串联。

操作步骤

步骤一:定位业务处理函数

首先,在您的项目中定位到实际处理Kafka消息的业务函数。这个函数通常接收context.Contextkafka.Message作为参数。假设您的函数如下所示:

// in package: your-project/consumer
package consumer

// ...
func (c *KafkaConsumer) ConsumerMsg(ctx context.Context, message kafka.Message) error {
	// 您的业务逻辑,例如发起一个HTTP请求...
	// http.Get("http://downstream-service/api")
	return nil
}

后续步骤将围绕(*KafkaConsumer).ConsumerMsg这个函数进行操作。

步骤二:编写Hook代码

Hook代码是您希望注入到目标函数中的自定义逻辑。

  1. 在您的Go项目外部创建一个独立的目录,例如my-hooks

  2. 进入该目录并初始化Go Module:cd my-hooks && go mod init my-hooks

  3. 创建rules.go文件,并编写以下代码。

    // my-hooks/rules.go
    package rules
    
    import (
    	"context"
    	"github.com/alibaba/loongsuite-go-agent/pkg/api"
    	kafka "github.com/segmentio/kafka-go"
    	"github.com/alibaba/loongsuite-go-agent/pkg/go-sdk/otel"
    	"github.com/alibaba/loongsuite-go-agent/pkg/go-sdk/otel/propagation"
    	oTrace "github.com/alibaba/loongsuite-go-agent/pkg/go-sdk/otel/trace"
    	_ "unsafe"
    )
    // OnEnter函数:添加创建Span的逻辑,在ConsumerMsg函数开始时执行
    // go:linkname 将此Hook函数与ARMS Agent生成的桩函数链接
    //go:linkname consumerMessageOnEnter your-project/consumer.consumerMessageOnEnter
    func consumerMessageOnEnter(call api.CallContext, k interface{}, ctx context.Context, message kafka.Message) {
    	for _, v := range message.Headers {
    		if v.Key == "traceparent" {
    			var headerMap propagation.MapCarrier
    			headerMap = make(map[string]string)
    			headerMap[v.Key] = string(v.Value)
    			ctx = otel.GetTextMapPropagator().Extract(ctx, headerMap)
    			tracer := otel.GetTracerProvider().Tracer("")
    			opts := append([]oTrace.SpanStartOption{}, oTrace.WithSpanKind(oTrace.SpanKindConsumer))
    			_, span := tracer.Start(ctx, "consumer message", opts...)
    			temp := make(map[string]interface{}, 1)
    			temp["span"] = span
    			call.SetData(temp)
    			break
    		}
    	}
    }
    // OnExit函数:调用Span.End(),在ConsumerMsg函数结束时执行
    //go:linkname consumerMessageOnExit your-project/consumer.consumerMessageOnExit
    func consumerMessageOnExit(call api.CallContext, err error) {
    	if call.GetData() == nil {
    		return
    	}
    	temp := call.GetData().(map[string]interface{})
    	span := temp["span"].(oTrace.Span)
    	if span != nil {
    		span.End()
    		return
    	}
    }

步骤三:编写注入规则文件

创建一个JSON配置文件,规则如下所示,将文件放到编译时候能找到的目录即可,如arms-rule.json。编写规则详情请参考Go自定义扩展编写规范和字段说明

// arms-rule.json
[
  {
    "ImportPath":"your-project/consumer", //必填项,目标函数所在的完整包路径,这里可以为github.com,也可以是自己sdk的包路径
    "Function":"ConsumerMsg",  //必填项,表示你需要注入的函数是什么,这里是ConsumerMsg
    "OnEnter":"consumerMessageOnEnter", //非必填,表示在ConsumerMsg函数的开始添加的执行函数名称,如consumerMessageOnEnter
    "ReceiverType": "\\*KafkaConsumer", //非必填,表示这个ConsumerMsg函数属于哪个类,这里属于*KafkaConsumer
    "OnExit": "consumerMessageOnExit", //非必填,表示在ConsumerMsg函数的执行后添加的执行函数名称,如consumerMessageOnExit
    "Path": "/path/to/your/my-hooks"  //必填项,表示上述创建的hook代码所在目录的路径
  }
]

结果验证

应用运行并处理Kafka消息后,在ARMS控制台应用监控 > 调用链分析页面查找相关Trace。您将看到Kafka消息的生产、消费以及后续的业务处理(包括HTTP调用)都串联起来了,达到了预期的结果。

调用链串联成功示意图