- 取得連結
- X
- 以電子郵件傳送
- 其他應用程式
程式語言:Go & Python
GitHub
功能:gRPC 是一種 RPC(遠端程序呼叫)框架,允許一台電腦呼叫另一台電腦上的子程式,且雙方不需使用相同的程式語言
Client
Client
Client
Client
Golang 如何使用 grpc
- Go Package:
- google.golang.org/grpc
- github.com/golang/protobuf/protoc-gen-go
- Python Module:
- grpcio
- grpcio-tools
GitHub
功能:gRPC 是一種 RPC(遠端程序呼叫)框架,允許一台電腦呼叫另一台電腦上的子程式,且雙方不需使用相同的程式語言
Go 基本步驟
- 官方教學
- 簡言之
- 下載 protoc.exe,並將路徑放進 PATH
go get -u google.golang.org/grpc
go get -u github.com/golang/protobuf/protoc-gen-go
protoc -I <所在資料夾> <proto檔案> --go_out=plugins=grpc:<所在資料夾>
- 產生一個檔案
- <檔名>.pb.go
Python 基本步驟
- 官方教學
- 簡言之
python -m pip install grpcio
python -m pip install grpcio-tools
python -m grpc_tools.protoc -I <所在資料夾> --python_out=<輸出位置> --grpc_python_out=<輸出位置> <proto檔案>
- 產生兩個檔案
- <檔名>_pb2.py
- <檔名>_pb2_grpc.py
Message 定義
// Int wraps a single 32-bit signed integer; it is the only message type
// used by the RPCs shown in this article.
message Int {
  int32 value = 1;
}
基本用法
// A simple RPC.
rpc Double(Int) returns (Int) {}
Server
- Go
func (s *helloWorldServer) Double(ctx context.Context, Int *pb.Int) (*pb.Int, error) { rsp := &pb.Int{Value: Int.Value * 2} return rsp, nil }
- Python
def Double(self, Int, context):
    """Unary RPC: return a message holding twice the request's value."""
    return helloWorld_pb2.Int(value=Int.value * 2)
Client
- Go
func getDouble(client pb.HelloWorldClient, value int32) { i := &pb.Int{Value: value} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() rsp, err := client.Double(ctx, i) if err != nil { log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err) } fmt.Printf("double %d => %d\n", i.Value, rsp.Value) }
- Python
def getDoubleSync(stub, value):
    """Call the unary Double RPC and print the value it returns."""
    reply = stub.Double(helloWorld_pb2.Int(value=value))
    print(f"double {value} => {reply.value}")
def getDoubleAsync(stub, value):
    """Call the unary Double RPC through its future API and print the result.

    The call is started with ``stub.Double.future`` and the reply is obtained
    by waiting on the returned future's ``result()``.
    """
    pending = stub.Double.future(helloWorld_pb2.Int(value=value))
    reply = pending.result()
    print("Async")
    print(f"double {value} => {reply.value}")
Server-to-Client Streaming
// A server-to-client streaming RPC.
rpc Range(Int) returns (stream Int) {}
Server
- Go
func (s *helloWorldServer) Range(Int *pb.Int, stream pb.HelloWorld_RangeServer) error { var i int32 = 0 for ; i < Int.Value; i++ { rsp := &pb.Int{Value: i} if err := stream.Send(rsp); err != nil { return err } } return nil }
- Python
def Range(self, Int, context):
    """Server-streaming RPC: yield one message per integer in range(Int.value)."""
    for n in range(Int.value):
        yield helloWorld_pb2.Int(value=n)
Client
- Go
func getRange(client pb.HelloWorldClient, value int32) { _i := &pb.Int{Value: value} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.Range(ctx, _i) if err != nil { log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err) } for { i, err := stream.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("%v.Range(_) = _, %v", client, err) } fmt.Printf("range %d => %d\n", _i.Value, i.Value) } }
- Python
def getRange(stub, value):
    """Call the server-streaming Range RPC and print each streamed value."""
    request = helloWorld_pb2.Int(value=value)
    for item in stub.Range(request):
        print(f"range {value} => {item.value}")
Client-to-Server Streaming
// A client-to-server streaming RPC.
rpc Sum(stream Int) returns (Int) {}
Server
- Go
func (s *helloWorldServer) Sum(stream pb.HelloWorld_SumServer) error { var result int32 = 0 for { in, err := stream.Recv() if err == io.EOF { rsp := &pb.Int{Value: result} return stream.SendAndClose(rsp) } if err != nil { return err } result += in.Value } }
- Python
def Sum(self, Int_iterator, context):
    """Client-streaming RPC: return a message holding the sum of all received values."""
    total = sum(msg.value for msg in Int_iterator)
    return helloWorld_pb2.Int(value=total)
