Golang concurrent lock
2022-07-28 18:41:00 【Compose】
Go's sync package provides basic synchronization primitives, including the mutex Mutex, the one-time executor Once, and the wait group WaitGroup.
This article introduces the basic usage of the types the sync package provides:
- Mutex: mutual-exclusion lock
- RWMutex: read-write lock
- WaitGroup: wait group
- Once: one-time execution
- Cond: condition variable
- Pool: temporary object pool
- Map: a map with built-in locking
sync.Mutex
sync.Mutex is the mutual-exclusion lock and is used constantly in concurrent programming. A goroutine is a lightweight user-space thread, so intuitions about threads carry over.
The idea of a mutex: lock shared data so that only one thread or goroutine can operate on it at a time.
Note: multiple threads or goroutines compete for a mutex; whichever acquires the lock runs first while the rest wait. When the holder finishes and releases the lock, the waiting threads or goroutines compete for it again.
sync.Mutex has two methods, Lock and Unlock, which acquire and release the lock respectively.
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
The zero value of sync.Mutex is an unlocked mutex, and it is often embedded as an anonymous field in other structs.
For example: we pay for things online all the time, and at a given moment the same bank account may see both debits and credits; the bank must keep our balance accurate and the data consistent.
A simple implementation of deposits and withdrawals illustrates how Mutex is used.
type Bank struct {
	sync.Mutex
	balance map[string]float64
}

// In deposits value into account.
func (b *Bank) In(account string, value float64) {
	// Lock so that only one goroutine can touch the balance at a time.
	b.Lock()
	defer b.Unlock()
	if _, ok := b.balance[account]; !ok {
		b.balance[account] = 0.0
	}
	b.balance[account] += value
}

// Out withdraws value from account.
func (b *Bank) Out(account string, value float64) error {
	// Lock so that only one goroutine can touch the balance at a time.
	b.Lock()
	defer b.Unlock()
	v, ok := b.balance[account]
	if !ok || v < value {
		return errors.New("account not enough balance")
	}
	b.balance[account] -= value
	return nil
}
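To see the lock doing its job, here is a minimal, self-contained sketch (the Account type and its methods are illustrative, not the article's Bank): 100 goroutines deposit concurrently and no update is lost.

```go
package main

import (
	"fmt"
	"sync"
)

// Account guards a single balance with an embedded mutex.
type Account struct {
	sync.Mutex
	balance float64
}

func (a *Account) Deposit(v float64) {
	a.Lock()
	defer a.Unlock()
	a.balance += v
}

func (a *Account) Balance() float64 {
	a.Lock()
	defer a.Unlock()
	return a.balance
}

func main() {
	var wg sync.WaitGroup
	acc := &Account{}
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			acc.Deposit(1) // safe: only one goroutine mutates at a time
		}()
	}
	wg.Wait()
	fmt.Println(acc.Balance()) // → 100, never a lost update
}
```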
sync.RWMutex
sync.RWMutex, the read-write lock, is a variant of sync.Mutex; it comes from the classic readers-writers problem in operating systems.
sync.RWMutex lets many goroutines read a resource concurrently while only one at a time may update it. In other words, reads and writes are mutually exclusive, and writes are mutually exclusive with each other, but reads are not mutually exclusive with other reads.
Summed up:
- While any goroutine holds the read lock, every writer must wait until all readers finish before it can acquire the lock for writing.
- While any goroutine holds the read lock, other readers are unaffected and may read concurrently.
- While a goroutine holds the write lock, every reader and writer must wait until the writer finishes before acquiring the lock to read or write.
RWMutex has five methods, providing lock operations for reading and writing separately.
Write operations
func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()
Read operations
func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()
RLocker returns a sync.Locker backed by the read lock, which can then be passed to other goroutines:
func (rw *RWMutex) RLocker() Locker
For example, the sync.Mutex version above provides no query operation, and a plain Mutex cannot support multiple callers querying at the same time. So we rewrite the code with sync.RWMutex:
type Bank struct {
sync.RWMutex
balance map[string]float64
}
func (b *Bank) In(account string, value float64) {
	b.Lock()
	defer b.Unlock()
	if _, ok := b.balance[account]; !ok {
		b.balance[account] = 0.0
	}
	b.balance[account] += value
}
func (b *Bank) Out(account string, value float64) error {
b.Lock()
defer b.Unlock()
v, ok := b.balance[account]
if !ok || v < value {
return errors.New("account not enough balance")
}
b.balance[account] -= value
return nil
}
func (b *Bank) Query(account string) float64 {
b.RLock()
defer b.RUnlock()
v, ok := b.balance[account]
if !ok {
return 0.0
}
return v
}
sync.WaitGroup
sync.WaitGroup is a wait group, very common in Go concurrent programming: it waits for one batch of work to finish before starting the next batch.
sync.WaitGroup has three methods:
func (wg *WaitGroup) Add(delta int) // add delta goroutines to wait for
func (wg *WaitGroup) Done() // mark one goroutine as finished
func (wg *WaitGroup) Wait() // block until all goroutines have finished
sync.WaitGroup is most often used with pools of goroutines in Go. The following example starts 1000 goroutines at once.
func main() {
wg := &sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
time.Sleep(1 * time.Second)
fmt.Println("hello world ~")
}()
}
// Wait for the end of all processes
wg.Wait()
fmt.Println("WaitGroup all process done ~")
}
sync.WaitGroup provides no way to cap the number of concurrently running goroutines, which can be a problem in some scenarios. When hitting a database, for example, we don't want a burst of connections at some moment to make the database unreachable. To bound the maximum concurrency, one option is the third-party sizedwaitgroup package, whose usage is very similar to sync.WaitGroup.
The following example runs at most 10 goroutines concurrently; once 10 are in flight, a new goroutine starts only after an existing one calls Done.
import "github.com/remeh/sizedwaitgroup"

func main() {
	// at most 10 goroutines may run concurrently
	wg := sizedwaitgroup.New(10)
	for i := 0; i < 1000; i++ {
		wg.Add()
		go func() {
			defer wg.Done()
			time.Sleep(1 * time.Second)
			fmt.Println("hello world ~")
		}()
	}
	// wait for every goroutine to finish
	wg.Wait()
	fmt.Println("WaitGroup all process done ~")
}
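If pulling in a dependency is undesirable, a buffered channel works as a counting semaphore using only the standard library. The sketch below (the runLimited function is my own) bounds concurrency at limit and records the peak actually observed:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runLimited starts `total` goroutines but lets at most `limit` run at
// once, using a buffered channel as a counting semaphore. It returns the
// highest concurrency actually observed.
func runLimited(total, limit int) int32 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var inFlight, peak int32

	for i := 0; i < total; i++ {
		sem <- struct{}{} // blocks while `limit` goroutines are in flight
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() {
				atomic.AddInt32(&inFlight, -1)
				<-sem // free a semaphore slot
			}()
			n := atomic.AddInt32(&inFlight, 1)
			// record the peak concurrency seen so far
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	// the printed peak is never greater than 10
	fmt.Println("peak concurrency:", runLimited(100, 10))
}
```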
sync.Once
sync.Once performs an action exactly once and is commonly used to guarantee that some function is called only a single time. Typical use cases are singletons and system initialization.
For example, calling close on a channel more than once under concurrency causes a panic; sync.Once can guarantee the close is executed only once.
The structure of sync.Once is shown below; it has a single method, Do. The done field records whether the function has run, and a mutex together with sync/atomic makes reading done thread-safe.
type Once struct {
	m    Mutex  // mutex
	done uint32 // whether Do has already run
}

func (o *Once) Do(f func())
For example: with 1000 concurrent goroutines, only one will reach the fmt.Printf. Run it several times and the printed index differs, because it depends on which goroutine calls the anonymous function first.
func main() {
once := &sync.Once{}
for i := 0; i < 1000; i++ {
go func(idx int) {
once.Do(func() {
time.Sleep(1 * time.Second)
fmt.Printf("hello world index: %d", idx)
})
}(i)
}
time.Sleep(5 * time.Second)
}
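As mentioned above, a classic application is a lazily initialized singleton. The sketch below (the config type and its field are made up for illustration) is safe to call from any number of goroutines:

```go
package main

import (
	"fmt"
	"sync"
)

type config struct{ addr string }

var (
	once sync.Once
	cfg  *config
)

// getConfig lazily builds the singleton; the function passed to Do runs
// exactly once no matter how many goroutines race here.
func getConfig() *config {
	once.Do(func() {
		cfg = &config{addr: "localhost:8080"} // illustrative value
	})
	return cfg
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = getConfig()
		}()
	}
	wg.Wait()
	fmt.Println(getConfig().addr) // → localhost:8080
}
```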
sync.Cond
sync.Cond is a condition variable. It is generally used together with a mutex; in essence it is a mechanism for goroutines to wait until some condition holds.
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
sync.Cond has three methods: Wait, Signal, and Broadcast.
// Wait blocks until woken by a notification
func (c *Cond) Wait()
// Signal wakes a single waiter
func (c *Cond) Signal()
// Broadcast wakes all waiters
func (c *Cond) Broadcast()
For example, using sync.Cond as a condition variable shared between goroutines:
var sharedRsc = make(map[string]interface{})
func main() {
var wg sync.WaitGroup
wg.Add(2)
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc1"])
c.L.Unlock()
wg.Done()
}()
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc2"])
c.L.Unlock()
wg.Done()
}()
// this one writes changes to sharedRsc
c.L.Lock()
sharedRsc["rsc1"] = "foo"
sharedRsc["rsc2"] = "bar"
c.Broadcast()
c.L.Unlock()
wg.Wait()
}
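To make the Wait/Signal handshake concrete, here is a small producer/consumer sketch (sumViaCond is an illustrative name): the producer appends to a shared queue and Signals, and the consumer Waits while the queue is empty.

```go
package main

import (
	"fmt"
	"sync"
)

// sumViaCond sends items to a consumer goroutine through a queue guarded
// by a Cond, and returns the sum the consumer computed.
func sumViaCond(items []int) int {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	queue := []int{}
	done := make(chan int)

	go func() {
		sum := 0
		for received := 0; received < len(items); received++ {
			cond.L.Lock()
			for len(queue) == 0 {
				cond.Wait() // atomically unlocks, sleeps, relocks on wake
			}
			sum += queue[0]
			queue = queue[1:]
			cond.L.Unlock()
		}
		done <- sum
	}()

	for _, v := range items {
		cond.L.Lock()
		queue = append(queue, v)
		cond.L.Unlock()
		cond.Signal() // wake one waiter; Broadcast would wake them all
	}
	return <-done
}

func main() {
	fmt.Println(sumViaCond([]int{1, 2, 3})) // → 6
}
```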
sync.Pool
sync.Pool is a temporary object pool. Go and Java both have a GC, so developers rarely think about reclaiming memory, unlike C++, where objects often have to be freed by hand.
GC is a double-edged sword: it makes programming convenient but adds runtime overhead, and careless use can seriously hurt performance, so performance-critical code cannot generate garbage freely.
sync.Pool is meant to solve exactly this: instead of creating objects yourself, you take one from the temporary object pool.
sync.Pool has two methods, Get and Put: Get takes an object from the temporary object pool, and Put returns the object to the pool when you are done with it.
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
Here is the example from the official documentation:
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func timeNow() time.Time {
return time.Unix(1136214245, 0)
}
func Log(w io.Writer, key, val string) {
// Get temporary objects , If not, it will automatically create
b := bufPool.Get().(*bytes.Buffer)
b.Reset()
b.WriteString(timeNow().UTC().Format(time.RFC3339))
b.WriteByte(' ')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(val)
w.Write(b.Bytes())
// Put the temporary object back to Pool in
bufPool.Put(b)
}
func main() {
Log(os.Stdout, "path", "/search?q=flowers")
}
The example above shows that a Pool cannot be given a size when it is created, so sync.Pool places no limit on the number of cached objects (only memory does). How, then, does sync.Pool keep the number of cached temporary objects under control?
In its init, sync.Pool registers a poolCleanup function that clears every object cached in every pool. The runtime calls it before each GC, so a pool's cache lives at most between two GC cycles. Precisely because GC empties the cache, there is no need to worry about pools growing without bound.
For the same reason, sync.Pool is suitable only for caching temporary objects; it is not an object pool for long-lived resources such as connection pools.
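A compressed version of the same pattern, assuming nothing beyond the standard library: always Reset a pooled object before use, and treat reuse after Put as a hint rather than a guarantee.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// render formats a message using a pooled buffer. The Reset matters:
// a pooled buffer may still hold whatever its previous user wrote.
func render(s string) string {
	b := bufPool.Get().(*bytes.Buffer)
	defer bufPool.Put(b) // hand it back; reuse is a hint, never a guarantee
	b.Reset()
	b.WriteString("msg=")
	b.WriteString(s)
	return b.String()
}

func main() {
	fmt.Println(render("hello")) // → msg=hello
}
```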
sync.Map
Before Go 1.9, the built-in map was not concurrency-safe, and most of the time we had to wrap it in a concurrency-safe Map structure ourselves, for example by adding a read-write lock sync.RWMutex to a map:
type MapWithLock struct {
sync.RWMutex
M map[string]Kline
}
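Fleshing that wrapper out (Kline is not shown in the article, so float64 stands in for it here): reads take the shared RLock while writes take the exclusive Lock.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeMap is a map guarded by a read-write lock.
type SafeMap struct {
	sync.RWMutex
	m map[string]float64
}

// Set takes the exclusive write lock.
func (s *SafeMap) Set(k string, v float64) {
	s.Lock()
	defer s.Unlock()
	s.m[k] = v
}

// Get takes the shared read lock, so many readers may proceed at once.
func (s *SafeMap) Get(k string) (float64, bool) {
	s.RLock()
	defer s.RUnlock()
	v, ok := s.m[k]
	return v, ok
}

func main() {
	sm := &SafeMap{m: make(map[string]float64)}
	sm.Set("BTC", 42.0)
	v, ok := sm.Get("BTC")
	fmt.Println(v, ok) // → 42 true
}
```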
sync.Map has five main methods, and its usage is close to the native map:
// load the value stored for a key
func (m *Map) Load(key interface{}) (value interface{}, ok bool)
// store a key/value pair
func (m *Map) Store(key, value interface{})
// return the existing value for key if present; otherwise store and return value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
// delete a key
func (m *Map) Delete(key interface{})
// iterate over the map; order is unspecified
func (m *Map) Range(f func(key, value interface{}) bool)
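A small sketch of those methods in action (the squares helper is my own): many goroutines Store into one sync.Map without any explicit lock, then Load and Range read it back.

```go
package main

import (
	"fmt"
	"sync"
)

// squares stores i -> i*i into a sync.Map from n goroutines at once,
// with no explicit locking, and returns the map.
func squares(n int) *sync.Map {
	var m sync.Map
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Store(i, i*i)
		}(i)
	}
	wg.Wait()
	return &m
}

func main() {
	m := squares(5)
	if v, ok := m.Load(3); ok {
		fmt.Println("3 ->", v) // → 3 -> 9
	}
	count := 0
	m.Range(func(key, value interface{}) bool {
		count++ // iteration order is unspecified
		return true
	})
	fmt.Println("entries:", count) // → entries: 5
}
```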
For reference, here is the full source of the sizedwaitgroup library used above to cap concurrency:
// Based upon sync.WaitGroup, SizedWaitGroup allows to start multiple
// routines and to wait for their end using the simple API.
// SizedWaitGroup adds the feature of limiting the maximum number of
// concurrently started routines. It could for example be used to start
// multiples routines querying a database but without sending too much
// queries in order to not overload the given database.
//
// Rémy Mathieu 2016
package sizedwaitgroup
import (
"context"
"math"
"sync"
)
// SizedWaitGroup has the same role and close to the
// same API as the Golang sync.WaitGroup but adds a limit of
// the amount of goroutines started concurrently.
type SizedWaitGroup struct {
Size int
current chan struct{}
wg sync.WaitGroup
}
// New creates a SizedWaitGroup.
// The limit parameter is the maximum amount of
// goroutines which can be started concurrently.
func New(limit int) SizedWaitGroup {
size := math.MaxInt32 // 2^31 - 1, effectively unlimited
if limit > 0 {
size = limit
}
return SizedWaitGroup{
Size: size,
current: make(chan struct{}, size),
wg: sync.WaitGroup{},
}
}
// Add increments the internal WaitGroup counter.
// It can block if the limit of spawned goroutines
// has been reached. It will stop blocking when Done
// is called.
//
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Add() {
s.AddWithContext(context.Background())
}
// AddWithContext increments the internal WaitGroup counter.
// It can block if the limit of spawned goroutines
// has been reached. It will stop blocking when Done
// is called, or when the context is canceled. Returns nil on
// success or an error if the context is canceled before the lock
// is acquired.
//
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) AddWithContext(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case s.current <- struct{}{}:
break
}
s.wg.Add(1)
return nil
}
// Done decrements the SizedWaitGroup counter.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Done() {
<-s.current
s.wg.Done()
}
// Wait blocks until the SizedWaitGroup counter is zero.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Wait() {
s.wg.Wait()
}
The library's tests:
package sizedwaitgroup
import (
"context"
"sync/atomic"
"testing"
)
func TestWait(t *testing.T) {
swg := New(10)
var c uint32
for i := 0; i < 10000; i++ {
swg.Add()
go func(c *uint32) {
defer swg.Done()
atomic.AddUint32(c, 1)
}(&c)
}
swg.Wait()
if c != 10000 {
t.Fatalf("%d, not all routines have been executed.", c)
}
}
func TestThrottling(t *testing.T) {
var c uint32
swg := New(4)
if len(swg.current) != 0 {
t.Fatalf("the SizedWaitGroup should start with zero.")
}
for i := 0; i < 10000; i++ {
swg.Add()
go func(c *uint32) {
defer swg.Done()
atomic.AddUint32(c, 1)
if len(swg.current) > 4 {
t.Fatalf("not the good amount of routines spawned.")
return
}
}(&c)
}
swg.Wait()
}
func TestNoThrottling(t *testing.T) {
var c uint32
swg := New(0)
if len(swg.current) != 0 {
t.Fatalf("the SizedWaitGroup should start with zero.")
}
for i := 0; i < 10000; i++ {
swg.Add()
go func(c *uint32) {
defer swg.Done()
atomic.AddUint32(c, 1)
}(&c)
}
swg.Wait()
if c != 10000 {
t.Fatalf("%d, not all routines have been executed.", c)
}
}
func TestAddWithContext(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.TODO())
swg := New(1)
if err := swg.AddWithContext(ctx); err != nil {
t.Fatalf("AddContext returned error: %v", err)
}
cancelFunc()
if err := swg.AddWithContext(ctx); err != context.Canceled {
t.Fatalf("AddContext returned non-context.Canceled error: %v", err)
}
}
# SizedWaitGroup
`SizedWaitGroup` has the same role and API as `sync.WaitGroup`, but adds a limit on the number of goroutines started concurrently. It could for example be used to start multiple routines querying a database without sending too many queries at once, so as not to overload that database.
# Example
```go
package main
import (
"fmt"
"math/rand"
"time"
"github.com/remeh/sizedwaitgroup"
)
func main() {
rand.Seed(time.Now().UnixNano())
// Typical use-case:
// 50 queries must be executed as quick as possible
// but without overloading the database, so only
// 8 routines should be started concurrently.
swg := sizedwaitgroup.New(8)
for i := 0; i < 50; i++ {
swg.Add()
go func(i int) {
defer swg.Done()
query(i)
}(i)
}
swg.Wait()
}
func query(i int) {
fmt.Println(i)
ms := i + 500 + rand.Intn(500)
time.Sleep(time.Duration(ms) * time.Millisecond)
}
```
# License
MIT
# Copyright
Rémy Mathieu 2016