当前位置:网站首页>【demo】循环队列及条件锁实现goroutine间的通信
【demo】循环队列及条件锁实现goroutine间的通信
2022-07-07 16:03:00 【Viva Python】
循环队列及条件锁实现goroutine间的通信
一、循环队列的实现
1. 循环队列
底层通过切片维护队列的元素,两个指针(整型变量)Front、Rear分别指向队列的头部和尾部元素,用于出队和入队操作。同时维护两个整型变量Size、MaxSize记录队列的元素个数和最大的元素个数,用于显式地判空、判满及循环操作。
type EleType int // 队列的元素类型
const MAXSIZE = 5 // 队列的最大容量
var wg sync.WaitGroup
// CycleQueue 循环队列结构体
type CycleQueue struct {
Arr []EleType // 切片维护
Size int // 队列当前的元素个数
Front int // Front指针,指向队列头部元素
Rear int // Rear指针,指向队列的尾部元素
MaxSize int // 队列的最大容量
}
// NewCycleQueue CycleQueue的构造函数
func NewCycleQueue() *CycleQueue {
queue := new(CycleQueue)
queue.MaxSize = MAXSIZE
queue.Front = 1
queue.Arr = make([]EleType, MAXSIZE)
return queue
}
// Empty 判断队列是否为空
func (queue *CycleQueue) Empty() bool {
return queue.Size == 0
}
// Full 判断队列是否已满
func (queue *CycleQueue) Full() bool {
return queue.Size == queue.MaxSize
}
// Push 入队操作
func (queue *CycleQueue) Push(val EleType) {
if queue.Full() {
return
}
queue.Rear++
if queue.Rear == queue.MaxSize {
queue.Rear = 0
}
queue.Size++
queue.Arr[queue.Rear] = val
}
// PopFront 出队操作
func (queue *CycleQueue) PopFront() EleType {
if queue.Empty() {
return 0
}
val := queue.Arr[queue.Front]
queue.Arr[queue.Front] = 0 // (可选)已出队的元素清零
queue.Size--
queue.Front++
if queue.Front == queue.MaxSize {
queue.Front = 0
}
return val
}
2.消息队列
(1) 消息队列以循环队列作为容器的基础上,添加条件锁,用于控制goroutine互斥、有条件地访问循环队列。
// MsgQueue 消息队列,维护一个循环队列和条件锁
type MsgQueue struct {
container CycleQueue
cond *sync.Cond
}
// NewMsgQueue 消息队列的构造函数
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
msgQueue := new(MsgQueue)
msgQueue.container = *NewCycleQueue()
msgQueue.cond = cond
return msgQueue
}
(2) 消息队列的操作
消息队列主要有两个操作,对应两个`goroutine`,模拟`goroutine`生成与处理消息。
生成消息:生成10000内的随机数并入队,延时 1 s~5 s(随机)。
处理消息:将队列头部消息打印并置零,延时1 s。
// Consume 一个goroutine,对消息队列的消费操作
func (mq *MsgQueue) Consume() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock() // 获得条件锁并进入临街区
for mq.container.Empty() {
// 条件等待:队列空则等待
fmt.Println("The queue is empty...")
mq.cond.Wait()
}
data := mq.container.PopFront()
fmt.Println("<<==Consume: ", data, mq.container.Arr)
mq.cond.L.Unlock() // 释放条件锁
mq.cond.Signal() // Signal
time.Sleep(time.Second * 2) // 延迟,模拟消息处理消耗时间
}
}
// Produce 一个goroutine,生产消息元素并入队
func (mq *MsgQueue) Produce() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock()
for mq.container.Full() {
// 条件等待:队列满则等待
fmt.Println("The queue is full...")
mq.cond.Wait()
}
data := rand.Intn(10000) // 产生随机数,模拟生成消息
mq.container.Push(EleType(data))
fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
mq.cond.L.Unlock()
mq.cond.Signal()
dur := time.Duration(rand.Intn(5)) * time.Second // 随机等待时长
time.Sleep(dur)
}
}
二、条件锁
本示例中的条件锁使用Go内置sync包中的Cond条件锁。它可以控制临界区、设置等待条件、通知其它goroutine的功能。对于*sync.Cond变量cond,具有如下常用的操作:
1.cond.L.Lock()方法
获得条件锁,进入临界区。
2. cond.L.Unlock()方法
释放条件锁,其它goroutine可以获得条件锁。
3. cond.Wait()方法
阻塞goroutine,等待其它的goroutine通知,通常用于设置循环条件,等待满足某一条件后再继续。如上的Produce对应的goroutine中等待消息队列不为满后再进行产生消息操作。
4. cond.Signal()方法
通知在cond.Wait()方法中阻塞的goroutine跳出阻塞。
5. cond.Broadcast()方法
与cond.Signal()方法类似,通知所有在cond.Wait()方法的goroutine跳出阻塞。
三、完整的程序代码及运行结果
// @File: condDemo3.go
// @Author: Jason
// @Date: 2022/7/2
// @Description: 模拟消息队列,消息生成者一段随机的时间向队列写入消息,
// 当队列满了的时候,不再写入;消息队列不为空时,消息消费队列处理消息,需要2秒
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type EleType int // 队列的元素类型
const MAXSIZE = 5 // 队列的最大容量
var wg sync.WaitGroup
// CycleQueue 循环队列结构体
type CycleQueue struct {
Arr []EleType // 切片维护
Size int // 队列当前的元素个数
Front int // Front指针,指向队列头部元素
Rear int // Rear指针,指向队列的尾部元素
MaxSize int // 队列的最大容量
}
// NewCycleQueue CycleQueue的构造函数
func NewCycleQueue() *CycleQueue {
queue := new(CycleQueue)
queue.MaxSize = MAXSIZE
queue.Front = 1
queue.Arr = make([]EleType, MAXSIZE)
return queue
}
// Empty 判断队列是否为空
func (queue *CycleQueue) Empty() bool {
return queue.Size == 0
}
// Full 判断队列是否已满
func (queue *CycleQueue) Full() bool {
return queue.Size == queue.MaxSize
}
// Push 入队操作
func (queue *CycleQueue) Push(val EleType) {
if queue.Full() {
return
}
queue.Rear++
if queue.Rear == queue.MaxSize {
queue.Rear = 0
}
queue.Size++
queue.Arr[queue.Rear] = val
}
// PopFront 出队操作
func (queue *CycleQueue) PopFront() EleType {
if queue.Empty() {
return 0
}
val := queue.Arr[queue.Front]
queue.Arr[queue.Front] = 0 // (可选)已出队的元素清零
queue.Size--
queue.Front++
if queue.Front == queue.MaxSize {
queue.Front = 0
}
return val
}
// MsgQueue 消息队列,维护一个循环队列和条件锁
type MsgQueue struct {
container CycleQueue
cond *sync.Cond
}
// NewMsgQueue 消息队列的构造函数
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
msgQueue := new(MsgQueue)
msgQueue.container = *NewCycleQueue()
msgQueue.cond = cond
return msgQueue
}
// Consume 一个goroutine,对消息队列的消费操作
func (mq *MsgQueue) Consume() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock() // 获得条件锁并进入临街区
for mq.container.Empty() {
// 条件等待:队列空则等待
fmt.Println("The queue is empty...")
mq.cond.Wait()
}
data := mq.container.PopFront()
fmt.Println("<<==Consume: ", data, mq.container.Arr)
mq.cond.L.Unlock() // 释放条件锁
mq.cond.Signal() // Signal
time.Sleep(time.Second * 2) // 延迟,模拟消息处理消耗时间
}
}
// Produce 一个goroutine,生产消息元素并入队
func (mq *MsgQueue) Produce() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock()
for mq.container.Full() {
// 条件等待:队列满则等待
fmt.Println("The queue is full...")
mq.cond.Wait()
}
data := rand.Intn(10000) // 产生随机数,模拟生成消息
mq.container.Push(EleType(data))
fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
mq.cond.L.Unlock()
mq.cond.Signal()
dur := time.Duration(rand.Intn(5)) * time.Second // 随机等待时长
time.Sleep(dur)
}
}
func main() {
var cond = sync.NewCond(&sync.Mutex{
})
mq := NewMsgQueue(cond)
wg.Add(2)
go mq.Produce()
go mq.Consume()
wg.Wait()
}
运行结果:
The queue is empty...
==>>Produce: 8081 [0 8081 0 0 0]
<<==Consume: 8081 [0 0 0 0 0]
The queue is empty...
==>>Produce: 1847 [0 0 1847 0 0]
<<==Consume: 1847 [0 0 0 0 0]
The queue is empty...
==>>Produce: 2081 [0 0 0 2081 0]
<<==Consume: 2081 [0 0 0 0 0]
The queue is empty...
==>>Produce: 4425 [0 0 0 0 4425]
==>>Produce: 456 [456 0 0 0 4425]
==>>Produce: 694 [456 694 0 0 4425]
<<==Consume: 4425 [456 694 0 0 0]
==>>Produce: 8162 [456 694 8162 0 0]
<<==Consume: 456 [0 694 8162 0 0]
<<==Consume: 694 [0 0 8162 0 0]
==>>Produce: 4728 [0 0 8162 4728 0]
......
......
当消息队列不为空时,消息处理goroutine处理消息,否则等待消息生成goroutine生成随机数。
边栏推荐
- SD_DATA_RECEIVE_SHIFT_REGISTER
- 数学分析_笔记_第11章:Fourier级数
- 面试官:页面很卡的原因分析及解决方案?【测试面试题分享】
- Dragging the custom style of Baidu map to the right makes the global map longitude 0 unable to be displayed normally
- [trusted computing] Lesson 11: TPM password resource management (III) NV index and PCR
- 性能测试过程和计划
- MySQL index hit level analysis
- Pytorch中自制数据集进行Dataset重写
- 讨论| 坦白局,工业 AR 应用为什么难落地?
- Management by objectives [14 of management]
猜你喜欢

