[Go & Python] gRPC 簡介

程式語言:Go & Python
Go Package:
google.golang.org/grpc
github.com/golang/protobuf/protoc-gen-go
Python Module:
grpcio
grpcio-tools
官網
GitHub

功能:gRPC 是一套 RPC(Remote Procedure Call,遠端程序呼叫)框架,允許程式呼叫另一台電腦上的子程式,且呼叫端與被呼叫端無需使用同一種程式語言

Go 基本步驟

  • 官方教學
  • 簡言之
    1. 下載 protoc.exe(Protocol Buffers 編譯器),並將其所在資料夾加入 PATH 環境變數
    2. go get -u google.golang.org/grpc
    3. go get -u github.com/golang/protobuf/protoc-gen-go
    4. protoc -I <所在資料夾> <proto檔案> --go_out=plugins=grpc:<所在資料夾>
    5. 產生一個檔案
      • <檔名>.pb.go


Python 基本步驟

  • 官方教學
  • 簡言之
    1. python -m pip install grpcio
    2. python -m pip install grpcio-tools
    3. python -m grpc_tools.protoc -I <所在資料夾> --python_out=<輸出位置> --grpc_python_out=<輸出位置> <proto檔案>
    4. 產生兩個檔案
      • <檔名>_pb2.py
      • <檔名>_pb2_grpc.py


Message 定義

message Int { int32 value = 1; }

基本用法

// A simple RPC.
rpc Double(Int) returns (Int) {}
Server
  • Go
    1. func (s *helloWorldServer) Double(ctx context.Context, Int *pb.Int) (*pb.Int, error) {
    2. rsp := &pb.Int{Value: Int.Value * 2}
    3.  
    4. return rsp, nil
    5. }
  • Python
    1. def Double(self, Int, context):
    2. rsp = helloWorld_pb2.Int(value=Int.value*2)
    3.  
    4. return rsp

Client
  • Go
    1. func getDouble(client pb.HelloWorldClient, value int32) {
    2. i := &pb.Int{Value: value}
    3. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    4. defer cancel()
    5.  
    6. rsp, err := client.Double(ctx, i)
    7. if err != nil {
    8. log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
    9. }
    10.  
    11. fmt.Printf("double %d => %d\n", i.Value, rsp.Value)
    12. }
  • Python
    1. def getDoubleSync(stub, value):
    2. req = helloWorld_pb2.Int(value=value)
    3. rsp = stub.Double(req)
    4.  
    5. print(f"double {value} => {rsp.value}")
    1. def getDoubleAsync(stub, value):
    2. req = helloWorld_pb2.Int(value=value)
    3. rspFuture = stub.Double.future(req)
    4.  
    5. rsp = rspFuture.result()
    6. print("Async")
    7.  
    8. print(f"double {value} => {rsp.value}")

Server-to-Client Streaming

// A server-to-client streaming RPC.
rpc Range(Int) returns (stream Int) {}
Server
  • Go
    1. func (s *helloWorldServer) Range(Int *pb.Int, stream pb.HelloWorld_RangeServer) error {
    2. var i int32 = 0
    3. for ; i < Int.Value; i++ {
    4. rsp := &pb.Int{Value: i}
    5. if err := stream.Send(rsp); err != nil {
    6. return err
    7. }
    8. }
    9. return nil
    10. }
  • Python
    1. def Range(self, Int, context):
    2. for i in range(Int.value):
    3. rsp = helloWorld_pb2.Int(value=i)
    4. yield rsp

Client
  • Go
    1. func getRange(client pb.HelloWorldClient, value int32) {
    2. _i := &pb.Int{Value: value}
    3. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    4. defer cancel()
    5.  
    6. stream, err := client.Range(ctx, _i)
    7. if err != nil {
    8. log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
    9. }
    10.  
    11. for {
    12. i, err := stream.Recv()
    13. if err == io.EOF {
    14. break
    15. }
    16. if err != nil {
    17. log.Fatalf("%v.Range(_) = _, %v", client, err)
    18. }
    19. fmt.Printf("range %d => %d\n", _i.Value, i.Value)
    20. }
    21. }
  • Python
    1. def getRange(stub, value):
    2. req = helloWorld_pb2.Int(value=value)
    3. rsp = stub.Range(req)
    4.  
    5. for i in rsp:
    6. print(f"range {value} => {i.value}")

Client-to-Server Streaming

