当前位置:网站首页>[demo] circular queue and conditional lock realize the communication between goroutines
[demo] circular queue and conditional lock realize the communication between goroutines
2022-07-07 18:19:00 【Viva Python】
Circular queue and conditional lock implementation goroutine
Communication between
One 、 Implementation of circular queue
1. Circular queue
Through the bottom layer section Maintain the elements of the queue , Two pointers ( Integer variables )Front
、Rear
Point to the head and tail elements of the queue respectively , Used for outgoing and incoming operations . Maintain two integer variables at the same time Size
、MaxSize
Record the number of elements in the queue and the maximum number of elements , Used to explicitly judge empty 、 Full rating and cyclic operation .
type EleType int // The element type of the queue
const MAXSIZE = 5 // The maximum capacity of the queue
var wg sync.WaitGroup
// CycleQueue Circular queue structure
type CycleQueue struct {
Arr []EleType // Slice maintenance
Size int // The current number of elements in the queue
Front int // Front The pointer , Point to the queue header element
Rear int // Rear The pointer , Point to the tail element of the queue
MaxSize int // The maximum capacity of the queue
}
// NewCycleQueue CycleQueue Constructor for
func NewCycleQueue() *CycleQueue {
queue := new(CycleQueue)
queue.MaxSize = MAXSIZE
queue.Front = 1
queue.Arr = make([]EleType, MAXSIZE)
return queue
}
// Empty Determines if the queue is empty
func (queue *CycleQueue) Empty() bool {
return queue.Size == 0
}
// Full Determine if the queue is full
func (queue *CycleQueue) Full() bool {
return queue.Size == queue.MaxSize
}
// Push Joining operation
func (queue *CycleQueue) Push(val EleType) {
if queue.Full() {
return
}
queue.Rear++
if queue.Rear == queue.MaxSize {
queue.Rear = 0
}
queue.Size++
queue.Arr[queue.Rear] = val
}
// PopFront Out of line operation
func (queue *CycleQueue) PopFront() EleType {
if queue.Empty() {
return 0
}
val := queue.Arr[queue.Front]
queue.Arr[queue.Front] = 0 // ( Optional ) The elements that have been out of the team are cleared
queue.Size--
queue.Front++
if queue.Front == queue.MaxSize {
queue.Front = 0
}
return val
}
2. Message queue
(1) Message queue is based on circular queue as container , Add conditional lock , Used to control the goroutine
Mutually exclusive 、 Conditionally access circular queues .
// MsgQueue Message queue , Maintain a circular queue and conditional locks
type MsgQueue struct {
container CycleQueue
cond *sync.Cond
}
// NewMsgQueue Constructor of message queue
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
msgQueue := new(MsgQueue)
msgQueue.container = *NewCycleQueue()
msgQueue.cond = cond
return msgQueue
}
(2) Message queuing operations
Message queuing mainly has two operations , Corresponding to the two `goroutine`, simulation `goroutine` Generate and process messages .
Generate messages : Generate 10000 And join the team , Time delay 1 s~5 s( Random ).
Process the message : Print and zero the queue header message , Time delay 1 s.
// Consume One goroutine, Consumption of message queue
func (mq *MsgQueue) Consume() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock() // Get the condition lock and enter the adjacent block
for mq.container.Empty() {
// Conditions wait : If the queue is empty, wait
fmt.Println("The queue is empty...")
mq.cond.Wait()
}
data := mq.container.PopFront()
fmt.Println("<<==Consume: ", data, mq.container.Arr)
mq.cond.L.Unlock() // Release condition lock
mq.cond.Signal() // Signal
time.Sleep(time.Second * 2) // Delay , Simulating message processing takes time
}
}
// Produce One goroutine, Produce message elements and join the team
func (mq *MsgQueue) Produce() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock()
for mq.container.Full() {
// Conditions wait : Wait when the queue is full
fmt.Println("The queue is full...")
mq.cond.Wait()
}
data := rand.Intn(10000) // Generate random numbers , The simulation generates a message
mq.container.Push(EleType(data))
fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
mq.cond.L.Unlock()
mq.cond.Signal()
dur := time.Duration(rand.Intn(5)) * time.Second // Random waiting time
time.Sleep(dur)
}
}
Two 、 Conditional lock
The conditional lock in this example uses Go built-in sync
In bag Cond
Conditional lock . It can control the critical zone 、 Set the waiting conditions 、 Notify others goroutine
The function of . about *sync.Cond
Variable cond
, It has the following common operations :
1.cond.L.Lock()
Method
Acquire condition lock , Enter the critical area .
2. cond.L.Unlock()
Method
Release condition lock , Other goroutine
You can obtain a conditional lock .
3. cond.Wait()
Method
Blocking goroutine
, Wait for others goroutine
notice , Usually used to set cycle conditions , Wait until a certain condition is met before continuing . As above Produce Corresponding goroutine
Wait until the message queue is not full before generating messages .
4. cond.Signal()
Method
Notice on cond.Wait()
Method goroutine
Jump out of blocking .
5. cond.Broadcast()
Method
And cond.Signal()
The method is similar to , notice all stay cond.Wait()
Methodical goroutine
Jump out of blocking .
3、 ... and 、 Complete program code and running results
// @File: condDemo3.go
// @Author: Jason
// @Date: 2022/7/2
// @Description: Simulate message queuing , The message generator writes messages to the queue for a random period of time ,
// When the line is full , No more writing ; When the message queue is not empty , The message consumption queue processes messages , need 2 second
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type EleType int // The element type of the queue
const MAXSIZE = 5 // The maximum capacity of the queue
var wg sync.WaitGroup
// CycleQueue Circular queue structure
type CycleQueue struct {
Arr []EleType // Slice maintenance
Size int // The current number of elements in the queue
Front int // Front The pointer , Point to the queue header element
Rear int // Rear The pointer , Point to the tail element of the queue
MaxSize int // The maximum capacity of the queue
}
// NewCycleQueue CycleQueue Constructor for
func NewCycleQueue() *CycleQueue {
queue := new(CycleQueue)
queue.MaxSize = MAXSIZE
queue.Front = 1
queue.Arr = make([]EleType, MAXSIZE)
return queue
}
// Empty Determines if the queue is empty
func (queue *CycleQueue) Empty() bool {
return queue.Size == 0
}
// Full Determine if the queue is full
func (queue *CycleQueue) Full() bool {
return queue.Size == queue.MaxSize
}
// Push Joining operation
func (queue *CycleQueue) Push(val EleType) {
if queue.Full() {
return
}
queue.Rear++
if queue.Rear == queue.MaxSize {
queue.Rear = 0
}
queue.Size++
queue.Arr[queue.Rear] = val
}
// PopFront Out of line operation
func (queue *CycleQueue) PopFront() EleType {
if queue.Empty() {
return 0
}
val := queue.Arr[queue.Front]
queue.Arr[queue.Front] = 0 // ( Optional ) The elements that have been out of the team are cleared
queue.Size--
queue.Front++
if queue.Front == queue.MaxSize {
queue.Front = 0
}
return val
}
// MsgQueue Message queue , Maintain a circular queue and conditional locks
type MsgQueue struct {
container CycleQueue
cond *sync.Cond
}
// NewMsgQueue Constructor of message queue
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
msgQueue := new(MsgQueue)
msgQueue.container = *NewCycleQueue()
msgQueue.cond = cond
return msgQueue
}
// Consume One goroutine, Consumption of message queue
func (mq *MsgQueue) Consume() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock() // Get the condition lock and enter the adjacent block
for mq.container.Empty() {
// Conditions wait : If the queue is empty, wait
fmt.Println("The queue is empty...")
mq.cond.Wait()
}
data := mq.container.PopFront()
fmt.Println("<<==Consume: ", data, mq.container.Arr)
mq.cond.L.Unlock() // Release condition lock
mq.cond.Signal() // Signal
time.Sleep(time.Second * 2) // Delay , Simulating message processing takes time
}
}
// Produce One goroutine, Produce message elements and join the team
func (mq *MsgQueue) Produce() {
wg.Add(1)
defer wg.Done()
for {
mq.cond.L.Lock()
for mq.container.Full() {
// Conditions wait : Wait when the queue is full
fmt.Println("The queue is full...")
mq.cond.Wait()
}
data := rand.Intn(10000) // Generate random numbers , The simulation generates a message
mq.container.Push(EleType(data))
fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
mq.cond.L.Unlock()
mq.cond.Signal()
dur := time.Duration(rand.Intn(5)) * time.Second // Random waiting time
time.Sleep(dur)
}
}
func main() {
var cond = sync.NewCond(&sync.Mutex{
})
mq := NewMsgQueue(cond)
wg.Add(2)
go mq.Produce()
go mq.Consume()
wg.Wait()
}
Running results :
The queue is empty...
==>>Produce: 8081 [0 8081 0 0 0]
<<==Consume: 8081 [0 0 0 0 0]
The queue is empty...
==>>Produce: 1847 [0 0 1847 0 0]
<<==Consume: 1847 [0 0 0 0 0]
The queue is empty...
==>>Produce: 2081 [0 0 0 2081 0]
<<==Consume: 2081 [0 0 0 0 0]
The queue is empty...
==>>Produce: 4425 [0 0 0 0 4425]
==>>Produce: 456 [456 0 0 0 4425]
==>>Produce: 694 [456 694 0 0 4425]
<<==Consume: 4425 [456 694 0 0 0]
==>>Produce: 8162 [456 694 8162 0 0]
<<==Consume: 456 [0 694 8162 0 0]
<<==Consume: 694 [0 0 8162 0 0]
==>>Produce: 4728 [0 0 8162 4728 0]
......
......
When the message queue is not empty , Message processing goroutine
Process the message , Otherwise, wait for the message to be generated goroutine
Generate random number .
边栏推荐
- idea彻底卸载安装及配置笔记
- 开发一个小程序商城需要多少钱?
- MySQL index hit level analysis
- Click on the top of today's headline app to navigate in the middle
- 云景网络科技面试题【杭州多测师】【杭州多测师_王sir】
- [network attack and defense principle and technology] Chapter 4: network scanning technology
- 财富证券证券怎么开户?通过链接办理股票开户安全吗
- SD_DATA_SEND_SHIFT_REGISTER
- Automated testing: a practical skill that everyone wants to know about robot framework
- go语言的字符串类型、常量类型和容器类型
猜你喜欢
Click on the top of today's headline app to navigate in the middle
Chapter 3 business function development (user login)
Introduction of common API for socket programming and code implementation of socket, select, poll, epoll high concurrency server model
Self made dataset in pytoch for dataset rewriting
上市十天就下线过万台,欧尚Z6产品实力备受点赞
手机app外卖订餐个人中心页面
Backup Alibaba cloud instance OSS browser
How to clean when win11 C disk is full? Win11 method of cleaning C disk
『HarmonyOS』DevEco的下载安装与开发环境搭建
[PaddleSeg源码阅读] PaddleSeg Validation 中添加 Boundary IoU的计算(1)——val.py文件细节提示
随机推荐
Main work of digital transformation
USB通信协议深入理解
Discuss | frankly, why is it difficult to implement industrial AR applications?
socket编程之常用api介绍与socket、select、poll、epoll高并发服务器模型代码实现
golang 客户端服务端登录
[trusted computing] Lesson 13: TPM extended authorization and key management
[tpm2.0 principle and Application guide] Chapter 5, 7 and 8
Tips for short-term operation of spot silver that cannot be ignored
Chapter 3 business function development (user access project)
简单几步教你如何看k线图图解
Click on the top of today's headline app to navigate in the middle
Sanxian Guidong JS game source code
Easy to understand [linear regression of machine learning]
Summary of evaluation indicators and important knowledge points of regression problems
[trusted computing] Lesson 11: TPM password resource management (III) NV index and PCR
Self made dataset in pytoch for dataset rewriting
Chapter 1 Introduction to CRM core business
保证接口数据安全的10种方案
debian10编译安装mysql
用存储过程、定时器、触发器来解决数据分析问题