当前位置:网站首页>Go obtains the processing results of goroutine through channel
Go obtains the processing results of goroutine through channel
2022-07-27 07:12:00 【Tie Zhu】
One 、 Preface
I wrote an article a few days ago , It's through sync.Map obtain goroutine And then deal with it , But I always felt that the plan was general , Is not very good . After all channel Is the Crown Prince appointed by the emperor , So again channel Better .
golang control goroutine Quantity and obtain processing results
Two 、 Misunderstanding and actual combat code
1、 myth
Blogger's own use channel It is generally used to control goroutine Concurrent , therefore channel Simple structure , Just take it for granted that channel Only suitable for storing simple structures , Complex function processing results pass channel It's not convenient to handle , Yes, yes There is a great deal of fallacy .
2. Combat code
notes : The following is the desensitization pseudocode
Overview of code meaning :
(1) adopt channel control goroutine Number
(2) Definition channel Structure , Other structures can be nested inside the structure according to requirements , Realize the complex structure we want
(3) Each cycle gets go Function to process the result , Process the results of each cycle
//EsBulkDataRes Complex structures can be defined , Or nested structure
type EsBulkDataRes struct {
SucceededNums int `json:"succeeded_nums"`
FailedNums int `json:"failed_nums"`
ErrData string `json:"err_data"`
//TestDoc *TestDoc
}
type TestDoc struct {
ID string `json:"_id,omitempty"`
Source map[string]interface{
} `json:"_source,omitempty"` //map It can store complex structures
}
func testChannel() {
// Define the return value to get , Number of successes , The number of failures , Failure id aggregate
succeededNums, failedNums := 0, 0
var errData string
DealRes := make(chan EsBulkDataRes)
defer func() {
close(DealRes)
}()
wg := sync.WaitGroup{
}
// control goroutine Number , At the same time, only 10 individual goroutine
chan1 := make(chan struct{
}, 10)
ctx := context.Background()
for {
// Business logic
// ....
//goroutine Speed up ,chan1 Write full , The block . Waiting for the previous goroutine Release to continue the cycle
chan1 <- struct{
}{
}
wg.Add(1)
go func(ctx context.Context, targetIndexName string, esClient *elastic.Client) {
defer func() {
if err1 := recover(); err1 != nil {
// Produced panic abnormal
log.Errorf(ctx, "%s go panic! err:(+%v)", logPreFix, err1)
}
// Release after execution channel
<-chan1
return
}()
bulkRequest := esClient.Bulk()
//ES Use bulk Method batch refresh data
bulkResByAssetInfo := new(EsBulkDataRes)
bulkResByAssetInfo, err = BulkEsDataByAssetInfo(ctx, bulkRequest, targetIndexName)
if err != nil {
log.Errorf(ctx, "%s BulkEsDataByAssetInfo error (%+v) ,test:(+%v)", logPreFix, err, string_util.TransferToString(fileUnitList))
return
}
// Write the execution result channel
DealRes <- *bulkResByAssetInfo
// Release after execution channel
<-chan1
}(ctx, targetIndexName, esClient)
//goroutine end of execution , Read channel Cumulative results
// Read channel, Also for the convenience of the next goroutine Writing . If you haven't read , It will block
select {
case d, ok := <-DealRes:
if !ok {
continue
}
// Cumulative results
succeededNums += d.SucceededNums
failedNums += d.FailedNums
errData += d.ErrData
case <-ctx.Done():
return
}
}
wg.Wait()
// Print the results
fmt.Println(" Number of successes :", succeededNums)
fmt.Println(" The number of failures :", failedNums)
fmt.Println(" Failure id:", errData)
}
3、 ... and 、channel Control multiple goroutine Serial
This part is the interview question that bloggers have encountered before , Say more go function , Every go Functions depend on the processing result of the previous step , How to realize serial . Pseudocode is given here , Just refer to .
// Definition channel structure
type chanStruct struct {
Res1 int64
}
// Give... Again in each function channel assignment
func test1(chan1 chan chanStruct) {
res1 := new(chanStruct)
fmt.Println("test1")
res1.Res1 = 2
chan1 <- *res1
return
}
func test2(chan2 chan chanStruct) {
res2 := new(chanStruct)
fmt.Println("test2")
res2.Res1 = 3
chan2 <- *res2
return
}
func test3(chan3 chan chanStruct) {
fmt.Printf("test3,chanStruct:(%+v)", chan3)
return
}
//https://segmentfault.com/q/1010000041024462/
// All unbuffered channel reads and writes must be in the co process , Otherwise, it will block . With buffer channels, you don't need to be ready , Read or write can be written in the current thread without blocking .
func main() {
chan0 := make(chan chanStruct, 1)
// It's used here "golang.org/x/sync/errgroup" This package , Blogger's personal experiment , It is not necessary here
g, ctx := errgroup.WithContext(context.Background())
chan0 <- chanStruct{
Res1: 1,
}
fmt.Println("write chan success!")
//errgroup Control concurrent , And get the goroutine Error returned ( Not used here )
g.Go(func() error {
for {
select {
// Note that there , Because every time we read it channel, Therefore, we need to give channel assignment
// It is guaranteed to trigger the next function
case d, ok := <-chan0:
fmt.Println("d:", d)
if ok {
if d.Res1 == 1 {
go test1(chan0)
} else if d.Res1 == 2 {
go test2(chan0)
} else if d.Res1 == 3 {
go test3(chan0)
fmt.Println("end")
return nil
}
}
case <-ctx.Done():
return ctx.Err()
}
}
})
//errgroup Of Wait Be similar to sync.withGroup Of Wait() Method , wait for goroutine end of execution
if err := g.Wait(); err != nil {
log.Fatal(err)
}
}
Four 、 Postscript
How can we make sure that the code we write is better ? How to define this good ? Just realize the function or maintain elegance ? These are the questions the blogger asked when chatting with a boss .
For our developers , It is absolutely no problem to take the realization of needs as the first goal , But the code quality also needs to be continuously improved . How to improve , Of course, it depends on the code of the boss ! Where does the boss have the most code , Certainly github!
Bloggers recently read https://github.com/olivere/esdiff Big guy's code , Before I found myself narrow , I'm also amazed at the wonderful way the boss writes . This is just an unknown open source project , I do not know! k8s,etcd And other well-known projects will be magnificent ! come on. !
end
边栏推荐
- Interpretation of deepsort source code (6)
- Analysis of pix2pix principle
- Express framework
- 用typescript实现排序-递增
- DNA(脱氧核糖核酸)供应|碳纳米管载核酸-DNA/RNA材料|DNA/RNA核酸修饰磁性纳米颗粒
- Gbase 8C technical features
- 基于SSM图书借阅管理系统
- (转帖)eureka、consul、nacos的对比2
- VIM editor deletes all file contents
- [unity URP] the code obtains the universalrendererdata of the current URP configuration and dynamically adds the rendererfeature
猜你喜欢

