当前位置:网站首页>【demo】循环队列及条件锁实现goroutine间的通信
【demo】循环队列及条件锁实现goroutine间的通信
2022-07-07 16:03:00 【Viva Python】
循环队列及条件锁实现goroutine间的通信
一、循环队列的实现
1. 循环队列
底层通过切片维护队列的元素,两个指针(整型变量)Front、Rear分别指向队列的头部和尾部元素,用于出队和入队操作。同时维护两个整型变量Size、MaxSize记录队列的元素个数和最大的元素个数,用于显式地判空、判满及循环操作。
type EleType int // 队列的元素类型
const MAXSIZE = 5 // 队列的最大容量
var wg sync.WaitGroup
// CycleQueue 循环队列结构体
type CycleQueue struct {
Arr []EleType // 切片维护
Size int // 队列当前的元素个数
Front int // Front指针,指向队列头部元素
Rear int // Rear指针,指向队列的尾部元素
MaxSize int // 队列的最大容量
}
// NewCycleQueue CycleQueue的构造函数
func NewCycleQueue() *CycleQueue {
queue := new(CycleQueue)
queue.MaxSize = MAXSIZE
queue.Front = 1
queue.Arr = make([]EleType, MAXSIZE)
return queue
}
// Empty 判断队列是否为空
func (queue *CycleQueue) Empty() bool {
return queue.Size == 0
}
// Full 判断队列是否已满
func (queue *CycleQueue) Full() bool {
return queue.Size == queue.MaxSize
}
// Push 入队操作
func (queue *CycleQueue) Push(val EleType) {
if queue.Full() {
return
}
queue.Rear++
if queue.Rear == queue.MaxSize {
queue.Rear = 0
}
queue.Size++
queue.Arr[queue.Rear] = val
}
// PopFront 出队操作
func (queue *CycleQueue) PopFront() EleType {
if queue.Empty() {
return 0
}
val := queue.Arr[queue.Front]
queue.Arr[queue.Front] = 0 // (可选)已出队的元素清零
queue.Size--
queue.Front++
if queue.Front == queue.MaxSize {
queue.Front = 0
}
return val
}
2.消息队列
(1) 消息队列以循环队列作为容器的基础上,添加条件锁,用于控制goroutine互斥、有条件地访问循环队列。
// MsgQueue 消息队列,维护一个循环队列和条件锁
type MsgQueue struct {
container CycleQueue
cond *sync.Cond
}
// NewMsgQueue 消息队列的构造函数
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
msgQueue := new(MsgQueue)
msgQueue.container = *NewCycleQueue()
msgQueue.cond = cond
return msgQueue
}
(2) 消息队列的操作
消息队列主要有两个操作,对应两个`goroutine`,模拟`goroutine`生成与处理消息。
生成消息:生成10000内的随机数并入队,延时 1 s~5 s(随机)。
处理消息:将队列头部消息打印并置零,延时1 s。
// Consume 一个goroutine,对消息队列的消费操作
func (mq *MsgQueue) Consume() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock() // 获得条件锁并进入临街区
for mq.container.Empty() {
// 条件等待:队列空则等待
fmt.Println("The queue is empty...")
mq.cond.Wait()
}
data := mq.container.PopFront()
fmt.Println("<<==Consume: ", data, mq.container.Arr)
mq.cond.L.Unlock() // 释放条件锁
mq.cond.Signal() // Signal
time.Sleep(time.Second * 2) // 延迟,模拟消息处理消耗时间
}
}
// Produce 一个goroutine,生产消息元素并入队
func (mq *MsgQueue) Produce() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock()
for mq.container.Full() {
// 条件等待:队列满则等待
fmt.Println("The queue is full...")
mq.cond.Wait()
}
data := rand.Intn(10000) // 产生随机数,模拟生成消息
mq.container.Push(EleType(data))
fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
mq.cond.L.Unlock()
mq.cond.Signal()
dur := time.Duration(rand.Intn(5)) * time.Second // 随机等待时长
time.Sleep(dur)
}
}
二、条件锁
本示例中的条件锁使用Go内置sync包中的Cond条件锁。它可以控制临界区、设置等待条件、通知其它goroutine的功能。对于*sync.Cond变量cond,具有如下常用的操作:
1.cond.L.Lock()方法
获得条件锁,进入临界区。
2. cond.L.Unlock()方法
释放条件锁,其它goroutine可以获得条件锁。
3. cond.Wait()方法
阻塞goroutine,等待其它的goroutine通知,通常用于设置循环条件,等待满足某一条件后再继续。如上的Produce对应的goroutine中等待消息队列不为满后再进行产生消息操作。
4. cond.Signal()方法
通知在cond.Wait()方法中阻塞的goroutine跳出阻塞。
5. cond.Broadcast()方法
与cond.Signal()方法类似,通知所有在cond.Wait()方法的goroutine跳出阻塞。
三、完整的程序代码及运行结果
// @File: condDemo3.go
// @Author: Jason
// @Date: 2022/7/2
// @Description: 模拟消息队列,消息生成者一段随机的时间向队列写入消息,
// 当队列满了的时候,不再写入;消息队列不为空时,消息消费队列处理消息,需要2秒
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type EleType int // 队列的元素类型
const MAXSIZE = 5 // 队列的最大容量
var wg sync.WaitGroup
// CycleQueue 循环队列结构体
type CycleQueue struct {
Arr []EleType // 切片维护
Size int // 队列当前的元素个数
Front int // Front指针,指向队列头部元素
Rear int // Rear指针,指向队列的尾部元素
MaxSize int // 队列的最大容量
}
// NewCycleQueue CycleQueue的构造函数
func NewCycleQueue() *CycleQueue {
queue := new(CycleQueue)
queue.MaxSize = MAXSIZE
queue.Front = 1
queue.Arr = make([]EleType, MAXSIZE)
return queue
}
// Empty 判断队列是否为空
func (queue *CycleQueue) Empty() bool {
return queue.Size == 0
}
// Full 判断队列是否已满
func (queue *CycleQueue) Full() bool {
return queue.Size == queue.MaxSize
}
// Push 入队操作
func (queue *CycleQueue) Push(val EleType) {
if queue.Full() {
return
}
queue.Rear++
if queue.Rear == queue.MaxSize {
queue.Rear = 0
}
queue.Size++
queue.Arr[queue.Rear] = val
}
// PopFront 出队操作
func (queue *CycleQueue) PopFront() EleType {
if queue.Empty() {
return 0
}
val := queue.Arr[queue.Front]
queue.Arr[queue.Front] = 0 // (可选)已出队的元素清零
queue.Size--
queue.Front++
if queue.Front == queue.MaxSize {
queue.Front = 0
}
return val
}
// MsgQueue 消息队列,维护一个循环队列和条件锁
type MsgQueue struct {
container CycleQueue
cond *sync.Cond
}
// NewMsgQueue 消息队列的构造函数
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
msgQueue := new(MsgQueue)
msgQueue.container = *NewCycleQueue()
msgQueue.cond = cond
return msgQueue
}
// Consume 一个goroutine,对消息队列的消费操作
func (mq *MsgQueue) Consume() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock() // 获得条件锁并进入临街区
for mq.container.Empty() {
// 条件等待:队列空则等待
fmt.Println("The queue is empty...")
mq.cond.Wait()
}
data := mq.container.PopFront()
fmt.Println("<<==Consume: ", data, mq.container.Arr)
mq.cond.L.Unlock() // 释放条件锁
mq.cond.Signal() // Signal
time.Sleep(time.Second * 2) // 延迟,模拟消息处理消耗时间
}
}
// Produce 一个goroutine,生产消息元素并入队
func (mq *MsgQueue) Produce() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock()
for mq.container.Full() {
// 条件等待:队列满则等待
fmt.Println("The queue is full...")
mq.cond.Wait()
}
data := rand.Intn(10000) // 产生随机数,模拟生成消息
mq.container.Push(EleType(data))
fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
mq.cond.L.Unlock()
mq.cond.Signal()
dur := time.Duration(rand.Intn(5)) * time.Second // 随机等待时长
time.Sleep(dur)
}
}
func main() {
var cond = sync.NewCond(&sync.Mutex{
})
mq := NewMsgQueue(cond)
wg.Add(2)
go mq.Produce()
go mq.Consume()
wg.Wait()
}
运行结果:
The queue is empty...
==>>Produce: 8081 [0 8081 0 0 0]
<<==Consume: 8081 [0 0 0 0 0]
The queue is empty...
==>>Produce: 1847 [0 0 1847 0 0]
<<==Consume: 1847 [0 0 0 0 0]
The queue is empty...
==>>Produce: 2081 [0 0 0 2081 0]
<<==Consume: 2081 [0 0 0 0 0]
The queue is empty...
==>>Produce: 4425 [0 0 0 0 4425]
==>>Produce: 456 [456 0 0 0 4425]
==>>Produce: 694 [456 694 0 0 4425]
<<==Consume: 4425 [456 694 0 0 0]
==>>Produce: 8162 [456 694 8162 0 0]
<<==Consume: 456 [0 694 8162 0 0]
<<==Consume: 694 [0 0 8162 0 0]
==>>Produce: 4728 [0 0 8162 4728 0]
......
......
当消息队列不为空时,消息处理goroutine处理消息,否则等待消息生成goroutine生成随机数。
边栏推荐
- 基于RGB图像阈值分割并利用滑动调节阈值
- Robot engineering lifelong learning and work plan-2022-
- Please insert the disk into "U disk (H)" & unable to access the disk structure is damaged and cannot be read
- Import requirements in batches during Yolo training Txt
- Yearning-SQL审核平台
- 基于PyTorch利用CNN对自己的数据集进行分类
- SD_DATA_SEND_SHIFT_REGISTER
- SD_DATA_RECEIVE_SHIFT_REGISTER
- 深入浅出图解CNN-卷积神经网络
- 手机app外卖订餐个人中心页面
猜你喜欢

