This topic describes how to implement the four gRPC communication models — Unary RPC, Server streaming RPC, Client streaming RPC, and Bidirectional streaming RPC — in Java, Go, NodeJS, and Python.

Sample project

For the gRPC sample project, see hello-servicemesh-grpc. All directories mentioned in this topic are under the hello-servicemesh-grpc root.

Step 1: Generate code from the Protobuf definition

  1. Install gRPC and Protobuf. The following command uses macOS as an example.
    brew install grpc protobuf
  2. Generate code in each target language from the Protobuf definition. The generation steps differ per language.
    Note: In the sample project, each language's code directory contains a proto directory whose landing.proto file is a symbolic link to proto/landing.proto in the project root. This keeps the Protobuf definition consistent across all languages.
    • Java: the Maven build tool provides the protobuf-maven-plugin. Running mvn package invokes protoc-gen-grpc-java to generate the gRPC stub code automatically. For details, see hello-grpc-java/pom.xml.
    • Go: run go get github.com/golang/protobuf/protoc-gen-go to install protoc-gen-go, then generate the gRPC code with the protoc command. For details, see hello-grpc-go/proto2go.sh.
    • NodeJS: run npm install -g grpc-tools to install grpc_tools_node_protoc, then generate the gRPC code with the protoc command. For details, see hello-grpc-nodejs/proto2js.sh.
    • Python: run pip install grpcio-tools to install grpcio-tools, then generate the gRPC code with the protoc command. For details, see hello-grpc-python/proto2py.sh.
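All four generators consume the same landing.proto. The authoritative file lives in the sample project's proto directory; the following is only a sketch reconstructed from the method and field names used in the snippets in this topic. The package name, field numbers, and any option lines are assumptions.

```protobuf
// Sketch of proto/landing.proto, inferred from the snippets in this topic.
// Field numbers and the absence of options are assumptions.
syntax = "proto3";

service LandingService {
  // Unary RPC
  rpc talk (TalkRequest) returns (TalkResponse) {}
  // Server streaming RPC
  rpc talkOneAnswerMore (TalkRequest) returns (stream TalkResponse) {}
  // Client streaming RPC
  rpc talkMoreAnswerOne (stream TalkRequest) returns (TalkResponse) {}
  // Bidirectional streaming RPC
  rpc talkBidirectional (stream TalkRequest) returns (stream TalkResponse) {}
}

message TalkRequest {
  string data = 1;
}

message TalkResponse {
  int32 status = 1;
  repeated TalkResult results = 2;
}

message TalkResult {
  int64 id = 1;               // timestamp (see Step 3)
  map<string, string> kv = 2; // carries "id" (UUID) and "data" (hello string)
}
```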

Step 2: Implement the communication models

  1. Define the hello list. Here and throughout this step, the snippets appear in the order Java, Go, NodeJS, and Python.
    private final List<String> HELLO_LIST = Arrays.asList("Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요");
    kv.put("data", HELLO_LIST.get(index));
    var helloList = []string{"Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"}
    kv["data"] = helloList[index]
    let hellos = ["Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"]
    kv.set("data", hellos[index])
    hellos = ["Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"]
    result.kv["data"] = hellos[index]
  2. Implement the communication models.
    • Implement the Unary RPC model.
      // The client uses blockingStub to call the server synchronously
      public TalkResponse talk(TalkRequest talkRequest) {
          return blockingStub.talk(talkRequest);
      }
      // After handling the request, the server fires the StreamObserver's onNext and onCompleted events
      public void talk(TalkRequest request, StreamObserver<TalkResponse> responseObserver) {
          ...
          responseObserver.onNext(response);
          responseObserver.onCompleted();
      }
      func talk(client pb.LandingServiceClient, request *pb.TalkRequest) {
          r, err := client.Talk(context.Background(), request)
      }
      
      func (s *ProtoServer) Talk(ctx context.Context, request *pb.TalkRequest) (*pb.TalkResponse, error) {
          return &pb.TalkResponse{
              Status:  200,
              Results: []*pb.TalkResult{s.buildResult(request.Data)},
          }, nil
      }
      function talk(client, request) {
          client.talk(request, function (err, response) {
                  ...
          })
      }
                      
      function talk(call, callback) {
          const talkResult = buildResult(call.request.getData())
          ...
          callback(null, response)
      }  
      def talk(stub):
          response = stub.talk(request)
          
      def talk(self, request, context):
          result = build_result(request.data)
          ...
          return response
    • Implement the Server streaming RPC model.
      public List<TalkResponse> talkOneAnswerMore(TalkRequest request) {
          Iterator<TalkResponse> talkResponses = blockingStub.talkOneAnswerMore(request);
          talkResponses.forEachRemaining(talkResponseList::add);
          return talkResponseList;
      }
      
      public void talkOneAnswerMore(TalkRequest request, StreamObserver<TalkResponse> responseObserver) {
          String[] datas = request.getData().split(",");
          for (String data : datas) {...}
          talkResponses.forEach(responseObserver::onNext);
          responseObserver.onCompleted();
      }
      func talkOneAnswerMore(client pb.LandingServiceClient, request *pb.TalkRequest) {
          stream, err := client.TalkOneAnswerMore(context.Background(), request)
          for {
              r, err := stream.Recv()
              if err == io.EOF {
                  break
              }
          ...
          }
      }
      
      func (s *ProtoServer) TalkOneAnswerMore(request *pb.TalkRequest, stream pb.LandingService_TalkOneAnswerMoreServer) error {
          datas := strings.Split(request.Data, ",")
          for _, d := range datas {
              stream.Send(&pb.TalkResponse{...})
          }
          return nil
      }
      function talkOneAnswerMore(client, request) {
          let call = client.talkOneAnswerMore(request)
          call.on('data', function (response) {
              ...
          })
      }
      
      function talkOneAnswerMore(call) {
          let datas = call.request.getData().split(",")
          for (const data of datas) {
              ...
              call.write(response)
          }
          call.end()
      }
      def talk_one_answer_more(stub):
          responses = stub.talkOneAnswerMore(request)
          for response in responses:
              logger.info(response)
      
      def talkOneAnswerMore(self, request, context):
          datas = request.data.split(",")
          for data in datas:
              yield response
    • Implement the Client streaming RPC model.
      public void talkMoreAnswerOne(List<TalkRequest> requests) throws InterruptedException {
          final CountDownLatch finishLatch = new CountDownLatch(1);
          StreamObserver<TalkResponse> responseObserver = new StreamObserver<TalkResponse> () {
              @Override
              public void onNext(TalkResponse talkResponse) {
                  log.info("Response=\n{}", talkResponse);
              }
              @Override
              public void onCompleted() {
                  finishLatch.countDown();
              }
          };
          final StreamObserver<TalkRequest> requestObserver = asyncStub.talkMoreAnswerOne(responseObserver);
          try {
              requests.forEach(request -> {
                  if (finishLatch.getCount() > 0) {
                      requestObserver.onNext(request);
                  }
              });
              requestObserver.onCompleted();
          } catch (RuntimeException e) {
              requestObserver.onError(e);
          }
          finishLatch.await();
      }
                               
      public StreamObserver<TalkRequest> talkMoreAnswerOne(StreamObserver<TalkResponse> responseObserver) {
          return new StreamObserver<TalkRequest>() {
              @Override
              public void onNext(TalkRequest request) {
                  talkRequests.add(request);
              }
              @Override
              public void onCompleted() {
                  responseObserver.onNext(buildResponse(talkRequests));
                  responseObserver.onCompleted();
              }
          };
      }
      func talkMoreAnswerOne(client pb.LandingServiceClient, requests []*pb.TalkRequest) {
          stream, err := client.TalkMoreAnswerOne(context.Background())
          for _, request := range requests {
              stream.Send(request)
          }
          r, err := stream.CloseAndRecv()
      }
      
      func (s *ProtoServer) TalkMoreAnswerOne(stream pb.LandingService_TalkMoreAnswerOneServer) error {
          for {
              in, err := stream.Recv()
              if err == io.EOF {
                  talkResponse := &pb.TalkResponse{
                      Status:  200,
                      Results: rs,
                  }
                  stream.SendAndClose(talkResponse)
                  return nil
              }
              rs = append(rs, s.buildResult(in.Data))
          }
      }
      function talkMoreAnswerOne(client, requests) {
          let call = client.talkMoreAnswerOne(function (err, response) {
              ...
          })
          requests.forEach(request => {
              call.write(request)
          })
          call.end()
      }
      
      function talkMoreAnswerOne(call, callback) {
          let talkResults = []
          call.on('data', function (request) {
              talkResults.push(buildResult(request.getData()))
          })
          call.on('end', function () {
              let response = new messages.TalkResponse()
              response.setStatus(200)
              response.setResultsList(talkResults)
              callback(null, response)
          })
      }
      def talk_more_answer_one(stub):
          response_summary = stub.talkMoreAnswerOne(request_iterator)
      
      def generate_request():
          for _ in range(0, 3):
              yield request
              
      def talkMoreAnswerOne(self, request_iterator, context):
          for request in request_iterator:
              response.results.append(build_result(request.data))
          return response  
    • Implement the Bidirectional streaming RPC model.
      public void talkBidirectional(List<TalkRequest> requests) throws InterruptedException {
          final CountDownLatch finishLatch = new CountDownLatch(1);
          StreamObserver<TalkResponse> responseObserver = new StreamObserver<TalkResponse>() {
              @Override
              public void onNext(TalkResponse talkResponse) {
                  log.info("Response=\n{}", talkResponse);
              }
              @Override
              public void onCompleted() {
                  finishLatch.countDown();
              }
          };
          final StreamObserver<TalkRequest> requestObserver = asyncStub.talkBidirectional(responseObserver);
          try {
              requests.forEach(request -> {
                  if (finishLatch.getCount() > 0) {
                      requestObserver.onNext(request);
                  }
                  ...
              });
              requestObserver.onCompleted();
          } catch (RuntimeException e) {
              requestObserver.onError(e);
          }
          finishLatch.await();
      }
      
      public StreamObserver<TalkRequest> talkBidirectional(StreamObserver<TalkResponse> responseObserver) {
          return new StreamObserver<TalkRequest>() {
              @Override
              public void onNext(TalkRequest request) {
                  responseObserver.onNext(TalkResponse.newBuilder()
                          .setStatus(200)
                          .addResults(buildResult(request.getData())).build());
              }
              @Override
              public void onCompleted() {
                  responseObserver.onCompleted();
              }
          };
      }
      func talkBidirectional(client pb.LandingServiceClient, requests []*pb.TalkRequest) {
          stream, err := client.TalkBidirectional(context.Background())
          waitc := make(chan struct{})
          go func() {
              for {
                  r, err := stream.Recv()
                  if err == io.EOF {
                      // read done.
                      close(waitc)
                      return
                  }
              }
          }()
          for _, request := range requests {
              stream.Send(request)
          }
          stream.CloseSend()
          <-waitc
      }
      
      func (s *ProtoServer) TalkBidirectional(stream pb.LandingService_TalkBidirectionalServer) error {
          for {
              in, err := stream.Recv()
              if err == io.EOF {
                  return nil
              }
              stream.Send(talkResponse)
          }
      }
      function talkBidirectional(client, requests) {
          let call = client.talkBidirectional()
          call.on('data', function (response) {
              ...
          })
          requests.forEach(request => {
              call.write(request)
          })
          call.end()
      }
      
      function talkBidirectional(call) {
          call.on('data', function (request) {
              call.write(response)
          })
          call.on('end', function () {
              call.end()
          })
      }
      def talk_bidirectional(stub):
          responses = stub.talkBidirectional(request_iterator)
          for response in responses:
              logger.info(response)
      
      def talkBidirectional(self, request_iterator, context):
          for request in request_iterator:
              yield response
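In the Python snippets above, the request_iterator handed to the stub (and received by the server) is an ordinary Python iterator: generate_request() is a generator that the gRPC runtime drains lazily, sending one message per yielded item. Setting the gRPC machinery aside, that contract can be sketched with stdlib-only code. The build_request helper and the dict-based messages are hypothetical stand-ins for the generated classes.

```python
def build_request(data):
    # Hypothetical stand-in for constructing a generated TalkRequest message.
    return {"data": data}

def generate_request():
    # A generator, like generate_request() in the snippet above: requests
    # are produced lazily, one per message sent on the stream.
    for data in ["0", "1", "2"]:
        yield build_request(data)

def talk_more_answer_one(request_iterator):
    # Stand-in for the client-streaming stub/servicer: drain the iterator,
    # then return a single aggregated response.
    results = [r["data"] for r in request_iterator]
    return {"status": 200, "results": results}

response = talk_more_answer_one(generate_request())
```

The same pattern drives the bidirectional case, except that the server yields a response per request instead of aggregating.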

Step 3: Implement the helper functions

  1. Read the server address from an environment variable.
    private static String getGrcServer() {
        String server = System.getenv("GRPC_SERVER");
        if (server == null) {
            return "localhost";
        }
        return server;
    }
    func grpcServer() string {
        server := os.Getenv("GRPC_SERVER")
        if len(server) == 0 {
            return "localhost"
        } else {
            return server
        }
    }
    function grpcServer() {
        let server = process.env.GRPC_SERVER;
        if (typeof server !== 'undefined' && server !== null) {
            return server
        } else {
            return "localhost"
        }
    }
    def grpc_server():
        server = os.getenv("GRPC_SERVER")
        if server:
            return server
        else:
            return "localhost"
  2. Generate a random ID.
    public static String getRandomId() {
        return String.valueOf(random.nextInt(5));
    }
    func randomId(max int) string {
        return strconv.Itoa(rand.Intn(max))
    }
    function randomId(max) {
        return Math.floor(Math.random() * Math.floor(max)).toString()
    }
    def random_id(end):
        return str(random.randint(0, end))
  3. Set the timestamp.
    TalkResult.newBuilder().setId(System.nanoTime())
    result.Id = time.Now().UnixNano()
    result.setId(Math.round(Date.now() / 1000))
    result.id = int((time.time()))
  4. Set the UUID.
    kv.put("id", UUID.randomUUID().toString());
    import (
        "github.com/google/uuid"
    )
    kv["id"] = uuid.New().String()
    kv.set("id", uuid.v1())
    result.kv["id"] = str(uuid.uuid1())
  5. Set a sleep interval.
    TimeUnit.SECONDS.sleep(1);
    time.Sleep(2 * time.Millisecond)
    let sleep = require('sleep')
    sleep.msleep(2)
    time.sleep(random.uniform(0.5, 1.5))
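Taken together, the helpers in this step populate each TalkResult: a timestamp id, a UUID under kv["id"], and a hello string under kv["data"]. A minimal Python sketch of how the pieces fit, where the returned dict is a stand-in for the generated TalkResult message and build_result mirrors the server-side helper of the same name:

```python
import random
import time
import uuid

hellos = ["Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"]

def build_result(data=None):
    # data, when present, is the index string carried in the request;
    # otherwise pick a random index as random_id() does above.
    index = int(data) if data is not None else random.randint(0, 5)
    return {
        "id": time.time_ns(),  # nanoseconds, like System.nanoTime() / UnixNano()
        "kv": {
            "id": str(uuid.uuid1()),
            "data": hellos[index],
        },
    }

result = build_result("3")
```

Note that the snippets above are not all in the same unit: Java and Go record nanoseconds, while the NodeJS and Python snippets record seconds.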

Verify the results

Functional verification

Start the gRPC server in one terminal and the gRPC client in another. Once both are running, the client sends requests to each of the four communication interfaces in turn.
  • Start the gRPC server and client in Java.
    mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.server.ProtoServer"
    mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.client.ProtoClient"
  • Start the gRPC server and client in Go.
    go run server.go
    go run client/proto_client.go
  • Start the gRPC server and client in NodeJS.
    node proto_server.js
    node proto_client.js
  • Start the gRPC server and client in Python.
    python server/protoServer.py
    python client/protoClient.py

If no communication errors occur, the gRPC server and client have started successfully.

Cross-language communication

Cross-language communication verifies that the gRPC implementations in the different languages behave identically, so that requests routed to any language's version produce the same result.

  1. Start the gRPC server in any one of the languages. The following uses Java as an example.
    mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.server.ProtoServer"
  2. Run the clients of all four languages against it.
    mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.client.ProtoClient"
    go run client/proto_client.go
    node proto_client.js
    python client/protoClient.py
    If no communication errors occur, cross-language communication works.

Build the projects and images

After verifying the gRPC server and client, you can build Docker images for both.

Build the projects

Build the server and client projects for all four languages.
  • Java
    Build the server and client JARs and copy them to the docker directory for later use.
    mvn clean install -DskipTests -f server_pom
    cp target/hello-grpc-java.jar ../docker/
    
    mvn clean install -DskipTests -f client_pom
    cp target/hello-grpc-java.jar ../docker/
  • Go
    Go binaries are platform-specific, and the targets will be deployed on Linux, so build with the commands below, then copy the binaries to the docker directory.
    env GOOS=linux GOARCH=amd64 go build -o proto_server server.go
    mv proto_server ../docker/
    
    env GOOS=linux GOARCH=amd64 go build -o proto_client client/proto_client.go
    mv proto_client ../docker/
  • NodeJS
    NodeJS must be built inside the Docker image so that its native C++ dependencies are compiled for the runtime environment; this step therefore only copies the files for later use.
    cp ../hello-grpc-nodejs/proto_server.js node
    cp ../hello-grpc-nodejs/package.json node
    cp -R ../hello-grpc-nodejs/common node
    cp -R ../proto node
    cp ../hello-grpc-nodejs/*_client.js node
  • Python
    Python needs no compilation; just copy the files for later use.
    cp -R ../hello-grpc-python/server py
    cp ../hello-grpc-python/start_server.sh py
    cp -R ../proto py
    cp ../hello-grpc-python/proto2py.sh py
    cp -R ../hello-grpc-python/client py
    cp ../hello-grpc-python/start_client.sh py

Build the gRPC server and client images

After the build, the docker directory contains every file the Dockerfiles need. Key points of the Dockerfiles:
  • Prefer alpine base images for their minimal size. In this example, the Python base image is python:2 (version 2.7); adjust the Python base image version as needed.
  • The NodeJS image must install C++, the Make build tool, and the grpc-tools npm package.

The following uses the NodeJS server image as an example to walk through the image build.

  1. Create the grpc-server-node.dockerfile file.
    FROM node:14.11-alpine
    RUN apk add --update \
          python \
          make \
          g++ \
      && rm -rf /var/cache/apk/*
    RUN npm config set registry http://registry.npmmirror.com && npm install -g node-pre-gyp grpc-tools --unsafe-perm
    COPY node/package.json .
    RUN npm install --unsafe-perm
    COPY node .
    ENTRYPOINT ["node","proto_server.js"]
  2. Build the image.
    docker build -f grpc-server-node.dockerfile -t registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_node:1.0.0 .
    Eight images are built in total: a server and a client image for each of the four languages.
  3. Run docker push to distribute the images to Container Registry (ACR).
    docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_java:1.0.0
    docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_java:1.0.0
    docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_go:1.0.0
    docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_go:1.0.0
    docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_node:1.0.0
    docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_node:1.0.0
    docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_python:1.0.0
    docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_python:1.0.0