PNA polypeptide PNA TPP | GLT ala ala Pro Leu PNA | suc ala Pro PNA | suc AAPL PNA | suc AAPM PNA

Reflection on pytorch back propagation

DNA修饰贵金属纳米颗粒|脱氧核糖核酸DNA修饰纳米金(科研级)

基于SSM实现的校园新闻发布管理系统

Hospital reservation management system based on SSM

从技术原理看元宇宙的可能性:Omniverse如何“造”火星

大疆livox定制的格式CustomMsg格式转换pointcloud2

DNA偶联PbSe量子点|近红外硒化铅PbSe量子点修饰脱氧核糖核酸DNA|PbSe-DNA QDs

基于SSM医院预约管理系统

李沐动手学深度学习V2-transformer和代码实现
随机推荐
Gbase 8C - SQL reference 6 SQL syntax (15)
How does golang assign values to empty structures
nvidia-smi 各参数意义
Norms of vectors and matrices
Shell编程的规范和变量
TS learning (VIII): classes in TS
The issuing process of individual developers applying for code signing certificates
DNA修饰贵金属纳米颗粒|脱氧核糖核酸DNA修饰纳米金(科研级)
36 - 新的 Promise 方法:allSettled & any & race
基于SSM音乐网站管理系统
Analysis of strong tennis cup 2021 PWN competition -- babypwn
Using docker to install and deploy redis on CentOS
Express framework
Interpretation of deepsort source code (V)
Cyclegan parsing
基于SSM医院预约管理系统
deepsort源码解读(六)
Basic statement of MySQL (1) - add, delete, modify and query
运行代码报错: libboost_filesystem.so.1.58.0: cannot open shared object file: No such file or directory
Matlab drawing (ultra detailed)