[Go & Python] gRPC 簡介

程式語言:Go & Python
Go Package:
google.golang.org/grpc
github.com/golang/protobuf/protoc-gen-go
Python Module:
grpcio
grpcio-tools
官網
GitHub

功能:即 RPC,允許電腦呼叫另一台電腦的子程式,且無需是同一個程式語言

Go 基本步驟

  • 官方教學
  • 簡言之
    1. 下載 protoc.exe,並將路徑放進 PATH
    2. go get -u google.golang.org/grpc
    3. go get -u github.com/golang/protobuf/protoc-gen-go
    4. protoc -I <所在資料夾> <proto檔案> --go_out=plugins=grpc:<所在資料夾>
    5. 產生一個檔案
      • <檔名>.pb.go


Python 基本步驟

  • 官方教學
  • 簡言之
    1. python -m pip install grpcio
    2. python -m pip install grpcio-tools
    3. python -m grpc_tools.protoc -I <所在資料夾> --python_out=<輸出位置> --grpc_python_out=<輸出位置> <proto檔案>
    4. 產生兩個檔案
      • <檔名>_pb2.py
      • <檔名>_pb2_grpc.py


Message 定義

message Int { int32 value = 1; }

基本用法

// A simple RPC.
rpc Double(Int) returns (Int) {}
Server
  • Go
    func (s *helloWorldServer) Double(ctx context.Context, Int *pb.Int) (*pb.Int, error) {
        rsp := &pb.Int{Value: Int.Value * 2}
    
        return rsp, nil
    }
    
  • Python
    def Double(self, Int, context):
        rsp = helloWorld_pb2.Int(value=Int.value*2)
    
        return rsp
    

Client
  • Go
    func getDouble(client pb.HelloWorldClient, value int32) {
        i := &pb.Int{Value: value}
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
    
        rsp, err := client.Double(ctx, i)
        if err != nil {
            log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
        }
    
        fmt.Printf("double %d => %d\n", i.Value, rsp.Value)
    }
    
  • Python
    def getDoubleSync(stub, value):
        req = helloWorld_pb2.Int(value=value)
        rsp = stub.Double(req)
    
        print(f"double {value} => {rsp.value}")
    
    def getDoubleAsync(stub, value):
        req = helloWorld_pb2.Int(value=value)
        rspFuture = stub.Double.future(req)
    
        rsp = rspFuture.result()
        print("Async")
    
        print(f"double {value} => {rsp.value}")
    

Server-to-Client Streaming

// A server-to-client streaming RPC.
rpc Range(Int) returns (stream Int) {}
Server
  • Go
    func (s *helloWorldServer) Range(Int *pb.Int, stream pb.HelloWorld_RangeServer) error {
        var i int32 = 0
        for ; i < Int.Value; i++ {
            rsp := &pb.Int{Value: i}
            if err := stream.Send(rsp); err != nil {
                return err
            }
        }
        return nil
    }
    
  • Python
    def Range(self, Int, context):
        for i in range(Int.value):
            rsp = helloWorld_pb2.Int(value=i)
            yield rsp
    

Client
  • Go
    func getRange(client pb.HelloWorldClient, value int32) {
        _i := &pb.Int{Value: value}
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
    
        stream, err := client.Range(ctx, _i)
        if err != nil {
            log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
        }
    
        for {
            i, err := stream.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                log.Fatalf("%v.Range(_) = _, %v", client, err)
            }
            fmt.Printf("range %d => %d\n", _i.Value, i.Value)
        }
    }
    
  • Python
    def getRange(stub, value):
        req = helloWorld_pb2.Int(value=value)
        rsp = stub.Range(req)
    
        for i in rsp:
            print(f"range {value} => {i.value}")
    

Client-to-Server Streaming

// A client-to-server streaming RPC.
rpc Sum(stream Int) returns (Int) {}
Server
  • Go
    func (s *helloWorldServer) Sum(stream pb.HelloWorld_SumServer) error {
        var result int32 = 0
        for {
            in, err := stream.Recv()
            if err == io.EOF {
                rsp := &pb.Int{Value: result}
                return stream.SendAndClose(rsp)
            }
            if err != nil {
                return err
            }
            result += in.Value
        }
    }
    
  • Python
    def Sum(self, Int_iterator, context):
        result = 0
        for i in Int_iterator:
            result += i.value
    
        rsp = helloWorld_pb2.Int(value=result)
        return rsp
    

