当前位置:网站首页>【demo】循环队列及条件锁实现goroutine间的通信
【demo】循环队列及条件锁实现goroutine间的通信
2022-07-07 16:03:00 【Viva Python】
循环队列及条件锁实现goroutine
间的通信
一、循环队列的实现
1. 循环队列
底层通过切片维护队列的元素,两个指针(整型变量)Front
、Rear
分别指向队列的头部和尾部元素,用于出队和入队操作。同时维护两个整型变量Size
、MaxSize
记录队列的元素个数和最大的元素个数,用于显式地判空、判满及循环操作。
type EleType int // 队列的元素类型
const MAXSIZE = 5 // 队列的最大容量
var wg sync.WaitGroup
// CycleQueue 循环队列结构体
type CycleQueue struct {
Arr []EleType // 切片维护
Size int // 队列当前的元素个数
Front int // Front指针,指向队列头部元素
Rear int // Rear指针,指向队列的尾部元素
MaxSize int // 队列的最大容量
}
// NewCycleQueue CycleQueue的构造函数
func NewCycleQueue() *CycleQueue {
queue := new(CycleQueue)
queue.MaxSize = MAXSIZE
queue.Front = 1
queue.Arr = make([]EleType, MAXSIZE)
return queue
}
// Empty 判断队列是否为空
func (queue *CycleQueue) Empty() bool {
return queue.Size == 0
}
// Full 判断队列是否已满
func (queue *CycleQueue) Full() bool {
return queue.Size == queue.MaxSize
}
// Push 入队操作
func (queue *CycleQueue) Push(val EleType) {
if queue.Full() {
return
}
queue.Rear++
if queue.Rear == queue.MaxSize {
queue.Rear = 0
}
queue.Size++
queue.Arr[queue.Rear] = val
}
// PopFront 出队操作
func (queue *CycleQueue) PopFront() EleType {
if queue.Empty() {
return 0
}
val := queue.Arr[queue.Front]
queue.Arr[queue.Front] = 0 // (可选)已出队的元素清零
queue.Size--
queue.Front++
if queue.Front == queue.MaxSize {
queue.Front = 0
}
return val
}
2.消息队列
(1) 消息队列以循环队列作为容器的基础上,添加条件锁,用于控制goroutine
互斥、有条件地访问循环队列。
// MsgQueue 消息队列,维护一个循环队列和条件锁
type MsgQueue struct {
container CycleQueue
cond *sync.Cond
}
// NewMsgQueue 消息队列的构造函数
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
msgQueue := new(MsgQueue)
msgQueue.container = *NewCycleQueue()
msgQueue.cond = cond
return msgQueue
}
(2) 消息队列的操作
消息队列主要有两个操作,对应两个`goroutine`,模拟`goroutine`生成与处理消息。
生成消息:生成10000内的随机数并入队,延时 1 s~5 s(随机)。
处理消息:将队列头部消息打印并置零,延时1 s。
// Consume 一个goroutine,对消息队列的消费操作
func (mq *MsgQueue) Consume() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock() // 获得条件锁并进入临街区
for mq.container.Empty() {
// 条件等待:队列空则等待
fmt.Println("The queue is empty...")
mq.cond.Wait()
}
data := mq.container.PopFront()
fmt.Println("<<==Consume: ", data, mq.container.Arr)
mq.cond.L.Unlock() // 释放条件锁
mq.cond.Signal() // Signal
time.Sleep(time.Second * 2) // 延迟,模拟消息处理消耗时间
}
}
// Produce 一个goroutine,生产消息元素并入队
func (mq *MsgQueue) Produce() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock()
for mq.container.Full() {
// 条件等待:队列满则等待
fmt.Println("The queue is full...")
mq.cond.Wait()
}
data := rand.Intn(10000) // 产生随机数,模拟生成消息
mq.container.Push(EleType(data))
fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
mq.cond.L.Unlock()
mq.cond.Signal()
dur := time.Duration(rand.Intn(5)) * time.Second // 随机等待时长
time.Sleep(dur)
}
}
二、条件锁
本示例中的条件锁使用Go内置sync
包中的Cond
条件锁。它可以控制临界区、设置等待条件、通知其它goroutine
的功能。对于*sync.Cond
变量cond
,具有如下常用的操作:
1.cond.L.Lock()
方法
获得条件锁,进入临界区。
2. cond.L.Unlock()
方法
释放条件锁,其它goroutine
可以获得条件锁。
3. cond.Wait()
方法
阻塞goroutine
,等待其它的goroutine
通知,通常用于设置循环条件,等待满足某一条件后再继续。如上的Produce对应的goroutine
中等待消息队列不为满后再进行产生消息操作。
4. cond.Signal()
方法
通知在cond.Wait()
方法中阻塞的goroutine
跳出阻塞。
5. cond.Broadcast()
方法
与cond.Signal()
方法类似,通知所有在cond.Wait()
方法的goroutine
跳出阻塞。
三、完整的程序代码及运行结果
// @File: condDemo3.go
// @Author: Jason
// @Date: 2022/7/2
// @Description: 模拟消息队列,消息生成者一段随机的时间向队列写入消息,
// 当队列满了的时候,不再写入;消息队列不为空时,消息消费队列处理消息,需要2秒
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type EleType int // 队列的元素类型
const MAXSIZE = 5 // 队列的最大容量
var wg sync.WaitGroup
// CycleQueue 循环队列结构体
type CycleQueue struct {
Arr []EleType // 切片维护
Size int // 队列当前的元素个数
Front int // Front指针,指向队列头部元素
Rear int // Rear指针,指向队列的尾部元素
MaxSize int // 队列的最大容量
}
// NewCycleQueue CycleQueue的构造函数
func NewCycleQueue() *CycleQueue {
queue := new(CycleQueue)
queue.MaxSize = MAXSIZE
queue.Front = 1
queue.Arr = make([]EleType, MAXSIZE)
return queue
}
// Empty 判断队列是否为空
func (queue *CycleQueue) Empty() bool {
return queue.Size == 0
}
// Full 判断队列是否已满
func (queue *CycleQueue) Full() bool {
return queue.Size == queue.MaxSize
}
// Push 入队操作
func (queue *CycleQueue) Push(val EleType) {
if queue.Full() {
return
}
queue.Rear++
if queue.Rear == queue.MaxSize {
queue.Rear = 0
}
queue.Size++
queue.Arr[queue.Rear] = val
}
// PopFront 出队操作
func (queue *CycleQueue) PopFront() EleType {
if queue.Empty() {
return 0
}
val := queue.Arr[queue.Front]
queue.Arr[queue.Front] = 0 // (可选)已出队的元素清零
queue.Size--
queue.Front++
if queue.Front == queue.MaxSize {
queue.Front = 0
}
return val
}
// MsgQueue 消息队列,维护一个循环队列和条件锁
type MsgQueue struct {
container CycleQueue
cond *sync.Cond
}
// NewMsgQueue 消息队列的构造函数
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
msgQueue := new(MsgQueue)
msgQueue.container = *NewCycleQueue()
msgQueue.cond = cond
return msgQueue
}
// Consume 一个goroutine,对消息队列的消费操作
func (mq *MsgQueue) Consume() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock() // 获得条件锁并进入临街区
for mq.container.Empty() {
// 条件等待:队列空则等待
fmt.Println("The queue is empty...")
mq.cond.Wait()
}
data := mq.container.PopFront()
fmt.Println("<<==Consume: ", data, mq.container.Arr)
mq.cond.L.Unlock() // 释放条件锁
mq.cond.Signal() // Signal
time.Sleep(time.Second * 2) // 延迟,模拟消息处理消耗时间
}
}
// Produce 一个goroutine,生产消息元素并入队
func (mq *MsgQueue) Produce() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock()
for mq.container.Full() {
// 条件等待:队列满则等待
fmt.Println("The queue is full...")
mq.cond.Wait()
}
data := rand.Intn(10000) // 产生随机数,模拟生成消息
mq.container.Push(EleType(data))
fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
mq.cond.L.Unlock()
mq.cond.Signal()
dur := time.Duration(rand.Intn(5)) * time.Second // 随机等待时长
time.Sleep(dur)
}
}
func main() {
var cond = sync.NewCond(&sync.Mutex{
})
mq := NewMsgQueue(cond)
wg.Add(2)
go mq.Produce()
go mq.Consume()
wg.Wait()
}
运行结果:
The queue is empty...
==>>Produce: 8081 [0 8081 0 0 0]
<<==Consume: 8081 [0 0 0 0 0]
The queue is empty...
==>>Produce: 1847 [0 0 1847 0 0]
<<==Consume: 1847 [0 0 0 0 0]
The queue is empty...
==>>Produce: 2081 [0 0 0 2081 0]
<<==Consume: 2081 [0 0 0 0 0]
The queue is empty...
==>>Produce: 4425 [0 0 0 0 4425]
==>>Produce: 456 [456 0 0 0 4425]
==>>Produce: 694 [456 694 0 0 4425]
<<==Consume: 4425 [456 694 0 0 0]
==>>Produce: 8162 [456 694 8162 0 0]
<<==Consume: 456 [0 694 8162 0 0]
<<==Consume: 694 [0 0 8162 0 0]
==>>Produce: 4728 [0 0 8162 4728 0]
......
......
当消息队列不为空时,消息处理goroutine
处理消息,否则等待消息生成goroutine
生成随机数。
边栏推荐
- [trusted computing] Lesson 13: TPM extended authorization and key management
- 性能测试过程和计划
- 现在网上期货开户安全吗?国内有多少家正规的期货公司?
- js拉下帷幕js特效显示层
- 元宇宙带来的创意性改变
- < code random recording two brushes> linked list
- 万字保姆级长文——Linkedin元数据管理平台Datahub离线安装指南
- MySQL index hit level analysis
- [OKR target management] value analysis
- [principles and technologies of network attack and Defense] Chapter 5: denial of service attack
猜你喜欢
2021年全国平均工资出炉,你达标了吗?
[answer] if the app is in the foreground, the activity will not be recycled?
swiper左右切换滑块插件
回归测试的分类
Dragging the custom style of Baidu map to the right makes the global map longitude 0 unable to be displayed normally
[4500 word summary] a complete set of skills that a software testing engineer needs to master
ICer知识点杂烩(后附大量题目,持续更新中)
『HarmonyOS』DevEco的下载安装与开发环境搭建
< code random recording two brushes> linked list
手机版像素小鸟游js戏代码
随机推荐
<代码随想录二刷>链表
现在网上期货开户安全吗?国内有多少家正规的期货公司?
Functions and usage of tabhost tab
【蓝桥杯集训100题】scratch从小到大排序 蓝桥杯scratch比赛专项预测编程题 集训模拟练习题第17题
物联网OTA技术介绍
Sanxian Guidong JS game source code
Chapter 2 build CRM project development environment (database design)
Click on the top of today's headline app to navigate in the middle
做软件测试 掌握哪些技术才能算作 “ 测试高手 ”?
TaffyDB开源的JS数据库
什么是敏捷测试
Tips for this week 131: special member functions and ` = Default`
debian10系统问题总结
[answer] if the app is in the foreground, the activity will not be recycled?
手机版像素小鸟游js戏代码
手机app外卖订餐个人中心页面
Personal best practice demo sharing of enum + validation
Explain it in simple terms. CNN convolutional neural network
Tips of the week 136: unordered containers
Please insert the disk into "U disk (H)" & unable to access the disk structure is damaged and cannot be read