使用Envoy External Processing对请求进行自定义处理

Envoy External Processing是一种扩展能力,使得Envoy可以通过外部处理服务来增强其HTTP请求/响应处理功能,而不用编写Wasm插件或是其它处理脚本,使得处理更加灵活和可扩展。本文介绍External Processing的实现机制和使用示例。

前提条件

External Processing机制

Envoy External Processing对下游服务请求以及上游服务响应进行处理的过程如下图:

image
  1. 下游服务向上游服务发送请求①,Envoy拦截请求并将请求信息转发至External Processing服务②。

  2. External Processing服务对请求进行的处理,并将其返回给Envoy③。

  3. Envoy根据返回的结果对请求进行处理,并将处理完的请求转发至上游服务④。

  4. 上游处理完请求后向下游服务发送响应⑤,Envoy拦截请求并将响应信息转发至External Processing服务⑥。

  5. External Processing服务计对请求进行处理⑦,并将其返回Envoy⑧。

  6. Envoy根据返回的结果对响应头和响应体信息进行处理,并将处理完的请求转发至下游服务⑨。

步骤一:编写External Processing服务处理逻辑

以下是部分核心逻辑代码,完整示例代码请参见ext-proc-demo。更多设计细节,请参见Envoy官方文档

展开查看核心逻辑代码

func NewServer() *Server {
	return &Server{}
}

// Server 实现了 Envoy external processing server 接口
// 参见 https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto
type Server struct{}

func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
	klog.Infof("Processing")
	ctx := srv.Context()
	for {
		select {
		case <-ctx.Done():
			klog.Infof("context done")
			return ctx.Err()
		default:
		}

		req, err := srv.Recv()
		if err == io.EOF {
			// envoy has closed the stream. Don't return anything and close this stream entirely
			return nil
		}
		if err != nil {
			return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
		}

		resp := &extProcPb.ProcessingResponse{}
		switch v := req.Request.(type) {
        // 对于请求头所需处理的请求
		case *extProcPb.ProcessingRequest_RequestHeaders:
			klog.Infof("Got RequestHeaders")
			h := req.Request.(*extProcPb.ProcessingRequest_RequestHeaders)
			resp = handleRequestHeaders(h)
        // 对于响应头所需处理的请求
        case *extProcPb.ProcessingRequest_ResponseHeaders:
			klog.Infof("Got ResponseHeaders")
			h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
			resp = handleResponseHeaders(h)
        // 对于请求头所需处理的请求,暂未实现
		case *extProcPb.ProcessingRequest_RequestBody:
			klog.Infof("Got RequestBody (not currently handled)")
        // 对于请求 Trailers 所需处理的请求,暂未实现
		case *extProcPb.ProcessingRequest_RequestTrailers:
			klog.Infof("Got RequestTrailers (not currently handled)")
        // 对于响应体所需处理的请求,暂未实现
		case *extProcPb.ProcessingRequest_ResponseBody:
			klog.Infof("Got ResponseBody (not currently handled)")
        // 对于响应 Trailers 所需处理的请求,暂未实现
		case *extProcPb.ProcessingRequest_ResponseTrailers:
			klog.Infof("Got ResponseTrailers (not currently handled)")

		default:
			klog.Infof("Unknown Request type %v", v)
		}
        // 返回需要对请求作出的处理
		klog.Infof("Sending ProcessingResponse: %+v", resp)
		if err := srv.Send(resp); err != nil {
			klog.Infof("send error %v", err)
			return err
		}
	}
}

// 为请求头添加 x-ext-proc-header=hello-to-asm
func handleRequestHeaders(req *extProcPb.ProcessingRequest_RequestHeaders) *extProcPb.ProcessingResponse {
	klog.Infof("handle request headers: %+v\n", req)

	resp := &extProcPb.ProcessingResponse{
		Response: &extProcPb.ProcessingResponse_RequestHeaders{
			RequestHeaders: &extProcPb.HeadersResponse{
				Response: &extProcPb.CommonResponse{
					HeaderMutation: &extProcPb.HeaderMutation{
						SetHeaders: []*configPb.HeaderValueOption{
							{
								Header: &configPb.HeaderValue{
									Key:      "x-ext-proc-header",
									RawValue: []byte("hello-to-asm"),
								},
							},
						},
					},
				},
			},
		},
	}
    
	return resp
}

