对于高QPS的大流量服务(例如图像服务),开启VPC高速直连,可大幅提高访问性能、降低访问延时。但是对于多实例情况,不具备负载均衡能力。如果您当前已有微服务在使用Nacos,可以利用Nacos实现访问控制。本文介绍如何通过挂载Nacos实例进行服务调用。
前提条件
已创建专有网络VPC、交换机和安全组,确保交换机的IP地址段有足够的IP,EAS会将您部署推理服务中的实例IP在弹性网卡中注册。具体操作,请参见创建和管理专有网络和管理安全组。
重要安全组控制ECS实例的出入流量,用户的ECS实例和EAS服务实例之间的网络互通也受安全组的配置控制。默认普通安全组内,实例之间是内网互通的,您可以在配置VPC高速直连时,选择需要访问EAS服务的ECS实例所在安全组,从而支持实例之间网络互通。当需要配置使用不同的安全组时,请设置安全组的规则支持ECS实例之间能够互通。
EAS已开通VPC高速直连功能,详情请参见配置网络连通。
已有Nacos实例,详情请参见创建实例。
确保Nacos所在的专有网络VPC和交换机与您在高速直连的配置一致。
调用原理
您在指定的专有网络VPC和交换机下创建了一个Nacos实例(与EAS开通VPC高速直连的专有网络VPC和交换机相同);
您在部署EAS推理服务时,指定挂载的Nacos;
EAS将推理服务对应的Pod注册到您的Nacos实例下;
客户端启动了一个Nacos服务注册监听;
Nacos将EAS服务的IP列表变化推送到客户端;
客户端使用SDK来选择一个实例(SDK封装了负载均衡调度算法:加权轮询WRR);
使用SDK返回的服务实例的IP:Port,查看调用信息通过IP:Port发起调用;
配置Nacos挂载
新建或更新服务,在服务配置添加Nacos配置,配置支持数组,即如果添加多个Nacos,会将服务同时注册到多个Nacos中,可供用户容灾等场景使用。涉及高速直连和Nacos挂载的具体配置如下:
{
"cloud": {
"networking": {
"security_group_id": "sg-*****",
"vpc_id": "vpc-***",
"vswitch_id": "vsw-****"
}
},
"networking": {
"nacos": [
{
"nacos_id": "mse_regserverless_cn-****"
}
]
}
}
参数 | 描述 | ||
cloud | networking | vpc_id | 通过配置VPC、交换机和安全组来启用VPC高速直连。 重要
|
vswitch_id | |||
security_group_id | |||
networking | nacos | id | 表示已创建的Nacos实例ID。 |
检查Nacos服务注册
服务部署成功后,EAS会将服务注册到您的Nacos实例上,具体的注册信息如下:
cluster:默认的DEFAULT集群
namespace: 默认的公共命名空间
serviceName: 你的服务名称
groupName: 系统生成固定值,pai-eas
检查创建的服务信息是否正确:
检查服务内的实例数是否正确,实例状态是否正常:
客户端发起调用
Go
package main
import (
"fmt"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/util"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"strconv"
"testing"
"time"
)
var Clients = make(map[string]NacosClientManager)
type NacosClientManager struct {
userId string
endPoint string
client naming_client.INamingClient
}
func NewAndGetNacosClient(userId string, endPoint string) (*NacosClientManager, error) {
key := generateKey(userId, endPoint)
client, exists := Clients[key]
if exists {
return &client, nil
}
client, exists = Clients[key]
if exists {
return &client, nil
}
newClient, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: constant.NewClientConfig(
constant.WithNamespaceId(""),
constant.WithTimeoutMs(5000),
constant.WithNotLoadCacheAtStart(true),
constant.WithLogDir("/tmp/nacos/log"),
constant.WithCacheDir("/tmp/nacos/cache"),
constant.WithLogLevel("debug"),
),
ServerConfigs: []constant.ServerConfig{
*constant.NewServerConfig(endPoint, 8848, constant.WithContextPath("/nacos")),
},
},
)
if err != nil {
return nil, err
}
nacosClient := NacosClientManager{
userId: userId,
endPoint: endPoint,
client: newClient,
}
Clients[key] = nacosClient
return &nacosClient, nil
}
func (p *NacosClientManager) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
instance, err := p.client.SelectOneHealthyInstance(param)
if err != nil {
return nil, fmt.Errorf("SelectOneHealthyInstance failed: %v", err)
}
fmt.Printf("SelectOneHealthyInstance success, param: %+v\n", param)
return instance, nil
}
func (p *NacosClientManager) Subscribe(service string, group string) error {
subscribeParam := &vo.SubscribeParam{
ServiceName: service,
GroupName: group,
SubscribeCallback: func(services []model.Instance, err error) {
fmt.Printf("callback return services:%s \n\n", util.ToJsonString(services))
},
}
return p.client.Subscribe(subscribeParam)
}
func generateKey(userId string, endPoint string) string {
return userId + fmt.Sprintf("-%s", endPoint)
}
func Test(t *testing.T) {
nacosClient, err := NewAndGetNacosClient("yourAliyunUid", "yourNacosEndpoint")
if err != nil {
panic(err)
}
nacosClient.Subscribe("your_service", "pai-eas")
params := vo.SelectOneHealthInstanceParam{
ServiceName: "your_service",
GroupName: "pai-eas",
}
instance, err := nacosClient.SelectOneHealthyInstance(params)
fmt.Println(instance)
// check your own invoke info on console and replace host by ip:port
url := fmt.Sprintf("http://%s:%s/api/predict/xxxxxxx", instance.Ip, strconv.FormatUint(instance.Port, 10))
fmt.Println(url)
//todo invoke service by url
}
Python
Nacos官方的Python SDK没有负载均衡算法,需要自行实现WRR负载均衡。
安装依赖
pip install nacos-sdk-python
发起调用
# -*- coding: utf8 -*-
import nacos
# Nacos server configuration
SERVER_ADDRESSES = "yourNacosEndpoint"
NAMESPACE = "" # Use default namespace if empty
SERVICE_NAME = "your_service"
GROUP_NAME = "pai-eas"
# Create Nacos client
client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE)
def callback(args):
print(args)
if __name__ == '__main__':
# Correctly pass the callback function as a list
client.add_config_watchers(SERVICE_NAME, "pai-eas", [callback])
# Fetch and print naming instance details
b = client.list_naming_instance(SERVICE_NAME, None, None, GROUP_NAME, True)
print(b)
if b"hosts":
print("ip", b["hosts"][0]["ip"])
ip = b["hosts"][0]["ip"]
port = b["hosts"][0]["port"]
# check your own invoke info on console and replace host by ip:port
url = f"http://{ip}:{port}/api/predict/xxxxxxx"
print(url)
#todo invoke service by url
Java
添加Maven依赖
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>2.3.2</version>
</dependency>
发起调用
package test;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
public class NacosTest {
public static void main(String[] args) {
try {
String serverAddr = "yourNacosEndpoint:8848";
NamingService namingService = NacosFactory.createNamingService(serverAddr);
ExecutorService executorService = Executors.newFixedThreadPool(1);
EventListener serviceListener = new AbstractEventListener() {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
System.out.println(((NamingEvent) event).getServiceName());
System.out.println(((NamingEvent) event).getGroupName());
}
}
@Override
public Executor getExecutor() {
return executorService;
}
};
namingService.subscribe("your_service", "pai-eas", serviceListener);
Instance instance = namingService.selectOneHealthyInstance("your_service", "pai-eas");
System.out.println(instance.getIp());
System.out.println(instance.getPort());
// check your own invoke info on console and replace host by ip:port
String url = String.format("http://%s:%d/api/predict/xxxxxxx", instance.getIp(), instance.getPort());
System.out.println(url);
//todo invoke service by url
} catch (NacosException e) {
e.printStackTrace();
}
}
}