当前位置:网站首页>Four data flows and cases of grpc
The four data stream modes of gRPC, with examples
2022-07-03 14:24:00 【Tianlong supreme】
The four gRPC data stream modes:
1. Simple mode (Simple RPC)
2. Server-side streaming mode (Server-side Streaming RPC)
3. Client-side streaming mode (Client-side Streaming RPC)
4. Bidirectional streaming mode (Bidirectional Streaming RPC)
If you are not familiar with gRPC and protobuf, see the article:
// Proto3 definition of the Stream service and its two messages.
syntax = "proto3";
// Generated Go code uses import path "./" and package name grpc_proto.
option go_package = "./;grpc_proto";
// Stream declares one RPC for each of the three streaming modes.
service Stream {
// Server-side streaming: a single request, a stream of responses.
rpc GetStream(StreamRequestData) returns (stream StreamResponseData);
// Client-side streaming: a stream of requests, a single response.
rpc PutStream(stream StreamRequestData) returns (StreamResponseData);
// Bidirectional streaming: both request and response are streams.
rpc AllStream(stream StreamRequestData) returns (stream StreamResponseData);
}
// StreamRequestData is the request message; it carries one string field.
message StreamRequestData {
string data = 1;
}
// StreamResponseData is the response message; it carries one string field.
message StreamResponseData {
string data = 1;
}
Run the protoc command to generate the gRPC code:
stream.pb.go:
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.21.2
// source: stream.proto
package grpc_proto
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
// Compile-time version checks between this generated file and the
// protoimpl runtime it is built against.
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// StreamRequestData is the request message of the Stream service. Its only
// payload is the string field Data (proto field number 1).
type StreamRequestData struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
}

// Reset overwrites x with an empty message and, when unsafe access is
// enabled, stores the message info for this type in the fresh state.
func (x *StreamRequestData) Reset() {
	*x = StreamRequestData{}
	if !protoimpl.UnsafeEnabled {
		return
	}
	info := &file_stream_proto_msgTypes[0]
	state := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	state.StoreMessageInfo(info)
}

// String returns the string form of x produced by the protoimpl runtime.
func (x *StreamRequestData) String() string {
	text := protoimpl.X.MessageStringOf(x)
	return text
}

// ProtoMessage is an empty marker method.
func (*StreamRequestData) ProtoMessage() {}

// ProtoReflect returns the reflective view of x. With unsafe access enabled
// and a non-nil receiver it returns the message state, storing the message
// info on first use; otherwise it wraps x through the message info.
func (x *StreamRequestData) ProtoReflect() protoreflect.Message {
	info := &file_stream_proto_msgTypes[0]
	if !protoimpl.UnsafeEnabled || x == nil {
		return info.MessageOf(x)
	}
	state := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	if state.LoadMessageInfo() == nil {
		state.StoreMessageInfo(info)
	}
	return state
}

// Deprecated: Use StreamRequestData.ProtoReflect.Descriptor instead.
func (*StreamRequestData) Descriptor() ([]byte, []int) {
	path := []int{0}
	return file_stream_proto_rawDescGZIP(), path
}

// GetData returns the Data field, or "" when x is nil.
func (x *StreamRequestData) GetData() string {
	if x == nil {
		return ""
	}
	return x.Data
}
// StreamResponseData is the response message of the Stream service. Its only
// payload is the string field Data (proto field number 1).
type StreamResponseData struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
}

// Reset overwrites x with an empty message and, when unsafe access is
// enabled, stores the message info for this type in the fresh state.
func (x *StreamResponseData) Reset() {
	*x = StreamResponseData{}
	if !protoimpl.UnsafeEnabled {
		return
	}
	info := &file_stream_proto_msgTypes[1]
	state := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	state.StoreMessageInfo(info)
}

// String returns the string form of x produced by the protoimpl runtime.
func (x *StreamResponseData) String() string {
	text := protoimpl.X.MessageStringOf(x)
	return text
}

// ProtoMessage is an empty marker method.
func (*StreamResponseData) ProtoMessage() {}

