当前位置:网站首页>GRPC的四种数据流以及案例
GRPC的四种数据流以及案例
2022-07-03 13:39:00 【天龙至尊】
GRPC的四种数据流:
1、简单模式(Simple RPC)
2、服务端数据流模式(Server-side Streaming RPC)
3、客户端数据流模式(Client-side Streaming RPC)
4、双向数据流模式(Bidirectional Streaming RPC)
如果对GRPC和protobuf不了解的话,可以参看一下文章:
syntax = "proto3";
option go_package = "./;grpc_proto";
service Stream {
rpc GetStream(StreamRequestData) returns (stream StreamResponseData);
rpc PutStream(stream StreamRequestData) returns (StreamResponseData);
rpc AllStream(stream StreamRequestData) returns (stream StreamResponseData);
}
message StreamRequestData {
string data = 1;
}
message StreamResponseData {
string data = 1;
}执行命令,进行grpc代码的生成:
stream.pb.go:
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.21.2
// source: stream.proto
package grpc_proto
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type StreamRequestData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
}
func (x *StreamRequestData) Reset() {
*x = StreamRequestData{}
if protoimpl.UnsafeEnabled {
mi := &file_stream_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StreamRequestData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StreamRequestData) ProtoMessage() {}
func (x *StreamRequestData) ProtoReflect() protoreflect.Message {
mi := &file_stream_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StreamRequestData.ProtoReflect.Descriptor instead.
func (*StreamRequestData) Descriptor() ([]byte, []int) {
return file_stream_proto_rawDescGZIP(), []int{0}
}
func (x *StreamRequestData) GetData() string {
if x != nil {
return x.Data
}
return ""
}
type StreamResponseData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
}
func (x *StreamResponseData) Reset() {
*x = StreamResponseData{}
if protoimpl.UnsafeEnabled {
mi := &file_stream_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StreamResponseData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StreamResponseData) ProtoMessage() {}
func (x *StreamResponseData) ProtoReflect() protoreflect.Message {
mi := &file_stream_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StreamResponseData.ProtoReflect.Descriptor instead.
func (*StreamResponseData) Descriptor() ([]byte, []int) {
return file_stream_proto_rawDescGZIP(), []int{1}
}
func (x *StreamResponseData) GetData() string {
if x != nil {
return x.Data
}
return ""
}
var File_stream_proto protoreflect.FileDescriptor
var file_stream_proto_rawDesc = []byte{
0x0a, 0x0c, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x27,
0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44,
0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x28, 0x0a, 0x12, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a,
0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74,
0x61, 0x32, 0xb2, 0x01, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x36, 0x0a, 0x09,
0x47, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x12, 0x2e, 0x53, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x13, 0x2e,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61,
0x74, 0x61, 0x30, 0x01, 0x12, 0x36, 0x0a, 0x09, 0x50, 0x75, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x12, 0x12, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x13, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x28, 0x01, 0x12, 0x38, 0x0a, 0x09,
0x41, 0x6c, 0x6c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x12, 0x2e, 0x53, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x13, 0x2e,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61,
0x74, 0x61, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x67, 0x72, 0x70,
0x63, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_stream_proto_rawDescOnce sync.Once
file_stream_proto_rawDescData = file_stream_proto_rawDesc
)
func file_stream_proto_rawDescGZIP() []byte {
file_stream_proto_rawDescOnce.Do(func() {
file_stream_proto_rawDescData = protoimpl.X.CompressGZIP(file_stream_proto_rawDescData)
})
return file_stream_proto_rawDescData
}
var file_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_stream_proto_goTypes = []interface{}{
(*StreamRequestData)(nil), // 0: StreamRequestData
(*StreamResponseData)(nil), // 1: StreamResponseData
}
var file_stream_proto_depIdxs = []int32{
0, // 0: Stream.GetStream:input_type -> StreamRequestData
0, // 1: Stream.PutStream:input_type -> StreamRequestData
0, // 2: Stream.AllStream:input_type -> StreamRequestData
1, // 3: Stream.GetStream:output_type -> StreamResponseData
1, // 4: Stream.PutStream:output_type -> StreamResponseData
1, // 5: Stream.AllStream:output_type -> StreamResponseData
3, // [3:6] is the sub-list for method output_type
0, // [0:3] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_stream_proto_init() }
func file_stream_proto_init() {
if File_stream_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_stream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamRequestData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_stream_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamResponseData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_stream_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_stream_proto_goTypes,
DependencyIndexes: file_stream_proto_depIdxs,
MessageInfos: file_stream_proto_msgTypes,
}.Build()
File_stream_proto = out.File
file_stream_proto_rawDesc = nil
file_stream_proto_goTypes = nil
file_stream_proto_depIdxs = nil
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6
// StreamClient is the client API for Stream service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type StreamClient interface {
GetStream(ctx context.Context, in *StreamRequestData, opts ...grpc.CallOption) (Stream_GetStreamClient, error)
PutStream(ctx context.Context, opts ...grpc.CallOption) (Stream_PutStreamClient, error)
AllStream(ctx context.Context, opts ...grpc.CallOption) (Stream_AllStreamClient, error)
}
type streamClient struct {
cc grpc.ClientConnInterface
}
func NewStreamClient(cc grpc.ClientConnInterface) StreamClient {
return &streamClient{cc}
}
func (c *streamClient) GetStream(ctx context.Context, in *StreamRequestData, opts ...grpc.CallOption) (Stream_GetStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Stream_serviceDesc.Streams[0], "/Stream/GetStream", opts...)
if err != nil {
return nil, err
}
x := &streamGetStreamClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Stream_GetStreamClient interface {
Recv() (*StreamResponseData, error)
grpc.ClientStream
}
type streamGetStreamClient struct {
grpc.ClientStream
}
func (x *streamGetStreamClient) Recv() (*StreamResponseData, error) {
m := new(StreamResponseData)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *streamClient) PutStream(ctx context.Context, opts ...grpc.CallOption) (Stream_PutStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Stream_serviceDesc.Streams[1], "/Stream/PutStream", opts...)
if err != nil {
return nil, err
}
x := &streamPutStreamClient{stream}
return x, nil
}
type Stream_PutStreamClient interface {
Send(*StreamRequestData) error
CloseAndRecv() (*StreamResponseData, error)
grpc.ClientStream
}
type streamPutStreamClient struct {
grpc.ClientStream
}
func (x *streamPutStreamClient) Send(m *StreamRequestData) error {
return x.ClientStream.SendMsg(m)
}
func (x *streamPutStreamClient) CloseAndRecv() (*StreamResponseData, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(StreamResponseData)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *streamClient) AllStream(ctx context.Context, opts ...grpc.CallOption) (Stream_AllStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Stream_serviceDesc.Streams[2], "/Stream/AllStream", opts...)
if err != nil {
return nil, err
}
x := &streamAllStreamClient{stream}
return x, nil
}
type Stream_AllStreamClient interface {
Send(*StreamRequestData) error
Recv() (*StreamResponseData, error)
grpc.ClientStream
}
type streamAllStreamClient struct {
grpc.ClientStream
}
func (x *streamAllStreamClient) Send(m *StreamRequestData) error {
return x.ClientStream.SendMsg(m)
}
func (x *streamAllStreamClient) Recv() (*StreamResponseData, error) {
m := new(StreamResponseData)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// StreamServer is the server API for Stream service.
type StreamServer interface {
GetStream(*StreamRequestData, Stream_GetStreamServer) error
PutStream(Stream_PutStreamServer) error
AllStream(Stream_AllStreamServer) error
}
// UnimplementedStreamServer can be embedded to have forward compatible implementations.
type UnimplementedStreamServer struct {
}
func (*UnimplementedStreamServer) GetStream(*StreamRequestData, Stream_GetStreamServer) error {
return status.Errorf(codes.Unimplemented, "method GetStream not implemented")
}
func (*UnimplementedStreamServer) PutStream(Stream_PutStreamServer) error {
return status.Errorf(codes.Unimplemented, "method PutStream not implemented")
}
func (*UnimplementedStreamServer) AllStream(Stream_AllStreamServer) error {
return status.Errorf(codes.Unimplemented, "method AllStream not implemented")
}
func RegisterStreamServer(s *grpc.Server, srv StreamServer) {
s.RegisterService(&_Stream_serviceDesc, srv)
}
func _Stream_GetStream_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StreamRequestData)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(StreamServer).GetStream(m, &streamGetStreamServer{stream})
}
type Stream_GetStreamServer interface {
Send(*StreamResponseData) error
grpc.ServerStream
}
type streamGetStreamServer struct {
grpc.ServerStream
}
func (x *streamGetStreamServer) Send(m *StreamResponseData) error {
return x.ServerStream.SendMsg(m)
}
func _Stream_PutStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(StreamServer).PutStream(&streamPutStreamServer{stream})
}
type Stream_PutStreamServer interface {
SendAndClose(*StreamResponseData) error
Recv() (*StreamRequestData, error)
grpc.ServerStream
}
type streamPutStreamServer struct {
grpc.ServerStream
}
func (x *streamPutStreamServer) SendAndClose(m *StreamResponseData) error {
return x.ServerStream.SendMsg(m)
}
func (x *streamPutStreamServer) Recv() (*StreamRequestData, error) {
m := new(StreamRequestData)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _Stream_AllStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(StreamServer).AllStream(&streamAllStreamServer{stream})
}
type Stream_AllStreamServer interface {
Send(*StreamResponseData) error
Recv() (*StreamRequestData, error)
grpc.ServerStream
}
type streamAllStreamServer struct {
grpc.ServerStream
}
func (x *streamAllStreamServer) Send(m *StreamResponseData) error {
return x.ServerStream.SendMsg(m)
}
func (x *streamAllStreamServer) Recv() (*StreamRequestData, error) {
m := new(StreamRequestData)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _Stream_serviceDesc = grpc.ServiceDesc{
ServiceName: "Stream",
HandlerType: (*StreamServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "GetStream",
Handler: _Stream_GetStream_Handler,
ServerStreams: true,
},
{
StreamName: "PutStream",
Handler: _Stream_PutStream_Handler,
ClientStreams: true,
},
{
StreamName: "AllStream",
Handler: _Stream_AllStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "stream.proto",
}
server.go:
package main
import (
"fmt"
"go-micro-service-architect/cn/ljxwtl/micro/stream/grpc_proto"
"google.golang.org/grpc"
"net"
"sync"
"time"
)
type Server struct {
}
func (s *Server) GetStream(stream *grpc_proto.StreamRequestData, response grpc_proto.Stream_GetStreamServer) error {
var count = 0
for {
count++
err := response.Send(&grpc_proto.StreamResponseData{
Data: fmt.Sprintf("%d", time.Now().Unix()),
})
time.Sleep(time.Second)
if err != nil {
return err
}
if count > 10 {
break
}
}
return nil
}
func (s *Server) PutStream(server grpc_proto.Stream_PutStreamServer) error {
for {
data, err := server.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println("服务器接收到的数据=>>>>>>", data)
}
return nil
}
func (s *Server) AllStream(allStream grpc_proto.Stream_AllStreamServer) error {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
recv, err2 := allStream.Recv()
if err2 != nil {
panic(err2)
}
fmt.Println("服务器接收的数据=>>>>>>", recv)
}
}()
go func() {
defer wg.Done()
for {
err2 := allStream.Send(&grpc_proto.StreamResponseData{
Data: fmt.Sprintf("服务器发送的消息:%d---bobby", time.Now().Unix()),
})
time.Sleep(time.Second)
if err2 != nil {
panic(err2)
}
}
}()
wg.Wait()
return nil
}
func main() {
server := grpc.NewServer()
listen, err := net.Listen("tcp", "0.0.0.0:8080")
if err != nil {
return
}
grpc_proto.RegisterStreamServer(server, &Server{})
err = server.Serve(listen)
if err != nil {
panic(err)
}
}
client.go:
package main
import (
"context"
"fmt"
"go-micro-service-architect/cn/ljxwtl/micro/stream/grpc_proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"sync"
"time"
)
func main() {
conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return
}
client := grpc_proto.NewStreamClient(conn)
// 服务器流
stream, err := client.GetStream(context.Background(), &grpc_proto.StreamRequestData{
Data: "bobby",
})
if err != nil {
panic(err)
}
for {
responseData, err := stream.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println(responseData)
}
// 客户端流
putStream, err := client.PutStream(context.Background())
if err != nil {
panic(err)
}
var count = 0
for {
count++
err := putStream.Send(&grpc_proto.StreamRequestData{
Data: "bobby",
})
if err != nil {
fmt.Println(err)
break
}
time.Sleep(time.Second)
if count > 10 {
break
}
}
//双向流
wg := sync.WaitGroup{}
wg.Add(2)
allStream, err := client.AllStream(context.Background())
if err != nil {
panic(err)
}
go func() {
defer wg.Done()
for {
recv, err2 := allStream.Recv()
if err2 != nil {
panic(err2)
}
fmt.Println("客户端接收的数据=>>>>>>", recv)
}
}()
go func() {
defer wg.Done()
for {
err2 := allStream.Send(&grpc_proto.StreamRequestData{
Data: fmt.Sprintf("客户端发送的消息:%d---bobby", time.Now().Unix()),
})
time.Sleep(time.Second)
if err2 != nil {
panic(err2)
}
}
}()
wg.Wait()
}
client的输出:

