1. Preface
The Go language provides a great deal of support for synchronization (both data synchronization and goroutine synchronization), such as goroutines and channels. Among the synchronization packages there are:
- sync: provides basic synchronization primitives (such as Mutex, RWMutex, Locker) and utility types (Once, WaitGroup, Cond, Pool, Map)
- sync/atomic: provides atomic operations on variables (based on hardware compare-and-swap instructions)
-- Quoted from "Golang package sync analysis (Part 1): sync.Once"
In the previous article, we introduced how sync.Once guarantees exactly-once semantics. In this article, we look at another utility type under package sync: sync.WaitGroup.
2. Why WaitGroup?
Imagine a scenario: we have a user profile service. When a request arrives, it needs to
- parse the user_id and the profile dimension parameters out of the request
- use the user_id to pull information of different dimensions from five sub-services A, B, C, D, E (database services, storage services, RPC services, and so on)
- integrate the information that was read and return it to the caller
Suppose the p99 response time of each of the five services A, B, C, D, E is between 20 ms and 50 ms. If we call A, B, C, D, E one after another to read the information, then, ignoring the time spent merging the data, the p99 response time of the server as a whole is:
sum(A, B, C, D, E) => [100ms, 250ms]
Leaving aside whether the business can accept that, there is clearly a lot of room to optimize the response time. The most intuitive direction is to cut the total time of the access logic from the sum to the maximum:
sum(A, B, C, D, E) -> max(A, B, C, D, E)
In terms of code, this means calling the five sub-services A, B, C, D, E in parallel and integrating the data only after all of them have returned. How do we guarantee that all of them have returned? This is where sync.WaitGroup makes its entrance.
3. WaitGroup Usage
The official documentation describes WaitGroup as follows: a WaitGroup waits for a collection of goroutines to finish. It is used like this:
- the main goroutine calls wg.Add(delta int) to set the number of worker goroutines, then creates the worker goroutines;
- each worker goroutine calls wg.Done() when its work is finished;
- the main goroutine calls wg.Wait() and blocks until all worker goroutines have finished.
Here's a typical example :
// src/cmd/compile/internal/ssa/gen/main.go
func main() {
    // Omitted code ...
    var wg sync.WaitGroup
    for _, task := range tasks {
        task := task
        wg.Add(1)
        go func() {
            task()
            wg.Done()
        }()
    }
    wg.Wait()
    // Omitted code ...
}
This example contains most of the elements of proper WaitGroup use, including:
- all wg.Add calls must happen before wg.Wait, so make sure both functions are called in the main goroutine;
- wg.Done is called in the worker goroutine, and it must be called exactly once, no matter whether the worker panics or returns early for any other reason (defer wg.Done() is recommended; see the sketch right after this list);
- wg.Done and wg.Wait have no required ordering between them.
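To make the second point concrete, here is a minimal sketch (the panicking worker is purely hypothetical) showing why defer wg.Done() is preferred over a plain call at the end of the worker: the deferred call still runs while the panic unwinds, so the counter still reaches zero and wg.Wait can return (the panic is also recovered here so the program keeps running).
package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        // Registered first, so it runs last during unwinding:
        // Done is called exactly once, even on panic.
        defer wg.Done()
        // Recover so the panic does not crash the whole program.
        defer func() {
            if r := recover(); r != nil {
                fmt.Println("worker recovered from:", r)
            }
        }()
        panic("hypothetical failure inside the worker")
    }()
    wg.Wait() // returns because the deferred Done still ran
    fmt.Println("main goroutine unblocked")
}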
Careful readers may have noticed a rather strange line of code: task := task. When Go iterates over an array/slice, the runtime copies task[i] into the loop variable task; the index i changes on every iteration, but the memory address of task does not. Without this reassignment, all the goroutines might end up reading the last task. (Note: since Go 1.22 the loop variable is created anew on each iteration, so this reassignment is no longer necessary.) To get an intuitive feel for this, let's experiment with the following code:
package main

import (
    "fmt"
    "unsafe"
)

func main() {
    tasks := []func(){
        func() { fmt.Printf("1. ") },
        func() { fmt.Printf("2. ") },
    }
    for idx, task := range tasks {
        task()
        fmt.Printf("Traverse = %v, ", unsafe.Pointer(&task))
        fmt.Printf("Subscript = %v, ", unsafe.Pointer(&tasks[idx]))
        task := task
        fmt.Printf("local variable = %v\n", unsafe.Pointer(&task))
    }
}
The printed result of this code is :
1. Traverse = 0x40c140, Subscript = 0x40c138, local variable = 0x40c150
2. Traverse = 0x40c140, Subscript = 0x40c13c, local variable = 0x40c158
The printed addresses differ from machine to machine, but they have some things in common:
- during iteration, the memory address of the loop variable stays the same
- when elements are accessed by subscript, the memory addresses differ
- local variables created inside the for loop, even with the same name, do not reuse memory addresses
When using WaitGroup, besides the precautions above, we also need to solve data collection and error handling. Here are two approaches for reference (a minimal sketch of the first one follows below):
- for RPC calls, gather results through a data channel plus an error channel, or a single channel combining both
- shared variables, such as a mutex-protected map
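To tie the usage notes together, here is a minimal sketch of the first approach: fan out one goroutine per sub-service, wait with a WaitGroup, and collect results and errors through buffered channels. The service names, the fetchAll function, and the simulated failure of service E are all hypothetical, for illustration only.
package main

import (
    "fmt"
    "sync"
)

// fetchAll fans out one goroutine per sub-service and gathers the results.
func fetchAll(userID string) ([]string, []error) {
    services := []string{"A", "B", "C", "D", "E"}

    results := make(chan string, len(services)) // buffered data channel
    errs := make(chan error, len(services))     // buffered error channel

    var wg sync.WaitGroup
    wg.Add(len(services))
    for _, svc := range services {
        svc := svc
        go func() {
            defer wg.Done()
            // Replace this with the real sub-service call.
            if svc == "E" {
                errs <- fmt.Errorf("service %s unavailable", svc)
                return
            }
            results <- fmt.Sprintf("data from %s for user %s", svc, userID)
        }()
    }
    wg.Wait() // all workers have returned; safe to close both channels
    close(results)
    close(errs)

    var data []string
    for r := range results {
        data = append(data, r)
    }
    var errList []error
    for e := range errs {
        errList = append(errList, e)
    }
    return data, errList
}

func main() {
    data, errList := fetchAll("user-42")
    fmt.Println(data)
    fmt.Println(errList)
}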
4. WaitGroup Implementation
Before getting into the subject, readers are encouraged to think for a moment: if you had to implement WaitGroup yourself, how would you do it?
A lock? Definitely not!
A semaphore? But how, exactly?
------------ Cut to the chase ------------
In the Go source code, WaitGroup logically consists of:
- a worker counter: the main goroutine's call to wg.Add(delta int) increases it by delta, and every wg.Done call decreases it by one;
- a waiter counter: each call to wg.Wait increments it by one; when the worker counter drops to 0, the waiter counter is reset;
- a semaphore: used to block the main goroutine. wg.Wait acquires the semaphore via runtime_Semacquire; when the worker counter drops to 0 and the waiter counter is reset, the semaphore is released (once per waiter) via runtime_Semrelease.
A toy sketch of this structure follows below.
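Before looking at the real implementation, here is a deliberately naive toy version built from exactly these three pieces: a worker counter, a record of waiters, and a channel playing the role of the semaphore. This is only a sketch for building intuition, not how the standard library does it (the real WaitGroup packs both counters into one atomic 64-bit word and uses runtime semaphores, as shown later).
package main

import (
    "fmt"
    "sync"
    "time"
)

// toyWaitGroup mirrors the three logical pieces described above: a worker
// counter, waiters, and a "semaphore" (here simply one channel per waiter).
type toyWaitGroup struct {
    mu      sync.Mutex
    workers int             // worker counter
    waiters []chan struct{} // each waiter blocks on its own channel
}

func (g *toyWaitGroup) Add(delta int) {
    g.mu.Lock()
    defer g.mu.Unlock()
    g.workers += delta
    if g.workers < 0 {
        panic("toy: negative counter")
    }
    if g.workers > 0 {
        return
    }
    // Worker counter hit 0: wake every waiter and reset the waiter list.
    for _, ch := range g.waiters {
        close(ch)
    }
    g.waiters = nil
}

func (g *toyWaitGroup) Done() { g.Add(-1) }

func (g *toyWaitGroup) Wait() {
    g.mu.Lock()
    if g.workers == 0 {
        g.mu.Unlock()
        return
    }
    ch := make(chan struct{})
    g.waiters = append(g.waiters, ch)
    g.mu.Unlock()
    <-ch // block until the final Done closes the channel
}

func main() {
    var g toyWaitGroup
    g.Add(2)
    go func() { defer g.Done(); time.Sleep(100 * time.Millisecond) }()
    go func() { defer g.Done(); time.Sleep(200 * time.Millisecond) }()
    g.Wait()
    fmt.Println("all toy workers finished")
}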
For demonstration purposes, let's modify the earlier example:
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    tasks := []func(){
        func() { time.Sleep(time.Second); fmt.Println("1 sec later") },
        func() { time.Sleep(time.Second * 2); fmt.Println("2 sec later") },
    }
    var wg sync.WaitGroup // 1-1
    wg.Add(len(tasks))    // 1-2
    for _, task := range tasks {
        task := task
        go func() {         // 1-3-1
            defer wg.Done() // 1-3-2
            task()          // 1-3-3
        }()                 // 1-3-1
    }
    wg.Wait() // 1-4
    fmt.Println("exit")
}
In the code above:
- 1-1 creates a WaitGroup object; the worker counter and the waiter counter both default to 0.
- 1-2 sets the worker counter to len(tasks).
- 1-3-1 creates the worker goroutines and starts the tasks.
- 1-4 increments the waiter counter, tries to acquire the semaphore, and the main goroutine blocks.
After 1-3-3 finishes, 1-3-2 decrements the worker counter. When the worker counter drops to 0:
- the waiter counter is reset
- the semaphore is released, the main goroutine wakes up, and wg.Wait at 1-4 returns
Although delta in Add(delta int) may be positive, zero, or negative, in everyday use delta is always positive, and wg.Done is equivalent to wg.Add(-1). In this article, whenever we mention wg.Add, we assume delta > 0.
Now that we understand how WaitGroup works in principle, let's look at the source code. For readability, only the core logic is kept. We will go through it in three parts:
- the WaitGroup structure
- Add and Done
- Wait
Tip: if you only want to know how to use WaitGroup correctly, the article up to this point is enough. Readers interested in the internals can keep reading, ideally with an IDE open, following along in the source code.
4.1 WaitGroup structure
type WaitGroup struct {
    noCopy noCopy
    state1 [3]uint32
}
The WaitGroup struct has two fields: noCopy and state1.
The go vet tool checks the noCopy field and flags code that copies a WaitGroup object by value.
The state1 field is more involved: logically it holds the worker counter, the waiter counter, and the semaphore. How these three variables are read can be seen in the following code:
// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    } else {
        return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    }
}

// Read counter and semaphore
statep, semap := wg.state()
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
The retrieval logic for the three variables is (a small packing demo follows after this list):
- worker counter: v is the high 32 bits of the uint64 that statep points to
- waiter counter: w is the low 32 bits of the uint64 that statep points to
- semaphore: semap points to either the last or the first element of the state1 [3]uint32 array, depending on alignment
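To make the bit layout concrete, here is a small self-contained demo of how one uint64 packs the worker counter in the high 32 bits and the waiter counter in the low 32 bits. The local variable state is a stand-in for the word that statep points to; in the real Wait path the waiter counter is updated with CompareAndSwap rather than a plain add.
package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    var state uint64 // stand-in for the word *statep points to

    // Like wg.Add(3): add 3 to the worker counter in the high 32 bits.
    atomic.AddUint64(&state, uint64(3)<<32)

    // Like two wg.Wait calls: add 1 to the waiter counter in the low 32 bits.
    atomic.AddUint64(&state, 1)
    atomic.AddUint64(&state, 1)

    s := atomic.LoadUint64(&state)
    v := int32(s >> 32) // worker counter
    w := uint32(s)      // waiter counter
    fmt.Printf("state = %#x, workers = %d, waiters = %d\n", s, v, w)
    // Output: state = 0x300000002, workers = 3, waiters = 2
}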
Therefore, the worker counter is updated with:
state := atomic.AddUint64(statep, uint64(delta)<<32)
And the waiter counter is updated with:
statep, semap := wg.state()
for {
    state := atomic.LoadUint64(statep)
    if atomic.CompareAndSwapUint64(statep, state, state+1) {
        // Ignore other logic
        return
    }
}
Careful readers may notice that the worker counter is updated with a plain atomic add, while the waiter counter is updated with CompareAndSwap. The reason is that when wg.Add runs in the main goroutine, only the main goroutine is modifying state1; but when wg.Wait modifies the waiter counter, many goroutines may be updating state1 concurrently. If this isn't entirely clear yet, read on to understand wg.Add and wg.Wait first, then come back to these details.
4.2 Add and Done
The core logic of wg.Add is fairly simple: modify the worker counter, then act based on its new value. The simplified code is as follows:
func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    // 1. Modify the worker counter
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // 2. Check the counters
    if v > 0 || w == 0 {
        return
    }
    // 3. When the worker counter drops to 0:
    //    reset the waiter counter and release the semaphore
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false)
    }
}

func (wg *WaitGroup) Done() {
    wg.Add(-1)
}
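As a quick illustration of the first panic branch above: calling Done more times than the counter allows drives the worker counter negative, and Add panics at runtime.
package main

import "sync"

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    wg.Done()
    wg.Done() // panics: "sync: negative WaitGroup counter"
}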
4.3 Wait
The logic of wg.Wait is to modify the waiter counter and then wait for the semaphore to be released. The simplified code is as follows:
func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    for {
        // 1. Read the counters
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
        if v == 0 {
            return
        }
        // 2. Increment the waiter counter
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            // 3. Acquire the semaphore
            runtime_Semacquire(semap)
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            // 4. Semaphore acquired successfully
            return
        }
    }
}
Because the real source code is fairly long and contains a lot of validation logic and comments, the excerpts quoted in this article have been trimmed to varying degrees, keeping only the core logic. Finally, I recommend downloading the source code and reading it carefully to get a deeper understanding of the design of WaitGroup.