当前位置:网站首页>go 双向流模式
go 双向流模式
2022-07-30 01:39:00 【给我一瓶冰阔洛】
food.proto
syntax = "proto3";
package food;
option go_package = "./pb";
service FoodService {
rpc SayName (FoodStreamRequest) returns (stream FoodStreamResponse) ; // 服务端流模式
rpc PostName (stream FoodStreamRequest) returns (FoodStreamResponse) ; // 客户端流模式
rpc FullStream (stream FoodStreamRequest) returns (stream FoodStreamResponse) ; // 客户端流模式
}
message FoodStreamRequest {
string name = 1;
}
message FoodStreamResponse {
string msg = 1;
}
生成grpc文件
protoc food.proto --go_out=./ --go-grpc_out=./ --go-grpc_opt=require_unimplemented_servers=false服务端
package main
import (
"fmt"
"google.golang.org/grpc"
pb "grpc_test/food/pb"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
)
func init(){
log.SetFlags(log.Llongfile | log.Lmicroseconds | log.Ldate)
logFile, err := os.OpenFile("./1.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
fmt.Println("open log file failed, err:", err)
return
}
log.SetPrefix("[stream]")
log.SetOutput(logFile)
}
type FoodInfo struct {
}
func (f *FoodInfo)SayName(request *pb.FoodStreamRequest, service pb.FoodService_SayNameServer) error{
fmt.Println("Sayname 请求")
for i:=0;i<=20;i++ {
// 通过 send 方法不断推送数据
time.Sleep(time.Second)
err := service.Send(&pb.FoodStreamResponse{
Msg: "stream : " + request.Name + strconv.Itoa(i),
})
if err != nil {
log.Println(err)
}
}
return nil
}
func (f *FoodInfo)PostName(putServer pb.FoodService_PostNameServer) error{
log.Println("PostName Start.")
var cliStr strings.Builder
for {
if putReq, err := putServer.Recv(); err == nil {
fmt.Println("Put Req: " + putReq.Name)
cliStr.WriteString(putReq.Name)
} else {
putServer.SendAndClose(&pb.FoodStreamResponse{
Msg:"postname:" + cliStr.String(),
} )
break
}
}
fmt.Println("PutServer Done.")
return nil
}
//双向流
func (f *FoodInfo)FullStream(Server pb.FoodService_FullStreamServer) error{
fmt.Println("FullStream Start.")
wg := sync.WaitGroup{}
wg.Add(2)
var i int
go func() {
for {
Req, err := Server.Recv()
if err != nil {
fmt.Println("1双向流:", err)
break
} else {
fmt.Println("1双向流-Req: " + Req.Name)
}
}
defer wg.Done()
}()
go func() {
for {
err := Server.Send(&pb.FoodStreamResponse{
Msg:"2双向流:" + strconv.Itoa(i),
})
if err != nil {
fmt.Println("2双向流:", err)
break
}
time.Sleep(time.Second)
}
defer wg.Done()
}()
wg.Wait()
log.Println("Server Done.")
return nil
}
func main() {
listen, err := net.Listen("tcp", "127.0.0.1:8889")
if err != nil {
panic(err)
}
grpcServer := grpc.NewServer()
pb.RegisterFoodServiceServer(grpcServer, &FoodInfo{})
err = grpcServer.Serve(listen)
if err != nil {
panic(err)
}
}
客户端
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
pb "grpc_test/food/pb"
"strconv"
"sync"
//"log"
//"strconv"
//"time"
)
func main() {
conn, err := grpc.Dial("127.0.0.1:8889", grpc.WithInsecure())
if err != nil {
panic(err)
}
defer conn.Close()
client := pb.NewFoodServiceClient(conn)
//req := &pb.FoodStreamRequest{
// Name: " hello grpc",
//}
//调用服务端推送流,获取服务端流数据
//response, err := client.SayName(context.Background(), req)
//if err != nil {
// panic(err)
//}
//for {
// recv,err := response.Recv()
// if err != nil {
// fmt.Println(err)
// break
// }
// if err == io.EOF {
// log.Println("server closed")
// break
// }
// fmt.Println(recv.Msg)
//}
//客户端流模式
//putClient, err := client.PostName(context.Background())
//if err != nil {
// log.Fatalln(err)
// return
//}
//for i := 0; i < 20; i++ {
// var putData = pb.FoodStreamRequest{
// Name: "putClient:" + strconv.Itoa(i),
// }
// log.Println("Put Req Data: " + putData.Name)
// putClient.Send(&putData)
// time.Sleep(time.Second)
//}
//putRes, err := putClient.CloseAndRecv()
//if err != nil {
// log.Fatalln(err)
//}
//log.Printf("Put Done. Res is %v", putRes.Msg)
//双向流
FullClient, Err := client.FullStream(context.Background())
if err != nil {
panic(Err)
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
for {
item, err := FullClient.Recv()
if err != nil {
fmt.Println("双向流客户端接收:",err)
}
fmt.Println("双向流客户端item.msg", item.Msg)
}
defer wg.Done()
}()
go func() {
defer wg.Done()
for i:=0; i<=20; i++{
err := FullClient.Send(&pb.FoodStreamRequest{
Name: "双向流send" + strconv.Itoa(i),
})
if err != nil {
fmt.Println("双向流ERR:",err)
}
}
}()
wg.Wait()
}
边栏推荐
- [Flutter] Detailed explanation of the use of the Flutter inspector tool, viewing the Flutter layout, widget tree, debugging interface, etc.
- 泰克Tektronix示波器软件TDS420|TDS430|TDS460上位机软件NS-Scope
- 基于SSM实现个性化健康饮食推荐系统
- CMake Tutorial Tour (1)_Basic starting point
- 05.script_setup中的私有属性
- Teach you how to achieve a flowing gradient border
- LeetCode 2342. Digital and equal number of one of the biggest and
- vscode 工作区配置插件 配置不同工作环境
- 自学HarmonyOS应用开发(47)- 自定义switch组件
- nacos的共享配置和扩展配置
猜你喜欢

TCP/IP 常见问题

9 common mistakes testers fall into

基于蒙特卡诺的风场景模型出力(Matlab代码实现)

Self-study HarmonyOS application development (56) - Use Service to ensure that the application runs continuously in the background

What to test for app testing

The solution to the bug, the test will no longer be blamed

Towards Better Understanding of Self-Supervised Representations / Q-Score

推荐系统:特征工程、常用特征

Typora transparent background image

【VMWARE--共享文件】
随机推荐
基于低能耗自适应聚类层次结构(LEACH)(Matlab代码实现)
tcp ip
Recommendation systems: feature engineering, common features
How Junior Testers Grow Fast
Sublime does background transparency and column editor
Baidu Intelligent Cloud Zhangmiao: Detailed explanation of enterprise-level seven-layer load balancing open source software BFE
图解LeetCode——593. 有效的正方形(难度:中等)
Typora transparent background image
mysql error is too long for user name (should be no longer than 16)
LABVIEW详细介绍:LABVIEW是什么软件?都可以干什么?
【LeetCode每日一题】——404.左叶子之和
基于SSM实现个性化健康饮食推荐系统
LeetCode 2348. 全 0 子数组的数目
手把手教你实现一个流动的渐变色边框
Tcp ip
CMake Tutorial 巡礼(0)_总述
API interface batch test
Recommendation system: collection of user "behavioral data" [use Kafka and Cassandra to process data] [if it overlaps with business data, it also needs to be collected independently]
npm ERR! code ENOTSUP npm ERR! notsup Unsupported engine for [email protected]: wanted: {“n
液压滑环的应用介绍