server的输出:

边栏推荐
- How to use lxml to judge whether the website announcement is updated
- Implementation of Muduo accept connection, disconnection and sending data
- QT learning 22 layout manager (I)
- Collection of mobile adaptation related articles
- [acnoi2022] guess numbers
- Exercise 6-2 using functions to sum special A-string sequences
- Comprehensive case of MySQL data addition, deletion, modification and query
- Function calling convention
- JS Part III
- Common network state detection and analysis tools
猜你喜欢

FPGA test method takes mentor tool as an example

Conversion function and explicit

Print. JS -- web page file printing
[email "/>Doxorubicin loaded on metal organic framework MIL-88 DOX | folic acid modified uio-66-nh2 doxorubicin loaded [email

好看、好用、强大的手写笔记软件综合评测:Notability、GoodNotes、MarginNote、随手写、Notes Writers、CollaNote、CollaNote、Prodrafts、Noteshelf、FlowUs、OneNote、苹果备忘录

Exercise 6-1 classify and count the number of characters

MySQL data processing value addition, deletion and modification

Spring cup eight school league

FPGA测试方法以Mentor工具为例

“又土又穷”的草根高校,凭什么被称为“东北小清华”?
随机推荐
Scroll detection of the navigation bar enables the navigation bar to slide and fix with no content
Qt学习23 布局管理器(二)
7-11 calculation of residential water charges by sections
Exercise 6-2 using functions to sum special A-string sequences
JS download files through URL links
Cross linked cyclodextrin metal organic framework loaded methotrexate slow-release particles | metal organic porous material uio-66 loaded with flavonoid glycosides | Qiyue
Webpage connection database ~ simple implementation of addition, deletion, modification and query complete code
RocksDB LRUCache
Page generation QR code
JS matrix zero
simpleParallax. JS (create poor visual effects for website pictures)
Qt学习17 对话框及其类型
Message subscription and publishing
MySQL data processing value addition, deletion and modification
虽然不一定最优秀,但一定是最努力的!
Selenium browser (1)
Scroll detection, so that the content in the lower right corner is not displayed at the top of the page, but is displayed as the mouse slides
Exercise 9-1 time conversion
Common mixins
js . Find the first palindrome string in the array