Teaching my sister to write a message queue, step by step
2022-07-07 19:06:00 【Golang DreamWorks】
Preface
This week my sister joined a new company. Her boss wanted to see what she could do, took a look at her résumé, and, whoa, "proficient in Kafka". So this girl has some real skills; in that case, let her write a message queue. Because it had to be written in Go, my sister was stuck, so she hurried over to me. I stood firm at first, but she kept insisting and in the end I agreed. So I taught my sister how to write a message queue, and here is how I did it.
The code has been uploaded to my GitHub; if you need it, you can download it yourself.
What is a message queue
My sister really drives me to despair: her résumé says "proficient in Kafka", yet she doesn't even know what a message queue is. So, good-tempered as always, I began by telling her what a message queue is.
A message queue is usually abbreviated MQ (Message Queue). It is a combination of two words that everyone knows, and the more familiar of the two is probably Queue. A queue is a first-in, first-out data structure, and queues are used all over the place. But if we already have queues, why do we need MQ?
Me: Let me ask you, sister, do you know why we still need MQ?
Sister: Get on with it. Do you want a beating?
Me: Pfft... fine, I talk too much. Hmph.
So I patiently went on with the explanation...
Let's take a simple example. Suppose we are building a login system that, after a user logs in successfully, sends a reminder email to the user's mailbox. The requirement is simple enough. Let's first see how we would implement it without MQ. A sequence diagram helps:
In this flow, the email is sent as part of the login request: once the password is verified, the email is sent, and only then does the login return success. This works, but it has flaws. It complicates the login operation: every login request has to send an email, and if sending fails, the whole login request fails too, so the user cannot log in. There is another problem. The login itself only needs about 100ms, but because it has to wait for the email in the middle, the login call takes longer. And the email is not a high priority; the user does not need to receive it in real time. This is where a message queue proves its worth, so let's improve the design with one.
Here the request to send the email is put into the MQ instead. This improves both throughput and the user experience, which matters a great deal: the customer is king, and nobody likes using a slow app.
This is only one of MQ's many uses: asynchronous processing. MQ also plays an important role in decoupling systems and in peak shaving / rate limiting. I won't explain those two in detail; the principle is the same, and with a little thought you will understand them.
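The asynchronous login flow described above can be sketched with nothing more than a buffered channel standing in for the MQ. This is only an illustration under assumptions: the names `emailJob`, `login`, and `runEmailWorker`, and the placeholder password check, are hypothetical and do not come from the article's source code.

```go
package main

import (
	"fmt"
	"sync"
)

// emailJob is a hypothetical message describing one reminder email.
type emailJob struct {
	user string
}

// login checks the password and, on success, only enqueues the email job.
// It does not wait for the email to be sent.
func login(user, password string, queue chan<- emailJob) bool {
	if password != "secret" { // placeholder check, assumed for this sketch
		return false
	}
	queue <- emailJob{user: user}
	return true
}

// runEmailWorker consumes jobs until the queue is closed and records
// which users were "mailed" (a stand-in for a real mail call).
func runEmailWorker(queue <-chan emailJob, wg *sync.WaitGroup, sent *[]string) {
	defer wg.Done()
	for job := range queue {
		*sent = append(*sent, job.user)
	}
}

func main() {
	queue := make(chan emailJob, 16)
	var wg sync.WaitGroup
	var sent []string
	wg.Add(1)
	go runEmailWorker(queue, &wg, &sent)

	fmt.Println("login ok:", login("alice", "secret", queue))
	fmt.Println("login ok:", login("bob", "wrong", queue))

	close(queue) // no more jobs; lets the worker finish
	wg.Wait()
	fmt.Println("emails sent to:", sent)
}
```

The login call returns as soon as the job is queued, so a slow or failing mail server no longer slows down or breaks the login itself.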
channel
All right, my sister finally knows what a message queue is, but she still can't develop one, because one piece of knowledge is missing: the channel in Go. It is very important, and we need it to build our message queue.
Space is limited, so I won't cover channel in detail here and will only introduce the basic usage.
What is a channel
Goroutines and channels are the two cornerstones of concurrent programming in Go. Goroutines run concurrent tasks, and channels are used for synchronization and communication between goroutines. Go advocates sharing memory by communicating instead of communicating by sharing memory: when one goroutine needs to share resources with another, a channel builds a bridge between them and provides a mechanism that keeps the exchange safely synchronized. A channel is essentially still a queue and follows the FIFO principle. The specific rules are as follows:
- The goroutine that starts receiving from a channel first receives data first;
- The goroutine that starts sending to a channel first gets to send its data first.
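To make the FIFO behaviour concrete, here is a small sketch. The helper name `drain` is made up for this illustration; it sends a few values into a buffered channel and reads them back in the order they went in.

```go
package main

import "fmt"

// drain sends the values into a buffered channel, closes it,
// and collects everything that comes out the other end.
func drain(values []int) []int {
	ch := make(chan int, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	var out []int
	for v := range ch {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(drain([]int{3, 1, 2})) // values come out in send order
}
```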
Create channels
Creating a channel uses the built-in function make, in the following form:
channelInstance := make(chan dataType)
- dataType: the type of the elements transferred through the channel.
- channelInstance: the channel handle created by make.
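As a quick illustration of the form above, the sketch below creates one channel without a buffer and one with a buffer and prints their capacities with the built-in `cap`. The function name `capacities` is only for this example.

```go
package main

import "fmt"

// capacities creates an unbuffered and a buffered channel
// and returns the capacity of each.
func capacities() (int, int) {
	unbuffered := make(chan int)     // no buffer: capacity 0
	buffered := make(chan string, 3) // room for three strings
	return cap(unbuffered), cap(buffered)
}

func main() {
	u, b := capacities()
	fmt.Println(u, b)
}
```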
Use of unbuffered channels
An unbuffered channel in Go is a channel that cannot hold any value before it is received. This kind of channel requires the sending goroutine and the receiving goroutine to be ready at the same time for the send and receive to complete.
An unbuffered channel is defined as follows:
channelInstance := make(chan channelType)
- channelType: the type of data sent and received through the channel.
- Buffer size: 0
- channelInstance: the created channel instance.
Write an example to help you understand :
package main

import (
	"sync"
	"time"
)

func main() {
	c := make(chan string)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		c <- `Golang DreamWorks `
	}()
	go func() {
		defer wg.Done()
		time.Sleep(time.Second * 1)
		println(`Message: ` + <-c)
	}()
	wg.Wait()
}
Use of buffered channels
A buffered channel in Go is a channel that can store one or more values before they are received. This kind of channel does not force the sending and receiving goroutines to be ready at the same time, and the conditions under which it blocks are different. A receive blocks only when there is no value in the channel to receive. A send blocks only when the channel has no free buffer space to hold the value being sent.
A buffered channel is defined as follows:
channelInstance := make(chan channelType, bufferSize)
- channelType: same as for unbuffered channels; the type of data sent and received through the channel.
- bufferSize: the maximum number of elements the channel can hold.
- channelInstance: the created channel instance.
Let's write an example to explain:
package main

import (
	"sync"
	"time"
)

func main() {
	c := make(chan string, 2)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		c <- `Golang DreamWorks `
		c <- `asong`
	}()
	go func() {
		defer wg.Done()
		time.Sleep(time.Second * 1)
		println(` official account : ` + <-c)
		println(` author : ` + <-c)
	}()
	wg.Wait()
}
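The blocking rule for buffered channels (a send only blocks once the buffer is full) can be observed without actually blocking by using a `select` with a `default` branch. The helper `trySend` below is a hypothetical name used only for this sketch.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether
// the channel accepted the value.
func trySend(ch chan<- string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		// The buffer is full (or nobody is receiving), so a plain send would block.
		return false
	}
}

func main() {
	c := make(chan string, 2)
	fmt.Println(trySend(c, "a"), trySend(c, "b"), trySend(c, "c")) // third send finds the buffer full
	<-c                                                            // free one slot
	fmt.Println(trySend(c, "c"))
}
```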
All right, that's the introduction to channels. If there is interest, I'll write a detailed article on channel next time.
Message queue coding implementation
Preparation
Finally we get to the main topic. My sister was almost asleep, so I roared and she perked up at once. But asong also took a small jab for it, and the price was painful. Sob...
Before writing any code, we should think through the overall code architecture; that is the right way to code. Let's define an interface first and list the methods we need to implement, then implement them one by one. The methods are:
type Broker interface {
	publish(topic string, msg interface{}) error
	subscribe(topic string) (<-chan interface{}, error)
	unsubscribe(topic string, sub <-chan interface{}) error
	close()
	broadcast(msg interface{}, subscribers []chan interface{})
	setConditions(capacity int)
}
- publish: pushes a message. It takes two parameters, topic and msg, which are the subscribed topic and the message to deliver.
- subscribe: subscribes to messages. Pass in the topic to subscribe to; it completes the subscription and returns the corresponding channel for receiving data.
- unsubscribe: cancels a subscription. Pass in the subscribed topic and the corresponding channel.
- close: as the name suggests, closes the message queue.
- broadcast: an internal method that broadcasts a pushed message so that every subscriber can receive it.
- setConditions: sets conditions. Here the condition is the capacity of the message queue, which lets us control its size.
If you look carefully, you may have noticed something: the methods I defined are all unexported, that is, they are not available outside the package. Why? Because these are the broker's job. We still need another layer of encapsulation: methods that the client can call directly. Only then does the design fit a sensible software architecture. So we can write the following code:
package mq

type Client struct {
	bro *BrokerImpl
}

func NewClient() *Client {
	return &Client{
		bro: NewBroker(),
	}
}

func (c *Client) SetConditions(capacity int) {
	c.bro.setConditions(capacity)
}

func (c *Client) Publish(topic string, msg interface{}) error {
	return c.bro.publish(topic, msg)
}

func (c *Client) Subscribe(topic string) (<-chan interface{}, error) {
	return c.bro.subscribe(topic)
}

func (c *Client) Unsubscribe(topic string, sub <-chan interface{}) error {
	return c.bro.unsubscribe(topic, sub)
}

func (c *Client) Close() {
	c.bro.close()
}

func (c *Client) GetPayLoad(sub <-chan interface{}) interface{} {
	for val := range sub {
		if val != nil {
			return val
		}
	}
	return nil
}
That covers the code structure, but we haven't yet designed the struct that implements the message queue. Let's design it now.
type BrokerImpl struct {
	exit     chan bool
	capacity int
	topics   map[string][]chan interface{} // key: topic, value: queue
	sync.RWMutex                           // synchronization lock
}
- exit: a channel used to signal that the message queue is closed.
- capacity: the capacity of the message queue; it is used as the buffer size of each subscriber's channel.
- topics: a map whose key is the topic and whose value is a slice of chan. One topic can have multiple subscribers, so each subscriber gets its own channel.
- sync.RWMutex: a read-write lock that guards against concurrent access corrupting the data while messages are being pushed.
All right, now that we're well prepared, let's start filling in the methods.
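`NewClient` calls a `NewBroker` constructor that is not shown in this article. A minimal sketch of what such a constructor might look like follows; it is my assumption based on the struct fields above, not necessarily what the repository contains. The one thing it must do is initialise the map and the `exit` channel, since writing to a nil map panics.

```go
package main

import (
	"fmt"
	"sync"
)

// BrokerImpl mirrors the struct from the article.
type BrokerImpl struct {
	exit     chan bool
	capacity int
	topics   map[string][]chan interface{}
	sync.RWMutex
}

// NewBroker is an assumed constructor: it allocates the exit channel
// and the topics map so the broker is usable immediately.
func NewBroker() *BrokerImpl {
	return &BrokerImpl{
		exit:   make(chan bool),
		topics: make(map[string][]chan interface{}),
	}
}

func main() {
	b := NewBroker()
	fmt.Println(b.topics != nil, b.exit != nil, b.capacity)
}
```

With `capacity` left at its zero value, subscriber channels would be unbuffered until `setConditions` is called.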
publish and broadcast
These two are covered together because broadcast is called inside publish. The idea is simple: we just broadcast the incoming data to the subscribers. Here is the code:
func (b *BrokerImpl) publish(topic string, pub interface{}) error {
	// Refuse to publish once the broker has been closed.
	select {
	case <-b.exit:
		return errors.New("broker closed")
	default:
	}
	b.RLock()
	subscribers, ok := b.topics[topic]
	b.RUnlock()
	if !ok {
		// Nobody is subscribed to this topic; nothing to do.
		return nil
	}
	b.broadcast(pub, subscribers)
	return nil
}

func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) {
	count := len(subscribers)
	// Use more goroutines as the number of subscribers grows.
	concurrency := 1
	switch {
	case count > 1000:
		concurrency = 3
	case count > 100:
		concurrency = 2
	default:
		concurrency = 1
	}
	// Each worker handles every concurrency-th subscriber, starting at start.
	pub := func(start int) {
		for j := start; j < count; j += concurrency {
			select {
			case subscribers[j] <- msg:
			case <-time.After(time.Millisecond * 5):
				// Give up on this subscriber after 5ms and move on.
			case <-b.exit:
				return
			}
		}
	}
	for i := 0; i < concurrency; i++ {
		go pub(i)
	}
}
publish itself needs little explanation; the main thing to discuss is the implementation of broadcast.
Broadcasting only needs to push the data; there is no need to wait for each push to succeed, so goroutines are used. And when a push cannot go through (for example, because a subscriber's buffer is full), we can't wait forever, so a timeout is added: if a send takes more than 5 milliseconds, that subscriber is skipped and the push moves on to the next one.
You may wonder what the switch above is for. Consider a large number of subscribers, say 10,000: pushing with a single for loop would take a long time per message, and different consumers would see different delays. Splitting the subscribers across several goroutines reduces that time to some extent.
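To see how the work is divided, the sketch below reproduces only the index arithmetic of the `pub` closure: worker `start` visits `start`, `start+concurrency`, `start+2*concurrency`, and so on. The function name `shard` is made up for this illustration.

```go
package main

import "fmt"

// shard returns the subscriber indexes that the worker starting at
// start would visit, using the same stride loop as broadcast.
func shard(start, count, concurrency int) []int {
	var idx []int
	for j := start; j < count; j += concurrency {
		idx = append(idx, j)
	}
	return idx
}

func main() {
	// Ten subscribers split across three workers.
	for w := 0; w < 3; w++ {
		fmt.Println("worker", w, "->", shard(w, 10, 3))
	}
}
```

Every index is covered exactly once, so no subscriber is skipped or notified twice.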
subscribe and unsubscribe
Let's look at the code first:
func (b *BrokerImpl) subscribe(topic string) (<-chan interface{}, error) {
	select {
	case <-b.exit:
		return nil, errors.New("broker closed")
	default:
	}
	// Each subscriber gets its own buffered channel.
	ch := make(chan interface{}, b.capacity)
	b.Lock()
	b.topics[topic] = append(b.topics[topic], ch)
	b.Unlock()
	return ch, nil
}

func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error {
	select {
	case <-b.exit:
		return errors.New("broker closed")
	default:
	}
	// Hold the write lock for the whole read-modify-write so that a
	// concurrent subscribe cannot be lost between the read and the update.
	b.Lock()
	defer b.Unlock()
	subscribers, ok := b.topics[topic]
	if !ok {
		return nil
	}
	// delete subscriber
	var newSubs []chan interface{}
	for _, subscriber := range subscribers {
		if subscriber == sub {
			continue
		}
		newSubs = append(newSubs, subscriber)
	}
	b.topics[topic] = newSubs
	return nil
}
It's actually quite simple:
- subscribe: creates a channel, appends it to the subscribers of the corresponding topic, and returns it as a receive-only channel.
- unsubscribe: removes the channel we added earlier from the topic's subscribers.
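The removal step relies on the fact that channel values in Go can be compared with `==`, even when one side is a bidirectional channel and the other a receive-only view of it. A stand-alone sketch of the filtering follows; the helper name `remove` is hypothetical.

```go
package main

import "fmt"

// remove returns subs without the channel that sub refers to.
func remove(subs []chan interface{}, sub <-chan interface{}) []chan interface{} {
	var kept []chan interface{}
	for _, s := range subs {
		if s == sub { // same underlying channel
			continue
		}
		kept = append(kept, s)
	}
	return kept
}

func main() {
	a := make(chan interface{})
	b := make(chan interface{})
	kept := remove([]chan interface{}{a, b}, a)
	fmt.Println(len(kept), kept[0] == b)
}
```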
close
func (b *BrokerImpl) close() {
	select {
	case <-b.exit:
		return
	default:
		close(b.exit)
		b.Lock()
		b.topics = make(map[string][]chan interface{})
		b.Unlock()
	}
}
This closes the entire message queue. The line b.topics = make(map[string][]chan interface{}) matters: its main purpose is to clear the old subscriptions so that nothing left over causes a conflict the next time a message queue is used.
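Closing a channel is what makes every `case <-b.exit` in the other methods fire at once. One thing to watch: closing an already closed channel panics, and the select/default check in `close` is not atomic if two goroutines call it at the same moment. The sketch below shows one possible alternative using `sync.Once`; the `stopper` type is hypothetical and is not how the article's code does it.

```go
package main

import (
	"fmt"
	"sync"
)

// stopper wraps an exit channel that can be closed safely more than once.
type stopper struct {
	exit chan struct{}
	once sync.Once
}

func newStopper() *stopper {
	return &stopper{exit: make(chan struct{})}
}

// stop closes exit exactly once, even when called concurrently.
func (s *stopper) stop() {
	s.once.Do(func() { close(s.exit) })
}

// stopped reports whether stop has been called.
func (s *stopper) stopped() bool {
	select {
	case <-s.exit:
		return true
	default:
		return false
	}
}

func main() {
	s := newStopper()
	fmt.Println(s.stopped())
	s.stop()
	s.stop() // second call is a no-op instead of a panic
	fmt.Println(s.stopped())
}
```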
setConditions and GetPayLoad
We still need two last methods: one sets the capacity of our message queue, and the other wraps up how to get the messages we subscribed to:
func (b *BrokerImpl) setConditions(capacity int) {
	b.capacity = capacity
}

func (c *Client) GetPayLoad(sub <-chan interface{}) interface{} {
	for val := range sub {
		if val != nil {
			return val
		}
	}
	return nil
}
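The behaviour of GetPayLoad is easiest to see in isolation: it blocks until a non-nil value arrives, skips nil values, and returns nil only when the channel has been closed and drained. The sketch below copies the loop into a free function (named `getPayload` here purely for the example) so that it runs on its own.

```go
package main

import "fmt"

// getPayload returns the first non-nil value received from sub,
// or nil once sub is closed and empty.
func getPayload(sub <-chan interface{}) interface{} {
	for val := range sub {
		if val != nil {
			return val
		}
	}
	return nil
}

func main() {
	ch := make(chan interface{}, 2)
	ch <- nil
	ch <- "hello"
	fmt.Println(getPayload(ch)) // the nil is skipped
	close(ch)
	fmt.Println(getPayload(ch)) // closed and empty
}
```

Since the broker in this article never closes subscriber channels, a caller should expect GetPayLoad to block until a message is published.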
Test
All right, the code got written pretty quickly. Next, let's test it.
Unit testing
Before the formal test we should do unit tests first. It's a good habit: only after testing it yourself can you say with confidence that your code is OK. Don't just run the program directly, or there will be plenty of bugs.
Our test works as follows: we send different messages to different topics, and when a subscriber receives its message, it cancels the subscription.
func TestClient(t *testing.T) {
	b := NewClient()
	b.SetConditions(100)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		topic := fmt.Sprintf("Golang DreamWorks %d", i)
		payload := fmt.Sprintf("asong%d", i)
		ch, err := b.Subscribe(topic)
		if err != nil {
			t.Fatal(err)
		}
		wg.Add(1)
		go func() {
			// Deferred so the WaitGroup is released on every path.
			defer wg.Done()
			e := b.GetPayLoad(ch)
			// t.Fatal may only be called from the test goroutine,
			// so failures inside this goroutine are reported with t.Errorf.
			if e != payload {
				t.Errorf("%s expected %s but get %v", topic, payload, e)
				return
			}
			if err := b.Unsubscribe(topic, ch); err != nil {
				t.Error(err)
			}
		}()
		if err := b.Publish(topic, payload); err != nil {
			t.Fatal(err)
		}
	}
	wg.Wait()
}
The test passed, so all is well. Next, let's write a couple of functions to test it.
Test
There are two tests.
Test one: use a ticker to push messages to one topic at regular intervals.
// One topic test
// topic is assumed to be a package-level constant defined elsewhere in the source.
func OnceTopic() {
	m := mq.NewClient()
	// Register the deferred Close up front; OnceSub below loops forever.
	defer m.Close()
	m.SetConditions(10)
	ch, err := m.Subscribe(topic)
	if err != nil {
		fmt.Println("subscribe failed")
		return
	}
	go OncePub(m)
	OnceSub(ch, m)
}

// Timed push
func OncePub(c *mq.Client) {
	t := time.NewTicker(10 * time.Second)
	defer t.Stop()
	// Block on the ticker instead of spinning in a select with a default branch.
	for range t.C {
		err := c.Publish(topic, "asong How handsome ")
		if err != nil {
			fmt.Println("pub message failed")
		}
	}
}

// Accept subscription messages
func OnceSub(m <-chan interface{}, c *mq.Client) {
	for {
		val := c.GetPayLoad(m)
		fmt.Printf("get message is %s\n", val)
	}
}
Test two: use a ticker to send messages to multiple topics at regular intervals:
// Multiple topic test
func ManyTopic() {
	m := mq.NewClient()
	defer m.Close()
	m.SetConditions(10)
	for i := 0; i < 10; i++ {
		top := fmt.Sprintf("Golang DreamWorks _%02d", i)
		go Sub(m, top)
	}
	ManyPub(m)
}

func ManyPub(c *mq.Client) {
	t := time.NewTicker(10 * time.Second)
	defer t.Stop()
	// Block on the ticker instead of spinning in a select with a default branch.
	for range t.C {
		for i := 0; i < 10; i++ {
			// Push a different message to each topic
			top := fmt.Sprintf("Golang DreamWorks _%02d", i)
			payload := fmt.Sprintf("asong How handsome _%02d", i)
			err := c.Publish(top, payload)
			if err != nil {
				fmt.Println("pub message failed")
			}
		}
	}
}

func Sub(c *mq.Client, top string) {
	ch, err := c.Subscribe(top)
	if err != nil {
		fmt.Printf("sub top:%s failed\n", top)
		return // without a channel there is nothing to read from
	}
	for {
		val := c.GetPayLoad(ch)
		if val != nil {
			fmt.Printf("%s get message is %s\n", top, val)
		}
	}
}
Summary
In the end I helped my sister solve her problem. She was so happy that she gave me a kiss. Ah, no, a compliment, enough praise to make a person blush.
Did you follow all of this? It doesn't matter if you didn't: download the source code and read it through carefully, and it will be easy to understand.
This article is really laying the groundwork for learning Kafka next. Once you have this one down, learning Kafka will be much easier.
github Address :https://github.com/asong2020/Golang_Dream/tree/master/code_demo/queue