// A client-to-server streaming RPC.
rpc Sum(stream Int) returns (Int) {}
Server
  • Go
    1. func (s *helloWorldServer) Sum(stream pb.HelloWorld_SumServer) error {
    2. var result int32 = 0
    3. for {
    4. in, err := stream.Recv()
    5. if err == io.EOF {
    6. rsp := &pb.Int{Value: result}
    7. return stream.SendAndClose(rsp)
    8. }
    9. if err != nil {
    10. return err
    11. }
    12. result += in.Value
    13. }
    14. }
  • Python
    1. def Sum(self, Int_iterator, context):
    2. result = 0
    3. for i in Int_iterator:
    4. result += i.value
    5.  
    6. rsp = helloWorld_pb2.Int(value=result)
    7. return rsp

Client
  • Go
    1. func getSum(client pb.HelloWorldClient, value int) {
    2. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    3. defer cancel()
    4.  
    5. stream, err := client.Sum(ctx)
    6. if err != nil {
    7. log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
    8. }
    9.  
    10. for i := 0; i < value; i++ {
    11. _i := &pb.Int{Value: int32(i)}
    12. if err := stream.Send(_i); err != nil {
    13. log.Fatalf("%v.Send(%v) = %v", stream, _i, err)
    14. }
    15. }
    16. reply, err := stream.CloseAndRecv()
    17. if err != nil {
    18. log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
    19. }
    20. fmt.Printf("Sum range(%d) => %d\n", value, reply.Value)
    21. }
  • Python
    1. def getSumSync(stub, value):
    2. req = (helloWorld_pb2.Int(value=i) for i in range(value))
    3. rsp = stub.Sum(req)
    4.  
    5. print(f"Sum range({value}) => {rsp.value}")
    1. def getSumAsync(stub, value):
    2. req = (helloWorld_pb2.Int(value=i) for i in range(value))
    3. rspFuture = stub.Sum.future(req)
    4.  
    5. rsp = rspFuture.result()
    6. print("Async")
    7.  
    8. print(f"Sum range({value}) => {rsp.value}")

Bidirectional Streaming

// A Bidirectional streaming RPC.
rpc DoubleIter(stream Int) returns (stream Int) {}
Server
  • Go
    1. func (s *helloWorldServer) DoubleIter(stream pb.HelloWorld_DoubleIterServer) error {
    2. for {
    3. in, err := stream.Recv()
    4. if err == io.EOF {
    5. return nil
    6. }
    7. if err != nil {
    8. return err
    9. }
    10. rsp := &pb.Int{Value: in.Value * 2}
    11. if err := stream.Send(rsp); err != nil {
    12. return err
    13. }
    14. }
    15. }
  • Python
    1. def DoubleIter(self, Int_iterator, context):
    2. for i in Int_iterator:
    3. rsp = helloWorld_pb2.Int(value=i.value*2)
    4. yield rsp

Client
  • Go
    1. func getDoubleIter(client pb.HelloWorldClient, value int) {
    2. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    3. defer cancel()
    4.  
    5. stream, err := client.DoubleIter(ctx)
    6. if err != nil {
    7. log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
    8. }
    9. waitc := make(chan struct{})
    10. go func() {
    11. for {
    12. in, err := stream.Recv()
    13. if err == io.EOF {
    14. // read done.
    15. close(waitc)
    16. return
    17. }
    18. if err != nil {
    19. log.Fatalf("Failed to receive: %v", err)
    20. }
    21. fmt.Printf("double range(%d) => %d\n", value, in.Value*2)
    22. }
    23. }()
    24. for i := 0; i < value; i++ {
    25. _i := &pb.Int{Value: int32(i)}
    26. if err := stream.Send(_i); err != nil {
    27. log.Fatalf("Failed to send: %v", err)
    28. }
    29. }
    30. stream.CloseSend()
    31. <-waitc
    32. }
  • Python
    1. def getDoubleIter(stub, value):
    2. req = (helloWorld_pb2.Int(value=i) for i in range(value))
    3. rsp = stub.DoubleIter(req)
    4. for i in rsp:
    5. print(f"double range({value}) => {i.value * 2}")
    6.  

Proto 程式碼

helloWorld.proto
  1. syntax = "proto3";
  2.  
  3. package helloworld;
  4.  
  5. // Interface exported by the server.
  6. service HelloWorld {
  7. // A simple RPC.
  8. rpc Double(Int) returns (Int) {}
  9.  
  10. // A server-to-client streaming RPC.
  11. rpc Range(Int) returns (stream Int) {}
  12.  
  13. // A client-to-server streaming RPC.
  14. rpc Sum(stream Int) returns (Int) {}
  15.  
  16. // A Bidirectional streaming RPC.
  17. rpc DoubleIter(stream Int) returns (stream Int) {}
  18. }
  19.  
  20. message Int { int32 value = 1; }