Client
- Go
func getSum(client pb.HelloWorldClient, value int) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.Sum(ctx) if err != nil { log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err) } for i := 0; i < value; i++ { _i := &pb.Int{Value: int32(i)} if err := stream.Send(_i); err != nil { log.Fatalf("%v.Send(%v) = %v", stream, _i, err) } } reply, err := stream.CloseAndRecv() if err != nil { log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) } fmt.Printf("Sum range(%d) => %d\n", value, reply.Value) }
- Python
def getSumSync(stub, value):
    """Stream the integers in range(value) to the Sum RPC and print the reply."""
    requests = (helloWorld_pb2.Int(value=n) for n in range(value))
    reply = stub.Sum(requests)
    print(f"Sum range({value}) => {reply.value}")
def getSumAsync(stub, value):
    """Stream range(value) to the Sum RPC through its future API and print the reply.

    The call is started with ``stub.Sum.future`` and the reply is obtained by
    waiting on the returned future's ``result()``.
    """
    requests = (helloWorld_pb2.Int(value=n) for n in range(value))
    pending = stub.Sum.future(requests)
    reply = pending.result()
    print("Async")
    print(f"Sum range({value}) => {reply.value}")
Bidirectional Streaming
// A Bidirectional streaming RPC.
rpc DoubleIter(stream Int) returns (stream Int) {}
Server
- Go
func (s *helloWorldServer) DoubleIter(stream pb.HelloWorld_DoubleIterServer) error { for { in, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } rsp := &pb.Int{Value: in.Value * 2} if err := stream.Send(rsp); err != nil { return err } } }
- Python
def DoubleIter(self, Int_iterator, context):
    """Bidirectional-streaming RPC: yield a doubled message for each one received."""
    for msg in Int_iterator:
        yield helloWorld_pb2.Int(value=msg.value * 2)
