当前位置:网站首页>GRPC的四种数据流以及案例
GRPC的四种数据流以及案例
2022-07-03 13:39:00 【天龙至尊】
GRPC的四种数据流:
1、简单模式(Simple RPC)
2、服务端数据流模式(Server-side Streaming RPC)
3、客户端数据流模式(Client-side Streaming RPC)
4、双向数据流模式(Bidirectional Streaming RPC)
如果对GRPC和protobuf不了解的话,可以参看一下文章:
syntax = "proto3";
option go_package = "./;grpc_proto";
service Stream {
rpc GetStream(StreamRequestData) returns (stream StreamResponseData);
rpc PutStream(stream StreamRequestData) returns (StreamResponseData);
rpc AllStream(stream StreamRequestData) returns (stream StreamResponseData);
}
message StreamRequestData {
string data = 1;
}
message StreamResponseData {
string data = 1;
}
执行命令,进行grpc代码的生成:
stream.pb.go:
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.21.2
// source: stream.proto
package grpc_proto
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type StreamRequestData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
}
func (x *StreamRequestData) Reset() {
*x = StreamRequestData{}
if protoimpl.UnsafeEnabled {
mi := &file_stream_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StreamRequestData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StreamRequestData) ProtoMessage() {}
func (x *StreamRequestData) ProtoReflect() protoreflect.Message {
mi := &file_stream_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StreamRequestData.ProtoReflect.Descriptor instead.
func (*StreamRequestData) Descriptor() ([]byte, []int) {
return file_stream_proto_rawDescGZIP(), []int{0}
}
func (x *StreamRequestData) GetData() string {
if x != nil {
return x.Data
}
return ""
}
type StreamResponseData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
}
func (x *StreamResponseData) Reset() {
*x = StreamResponseData{}
if protoimpl.UnsafeEnabled {
mi := &file_stream_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StreamResponseData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StreamResponseData) ProtoMessage() {}
func (x *StreamResponseData) ProtoReflect() protoreflect.Message {
mi := &file_stream_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StreamResponseData.ProtoReflect.Descriptor instead.
func (*StreamResponseData) Descriptor() ([]byte, []int) {
return file_stream_proto_rawDescGZIP(), []int{1}
}
func (x *StreamResponseData) GetData() string {
if x != nil {
return x.Data
}
return ""
}
var File_stream_proto protoreflect.FileDescriptor
var file_stream_proto_rawDesc = []byte{
0x0a, 0x0c, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x27,
0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44,
0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x28, 0x0a, 0x12, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a,
0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74,
0x61, 0x32, 0xb2, 0x01, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x36, 0x0a, 0x09,
0x47, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x12, 0x2e, 0x53, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x13, 0x2e,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61,
0x74, 0x61, 0x30, 0x01, 0x12, 0x36, 0x0a, 0x09, 0x50, 0x75, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x12, 0x12, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x13, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x28, 0x01, 0x12, 0x38, 0x0a, 0x09,
0x41, 0x6c, 0x6c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x12, 0x2e, 0x53, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x13, 0x2e,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61,
0x74, 0x61, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x67, 0x72, 0x70,
0x63, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_stream_proto_rawDescOnce sync.Once
file_stream_proto_rawDescData = file_stream_proto_rawDesc
)
func file_stream_proto_rawDescGZIP() []byte {
file_stream_proto_rawDescOnce.Do(func() {
file_stream_proto_rawDescData = protoimpl.X.CompressGZIP(file_stream_proto_rawDescData)
})
return file_stream_proto_rawDescData
}
var file_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_stream_proto_goTypes = []interface{}{
(*StreamRequestData)(nil), // 0: StreamRequestData
(*StreamResponseData)(nil), // 1: StreamResponseData
}
var file_stream_proto_depIdxs = []int32{
0, // 0: Stream.GetStream:input_type -> StreamRequestData
0, // 1: Stream.PutStream:input_type -> StreamRequestData
0, // 2: Stream.AllStream:input_type -> StreamRequestData
1, // 3: Stream.GetStream:output_type -> StreamResponseData
1, // 4: Stream.PutStream:output_type -> StreamResponseData
1, // 5: Stream.AllStream:output_type -> StreamResponseData
3, // [3:6] is the sub-list for method output_type
0, // [0:3] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_stream_proto_init() }
func file_stream_proto_init() {
if File_stream_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_stream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamRequestData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_stream_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamResponseData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_stream_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_stream_proto_goTypes,
DependencyIndexes: file_stream_proto_depIdxs,
MessageInfos: file_stream_proto_msgTypes,
}.Build()
File_stream_proto = out.File
file_stream_proto_rawDesc = nil
file_stream_proto_goTypes = nil
file_stream_proto_depIdxs = nil
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6
// StreamClient is the client API for Stream service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type StreamClient interface {
GetStream(ctx context.Context, in *StreamRequestData, opts ...grpc.CallOption) (Stream_GetStreamClient, error)
PutStream(ctx context.Context, opts ...grpc.CallOption) (Stream_PutStreamClient, error)
AllStream(ctx context.Context, opts ...grpc.CallOption) (Stream_AllStreamClient, error)
}
type streamClient struct {
cc grpc.ClientConnInterface
}
func NewStreamClient(cc grpc.ClientConnInterface) StreamClient {
return &streamClient{cc}
}
func (c *streamClient) GetStream(ctx context.Context, in *StreamRequestData, opts ...grpc.CallOption) (Stream_GetStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Stream_serviceDesc.Streams[0], "/Stream/GetStream", opts...)
if err != nil {
return nil, err
}
x := &streamGetStreamClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Stream_GetStreamClient interface {
Recv() (*StreamResponseData, error)
grpc.ClientStream
}
type streamGetStreamClient struct {
grpc.ClientStream
}
func (x *streamGetStreamClient) Recv() (*StreamResponseData, error) {
m := new(StreamResponseData)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *streamClient) PutStream(ctx context.Context, opts ...grpc.CallOption) (Stream_PutStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Stream_serviceDesc.Streams[1], "/Stream/PutStream", opts...)
if err != nil {
return nil, err
}
x := &streamPutStreamClient{stream}
return x, nil
}
type Stream_PutStreamClient interface {
Send(*StreamRequestData) error
CloseAndRecv() (*StreamResponseData, error)
grpc.ClientStream
}
type streamPutStreamClient struct {
grpc.ClientStream
}
func (x *streamPutStreamClient) Send(m *StreamRequestData) error {
return x.ClientStream.SendMsg(m)
}
func (x *streamPutStreamClient) CloseAndRecv() (*StreamResponseData, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(StreamResponseData)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *streamClient) AllStream(ctx context.Context, opts ...grpc.CallOption) (Stream_AllStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Stream_serviceDesc.Streams[2], "/Stream/AllStream", opts...)
if err != nil {
return nil, err
}
x := &streamAllStreamClient{stream}
return x, nil
}
type Stream_AllStreamClient interface {
Send(*StreamRequestData) error
Recv() (*StreamResponseData, error)
grpc.ClientStream
}
type streamAllStreamClient struct {
grpc.ClientStream
}
func (x *streamAllStreamClient) Send(m *StreamRequestData) error {
return x.ClientStream.SendMsg(m)
}
func (x *streamAllStreamClient) Recv() (*StreamResponseData, error) {
m := new(StreamResponseData)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// StreamServer is the server API for Stream service.
type StreamServer interface {
GetStream(*StreamRequestData, Stream_GetStreamServer) error
PutStream(Stream_PutStreamServer) error
AllStream(Stream_AllStreamServer) error
}
// UnimplementedStreamServer can be embedded to have forward compatible implementations.
type UnimplementedStreamServer struct {
}
func (*UnimplementedStreamServer) GetStream(*StreamRequestData, Stream_GetStreamServer) error {
return status.Errorf(codes.Unimplemented, "method GetStream not implemented")
}
func (*UnimplementedStreamServer) PutStream(Stream_PutStreamServer) error {
return status.Errorf(codes.Unimplemented, "method PutStream not implemented")
}
func (*UnimplementedStreamServer) AllStream(Stream_AllStreamServer) error {
return status.Errorf(codes.Unimplemented, "method AllStream not implemented")
}
func RegisterStreamServer(s *grpc.Server, srv StreamServer) {
s.RegisterService(&_Stream_serviceDesc, srv)
}
func _Stream_GetStream_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StreamRequestData)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(StreamServer).GetStream(m, &streamGetStreamServer{stream})
}
type Stream_GetStreamServer interface {
Send(*StreamResponseData) error
grpc.ServerStream
}
type streamGetStreamServer struct {
grpc.ServerStream
}
func (x *streamGetStreamServer) Send(m *StreamResponseData) error {
return x.ServerStream.SendMsg(m)
}
func _Stream_PutStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(StreamServer).PutStream(&streamPutStreamServer{stream})
}
type Stream_PutStreamServer interface {
SendAndClose(*StreamResponseData) error
Recv() (*StreamRequestData, error)
grpc.ServerStream
}
type streamPutStreamServer struct {
grpc.ServerStream
}
func (x *streamPutStreamServer) SendAndClose(m *StreamResponseData) error {
return x.ServerStream.SendMsg(m)
}
func (x *streamPutStreamServer) Recv() (*StreamRequestData, error) {
m := new(StreamRequestData)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _Stream_AllStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(StreamServer).AllStream(&streamAllStreamServer{stream})
}
type Stream_AllStreamServer interface {
Send(*StreamResponseData) error
Recv() (*StreamRequestData, error)
grpc.ServerStream
}
type streamAllStreamServer struct {
grpc.ServerStream
}
func (x *streamAllStreamServer) Send(m *StreamResponseData) error {
return x.ServerStream.SendMsg(m)
}
func (x *streamAllStreamServer) Recv() (*StreamRequestData, error) {
m := new(StreamRequestData)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _Stream_serviceDesc = grpc.ServiceDesc{
ServiceName: "Stream",
HandlerType: (*StreamServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "GetStream",
Handler: _Stream_GetStream_Handler,
ServerStreams: true,
},
{
StreamName: "PutStream",
Handler: _Stream_PutStream_Handler,
ClientStreams: true,
},
{
StreamName: "AllStream",
Handler: _Stream_AllStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "stream.proto",
}
server.go:
package main
import (
"fmt"
"go-micro-service-architect/cn/ljxwtl/micro/stream/grpc_proto"
"google.golang.org/grpc"
"net"
"sync"
"time"
)
type Server struct {
}
func (s *Server) GetStream(stream *grpc_proto.StreamRequestData, response grpc_proto.Stream_GetStreamServer) error {
var count = 0
for {
count++
err := response.Send(&grpc_proto.StreamResponseData{
Data: fmt.Sprintf("%d", time.Now().Unix()),
})
time.Sleep(time.Second)
if err != nil {
return err
}
if count > 10 {
break
}
}
return nil
}
func (s *Server) PutStream(server grpc_proto.Stream_PutStreamServer) error {
for {
data, err := server.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println("服务器接收到的数据=>>>>>>", data)
}
return nil
}
func (s *Server) AllStream(allStream grpc_proto.Stream_AllStreamServer) error {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
recv, err2 := allStream.Recv()
if err2 != nil {
panic(err2)
}
fmt.Println("服务器接收的数据=>>>>>>", recv)
}
}()
go func() {
defer wg.Done()
for {
err2 := allStream.Send(&grpc_proto.StreamResponseData{
Data: fmt.Sprintf("服务器发送的消息:%d---bobby", time.Now().Unix()),
})
time.Sleep(time.Second)
if err2 != nil {
panic(err2)
}
}
}()
wg.Wait()
return nil
}
func main() {
server := grpc.NewServer()
listen, err := net.Listen("tcp", "0.0.0.0:8080")
if err != nil {
return
}
grpc_proto.RegisterStreamServer(server, &Server{})
err = server.Serve(listen)
if err != nil {
panic(err)
}
}
client.go:
package main
import (
"context"
"fmt"
"go-micro-service-architect/cn/ljxwtl/micro/stream/grpc_proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"sync"
"time"
)
func main() {
conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return
}
client := grpc_proto.NewStreamClient(conn)
// 服务器流
stream, err := client.GetStream(context.Background(), &grpc_proto.StreamRequestData{
Data: "bobby",
})
if err != nil {
panic(err)
}
for {
responseData, err := stream.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println(responseData)
}
// 客户端流
putStream, err := client.PutStream(context.Background())
if err != nil {
panic(err)
}
var count = 0
for {
count++
err := putStream.Send(&grpc_proto.StreamRequestData{
Data: "bobby",
})
if err != nil {
fmt.Println(err)
break
}
time.Sleep(time.Second)
if count > 10 {
break
}
}
//双向流
wg := sync.WaitGroup{}
wg.Add(2)
allStream, err := client.AllStream(context.Background())
if err != nil {
panic(err)
}
go func() {
defer wg.Done()
for {
recv, err2 := allStream.Recv()
if err2 != nil {
panic(err2)
}
fmt.Println("客户端接收的数据=>>>>>>", recv)
}
}()
go func() {
defer wg.Done()
for {
err2 := allStream.Send(&grpc_proto.StreamRequestData{
Data: fmt.Sprintf("客户端发送的消息:%d---bobby", time.Now().Unix()),
})
time.Sleep(time.Second)
if err2 != nil {
panic(err2)
}
}
}()
wg.Wait()
}
client的输出:
server的输出:
边栏推荐
- Redis: operation command of string type data
- 核酸修饰的金属有机框架药物载体|PCN-223金属有机骨架包载Ad金刚烷|ZIF-8包裹阿霉素(DOX)
- Canvas utility library fabric JS user manual
- js 2023. String pair equal to the target string after connection
- 可编程逻辑器件软件测试
- Exercise 8-7 string sorting
- Function calling convention
- 金属有机骨架MIL-88负载阿霉素DOX|叶酸修饰UiO-66-NH2负载阿霉素[email protected]纳米粒子
- FPGA测试方法以Mentor工具为例
- Qt学习20 Qt 中的标准对话框(中)
猜你喜欢
[email protected])"/>
金属有机骨架(MOFs)抗肿瘤药载体|PCN-223装载甲硝唑|UiO-66包载盐酸环丙沙星([email protected])
Qt学习20 Qt 中的标准对话框(中)
Using registered classes to realize specific type matching function template
UiO-66-COOH装载苯达莫司汀|羟基磷灰石( HA) 包裹MIL-53(Fe)纳米粒子|装载黄芩苷锰基金属有机骨架材料
GoLand 2021.1.1: configure the multi line display of the tab of the open file
Page generation QR code
The small project (servlet+jsp+mysql+el+jstl) completes a servlet with login function, with the operation of adding, deleting, modifying and querying. Realize login authentication, prevent illegal log
Installation impression notes
Qt学习25 布局管理器(四)
JS Part III
随机推荐
How to promote the progress of project collaboration | community essay solicitation
JS first summary
玖逸云黑免费无加密版本源码
Dlopen() implements dynamic loading of third-party libraries
Current situation, analysis and prediction of information and innovation industry
Common mixins
【吉林大学】考研初试复试资料分享
Rasp implementation of PHP
Learn to punch in today
Cross linked cyclodextrin metal organic framework loaded methotrexate slow-release particles | metal organic porous material uio-66 loaded with flavonoid glycosides | Qiyue
Exercise 9-1 time conversion
Mysql:insert date:sql error [1292] [22001]: data truncation: incorrect date value:
Scroll detection of the navigation bar enables the navigation bar to slide and fix with no content
Metal organic framework material zif-8 containing curcumin( [email protected] Nanoparticles) | nano metal organic framework carry
JS get DPI, PX to cm, cm to PX
Print. JS -- web page file printing
MySQL data processing value addition, deletion and modification
Qt学习23 布局管理器(二)
Uio-66-cooh loaded bendamostine | hydroxyapatite (HA) coated MIL-53 (FE) nanoparticles | baicalin loaded manganese based metal organic skeleton material
Why are grass-roots colleges and universities with "soil and poverty" called "Northeast small Tsinghua"?