Go 程式碼

架構
go
├─client
│      helloWorld_client.go
│
├─protos
│      helloWorld.pb.go
│      helloWorld.proto
│
└─server
        helloWorld_server.go

helloWorld_server.go
  1. //The Go implementation of the gRPC server.
  2. package main
  3.  
  4. import (
  5. "context"
  6. "fmt"
  7. "io"
  8. "log"
  9. "net"
  10.  
  11. "google.golang.org/grpc"
  12.  
  13. pb "../protos"
  14. )
  15.  
  16. type helloWorldServer struct {
  17. }
  18.  
  19. func (s *helloWorldServer) Double(ctx context.Context, Int *pb.Int) (*pb.Int, error) {
  20. rsp := &pb.Int{Value: Int.Value * 2}
  21.  
  22. return rsp, nil
  23. }
  24.  
  25. func (s *helloWorldServer) Range(Int *pb.Int, stream pb.HelloWorld_RangeServer) error {
  26. var i int32 = 0
  27. for ; i < Int.Value; i++ {
  28. rsp := &pb.Int{Value: i}
  29. if err := stream.Send(rsp); err != nil {
  30. return err
  31. }
  32. }
  33. return nil
  34. }
  35.  
  36. func (s *helloWorldServer) Sum(stream pb.HelloWorld_SumServer) error {
  37. var result int32 = 0
  38. for {
  39. in, err := stream.Recv()
  40. if err == io.EOF {
  41. rsp := &pb.Int{Value: result}
  42. return stream.SendAndClose(rsp)
  43. }
  44. if err != nil {
  45. return err
  46. }
  47. result += in.Value
  48. }
  49. }
  50.  
  51. func (s *helloWorldServer) DoubleIter(stream pb.HelloWorld_DoubleIterServer) error {
  52. for {
  53. in, err := stream.Recv()
  54. if err == io.EOF {
  55. return nil
  56. }
  57. if err != nil {
  58. return err
  59. }
  60. rsp := &pb.Int{Value: in.Value * 2}
  61. if err := stream.Send(rsp); err != nil {
  62. return err
  63. }
  64. }
  65. }
  66.  
  67. func newServer() *helloWorldServer {
  68. s := &helloWorldServer{}
  69. return s
  70. }
  71.  
  72. func main() {
  73. port := 50051
  74. addr := fmt.Sprintf("localhost:%d", port)
  75.  
  76. lis, err := net.Listen("tcp", addr)
  77. if err != nil {
  78. log.Fatalf("failed to listen: %v", err)
  79. }
  80. fmt.Printf("Listen to %s\n", addr)
  81.  
  82. var opts []grpc.ServerOption
  83. grpcServer := grpc.NewServer(opts...)
  84. pb.RegisterHelloWorldServer(grpcServer, newServer())
  85. grpcServer.Serve(lis)
  86. }