Client
  • Go
    func getSum(client pb.HelloWorldClient, value int) {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
    
        stream, err := client.Sum(ctx)
        if err != nil {
            log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
        }
    
        for i := 0; i < value; i++ {
            _i := &pb.Int{Value: int32(i)}
            if err := stream.Send(_i); err != nil {
                log.Fatalf("%v.Send(%v) = %v", stream, _i, err)
            }
        }
        reply, err := stream.CloseAndRecv()
        if err != nil {
            log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
        }
        fmt.Printf("Sum range(%d) => %d\n", value, reply.Value)
    }
    
  • Python
    def getSumSync(stub, value):
        req = (helloWorld_pb2.Int(value=i) for i in range(value))
        rsp = stub.Sum(req)
    
        print(f"Sum range({value}) => {rsp.value}")
    
    def getSumAsync(stub, value):
        req = (helloWorld_pb2.Int(value=i) for i in range(value))
        rspFuture = stub.Sum.future(req)
    
        rsp = rspFuture.result()
        print("Async")
    
        print(f"Sum range({value}) => {rsp.value}")
    

Bidirectional Streaming

// A Bidirectional streaming RPC.
rpc DoubleIter(stream Int) returns (stream Int) {}
Server
  • Go
    func (s *helloWorldServer) DoubleIter(stream pb.HelloWorld_DoubleIterServer) error {
        for {
            in, err := stream.Recv()
            if err == io.EOF {
                return nil
            }
            if err != nil {
                return err
            }
            rsp := &pb.Int{Value: in.Value * 2}
            if err := stream.Send(rsp); err != nil {
                return err
            }
        }
    }
    
  • Python
    def DoubleIter(self, Int_iterator, context):
        for i in Int_iterator:
            rsp = helloWorld_pb2.Int(value=i.value*2)
            yield rsp
    

Client
  • Go
    func getDoubleIter(client pb.HelloWorldClient, value int) {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
    
        stream, err := client.DoubleIter(ctx)
        if err != nil {
            log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
        }
        waitc := make(chan struct{})
        go func() {
            for {
                in, err := stream.Recv()
                if err == io.EOF {
                    // read done.
                    close(waitc)
                    return
                }
                if err != nil {
                    log.Fatalf("Failed to receive: %v", err)
                }
                fmt.Printf("double range(%d) => %d\n", value, in.Value*2)
            }
        }()
        for i := 0; i < value; i++ {
            _i := &pb.Int{Value: int32(i)}
            if err := stream.Send(_i); err != nil {
                log.Fatalf("Failed to send: %v", err)
            }
        }
        stream.CloseSend()
        <-waitc
    }
    
  • Python
    def getDoubleIter(stub, value):
        req = (helloWorld_pb2.Int(value=i) for i in range(value))
        rsp = stub.DoubleIter(req)
        for i in rsp:
            print(f"double range({value}) => {i.value * 2}")
    
    

Proto 程式碼

helloWorld.proto
syntax = "proto3";

package helloworld;

// Interface exported by the server.
service HelloWorld {
  // A simple RPC.
  rpc Double(Int) returns (Int) {}

  // A server-to-client streaming RPC.
  rpc Range(Int) returns (stream Int) {}

  // A client-to-server streaming RPC.
  rpc Sum(stream Int) returns (Int) {}

  // A Bidirectional streaming RPC.
  rpc DoubleIter(stream Int) returns (stream Int) {}
}

message Int { int32 value = 1; }

Go 程式碼

架構
go
├─client
│      helloWorld_client.go
│
├─protos
│      helloWorld.pb.go
│      helloWorld.proto
│
└─server
        helloWorld_server.go

helloWorld_server.go
//The Go implementation of the gRPC server.
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net"

    "google.golang.org/grpc"

    pb "../protos"
)

type helloWorldServer struct {
}

func (s *helloWorldServer) Double(ctx context.Context, Int *pb.Int) (*pb.Int, error) {
    rsp := &pb.Int{Value: Int.Value * 2}

    return rsp, nil
}