// 为响应头添加 x-ext-proc-header=hello-from-asm
func handleResponseHeaders(req *extProcPb.ProcessingRequest_ResponseHeaders) *extProcPb.ProcessingResponse {
	klog.Infof("handle response headers: %+v\n", req)

	resp := &extProcPb.ProcessingResponse{
		Response: &extProcPb.ProcessingResponse_ResponseHeaders{
			ResponseHeaders: &extProcPb.HeadersResponse{
				Response: &extProcPb.CommonResponse{
					HeaderMutation: &extProcPb.HeaderMutation{
						SetHeaders: []*configPb.HeaderValueOption{
							{
								Header: &configPb.HeaderValue{
									Key:      "x-ext-proc-header",
									RawValue: []byte("hello-from-asm"),
								},
							},
						},
					},
				},
			},
		},
	}

	return resp
}
说明

您需要编写DockerfileExternal Processing服务代码构建成镜像并上传到镜像仓库才可以进行部署。

步骤二:部署External Processing服务

本步骤使用ASM提供的External Processing服务示例镜像进行演示。该服务会在接收到的请求中添加请求头x-ext-proc-header: hello-to-asm,并在返回的响应中添加请求头x-ext-proc-header: hello-from-asm

  1. 使用以下内容,创建ext.yaml。

    apiVersion: v1
    kind: Service
    metadata:
      name: ext-proc
      labels:
        app: ext-proc
        service: ext-proc
    spec:
      ports:
      # External Processing 监听端口
      - name: grpc
        port: 9002
        targetPort: 9002
      selector:
        app: ext-proc
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: ext-proc
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: ext-proc
          version: v1
      template:
        metadata:
          labels:
            app: ext-proc
            version: v1
        spec:
          containers:
          - image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/ext-proc:v0.2
            imagePullPolicy: IfNotPresent
            name: ext-proc
            ports:
            - containerPort: 9002
  2. 执行以下命令,查看Pod日志以确认External Processing服务运行状态。

    kubectl logs ext-proc-64c8xxxxx-xxxxx

    预期输出:

    I1126 06:41:25.467033       1 main.go:52] Starting gRPC server on port :9002

    日志中出现上述内容,说明External Processing服务运行状态正常。

步骤三:配置EnvoyFilter

  1. 登录ASM控制台,在左侧导航栏,选择服务网格 > 网格管理

  2. 网格管理页面,单击目标实例名称,然后在左侧导航栏,选择插件扩展中心 > Envoy过滤器模板

  3. 使用以下内容,创建httpbin-ext-proc EnvoyFilter。

    apiVersion: networking.istio.io/v1alpha3
    kind: EnvoyFilter
    spec:
      configPatches:
        - applyTo: HTTP_FILTER
          match:
            context: SIDECAR_INBOUND
            listener:
              portNumber: 80
              filterChain:
                filter:
                  name: envoy.filters.network.http_connection_manager
            proxy:
              proxyVersion: ^MIN_VERSION-MAX_VERSION.*
          patch:
            operation: INSERT_BEFORE
            value:
              name: envoy.filters.http.ext_proc
              typed_config:
                '@type': >-
                  type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor
                grpc_service:
                  envoy_grpc:
                    cluster_name: outbound|9002||ext-proc.default.svc.cluster.local
                    authority: ext-proc.default.svc.cluster.local
                processing_mode:
                  request_header_mode: SEND
                  response_header_mode: SEND

步骤四:访问httpbin应用进行验证

执行以下命令,访问httpbin应用,并查看响应头。

kubectl exec -it deploy/sleep -- curl httpbin:8000/headers -i

预期输出:

HTTP/1.1 200 OK
server: envoy
date: Wed, 11 Dec 2024 06:47:59 GMT
content-type: application/json
content-length: 564
access-control-allow-origin: *
access-control-allow-credentials: true
x-envoy-upstream-service-time: 3
x-ext-proc-header: hello-from-asm

{
  "headers": {
    "Accept": "*/*",
    "Host": "httpbin:8000",
    "User-Agent": "curl/8.1.2",
    "X-B3-Parentspanid": "5c6dd2cc9312d6bb",
    "X-B3-Sampled": "1",
    "X-B3-Spanid": "1153a2737cee4434",
    "X-B3-Traceid": "baba86b696edc75a5c6dd2cc9312d6bb",
    "X-Envoy-Attempt-Count": "1",
    "X-Ext-Proc-Header": "hello-to-asm",
    "X-Forwarded-Client-Cert": "By=spiffe://cluster.local/ns/default/sa/httpbin;Hash=69d8f267c3c00b4396a83e12d14520acc9dadb1492d660e10f77e94dcad7cb06;Subject=\"\";URI=spiffe://cluster.local/ns/default/sa/sleep"
  }
}

可以看到,请求头中添加了x-ext-proc-header: hello-to-asm,响应头中添加了x-ext-proc-header: hello-from-asm