- 取得連結
- X
- 以電子郵件傳送
- 其他應用程式
程式語言:Go & Python
GitHub
功能:即 RPC,允許電腦呼叫另一台電腦的子程式,且無需是同一個程式語言
Client
Client
Client
Client
Golang 如何使用 grpc
- Go Package:
- google.golang.org/grpc
- github.com/golang/protobuf/protoc-gen-go
- Python Module:
- grpcio
- grpcio-tools
GitHub
功能:即 RPC,允許電腦呼叫另一台電腦的子程式,且無需是同一個程式語言
Go 基本步驟
- 官方教學
- 簡言之
- 下載 protoc.exe,並將路徑放進 PATH
go get -u google.golang.org/grpc
go get -u github.com/golang/protobuf/protoc-gen-go
protoc -I <所在資料夾> <proto檔案> --go_out=plugins=grpc:<所在資料夾>
- 產生一個檔案
- <檔名>.pb.go
Python 基本步驟
- 官方教學
- 簡言之
python -m pip install grpcio
python -m pip install grpcio-tools
python -m grpc_tools.protoc -I <所在資料夾> --python_out=<輸出位置> --grpc_python_out=<輸出位置> <proto檔案>
- 產生兩個檔案
- <檔名>_pb2.py
- <檔名>_pb2_grpc.py
Message 定義
message Int { int32 value = 1; }
基本用法
// A simple RPC.
rpc Double(Int) returns (Int) {}
Server- Go
func (s *helloWorldServer) Double(ctx context.Context, Int *pb.Int) (*pb.Int, error) { rsp := &pb.Int{Value: Int.Value * 2} return rsp, nil } - Python
def Double(self, Int, context): rsp = helloWorld_pb2.Int(value=Int.value*2) return rsp
Client
- Go
func getDouble(client pb.HelloWorldClient, value int32) { i := &pb.Int{Value: value} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() rsp, err := client.Double(ctx, i) if err != nil { log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err) } fmt.Printf("double %d => %d\n", i.Value, rsp.Value) } - Python
def getDoubleSync(stub, value): req = helloWorld_pb2.Int(value=value) rsp = stub.Double(req) print(f"double {value} => {rsp.value}")def getDoubleAsync(stub, value): req = helloWorld_pb2.Int(value=value) rspFuture = stub.Double.future(req) rsp = rspFuture.result() print("Async") print(f"double {value} => {rsp.value}")
Server-to-Client Streaming
// A server-to-client streaming RPC.
rpc Range(Int) returns (stream Int) {}
Server- Go
func (s *helloWorldServer) Range(Int *pb.Int, stream pb.HelloWorld_RangeServer) error { var i int32 = 0 for ; i < Int.Value; i++ { rsp := &pb.Int{Value: i} if err := stream.Send(rsp); err != nil { return err } } return nil } - Python
def Range(self, Int, context): for i in range(Int.value): rsp = helloWorld_pb2.Int(value=i) yield rsp
Client
- Go
func getRange(client pb.HelloWorldClient, value int32) { _i := &pb.Int{Value: value} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.Range(ctx, _i) if err != nil { log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err) } for { i, err := stream.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("%v.Range(_) = _, %v", client, err) } fmt.Printf("range %d => %d\n", _i.Value, i.Value) } } - Python
def getRange(stub, value): req = helloWorld_pb2.Int(value=value) rsp = stub.Range(req) for i in rsp: print(f"range {value} => {i.value}")
Client-to-Server Streaming
// A client-to-server streaming RPC.
rpc Sum(stream Int) returns (Int) {}
Server- Go
func (s *helloWorldServer) Sum(stream pb.HelloWorld_SumServer) error { var result int32 = 0 for { in, err := stream.Recv() if err == io.EOF { rsp := &pb.Int{Value: result} return stream.SendAndClose(rsp) } if err != nil { return err } result += in.Value } } - Python
def Sum(self, Int_iterator, context): result = 0 for i in Int_iterator: result += i.value rsp = helloWorld_pb2.Int(value=result) return rsp
Client
- Go
func getSum(client pb.HelloWorldClient, value int) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.Sum(ctx) if err != nil { log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err) } for i := 0; i < value; i++ { _i := &pb.Int{Value: int32(i)} if err := stream.Send(_i); err != nil { log.Fatalf("%v.Send(%v) = %v", stream, _i, err) } } reply, err := stream.CloseAndRecv() if err != nil { log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) } fmt.Printf("Sum range(%d) => %d\n", value, reply.Value) } - Python
def getSumSync(stub, value): req = (helloWorld_pb2.Int(value=i) for i in range(value)) rsp = stub.Sum(req) print(f"Sum range({value}) => {rsp.value}")def getSumAsync(stub, value): req = (helloWorld_pb2.Int(value=i) for i in range(value)) rspFuture = stub.Sum.future(req) rsp = rspFuture.result() print("Async") print(f"Sum range({value}) => {rsp.value}")
Bidirectional Streaming
// A Bidirectional streaming RPC.
rpc DoubleIter(stream Int) returns (stream Int) {}
Server- Go
func (s *helloWorldServer) DoubleIter(stream pb.HelloWorld_DoubleIterServer) error { for { in, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } rsp := &pb.Int{Value: in.Value * 2} if err := stream.Send(rsp); err != nil { return err } } } - Python
def DoubleIter(self, Int_iterator, context): for i in Int_iterator: rsp = helloWorld_pb2.Int(value=i.value*2) yield rsp
Client
- Go
func getDoubleIter(client pb.HelloWorldClient, value int) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.DoubleIter(ctx) if err != nil { log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err) } waitc := make(chan struct{}) go func() { for { in, err := stream.Recv() if err == io.EOF { // read done. close(waitc) return } if err != nil { log.Fatalf("Failed to receive: %v", err) } fmt.Printf("double range(%d) => %d\n", value, in.Value*2) } }() for i := 0; i < value; i++ { _i := &pb.Int{Value: int32(i)} if err := stream.Send(_i); err != nil { log.Fatalf("Failed to send: %v", err) } } stream.CloseSend() <-waitc } - Python
def getDoubleIter(stub, value): req = (helloWorld_pb2.Int(value=i) for i in range(value)) rsp = stub.DoubleIter(req) for i in rsp: print(f"double range({value}) => {i.value * 2}")
Proto 程式碼
helloWorld.proto
syntax = "proto3";
package helloworld;
// Interface exported by the server.
service HelloWorld {
// A simple RPC.
rpc Double(Int) returns (Int) {}
// A server-to-client streaming RPC.
rpc Range(Int) returns (stream Int) {}
// A client-to-server streaming RPC.
rpc Sum(stream Int) returns (Int) {}
// A Bidirectional streaming RPC.
rpc DoubleIter(stream Int) returns (stream Int) {}
}
message Int { int32 value = 1; }
Go 程式碼
架構
helloWorld_server.go
helloWorld_client.go
go
├─client
│ helloWorld_client.go
│
├─protos
│ helloWorld.pb.go
│ helloWorld.proto
│
└─server
helloWorld_server.go
helloWorld_server.go
//The Go implementation of the gRPC server.
package main
import (
"context"
"fmt"
"io"
"log"
"net"
"google.golang.org/grpc"
pb "../protos"
)
type helloWorldServer struct {
}
func (s *helloWorldServer) Double(ctx context.Context, Int *pb.Int) (*pb.Int, error) {
rsp := &pb.Int{Value: Int.Value * 2}
return rsp, nil
}
func (s *helloWorldServer) Range(Int *pb.Int, stream pb.HelloWorld_RangeServer) error {
var i int32 = 0
for ; i < Int.Value; i++ {
rsp := &pb.Int{Value: i}
if err := stream.Send(rsp); err != nil {
return err
}
}
return nil
}
func (s *helloWorldServer) Sum(stream pb.HelloWorld_SumServer) error {
var result int32 = 0
for {
in, err := stream.Recv()
if err == io.EOF {
rsp := &pb.Int{Value: result}
return stream.SendAndClose(rsp)
}
if err != nil {
return err
}
result += in.Value
}
}
func (s *helloWorldServer) DoubleIter(stream pb.HelloWorld_DoubleIterServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
rsp := &pb.Int{Value: in.Value * 2}
if err := stream.Send(rsp); err != nil {
return err
}
}
}
func newServer() *helloWorldServer {
s := &helloWorldServer{}
return s
}
func main() {
port := 50051
addr := fmt.Sprintf("localhost:%d", port)
lis, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
fmt.Printf("Listen to %s\n", addr)
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
pb.RegisterHelloWorldServer(grpcServer, newServer())
grpcServer.Serve(lis)
}
helloWorld_client.go
//The Go implementation of the gRPC client.
package main
import (
"context"
"fmt"
"io"
"log"
"time"
pb "../protos"
"google.golang.org/grpc"
)
func getDouble(client pb.HelloWorldClient, value int32) {
i := &pb.Int{Value: value}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
rsp, err := client.Double(ctx, i)
if err != nil {
log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
}
fmt.Printf("double %d => %d\n", i.Value, rsp.Value)
}
func getRange(client pb.HelloWorldClient, value int32) {
_i := &pb.Int{Value: value}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.Range(ctx, _i)
if err != nil {
log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
}
for {
i, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.Range(_) = _, %v", client, err)
}
fmt.Printf("range %d => %d\n", _i.Value, i.Value)
}
}
func getSum(client pb.HelloWorldClient, value int) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.Sum(ctx)
if err != nil {
log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
}
for i := 0; i < value; i++ {
_i := &pb.Int{Value: int32(i)}
if err := stream.Send(_i); err != nil {
log.Fatalf("%v.Send(%v) = %v", stream, _i, err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
fmt.Printf("Sum range(%d) => %d\n", value, reply.Value)
}
func getDoubleIter(client pb.HelloWorldClient, value int) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.DoubleIter(ctx)
if err != nil {
log.Fatalf("%v.GetDouble(_) = _, %v: ", client, err)
}
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive: %v", err)
}
fmt.Printf("double range(%d) => %d\n", value, in.Value*2)
}
}()
for i := 0; i < value; i++ {
_i := &pb.Int{Value: int32(i)}
if err := stream.Send(_i); err != nil {
log.Fatalf("Failed to send: %v", err)
}
}
stream.CloseSend()
<-waitc
}
func main() {
var opts []grpc.DialOption
opts = append(opts, grpc.WithInsecure())
serverAddr := "localhost:50051"
conn, err := grpc.Dial(serverAddr, opts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()
client := pb.NewHelloWorldClient(conn)
fmt.Println("-------------- Double-------------------")
getDouble(client, 5)
fmt.Println("-------------- Range -------------------")
getRange(client, 5)
fmt.Println("-------------- Sum----------------------")
getSum(client, 5)
fmt.Println("-------------- DoubleIter --------------")
getDoubleIter(client, 5)
}
// Output
// -------------- Double-------------------
// double 5 => 10
// -------------- Range -------------------
// range 5 => 0
// range 5 => 1
// range 5 => 2
// range 5 => 3
// range 5 => 4
// -------------- Sum----------------------
// Sum range(5) => 10
// -------------- DoubleIter --------------
// double range(5) => 0
// double range(5) => 4
// double range(5) => 8
// double range(5) => 12
// double range(5) => 16
Python 程式碼
架構
helloWorld_server.py
helloWorld_client.py
python
│ helloWorld_client.py
│ helloWorld_pb2.py
│ helloWorld_pb2_grpc.py
│ helloWorld_server.py
│
└─protos
helloWorld.proto
helloWorld_server.py
"""The Python implementation of the gRPC server."""
from concurrent import futures
import logging
import time
import grpc
import helloWorld_pb2
import helloWorld_pb2_grpc
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
class HelloWorldServicer(helloWorld_pb2_grpc.HelloWorldServicer):
"""Provides methods that implement functionality of hello world server."""
def __init__(self):
pass
def Double(self, Int, context):
rsp = helloWorld_pb2.Int(value=Int.value*2)
return rsp
def Range(self, Int, context):
for i in range(Int.value):
rsp = helloWorld_pb2.Int(value=i)
yield rsp
def Sum(self, Int_iterator, context):
result = 0
for i in Int_iterator:
result += i.value
rsp = helloWorld_pb2.Int(value=result)
return rsp
def DoubleIter(self, Int_iterator, context):
for i in Int_iterator:
rsp = helloWorld_pb2.Int(value=i.value*2)
yield rsp
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
helloWorld_pb2_grpc.add_HelloWorldServicer_to_server(
HelloWorldServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("Listen to", "[::]:50051")
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
if __name__ == '__main__':
logging.basicConfig()
serve()
helloWorld_client.py
"""The Python implementation of the gRPC client."""
import logging
import grpc
import helloWorld_pb2
import helloWorld_pb2_grpc
def getDoubleSync(stub, value):
req = helloWorld_pb2.Int(value=value)
rsp = stub.Double(req)
print(f"double {value} => {rsp.value}")
def getDoubleAsync(stub, value):
req = helloWorld_pb2.Int(value=value)
rspFuture = stub.Double.future(req)
rsp = rspFuture.result()
print("Async")
print(f"double {value} => {rsp.value}")
def getRange(stub, value):
req = helloWorld_pb2.Int(value=value)
rsp = stub.Range(req)
for i in rsp:
print(f"range {value} => {i.value}")
def getSumSync(stub, value):
req = (helloWorld_pb2.Int(value=i) for i in range(value))
rsp = stub.Sum(req)
print(f"Sum range({value}) => {rsp.value}")
def getSumAsync(stub, value):
req = (helloWorld_pb2.Int(value=i) for i in range(value))
rspFuture = stub.Sum.future(req)
rsp = rspFuture.result()
print("Async")
print(f"Sum range({value}) => {rsp.value}")
def getDoubleIter(stub, value):
req = (helloWorld_pb2.Int(value=i) for i in range(value))
rsp = stub.DoubleIter(req)
for i in rsp:
print(f"double range({value}) => {i.value * 2}")
def run():
# NOTE(gRPC Python Team): .close() is possible on a channel and should be
# used in circumstances in which the with statement does not fit the needs
# of the code.
with grpc.insecure_channel("localhost:50051") as channel:
stub = helloWorld_pb2_grpc.HelloWorldStub(channel)
print("-------------- Double Sync--------------")
getDoubleSync(stub, 5)
print("-------------- Double Async-------------")
getDoubleAsync(stub, 5)
print("-------------- Range -------------------")
getRange(stub, 5)
print("-------------- Sum Sync-----------------")
getSumSync(stub, 5)
print("-------------- Sum Async----------------")
getSumAsync(stub, 5)
print("-------------- DoubleIter --------------")
getDoubleIter(stub, 5)
if __name__ == "__main__":
logging.basicConfig()
run()
# Output
# -------------- Double Sync--------------
# double 5 => 10
# -------------- Double Async-------------
# Async
# double 5 => 10
# -------------- Range -------------------
# range 5 => 0
# range 5 => 1
# range 5 => 2
# range 5 => 3
# range 5 => 4
# -------------- Sum Sync-----------------
# Sum range(5) => 10
# -------------- Sum Async----------------
# Async
# Sum range(5) => 10
# -------------- DoubleIter --------------
# double range(5) => 0
# double range(5) => 4
# double range(5) => 8
# double range(5) => 12
# double range(5) => 16
參考
gRPC Tutorial for PythonGolang 如何使用 grpc
留言
張貼留言