MRS离线数据分析:通过Flink作业处理OBS数据

手机app外卖订餐个人中心页面
![[OKR target management] value analysis](/img/d9/1f0022d3aa34cc10f1151e181dd695.png)
[OKR target management] value analysis

Import requirements in batches during Yolo training Txt

Deep learning - make your own dataset

Mobile app takeout ordering personal center page

Chapter 3 business function development (to remember account and password)

Tear the Nacos source code by hand (tear the client source code first)

AI 击败了人类,设计了更好的经济机制

上市十天就下线过万台,欧尚Z6产品实力备受点赞
随机推荐
Native JS verification code
DatePickerDialog and trimepickerdialog
Mui side navigation anchor positioning JS special effect
Robot engineering lifelong learning and work plan-2022-
Mrs offline data analysis: process OBS data through Flink job
[principle and technology of network attack and Defense] Chapter 1: Introduction
Pro2:修改div块的颜色
[OKR target management] case analysis
深入浅出【机器学习之线性回归】
Tips of this week 141: pay attention to implicit conversion to bool
<代码随想录二刷>链表
Functions and usage of serachview
Import requirements in batches during Yolo training Txt
Introduction to OTA technology of Internet of things
Using stored procedures, timers, triggers to solve data analysis problems
Alertdialog create dialog
[answer] if the app is in the foreground, the activity will not be recycled?
Self made dataset in pytoch for dataset rewriting
Hutool - 轻量级 DB 操作解决方案
如何在软件研发阶段落地安全实践