helloWorld_client.go
  1. //The Go implementation of the gRPC client.
  2. package main
  3.  
  4. import (
  5. "context"
  6. "fmt"
  7. "io"
  8. "log"
  9. "time"
  10.  
  11. pb "../protos"
  12. "google.golang.org/grpc"
  13. )
  14.  
  15. func getDouble(client pb.HelloWorldClient, value int32) {
  16. i := &pb.Int{Value: value}
  17. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  18. defer cancel()
  19.  
  20. rsp, err := client.Double(ctx, i)
  21. if err != nil {
  22. log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
  23. }
  24.  
  25. fmt.Printf("double %d => %d\n", i.Value, rsp.Value)
  26. }
  27.  
  28. func getRange(client pb.HelloWorldClient, value int32) {
  29. _i := &pb.Int{Value: value}
  30. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  31. defer cancel()
  32.  
  33. stream, err := client.Range(ctx, _i)
  34. if err != nil {
  35. log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
  36. }
  37.  
  38. for {
  39. i, err := stream.Recv()
  40. if err == io.EOF {
  41. break
  42. }
  43. if err != nil {
  44. log.Fatalf("%v.Range(_) = _, %v", client, err)
  45. }
  46. fmt.Printf("range %d => %d\n", _i.Value, i.Value)
  47. }
  48. }
  49.  
  50. func getSum(client pb.HelloWorldClient, value int) {
  51. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  52. defer cancel()
  53.  
  54. stream, err := client.Sum(ctx)
  55. if err != nil {
  56. log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
  57. }
  58.  
  59. for i := 0; i < value; i++ {
  60. _i := &pb.Int{Value: int32(i)}
  61. if err := stream.Send(_i); err != nil {
  62. log.Fatalf("%v.Send(%v) = %v", stream, _i, err)
  63. }
  64. }
  65. reply, err := stream.CloseAndRecv()
  66. if err != nil {
  67. log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
  68. }
  69. fmt.Printf("Sum range(%d) => %d\n", value, reply.Value)
  70. }
  71.  
  72. func getDoubleIter(client pb.HelloWorldClient, value int) {
  73. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  74. defer cancel()
  75.  
  76. stream, err := client.DoubleIter(ctx)
  77. if err != nil {
  78. log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
  79. }
  80. waitc := make(chan struct{})
  81. go func() {
  82. for {
  83. in, err := stream.Recv()
  84. if err == io.EOF {
  85. // read done.
  86. close(waitc)
  87. return
  88. }
  89. if err != nil {
  90. log.Fatalf("Failed to receive: %v", err)
  91. }
  92. fmt.Printf("double range(%d) => %d\n", value, in.Value*2)
  93. }
  94. }()
  95. for i := 0; i < value; i++ {
  96. _i := &pb.Int{Value: int32(i)}
  97. if err := stream.Send(_i); err != nil {
  98. log.Fatalf("Failed to send: %v", err)
  99. }
  100. }
  101. stream.CloseSend()
  102. <-waitc
  103. }
  104.  
  105. func main() {
  106. var opts []grpc.DialOption
  107. opts = append(opts, grpc.WithInsecure())
  108.  
  109. serverAddr := "localhost:50051"
  110. conn, err := grpc.Dial(serverAddr, opts...)
  111. if err != nil {
  112. log.Fatalf("fail to dial: %v", err)
  113. }
  114. defer conn.Close()
  115. client := pb.NewHelloWorldClient(conn)
  116.  
  117. fmt.Println("-------------- Double-------------------")
  118. getDouble(client, 5)
  119. fmt.Println("-------------- Range -------------------")
  120. getRange(client, 5)
  121. fmt.Println("-------------- Sum----------------------")
  122. getSum(client, 5)
  123. fmt.Println("-------------- DoubleIter --------------")
  124. getDoubleIter(client, 5)
  125. }
  126.  
  127. // Output
  128. // -------------- Double-------------------
  129. // double 5 => 10
  130. // -------------- Range -------------------
  131. // range 5 => 0
  132. // range 5 => 1
  133. // range 5 => 2
  134. // range 5 => 3
  135. // range 5 => 4
  136. // -------------- Sum----------------------
  137. // Sum range(5) => 10
  138. // -------------- DoubleIter --------------
  139. // double range(5) => 0
  140. // double range(5) => 4
  141. // double range(5) => 8
  142. // double range(5) => 12
  143. // double range(5) => 16

Python 程式碼

架構
python
│  helloWorld_client.py
│  helloWorld_pb2.py
│  helloWorld_pb2_grpc.py
│  helloWorld_server.py
│
└─protos
        helloWorld.proto

helloWorld_server.py
  1. """The Python implementation of the gRPC server."""
  2.  
  3. from concurrent import futures
  4. import logging
  5. import time
  6.  
  7. import grpc
  8.  
  9. import helloWorld_pb2
  10. import helloWorld_pb2_grpc
  11.  
  12. _ONE_DAY_IN_SECONDS = 60 * 60 * 24
  13.  
  14. class HelloWorldServicer(helloWorld_pb2_grpc.HelloWorldServicer):
  15. """Provides methods that implement functionality of hello world server."""
  16.  
  17. def __init__(self):
  18. pass
  19.  
  20. def Double(self, Int, context):
  21. rsp = helloWorld_pb2.Int(value=Int.value*2)
  22.  
  23. return rsp
  24.  
  25.  
  26. def Range(self, Int, context):
  27. for i in range(Int.value):
  28. rsp = helloWorld_pb2.Int(value=i)
  29. yield rsp
  30.  
  31. def Sum(self, Int_iterator, context):
  32. result = 0
  33. for i in Int_iterator:
  34. result += i.value
  35.  
  36. rsp = helloWorld_pb2.Int(value=result)
  37. return rsp
  38.  
  39. def DoubleIter(self, Int_iterator, context):
  40. for i in Int_iterator:
  41. rsp = helloWorld_pb2.Int(value=i.value*2)
  42. yield rsp
  43.  
  44. def serve():
  45. server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  46. helloWorld_pb2_grpc.add_HelloWorldServicer_to_server(
  47. HelloWorldServicer(), server)
  48. server.add_insecure_port('[::]:50051')
  49. server.start()
  50. print("Listen to", "[::]:50051")
  51. try:
  52. while True:
  53. time.sleep(_ONE_DAY_IN_SECONDS)
  54. except KeyboardInterrupt:
  55. server.stop(0)
  56.  
  57.  
  58. if __name__ == '__main__':
  59. logging.basicConfig()
  60. serve()

