程式語言:Go & Python
GitHub
功能:即 RPC,允許電腦呼叫另一台電腦的子程式,且無需是同一個程式語言
Client
Client
Client
Client
Golang 如何使用 grpc
- Go Package:
- google.golang.org/grpc
- github.com/golang/protobuf/protoc-gen-go
- Python Module:
- grpcio
- grpcio-tools
GitHub
功能:即 RPC,允許電腦呼叫另一台電腦的子程式,且無需是同一個程式語言
Go 基本步驟
- 官方教學
- 簡言之
- 下載 protoc.exe,並將路徑放進 PATH
go get -u google.golang.org/grpc
go get -u github.com/golang/protobuf/protoc-gen-go
protoc -I <所在資料夾> <proto檔案> --go_out=plugins=grpc:<所在資料夾>
- 產生一個檔案
- <檔名>.pb.go
Python 基本步驟
- 官方教學
- 簡言之
python -m pip install grpcio
python -m pip install grpcio-tools
python -m grpc_tools.protoc -I <所在資料夾> --python_out=<輸出位置> --grpc_python_out=<輸出位置> <proto檔案>
- 產生兩個檔案
- <檔名>_pb2.py
- <檔名>_pb2_grpc.py
Message 定義
message Int { int32 value = 1; }
基本用法
// A simple RPC. rpc Double(Int) returns (Int) {}Server
- Go
- func (s *helloWorldServer) Double(ctx context.Context, Int *pb.Int) (*pb.Int, error) {
- rsp := &pb.Int{Value: Int.Value * 2}
- return rsp, nil
- }
- Python
- def Double(self, Int, context):
- rsp = helloWorld_pb2.Int(value=Int.value*2)
- return rsp
Client
- Go
- func getDouble(client pb.HelloWorldClient, value int32) {
- i := &pb.Int{Value: value}
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- rsp, err := client.Double(ctx, i)
- if err != nil {
- log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
- }
- fmt.Printf("double %d => %d\n", i.Value, rsp.Value)
- }
- Python
- def getDoubleSync(stub, value):
- req = helloWorld_pb2.Int(value=value)
- rsp = stub.Double(req)
- print(f"double {value} => {rsp.value}")
- def getDoubleAsync(stub, value):
- req = helloWorld_pb2.Int(value=value)
- rspFuture = stub.Double.future(req)
- rsp = rspFuture.result()
- print("Async")
- print(f"double {value} => {rsp.value}")
Server-to-Client Streaming
// A server-to-client streaming RPC. rpc Range(Int) returns (stream Int) {}Server
- Go
- func (s *helloWorldServer) Range(Int *pb.Int, stream pb.HelloWorld_RangeServer) error {
- var i int32 = 0
- for ; i < Int.Value; i++ {
- rsp := &pb.Int{Value: i}
- if err := stream.Send(rsp); err != nil {
- return err
- }
- }
- return nil
- }
- Python
- def Range(self, Int, context):
- for i in range(Int.value):
- rsp = helloWorld_pb2.Int(value=i)
- yield rsp
Client
- Go
- func getRange(client pb.HelloWorldClient, value int32) {
- _i := &pb.Int{Value: value}
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- stream, err := client.Range(ctx, _i)
- if err != nil {
- log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
- }
- for {
- i, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- log.Fatalf("%v.Range(_) = _, %v", client, err)
- }
- fmt.Printf("range %d => %d\n", _i.Value, i.Value)
- }
- }
- Python
- def getRange(stub, value):
- req = helloWorld_pb2.Int(value=value)
- rsp = stub.Range(req)
- for i in rsp:
- print(f"range {value} => {i.value}")
Client-to-Server Streaming
// A client-to-server streaming RPC. rpc Sum(stream Int) returns (Int) {}Server
- Go
- func (s *helloWorldServer) Sum(stream pb.HelloWorld_SumServer) error {
- var result int32 = 0
- for {
- in, err := stream.Recv()
- if err == io.EOF {
- rsp := &pb.Int{Value: result}
- return stream.SendAndClose(rsp)
- }
- if err != nil {
- return err
- }
- result += in.Value
- }
- }
- Python
- def Sum(self, Int_iterator, context):
- result = 0
- for i in Int_iterator:
- result += i.value
- rsp = helloWorld_pb2.Int(value=result)
- return rsp
Client
- Go
- func getSum(client pb.HelloWorldClient, value int) {
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- stream, err := client.Sum(ctx)
- if err != nil {
- log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
- }
- for i := 0; i < value; i++ {
- _i := &pb.Int{Value: int32(i)}
- if err := stream.Send(_i); err != nil {
- log.Fatalf("%v.Send(%v) = %v", stream, _i, err)
- }
- }
- reply, err := stream.CloseAndRecv()
- if err != nil {
- log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
- }
- fmt.Printf("Sum range(%d) => %d\n", value, reply.Value)
- }
- Python
- def getSumSync(stub, value):
- req = (helloWorld_pb2.Int(value=i) for i in range(value))
- rsp = stub.Sum(req)
- print(f"Sum range({value}) => {rsp.value}")
- def getSumAsync(stub, value):
- req = (helloWorld_pb2.Int(value=i) for i in range(value))
- rspFuture = stub.Sum.future(req)
- rsp = rspFuture.result()
- print("Async")
- print(f"Sum range({value}) => {rsp.value}")
Bidirectional Streaming
// A Bidirectional streaming RPC. rpc DoubleIter(stream Int) returns (stream Int) {}Server
- Go
- func (s *helloWorldServer) DoubleIter(stream pb.HelloWorld_DoubleIterServer) error {
- for {
- in, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
- rsp := &pb.Int{Value: in.Value * 2}
- if err := stream.Send(rsp); err != nil {
- return err
- }
- }
- }
- Python
- def DoubleIter(self, Int_iterator, context):
- for i in Int_iterator:
- rsp = helloWorld_pb2.Int(value=i.value*2)
- yield rsp
Client
- Go
- func getDoubleIter(client pb.HelloWorldClient, value int) {
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- stream, err := client.DoubleIter(ctx)
- if err != nil {
- log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
- }
- waitc := make(chan struct{})
- go func() {
- for {
- in, err := stream.Recv()
- if err == io.EOF {
- // read done.
- close(waitc)
- return
- }
- if err != nil {
- log.Fatalf("Failed to receive: %v", err)
- }
- fmt.Printf("double range(%d) => %d\n", value, in.Value*2)
- }
- }()
- for i := 0; i < value; i++ {
- _i := &pb.Int{Value: int32(i)}
- if err := stream.Send(_i); err != nil {
- log.Fatalf("Failed to send: %v", err)
- }
- }
- stream.CloseSend()
- <-waitc
- }
- Python
- def getDoubleIter(stub, value):
- req = (helloWorld_pb2.Int(value=i) for i in range(value))
- rsp = stub.DoubleIter(req)
- for i in rsp:
- print(f"double range({value}) => {i.value * 2}")
Proto 程式碼
helloWorld.proto
- syntax = "proto3";
- package helloworld;
- // Interface exported by the server.
- service HelloWorld {
- // A simple RPC.
- rpc Double(Int) returns (Int) {}
- // A server-to-client streaming RPC.
- rpc Range(Int) returns (stream Int) {}
- // A client-to-server streaming RPC.
- rpc Sum(stream Int) returns (Int) {}
- // A Bidirectional streaming RPC.
- rpc DoubleIter(stream Int) returns (stream Int) {}
- }
- message Int { int32 value = 1; }
Go 程式碼
架構
helloWorld_server.go
helloWorld_client.go
go ├─client │ helloWorld_client.go │ ├─protos │ helloWorld.pb.go │ helloWorld.proto │ └─server helloWorld_server.go
helloWorld_server.go
- //The Go implementation of the gRPC server.
- package main
- import (
- "context"
- "fmt"
- "io"
- "log"
- "net"
- "google.golang.org/grpc"
- pb "../protos"
- )
- type helloWorldServer struct {
- }
- func (s *helloWorldServer) Double(ctx context.Context, Int *pb.Int) (*pb.Int, error) {
- rsp := &pb.Int{Value: Int.Value * 2}
- return rsp, nil
- }
- func (s *helloWorldServer) Range(Int *pb.Int, stream pb.HelloWorld_RangeServer) error {
- var i int32 = 0
- for ; i < Int.Value; i++ {
- rsp := &pb.Int{Value: i}
- if err := stream.Send(rsp); err != nil {
- return err
- }
- }
- return nil
- }
- func (s *helloWorldServer) Sum(stream pb.HelloWorld_SumServer) error {
- var result int32 = 0
- for {
- in, err := stream.Recv()
- if err == io.EOF {
- rsp := &pb.Int{Value: result}
- return stream.SendAndClose(rsp)
- }
- if err != nil {
- return err
- }
- result += in.Value
- }
- }
- func (s *helloWorldServer) DoubleIter(stream pb.HelloWorld_DoubleIterServer) error {
- for {
- in, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
- rsp := &pb.Int{Value: in.Value * 2}
- if err := stream.Send(rsp); err != nil {
- return err
- }
- }
- }
- func newServer() *helloWorldServer {
- s := &helloWorldServer{}
- return s
- }
- func main() {
- port := 50051
- addr := fmt.Sprintf("localhost:%d", port)
- lis, err := net.Listen("tcp", addr)
- if err != nil {
- log.Fatalf("failed to listen: %v", err)
- }
- fmt.Printf("Listen to %s\n", addr)
- var opts []grpc.ServerOption
- grpcServer := grpc.NewServer(opts...)
- pb.RegisterHelloWorldServer(grpcServer, newServer())
- grpcServer.Serve(lis)
- }
helloWorld_client.go
- //The Go implementation of the gRPC client.
- package main
- import (
- "context"
- "fmt"
- "io"
- "log"
- "time"
- pb "../protos"
- "google.golang.org/grpc"
- )
- func getDouble(client pb.HelloWorldClient, value int32) {
- i := &pb.Int{Value: value}
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- rsp, err := client.Double(ctx, i)
- if err != nil {
- log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
- }
- fmt.Printf("double %d => %d\n", i.Value, rsp.Value)
- }
- func getRange(client pb.HelloWorldClient, value int32) {
- _i := &pb.Int{Value: value}
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- stream, err := client.Range(ctx, _i)
- if err != nil {
- log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
- }
- for {
- i, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- log.Fatalf("%v.Range(_) = _, %v", client, err)
- }
- fmt.Printf("range %d => %d\n", _i.Value, i.Value)
- }
- }
- func getSum(client pb.HelloWorldClient, value int) {
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- stream, err := client.Sum(ctx)
- if err != nil {
- log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
- }
- for i := 0; i < value; i++ {
- _i := &pb.Int{Value: int32(i)}
- if err := stream.Send(_i); err != nil {
- log.Fatalf("%v.Send(%v) = %v", stream, _i, err)
- }
- }
- reply, err := stream.CloseAndRecv()
- if err != nil {
- log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
- }
- fmt.Printf("Sum range(%d) => %d\n", value, reply.Value)
- }
- func getDoubleIter(client pb.HelloWorldClient, value int) {
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- stream, err := client.DoubleIter(ctx)
- if err != nil {
- log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
- }
- waitc := make(chan struct{})
- go func() {
- for {
- in, err := stream.Recv()
- if err == io.EOF {
- // read done.
- close(waitc)
- return
- }
- if err != nil {
- log.Fatalf("Failed to receive: %v", err)
- }
- fmt.Printf("double range(%d) => %d\n", value, in.Value*2)
- }
- }()
- for i := 0; i < value; i++ {
- _i := &pb.Int{Value: int32(i)}
- if err := stream.Send(_i); err != nil {
- log.Fatalf("Failed to send: %v", err)
- }
- }
- stream.CloseSend()
- <-waitc
- }
- func main() {
- var opts []grpc.DialOption
- opts = append(opts, grpc.WithInsecure())
- serverAddr := "localhost:50051"
- conn, err := grpc.Dial(serverAddr, opts...)
- if err != nil {
- log.Fatalf("fail to dial: %v", err)
- }
- defer conn.Close()
- client := pb.NewHelloWorldClient(conn)
- fmt.Println("-------------- Double-------------------")
- getDouble(client, 5)
- fmt.Println("-------------- Range -------------------")
- getRange(client, 5)
- fmt.Println("-------------- Sum----------------------")
- getSum(client, 5)
- fmt.Println("-------------- DoubleIter --------------")
- getDoubleIter(client, 5)
- }
- // Output
- // -------------- Double-------------------
- // double 5 => 10
- // -------------- Range -------------------
- // range 5 => 0
- // range 5 => 1
- // range 5 => 2
- // range 5 => 3
- // range 5 => 4
- // -------------- Sum----------------------
- // Sum range(5) => 10
- // -------------- DoubleIter --------------
- // double range(5) => 0
- // double range(5) => 4
- // double range(5) => 8
- // double range(5) => 12
- // double range(5) => 16
Python 程式碼
架構
helloWorld_server.py
helloWorld_client.py
python │ helloWorld_client.py │ helloWorld_pb2.py │ helloWorld_pb2_grpc.py │ helloWorld_server.py │ └─protos helloWorld.proto
helloWorld_server.py
- """The Python implementation of the gRPC server."""
- from concurrent import futures
- import logging
- import time
- import grpc
- import helloWorld_pb2
- import helloWorld_pb2_grpc
- _ONE_DAY_IN_SECONDS = 60 * 60 * 24
- class HelloWorldServicer(helloWorld_pb2_grpc.HelloWorldServicer):
- """Provides methods that implement functionality of hello world server."""
- def __init__(self):
- pass
- def Double(self, Int, context):
- rsp = helloWorld_pb2.Int(value=Int.value*2)
- return rsp
- def Range(self, Int, context):
- for i in range(Int.value):
- rsp = helloWorld_pb2.Int(value=i)
- yield rsp
- def Sum(self, Int_iterator, context):
- result = 0
- for i in Int_iterator:
- result += i.value
- rsp = helloWorld_pb2.Int(value=result)
- return rsp
- def DoubleIter(self, Int_iterator, context):
- for i in Int_iterator:
- rsp = helloWorld_pb2.Int(value=i.value*2)
- yield rsp
- def serve():
- server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
- helloWorld_pb2_grpc.add_HelloWorldServicer_to_server(
- HelloWorldServicer(), server)
- server.add_insecure_port('[::]:50051')
- server.start()
- print("Listen to", "[::]:50051")
- try:
- while True:
- time.sleep(_ONE_DAY_IN_SECONDS)
- except KeyboardInterrupt:
- server.stop(0)
- if __name__ == '__main__':
- logging.basicConfig()
- serve()
helloWorld_client.py
- """The Python implementation of the gRPC client."""
- import logging
- import grpc
- import helloWorld_pb2
- import helloWorld_pb2_grpc
- def getDoubleSync(stub, value):
- req = helloWorld_pb2.Int(value=value)
- rsp = stub.Double(req)
- print(f"double {value} => {rsp.value}")
- def getDoubleAsync(stub, value):
- req = helloWorld_pb2.Int(value=value)
- rspFuture = stub.Double.future(req)
- rsp = rspFuture.result()
- print("Async")
- print(f"double {value} => {rsp.value}")
- def getRange(stub, value):
- req = helloWorld_pb2.Int(value=value)
- rsp = stub.Range(req)
- for i in rsp:
- print(f"range {value} => {i.value}")
- def getSumSync(stub, value):
- req = (helloWorld_pb2.Int(value=i) for i in range(value))
- rsp = stub.Sum(req)
- print(f"Sum range({value}) => {rsp.value}")
- def getSumAsync(stub, value):
- req = (helloWorld_pb2.Int(value=i) for i in range(value))
- rspFuture = stub.Sum.future(req)
- rsp = rspFuture.result()
- print("Async")
- print(f"Sum range({value}) => {rsp.value}")
- def getDoubleIter(stub, value):
- req = (helloWorld_pb2.Int(value=i) for i in range(value))
- rsp = stub.DoubleIter(req)
- for i in rsp:
- print(f"double range({value}) => {i.value * 2}")
- def run():
- # NOTE(gRPC Python Team): .close() is possible on a channel and should be
- # used in circumstances in which the with statement does not fit the needs
- # of the code.
- with grpc.insecure_channel("localhost:50051") as channel:
- stub = helloWorld_pb2_grpc.HelloWorldStub(channel)
- print("-------------- Double Sync--------------")
- getDoubleSync(stub, 5)
- print("-------------- Double Async-------------")
- getDoubleAsync(stub, 5)
- print("-------------- Range -------------------")
- getRange(stub, 5)
- print("-------------- Sum Sync-----------------")
- getSumSync(stub, 5)
- print("-------------- Sum Async----------------")
- getSumAsync(stub, 5)
- print("-------------- DoubleIter --------------")
- getDoubleIter(stub, 5)
- if __name__ == "__main__":
- logging.basicConfig()
- run()
- # Output
- # -------------- Double Sync--------------
- # double 5 => 10
- # -------------- Double Async-------------
- # Async
- # double 5 => 10
- # -------------- Range -------------------
- # range 5 => 0
- # range 5 => 1
- # range 5 => 2
- # range 5 => 3
- # range 5 => 4
- # -------------- Sum Sync-----------------
- # Sum range(5) => 10
- # -------------- Sum Async----------------
- # Async
- # Sum range(5) => 10
- # -------------- DoubleIter --------------
- # double range(5) => 0
- # double range(5) => 4
- # double range(5) => 8
- # double range(5) => 12
- # double range(5) => 16
參考
gRPC Tutorial for PythonGolang 如何使用 grpc
留言
張貼留言