[demo] Communication between goroutines using a circular queue and a condition lock
2022-07-07 18:19:00 【Viva Python】
Implementing communication between goroutines with a circular queue and a condition lock
I. Implementing the circular queue
1. Circular queue
The elements of the queue are stored in an underlying slice. Two integer variables used as pointers, Front and Rear, point to the head and tail elements of the queue and drive the dequeue and enqueue operations respectively. Two more integer variables, Size and MaxSize, record the current number of elements and the maximum capacity; they make the empty and full checks explicit and tell the pointers when to wrap around.
type EleType int // element type of the queue

const MAXSIZE = 5 // maximum capacity of the queue

var wg sync.WaitGroup

// CycleQueue is a circular queue backed by a slice.
type CycleQueue struct {
	Arr     []EleType // underlying slice holding the elements
	Size    int       // current number of elements in the queue
	Front   int       // index of the head element
	Rear    int       // index of the tail element
	MaxSize int       // maximum capacity of the queue
}

// NewCycleQueue is the constructor for CycleQueue.
func NewCycleQueue() *CycleQueue {
	queue := new(CycleQueue)
	queue.MaxSize = MAXSIZE
	// Rear starts at 0 and Push advances it before writing,
	// so the first element lands at index 1.
	queue.Front = 1
	queue.Arr = make([]EleType, MAXSIZE)
	return queue
}

// Empty reports whether the queue is empty.
func (queue *CycleQueue) Empty() bool {
	return queue.Size == 0
}

// Full reports whether the queue is full.
func (queue *CycleQueue) Full() bool {
	return queue.Size == queue.MaxSize
}

// Push enqueues val at the tail. If the queue is full, val is silently dropped.
func (queue *CycleQueue) Push(val EleType) {
	if queue.Full() {
		return
	}
	queue.Rear++
	if queue.Rear == queue.MaxSize {
		queue.Rear = 0 // wrap around to the start of the slice
	}
	queue.Size++
	queue.Arr[queue.Rear] = val
}

// PopFront dequeues and returns the head element. It returns 0 if the queue is empty.
func (queue *CycleQueue) PopFront() EleType {
	if queue.Empty() {
		return 0
	}
	val := queue.Arr[queue.Front]
	queue.Arr[queue.Front] = 0 // (optional) clear the slot of the dequeued element
	queue.Size--
	queue.Front++
	if queue.Front == queue.MaxSize {
		queue.Front = 0 // wrap around to the start of the slice
	}
	return val
}
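The wrap-around can also be written with the modulo operator instead of an explicit reset to 0. The following is only a minimal sketch of that variant, not the code used in this article: the `ring` type and its `head`/`count` fields are hypothetical names, it derives the tail position from `head + count` instead of keeping a separate Rear pointer, and `push`/`pop` report success with a bool rather than silently ignoring a full or empty queue.

```go
package main

import "fmt"

// ring is a hypothetical fixed-capacity queue of ints used only for this sketch.
type ring struct {
	buf   []int
	head  int // index of the oldest element
	count int // number of stored elements
}

func newRing(capacity int) *ring {
	return &ring{buf: make([]int, capacity)}
}

// push appends v and reports whether there was room for it.
func (r *ring) push(v int) bool {
	if r.count == len(r.buf) {
		return false
	}
	tail := (r.head + r.count) % len(r.buf) // next free slot, wrapping around
	r.buf[tail] = v
	r.count++
	return true
}

// pop removes the oldest element and reports whether one was available.
func (r *ring) pop() (int, bool) {
	if r.count == 0 {
		return 0, false
	}
	v := r.buf[r.head]
	r.head = (r.head + 1) % len(r.buf) // advance the head, wrapping around
	r.count--
	return v, true
}

func main() {
	r := newRing(3)
	for i := 1; i <= 4; i++ {
		fmt.Println("push", i, r.push(i)) // the fourth push is rejected
	}
	v, _ := r.pop()
	fmt.Println("pop", v)
	fmt.Println("push", 5, r.push(5)) // reuses the slot freed by pop
	fmt.Println(r.buf)                // prints [5 2 3]
}
```

Returning a bool makes a rejected push visible to the caller, which matters once several goroutines share the queue.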
2. Message queue
(1) The message queue uses the circular queue as its container and adds a condition lock, so that goroutines access the circular queue under mutual exclusion and only when the required condition holds.
// MsgQueue is a message queue that holds a circular queue and a condition lock.
type MsgQueue struct {
	container CycleQueue
	cond      *sync.Cond
}

// NewMsgQueue is the constructor for MsgQueue.
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
	msgQueue := new(MsgQueue)
	msgQueue.container = *NewCycleQueue()
	msgQueue.cond = cond
	return msgQueue
}
(2) Message queue operations
The message queue has two main operations, each run in its own goroutine, which simulate producing and processing messages.
Producing a message: generate a random integer in [0, 10000), enqueue it, then sleep for a random 0 s to 4 s.
Processing a message: dequeue the head message (zeroing its slot), print it, then sleep for 2 s.
// Consume runs in its own goroutine and consumes messages from the queue.
// The caller must call wg.Add before starting it.
func (mq *MsgQueue) Consume() {
	defer wg.Done()
	for {
		mq.cond.L.Lock() // acquire the condition lock and enter the critical section
		for mq.container.Empty() {
			// condition wait: block while the queue is empty
			fmt.Println("The queue is empty...")
			mq.cond.Wait()
		}
		data := mq.container.PopFront()
		fmt.Println("<<==Consume: ", data, mq.container.Arr)
		mq.cond.L.Unlock()          // release the condition lock
		mq.cond.Signal()            // notify a waiting goroutine
		time.Sleep(time.Second * 2) // delay to simulate message-processing time
	}
}

// Produce runs in its own goroutine, produces messages and enqueues them.
// The caller must call wg.Add before starting it.
func (mq *MsgQueue) Produce() {
	defer wg.Done()
	for {
		mq.cond.L.Lock()
		for mq.container.Full() {
			// condition wait: block while the queue is full
			fmt.Println("The queue is full...")
			mq.cond.Wait()
		}
		data := rand.Intn(10000) // a random number in [0, 10000) stands in for a message
		mq.container.Push(EleType(data))
		fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
		mq.cond.L.Unlock()
		mq.cond.Signal()
		dur := time.Duration(rand.Intn(5)) * time.Second // random wait of 0 s to 4 s
		time.Sleep(dur)
	}
}
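Because both loops above run forever, the demo never terminates. As a minimal sketch of the same wait-in-a-loop pattern in a form that finishes, the program below passes a fixed number of values through a bounded queue. Everything in it is hypothetical and is not part of the article's code: `boundedQueue`, `put`, `take` and `run` are illustrative names, a plain slice replaces the circular queue, and the delays are omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// boundedQueue is a hypothetical stand-in for MsgQueue: a slice with a
// capacity limit, guarded by one mutex and one condition variable.
type boundedQueue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
	limit int
}

func newBoundedQueue(limit int) *boundedQueue {
	q := &boundedQueue{limit: limit}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// put blocks while the queue is full, then appends v.
func (q *boundedQueue) put(v int) {
	q.mu.Lock()
	for len(q.items) == q.limit {
		q.cond.Wait()
	}
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal() // wake the consumer if it is waiting
}

// take blocks while the queue is empty, then removes the oldest element.
func (q *boundedQueue) take() int {
	q.mu.Lock()
	for len(q.items) == 0 {
		q.cond.Wait()
	}
	v := q.items[0]
	q.items = q.items[1:]
	q.mu.Unlock()
	q.cond.Signal() // wake the producer if it is waiting
	return v
}

// run moves the values 0..n-1 through a queue of the given capacity and
// returns them in the order the consumer received them.
func run(n, limit int) []int {
	q := newBoundedQueue(limit)
	got := make([]int, 0, n)
	var wg sync.WaitGroup
	wg.Add(2) // registered before the goroutines start
	go func() {
		defer wg.Done()
		for i := 0; i < n; i++ {
			q.put(i)
		}
	}()
	go func() {
		defer wg.Done()
		for i := 0; i < n; i++ {
			got = append(got, q.take())
		}
	}()
	wg.Wait()
	return got
}

func main() {
	fmt.Println(run(10, 3)) // prints [0 1 2 3 4 5 6 7 8 9]
}
```

With exactly one producer and one consumer, at most one goroutine can be waiting at any time, so Signal is enough; with several producers or consumers sharing one condition variable, Broadcast would be the safer choice.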
II. The condition lock
The condition lock in this example is Cond from Go's built-in sync package, a condition variable. It can guard a critical section, let a goroutine wait for a condition, and notify other goroutines. For a variable cond of type *sync.Cond, the common operations are:
1. cond.L.Lock()
Acquires the condition lock and enters the critical section.
2. cond.L.Unlock()
Releases the condition lock so that other goroutines can acquire it.
3. cond.Wait()
Blocks the calling goroutine until another goroutine notifies it. The lock must be held when Wait is called; Wait releases it while blocked and reacquires it before returning. It is usually called inside a loop that re-checks the condition, so execution continues only once the condition holds. For example, the Produce goroutine above waits until the message queue is not full before generating a message.
4. cond.Signal()
Wakes one goroutine that is blocked in cond.Wait(), if there is one.
5. cond.Broadcast()
Similar to cond.Signal(), but wakes all goroutines that are blocked in cond.Wait().
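To illustrate how Wait and Broadcast fit together, here is a minimal sketch in which several goroutines wait for one shared flag and are all released by a single Broadcast. The function `wakeAll` and the variables `ready` and `woken` are hypothetical names chosen for this example and do not appear in the article's program.

```go
package main

import (
	"fmt"
	"sync"
)

// wakeAll starts n goroutines that wait for a shared flag, then sets the
// flag and wakes them with a single Broadcast. It returns how many of them
// got past the wait.
func wakeAll(n int) int {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := false
	woken := 0

	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			mu.Lock()
			for !ready { // re-check the condition after every wake-up
				cond.Wait()
			}
			woken++
			mu.Unlock()
		}()
	}

	mu.Lock()
	ready = true // change the condition while holding the lock
	mu.Unlock()
	cond.Broadcast() // release every goroutine currently blocked in Wait

	wg.Wait()
	mu.Lock()
	defer mu.Unlock()
	return woken
}

func main() {
	fmt.Println(wakeAll(3)) // prints 3
}
```

A goroutine that has not reached Wait by the time Broadcast runs simply sees ready already set and never blocks, which is why the condition is checked in a loop under the lock rather than assumed from the wake-up alone.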
III. Complete program code and output
// @File: condDemo3.go
// @Author: Jason
// @Date: 2022/7/2
// @Description: Simulates a message queue. The producer writes a message to the
// queue at random intervals and stops writing while the queue is full; while the
// queue is not empty, the consumer processes messages, taking 2 seconds each.
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type EleType int // element type of the queue

const MAXSIZE = 5 // maximum capacity of the queue

var wg sync.WaitGroup

// CycleQueue is a circular queue backed by a slice.
type CycleQueue struct {
	Arr     []EleType // underlying slice holding the elements
	Size    int       // current number of elements in the queue
	Front   int       // index of the head element
	Rear    int       // index of the tail element
	MaxSize int       // maximum capacity of the queue
}

// NewCycleQueue is the constructor for CycleQueue.
func NewCycleQueue() *CycleQueue {
	queue := new(CycleQueue)
	queue.MaxSize = MAXSIZE
	// Rear starts at 0 and Push advances it before writing,
	// so the first element lands at index 1.
	queue.Front = 1
	queue.Arr = make([]EleType, MAXSIZE)
	return queue
}

// Empty reports whether the queue is empty.
func (queue *CycleQueue) Empty() bool {
	return queue.Size == 0
}

// Full reports whether the queue is full.
func (queue *CycleQueue) Full() bool {
	return queue.Size == queue.MaxSize
}

// Push enqueues val at the tail. If the queue is full, val is silently dropped.
func (queue *CycleQueue) Push(val EleType) {
	if queue.Full() {
		return
	}
	queue.Rear++
	if queue.Rear == queue.MaxSize {
		queue.Rear = 0 // wrap around to the start of the slice
	}
	queue.Size++
	queue.Arr[queue.Rear] = val
}

// PopFront dequeues and returns the head element. It returns 0 if the queue is empty.
func (queue *CycleQueue) PopFront() EleType {
	if queue.Empty() {
		return 0
	}
	val := queue.Arr[queue.Front]
	queue.Arr[queue.Front] = 0 // (optional) clear the slot of the dequeued element
	queue.Size--
	queue.Front++
	if queue.Front == queue.MaxSize {
		queue.Front = 0 // wrap around to the start of the slice
	}
	return val
}

// MsgQueue is a message queue that holds a circular queue and a condition lock.
type MsgQueue struct {
	container CycleQueue
	cond      *sync.Cond
}

// NewMsgQueue is the constructor for MsgQueue.
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
	msgQueue := new(MsgQueue)
	msgQueue.container = *NewCycleQueue()
	msgQueue.cond = cond
	return msgQueue
}
// Consume runs in its own goroutine and consumes messages from the queue.
// main registers it with wg.Add before starting it.
func (mq *MsgQueue) Consume() {
	defer wg.Done()
	for {
		mq.cond.L.Lock() // acquire the condition lock and enter the critical section
		for mq.container.Empty() {
			// condition wait: block while the queue is empty
			fmt.Println("The queue is empty...")
			mq.cond.Wait()
		}
		data := mq.container.PopFront()
		fmt.Println("<<==Consume: ", data, mq.container.Arr)
		mq.cond.L.Unlock()          // release the condition lock
		mq.cond.Signal()            // notify a waiting goroutine
		time.Sleep(time.Second * 2) // delay to simulate message-processing time
	}
}

// Produce runs in its own goroutine, produces messages and enqueues them.
// main registers it with wg.Add before starting it.
func (mq *MsgQueue) Produce() {
	defer wg.Done()
	for {
		mq.cond.L.Lock()
		for mq.container.Full() {
			// condition wait: block while the queue is full
			fmt.Println("The queue is full...")
			mq.cond.Wait()
		}
		data := rand.Intn(10000) // a random number in [0, 10000) stands in for a message
		mq.container.Push(EleType(data))
		fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
		mq.cond.L.Unlock()
		mq.cond.Signal()
		dur := time.Duration(rand.Intn(5)) * time.Second // random wait of 0 s to 4 s
		time.Sleep(dur)
	}
}

func main() {
	cond := sync.NewCond(&sync.Mutex{})
	mq := NewMsgQueue(cond)
	wg.Add(2) // one for Produce, one for Consume
	go mq.Produce()
	go mq.Consume()
	wg.Wait() // both loops run forever, so this blocks until the program is interrupted
}
Output:
The queue is empty...
==>>Produce: 8081 [0 8081 0 0 0]
<<==Consume: 8081 [0 0 0 0 0]
The queue is empty...
==>>Produce: 1847 [0 0 1847 0 0]
<<==Consume: 1847 [0 0 0 0 0]
The queue is empty...
==>>Produce: 2081 [0 0 0 2081 0]
<<==Consume: 2081 [0 0 0 0 0]
The queue is empty...
==>>Produce: 4425 [0 0 0 0 4425]
==>>Produce: 456 [456 0 0 0 4425]
==>>Produce: 694 [456 694 0 0 4425]
<<==Consume: 4425 [456 694 0 0 0]
==>>Produce: 8162 [456 694 8162 0 0]
<<==Consume: 456 [0 694 8162 0 0]
<<==Consume: 694 [0 0 8162 0 0]
==>>Produce: 4728 [0 0 8162 4728 0]
......
......
While the message queue is not empty, the consumer goroutine processes messages; otherwise it waits for the producer goroutine to generate a random number.