本文介绍如何使用Java、Go、NodeJS、Python编程语言实现gRPC的Unary RPC、Server streaming RPC、Client streaming RPC、Client streaming RPC通信模型。
示例工程
gRPC的示例工程请参见hello-servicemesh-grpc,本文档中提到的目录都为hello-servicemesh-grpc下的目录。
步骤一:转换代码
- 执行以下命令,安装gRPC和Protobuf,以下以macOS为例。
brew install grpc protobuf
- 将Protobuf定义转换为使用的语言的代码。以下为不同语言的转换方式。说明 在示例工程中,每种语言的代码目录中都有一个proto目录,其中的landing.proto文件是示例工程根目录下proto/landing.proto文件的软连接,这样有利于统一更新Protobuf的定义。
- Java的构建工具Maven提供了自动转换插件protobuf-maven-plugin,执行
mvn package
会自动使用protoc-gen-grpc-java创建gRPC的模板代码。详情请参见hello-grpc-java/pom.xml。 - Go需要执行
go get github.com/golang/protobuf/protoc-gen-go
安装protoc-gen-go,然后使用protoc命令生成gRPC代码。详情请参见hello-grpc-go/proto2go.sh。 - NodeJS需要执行
npm install -g grpc-tools
安装grpc_tools_node_protoc,然后使用protoc命令生成gRPC代码。详情请参见见hello-grpc-nodejs/proto2js.sh。 - Python需要执行
pip install grpcio-tools
安装grpcio-tools ,然后使用protoc命令生成gRPC代码。详情请参见hello-grpc-python/proto2py.sh。
- Java的构建工具Maven提供了自动转换插件protobuf-maven-plugin,执行
步骤二:设置通信模型
- 设置Hello数组。
private final List<String> HELLO_LIST = Arrays.asList("Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"); kv.put("data", HELLO_LIST.get(index));
var helloList = []string{"Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"} kv["data"] = helloList[index]
let hellos = ["Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"] kv.set("data", hellos[index])
hellos = ["Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"] result.kv["data"] = hellos[index]
- 设置通信模型。
- 设置Unary RPC通信模型。
// 使用blockingStub与服务端通信 public TalkResponse talk(TalkRequest talkRequest) { return blockingStub.talk(talkRequest); } //服务端处理请求后触发StreamObserver实例的两个事件onNext和onCompleted public void talk(TalkRequest request, StreamObserver<TalkResponse> responseObserver) { ... responseObserver.onNext(response); responseObserver.onCompleted(); }
func talk(client pb.LandingServiceClient, request *pb.TalkRequest) { r, err := client.Talk(context.Background(), request) } func (s *ProtoServer) Talk(ctx context.Context, request *pb.TalkRequest) (*pb.TalkResponse, error) { return &pb.TalkResponse{ Status: 200, Results: []*pb.TalkResult{s.buildResult(request.Data)}, }, nil }
function talk(client, request) { client.talk(request, function (err, response) { ... }) } function talk(call, callback) { const talkResult = buildResult(call.request.getData()) ... callback(null, response) }
def talk(stub): response = stub.talk(request) def talk(self, request, context): result = build_result(request.data) ... return response
- 设置Server streaming RPC通信模型。
public List<TalkResponse> talkOneAnswerMore(TalkRequest request) { Iterator<TalkResponse> talkResponses = blockingStub.talkOneAnswerMore(request); talkResponses.forEachRemaining(talkResponseList::add); return talkResponseList; } public void talkOneAnswerMore(TalkRequest request, StreamObserver<TalkResponse> responseObserver) { String[] datas = request.getData().split(","); for (String data : datas) {...} talkResponses.forEach(responseObserver::onNext); responseObserver.onCompleted(); }
func talkOneAnswerMore(client pb.LandingServiceClient, request *pb.TalkRequest) { stream, err := client.TalkOneAnswerMore(context.Background(), request) for { r, err := stream.Recv() if err == io.EOF { break } ... } } func (s *ProtoServer) TalkOneAnswerMore(request *pb.TalkRequest, stream pb.Landing..Server) error { datas := strings.Split(request.Data, ",") for _, d := range datas { stream.Send(&pb.TalkResponse{...}) }
function talkOneAnswerMore(client, request) { let call = client.talkOneAnswerMore(request) call.on('data', function (response) { ... }) } function talkOneAnswerMore(call) { let datas = call.request.getData().split(",") for (const data in datas) { ... call.write(response) } call.end() }
def talk_one_answer_more(stub): responses = stub.talkOneAnswerMore(request) for response in responses: logger.info(response) def talkOneAnswerMore(self, request, context): datas = request.data.split(",") for data in datas: yield response
- 设置Client streaming RPC通信模型。
public void talkMoreAnswerOne(List<TalkRequest> requests) throws InterruptedException { final CountDownLatch finishLatch = new CountDownLatch(1); StreamObserver<TalkResponse> responseObserver = new StreamObserver<TalkResponse> () { @Override public void onNext(TalkResponse talkResponse) { log.info("Response=\n{}", talkResponse); } @Override public void onCompleted() { finishLatch.countDown(); } }; final StreamObserver<TalkRequest> requestObserver = asyncStub.talkMoreAnswerOne(responseObserver); try { requests.forEach(request -> { if (finishLatch.getCount() > 0) { requestObserver.onNext(request); }); requestObserver.onCompleted(); } public StreamObserver<TalkRequest> talkMoreAnswerOne(StreamObserver<TalkResponse> responseObserver) { return new StreamObserver<TalkRequest>() { @Override public void onNext(TalkRequest request) { talkRequests.add(request); } @Override public void onCompleted() { responseObserver.onNext(buildResponse(talkRequests)); responseObserver.onCompleted(); } }; }
func talkMoreAnswerOne(client pb.LandingServiceClient, requests []*pb.TalkRequest) { stream, err := client.TalkMoreAnswerOne(context.Background()) for _, request := range requests { stream.Send(request) } r, err := stream.CloseAndRecv() } func (s *ProtoServer) TalkMoreAnswerOne(stream pb.LandingService_TalkMoreAnswerOneServer) error { for { in, err := stream.Recv() if err == io.EOF { talkResponse := &pb.TalkResponse{ Status: 200, Results: rs, } stream.SendAndClose(talkResponse) return nil } rs = append(rs, s.buildResult(in.Data)) } }
function talkMoreAnswerOne(client, requests) { let call = client.talkMoreAnswerOne(function (err, response) { ... }) requests.forEach(request => { call.write(request) }) call.end() } function talkMoreAnswerOne(call, callback) { let talkResults = [] call.on('data', function (request) { talkResults.push(buildResult(request.getData())) }) call.on('end', function () { let response = new messages.TalkResponse() response.setStatus(200) response.setResultsList(talkResults) callback(null, response) }) }
def talk_more_answer_one(stub): response_summary = stub.talkMoreAnswerOne(request_iterator) def generate_request(): for _ in range(0, 3): yield request def talkMoreAnswerOne(self, request_iterator, context): for request in request_iterator: response.results.append(build_result(request.data)) return response
- 设置Bidirectional streaming RPC通信模型。
public void talkBidirectional(List<TalkRequest> requests) throws InterruptedException { final CountDownLatch finishLatch = new CountDownLatch(1); StreamObserver<TalkResponse> responseObserver = new StreamObserver<TalkResponse>() { @Override public void onNext(TalkResponse talkResponse) { log.info("Response=\n{}", talkResponse); } @Override public void onCompleted() { finishLatch.countDown(); } }; final StreamObserver<TalkRequest> requestObserver = asyncStub.talkBidirectional(responseObserver); try { requests.forEach(request -> { if (finishLatch.getCount() > 0) { requestObserver.onNext(request); ... requestObserver.onCompleted(); } public StreamObserver<TalkRequest> talkBidirectional(StreamObserver<TalkResponse> responseObserver) { return new StreamObserver<TalkRequest>() { @Override public void onNext(TalkRequest request) { responseObserver.onNext(TalkResponse.newBuilder() .setStatus(200) .addResults(buildResult(request.getData())).build()); } @Override public void onCompleted() { responseObserver.onCompleted(); } }; }
func talkBidirectional(client pb.LandingServiceClient, requests []*pb.TalkRequest) { stream, err := client.TalkBidirectional(context.Background()) waitc := make(chan struct{}) go func() { for { r, err := stream.Recv() if err == io.EOF { // read done. close(waitc) return } } }() for _, request := range requests { stream.Send(request) } stream.CloseSend() <-waitc } func (s *ProtoServer) TalkBidirectional(stream pb.LandingService_TalkBidirectionalServer) error { for { in, err := stream.Recv() if err == io.EOF { return nil } stream.Send(talkResponse) } }
function talkBidirectional(client, requests) { let call = client.talkBidirectional() call.on('data', function (response) { ... }) requests.forEach(request => { call.write(request) }) call.end() } function talkBidirectional(call) { call.on('data', function (request) { call.write(response) }) call.on('end', function () { call.end() }) }
def talk_bidirectional(stub): responses = stub.talkBidirectional(request_iterator) for response in responses: logger.info(response) def talkBidirectional(self, request_iterator, context): for request in request_iterator: yield response
- 设置Unary RPC通信模型。
步骤三:开发功能函数
- 设置环境变量。
private static String getGrcServer() { String server = System.getenv("GRPC_SERVER"); if (server == null) { return "localhost"; } return server; }
func grpcServer() string { server := os.Getenv("GRPC_SERVER") if len(server) == 0 { return "localhost" } else { return server } }
function grpcServer() { let server = process.env.GRPC_SERVER; if (typeof server !== 'undefined' && server !== null) { return server } else { return "localhost" } }
def grpc_server(): server = os.getenv("GRPC_SERVER") if server: return server else: return "localhost"
- 设置随机数。
public static String getRandomId() { return String.valueOf(random.nextInt(5)); }
func randomId(max int) string { return strconv.Itoa(rand.Intn(max)) }
function randomId(max) { return Math.floor(Math.random() * Math.floor(max)).toString() }
def random_id(end): return str(random.randint(0, end))
- 设置时间戳。
TalkResult.newBuilder().setId(System.nanoTime())
result.Id = time.Now().UnixNano()
result.setId(Math.round(Date.now() / 1000))
result.id = int((time.time()))
- 设置UUID。
kv.put("id", UUID.randomUUID().toString());
import ( "github.com/google/uuid" ) kv["id"] = uuid.New().String()
kv.set("id", uuid.v1())
result.kv["id"] = str(uuid.uuid1())
- 设置Sleep。
TimeUnit.SECONDS.sleep(1);
time.Sleep(2 * time.Millisecond)
let sleep = require('sleep') sleep.msleep(2)
time.sleep(random.uniform(0.5, 1.5))
结果验证
功能验证
在一个终端启动gRPC服务端,在另一个终端启动gRPC客户端。启动服务端和客户端后,客户端将分别对4个通信接口进行请求。
- 使用Java语言时启动gRPC服务和客户端。
mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.server.ProtoServer"
mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.client.ProtoClient"
- 使用Go语言时启动gRPC服务和客户端。
go run server.go
go run client/proto_client.go
- 使用NodeJS语言时启动gRPC服务和客户端。
node proto_server.js
node proto_client.js
- 使用Python语言时启动gRPC服务和客户端。
python server/protoServer.py
python client/protoClient.py
如果没有出现通信错误,则表示启动gRPC服务和客户端成功。
交叉通信
交叉通信用以确保不同编程语言实现的gRPC通信行为一致。从而保证路由到不同编程语言版本,结果是一致的。
- 启动任意一种编程语言的gRPC服务端,以下以Java为例。
mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.server.ProtoServer"
- 使用4种编程语言客户端进行验证。
mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.client.ProtoClient"
go run client/proto_client.go
node proto_client.js
python client/protoClient.py
如果没有出现通信错误,则表示交叉通信成功。
构建工程和镜像
完成gRPC服务端和客户端功能验证后,您还可以构建gRPC服务端和客户端的镜像。
构建工程
使用4种语言构建服务端和客户端的工程。
- Java
分别构建服务端和客户端的JAR,将其拷贝到Docker目录备用。
mvn clean install -DskipTests -f server_pom cp target/hello-grpc-java.jar ../docker/ mvn clean install -DskipTests -f client_pom cp target/hello-grpc-java.jar ../docker/
- Go
Go编译的二进制是平台相关的,最终要部署到Linux上,因此构建命令如下。然后将二进制拷贝到Docker目录备用。
env GOOS=linux GOARCH=amd64 go build -o proto_server server.go mv proto_server ../docker/ env GOOS=linux GOARCH=amd64 go build -o proto_client client/proto_client.go mv proto_client ../docker/
- NodeJS
NodeJS需要在Docker镜像中进行构建,才能支持运行时所需的各种C++依赖。因此这一步主要是拷贝备用。
cp ../hello-grpc-nodejs/proto_server.js node cp ../hello-grpc-nodejs/package.json node cp -R ../hello-grpc-nodejs/common node cp -R ../proto node cp ../hello-grpc-nodejs/*_client.js node
- Python
Python无需编译,拷贝备用即可。
cp -R ../hello-grpc-python/server py cp ../hello-grpc-python/start_server.sh py cp -R ../proto py cp ../hello-grpc-python/proto2py.sh py cp -R ../hello-grpc-python/client py cp ../hello-grpc-python/start_client.sh py
构建GRPC服务端和客户端镜像
构建完毕后,Docker路径下存储了Dockerfile所需的全部文件,Dockerfile中重点信息说明如下。
- 基础镜像尽量选择alpine,因为尺寸最小。示例中Python的基础镜像选择的是2.7版本的python:2,您可以根据实际情况修改Python的基础镜像版本。
- NodeJS需要安装C++及编译器Make,Npm包需要安装grpc-tools。
这里以NodeJS 服务端的镜像作为示例,说明构建镜像的过程。
- 创建grpc-server-node.dockerfile文件。
FROM node:14.11-alpine RUN apk add --update \ python \ make \ g++ \ && rm -rf /var/cache/apk/* RUN npm config set registry http://registry.npmmirror.com && npm install -g node-pre-gyp grpc-tools --unsafe-perm COPY node/package.json . RUN npm install --unsafe-perm COPY node . ENTRYPOINT ["node","proto_server.js"]
- 构建镜像。
docker build -f grpc-server-node.dockerfile -t registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_node:1.0.0 .
最终会构建出8个镜像。 - 使用Push命令将镜像分发到容器镜像ACR中。
docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_java:1.0.0
docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_java:1.0.0
docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_go:1.0.0
docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_go:1.0.0
docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_node:1.0.0
docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_node:1.0.0
docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_python:1.0.0
docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_python:1.0.0