func (s *helloWorldServer) Range(Int *pb.Int, stream pb.HelloWorld_RangeServer) error {
    var i int32 = 0
    for ; i < Int.Value; i++ {
        rsp := &pb.Int{Value: i}
        if err := stream.Send(rsp); err != nil {
            return err
        }
    }
    return nil
}

func (s *helloWorldServer) Sum(stream pb.HelloWorld_SumServer) error {
    var result int32 = 0
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            rsp := &pb.Int{Value: result}
            return stream.SendAndClose(rsp)
        }
        if err != nil {
            return err
        }
        result += in.Value
    }
}

func (s *helloWorldServer) DoubleIter(stream pb.HelloWorld_DoubleIterServer) error {
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        rsp := &pb.Int{Value: in.Value * 2}
        if err := stream.Send(rsp); err != nil {
            return err
        }
    }
}

func newServer() *helloWorldServer {
    s := &helloWorldServer{}
    return s
}

func main() {
    port := 50051
    addr := fmt.Sprintf("localhost:%d", port)

    lis, err := net.Listen("tcp", addr)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    fmt.Printf("Listen to %s\n", addr)

    var opts []grpc.ServerOption
    grpcServer := grpc.NewServer(opts...)
    pb.RegisterHelloWorldServer(grpcServer, newServer())
    grpcServer.Serve(lis)
}

helloWorld_client.go
//The Go implementation of the gRPC client.
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "time"

    pb "../protos"
    "google.golang.org/grpc"
)

func getDouble(client pb.HelloWorldClient, value int32) {
    i := &pb.Int{Value: value}
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    rsp, err := client.Double(ctx, i)
    if err != nil {
        log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
    }

    fmt.Printf("double %d => %d\n", i.Value, rsp.Value)
}

func getRange(client pb.HelloWorldClient, value int32) {
    _i := &pb.Int{Value: value}
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    stream, err := client.Range(ctx, _i)
    if err != nil {
        log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
    }

    for {
        i, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("%v.Range(_) = _, %v", client, err)
        }
        fmt.Printf("range %d => %d\n", _i.Value, i.Value)
    }
}

func getSum(client pb.HelloWorldClient, value int) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    stream, err := client.Sum(ctx)
    if err != nil {
        log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
    }

    for i := 0; i < value; i++ {
        _i := &pb.Int{Value: int32(i)}
        if err := stream.Send(_i); err != nil {
            log.Fatalf("%v.Send(%v) = %v", stream, _i, err)
        }
    }
    reply, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
    }
    fmt.Printf("Sum range(%d) => %d\n", value, reply.Value)
}

func getDoubleIter(client pb.HelloWorldClient, value int) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    stream, err := client.DoubleIter(ctx)
    if err != nil {
        log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
    }
    waitc := make(chan struct{})
    go func() {
        for {
            in, err := stream.Recv()
            if err == io.EOF {
                // read done.
                close(waitc)
                return
            }
            if err != nil {
                log.Fatalf("Failed to receive: %v", err)
            }
            fmt.Printf("double range(%d) => %d\n", value, in.Value*2)
        }
    }()
    for i := 0; i < value; i++ {
        _i := &pb.Int{Value: int32(i)}
        if err := stream.Send(_i); err != nil {
            log.Fatalf("Failed to send: %v", err)
        }
    }
    stream.CloseSend()
    <-waitc
}

func main() {
    var opts []grpc.DialOption
    opts = append(opts, grpc.WithInsecure())

    serverAddr := "localhost:50051"
    conn, err := grpc.Dial(serverAddr, opts...)
    if err != nil {
        log.Fatalf("fail to dial: %v", err)
    }
    defer conn.Close()
    client := pb.NewHelloWorldClient(conn)

    fmt.Println("-------------- Double-------------------")
    getDouble(client, 5)
    fmt.Println("-------------- Range -------------------")
    getRange(client, 5)
    fmt.Println("-------------- Sum----------------------")
    getSum(client, 5)
    fmt.Println("-------------- DoubleIter --------------")
    getDoubleIter(client, 5)
}

// Output
// -------------- Double-------------------
// double 5 => 10
// -------------- Range -------------------
// range 5 => 0
// range 5 => 1
// range 5 => 2
// range 5 => 3
// range 5 => 4
// -------------- Sum----------------------
// Sum range(5) => 10
// -------------- DoubleIter --------------
// double range(5) => 0
// double range(5) => 4
// double range(5) => 8
// double range(5) => 12
// double range(5) => 16