// ProtoReflect returns the reflective view of x. With unsafe access enabled
// and a non-nil receiver it returns the message state, storing the message
// info on first use; otherwise it wraps x through the message info.
func (x *StreamResponseData) ProtoReflect() protoreflect.Message {
	info := &file_stream_proto_msgTypes[1]
	if !protoimpl.UnsafeEnabled || x == nil {
		return info.MessageOf(x)
	}
	state := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	if state.LoadMessageInfo() == nil {
		state.StoreMessageInfo(info)
	}
	return state
}

// Deprecated: Use StreamResponseData.ProtoReflect.Descriptor instead.
func (*StreamResponseData) Descriptor() ([]byte, []int) {
	path := []int{1}
	return file_stream_proto_rawDescGZIP(), path
}

// GetData returns the Data field, or "" when x is nil.
func (x *StreamResponseData) GetData() string {
	if x == nil {
		return ""
	}
	return x.Data
}
// File_stream_proto is the file descriptor for stream.proto; it is assigned
// by file_stream_proto_init.
var File_stream_proto protoreflect.FileDescriptor
// file_stream_proto_rawDesc holds the raw descriptor bytes that
// file_stream_proto_init passes to the type builder as RawDescriptor.
// The variable is set to nil once the build has run.
var file_stream_proto_rawDesc = []byte{
0x0a, 0x0c, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x27,
0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44,
0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x28, 0x0a, 0x12, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a,
0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74,
0x61, 0x32, 0xb2, 0x01, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x36, 0x0a, 0x09,
0x47, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x12, 0x2e, 0x53, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x13, 0x2e,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61,
0x74, 0x61, 0x30, 0x01, 0x12, 0x36, 0x0a, 0x09, 0x50, 0x75, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x12, 0x12, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x13, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x28, 0x01, 0x12, 0x38, 0x0a, 0x09,
0x41, 0x6c, 0x6c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x12, 0x2e, 0x53, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x13, 0x2e,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61,
0x74, 0x61, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x67, 0x72, 0x70,
0x63, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
// State for the lazily compressed descriptor: the Once guards the single
// compression pass, and the data variable starts out as the raw descriptor.
var (
	file_stream_proto_rawDescOnce sync.Once
	file_stream_proto_rawDescData = file_stream_proto_rawDesc
)

// file_stream_proto_rawDescGZIP returns the descriptor bytes after passing
// them through protoimpl's CompressGZIP. The compression runs at most once;
// its result replaces file_stream_proto_rawDescData for all later calls.
func file_stream_proto_rawDescGZIP() []byte {
	compress := func() {
		file_stream_proto_rawDescData = protoimpl.X.CompressGZIP(file_stream_proto_rawDescData)
	}
	file_stream_proto_rawDescOnce.Do(compress)
	return file_stream_proto_rawDescData
}
// file_stream_proto_msgTypes holds one MessageInfo per message in
// stream.proto: index 0 is used by StreamRequestData and index 1 by
// StreamResponseData.
var file_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
// file_stream_proto_goTypes lists the Go types passed to the type builder
// as GoTypes.
var file_stream_proto_goTypes = []interface{}{
(*StreamRequestData)(nil), // 0: StreamRequestData
(*StreamResponseData)(nil), // 1: StreamResponseData
}
// file_stream_proto_depIdxs is passed to the type builder as
// DependencyIndexes. Per the entry comments, the first six values name the
// input and output type of each method, and the trailing values mark where
// each sub-list starts.
var file_stream_proto_depIdxs = []int32{
0, // 0: Stream.GetStream:input_type -> StreamRequestData
0, // 1: Stream.PutStream:input_type -> StreamRequestData
0, // 2: Stream.AllStream:input_type -> StreamRequestData
1, // 3: Stream.GetStream:output_type -> StreamResponseData
1, // 4: Stream.PutStream:output_type -> StreamResponseData
1, // 5: Stream.AllStream:output_type -> StreamResponseData
3, // [3:6] is the sub-list for method output_type
0, // [0:3] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
// init builds the stream.proto descriptor and types at package load.
func init() { file_stream_proto_init() }
// file_stream_proto_init builds File_stream_proto from the raw descriptor
// and the type tables. It returns immediately when File_stream_proto has
// already been set.
func file_stream_proto_init() {
if File_stream_proto != nil {
return
}
// Without unsafe access, install exporters that hand out pointers to each
// message's internal fields by index (0: state, 1: sizeCache,
// 2: unknownFields).
if !protoimpl.UnsafeEnabled {
file_stream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamRequestData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_stream_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamResponseData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
// x is a local type used only so reflect can report this package's path.
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_stream_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_stream_proto_goTypes,
DependencyIndexes: file_stream_proto_depIdxs,
MessageInfos: file_stream_proto_msgTypes,
}.Build()
File_stream_proto = out.File
// Clear the build inputs now that Build has run.
file_stream_proto_rawDesc = nil
file_stream_proto_goTypes = nil
file_stream_proto_depIdxs = nil
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// The build fails if the grpc package does not define
// SupportPackageIsVersion6.
const _ = grpc.SupportPackageIsVersion6
// StreamClient is the client API for Stream service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type StreamClient interface {
// GetStream starts the server-streaming RPC with the single request in and
// returns a stream to receive responses from.
GetStream(ctx context.Context, in *StreamRequestData, opts ...grpc.CallOption) (Stream_GetStreamClient, error)
// PutStream starts the client-streaming RPC and returns a stream to send
// requests on and then close to receive the response.
PutStream(ctx context.Context, opts ...grpc.CallOption) (Stream_PutStreamClient, error)
// AllStream starts the bidirectional RPC and returns a stream that can both
// send requests and receive responses.
AllStream(ctx context.Context, opts ...grpc.CallOption) (Stream_AllStreamClient, error)
}
// streamClient implements StreamClient on top of a gRPC connection.
type streamClient struct {
	cc grpc.ClientConnInterface
}

// NewStreamClient returns a StreamClient that issues its RPCs over cc.
func NewStreamClient(cc grpc.ClientConnInterface) StreamClient {
	client := new(streamClient)
	client.cc = cc
	return client
}
// GetStream opens the "/Stream/GetStream" stream, sends the single request
// in, closes the send side, and returns the stream for receiving responses.
// Any failure along the way is returned with a nil stream.
func (c *streamClient) GetStream(ctx context.Context, in *StreamRequestData, opts ...grpc.CallOption) (Stream_GetStreamClient, error) {
	desc := &_Stream_serviceDesc.Streams[0]
	cs, err := c.cc.NewStream(ctx, desc, "/Stream/GetStream", opts...)
	if err != nil {
		return nil, err
	}
	if err = cs.SendMsg(in); err != nil {
		return nil, err
	}
	if err = cs.CloseSend(); err != nil {
		return nil, err
	}
	return &streamGetStreamClient{cs}, nil
}
// Stream_GetStreamClient is the client's receive-only view of a GetStream
// call.
type Stream_GetStreamClient interface {
	Recv() (*StreamResponseData, error)
	grpc.ClientStream
}

// streamGetStreamClient adapts a grpc.ClientStream to Stream_GetStreamClient.
type streamGetStreamClient struct {
	grpc.ClientStream
}

// Recv reads the next response from the stream into a fresh message.
// It returns a nil message together with the error when RecvMsg fails.
func (x *streamGetStreamClient) Recv() (*StreamResponseData, error) {
	reply := &StreamResponseData{}
	err := x.ClientStream.RecvMsg(reply)
	if err != nil {
		return nil, err
	}
	return reply, nil
}
// PutStream opens the "/Stream/PutStream" stream and returns it wrapped as
// a Stream_PutStreamClient. Nothing is sent until the caller calls Send.
func (c *streamClient) PutStream(ctx context.Context, opts ...grpc.CallOption) (Stream_PutStreamClient, error) {
	desc := &_Stream_serviceDesc.Streams[1]
	cs, err := c.cc.NewStream(ctx, desc, "/Stream/PutStream", opts...)
	if err != nil {
		return nil, err
	}
	return &streamPutStreamClient{cs}, nil
}
// Stream_PutStreamClient is the client's view of a PutStream call: send any
// number of requests, then close the send side and read the one response.
type Stream_PutStreamClient interface {
	Send(*StreamRequestData) error
	CloseAndRecv() (*StreamResponseData, error)
	grpc.ClientStream
}

// streamPutStreamClient adapts a grpc.ClientStream to Stream_PutStreamClient.
type streamPutStreamClient struct {
	grpc.ClientStream
}

// Send writes one request message to the stream.
func (x *streamPutStreamClient) Send(m *StreamRequestData) error {
	err := x.ClientStream.SendMsg(m)
	return err
}

// CloseAndRecv closes the send side and then reads the response.
// If either step fails it returns a nil message and that error.
func (x *streamPutStreamClient) CloseAndRecv() (*StreamResponseData, error) {
	err := x.ClientStream.CloseSend()
	if err != nil {
		return nil, err
	}
	reply := &StreamResponseData{}
	if err = x.ClientStream.RecvMsg(reply); err != nil {
		return nil, err
	}
	return reply, nil
}
// AllStream opens the "/Stream/AllStream" stream and returns it wrapped as
// a Stream_AllStreamClient for sending and receiving.
func (c *streamClient) AllStream(ctx context.Context, opts ...grpc.CallOption) (Stream_AllStreamClient, error) {
	desc := &_Stream_serviceDesc.Streams[2]
	cs, err := c.cc.NewStream(ctx, desc, "/Stream/AllStream", opts...)
	if err != nil {
		return nil, err
	}
	return &streamAllStreamClient{cs}, nil
}
// Stream_AllStreamClient is the client's view of an AllStream call; it can
// both send requests and receive responses.
type Stream_AllStreamClient interface {
	Send(*StreamRequestData) error
	Recv() (*StreamResponseData, error)
	grpc.ClientStream
}

// streamAllStreamClient adapts a grpc.ClientStream to Stream_AllStreamClient.
type streamAllStreamClient struct {
	grpc.ClientStream
}

// Send writes one request message to the stream.
func (x *streamAllStreamClient) Send(m *StreamRequestData) error {
	err := x.ClientStream.SendMsg(m)
	return err
}

// Recv reads the next response from the stream into a fresh message.
// It returns a nil message together with the error when RecvMsg fails.
func (x *streamAllStreamClient) Recv() (*StreamResponseData, error) {
	reply := &StreamResponseData{}
	err := x.ClientStream.RecvMsg(reply)
	if err != nil {
		return nil, err
	}
	return reply, nil
}
// StreamServer is the server API for Stream service.
type StreamServer interface {
// GetStream receives the single request and sends responses on the stream.
GetStream(*StreamRequestData, Stream_GetStreamServer) error
// PutStream receives requests from the stream and answers with
// SendAndClose.
PutStream(Stream_PutStreamServer) error
// AllStream both receives requests from and sends responses on the stream.
AllStream(Stream_AllStreamServer) error
}
// UnimplementedStreamServer can be embedded to have forward compatible implementations.
// Each of its methods returns a codes.Unimplemented status error.
type UnimplementedStreamServer struct{}

// GetStream reports that the method is not implemented.
func (*UnimplementedStreamServer) GetStream(*StreamRequestData, Stream_GetStreamServer) error {
	const msg = "method GetStream not implemented"
	return status.Error(codes.Unimplemented, msg)
}

// PutStream reports that the method is not implemented.
func (*UnimplementedStreamServer) PutStream(Stream_PutStreamServer) error {
	const msg = "method PutStream not implemented"
	return status.Error(codes.Unimplemented, msg)
}

// AllStream reports that the method is not implemented.
func (*UnimplementedStreamServer) AllStream(Stream_AllStreamServer) error {
	const msg = "method AllStream not implemented"
	return status.Error(codes.Unimplemented, msg)
}
// RegisterStreamServer registers srv on s as the implementation of the
// Stream service described by _Stream_serviceDesc.
func RegisterStreamServer(s *grpc.Server, srv StreamServer) {
s.RegisterService(&_Stream_serviceDesc, srv)
}
// _Stream_GetStream_Handler reads the single request from the stream and
// passes it, together with a send-only wrapper around the stream, to the
// service implementation's GetStream.
func _Stream_GetStream_Handler(srv interface{}, stream grpc.ServerStream) error {
	req := &StreamRequestData{}
	err := stream.RecvMsg(req)
	if err != nil {
		return err
	}
	impl := srv.(StreamServer)
	return impl.GetStream(req, &streamGetStreamServer{stream})
}

// Stream_GetStreamServer is the server's send-only view of a GetStream call.
type Stream_GetStreamServer interface {
	Send(*StreamResponseData) error
	grpc.ServerStream
}

// streamGetStreamServer adapts a grpc.ServerStream to Stream_GetStreamServer.
type streamGetStreamServer struct {
	grpc.ServerStream
}

// Send writes one response message to the stream.
func (x *streamGetStreamServer) Send(m *StreamResponseData) error {
	err := x.ServerStream.SendMsg(m)
	return err
}
// _Stream_PutStream_Handler wraps the stream and hands it to the service
// implementation's PutStream.
func _Stream_PutStream_Handler(srv interface{}, stream grpc.ServerStream) error {
	impl := srv.(StreamServer)
	return impl.PutStream(&streamPutStreamServer{stream})
}

// Stream_PutStreamServer is the server's view of a PutStream call: receive
// requests, then send the response with SendAndClose.
type Stream_PutStreamServer interface {
	SendAndClose(*StreamResponseData) error
	Recv() (*StreamRequestData, error)
	grpc.ServerStream
}

// streamPutStreamServer adapts a grpc.ServerStream to Stream_PutStreamServer.
type streamPutStreamServer struct {
	grpc.ServerStream
}

// SendAndClose writes the response message to the stream.
func (x *streamPutStreamServer) SendAndClose(m *StreamResponseData) error {
	err := x.ServerStream.SendMsg(m)
	return err
}

// Recv reads the next request from the stream into a fresh message.
// It returns a nil message together with the error when RecvMsg fails.
func (x *streamPutStreamServer) Recv() (*StreamRequestData, error) {
	req := &StreamRequestData{}
	err := x.ServerStream.RecvMsg(req)
	if err != nil {
		return nil, err
	}
	return req, nil
}
// _Stream_AllStream_Handler wraps the stream and hands it to the service
// implementation's AllStream.
func _Stream_AllStream_Handler(srv interface{}, stream grpc.ServerStream) error {
	impl := srv.(StreamServer)
	return impl.AllStream(&streamAllStreamServer{stream})
}

// Stream_AllStreamServer is the server's view of an AllStream call; it can
// both send responses and receive requests.
type Stream_AllStreamServer interface {
	Send(*StreamResponseData) error
	Recv() (*StreamRequestData, error)
	grpc.ServerStream
}

// streamAllStreamServer adapts a grpc.ServerStream to Stream_AllStreamServer.
type streamAllStreamServer struct {
	grpc.ServerStream
}

// Send writes one response message to the stream.
func (x *streamAllStreamServer) Send(m *StreamResponseData) error {
	err := x.ServerStream.SendMsg(m)
	return err
}

// Recv reads the next request from the stream into a fresh message.
// It returns a nil message together with the error when RecvMsg fails.
func (x *streamAllStreamServer) Recv() (*StreamRequestData, error) {
	req := &StreamRequestData{}
	err := x.ServerStream.RecvMsg(req)
	if err != nil {
		return nil, err
	}
	return req, nil
}
// _Stream_serviceDesc describes the Stream service to the grpc package.
// It has no unary methods. The order of Streams matters: the client stubs
// address the entries by index (0: GetStream, 1: PutStream, 2: AllStream).
var _Stream_serviceDesc = grpc.ServiceDesc{
ServiceName: "Stream",
HandlerType: (*StreamServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
// Server-side streaming.
StreamName: "GetStream",
Handler: _Stream_GetStream_Handler,
ServerStreams: true,
},
{
// Client-side streaming.
StreamName: "PutStream",
Handler: _Stream_PutStream_Handler,
ClientStreams: true,
},
{
// Bidirectional streaming.
StreamName: "AllStream",
Handler: _Stream_AllStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "stream.proto",
}
server.go:
package main
import (
	"fmt"
	"io"
	"net"
	"sync"
	"time"

	"go-micro-service-architect/cn/ljxwtl/micro/stream/grpc_proto"
	"google.golang.org/grpc"
)
// Server implements the three streaming RPCs of the Stream service.
// It carries no state.
type Server struct{}
// GetStream handles the server-streaming RPC. It ignores the request payload
// and sends 11 responses, one per second, each carrying the current Unix
// timestamp as a decimal string.
//
// It returns the Send error as soon as a send fails, and the stream
// context's error if the call is cancelled while waiting between messages.
// It returns nil after the last message has been sent.
func (s *Server) GetStream(stream *grpc_proto.StreamRequestData, response grpc_proto.Stream_GetStreamServer) error {
	// 11 matches the original loop, which kept sending until count > 10.
	const total = 11

	ctx := response.Context()
	for sent := 0; sent < total; sent++ {
		if sent > 0 {
			// Pace the messages, but stop waiting once the client is gone.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
			}
		}
		err := response.Send(&grpc_proto.StreamResponseData{
			Data: fmt.Sprintf("%d", time.Now().Unix()),
		})
		if err != nil {
			return err
		}
	}
	return nil
}
// PutStream handles the client-streaming RPC. It prints every request it
// receives and, once the client has closed its send side (Recv reports
// io.EOF), answers with a single response stating how many messages
// arrived.
//
// Any other receive error is returned, so the call fails instead of being
// reported as a success.
func (s *Server) PutStream(server grpc_proto.Stream_PutStreamServer) error {
	received := 0
	for {
		data, err := server.Recv()
		if err == io.EOF {
			// A client-streaming RPC must finish with exactly one response.
			return server.SendAndClose(&grpc_proto.StreamResponseData{
				Data: fmt.Sprintf("received %d messages", received),
			})
		}
		if err != nil {
			return err
		}
		received++
		fmt.Println(" Data received by the server =>>>>>>", data)
	}
}
// AllStream handles the bidirectional RPC. One goroutine prints every
// request received from the client; a second goroutine sends a timestamped
// response once per second.
//
// The receiver stops when the client closes its send side (io.EOF) or when
// Recv fails. The sender stops when the receiver has stopped, when the
// stream context is cancelled, or when Send fails. Errors are returned to
// gRPC instead of panicking, so one broken stream cannot crash the server.
// The receive error takes precedence over the send error; a clean client
// close yields nil.
func (s *Server) AllStream(allStream grpc_proto.Stream_AllStreamServer) error {
	ctx := allStream.Context()
	recvDone := make(chan struct{})

	var (
		wg      sync.WaitGroup
		recvErr error
		sendErr error
	)
	wg.Add(2)

	go func() {
		defer wg.Done()
		defer close(recvDone) // tells the sender that the client is finished
		for {
			recv, err := allStream.Recv()
			if err == io.EOF {
				return
			}
			if err != nil {
				recvErr = err
				return
			}
			fmt.Println(" Data received by the server =>>>>>>", recv)
		}
	}()

	go func() {
		defer wg.Done()
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			err := allStream.Send(&grpc_proto.StreamResponseData{
				Data: fmt.Sprintf(" The message sent by the server :%d---bobby", time.Now().Unix()),
			})
			if err != nil {
				sendErr = err
				return
			}
			select {
			case <-recvDone:
				return
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()

	// wg.Wait orders the goroutines' writes to recvErr/sendErr before the
	// reads below.
	wg.Wait()
	if recvErr != nil {
		return recvErr
	}
	return sendErr
}
// main starts the gRPC server for the Stream service on TCP port 8080 on
// all interfaces and serves until Serve returns.
//
// It panics if the listener cannot be opened or if Serve returns an error.
func main() {
	const addr = "0.0.0.0:8080"

	// Open the listener first so a failure is reported rather than dropped.
	listen, err := net.Listen("tcp", addr)
	if err != nil {
		panic(fmt.Errorf("listen on %s: %w", addr, err))
	}

	server := grpc.NewServer()
	grpc_proto.RegisterStreamServer(server, &Server{})
	if err := server.Serve(listen); err != nil {
		panic(err)
	}
}
client.go:
package main
import (
"context"
"fmt"
"go-micro-service-architect/cn/ljxwtl/micro/stream/grpc_proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"sync"
"time"
)
// main connects to the Stream service on localhost:8080 and exercises the
// three streaming RPCs one after another:
//
//  1. server stream: prints every response until the stream ends;
//  2. client stream: sends 11 requests one second apart, then closes the
//     stream and prints the server's response;
//  3. two-way flow: sends a request every second while printing every
//     response, until either direction reports an error.
//
// It panics if the connection or any of the streams cannot be created.
// Errors on an established stream are printed and end that phase.
func main() {
	conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	client := grpc_proto.NewStreamClient(conn)

	// Server stream
	stream, err := client.GetStream(context.Background(), &grpc_proto.StreamRequestData{
		Data: "bobby",
	})
	if err != nil {
		panic(err)
	}
	for {
		responseData, err := stream.Recv()
		if err != nil {
			// Covers the normal end of the stream, which Recv reports as EOF.
			fmt.Println(err)
			break
		}
		fmt.Println(responseData)
	}

	// Client stream
	putStream, err := client.PutStream(context.Background())
	if err != nil {
		panic(err)
	}
	// 11 matches the original loop, which kept sending until count > 10.
	for sent := 0; sent < 11; sent++ {
		if sent > 0 {
			time.Sleep(time.Second)
		}
		err := putStream.Send(&grpc_proto.StreamRequestData{
			Data: "bobby",
		})
		if err != nil {
			fmt.Println(err)
			break
		}
	}
	// Closing the send side tells the server that the upload is finished;
	// the response (or the RPC's final error) is only available afterwards.
	reply, err := putStream.CloseAndRecv()
	if err != nil {
		fmt.Println(err)
	} else {
		fmt.Println(reply)
	}

	// Two-way flow
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	allStream, err := client.AllStream(ctx)
	if err != nil {
		panic(err)
	}
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()
		// When receiving ends, cancel so the sender stops as well.
		defer cancel()
		for {
			recv, err2 := allStream.Recv()
			if err2 != nil {
				fmt.Println(err2)
				return
			}
			fmt.Println(" Data received by the client =>>>>>>", recv)
		}
	}()
	go func() {
		defer wg.Done()
		for {
			err2 := allStream.Send(&grpc_proto.StreamRequestData{
				Data: fmt.Sprintf(" Messages sent by clients :%d---bobby", time.Now().Unix()),
			})
			if err2 != nil {
				fmt.Println(err2)
				// Cancelling unblocks the receiver's pending Recv.
				cancel()
				return
			}
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Second):
			}
		}
	}()
	wg.Wait()
}
client Output :
server Output :
边栏推荐
- Zabbix添加Calculated items后保存页面成空白
- Exercise 9-3 plane vector addition
- Tiantu investment sprint Hong Kong stocks: asset management scale of 24.9 billion, invested in xiaohongshu and Naixue
- Preliminary summary of structure
- 7-23 currency conversion (using array conversion)
- Common commands for getting started with mongodb database
- Onmenusharetimeline custom shared content is invalid, and the title and icon are not displayed
- Mongodb index
- concat和concat_ws()区别及group_concat()和repeat()函数的使用
- Sendmail can't send mail and it's too slow to send. Solve it
猜你喜欢
Interface for querying IP home
Exercise 6-6 use a function to output an integer in reverse order
7-15 calculation of PI
Tailing rushes to the scientific and Technological Innovation Board: it plans to raise 1.3 billion, and Xiaomi Changjiang is the shareholder
愉悦资本新双币基金近40亿元完成首次关账
洛谷P5018 [NOIP2018 普及组] 对称二叉树 题解
JS first summary
556. The next larger element III
Leetcode(4)——寻找两个正序数组的中位数
Comprehensive evaluation of good-looking, easy-to-use and powerful handwriting note taking software: notability, goodnotes, marginnote, handwriting, notes writers, collanote, collanote, prodrafts, not
随机推荐
7-10 calculate salary
7-15 calculation of PI
Exercise 10-2 recursive factorial sum
JS get DPI, PX to cm, cm to PX
7-18 finding the single root of polynomial by dichotomy
中国PETG市场预测及战略研究报告(2022版)
PCB中常用快捷键
Etcd cluster permission management and account password usage
Add ZABBIX calculation type itemcalculated items
添加Zabbix计算类型项目Calculated items
Recent learning summary
Tiantu investment sprint Hong Kong stocks: asset management scale of 24.9 billion, invested in xiaohongshu and Naixue
洛谷P5018 [NOIP2018 普及组] 对称二叉树 题解
Page generation QR code
TS code automatically generates JS
Mongodb index
js . Find the first palindrome string in the array
洛谷P5194 [USACO05DEC]Scales S 题解
String substitution
Exercise 8-2 calculate the sum and difference of two numbers