当前位置:网站首页>go 双向流模式
go 双向流模式
2022-07-30 01:39:00 【给我一瓶冰阔洛】
food.proto
syntax = "proto3";
package food;
option go_package = "./pb";
service FoodService {
rpc SayName (FoodStreamRequest) returns (stream FoodStreamResponse) ; // 服务端流模式
rpc PostName (stream FoodStreamRequest) returns (FoodStreamResponse) ; // 客户端流模式
rpc FullStream (stream FoodStreamRequest) returns (stream FoodStreamResponse) ; // 客户端流模式
}
message FoodStreamRequest {
string name = 1;
}
message FoodStreamResponse {
string msg = 1;
}
生成grpc文件
protoc food.proto --go_out=./ --go-grpc_out=./ --go-grpc_opt=require_unimplemented_servers=false
服务端
package main
import (
"fmt"
"google.golang.org/grpc"
pb "grpc_test/food/pb"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
)
func init(){
log.SetFlags(log.Llongfile | log.Lmicroseconds | log.Ldate)
logFile, err := os.OpenFile("./1.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
fmt.Println("open log file failed, err:", err)
return
}
log.SetPrefix("[stream]")
log.SetOutput(logFile)
}
type FoodInfo struct {
}
func (f *FoodInfo)SayName(request *pb.FoodStreamRequest, service pb.FoodService_SayNameServer) error{
fmt.Println("Sayname 请求")
for i:=0;i<=20;i++ {
// 通过 send 方法不断推送数据
time.Sleep(time.Second)
err := service.Send(&pb.FoodStreamResponse{
Msg: "stream : " + request.Name + strconv.Itoa(i),
})
if err != nil {
log.Println(err)
}
}
return nil
}
func (f *FoodInfo)PostName(putServer pb.FoodService_PostNameServer) error{
log.Println("PostName Start.")
var cliStr strings.Builder
for {
if putReq, err := putServer.Recv(); err == nil {
fmt.Println("Put Req: " + putReq.Name)
cliStr.WriteString(putReq.Name)
} else {
putServer.SendAndClose(&pb.FoodStreamResponse{
Msg:"postname:" + cliStr.String(),
} )
break
}
}
fmt.Println("PutServer Done.")
return nil
}
//双向流
func (f *FoodInfo)FullStream(Server pb.FoodService_FullStreamServer) error{
fmt.Println("FullStream Start.")
wg := sync.WaitGroup{}
wg.Add(2)
var i int
go func() {
for {
Req, err := Server.Recv()
if err != nil {
fmt.Println("1双向流:", err)
break
} else {
fmt.Println("1双向流-Req: " + Req.Name)
}
}
defer wg.Done()
}()
go func() {
for {
err := Server.Send(&pb.FoodStreamResponse{
Msg:"2双向流:" + strconv.Itoa(i),
})
if err != nil {
fmt.Println("2双向流:", err)
break
}
time.Sleep(time.Second)
}
defer wg.Done()
}()
wg.Wait()
log.Println("Server Done.")
return nil
}
func main() {
listen, err := net.Listen("tcp", "127.0.0.1:8889")
if err != nil {
panic(err)
}
grpcServer := grpc.NewServer()
pb.RegisterFoodServiceServer(grpcServer, &FoodInfo{})
err = grpcServer.Serve(listen)
if err != nil {
panic(err)
}
}
客户端
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
pb "grpc_test/food/pb"
"strconv"
"sync"
//"log"
//"strconv"
//"time"
)
func main() {
conn, err := grpc.Dial("127.0.0.1:8889", grpc.WithInsecure())
if err != nil {
panic(err)
}
defer conn.Close()
client := pb.NewFoodServiceClient(conn)
//req := &pb.FoodStreamRequest{
// Name: " hello grpc",
//}
//调用服务端推送流,获取服务端流数据
//response, err := client.SayName(context.Background(), req)
//if err != nil {
// panic(err)
//}
//for {
// recv,err := response.Recv()
// if err != nil {
// fmt.Println(err)
// break
// }
// if err == io.EOF {
// log.Println("server closed")
// break
// }
// fmt.Println(recv.Msg)
//}
//客户端流模式
//putClient, err := client.PostName(context.Background())
//if err != nil {
// log.Fatalln(err)
// return
//}
//for i := 0; i < 20; i++ {
// var putData = pb.FoodStreamRequest{
// Name: "putClient:" + strconv.Itoa(i),
// }
// log.Println("Put Req Data: " + putData.Name)
// putClient.Send(&putData)
// time.Sleep(time.Second)
//}
//putRes, err := putClient.CloseAndRecv()
//if err != nil {
// log.Fatalln(err)
//}
//log.Printf("Put Done. Res is %v", putRes.Msg)
//双向流
FullClient, Err := client.FullStream(context.Background())
if err != nil {
panic(Err)
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
for {
item, err := FullClient.Recv()
if err != nil {
fmt.Println("双向流客户端接收:",err)
}
fmt.Println("双向流客户端item.msg", item.Msg)
}
defer wg.Done()
}()
go func() {
defer wg.Done()
for i:=0; i<=20; i++{
err := FullClient.Send(&pb.FoodStreamRequest{
Name: "双向流send" + strconv.Itoa(i),
})
if err != nil {
fmt.Println("双向流ERR:",err)
}
}
}()
wg.Wait()
}
边栏推荐
猜你喜欢
CAPL中的键值对(hash)数据类型
帽式滑环的工作原理
Interviews with big factories under the trend of layoffs: "ByteDance"
Postgresql daily operation and maintenance skills, suitable for beginners
API 接口批量测试
LeetCode 2342. 数位和相等数对的最大和
Baidu Intelligent Cloud Zhangmiao: Detailed explanation of enterprise-level seven-layer load balancing open source software BFE
Towards Better Understanding of Self-Supervised Representations / Q-Score
网络原理 基础知识
基于低能耗自适应聚类层次结构(LEACH)(Matlab代码实现)
随机推荐
Fabric 私有数据案例
华为“天才少年”稚晖君又出新作,从零开始造“客制化”智能键盘
短期风电预测(Matlab代码实现)
【VMWARE--共享文件】
基于SSM开发实现校园疫情防控管理系统
Elephant Swap:借助ePLATO提供加密市场的套利空间
气路旋转连接器怎么用
Leetcode70. 爬楼梯
mysql 报错 is too long for user name (should be no longer than 16)
Win11的WSL2系统更换磁盘和wsl使用简介
mysql error is too long for user name (should be no longer than 16)
MySQL高级篇(高阳)建表sql语句大全
【2023海康威视提前批笔试题】~ 题目及参考答案
Push the image to the Alibaba Cloud private warehouse
图解LeetCode——593. 有效的正方形(难度:中等)
Performance Testing Theory 1 | Sorting out difficult problems in performance testing
API 接口批量测试
经济衰退时期的对比:如今更像历史上的哪段时期?
接口测试自动化后起之秀-YApi接口管理平台
日期时间存入数据库会差一天?