当前位置:网站首页>Golang channel channel
Golang channel channel
2022-08-03 13:15:00 【Cloud full of notes】
这里填写标题
1. Golang 通道 channel
Don’t communicate by sharing memory, share memory by communicating.相信学过 Go 的同学都知道这句名言, 可以说 channel 就是后边这句话的具体实现.channel Is a circular queue type safety, 能够控制 groutine 在它上面读写消息的行为, Such as blocking a groutine , 或者唤醒某个 groutine.
1.1. 基本特征
一个通道相当于一个先进先出(FIFO)的队列, Each element value is strictly according to send the order, 先被发送通道的元素值一定会先被接收, 一个左尖括号紧接着一个减号形象地代表了元素值的传输方向.The following is to create several different channel:
ch1 := make(chan int) // 无缓冲通道 ch2 := make(chan int, 3) // 有缓冲通道 ch3 := make(chan<- int, 1) // 单向通道: 只能发送不能接收 ch4 := make(<-chan int, 1) // 单向通道: 只能接收不能发送
下面举一个简单的示例:
func main() {
done := make(chan struct{
}) c := make(chan string) go func() {
s := <-c // 接收消息 println(s) close(done) // 关闭通道, As the end of the notice }() c <- "lvmenglou" // 发送消息 <-done // 阻塞, Know that there are data or channel closed}//最后输出: lvmenglou
Channel to send and receive operation basic characteristics:
- 元素复制: 进入通道的并不是在接收操作符右边的那个元素值, 而是它的副本(发送操作包括"复制元素值"和"In the channel"2 步, 接收操作包括"复制通道内的元素值"、"放置副本到接收方"和"删掉原值"3 步);
- 不可分割: When a data access, No haven't finished copy, Just to be receiving.
1.2. 底层原理
1.2.1. 数据结构
channel 的数据结构如下:
type hchan struct {
qcount uint // 当前队列中剩余元素个数 dataqsiz uint // 环形队列长度, 即可以存放的元素个数 buf unsafe.Pointer // 环形队列指针 elemsize uint16 // 每个元素的大小 closed uint32 // 标识关闭状态 elemtype *_type // 元素类型 sendx uint // 队列下标, 指示元素写入时存放到队列中的位置 recvx uint // 队列下标, 指示元素从队列的该位置读出 recvq waitq // 等待读消息的 goroutine 队列 sendq waitq // 等待写消息的 goroutine 队列 lock mutex // 互斥锁, chan 不允许并发读写 }
chan 内部实现了一个环形队列作为其缓冲区, 队列的长度是创建 chan 时指定的.
1.2.2. 发送
向一个 channel 中写数据简单过程如下:
- 如果等待接收队列 recvq 不为空, 说明缓冲区中没有数据或者没有缓冲区, 此时直接从 recvq 取出 G, 并把数据写入, 最后把该 G 唤醒, 结束发送过程;
- 如果缓冲区中有空余位置, 将数据写入缓冲区, 结束发送过程;
- 如果缓冲区中没有空余位置, 将待发送数据写入 G, 将当前 G 加入 sendq, 进入睡眠, 等待被读 goroutine 唤醒.
1.2.3. 接收
从一个 channel 读数据简单过程如下:
- 如果等待发送队列 sendq 不为空, 且没有缓冲区, 直接从 sendq 中取出 G, 把 G 中数据读出, 最后把 G 唤醒, 结束 读取过程;
- 如果等待发送队列 sendq 不为空, 此时说明缓冲区已满, 从缓冲区中首部读出数据, 把 G 中数据写入缓冲区尾部, 把 G 唤醒, 结束读取过程;
- 如果缓冲区中有数据, 则从缓冲区取出数据, 结束读取过程;
- 将当前 goroutine 加入 recvq, 进入睡眠, 等待被写 goroutine 唤醒.
1.2.4. 关闭
关闭 channel 时会把 recvq 中的 G 全部唤醒, 本该写入 G 的数据位置为 nil.把 sendq 中的 G 全部唤醒, 但这些 G 会 panic.
1.3. 核心知识
1.3.1. 发送
阻塞情况:
- nil 阻塞: 向
nil
通道发送数据会被阻塞.
ch := make(chan int, 2)ch = nilch <- 4 // all goroutines are asleep - deadlock!
- 无缓冲 channel + 读未 ready: 向无缓冲 channel 写数据, 如果读协程没有准备好, 会阻塞.
- 有缓冲 channel + 缓冲已满: 向有缓冲 channel 写数据, 如果缓冲已满, 会阻塞.
重要知识点:
- panic: closed 的 channel, 写数据会 panic.
ch := make(chan int, 2)
ch <- 4
close(ch)
ch <- 3 // panic: send on closed channel
- 资源回收: channel 使用完后, 需要 close 掉, Resources will not recycling(包括 channel 资源, 以及 channel The data stored inside resources).
- 数据交换: 就算是有缓冲的 channel , 也不是每次发送、接收都要经过缓存, 如果发送的时候, 刚好有等待接收的协程, 那么会直接交换数据.【This my existence question, 待定! ! ! 】
1.3.2. 接收
阻塞情况:
- nil 阻塞: 从 nil 通道接收数据会被阻塞.
- 无缓冲 channel + Writing is not ready: 从无缓冲 channel 读数据, 如果写协程没有准备好, 会阻塞.
- 有缓冲 channel + The buffer is empty: 从有缓冲 channel 读数据, 如果The buffer is empty, 会阻塞.
重要知识点:
- 关闭 channel 数据接收: From the closed channel 接收数据, If there is a data channel, Returns have buffer data; 如果没有数据, 会读到通道传输数据类型的零值, Such as pointer types, 读到 nil.(可以通过 x, ok:=<-c 中的 ok, Determine whether the data read out)
c := make(chan int, 3)c <- 11c <- 12close(c)for i := 0; i < cap(c)+1; i++ {
x, ok := <-c println(i, ":", ok, x)}// 输出// 0: true 11// 1: true 12// 2: false 0// 3: false 0
1.3.3. 关闭
重要知识点:
- close panic: 重复关闭, 或关闭 nil Channel will lead to panic.
ch := make(chan int, 2)ch <- 4close(ch)close(ch) // panic: close of closed channel
- Multithreaded channel closure principle: 由于 close 的 channel, Write data is panic, When multithreaded write and read so, 需要遵循"Who write, 谁负责关闭"原则.(Later by the complete example explain the knowledge)
1.3.4. for-range 读取
我们常常会用 for-range 来读取 channel 的数据
ch := make(chan int, 1)go func(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i } close(ch)}(ch)for val := range ch {
fmt.Println(val)}
重要知识点:
- 如果 channel 已经关闭, 它还是会继续执行, 直到所有值被取完, 然后退出执行;
- 如果 channel 没有关闭, 但是 channel 没有可读取的数据, 它则会阻塞在 range 这句位置, 直到被唤醒;
- 如果 channel 是 nil, 读取会被阻塞, 也就是会一直阻塞在 range 位置.
1.3.5. select
select 是跟 channel 关系最亲密的语句, 它是被专门设计出来处理通道的, 因为每个 case 后面跟的都是通道表达式, 可以是读, 也可以是写.下面看一个简单的示例:
// 准备好几个通道.intChannels := [3]chan int{ make(chan int, 1), make(chan int, 1), make(chan int, 1),}// 随机选择一个通道, 并向它发送元素值.index := rand.Intn(3)fmt.Printf("The index: %d\n", index)intChannels[index] <- index// 哪一个通道中有可取的元素值, 哪个对应的分支就会被执行.select {case <-intChannels[0]: fmt.Println("The first candidate case is selected.")case <-intChannels[1]: fmt.Println("The second candidate case is selected.")case elem := <-intChannels[2]: fmt.Printf("The third candidate case is selected, the element is %d.\n", elem)default: fmt.Println("No candidate case is selected!")}
We use a contains three candidate branch select 语句, 分别尝试从上述三个通道中接收元素值, 哪一个通道中有值, 哪一个对应的候选分支就会被执行.后面还有一个默认分支, 不过在这里它是不可能被选中的.在使用 select 语句的时候, We need to pay attention to the following a few things:
- 有 default 情况: select 只要有默认语句, 就不会被阻塞, 换句话说, 如果没有 default, 然后 case 又都不能读或者写, 则会被阻塞.
- 无 default 情况: 如果没有加入默认分支, 那么一旦所有的 case 表达式都没有满足求值条件, 那么 select 语句就会被阻塞.直到至少有一个 case 表达式满足条件为止.
- multi-valued assignment: select 不能够像 for-range 一样发现 channel 被关闭而终止执行, 我们可能会因为通道关闭了, 而直接从通道接收到一个其元素类型的零值.所以, 在很多时候, 我们需要通过接收表达式的第二个结果值来判断通道是否已经关闭.一旦发现某个通道关闭了, 我们就应该及时地屏蔽掉对应的分支或者采取其他措施.这对于程序逻辑和程序性能都是有好处的.
- select + for: select 语句只能对其中的每一个 case 表达式各求值一次.所以, 如果我们想连续或定时地操作其中的通道的话, 就往往需要通过在 for 语句中嵌入 select 语句的方式实现.但这时要注意, 简单地在 select 语句的分支中使用 break 语句, 只能结束当前的 select 语句的执行, 而并不会对外层的 for 语句产生作用.这种错误的用法可能会让这个 for 语句无休止地运行下去.
intChan := make(chan int, 1)// 一秒后关闭通道.time.AfterFunc(time.Second, func() { close(intChan)})select {case _, ok := <-intChan: if !ok { fmt.Println("The candidate case is closed.") break } fmt.Println("The candidate case is selected.")}
- 随机选择 case: 如果同时有多个 case Sufficient conditions for, 会使用伪随机选择一个 case 来执行.
- All scans before, 再选择: 每次 select 语句的执行, 是会扫码完所有的 case 后才确定如何执行, 而不是说遇到合适的 case 就直接执行了.
- nil 阻塞: nil 的 channel, 不管读写都会被阻塞.
The above knowledge to remember, 面试常考, 下面是讲解 select 执行的流程:
- 对于每一个 case 表达式, 都至少会包含一个代表发送操作的发送表达式或者一个代表接收操作的接收表达式, 同时也可能会包含其他的表达式.比如, 如果 case 表达式是包含了接收表达式的短变量声明时, 那么在赋值符号左边的就可以是一个或两个表达式, 不过此处的表达式的结果必须是可以被赋值的.当这样的 case 表达式被求值时, 它包含的多个表达式总会以从左到右的顺序被求值.
- select 语句包含的候选分支中的 case 表达式都会在该语句执行开始时先被求值, 并且求值的顺序是依从代码编写的顺序从上到下的.结合上一条规则, 在 select 语句开始执行时, 排在最上边的候选分支中最左边的表达式会最先被求值, 然后是它右边的表达式.仅当最上边的候选分支中的所有表达式都被求值完毕后, 从上边数第二个候选分支中的表达式才会被求值, 顺序同样是从左到右, 然后是第三个候选分支、第四个候选分支, 以此类推.
- 对于每一个 case 表达式, 如果其中的发送表达式或者接收表达式在被求值时, 相应的操作正处于阻塞状态, 那么对该 case 表达式的求值就是不成功的.在这种情况下, 我们可以说, 这个 case 表达式所在的候选分支是不满足选择条件的.
- 仅当 select 语句中的所有 case 表达式都被求值完毕后, 它才会开始选择候选分支.这时候, 它只会挑选满足选择条件的候选分支执行.如果所有的候选分支都不满足选择条件, 那么默认分支就会被执行.如果这时没有默认分支, 那么 select 语句就会立即进入阻塞状态, 直到至少有一个候选分支满足选择条件为止.一旦有一个候选分支满足选择条件, select 语句(或者说它所在的 goroutine)就会被唤醒, 这个候选分支就会被执行.
- 如果 select 语句发现同时有多个候选分支满足选择条件, 那么它就会用一种伪随机的算法在这些分支中选择一个并执行.注意, 即使 select 语句是在被唤醒时发现的这种情况, 也会这样做.
- 一条 select 语句中只能够有一个默认分支.并且, 默认分支只在无候选分支可选时才会被执行, 这与它的编写位置无关.
- select 语句的每次执行, 包括 case 表达式求值和分支选择, 都是独立的.不过, 至于它的执行是否是并发安全的, 就要看其中的 case 表达式以及分支中, 是否包含并发不安全的代码了.
Write some more, 简单总结一下: 执行 select 时, 会从左到右, 从上到下, 对每个 case 表达式求值, 当所有 case 求值完毕后, Will choose to meet case 执行, If there are multiple all meet, Randomly select a; 如果都没有满足, 就执行 default; 如果连 default 都没有, 就阻塞住, And meet the conditions of case 出现时, 再执行.
1.4. 并发实例: 海外商城 Push
关于 channel, Fragmentary knowledge very much, I want to by a complete sample, Will these knowledge all strung together, The following to overseas mall Push 为例, To apply the knowledge to the actual scene.
1.4.1. 示例介绍
Overseas shopping need to W We have a business to send Push, For each business, 为了提高 Push 的并发能力, 采用 N Collaborators range from EMQ 中读取数据(EMQ In a message queue, Cache inside a lot of Push 数据), After read data processing, Then the processed data wrote channel 中.同时, 服务有 M Collaborators range from channel To retrieve data and consumer, And then through the millet Push SDK, 给用户发送 Push.Overall send link below:
Before see later, 我先抛出几个问题:
- Producer to close Channel 写数据, 会 Panic, 那么 Channel How to shut down?
- 当 Channel 关闭后(比如服务重启), Need to continue to consume Channel 里面的 Push, 该如何操作呢?
- 每消费一条 Channel 数据, 需要记录 Push 发送成功, 但是一条 Channel 数据包含 2-3 个 Push 内容(IOS/Android/PC), 程序记录 Push 成功前, 如何保证这 2-3 个 Push Have sent?
1.4.2. 初始化
初始化 channel 数组, Array is inside each business appTypes 的 channel, channel 的缓存区大小为 30, 并启动 10 A consumer coroutines:
var ( messageChan map[string]chan *WorkMessage // channel stopMasterChan chan bool // Consumers end inform appTypes = map[int32]string{1: "shop", 2: "bbs", 3: "sharesave"})func initPushChannel() { maxSize = 30 // channel 缓存区大小 workNum = 10 // goroutine 个数 stopMasterChan = make(chan bool) messageChan = make(map[string]chan *WorkMessage) for _, name := range appTypes { workChan := make(chan *WorkMessage, maxSize) messageChan[name] = workChan for i := 0; i < workNum; i++ { go startMaster(name, workChan) // Start the consumer coroutines } }}func startMaster(name string, workChan chan *WorkMessage) { for { if exit := dostartMaster(name, workChan); exit { return } }}
初始化 EMQ 的 Client, 并启动 10 A producer coroutines:
var ( clientFactory client.ClientFactory // EMQ Client stopChan chan bool // Producers over notice)func initEmq() { // 初始化 EMQ 的 Client And the number of single article reading data, 该处代码省略... maxConsumerNum := 10 stopChan = make(chan bool) for i := 0; i < maxConsumerNum; i++ { go receiveMsg(i) // Start the producer coroutines }}func receiveMsg(queueID int) { for { if exit := doReceiveMsg(queueID); exit { logz.Info("stop receive msg ...", logz.F("queueID", queueID)) return } }}
主方法调用:
func InitWorker() {
// 初始化 push SDK, 逻辑省略... initPushChannel() // 初始化 Channel, 启动消费者 initEmq() // 启动生产者}
1.4.3. 发送
func doReceiveMsg(queueID int) bool {
defer func() {
if err := recover(); err != nil {
println("[panic] recover from error.") } }() ticker := time.NewTicker(time.Second) for {
select {
case <-ticker.C: // 1. 从 EMQ 获取数据 List, 逻辑省略... // 2. 遍历 List, 获取业务类型, 逻辑省略... // 3. 根据业务类型, 获取对应的 channel name := "sharesave" // 示例数据 pushChannel, _ := messageChan[name] // 4. 构造 Push 数据, 然后放入 channel pushData := &WorkMessage{AppLocal: "id", AppType: 1} // 示例数据 pushChannel <- pushData case <-stopChan: println("stop to send data to channel.") return true } }}
This part of the code I have done a lot of simplified, 这里主要做了 2 件事情:
- 通过 select + 定时器, 每隔 1S 就会从 EMQ 中获取数据, Then after structure data into a corresponding business channel;
- 当收到 stopChan 事件时, Will inform all producers coroutines, 退出 goroutine, Here is actually coroutines exit one way.
1.4.4. 接收
func dostartMaster(name string, workChan chan *WorkMessage) bool {
defer func() {
if err := recover(); err != nil {
println("[panic] recover from error.") } }() for {
select {
case t := <-workChan: if t != nil {
for _, message := range t.PushMessages {
// 接受 channel 数据 t, Push the data to Push SDK // 逻辑省略... } } case <-stopMasterChan: println("stop to get data from channel.") return true } }}
This code also done a lot of simplified, 这里主要做了 2 件事情:
- 通过 select, 如果 channel 里面有数据, 直接读取, And then to users to send Push;
- 当收到 stopMasterChan 事件时, Will inform all producers coroutines, 退出 goroutine.
1.4.5. 关闭
// Inform the producer coroutines closed, Coroutines no longer write channelfunc stopRecvMsgFromQueue() { close(stopChan)}// Inform consumers coroutines closed, Coroutines no longer read channel, 并关闭 channel, 消费完 channel In the rest of the message func stopPushChannel() { close(stopMasterChan) time.Sleep(time.Second) for _, c := range messageChan { close(c) for msg := range c { if msg != nil { for _, message := range msg.PushMessages { // 接受 channel 数据 t, Push the data to Push SDK // 逻辑省略... } } } }}// 主方法调用 func StopWorker() { stopRecvMsgFromQueue() time.Sleep(time.Second * 2) stopPushChannel()}
比如服务重启, Need closed coroutines, 主要做以下事情:
- 执行 close(stopChan), To notify the producer coroutines, 不再往 channel 里面写数据;
- 执行 close(stopMasterChan), Inform consumers coroutines, 不再从 channel 里面读取数据;
- 关闭数组 messageChan 的每个 channel;
- 继续读取 channel 中剩余的数据, 因为使用的是 for-range 方式, 所以当 channel All the data after the read, for-range 会自动退出.
There are two place sleep 了一下, Respectively have the following effect:
- 调用 stopPushChannel() 前 sleep: After close the producer, Consumers continue to spend the rest of the data;
- 调用 close 前 sleep: Avoid coroutines does not completely shut down, Cause to shut down channel 写数据, 导致 panic.
1.5. 总结
In this chapter the basic are dry, The above summary of comprehensive, 这里就不再重复了, If you can answer me these questions, You should have mastered this chapter content:
- When sending and receiving, What are respectively conditions will lead to channel 阻塞呢?
- To send and close channel, 有哪些情况会导致 panic 呢?
- 当 channel 关闭后, Continue to read the inside of the data, Can read it? How to ensure the data read out?
- 对于生产者和消费者模型, How do you elegant closed channel, 避免写 channel 导致的 panic 呢?
- for-range 读取 channel 数据, 对于 channel The situation of the closed and open, 是如何处理的呢? There is congestion?
- 使用 select 时, 有哪些注意事项呢? 你知道 select Perform process?
最后就是 Push Concurrent sample, It is strongly recommended that everyone can master, Mastery of the sample, Later you should also can easily through channel 实现数据共享, 并结合 goroutine Write your own high concurrency.
边栏推荐
- 基于php家具销售管理系统获取(php毕业设计)
- 力扣刷题 每日两题(一)
- 新评论接口——京东评论接口
- When Nodejs installation depends on cpnm, the install shows Error: Cannot find module 'fs/promises'
- leetcode16最接近的三数之和 (排序+ 双指针)
- Basic principle of the bulk of the animation and shape the An animation tip point
- Sogou news - dataset
- Classes and objects (upper)
- An animation optimization of traditional guide layer animation
- 【深度学习】高效轻量级语义分割综述
猜你喜欢
An动画基础之按钮动画与基础代码相结合
Kubernetes 网络入门
GameFi 行业下滑但未出局| June Report
shell编程条件语句
YOLOv5 training data prompts No labels found, with_suffix is used, WARNING: Ignoring corrupted image and/or label appears during yolov5 training
BOM系列之sessionStorage
漫画:怎么证明sleep不释放锁,而wait释放锁?
【实战技能】单片机bootloader的CANFD,I2C,SPI和串口方式更新APP视频教程(2022-08-01)
Comics: how do you prove that sleep does not release the lock, and wait to release lock?
论文理解:“Gradient-enhanced physics-informed neural networks for forwardand inverse PDE problems“
随机推荐
图像融合SDDGAN文章学习
力扣刷题 每日两题(一)
Tinymce plugins [Tinymce扩展插件集合]
Golang 接口 interface
云计算服务主要安全风险及应对措施初探
leetcode 11. 盛最多水的容器
[Blue Bridge Cup Trial Question 48] Scratch Dance Machine Game Children's Programming Scratch Blue Bridge Cup Trial Question Explanation
可重入锁详解(什么是可重入)
图像融合DDcGAN学习笔记
Golang 字符串
基于php旅游网站管理系统获取(php毕业设计)
An工具介绍之形状工具及渐变变形工具
[Deep Learning] Overview of Efficient and Lightweight Semantic Segmentation
SQL分页查询_Sql根据某个字段分页
Real number rounding and writing to file (C language file)
AMS simulation
期货开户中常见问题汇总
一些测试相关知识
基于php家具销售管理系统获取(php毕业设计)
新评论接口——京东评论接口