当前位置:网站首页>[demo] circular queue and conditional lock realize the communication between goroutines

[demo] circular queue and conditional lock realize the communication between goroutines

2022-07-07 18:19:00 Viva Python

Circular queue and conditional lock implementation goroutine Communication between

One 、 Implementation of circular queue

1. Circular queue

​ Through the bottom layer section Maintain the elements of the queue , Two pointers ( Integer variables )FrontRear Point to the head and tail elements of the queue respectively , Used for outgoing and incoming operations . Maintain two integer variables at the same time SizeMaxSize Record the number of elements in the queue and the maximum number of elements , Used to explicitly judge empty 、 Full rating and cyclic operation .

type EleType int  //  The element type of the queue 
const MAXSIZE = 5 //  The maximum capacity of the queue 
var wg sync.WaitGroup

// CycleQueue  Circular queue structure 
type CycleQueue struct {
    
	Arr     []EleType //  Slice maintenance 
	Size    int       //  The current number of elements in the queue 
	Front   int       // Front The pointer , Point to the queue header element 
	Rear    int       // Rear The pointer , Point to the tail element of the queue 
	MaxSize int       //  The maximum capacity of the queue 
}

// NewCycleQueue CycleQueue Constructor for 
func NewCycleQueue() *CycleQueue {
    
	queue := new(CycleQueue)
	queue.MaxSize = MAXSIZE
	queue.Front = 1
	queue.Arr = make([]EleType, MAXSIZE)
	return queue
}

// Empty  Determines if the queue is empty 
func (queue *CycleQueue) Empty() bool {
    
	return queue.Size == 0
}

// Full  Determine if the queue is full 
func (queue *CycleQueue) Full() bool {
    
	return queue.Size == queue.MaxSize
}

// Push  Joining operation 
func (queue *CycleQueue) Push(val EleType) {
    
	if queue.Full() {
    
		return
	}
	queue.Rear++
	if queue.Rear == queue.MaxSize {
    
		queue.Rear = 0
	}
	queue.Size++
	queue.Arr[queue.Rear] = val
}

// PopFront  Out of line operation 
func (queue *CycleQueue) PopFront() EleType {
    
	if queue.Empty() {
    
		return 0
	}
	val := queue.Arr[queue.Front]
	queue.Arr[queue.Front] = 0  // ( Optional ) The elements that have been out of the team are cleared 
	queue.Size--
	queue.Front++
	if queue.Front == queue.MaxSize {
    
		queue.Front = 0
	}
	return val
}

2. Message queue

​ (1) Message queue is based on circular queue as container , Add conditional lock , Used to control the goroutine Mutually exclusive 、 Conditionally access circular queues .

// MsgQueue  Message queue , Maintain a circular queue and conditional locks 
type MsgQueue struct {
    
	container CycleQueue
	cond      *sync.Cond
}

// NewMsgQueue  Constructor of message queue 
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
    
	msgQueue := new(MsgQueue)
	msgQueue.container = *NewCycleQueue()
	msgQueue.cond = cond
	return msgQueue
}

​ (2) Message queuing operations

 Message queuing mainly has two operations , Corresponding to the two `goroutine`, simulation `goroutine` Generate and process messages .
  • Generate messages : Generate 10000 And join the team , Time delay 1 s~5 s( Random ).

  • Process the message : Print and zero the queue header message , Time delay 1 s.

// Consume  One goroutine, Consumption of message queue 
func (mq *MsgQueue) Consume() {
    
	wg.Add(1)
	defer wg.Done()
	for {
    
		mq.cond.L.Lock()           //  Get the condition lock and enter the adjacent block 
		for mq.container.Empty() {
     //  Conditions wait : If the queue is empty, wait 
			fmt.Println("The queue is empty...")
			mq.cond.Wait()
		}
		data := mq.container.PopFront()
		fmt.Println("<<==Consume: ", data, mq.container.Arr)
		mq.cond.L.Unlock()          //  Release condition lock 
		mq.cond.Signal()            // Signal
		time.Sleep(time.Second * 2) //  Delay , Simulating message processing takes time 
	}
}

// Produce  One goroutine, Produce message elements and join the team 
func (mq *MsgQueue) Produce() {
    
	wg.Add(1)
	defer wg.Done()
	for {
    
		mq.cond.L.Lock()
		for mq.container.Full() {
     //  Conditions wait : Wait when the queue is full 
			fmt.Println("The queue is full...")
			mq.cond.Wait()
		}
		data := rand.Intn(10000) //  Generate random numbers , The simulation generates a message 
		mq.container.Push(EleType(data))
		fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
		mq.cond.L.Unlock()
		mq.cond.Signal()
		dur := time.Duration(rand.Intn(5)) * time.Second //  Random waiting time 
		time.Sleep(dur)
	}
}

Two 、 Conditional lock

​ The conditional lock in this example uses Go built-in sync In bag Cond Conditional lock . It can control the critical zone 、 Set the waiting conditions 、 Notify others goroutine The function of . about *sync.Cond Variable cond, It has the following common operations :

1.cond.L.Lock() Method

​ Acquire condition lock , Enter the critical area .

2. cond.L.Unlock() Method

​ Release condition lock , Other goroutine You can obtain a conditional lock .

3. cond.Wait() Method

​ Blocking goroutine, Wait for others goroutine notice , Usually used to set cycle conditions , Wait until a certain condition is met before continuing . As above Produce Corresponding goroutine Wait until the message queue is not full before generating messages .

4. cond.Signal() Method

​ Notice on cond.Wait() Method goroutine Jump out of blocking .

5. cond.Broadcast() Method