Client
- Go
func getDoubleIter(client pb.HelloWorldClient, value int) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.DoubleIter(ctx) if err != nil { log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err) } waitc := make(chan struct{}) go func() { for { in, err := stream.Recv() if err == io.EOF { // read done. close(waitc) return } if err != nil { log.Fatalf("Failed to receive: %v", err) } fmt.Printf("double range(%d) => %d\n", value, in.Value*2) } }() for i := 0; i < value; i++ { _i := &pb.Int{Value: int32(i)} if err := stream.Send(_i); err != nil { log.Fatalf("Failed to send: %v", err) } } stream.CloseSend() <-waitc }
- Python
def getDoubleIter(stub, value):
    """Stream range(value) to the DoubleIter RPC and print each streamed reply.

    The server's DoubleIter handler already doubles every value, so the reply
    is printed as received; multiplying it by 2 again would show 4x the input.
    """
    requests = (helloWorld_pb2.Int(value=n) for n in range(value))
    for reply in stub.DoubleIter(requests):
        print(f"double range({value}) => {reply.value}")
Proto 程式碼
helloWorld.proto
syntax = "proto3";

package helloworld;

// Interface exported by the server. It declares one RPC for each of the four
// call shapes: unary, server streaming, client streaming and bidirectional
// streaming.
service HelloWorld {
  // A simple (unary) RPC: one request message, one response message.
  rpc Double(Int) returns (Int) {}

  // A server-to-client streaming RPC: one request, a stream of responses.
  rpc Range(Int) returns (stream Int) {}

  // A client-to-server streaming RPC: a stream of requests, one response.
  rpc Sum(stream Int) returns (Int) {}

  // A bidirectional streaming RPC: a stream of requests and a stream of
  // responses.
  rpc DoubleIter(stream Int) returns (stream Int) {}
}

// Int wraps a single 32-bit signed integer; it is the request and response
// type of every RPC above.
message Int {
  int32 value = 1;
}
Go 程式碼
架構
helloWorld_server.go
helloWorld_client.go
go
├─client
│      helloWorld_client.go
│
├─protos
│      helloWorld.pb.go
│      helloWorld.proto
│
└─server
        helloWorld_server.go
helloWorld_server.go
//The Go implementation of the gRPC server. package main import ( "context" "fmt" "io" "log" "net" "google.golang.org/grpc" pb "../protos" ) type helloWorldServer struct { } func (s *helloWorldServer) Double(ctx context.Context, Int *pb.Int) (*pb.Int, error) { rsp := &pb.Int{Value: Int.Value * 2} return rsp, nil } func (s *helloWorldServer) Range(Int *pb.Int, stream pb.HelloWorld_RangeServer) error { var i int32 = 0 for ; i < Int.Value; i++ { rsp := &pb.Int{Value: i} if err := stream.Send(rsp); err != nil { return err } } return nil } func (s *helloWorldServer) Sum(stream pb.HelloWorld_SumServer) error { var result int32 = 0 for { in, err := stream.Recv() if err == io.EOF { rsp := &pb.Int{Value: result} return stream.SendAndClose(rsp) } if err != nil { return err } result += in.Value } } func (s *helloWorldServer) DoubleIter(stream pb.HelloWorld_DoubleIterServer) error { for { in, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } rsp := &pb.Int{Value: in.Value * 2} if err := stream.Send(rsp); err != nil { return err } } } func newServer() *helloWorldServer { s := &helloWorldServer{} return s } func main() { port := 50051 addr := fmt.Sprintf("localhost:%d", port) lis, err := net.Listen("tcp", addr) if err != nil { log.Fatalf("failed to listen: %v", err) } fmt.Printf("Listen to %s\n", addr) var opts []grpc.ServerOption grpcServer := grpc.NewServer(opts...) pb.RegisterHelloWorldServer(grpcServer, newServer()) grpcServer.Serve(lis) }
helloWorld_client.go
//The Go implementation of the gRPC client. package main import ( "context" "fmt" "io" "log" "time" pb "../protos" "google.golang.org/grpc" ) func getDouble(client pb.HelloWorldClient, value int32) { i := &pb.Int{Value: value} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() rsp, err := client.Double(ctx, i) if err != nil { log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err) } fmt.Printf("double %d => %d\n", i.Value, rsp.Value) } func getRange(client pb.HelloWorldClient, value int32) { _i := &pb.Int{Value: value} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.Range(ctx, _i) if err != nil { log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err) } for { i, err := stream.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("%v.Range(_) = _, %v", client, err) } fmt.Printf("range %d => %d\n", _i.Value, i.Value) } } func getSum(client pb.HelloWorldClient, value int) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.Sum(ctx) if err != nil { log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err) } for i := 0; i < value; i++ { _i := &pb.Int{Value: int32(i)} if err := stream.Send(_i); err != nil { log.Fatalf("%v.Send(%v) = %v", stream, _i, err) } } reply, err := stream.CloseAndRecv() if err != nil { log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) } fmt.Printf("Sum range(%d) => %d\n", value, reply.Value) } func getDoubleIter(client pb.HelloWorldClient, value int) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.DoubleIter(ctx) if err != nil { log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err) } waitc := make(chan struct{}) go func() { for { in, err := stream.Recv() if err == io.EOF { // read done. 
close(waitc) return } if err != nil { log.Fatalf("Failed to receive: %v", err) } fmt.Printf("double range(%d) => %d\n", value, in.Value*2) } }() for i := 0; i < value; i++ { _i := &pb.Int{Value: int32(i)} if err := stream.Send(_i); err != nil { log.Fatalf("Failed to send: %v", err) } } stream.CloseSend() <-waitc } func main() { var opts []grpc.DialOption opts = append(opts, grpc.WithInsecure()) serverAddr := "localhost:50051" conn, err := grpc.Dial(serverAddr, opts...) if err != nil { log.Fatalf("fail to dial: %v", err) } defer conn.Close() client := pb.NewHelloWorldClient(conn) fmt.Println("-------------- Double-------------------") getDouble(client, 5) fmt.Println("-------------- Range -------------------") getRange(client, 5) fmt.Println("-------------- Sum----------------------") getSum(client, 5) fmt.Println("-------------- DoubleIter --------------") getDoubleIter(client, 5) } // Output // -------------- Double------------------- // double 5 => 10 // -------------- Range ------------------- // range 5 => 0 // range 5 => 1 // range 5 => 2 // range 5 => 3 // range 5 => 4 // -------------- Sum---------------------- // Sum range(5) => 10 // -------------- DoubleIter -------------- // double range(5) => 0 // double range(5) => 4 // double range(5) => 8 // double range(5) => 12 // double range(5) => 16
Python 程式碼
架構
helloWorld_server.py
helloWorld_client.py
python
│  helloWorld_client.py
│  helloWorld_pb2.py
│  helloWorld_pb2_grpc.py
│  helloWorld_server.py
│
└─protos
        helloWorld.proto
helloWorld_server.py
"""The Python implementation of the gRPC server."""

from concurrent import futures
import logging
import time

import grpc

import helloWorld_pb2
import helloWorld_pb2_grpc

_ONE_DAY_IN_SECONDS = 60 * 60 * 24