Python 程式碼

架構
python
│  helloWorld_client.py
│  helloWorld_pb2.py
│  helloWorld_pb2_grpc.py
│  helloWorld_server.py
│
└─protos
        helloWorld.proto

helloWorld_server.py
"""The Python implementation of the gRPC server."""

from concurrent import futures
import logging
import time

import grpc

import helloWorld_pb2
import helloWorld_pb2_grpc

_ONE_DAY_IN_SECONDS = 60 * 60 * 24

class HelloWorldServicer(helloWorld_pb2_grpc.HelloWorldServicer):
    """Provides methods that implement functionality of hello world server."""

    def __init__(self):
        pass

    def Double(self, Int, context):
        rsp = helloWorld_pb2.Int(value=Int.value*2)

        return rsp


    def Range(self, Int, context):
        for i in range(Int.value):
            rsp = helloWorld_pb2.Int(value=i)
            yield rsp

    def Sum(self, Int_iterator, context):
        result = 0
        for i in Int_iterator:
            result += i.value

        rsp = helloWorld_pb2.Int(value=result)
        return rsp

    def DoubleIter(self, Int_iterator, context):
        for i in Int_iterator:
            rsp = helloWorld_pb2.Int(value=i.value*2)
            yield rsp

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    helloWorld_pb2_grpc.add_HelloWorldServicer_to_server(
        HelloWorldServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Listen to", "[::]:50051")
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    logging.basicConfig()
    serve()

helloWorld_client.py
"""The Python implementation of the gRPC client."""

import logging

import grpc

import helloWorld_pb2
import helloWorld_pb2_grpc


def getDoubleSync(stub, value):
    req = helloWorld_pb2.Int(value=value)
    rsp = stub.Double(req)

    print(f"double {value} => {rsp.value}")


def getDoubleAsync(stub, value):
    req = helloWorld_pb2.Int(value=value)
    rspFuture = stub.Double.future(req)

    rsp = rspFuture.result()
    print("Async")

    print(f"double {value} => {rsp.value}")


def getRange(stub, value):
    req = helloWorld_pb2.Int(value=value)
    rsp = stub.Range(req)

    for i in rsp:
        print(f"range {value} => {i.value}")


def getSumSync(stub, value):
    req = (helloWorld_pb2.Int(value=i) for i in range(value))
    rsp = stub.Sum(req)

    print(f"Sum range({value}) => {rsp.value}")


def getSumAsync(stub, value):
    req = (helloWorld_pb2.Int(value=i) for i in range(value))
    rspFuture = stub.Sum.future(req)

    rsp = rspFuture.result()
    print("Async")

    print(f"Sum range({value}) => {rsp.value}")


def getDoubleIter(stub, value):
    req = (helloWorld_pb2.Int(value=i) for i in range(value))
    rsp = stub.DoubleIter(req)
    for i in rsp:
        print(f"double range({value}) => {i.value * 2}")


def run():
    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = helloWorld_pb2_grpc.HelloWorldStub(channel)
        print("-------------- Double Sync--------------")
        getDoubleSync(stub, 5)
        print("-------------- Double Async-------------")
        getDoubleAsync(stub, 5)
        print("-------------- Range -------------------")
        getRange(stub, 5)
        print("-------------- Sum Sync-----------------")
        getSumSync(stub, 5)
        print("-------------- Sum Async----------------")
        getSumAsync(stub, 5)
        print("-------------- DoubleIter --------------")
        getDoubleIter(stub, 5)


if __name__ == "__main__":
    logging.basicConfig()
    run()

# Output
# -------------- Double Sync--------------
# double 5 => 10
# -------------- Double Async-------------
# Async
# double 5 => 10
# -------------- Range -------------------
# range 5 => 0
# range 5 => 1
# range 5 => 2
# range 5 => 3
# range 5 => 4
# -------------- Sum Sync-----------------
# Sum range(5) => 10
# -------------- Sum Async----------------
# Async
# Sum range(5) => 10
# -------------- DoubleIter --------------
# double range(5) => 0
# double range(5) => 4
# double range(5) => 8
# double range(5) => 12
# double range(5) => 16

參考

gRPC Tutorial for Python
Golang 如何使用 grpc

留言