Go Agent在处理例如github.com/segmentio/kafka-go时,无法自动将消费消息时的自定义处理流程与消息的发布、消费流程进行串联。这会导致在ARMS控制台的调用链分析中出现Trace中断的问题,给问题排查和性能分析带来困难。本文介绍如何使用Go 自定义扩展来处理这类问题。
场景描述
demo应用在通过Go Agent编译后,可以在调用链分析中查看到如下两条Trace:

上图展示了Kafka消息消费的链路,消息自定义处理流程中的HTTP请求没有跟消息消费串联在一起,而是单独作为另一条Trace。

上图展示了本应由消费触发的HTTP调用,形成了一条新的、无上游关联的链路。而预期是HTTP请求接在消息消费后。问题原因就是Agent中的埋点只有SDK函数,无法串联用户代码中业务逻辑。
解决方案
Agent 会在收到的消息Header中添加Trace的上下文信息,通过Go 自定义扩展给用户消费消息的函数进行埋点,设置函数入口与出口,来恢复这个上下文信息,创建新的Span,通过这个过渡Span来实现消息获取和消息消费的业务流程串联,实现端到端的全链路追踪。
函数入口(OnEnter):从Kafka消息Header中提取
traceparent,恢复Trace上下文,并基于此上下文创建一个新的Span,代表整个业务处理过程。函数出口(OnExit):结束这个新创建的Span。
这样,您业务函数内部所有被ARMS自动监控的调用,都会自然地成为这个新建Span的子节点,从而实现链路的串联。
操作步骤
步骤一:定位业务处理函数
首先,在您的项目中定位到实际处理Kafka消息的业务函数。这个函数通常接收context.Context和kafka.Message作为参数。假设您的函数如下所示:
// in package: your-project/consumer
package consumer
// ...
func (c *KafkaConsumer) ConsumerMsg(ctx context.Context, message kafka.Message) error {
// 您的业务逻辑,例如发起一个HTTP请求...
// http.Get("http://downstream-service/api")
return nil
}
后续步骤将围绕(*KafkaConsumer).ConsumerMsg这个函数进行操作。
步骤二:编写Hook代码
Hook代码是您希望注入到目标函数中的自定义逻辑。
在您的Go项目外部创建一个独立的目录,例如
my-hooks。进入该目录并初始化Go Module:
cd my-hooks && go mod init my-hooks。创建
rules.go文件,并编写以下代码。// my-hooks/rules.go package rules import ( "context" "github.com/alibaba/loongsuite-go-agent/pkg/api" kafka "github.com/segmentio/kafka-go" "github.com/alibaba/loongsuite-go-agent/pkg/go-sdk/otel" "github.com/alibaba/loongsuite-go-agent/pkg/go-sdk/otel/propagation" oTrace "github.com/alibaba/loongsuite-go-agent/pkg/go-sdk/otel/trace" _ "unsafe" ) // OnEnter函数:添加创建Span的逻辑,在ConsumerMsg函数开始时执行 // go:linkname 将此Hook函数与ARMS Agent生成的桩函数链接 //go:linkname consumerMessageOnEnter your-project/consumer.consumerMessageOnEnter func consumerMessageOnEnter(call api.CallContext, k interface{}, ctx context.Context, message kafka.Message) { for _, v := range message.Headers { if v.Key == "traceparent" { var headerMap propagation.MapCarrier headerMap = make(map[string]string) headerMap[v.Key] = string(v.Value) ctx = otel.GetTextMapPropagator().Extract(ctx, headerMap) tracer := otel.GetTracerProvider().Tracer("") opts := append([]oTrace.SpanStartOption{}, oTrace.WithSpanKind(oTrace.SpanKindConsumer)) _, span := tracer.Start(ctx, "consumer message", opts...) temp := make(map[string]interface{}, 1) temp["span"] = span call.SetData(temp) break } } } // OnExit函数:调用Span.End(),在ConsumerMsg函数结束时执行 //go:linkname consumerMessageOnExit your-project/consumer.consumerMessageOnExit func consumerMessageOnExit(call api.CallContext, err error) { if call.GetData() == nil { return } temp := call.GetData().(map[string]interface{}) span := temp["span"].(oTrace.Span) if span != nil { span.End() return } }
步骤三:编写注入规则文件
创建一个JSON配置文件,规则如下所示,将文件放到编译时候能找到的目录即可,如arms-rule.json。编写规则详情请参考Go自定义扩展编写规范和字段说明。
// arms-rule.json
[
{
"ImportPath":"your-project/consumer", //必填项,目标函数所在的完整包路径,这里可以为github.com,也可以是自己sdk的包路径
"Function":"ConsumerMsg", //必填项,表示你需要注入的函数是什么,这里是ConsumerMsg
"OnEnter":"consumerMessageOnEnter", //非必填,表示在ConsumerMsg函数的开始添加的执行函数名称,如consumerMessageOnEnter
"ReceiverType": "\\*KafkaConsumer", //非必填,表示这个ConsumerMsg函数属于哪个类,这里属于*KafkaConsumer
"OnExit": "consumerMessageOnExit", //非必填,表示在ConsumerMsg函数的执行后添加的执行函数名称,如consumerMessageOnExit
"Path": "/path/to/your/my-hooks" //必填项,表示上述创建的hook代码所在目录的路径
}
]结果验证
应用运行并处理Kafka消息后,在ARMS控制台的应用监控 > 调用链分析页面查找相关Trace。您将看到Kafka消息的生产、消费以及后续的业务处理(包括HTTP调用)都串联起来了,达到了预期的结果。
