当前位置:网站首页>golang控制goroutine数量以及获取处理结果
golang控制goroutine数量以及获取处理结果
2022-07-27 05:12:00 【铁柱同学】
一、前言
最近遇到批量刷新ES数据的需求,为了加快处理速度,那必须首选goroutine了,但是众所周知,goroutine的返回值和错误处理一直都让人难以捉摸,go出去简单,怎么监测go出去的结果是个问题。
1、goroutine的错误处理
sync.ErrGroup在sync.WaitGroup功能的基础上,增加了错误传递,以及在发生不可恢复的错误时取消整个goroutine集合,或者等待超时。
具体的大家可以百度学习下。errGroup
2、goroutine的处理结果
目前使用goroutine一般采用的是 channel 、 sync.WaitGroup 、context,来实现各个协程之间的流程控制和消息传递,首选是channel来获取处理结果,channel参考:go通过channel获取goroutine的处理结果。
除了channel,那么是否可以用并发安全的sync.Map来存储结果,在所有的goroutine执行完毕后,再统一获取处理结果呢?答案是可以的,sync.Map就可以完美实现。
二、控制goroutine数量以及获取处理结果
1、实战代码
以下是使用goroutine批量刷新ES数据,并获取处理结果的代码:
//定义要获取的返回值,成功数量,失败数量,失败id集合
succeededNums, failedNums:= 0, 0,
errData:=""
var syncMap sync.Map
wg := sync.WaitGroup{
}
//控制goroutine数量,保证同时只有10个goroutine
chan1 := make(chan struct{
}, 10)
for {
//业务逻辑
// ....
//goroutine加速,chan1写满,则阻塞。等待之前的goroutine释放才能继续循环
chan1 <- struct{
}{
}
wg.Add(1)
go func(ctx context.Context, targetIndexName string, esClient *elastic.Client) {
defer func() {
if err1 := recover(); err1 != nil {
//产生了panic异常
log.Errorf(ctx, "%s go panic! err:(+%v)", logPreFix, err1)
}
wg.Done() //每个goroutine执行完毕则释放
return
}()
bulkRequest := esClient.Bulk()
//ES使用bulk方法批量刷新数据
bulkResByAssetInfo := new(EsBulkDataRes)
bulkResByAssetInfo, err = BulkEsDataByAssetInfo(ctx, bulkRequest, targetIndexName)
if err != nil {
log.Errorf(ctx, "%s BulkEsDataByAssetInfo error (%+v) ,test:(+%v)", logPreFix, err, string_util.TransferToString(fileUnitList))
return
}
//累加执行结果到sync.map,保证并发安全
tempMap := make(map[string]interface{
})
tempMap["successNums"] = bulkResByAssetInfo.SucceededNums
tempMap["failedNums"] = bulkResByAssetInfo.FailedNums
tempMap["errData"] = bulkResByAssetInfo.ErrData
//每次取循环的的最大id,作为syncMap的key
syncMap.Store(test[nums-1].ID, tempMap)
//执行完毕再释放channel
<-chan1
}(ctx, targetIndexName, esClient)
}
wg.Wait()
//刷新结束,写入通知,通知内容包括,遍历sync.map,获取返回值
syncMap.Range(func(key, value interface{
}) bool {
val := value.(map[string]interface{
})
succeededNums += val["successNums"].(int)
failedNums += val["failedNums"].(int)
errData += val["errData"].(string)
return true
})
//打印结果
fmt.Println("成功数量:",succeededNums)
fmt.Println("失败数量:",failedNums)
fmt.Println("失败id:",errData)
2、syncMap的使用
(1)写入处理结果到map
(2) 写入map到sync.Map中,注意key不要重复
(3)使用Range来循环sync.Map,获取处理结果,并累加
3、控制goroutine的数量
这块主要是通过设置channel的长度来实现的。
(1)设定channel长度,循环开始每生成一个goroutine则写入一次channel
(2) channel写满则阻塞
(3)goroutine执行完毕,释放channel
(4) for循环中继续写入channel,保证同时执行的goroutine只有10个
三、sync.Map的缺点
1、需要对value做断言处理,这个是interface{}的特性决定的
2、大家都知道sync.Map适合读多写少的场景,博主这里因为是跑脚本,所以使用sync.Map也无伤大雅,大家要追求性能的话,可以看一下currentMap的实现,通过hash分桶,减小锁的粒度来提升性能。
current-Map
end
边栏推荐
- Dimitra and ocean protocol interpret the secrets behind agricultural data
- Prototype and prototype chain in JS
- 手把手教你搭建钉钉预警机器人
- 「PHP基础知识」布尔型的使用
- Deploy redis with docker for high availability master-slave replication
- 「PHP基础知识」PHP中的注释
- User page management
- AQUANEE将在近期登陆Gate以及BitMart,低位布局的良机
- 未来刷脸支付是能够占据市场很多的份额
- kettle的文件名通配规则
猜你喜欢
随机推荐
M-DAO 7大赋能方案,助力DAO生态走向模式与标准化
Dimitra 和 Ocean Protocol 解读农业数据背后的秘密
Minio fragment upload lifting fragment size limit - chunk size must be greater than 5242880
How to judge whether an object is empty in JS
How can seektiger go against the cold air in the market?
Specific matters of opening accounts of futures companies
If the interviewer asks you about JVM, the extra answer of "escape analysis" technology will give you extra points
选择正规的资质好的期货公司开户
NFT市场格局仍未变化,Okaleido能否掀起新一轮波澜?
神芷迦蓝寺
Minio8.x version setting policy bucket policy
Okaleido launched the fusion mining mode, which is the only way for Oka to verify the current output
如果面试官问你 JVM,额外回答“逃逸分析”技术会让你加分
「PHP基础知识」整型数据的使用
Amazon evaluation autotrophic number, how to carry out systematic learning?
「PHP基础知识」PHP中的标记
Js== mandatory type conversion provisions of operators
NFT新的契机,多媒体NFT聚合平台OKALEIDO即将上线
给测试小姐姐的第三封信 | ORACLE存储过程知识分享和测试说明
Seektiger's okaleido has a big move. Will the STI of ecological pass break out?








