当前位置:网站首页>【demo】循环队列及条件锁实现goroutine间的通信
【demo】循环队列及条件锁实现goroutine间的通信
2022-07-07 16:03:00 【Viva Python】
循环队列及条件锁实现goroutine
间的通信
一、循环队列的实现
1. 循环队列
底层通过切片维护队列的元素,两个指针(整型变量)Front
、Rear
分别指向队列的头部和尾部元素,用于出队和入队操作。同时维护两个整型变量Size
、MaxSize
记录队列的元素个数和最大的元素个数,用于显式地判空、判满及循环操作。
type EleType int // 队列的元素类型
const MAXSIZE = 5 // 队列的最大容量
var wg sync.WaitGroup
// CycleQueue 循环队列结构体
type CycleQueue struct {
Arr []EleType // 切片维护
Size int // 队列当前的元素个数
Front int // Front指针,指向队列头部元素
Rear int // Rear指针,指向队列的尾部元素
MaxSize int // 队列的最大容量
}
// NewCycleQueue CycleQueue的构造函数
func NewCycleQueue() *CycleQueue {
queue := new(CycleQueue)
queue.MaxSize = MAXSIZE
queue.Front = 1
queue.Arr = make([]EleType, MAXSIZE)
return queue
}
// Empty 判断队列是否为空
func (queue *CycleQueue) Empty() bool {
return queue.Size == 0
}
// Full 判断队列是否已满
func (queue *CycleQueue) Full() bool {
return queue.Size == queue.MaxSize
}
// Push 入队操作
func (queue *CycleQueue) Push(val EleType) {
if queue.Full() {
return
}
queue.Rear++
if queue.Rear == queue.MaxSize {
queue.Rear = 0
}
queue.Size++
queue.Arr[queue.Rear] = val
}
// PopFront 出队操作
func (queue *CycleQueue) PopFront() EleType {
if queue.Empty() {
return 0
}
val := queue.Arr[queue.Front]
queue.Arr[queue.Front] = 0 // (可选)已出队的元素清零
queue.Size--
queue.Front++
if queue.Front == queue.MaxSize {
queue.Front = 0
}
return val
}
2.消息队列
(1) 消息队列以循环队列作为容器的基础上,添加条件锁,用于控制goroutine
互斥、有条件地访问循环队列。
// MsgQueue 消息队列,维护一个循环队列和条件锁
type MsgQueue struct {
container CycleQueue
cond *sync.Cond
}
// NewMsgQueue 消息队列的构造函数
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
msgQueue := new(MsgQueue)
msgQueue.container = *NewCycleQueue()
msgQueue.cond = cond
return msgQueue
}
(2) 消息队列的操作
消息队列主要有两个操作,对应两个`goroutine`,模拟`goroutine`生成与处理消息。
生成消息:生成10000内的随机数并入队,延时 1 s~5 s(随机)。
处理消息:将队列头部消息打印并置零,延时1 s。
// Consume 一个goroutine,对消息队列的消费操作
func (mq *MsgQueue) Consume() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock() // 获得条件锁并进入临街区
for mq.container.Empty() {
// 条件等待:队列空则等待
fmt.Println("The queue is empty...")
mq.cond.Wait()
}
data := mq.container.PopFront()
fmt.Println("<<==Consume: ", data, mq.container.Arr)
mq.cond.L.Unlock() // 释放条件锁
mq.cond.Signal() // Signal
time.Sleep(time.Second * 2) // 延迟,模拟消息处理消耗时间
}
}
// Produce 一个goroutine,生产消息元素并入队
func (mq *MsgQueue) Produce() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock()
for mq.container.Full() {
// 条件等待:队列满则等待
fmt.Println("The queue is full...")
mq.cond.Wait()
}
data := rand.Intn(10000) // 产生随机数,模拟生成消息
mq.container.Push(EleType(data))
fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
mq.cond.L.Unlock()
mq.cond.Signal()
dur := time.Duration(rand.Intn(5)) * time.Second // 随机等待时长
time.Sleep(dur)
}
}
二、条件锁
本示例中的条件锁使用Go内置sync
包中的Cond
条件锁。它可以控制临界区、设置等待条件、通知其它goroutine
的功能。对于*sync.Cond
变量cond
,具有如下常用的操作:
1.cond.L.Lock()
方法
获得条件锁,进入临界区。
2. cond.L.Unlock()
方法
释放条件锁,其它goroutine
可以获得条件锁。
3. cond.Wait()
方法
阻塞goroutine
,等待其它的goroutine
通知,通常用于设置循环条件,等待满足某一条件后再继续。如上的Produce对应的goroutine
中等待消息队列不为满后再进行产生消息操作。
4. cond.Signal()
方法
通知在cond.Wait()
方法中阻塞的goroutine
跳出阻塞。
5. cond.Broadcast()
方法
与cond.Signal()
方法类似,通知所有在cond.Wait()
方法的goroutine
跳出阻塞。
三、完整的程序代码及运行结果
// @File: condDemo3.go
// @Author: Jason
// @Date: 2022/7/2
// @Description: 模拟消息队列,消息生成者一段随机的时间向队列写入消息,
// 当队列满了的时候,不再写入;消息队列不为空时,消息消费队列处理消息,需要2秒
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type EleType int // 队列的元素类型
const MAXSIZE = 5 // 队列的最大容量
var wg sync.WaitGroup
// CycleQueue 循环队列结构体
type CycleQueue struct {
Arr []EleType // 切片维护
Size int // 队列当前的元素个数
Front int // Front指针,指向队列头部元素
Rear int // Rear指针,指向队列的尾部元素
MaxSize int // 队列的最大容量
}
// NewCycleQueue CycleQueue的构造函数
func NewCycleQueue() *CycleQueue {
queue := new(CycleQueue)
queue.MaxSize = MAXSIZE
queue.Front = 1
queue.Arr = make([]EleType, MAXSIZE)
return queue
}
// Empty 判断队列是否为空
func (queue *CycleQueue) Empty() bool {
return queue.Size == 0
}
// Full 判断队列是否已满
func (queue *CycleQueue) Full() bool {
return queue.Size == queue.MaxSize
}
// Push 入队操作
func (queue *CycleQueue) Push(val EleType) {
if queue.Full() {
return
}
queue.Rear++
if queue.Rear == queue.MaxSize {
queue.Rear = 0
}
queue.Size++
queue.Arr[queue.Rear] = val
}
// PopFront 出队操作
func (queue *CycleQueue) PopFront() EleType {
if queue.Empty() {
return 0
}
val := queue.Arr[queue.Front]
queue.Arr[queue.Front] = 0 // (可选)已出队的元素清零
queue.Size--
queue.Front++
if queue.Front == queue.MaxSize {
queue.Front = 0
}
return val
}
// MsgQueue 消息队列,维护一个循环队列和条件锁
type MsgQueue struct {
container CycleQueue
cond *sync.Cond
}
// NewMsgQueue 消息队列的构造函数
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
msgQueue := new(MsgQueue)
msgQueue.container = *NewCycleQueue()
msgQueue.cond = cond
return msgQueue
}
// Consume 一个goroutine,对消息队列的消费操作
func (mq *MsgQueue) Consume() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock() // 获得条件锁并进入临街区
for mq.container.Empty() {
// 条件等待:队列空则等待
fmt.Println("The queue is empty...")
mq.cond.Wait()
}
data := mq.container.PopFront()
fmt.Println("<<==Consume: ", data, mq.container.Arr)
mq.cond.L.Unlock() // 释放条件锁
mq.cond.Signal() // Signal
time.Sleep(time.Second * 2) // 延迟,模拟消息处理消耗时间
}
}
// Produce 一个goroutine,生产消息元素并入队
func (mq *MsgQueue) Produce() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock()
for mq.container.Full() {
// 条件等待:队列满则等待
fmt.Println("The queue is full...")
mq.cond.Wait()
}
data := rand.Intn(10000) // 产生随机数,模拟生成消息
mq.container.Push(EleType(data))
fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
mq.cond.L.Unlock()
mq.cond.Signal()
dur := time.Duration(rand.Intn(5)) * time.Second // 随机等待时长
time.Sleep(dur)
}
}
func main() {
var cond = sync.NewCond(&sync.Mutex{
})
mq := NewMsgQueue(cond)
wg.Add(2)
go mq.Produce()
go mq.Consume()
wg.Wait()
}
运行结果:
The queue is empty...
==>>Produce: 8081 [0 8081 0 0 0]
<<==Consume: 8081 [0 0 0 0 0]
The queue is empty...
==>>Produce: 1847 [0 0 1847 0 0]
<<==Consume: 1847 [0 0 0 0 0]
The queue is empty...
==>>Produce: 2081 [0 0 0 2081 0]
<<==Consume: 2081 [0 0 0 0 0]
The queue is empty...
==>>Produce: 4425 [0 0 0 0 4425]
==>>Produce: 456 [456 0 0 0 4425]
==>>Produce: 694 [456 694 0 0 4425]
<<==Consume: 4425 [456 694 0 0 0]
==>>Produce: 8162 [456 694 8162 0 0]
<<==Consume: 456 [0 694 8162 0 0]
<<==Consume: 694 [0 0 8162 0 0]
==>>Produce: 4728 [0 0 8162 4728 0]
......
......
当消息队列不为空时,消息处理goroutine
处理消息,否则等待消息生成goroutine
生成随机数。
边栏推荐
- [4500 word summary] a complete set of skills that a software testing engineer needs to master
- 直播软件搭建,canvas文字加粗
- mui侧边导航锚点定位js特效
- [distributed theory] (I) distributed transactions
- [tpm2.0 principle and Application guide] Chapter 1-3
- How to implement safety practice in software development stage
- YARN Capacity Scheduler容量调度器(超详细解读)
- Mui side navigation anchor positioning JS special effect
- SD_DATA_RECEIVE_SHIFT_REGISTER
- [answer] if the app is in the foreground, the activity will not be recycled?
猜你喜欢
深度学习机器学习各种数据集汇总地址
Mobile pixel bird game JS play code
自动化测试:Robot FrameWork框架大家都想知道的实用技巧
测试3个月,成功入职 “字节”,我的面试心得总结
Simple loading animation
Pytorch中自制数据集进行Dataset重写
Tear the Nacos source code by hand (tear the client source code first)
3分钟学会制作动态折线图!
Test for 3 months, successful entry "byte", my interview experience summary
Dragging the custom style of Baidu map to the right makes the global map longitude 0 unable to be displayed normally
随机推荐
AI 击败了人类,设计了更好的经济机制
【蓝桥杯集训100题】scratch从小到大排序 蓝桥杯scratch比赛专项预测编程题 集训模拟练习题第17题
同消费互联网的较为短暂的产业链不同,产业互联网的产业链是相当漫长的
用存储过程、定时器、触发器来解决数据分析问题
Functions and usage of tabhost tab
Tips for this week 134: make_ Unique and private constructors
[4500 word summary] a complete set of skills that a software testing engineer needs to master
Youth experience and career development
目标管理【管理学之十四】
Face recognition attendance system based on Baidu flying plasma platform (easydl)
回归测试的分类
Machine vision (1) - Overview
Tips of the week 136: unordered containers
AI defeated mankind and designed a better economic mechanism
Management by objectives [14 of management]
Chapter 2 build CRM project development environment (database design)
Performance test process and plan
直播软件搭建,canvas文字加粗
Notification is the notification displayed in the status bar of the phone
2021-06-28