helloWorld_client.py
  1. """The Python implementation of the gRPC client."""
  2.  
  3. import logging
  4.  
  5. import grpc
  6.  
  7. import helloWorld_pb2
  8. import helloWorld_pb2_grpc
  9.  
  10.  
  11. def getDoubleSync(stub, value):
  12. req = helloWorld_pb2.Int(value=value)
  13. rsp = stub.Double(req)
  14.  
  15. print(f"double {value} => {rsp.value}")
  16.  
  17.  
  18. def getDoubleAsync(stub, value):
  19. req = helloWorld_pb2.Int(value=value)
  20. rspFuture = stub.Double.future(req)
  21.  
  22. rsp = rspFuture.result()
  23. print("Async")
  24.  
  25. print(f"double {value} => {rsp.value}")
  26.  
  27.  
  28. def getRange(stub, value):
  29. req = helloWorld_pb2.Int(value=value)
  30. rsp = stub.Range(req)
  31.  
  32. for i in rsp:
  33. print(f"range {value} => {i.value}")
  34.  
  35.  
  36. def getSumSync(stub, value):
  37. req = (helloWorld_pb2.Int(value=i) for i in range(value))
  38. rsp = stub.Sum(req)
  39.  
  40. print(f"Sum range({value}) => {rsp.value}")
  41.  
  42.  
  43. def getSumAsync(stub, value):
  44. req = (helloWorld_pb2.Int(value=i) for i in range(value))
  45. rspFuture = stub.Sum.future(req)
  46.  
  47. rsp = rspFuture.result()
  48. print("Async")
  49.  
  50. print(f"Sum range({value}) => {rsp.value}")
  51.  
  52.  
  53. def getDoubleIter(stub, value):
  54. req = (helloWorld_pb2.Int(value=i) for i in range(value))
  55. rsp = stub.DoubleIter(req)
  56. for i in rsp:
  57. print(f"double range({value}) => {i.value * 2}")
  58.  
  59.  
  60. def run():
  61. # NOTE(gRPC Python Team): .close() is possible on a channel and should be
  62. # used in circumstances in which the with statement does not fit the needs
  63. # of the code.
  64. with grpc.insecure_channel("localhost:50051") as channel:
  65. stub = helloWorld_pb2_grpc.HelloWorldStub(channel)
  66. print("-------------- Double Sync--------------")
  67. getDoubleSync(stub, 5)
  68. print("-------------- Double Async-------------")
  69. getDoubleAsync(stub, 5)
  70. print("-------------- Range -------------------")
  71. getRange(stub, 5)
  72. print("-------------- Sum Sync-----------------")
  73. getSumSync(stub, 5)
  74. print("-------------- Sum Async----------------")
  75. getSumAsync(stub, 5)
  76. print("-------------- DoubleIter --------------")
  77. getDoubleIter(stub, 5)
  78.  
  79.  
  80. if __name__ == "__main__":
  81. logging.basicConfig()
  82. run()
  83.  
  84. # Output
  85. # -------------- Double Sync--------------
  86. # double 5 => 10
  87. # -------------- Double Async-------------
  88. # Async
  89. # double 5 => 10
  90. # -------------- Range -------------------
  91. # range 5 => 0
  92. # range 5 => 1
  93. # range 5 => 2
  94. # range 5 => 3
  95. # range 5 => 4
  96. # -------------- Sum Sync-----------------
  97. # Sum range(5) => 10
  98. # -------------- Sum Async----------------
  99. # Async
  100. # Sum range(5) => 10
  101. # -------------- DoubleIter --------------
  102. # double range(5) => 0
  103. # double range(5) => 4
  104. # double range(5) => 8
  105. # double range(5) => 12
  106. # double range(5) => 16

參考

gRPC Tutorial for Python
Golang 如何使用 grpc

留言