​ And cond.Signal() The method is similar to , notice all stay cond.Wait() Methodical goroutine Jump out of blocking .

3、 ... and 、 Complete program code and running results

// @File: condDemo3.go
// @Author: Jason
// @Date: 2022/7/2
// @Description:  Simulate message queuing , The message generator writes messages to the queue for a random period of time ,
//  When the line is full , No more writing ; When the message queue is not empty , The message consumption queue processes messages , need 2 second 

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type EleType int  //  The element type of the queue 
const MAXSIZE = 5 //  The maximum capacity of the queue 
var wg sync.WaitGroup

// CycleQueue  Circular queue structure 
type CycleQueue struct {
    
	Arr     []EleType //  Slice maintenance 
	Size    int       //  The current number of elements in the queue 
	Front   int       // Front The pointer , Point to the queue header element 
	Rear    int       // Rear The pointer , Point to the tail element of the queue 
	MaxSize int       //  The maximum capacity of the queue 
}

// NewCycleQueue CycleQueue Constructor for 
func NewCycleQueue() *CycleQueue {
    
	queue := new(CycleQueue)
	queue.MaxSize = MAXSIZE
	queue.Front = 1
	queue.Arr = make([]EleType, MAXSIZE)
	return queue
}

// Empty  Determines if the queue is empty 
func (queue *CycleQueue) Empty() bool {
    
	return queue.Size == 0
}

// Full  Determine if the queue is full 
func (queue *CycleQueue) Full() bool {
    
	return queue.Size == queue.MaxSize
}

// Push  Joining operation 
func (queue *CycleQueue) Push(val EleType) {
    
	if queue.Full() {
    
		return
	}
	queue.Rear++
	if queue.Rear == queue.MaxSize {
    
		queue.Rear = 0
	}
	queue.Size++
	queue.Arr[queue.Rear] = val
}

// PopFront  Out of line operation 
func (queue *CycleQueue) PopFront() EleType {
    
	if queue.Empty() {
    
		return 0
	}
	val := queue.Arr[queue.Front]
	queue.Arr[queue.Front] = 0 // ( Optional ) The elements that have been out of the team are cleared 
	queue.Size--
	queue.Front++
	if queue.Front == queue.MaxSize {
    
		queue.Front = 0
	}
	return val
}

// MsgQueue  Message queue , Maintain a circular queue and conditional locks 
type MsgQueue struct {
    
	container CycleQueue
	cond      *sync.Cond
}

// NewMsgQueue  Constructor of message queue 
func NewMsgQueue(cond *sync.Cond) *MsgQueue {
    
	msgQueue := new(MsgQueue)
	msgQueue.container = *NewCycleQueue()
	msgQueue.cond = cond
	return msgQueue
}

// Consume  One goroutine, Consumption of message queue 
func (mq *MsgQueue) Consume() {
    
	wg.Add(1)
	defer wg.Done()
	for {
    
		mq.cond.L.Lock()           //  Get the condition lock and enter the adjacent block 
		for mq.container.Empty() {
     //  Conditions wait : If the queue is empty, wait 
			fmt.Println("The queue is empty...")
			mq.cond.Wait()
		}
		data := mq.container.PopFront()
		fmt.Println("<<==Consume: ", data, mq.container.Arr)
		mq.cond.L.Unlock()          //  Release condition lock 
		mq.cond.Signal()            // Signal
		time.Sleep(time.Second * 2) //  Delay , Simulating message processing takes time 
	}
}

// Produce  One goroutine, Produce message elements and join the team 
func (mq *MsgQueue) Produce() {
    
	wg.Add(1)
	defer wg.Done()
	for {
    
		mq.cond.L.Lock()
		for mq.container.Full() {
     //  Conditions wait : Wait when the queue is full 
			fmt.Println("The queue is full...")
			mq.cond.Wait()
		}
		data := rand.Intn(10000) //  Generate random numbers , The simulation generates a message 
		mq.container.Push(EleType(data))
		fmt.Println("\t==>>Produce: ", data, mq.container.Arr)
		mq.cond.L.Unlock()
		mq.cond.Signal()
		dur := time.Duration(rand.Intn(5)) * time.Second //  Random waiting time 
		time.Sleep(dur)
	}
}

func main() {
    
	var cond = sync.NewCond(&sync.Mutex{
    })
	mq := NewMsgQueue(cond)
	wg.Add(2)
	go mq.Produce()
	go mq.Consume()
	wg.Wait()
}

​ Running results :

The queue is empty...
        ==>>Produce:  8081 [0 8081 0 0 0]
<<==Consume:  8081 [0 0 0 0 0]
The queue is empty...
        ==>>Produce:  1847 [0 0 1847 0 0]
<<==Consume:  1847 [0 0 0 0 0]
The queue is empty...
        ==>>Produce:  2081 [0 0 0 2081 0]
<<==Consume:  2081 [0 0 0 0 0]
The queue is empty...
        ==>>Produce:  4425 [0 0 0 0 4425]
        ==>>Produce:  456 [456 0 0 0 4425]  
        ==>>Produce:  694 [456 694 0 0 4425]
<<==Consume:  4425 [456 694 0 0 0]
        ==>>Produce:  8162 [456 694 8162 0 0]
<<==Consume:  456 [0 694 8162 0 0]
<<==Consume:  694 [0 0 8162 0 0]
        ==>>Produce:  4728 [0 0 8162 4728 0]
        ......
        ......

​ When the message queue is not empty , Message processing goroutine Process the message , Otherwise, wait for the message to be generated goroutine Generate random number .

原网站

版权声明
本文为[Viva Python]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/188/202207071603493353.html