什么是敏捷测试

Create dialog style windows with popupwindow
![[PaddleSeg源码阅读] PaddleSeg Validation 中添加 Boundary IoU的计算(1)——val.py文件细节提示](/img/f2/b6a0e5512b35cf1b695a8feecd0895.png)
[PaddleSeg源码阅读] PaddleSeg Validation 中添加 Boundary IoU的计算(1)——val.py文件细节提示

Understanding of 12 methods of enterprise management

数学分析_笔记_第11章:Fourier级数

Functions and usage of tabhost tab

面试官:页面很卡的原因分析及解决方案?【测试面试题分享】

Automated testing: a practical skill that everyone wants to know about robot framework

Functions and usage of serachview

YARN Capacity Scheduler容量调度器(超详细解读)
随机推荐
Native JS verification code
[principle and technology of network attack and Defense] Chapter 6: Trojan horse
Tips for this week 140: constants: safety idioms
Please insert the disk into "U disk (H)" & unable to access the disk structure is damaged and cannot be read
Tear the Nacos source code by hand (tear the client source code first)
[paddleseg source code reading] add boundary IOU calculation in paddleseg validation (1) -- val.py file details tips
In depth understanding of USB communication protocol
SD_DATA_SEND_SHIFT_REGISTER
Deep learning - make your own dataset
深入浅出图解CNN-卷积神经网络
『HarmonyOS』DevEco的下载安装与开发环境搭建
SD_DATA_RECEIVE_SHIFT_REGISTER
元宇宙带来的创意性改变
Use seven methods to enhance all the images in a folder
JS pull down the curtain JS special effect display layer
What is agile testing
debian10系统问题总结
Functions and usage of serachview
Explain it in simple terms. CNN convolutional neural network
Supplementary instructions to relevant rules of online competition