通过关联服务发现Nacos调用

对于高QPS的大流量服务(例如图像服务),开启VPC高速直连,可大幅提高访问性能、降低访问延时。但是对于多实例情况,不具备负载均衡能力。如果您当前已有微服务在使用Nacos,可以利用Nacos实现访问控制。本文介绍如何通过挂载Nacos实例进行服务调用。

前提条件

  • 已创建专有网络VPC、交换机和安全组,确保交换机的IP地址段有足够的IP,EAS会将您部署推理服务中的实例IP在弹性网卡中注册。具体操作,请参见创建和管理专有网络管理安全组

    重要

    安全组控制ECS实例的出入流量,用户的ECS实例和EAS服务实例之间的网络互通也受安全组的配置控制。默认普通安全组内,实例之间是内网互通的,您可以在配置VPC高速直连时,选择需要访问EAS服务的ECS实例所在安全组,从而支持实例之间网络互通。当需要配置使用不同的安全组时,请设置安全组的规则支持ECS实例之间能够互通。

  • EAS已开通VPC高速直连功能,详情请参见配置网络连通

  • 已有Nacos实例,详情请参见创建实例

  • 确保Nacos所在的专有网络VPC和交换机与您在高速直连的配置一致。

调用原理

image
  1. 您在指定的专有网络VPC和交换机下创建了一个Nacos实例(与EAS开通VPC高速直连的专有网络VPC和交换机相同);

  2. 您在部署EAS推理服务时,指定挂载的Nacos;

  3. EAS将推理服务对应的Pod注册到您的Nacos实例下;

  4. 客户端启动了一个Nacos服务注册监听;

  5. NacosEAS服务的IP列表变化推送到客户端;

  6. 客户端使用SDK来选择一个实例(SDK封装了负载均衡调度算法:加权轮询WRR);

  7. 使用SDK返回的服务实例的IP:Port,查看调用信息通过IP:Port发起调用;

配置Nacos挂载

新建或更新服务,在服务配置添加Nacos配置,配置支持数组,即如果添加多个Nacos,会将服务同时注册到多个Nacos中,可供用户容灾等场景使用。涉及高速直连和Nacos挂载的具体配置如下:

{
    "cloud": {
        "networking": {
            "security_group_id": "sg-*****",
            "vpc_id": "vpc-***",
            "vswitch_id": "vsw-****"
        }
    },
    "networking": {
        "nacos": [
            {
                "nacos_id": "mse_regserverless_cn-****"
            }
        ]
    }
}

参数

描述

cloud

networking

vpc_id

通过配置VPC、交换机和安全组来启用VPC高速直连。

重要
  • 请使用与Nacos实例一致的专有网络。

  • 请确保交换机的IP地址段有足够的IP。

vswitch_id

security_group_id

networking

nacos

id

表示已创建的Nacos实例ID。

检查Nacos服务注册

服务部署成功后,EAS会将服务注册到您的Nacos实例上,具体的注册信息如下:

  • cluster:默认的DEFAULT集群

  • namespace: 默认的公共命名空间

  • serviceName: 你的服务名称

  • groupName: 系统生成固定值,pai-eas

检查创建的服务信息是否正确:

挂载Nacos

检查服务内的实例数是否正确,实例状态是否正常:

Nacos实例状态

客户端发起调用

Go

 package main

import (
	"fmt"
	"github.com/nacos-group/nacos-sdk-go/v2/clients"
	"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
	"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
	"github.com/nacos-group/nacos-sdk-go/v2/model"
	"github.com/nacos-group/nacos-sdk-go/v2/util"
	"github.com/nacos-group/nacos-sdk-go/v2/vo"
	"strconv"
	"testing"
	"time"
)

var Clients = make(map[string]NacosClientManager)

type NacosClientManager struct {
	userId   string
	endPoint string
	client   naming_client.INamingClient
}

func NewAndGetNacosClient(userId string, endPoint string) (*NacosClientManager, error) {
	key := generateKey(userId, endPoint)
	client, exists := Clients[key]

	if exists {
		return &client, nil
	}

	client, exists = Clients[key]
	if exists {
		return &client, nil
	}

	newClient, err := clients.NewNamingClient(
		vo.NacosClientParam{
			ClientConfig: constant.NewClientConfig(
				constant.WithNamespaceId(""),
				constant.WithTimeoutMs(5000),
				constant.WithNotLoadCacheAtStart(true),
				constant.WithLogDir("/tmp/nacos/log"),
				constant.WithCacheDir("/tmp/nacos/cache"),
				constant.WithLogLevel("debug"),
			),
			ServerConfigs: []constant.ServerConfig{
				*constant.NewServerConfig(endPoint, 8848, constant.WithContextPath("/nacos")),
			},
		},
	)
	if err != nil {
		return nil, err
	}
	nacosClient := NacosClientManager{
		userId:   userId,
		endPoint: endPoint,
		client:   newClient,
	}
	Clients[key] = nacosClient
	return &nacosClient, nil
}

