当前位置:网站首页>go bidirectional streaming mode
go bidirectional streaming mode
2022-07-30 01:50:00 【give me a bottle of ice kuoluo】
food.proto
syntax = "proto3";
package food;
option go_package = "./pb";
// FoodService exposes one RPC for each gRPC streaming mode.
service FoodService {
rpc SayName (FoodStreamRequest) returns (stream FoodStreamResponse) ; // server-streaming mode: one request, a stream of responses
rpc PostName (stream FoodStreamRequest) returns (FoodStreamResponse) ; // client-streaming mode: a stream of requests, one response
rpc FullStream (stream FoodStreamRequest) returns (stream FoodStreamResponse) ; // bidirectional-streaming mode: both sides stream
}
// FoodStreamRequest is the request message shared by all FoodService RPCs.
message FoodStreamRequest {
// name is the single string payload of the request.
string name = 1;
}
// FoodStreamResponse is the response message shared by all FoodService RPCs.
message FoodStreamResponse {
// msg is the single string payload of the response.
string msg = 1;
}
生成grpc文件
protoc food.proto --go_out=./ --go-grpc_out=./ --go-grpc_opt=require_unimplemented_servers=false
服务端
package main
import (
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"google.golang.org/grpc"
	pb "grpc_test/food/pb"
)
// init configures the standard logger: it enables date, microsecond and
// long-file flags, then tries to open ./1.log in append mode. On success the
// logger gets the "[stream]" prefix and writes to that file; on failure the
// problem is reported on stdout and the prefix and output are left unchanged.
func init() {
	const (
		logPath  = "./1.log"
		logFlags = log.Ldate | log.Lmicroseconds | log.Llongfile
		openMode = os.O_APPEND | os.O_CREATE | os.O_WRONLY
	)
	log.SetFlags(logFlags)

	out, openErr := os.OpenFile(logPath, openMode, 0644)
	if openErr == nil {
		log.SetPrefix("[stream]")
		log.SetOutput(out)
		return
	}
	fmt.Println("open log file failed, err:", openErr)
}
// FoodInfo is the receiver type for the FoodService handlers in this file
// (SayName, PostName and FullStream). It carries no fields.
type FoodInfo struct{}
// SayName implements the server-streaming RPC. It sends 21 responses
// (i = 0..20), one per second, each built from the request name and the
// counter.
//
// It returns nil after the last response, or the first error reported by
// Send. Returning on a Send error avoids sleeping and sending for the rest of
// the loop on a stream that has already failed.
func (f *FoodInfo) SayName(request *pb.FoodStreamRequest, service pb.FoodService_SayNameServer) error {
	fmt.Println("Sayname 请求")
	for i := 0; i <= 20; i++ {
		// Pace the stream: wait one second before every push.
		time.Sleep(time.Second)
		err := service.Send(&pb.FoodStreamResponse{
			Msg: "stream : " + request.Name + strconv.Itoa(i),
		})
		if err != nil {
			// Stop here instead of retrying on the same failed stream.
			log.Println(err)
			return err
		}
	}
	return nil
}
// PostName implements the client-streaming RPC. It receives requests until
// the client finishes sending, concatenates their names, and replies once
// with "postname:" followed by the concatenation.
//
// It returns nil on success. A Recv error other than io.EOF, or a
// SendAndClose error, is returned to the caller instead of being treated as a
// normal end of stream.
func (f *FoodInfo) PostName(putServer pb.FoodService_PostNameServer) error {
	log.Println("PostName Start.")
	var cliStr strings.Builder
	for {
		putReq, err := putServer.Recv()
		if err == io.EOF {
			// The client has finished sending; reply below.
			break
		}
		if err != nil {
			// Real transport or stream failure: do not send a success reply.
			return err
		}
		fmt.Println("Put Req: " + putReq.Name)
		cliStr.WriteString(putReq.Name)
	}
	if err := putServer.SendAndClose(&pb.FoodStreamResponse{
		Msg: "postname:" + cliStr.String(),
	}); err != nil {
		return err
	}
	fmt.Println("PutServer Done.")
	return nil
}
// FullStream implements the bidirectional-streaming RPC.
//
// Two goroutines run concurrently on the same stream: one prints every
// request it receives, the other sends a numbered response roughly once per
// second. The handler returns nil once both goroutines have finished.
//
// The sender stops when its own Send fails or when the receive loop has
// ended (client finished sending, or the stream failed), so the handler does
// not keep streaming to a client that is done.
func (f *FoodInfo) FullStream(Server pb.FoodService_FullStreamServer) error {
	fmt.Println("FullStream Start.")

	// recvDone is closed when the receive loop exits; the sender watches it.
	recvDone := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(2)

	// Receiver: print incoming requests until Recv reports an error
	// (including the end-of-stream error).
	go func() {
		defer wg.Done()
		defer close(recvDone)
		for {
			Req, err := Server.Recv()
			if err != nil {
				fmt.Println("1双向流:", err)
				return
			}
			fmt.Println("1双向流-Req: " + Req.Name)
		}
	}()

	// Sender: push an incrementing counter, pausing one second between sends.
	go func() {
		defer wg.Done()
		for i := 0; ; i++ {
			err := Server.Send(&pb.FoodStreamResponse{
				Msg: "2双向流:" + strconv.Itoa(i),
			})
			if err != nil {
				fmt.Println("2双向流:", err)
				return
			}
			select {
			case <-recvDone:
				// Nothing more will arrive from the client; stop sending.
				return
			case <-time.After(time.Second):
			}
		}
	}()

	wg.Wait()
	log.Println("Server Done.")
	return nil
}
// main listens on TCP 127.0.0.1:8889, registers FoodInfo as the FoodService
// implementation on a new gRPC server, and serves on that listener. It panics
// if listening or serving returns an error.
func main() {
	lis, listenErr := net.Listen("tcp", "127.0.0.1:8889")
	if listenErr != nil {
		panic(listenErr)
	}

	srv := grpc.NewServer()
	pb.RegisterFoodServiceServer(srv, &FoodInfo{})

	if serveErr := srv.Serve(lis); serveErr != nil {
		panic(serveErr)
	}
}
客户端
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
pb "grpc_test/food/pb"
"strconv"
"sync"
//"log"
//"strconv"
//"time"
)
// main dials the FoodService server at 127.0.0.1:8889 and exercises the
// bidirectional FullStream RPC: one goroutine prints every response received,
// another sends 21 requests and then half-closes the stream. main returns
// once both goroutines have finished.
//
// The server-streaming and client-streaming examples are kept below as
// commented-out code.
func main() {
	conn, err := grpc.Dial("127.0.0.1:8889", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	client := pb.NewFoodServiceClient(conn)

	//req := &pb.FoodStreamRequest{
	//	Name: " hello grpc",
	//}
	// Server-streaming mode: call SayName and read the data pushed by the server.
	//response, err := client.SayName(context.Background(), req)
	//if err != nil {
	//	panic(err)
	//}
	//for {
	//	recv,err := response.Recv()
	//	if err != nil {
	//		fmt.Println(err)
	//		break
	//	}
	//	if err == io.EOF {
	//		log.Println("server closed")
	//		break
	//	}
	//	fmt.Println(recv.Msg)
	//}

	// Client-streaming mode.
	//putClient, err := client.PostName(context.Background())
	//if err != nil {
	//	log.Fatalln(err)
	//	return
	//}
	//for i := 0; i < 20; i++ {
	//	var putData = pb.FoodStreamRequest{
	//		Name: "putClient:" + strconv.Itoa(i),
	//	}
	//	log.Println("Put Req Data: " + putData.Name)
	//	putClient.Send(&putData)
	//	time.Sleep(time.Second)
	//}
	//putRes, err := putClient.CloseAndRecv()
	//if err != nil {
	//	log.Fatalln(err)
	//}
	//log.Printf("Put Done. Res is %v", putRes.Msg)

	// Bidirectional-streaming mode.
	stream, err := client.FullStream(context.Background())
	if err != nil {
		// Check the error returned by FullStream itself, so a failed call
		// never leaves us using a nil stream.
		panic(err)
	}

	var wg sync.WaitGroup
	wg.Add(2)

	// Receiver: print responses until Recv reports an error, which includes
	// the normal end of the stream.
	go func() {
		defer wg.Done()
		for {
			item, err := stream.Recv()
			if err != nil {
				fmt.Println("Two-way stream client receive:", err)
				// item is nil here; stop instead of dereferencing it.
				return
			}
			fmt.Println("Bidirectional Streaming Clientitem.msg", item.Msg)
		}
	}()

	// Sender: push 21 requests, then half-close so the server learns that
	// no more requests are coming.
	go func() {
		defer wg.Done()
		for i := 0; i <= 20; i++ {
			err := stream.Send(&pb.FoodStreamRequest{
				Name: "双向流send" + strconv.Itoa(i),
			})
			if err != nil {
				fmt.Println("双向流ERR:", err)
				// Stop sending once a Send has failed.
				return
			}
		}
		if err := stream.CloseSend(); err != nil {
			fmt.Println("双向流ERR:", err)
		}
	}()

	wg.Wait()
}
边栏推荐
- typora 透明背景图片
- 记一次搭建conda虚拟环境
- npm ERR! code ENOTSUPnpm ERR! notsup Unsupported engine for [email protected]: wanted: {“n
- anaconda打开闪退解决
- 基于低能耗自适应聚类层次结构(LEACH)(Matlab代码实现)
- postgresql日常运维技能,适合初学者
- 聊聊性能测试环境搭建
- Implementation of Portable VR in Unity
- Recommendation system: collection of user "behavioral data" [use Kafka and Cassandra to process data] [if it overlaps with business data, it also needs to be collected independently]
- 我的创作纪念日
猜你喜欢

LABVIEW详细介绍:LABVIEW是什么软件?都可以干什么?

泰克Tektronix示波器软件TDS1012|TDS2002|TDS2004上位机软件NS-Scope

vscode 工作区配置插件 配置不同工作环境

MATLAB被禁下一个会是LABVIEW吗?国产测试软件ATECLOUD崛起发力
![[Microservice~Nacos] Configuration Center of Nacos](/img/c3/9d8fb0fd49a0ebab43ed604f9bd1cc.png)
[Microservice~Nacos] Configuration Center of Nacos

Recommendation systems: feature engineering, common features

The role of interface testing

exness: U.S. GDP shrinks, yen bounces back

机械设备制造企业如何借助ERP系统,解决成本核算难题?

jar包解压后再打包为jar
随机推荐
【SemiDrive源码分析】【MailBox核间通信】43 - 基于Mailbox IPCC RPC 实现核间通信(代码实现篇)
绘制热度图、频谱图、地形图、colormap
Postgresql daily operation and maintenance skills, suitable for beginners
05. Private properties in script_setup
About offline use of SAP Fiori apps
Recommendation systems: feature engineering, common features
Understanding the prototype chain in js, what problem does the prototype chain solve?
网络原理 基础知识
Google浏览器打开axure产品原型的解决方案
CMake Tutorial Tour(0)_Overview
go grpc 自定义拦截器
API interface batch test
利用ESP32构造一个ZIGBEE的网络发送转接
聊聊性能测试环境搭建
Implementation of Portable VR in Unity
【Vmware NSX-V基本架构及组件安装】
Towards Better Understanding of Self-Supervised Representations / Q-Score
【MySQL总结】
Huawei's "genius boy" Zhihui Jun has made a new work, creating a "customized" smart keyboard from scratch
Docker一键安装MySQL