当前位置:网站首页>GRPC的四种数据流以及案例
GRPC的四种数据流以及案例
2022-07-03 13:39:00 【天龙至尊】
GRPC的四种数据流:
1、简单模式(Simple RPC)
2、服务端数据流模式(Server-side Streaming RPC)
3、客户端数据流模式(Client-side Streaming RPC)
4、双向数据流模式(Bidirectional Streaming RPC)
如果对GRPC和protobuf不了解的话,可以参看以下文章:
// This file uses proto3 syntax.
syntax = "proto3";
// Go output goes to the current directory ("./") with Go package name grpc_proto.
option go_package = "./;grpc_proto";
// Stream exposes the three streaming RPC shapes over one request/response pair.
service Stream {
// Server-side streaming: a single request, a stream of responses.
rpc GetStream(StreamRequestData) returns (stream StreamResponseData);
// Client-side streaming: a stream of requests, a single response.
rpc PutStream(stream StreamRequestData) returns (StreamResponseData);
// Bidirectional streaming: both sides send a stream of messages.
rpc AllStream(stream StreamRequestData) returns (stream StreamResponseData);
}
// StreamRequestData is the request message used by every RPC of the Stream service.
message StreamRequestData {
// Text payload (field number 1).
string data = 1;
}
message StreamResponseData {
string data = 1;
}
执行命令,进行grpc代码的生成:
stream.pb.go:
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.21.2
// source: stream.proto
package grpc_proto
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
// Compile-time compatibility guards between this generated file and the linked
// protoimpl runtime. Both expressions are written relative to the literal 20
// and are evaluated as constants, so nothing here runs at program start.
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// StreamRequestData is the Go type generated for the StreamRequestData message.
// Data is its only exported field (proto field 1, string); the three unexported
// fields are runtime bookkeeping that file_stream_proto_init exposes to
// protoimpl through an Exporter when the unsafe fast path is disabled.
type StreamRequestData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
}
// Reset overwrites *x with the zero value. When protoimpl.UnsafeEnabled is set
// it then re-attaches the message info of file_stream_proto_msgTypes[0], which
// the zeroing just cleared.
func (x *StreamRequestData) Reset() {
*x = StreamRequestData{}
if protoimpl.UnsafeEnabled {
mi := &file_stream_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns whatever protoimpl.X.MessageStringOf produces for x.
func (x *StreamRequestData) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*StreamRequestData) ProtoMessage() {}
// ProtoReflect returns the reflective view of x. On the unsafe path with a
// non-nil receiver the message info is stored lazily on first use; otherwise
// (nil receiver or unsafe disabled) it falls back to mi.MessageOf(x).
func (x *StreamRequestData) ProtoReflect() protoreflect.Message {
mi := &file_stream_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StreamRequestData.ProtoReflect.Descriptor instead.
//
// Descriptor returns the gzipped raw file descriptor together with the index
// path []int{0} for this message.
func (*StreamRequestData) Descriptor() ([]byte, []int) {
return file_stream_proto_rawDescGZIP(), []int{0}
}
// GetData returns x.Data, or the empty string when x is nil.
func (x *StreamRequestData) GetData() string {
if x != nil {
return x.Data
}
return ""
}
// StreamResponseData is the Go type generated for the StreamResponseData
// message. Data is its only exported field (proto field 1, string); the three
// unexported fields are runtime bookkeeping that file_stream_proto_init exposes
// to protoimpl through an Exporter when the unsafe fast path is disabled.
type StreamResponseData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
}
// Reset overwrites *x with the zero value. When protoimpl.UnsafeEnabled is set
// it then re-attaches the message info of file_stream_proto_msgTypes[1], which
// the zeroing just cleared.
func (x *StreamResponseData) Reset() {
*x = StreamResponseData{}
if protoimpl.UnsafeEnabled {
mi := &file_stream_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns whatever protoimpl.X.MessageStringOf produces for x.
func (x *StreamResponseData) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*StreamResponseData) ProtoMessage() {}
// ProtoReflect returns the reflective view of x. On the unsafe path with a
// non-nil receiver the message info is stored lazily on first use; otherwise
// (nil receiver or unsafe disabled) it falls back to mi.MessageOf(x).
func (x *StreamResponseData) ProtoReflect() protoreflect.Message {
mi := &file_stream_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StreamResponseData.ProtoReflect.Descriptor instead.
//
// Descriptor returns the gzipped raw file descriptor together with the index
// path []int{1} for this message.
func (*StreamResponseData) Descriptor() ([]byte, []int) {
return file_stream_proto_rawDescGZIP(), []int{1}
}
// GetData returns x.Data, or the empty string when x is nil.
func (x *StreamResponseData) GetData() string {
if x != nil {
return x.Data
}
return ""
}
// File_stream_proto is the file descriptor for stream.proto; it is nil until
// file_stream_proto_init assigns it.
var File_stream_proto protoreflect.FileDescriptor
// file_stream_proto_rawDesc is the raw descriptor handed to the type builder in
// file_stream_proto_init, which sets this variable to nil afterwards. The
// bytes embed the file name "stream.proto" and the message, service and method
// names as ASCII.
var file_stream_proto_rawDesc = []byte{
0x0a, 0x0c, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x27,
0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44,
0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x28, 0x0a, 0x12, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a,
0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74,
0x61, 0x32, 0xb2, 0x01, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x36, 0x0a, 0x09,
0x47, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x12, 0x2e, 0x53, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x13, 0x2e,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61,
0x74, 0x61, 0x30, 0x01, 0x12, 0x36, 0x0a, 0x09, 0x50, 0x75, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x12, 0x12, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x13, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x28, 0x01, 0x12, 0x38, 0x0a, 0x09,
0x41, 0x6c, 0x6c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x12, 0x2e, 0x53, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x13, 0x2e,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61,
0x74, 0x61, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x67, 0x72, 0x70,
0x63, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
// State for the lazily gzipped copy of the descriptor: the Once guards the
// one-time compression performed in file_stream_proto_rawDescGZIP, and the
// Data variable starts out aliasing the uncompressed bytes.
var (
file_stream_proto_rawDescOnce sync.Once
file_stream_proto_rawDescData = file_stream_proto_rawDesc
)
// file_stream_proto_rawDescGZIP returns the gzip-compressed descriptor bytes.
// Compression happens at most once (guarded by file_stream_proto_rawDescOnce)
// and the result replaces file_stream_proto_rawDescData in place.
func file_stream_proto_rawDescGZIP() []byte {
file_stream_proto_rawDescOnce.Do(func() {
file_stream_proto_rawDescData = protoimpl.X.CompressGZIP(file_stream_proto_rawDescData)
})
return file_stream_proto_rawDescData
}
// file_stream_proto_msgTypes holds one MessageInfo slot per message
// (index 0: StreamRequestData, index 1: StreamResponseData).
var file_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
// file_stream_proto_goTypes lists the Go types in the same index order; it is
// passed to the type builder and set to nil in file_stream_proto_init.
var file_stream_proto_goTypes = []interface{}{
(*StreamRequestData)(nil), // 0: StreamRequestData
(*StreamResponseData)(nil), // 1: StreamResponseData
}
// file_stream_proto_depIdxs maps each method's input/output type to an index
// into file_stream_proto_goTypes; the trailing five entries are offsets that
// delimit the sub-lists, as described by the inline comments.
var file_stream_proto_depIdxs = []int32{
0, // 0: Stream.GetStream:input_type -> StreamRequestData
0, // 1: Stream.PutStream:input_type -> StreamRequestData
0, // 2: Stream.AllStream:input_type -> StreamRequestData
1, // 3: Stream.GetStream:output_type -> StreamResponseData
1, // 4: Stream.PutStream:output_type -> StreamResponseData
1, // 5: Stream.AllStream:output_type -> StreamResponseData
3, // [3:6] is the sub-list for method output_type
0, // [0:3] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
// init runs file_stream_proto_init when the package is loaded.
func init() { file_stream_proto_init() }
// file_stream_proto_init builds the descriptor and message types for
// stream.proto and stores the result in File_stream_proto. It is a no-op when
// File_stream_proto is already set.
func file_stream_proto_init() {
if File_stream_proto != nil {
return
}
// Without the unsafe fast path, protoimpl reaches the unexported bookkeeping
// fields through these Exporter callbacks: index 0 is state, 1 is sizeCache,
// 2 is unknownFields, anything else yields nil.
if !protoimpl.UnsafeEnabled {
file_stream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamRequestData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_stream_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamResponseData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
// x exists only so reflect can report this package's import path.
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_stream_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_stream_proto_goTypes,
DependencyIndexes: file_stream_proto_depIdxs,
MessageInfos: file_stream_proto_msgTypes,
}.Build()
File_stream_proto = out.File
// The build inputs are not referenced again by this function; drop them.
file_stream_proto_rawDesc = nil
file_stream_proto_goTypes = nil
file_stream_proto_depIdxs = nil
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// The build fails if the linked grpc package does not define
// SupportPackageIsVersion6.
const _ = grpc.SupportPackageIsVersion6
// StreamClient is the client API for Stream service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type StreamClient interface {
// GetStream sends the single request in and returns a stream to receive responses from.
GetStream(ctx context.Context, in *StreamRequestData, opts ...grpc.CallOption) (Stream_GetStreamClient, error)
// PutStream returns a stream to send requests on; the single response is read with CloseAndRecv.
PutStream(ctx context.Context, opts ...grpc.CallOption) (Stream_PutStreamClient, error)
// AllStream returns a stream that supports both Send and Recv.
AllStream(ctx context.Context, opts ...grpc.CallOption) (Stream_AllStreamClient, error)
}
// streamClient is the StreamClient implementation returned by NewStreamClient;
// it only holds the connection used to open streams.
type streamClient struct {
cc grpc.ClientConnInterface
}
// NewStreamClient wraps cc in a StreamClient. No RPC is started here.
func NewStreamClient(cc grpc.ClientConnInterface) StreamClient {
return &streamClient{cc}
}
// GetStream opens the "/Stream/GetStream" stream (descriptor Streams[0]),
// sends the single request in, closes the send side, and returns a wrapper
// from which responses are read with Recv. Any failure along the way is
// returned with a nil stream.
func (c *streamClient) GetStream(ctx context.Context, in *StreamRequestData, opts ...grpc.CallOption) (Stream_GetStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Stream_serviceDesc.Streams[0], "/Stream/GetStream", opts...)
if err != nil {
return nil, err
}
x := &streamGetStreamClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
// Only one request is ever sent on this stream, so close the send side right away.
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// Stream_GetStreamClient is the receive-only client view of the GetStream RPC.
type Stream_GetStreamClient interface {
Recv() (*StreamResponseData, error)
grpc.ClientStream
}
// streamGetStreamClient implements Stream_GetStreamClient on top of an embedded grpc.ClientStream.
type streamGetStreamClient struct {
grpc.ClientStream
}
// Recv reads the next response into a freshly allocated message. On error the
// message is discarded and the RecvMsg error is returned unchanged.
func (x *streamGetStreamClient) Recv() (*StreamResponseData, error) {
m := new(StreamResponseData)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// PutStream opens the "/Stream/PutStream" stream (descriptor Streams[1]) and
// returns it wrapped; nothing is sent until the caller uses Send.
func (c *streamClient) PutStream(ctx context.Context, opts ...grpc.CallOption) (Stream_PutStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Stream_serviceDesc.Streams[1], "/Stream/PutStream", opts...)
if err != nil {
return nil, err
}
x := &streamPutStreamClient{stream}
return x, nil
}
// Stream_PutStreamClient is the client view of the PutStream RPC: many Sends
// followed by one CloseAndRecv.
type Stream_PutStreamClient interface {
Send(*StreamRequestData) error
CloseAndRecv() (*StreamResponseData, error)
grpc.ClientStream
}
// streamPutStreamClient implements Stream_PutStreamClient on top of an embedded grpc.ClientStream.
type streamPutStreamClient struct {
grpc.ClientStream
}
// Send forwards m to the underlying stream's SendMsg.
func (x *streamPutStreamClient) Send(m *StreamRequestData) error {
return x.ClientStream.SendMsg(m)
}
// CloseAndRecv closes the send side and then reads one response message.
// A failure in either step is returned with a nil message.
func (x *streamPutStreamClient) CloseAndRecv() (*StreamResponseData, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(StreamResponseData)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// AllStream opens the "/Stream/AllStream" stream (descriptor Streams[2]) and
// returns it wrapped; the caller drives both Send and Recv.
func (c *streamClient) AllStream(ctx context.Context, opts ...grpc.CallOption) (Stream_AllStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Stream_serviceDesc.Streams[2], "/Stream/AllStream", opts...)
if err != nil {
return nil, err
}
x := &streamAllStreamClient{stream}
return x, nil
}
// Stream_AllStreamClient is the client view of the bidirectional AllStream RPC.
type Stream_AllStreamClient interface {
Send(*StreamRequestData) error
Recv() (*StreamResponseData, error)
grpc.ClientStream
}
// streamAllStreamClient implements Stream_AllStreamClient on top of an embedded grpc.ClientStream.
type streamAllStreamClient struct {
grpc.ClientStream
}
// Send forwards m to the underlying stream's SendMsg.
func (x *streamAllStreamClient) Send(m *StreamRequestData) error {
return x.ClientStream.SendMsg(m)
}
// Recv reads the next response into a freshly allocated message. On error the
// message is discarded and the RecvMsg error is returned unchanged.
func (x *streamAllStreamClient) Recv() (*StreamResponseData, error) {
m := new(StreamResponseData)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// StreamServer is the server API for Stream service.
type StreamServer interface {
// GetStream receives the single request and a stream to send responses on.
GetStream(*StreamRequestData, Stream_GetStreamServer) error
// PutStream receives a stream to read requests from and to answer once via SendAndClose.
PutStream(Stream_PutStreamServer) error
// AllStream receives a stream that supports both Send and Recv.
AllStream(Stream_AllStreamServer) error
}
// UnimplementedStreamServer can be embedded to have forward compatible implementations.
// Each of its methods returns a codes.Unimplemented status error.
type UnimplementedStreamServer struct {
}
// GetStream always fails with codes.Unimplemented.
func (*UnimplementedStreamServer) GetStream(*StreamRequestData, Stream_GetStreamServer) error {
return status.Errorf(codes.Unimplemented, "method GetStream not implemented")
}
// PutStream always fails with codes.Unimplemented.
func (*UnimplementedStreamServer) PutStream(Stream_PutStreamServer) error {
return status.Errorf(codes.Unimplemented, "method PutStream not implemented")
}
// AllStream always fails with codes.Unimplemented.
func (*UnimplementedStreamServer) AllStream(Stream_AllStreamServer) error {
return status.Errorf(codes.Unimplemented, "method AllStream not implemented")
}
// RegisterStreamServer registers srv on s under the _Stream_serviceDesc descriptor.
func RegisterStreamServer(s *grpc.Server, srv StreamServer) {
s.RegisterService(&_Stream_serviceDesc, srv)
}
// _Stream_GetStream_Handler adapts a raw grpc.ServerStream to
// StreamServer.GetStream: it reads the single request message first, then
// hands the request plus a typed send-only wrapper to the implementation.
// srv must be a StreamServer; the type assertion panics otherwise.
func _Stream_GetStream_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StreamRequestData)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(StreamServer).GetStream(m, &streamGetStreamServer{stream})
}
// Stream_GetStreamServer is the send-only server view of the GetStream RPC.
type Stream_GetStreamServer interface {
Send(*StreamResponseData) error
grpc.ServerStream
}
// streamGetStreamServer implements Stream_GetStreamServer on top of an embedded grpc.ServerStream.
type streamGetStreamServer struct {
grpc.ServerStream
}
// Send forwards m to the underlying stream's SendMsg.
func (x *streamGetStreamServer) Send(m *StreamResponseData) error {
return x.ServerStream.SendMsg(m)
}
// _Stream_PutStream_Handler adapts a raw grpc.ServerStream to
// StreamServer.PutStream by wrapping it in the typed stream wrapper.
// srv must be a StreamServer; the type assertion panics otherwise.
func _Stream_PutStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(StreamServer).PutStream(&streamPutStreamServer{stream})
}
// Stream_PutStreamServer is the server view of the PutStream RPC: many Recvs
// and one SendAndClose.
type Stream_PutStreamServer interface {
SendAndClose(*StreamResponseData) error
Recv() (*StreamRequestData, error)
grpc.ServerStream
}
// streamPutStreamServer implements Stream_PutStreamServer on top of an embedded grpc.ServerStream.
type streamPutStreamServer struct {
grpc.ServerStream
}
// SendAndClose forwards m to SendMsg; this wrapper performs no separate close step.
func (x *streamPutStreamServer) SendAndClose(m *StreamResponseData) error {
return x.ServerStream.SendMsg(m)
}
// Recv reads the next request into a freshly allocated message. On error the
// message is discarded and the RecvMsg error is returned unchanged.
func (x *streamPutStreamServer) Recv() (*StreamRequestData, error) {
m := new(StreamRequestData)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// _Stream_AllStream_Handler adapts a raw grpc.ServerStream to
// StreamServer.AllStream by wrapping it in the typed stream wrapper.
// srv must be a StreamServer; the type assertion panics otherwise.
func _Stream_AllStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(StreamServer).AllStream(&streamAllStreamServer{stream})
}
// Stream_AllStreamServer is the server view of the bidirectional AllStream RPC.
type Stream_AllStreamServer interface {
Send(*StreamResponseData) error
Recv() (*StreamRequestData, error)
grpc.ServerStream
}
// streamAllStreamServer implements Stream_AllStreamServer on top of an embedded grpc.ServerStream.
type streamAllStreamServer struct {
grpc.ServerStream
}
// Send forwards m to the underlying stream's SendMsg.
func (x *streamAllStreamServer) Send(m *StreamResponseData) error {
return x.ServerStream.SendMsg(m)
}
// Recv reads the next request into a freshly allocated message. On error the
// message is discarded and the RecvMsg error is returned unchanged.
func (x *streamAllStreamServer) Recv() (*StreamRequestData, error) {
m := new(StreamRequestData)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// _Stream_serviceDesc describes the Stream service to grpc. It has no unary
// methods. The order of Streams is significant: the client methods index it
// positionally (GetStream uses Streams[0], PutStream Streams[1], AllStream
// Streams[2]).
var _Stream_serviceDesc = grpc.ServiceDesc{
ServiceName: "Stream",
HandlerType: (*StreamServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
// Server-side streaming.
StreamName: "GetStream",
Handler: _Stream_GetStream_Handler,
ServerStreams: true,
},
{
// Client-side streaming.
StreamName: "PutStream",
Handler: _Stream_PutStream_Handler,
ClientStreams: true,
},
{
// Bidirectional streaming.
StreamName: "AllStream",
Handler: _Stream_AllStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "stream.proto",
}
server.go:
package main
import (
	"fmt"
	"io"
	"net"
	"sync"
	"time"

	"go-micro-service-architect/cn/ljxwtl/micro/stream/grpc_proto"
	"google.golang.org/grpc"
)
// Server implements grpc_proto.StreamServer for the streaming demo.
// It carries no state, so the zero value is ready to use.
type Server struct{}
// GetStream implements the server-side streaming RPC. It ignores the request
// payload and sends 11 responses, one per second, each carrying the current
// Unix timestamp as a decimal string.
//
// It returns the Send error as soon as a send fails, the stream context's
// error if the client cancels or the deadline expires while waiting, and nil
// after the last message has been sent.
func (s *Server) GetStream(stream *grpc_proto.StreamRequestData, response grpc_proto.Stream_GetStreamServer) error {
	// The original loop ran until count > 10, i.e. it sent 11 messages.
	const total = 11

	ctx := response.Context()
	for sent := 0; sent < total; sent++ {
		if sent > 0 {
			// Pause between messages, but stop immediately once the RPC is
			// cancelled instead of sleeping for a client that is gone.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
			}
		}
		// Check the send result before doing anything else so a broken
		// stream is reported without an extra one-second delay.
		if err := response.Send(&grpc_proto.StreamResponseData{
			Data: fmt.Sprintf("%d", time.Now().Unix()),
		}); err != nil {
			return err
		}
	}
	return nil
}
// PutStream implements the client-side streaming RPC. It prints every message
// the client sends and, once the client half-closes the stream (Recv returns
// io.EOF), answers with a single response stating how many messages arrived.
//
// A client-streaming RPC must produce exactly one response, so returning
// without SendAndClose leaves the client's CloseAndRecv with nothing to read.
// Any Recv error other than io.EOF is returned to gRPC rather than being
// printed and turned into a success.
func (s *Server) PutStream(server grpc_proto.Stream_PutStreamServer) error {
	received := 0
	for {
		data, err := server.Recv()
		if err == io.EOF {
			// Normal end of the client's stream: reply once and finish.
			return server.SendAndClose(&grpc_proto.StreamResponseData{
				Data: fmt.Sprintf("received %d messages", received),
			})
		}
		if err != nil {
			return err
		}
		received++
		fmt.Println("服务器接收到的数据=>>>>>>", data)
	}
}
// AllStream implements the bidirectional streaming RPC. One goroutine prints
// every message received from the client; a second goroutine sends a
// timestamped message once per second. Receiving and sending run
// concurrently, each side confined to its own goroutine.
//
// The handler finishes when the client half-closes the stream (io.EOF), when
// the RPC context ends, or when Recv fails. It returns the receive error if
// there was one, otherwise the send error, otherwise nil. Errors are returned
// to gRPC instead of panicking: a panic in a handler goroutine would take down
// the whole server process, and io.EOF is the normal end of a client stream.
//
// NOTE(review): if Send fails while the client keeps the stream open and
// healthy, the handler still waits for the receive side to end — confirm this
// is acceptable for the demo.
func (s *Server) AllStream(allStream grpc_proto.Stream_AllStreamServer) error {
	ctx := allStream.Context()
	recvDone := make(chan struct{}) // closed when the receive loop exits

	var (
		wg      sync.WaitGroup
		recvErr error // written by the receiver, read after wg.Wait
		sendErr error // written by the sender, read after wg.Wait
	)
	wg.Add(2)

	// Receiver: drain the client's messages until EOF or failure.
	go func() {
		defer wg.Done()
		defer close(recvDone)
		for {
			recv, err := allStream.Recv()
			if err == io.EOF {
				return
			}
			if err != nil {
				recvErr = err
				return
			}
			fmt.Println("服务器接收的数据=>>>>>>", recv)
		}
	}()

	// Sender: push one message per second until the receiver is done or the
	// RPC is cancelled.
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			if err := allStream.Send(&grpc_proto.StreamResponseData{
				Data: fmt.Sprintf("服务器发送的消息:%d---bobby", time.Now().Unix()),
			}); err != nil {
				sendErr = err
				return
			}
			select {
			case <-recvDone:
				return
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()

	// The stream must not be used after the handler returns, so wait for
	// both goroutines before reporting the outcome.
	wg.Wait()
	if recvErr != nil {
		return recvErr
	}
	return sendErr
}
// main starts the demo gRPC server on 0.0.0.0:8080 with the Stream service
// registered and blocks in Serve. It panics if the port cannot be bound or if
// Serve returns an error, so a startup failure is never silent.
func main() {
	const addr = "0.0.0.0:8080"

	listen, err := net.Listen("tcp", addr)
	if err != nil {
		// Previously this returned without any output, which made a busy
		// port look like a clean exit.
		panic(fmt.Errorf("listen on %s: %w", addr, err))
	}

	server := grpc.NewServer()
	grpc_proto.RegisterStreamServer(server, &Server{})
	if err := server.Serve(listen); err != nil {
		panic(err)
	}
}
client.go:
package main
import (
"context"
"fmt"
"go-micro-service-architect/cn/ljxwtl/micro/stream/grpc_proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"sync"
"time"
)
func main() {
conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return
}
client := grpc_proto.NewStreamClient(conn)
// 服务器流
stream, err := client.GetStream(context.Background(), &grpc_proto.StreamRequestData{
Data: "bobby",
})
if err != nil {
panic(err)
}
for {
responseData, err := stream.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println(responseData)
}
// 客户端流
putStream, err := client.PutStream(context.Background())
if err != nil {
panic(err)
}
var count = 0
for {
count++
err := putStream.Send(&grpc_proto.StreamRequestData{
Data: "bobby",
})
if err != nil {
fmt.Println(err)
break
}
time.Sleep(time.Second)
if count > 10 {
break
}
}
//双向流
wg := sync.WaitGroup{}
wg.Add(2)
allStream, err := client.AllStream(context.Background())
if err != nil {
panic(err)
}
go func() {
defer wg.Done()
for {
recv, err2 := allStream.Recv()
if err2 != nil {
panic(err2)
}
fmt.Println("客户端接收的数据=>>>>>>", recv)
}
}()
go func() {
defer wg.Done()
for {
err2 := allStream.Send(&grpc_proto.StreamRequestData{
Data: fmt.Sprintf("客户端发送的消息:%d---bobby", time.Now().Unix()),
})
time.Sleep(time.Second)
if err2 != nil {
panic(err2)
}
}
}()
wg.Wait()
}
client的输出:

server的输出:

边栏推荐
- How to promote the progress of project collaboration | community essay solicitation
- QT learning 20 standard dialog box in QT (middle)
- Collection of mobile adaptation related articles
- 7-10 calculate salary
- Exercise 10-1 calculate the sum of 1 to n using recursive functions
- JVM family - overview, program counter day1-1
- Back to top implementation
- JS shift operators (< <,> > and > > >)
- simpleParallax. JS (create poor visual effects for website pictures)
- 战略、战术(和 OKR)
猜你喜欢

JS Part 2

好看、好用、强大的手写笔记软件综合评测:Notability、GoodNotes、MarginNote、随手写、Notes Writers、CollaNote、CollaNote、Prodrafts、Noteshelf、FlowUs、OneNote、苹果备忘录

Uio-66-cooh loaded bendamostine | hydroxyapatite (HA) coated MIL-53 (FE) nanoparticles | baicalin loaded manganese based metal organic skeleton material

Implementation of Muduo asynchronous logging

Exercise 6-6 use a function to output an integer in reverse order

QT learning 25 layout manager (4)

Qt学习19 Qt 中的标准对话框(上)

QT learning 17 dialog box and its types

Qt学习17 对话框及其类型

Qt学习25 布局管理器(四)
随机推荐
金属有机骨架(MOFs)抗肿瘤药载体|PCN-223装载甲硝唑|UiO-66包载盐酸环丙沙星([email protected])
Leetcode(4)——寻找两个正序数组的中位数
金属有机骨架MOFs装载非甾体类抗炎药物|ZIF-8包裹普鲁士蓝负载槲皮素(制备方法)
Implementation of Muduo asynchronous logging
JS get DPI, PX to cm, cm to PX
消息订阅与发布
Uniapp tips - set background music
JS shift operators (< <,> > and > > >)
MySQL data processing value addition, deletion and modification
Thrift threadmanager and three monitors
Solve the problem of dormitory router campus network sharing login
好看、好用、强大的手写笔记软件综合评测:Notability、GoodNotes、MarginNote、随手写、Notes Writers、CollaNote、CollaNote、Prodrafts、Noteshelf、FlowUs、OneNote、苹果备忘录
Scroll detection, so that the content in the lower right corner is not displayed at the top of the page, but is displayed as the mouse slides
【吉林大学】考研初试复试资料分享
Uniapp skills - dom display and hiding
QT learning 22 layout manager (I)
Programmable logic device software testing
JS general form submission 1-onsubmit
jvm-对象生命周期
Dlopen() implements dynamic loading of third-party libraries