func (p *NacosClientManager) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
	instance, err := p.client.SelectOneHealthyInstance(param)
	if err != nil {
		return nil, fmt.Errorf("SelectOneHealthyInstance failed: %v", err)
	}
	fmt.Printf("SelectOneHealthyInstance success, param: %+v\n", param)
	return instance, nil
}

func (p *NacosClientManager) Subscribe(service string, group string) error {
	subscribeParam := &vo.SubscribeParam{
		ServiceName: service,
		GroupName:   group,
		SubscribeCallback: func(services []model.Instance, err error) {
			fmt.Printf("callback return services:%s \n\n", util.ToJsonString(services))
		},
	}
	return p.client.Subscribe(subscribeParam)
}

func generateKey(userId string, endPoint string) string {
	return userId + fmt.Sprintf("-%s", endPoint)
}

func Test(t *testing.T) {

	nacosClient, err := NewAndGetNacosClient("yourAliyunUid", "yourNacosEndpoint")
	if err != nil {
		panic(err)
	}

	nacosClient.Subscribe("your_service", "pai-eas")

	params := vo.SelectOneHealthInstanceParam{
		ServiceName: "your_service",
		GroupName:   "pai-eas",
	}
	
	instance, err := nacosClient.SelectOneHealthyInstance(params)
	fmt.Println(instance)

    // check your own invoke info on console and replace host by ip:port
	url := fmt.Sprintf("http://%s:%s/api/predict/xxxxxxx", instance.Ip, strconv.FormatUint(instance.Port, 10))
	fmt.Println(url)

    //todo invoke service by url
}    

Python

说明

Nacos官方的Python SDK没有负载均衡算法,需要自行实现WRR负载均衡。

安装依赖

pip install nacos-sdk-python

发起调用

# -*- coding: utf8 -*-
import nacos

# Nacos server configuration
SERVER_ADDRESSES = "yourNacosEndpoint"
NAMESPACE = ""  # Use default namespace if empty
SERVICE_NAME = "your_service"
GROUP_NAME = "pai-eas"

# Create Nacos client
client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE)


def callback(args):
    print(args)


if __name__ == '__main__':
    # Correctly pass the callback function as a list
    client.add_config_watchers(SERVICE_NAME, "pai-eas", [callback])

    # Fetch and print naming instance details
    b = client.list_naming_instance(SERVICE_NAME, None, None, GROUP_NAME, True)
    print(b)
    if b"hosts":
        print("ip", b["hosts"][0]["ip"])
        ip = b["hosts"][0]["ip"]
        port = b["hosts"][0]["port"]
        # check your own invoke info on console and replace host by ip:port
        url = f"http://{ip}:{port}/api/predict/xxxxxxx"
        print(url)
     #todo invoke service by url        

Java

添加Maven依赖

<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    <version>2.3.2</version>
</dependency>

发起调用

package test;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;



import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;

public class NacosTest {
    public static void main(String[] args) {
        try {
            String serverAddr = "yourNacosEndpoint:8848";
            NamingService namingService = NacosFactory.createNamingService(serverAddr);

            ExecutorService executorService = Executors.newFixedThreadPool(1);
            EventListener serviceListener = new AbstractEventListener() {
                @Override
                public void onEvent(Event event) {
                    if (event instanceof NamingEvent) {
                        System.out.println(((NamingEvent) event).getServiceName());
                        System.out.println(((NamingEvent) event).getGroupName());
                    }
                }

                @Override
                public Executor getExecutor() {
                    return executorService;
                }
            };
            namingService.subscribe("your_service", "pai-eas", serviceListener);
            Instance instance = namingService.selectOneHealthyInstance("your_service", "pai-eas");
            System.out.println(instance.getIp());
            System.out.println(instance.getPort());
            // check your own invoke info on console and replace host by ip:port
            String url = String.format("http://%s:%d/api/predict/xxxxxxx", instance.getIp(), instance.getPort());
            System.out.println(url);
            
            //todo invoke service by url
            
        } catch (NacosException e) {
            e.printStackTrace();
        }
    }
}