class HelloWorldServicer(helloWorld_pb2_grpc.HelloWorldServicer):
    """Provides methods that implement functionality of hello world server."""

    def __init__(self):
        pass

    def Double(self, Int, context):
        """Unary RPC: return a message holding twice the request's value."""
        return helloWorld_pb2.Int(value=Int.value * 2)

    def Range(self, Int, context):
        """Server-streaming RPC: yield one message per integer in range(Int.value)."""
        for n in range(Int.value):
            yield helloWorld_pb2.Int(value=n)

    def Sum(self, Int_iterator, context):
        """Client-streaming RPC: return a message holding the sum of all received values."""
        total = sum(msg.value for msg in Int_iterator)
        return helloWorld_pb2.Int(value=total)

    def DoubleIter(self, Int_iterator, context):
        """Bidirectional-streaming RPC: yield a doubled message for each one received."""
        for msg in Int_iterator:
            yield helloWorld_pb2.Int(value=msg.value * 2)


def serve():
    """Start the server on port 50051 and keep it running until Ctrl-C."""
    pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(pool)
    helloWorld_pb2_grpc.add_HelloWorldServicer_to_server(
        HelloWorldServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Listen to", "[::]:50051")
    # Keep the main thread alive by sleeping in a loop; stop the server on
    # KeyboardInterrupt.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    logging.basicConfig()
    serve()
helloWorld_client.py
"""The Python implementation of the gRPC client."""

import logging

import grpc

import helloWorld_pb2
import helloWorld_pb2_grpc


def getDoubleSync(stub, value):
    """Call the unary Double RPC and print the value it returns."""
    req = helloWorld_pb2.Int(value=value)
    rsp = stub.Double(req)
    print(f"double {value} => {rsp.value}")


def getDoubleAsync(stub, value):
    """Call the unary Double RPC through its future API and print the result."""
    req = helloWorld_pb2.Int(value=value)
    rspFuture = stub.Double.future(req)
    rsp = rspFuture.result()
    print("Async")
    print(f"double {value} => {rsp.value}")


def getRange(stub, value):
    """Call the server-streaming Range RPC and print each streamed value."""
    req = helloWorld_pb2.Int(value=value)
    for item in stub.Range(req):
        print(f"range {value} => {item.value}")


def getSumSync(stub, value):
    """Stream the integers in range(value) to the Sum RPC and print the reply."""
    req = (helloWorld_pb2.Int(value=i) for i in range(value))
    rsp = stub.Sum(req)
    print(f"Sum range({value}) => {rsp.value}")


def getSumAsync(stub, value):
    """Stream range(value) to the Sum RPC through its future API and print the reply."""
    req = (helloWorld_pb2.Int(value=i) for i in range(value))
    rspFuture = stub.Sum.future(req)
    rsp = rspFuture.result()
    print("Async")
    print(f"Sum range({value}) => {rsp.value}")


def getDoubleIter(stub, value):
    """Stream range(value) to the DoubleIter RPC and print each streamed reply.

    The reply is printed as received; multiplying it by 2 here, as the
    original did, would double a value the DoubleIter RPC is meant to have
    doubled already.
    """
    req = (helloWorld_pb2.Int(value=i) for i in range(value))
    for reply in stub.DoubleIter(req):
        print(f"double range({value}) => {reply.value}")


def run():
    """Open an insecure channel to localhost:50051 and call every RPC once with 5."""
    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = helloWorld_pb2_grpc.HelloWorldStub(channel)
        print("-------------- Double Sync--------------")
        getDoubleSync(stub, 5)
        print("-------------- Double Async-------------")
        getDoubleAsync(stub, 5)
        print("-------------- Range -------------------")
        getRange(stub, 5)
        print("-------------- Sum Sync-----------------")
        getSumSync(stub, 5)
        print("-------------- Sum Async----------------")
        getSumAsync(stub, 5)
        print("-------------- DoubleIter --------------")
        getDoubleIter(stub, 5)


if __name__ == "__main__":
    logging.basicConfig()
    run()

# Output
# -------------- Double Sync--------------
# double 5 => 10
# -------------- Double Async-------------
# Async
# double 5 => 10
# -------------- Range -------------------
# range 5 => 0
# range 5 => 1
# range 5 => 2
# range 5 => 3
# range 5 => 4
# -------------- Sum Sync-----------------
# Sum range(5) => 10
# -------------- Sum Async----------------
# Async
# Sum range(5) => 10
# -------------- DoubleIter --------------
# double range(5) => 0
# double range(5) => 2
# double range(5) => 4
# double range(5) => 6
# double range(5) => 8
參考
- gRPC Tutorial for Python
- Golang 如何使用 grpc
留言
張貼留言