
Teaching my sister to write a message queue in Go, step by step

2022-07-07 19:06:00 Golang DreamWorks

Preface

This week my sister joined a new company. Her boss wanted to see what she could do, took a look at her resume and thought: wow, proficient in Kafka, this girl has real skills. In that case, she can write a message queue. Since it had to be written in Go, my sister was worried and came straight to me. I held out for as long as I could, but she kept insisting and I finally agreed. So I taught her how to write a message queue, step by step. Now let's take a look at how I wrote it.

The code has been uploaded to my GitHub (the address is at the end of the article). If you need it, you can download it yourself.

What is a message queue

My sister really worries me: her resume says she is proficient in Kafka, yet she doesn't even know what a message queue is. So, good-tempered as always, I began to explain it to her.

A message queue is usually called an MQ (Message Queue). The name combines two English words that everyone knows, and the more familiar one is Queue. A queue is a first-in, first-out data structure, and queues are used all over the place. But if we already have queues, why do we need an MQ?

Me: Let me ask you, sister, do you know why we still need an MQ? Sister: Get on with it. Do you want a beating? Me: Pfft... fine, I talk too much, hmph.

I patiently went on with the explanation...

Let's take a simple example. Suppose we are building a login system that must send a reminder email to the user's mailbox after a successful login. The requirement is simple enough. Let's first look at how we would implement it without an MQ, using a sequence diagram:

In the diagram, the email is sent as part of the login request: once the password is verified, the email is sent, and only then does the login return successfully. This works, but it is flawed. It complicates the login operation, because every login request has to send an email. If sending fails, the whole login request fails and the user cannot log in. There is another problem: the login call originally took only 100 ms, but because it now waits for the email to be sent, the login interface takes longer. The email is not a high priority, and users do not need to receive it in real time. This is where a message queue shows its value, so let's improve the design with one.

Here we hand the email request to the MQ. This shortens the login response and improves both throughput and the user experience, which matters a great deal: the customer is king, and nobody likes a slow app.
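
To make the idea concrete, here is a minimal sketch in Go. It is only an illustration under my own assumptions: the `login` function, the `emailJob` type, the hard-coded password check and the buffered channel standing in for the MQ are all hypothetical and are not part of the message queue we build later.

```go
package main

import (
	"fmt"
	"sync"
)

// emailJob is a hypothetical message describing an email to send.
type emailJob struct {
	to string
}

// login verifies the password and, on success, enqueues the email
// instead of sending it inline, so the caller does not wait for delivery.
func login(user, password string, queue chan<- emailJob) bool {
	if password != "secret" { // placeholder check, for illustration only
		return false
	}
	queue <- emailJob{to: user + "@example.com"}
	return true
}

func main() {
	queue := make(chan emailJob, 16) // stands in for the MQ
	var wg sync.WaitGroup
	wg.Add(1)

	// The consumer sends emails in the background.
	go func() {
		defer wg.Done()
		for job := range queue {
			fmt.Println("sending login email to", job.to)
		}
	}()

	fmt.Println("login ok:", login("asong", "secret", queue))
	close(queue)
	wg.Wait()
}
```

The login path now only pays for a channel send. If the email consumer is slow or fails, the login result is not affected.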

This is just one of the many uses of an MQ, namely asynchronous processing. MQs also play an important role in decoupling systems and in peak shaving / rate limiting. I won't explain those two in detail. The principle is the same, and with a little thought you can work them out.

channel

All right, my sister finally knows what a message queue is, but she still can't build one, because one piece of knowledge is missing: the channel in Go. It is essential, and we need it to build our message queue.

Because space is limited, I won't cover channels in detail here, only the basic usage.

What is a channel

Goroutines and channels are the two cornerstones of concurrent programming in Go. Goroutines run concurrent tasks, and channels are used for synchronization and communication between goroutines. Go advocates sharing memory by communicating rather than communicating by sharing memory: when one goroutine needs to share a resource with other goroutines, a channel acts as a bridge between them and provides a mechanism that keeps the exchange safely synchronized. A channel is essentially a queue and follows the FIFO principle. The specific rules are:

  • The goroutine that starts receiving from a channel first receives data first;
  • The goroutine that starts sending to a channel first gets to send first.
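
A small sketch of the FIFO behaviour, assuming a buffered channel of ints (`make` is introduced in the next section, and `drain` is a helper name I made up for this example):

```go
package main

import "fmt"

// drain receives n values from ch and returns them in arrival order.
func drain(ch <-chan int, n int) []int {
	out := make([]int, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, <-ch)
	}
	return out
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	fmt.Println(drain(ch, 3)) // prints [1 2 3]
}
```

The values come out in exactly the order they went in.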

Creating channels

Channels are created with the built-in function make, in the following form:

 channelInstance := make(chan dataType)
  • dataType: the type of the elements passed through the channel.
  • channelInstance: the channel handle created by make.

Using unbuffered channels

An unbuffered channel in Go is a channel that cannot hold any value before it is received. This kind of channel requires the sending goroutine and the receiving goroutine to be ready at the same time for the send and receive to complete.

An unbuffered channel is defined as follows:

 channelInstance := make(chan channelType)
  • channelType: the type of data the channel sends and receives.
  • Buffer size: 0
  • channelInstance: the created channel instance.

Here is an example to help you understand:

package main

import (
    "sync"
    "time"
)

func main() {
    c := make(chan string)

    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        defer wg.Done()
        c <- `Golang DreamWorks `
    }()

    go func() {
        defer wg.Done()

        time.Sleep(time.Second * 1)
        println(`Message: `+ <-c)
    }()

    wg.Wait()
}

Using buffered channels

A buffered channel in Go is a channel that can store one or more values before they are received. This kind of channel does not require the sending and receiving goroutines to be ready at the same time. The conditions under which sends and receives block are also different: a receive blocks only when the channel holds no value to receive, and a send blocks only when the channel has no free buffer slot for the value being sent.

A buffered channel is defined as follows:

 channelInstance := make(chan channelType, bufferSize)
  • channelType: same as for an unbuffered channel; the type of data the channel sends and receives.
  • bufferSize: the maximum number of elements the channel can hold.
  • channelInstance: the created channel instance.

Let's write an example to explain:

package main

import (
    "sync"
    "time"
)

func main() {
    c := make(chan string, 2)

    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        defer wg.Done()

        c <- `Golang DreamWorks `
        c <- `asong`
    }()

    go func() {
        defer wg.Done()

        time.Sleep(time.Second * 1)
        println(` official account : `+ <-c)
        println(` author : `+ <-c)
    }()

    wg.Wait()
}
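
To see the blocking rule for sends in action, here is a small sketch of my own. `trySend` is a hypothetical helper that uses select with a default branch to detect when a send would block:

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default: // the send would block: the buffer is full
		return false
	}
}

func main() {
	c := make(chan string, 2)
	fmt.Println(trySend(c, "a"), len(c), cap(c)) // true 1 2
	fmt.Println(trySend(c, "b"), len(c), cap(c)) // true 2 2
	fmt.Println(trySend(c, "c"), len(c), cap(c)) // false 2 2
	<-c                                          // free one slot
	fmt.Println(trySend(c, "c"), len(c), cap(c)) // true 2 2
}
```

The third send fails because both buffer slots are taken. After one receive frees a slot, the same send goes through.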

All right, that's it for the introduction to channels. If there is interest, I'll write a detailed article on channels next time.

Implementing the message queue

Preparation

Finally we get to the main topic. My sister was almost asleep, so I roared and she perked up immediately. As for me, asong, I earned a punch for it. The price was painful...

Don't start writing code right away. First think through the overall architecture; that is the right way to code. Let's define an interface first, listing the methods we need to implement, and then implement them one by one. The methods are as follows:

type Broker interface {
 publish(topic string, msg interface{}) error
 subscribe(topic string) (<-chan interface{}, error)
 unsubscribe(topic string, sub <-chan interface{}) error
 close()
 broadcast(msg interface{}, subscribers []chan interface{})
 setConditions(capacity int)
}
  • publish: pushes a message. It takes two parameters, topic and msg: the subscribed topic and the message to deliver.
  • subscribe: subscribes to messages. Pass in the topic to subscribe to; it returns the corresponding channel, which is used to receive data.
  • unsubscribe: cancels a subscription. Pass in the subscribed topic and the corresponding channel.
  • close: closes the message queue, as the name suggests.
  • broadcast: an internal method that broadcasts a pushed message so that every subscriber can receive it.
  • setConditions: sets the capacity of the message queue (used as the buffer size of each subscriber's channel), which lets us control its size.

Look carefully: have you noticed a problem? The methods I defined are all unexported, so they are not available outside the package. Why? Because they are the broker's job. We need another layer of encapsulation, namely methods that the client can call directly; that fits the software architecture better. So we can write the following code:

package mq


type Client struct {
 bro *BrokerImpl
}

func NewClient() *Client {
 return &Client{
  bro: NewBroker(),
 }
}

func (c *Client)SetConditions(capacity int)  {
 c.bro.setConditions(capacity)
}

func (c *Client)Publish(topic string, msg interface{}) error{
 return c.bro.publish(topic,msg)
}

func (c *Client)Subscribe(topic string) (<-chan interface{}, error){
 return c.bro.subscribe(topic)
}

func (c *Client)Unsubscribe(topic string, sub <-chan interface{}) error {
 return c.bro.unsubscribe(topic,sub)
}

func (c *Client)Close()  {
  c.bro.close()
}

func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{
 for val:= range sub{
  if val != nil{
   return val
  }
 }
 return nil
}

The above only covers the code structure. We haven't yet designed the struct that implements the message queue, so let's design it now.

type BrokerImpl struct {
 exit chan bool
 capacity int

 topics map[string][]chan interface{} // key:topic  value :queue
 sync.RWMutex //  Synchronization lock 
}
  • exit: also a channel, used to signal that the message queue is closed.
  • capacity: the capacity of the message queue.
  • topics: a map whose key is the topic and whose value is a slice of channels. A topic can have multiple subscribers, so each subscriber gets its own channel.
  • sync.RWMutex: a read-write lock. It prevents concurrent access from corrupting the data while messages are being pushed.
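
The Client code earlier calls NewBroker(), which is not shown in this article. Here is a minimal sketch of what such a constructor could look like. This is an assumption on my part: it only allocates the exit channel and the topics map, and the version in the repository may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// BrokerImpl mirrors the struct defined above.
type BrokerImpl struct {
	exit     chan bool
	capacity int

	topics map[string][]chan interface{}
	sync.RWMutex
}

// NewBroker is an assumed constructor: it allocates the exit channel
// and the topics map so that both are safe to use immediately.
func NewBroker() *BrokerImpl {
	return &BrokerImpl{
		exit:   make(chan bool),
		topics: make(map[string][]chan interface{}),
	}
}

func main() {
	b := NewBroker()
	b.topics["demo"] = append(b.topics["demo"], make(chan interface{}, 1))
	fmt.Println(len(b.topics["demo"])) // prints 1
}
```

Initializing the map matters because writing to a nil map panics, and the exit channel must be non-nil for it to be closed later.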

All right, the preparation is done. Let's start filling in the methods.

publish and broadcast

The two are covered together because broadcast is called from inside publish. The idea is simple: we just need to broadcast the incoming data. Let's look at the implementation:

func (b *BrokerImpl) publish(topic string, pub interface{}) error {
 select {
 case <-b.exit:
  return errors.New("broker closed")
 default:
 }

 b.RLock()
 subscribers, ok := b.topics[topic]
 b.RUnlock()
 if !ok {
  return nil
 }

 b.broadcast(pub, subscribers)
 return nil
}


func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) {
 count := len(subscribers)
 concurrency := 1

 switch {
 case count > 1000:
  concurrency = 3
 case count > 100:
  concurrency = 2
 default:
  concurrency = 1
 }
 pub := func(start int) {
  for j := start; j < count; j += concurrency {
   select {
   case subscribers[j] <- msg:
   case <-time.After(time.Millisecond * 5):
   case <-b.exit:
    return
   }
  }
 }
 for i := 0; i < concurrency; i++ {
  go pub(i)
 }
}

There is not much to say about publish. The main thing here is the implementation of broadcast:

Here we are just broadcasting data: we push it and do not need to wait for the push to succeed, so we use goroutines. When a push cannot complete, we cannot wait forever, so there is a timeout: if a send takes more than 5 milliseconds, it is abandoned and we move on to the next subscriber.

You may wonder what the switch is for. Consider what happens when there are many subscribers, say 10,000: pushing the message in a single for loop takes a long time, and different consumers see different delays. Splitting the work across several goroutines this way cuts down that time.
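
To show how the subscribers are divided, here is a sketch that only computes which indexes each worker would visit. `stripes` is a name I made up for illustration; it mirrors the loop `for j := start; j < count; j += concurrency` in broadcast.

```go
package main

import "fmt"

// stripes returns, for each worker, the subscriber indexes it would visit
// when worker w starts at index w and advances by concurrency each step.
func stripes(count, concurrency int) [][]int {
	out := make([][]int, concurrency)
	for w := 0; w < concurrency; w++ {
		for j := w; j < count; j += concurrency {
			out[w] = append(out[w], j)
		}
	}
	return out
}

func main() {
	fmt.Println(stripes(7, 3)) // prints [[0 3 6] [1 4] [2 5]]
}
```

Every index is visited exactly once, and the workers never overlap, so no locking is needed between them.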

subscribe and unsubscribe

Let's look at the code first:

func (b *BrokerImpl) subscribe(topic string) (<-chan interface{}, error) {
 select {
 case <-b.exit:
  return nil, errors.New("broker closed")
 default:
 }

 ch := make(chan interface{}, b.capacity)
 b.Lock()
 b.topics[topic] = append(b.topics[topic], ch)
 b.Unlock()
 return ch, nil
}
func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error {
 select {
 case <-b.exit:
  return errors.New("broker closed")
 default:
 }

 // Hold the write lock across both the read and the update so that a
 // concurrent subscribe or unsubscribe on the same topic is not lost.
 b.Lock()
 defer b.Unlock()

 subscribers, ok := b.topics[topic]
 if !ok {
  return nil
 }
 // delete subscriber
 var newSubs []chan interface{}
 for _, subscriber := range subscribers {
  if subscriber == sub {
   continue
  }
  newSubs = append(newSubs, subscriber)
 }

 b.topics[topic] = newSubs
 return nil
}

It's actually very simple:

  • subscribe: creates a channel, adds it to the subscriber list of the corresponding topic, and returns it as a receive-only channel.
  • unsubscribe: removes the channel we added earlier from the topic's subscriber list.
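
The removal step can be tried on its own. This is a sketch with a hypothetical helper, `removeSub`, that follows the same filtering loop as unsubscribe. It also shows that a bidirectional channel can be compared directly with its receive-only view.

```go
package main

import "fmt"

// removeSub returns subs without the channel that matches sub.
func removeSub(subs []chan interface{}, sub <-chan interface{}) []chan interface{} {
	var kept []chan interface{}
	for _, s := range subs {
		if s == sub {
			continue
		}
		kept = append(kept, s)
	}
	return kept
}

func main() {
	a := make(chan interface{}, 1)
	b := make(chan interface{}, 1)
	subs := []chan interface{}{a, b}

	subs = removeSub(subs, a)
	fmt.Println(len(subs), subs[0] == b) // prints 1 true
}
```

Building a new slice instead of editing the old one in place means a broadcast that already holds the old slice keeps working on a consistent snapshot.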

close

func (b *BrokerImpl) close()  {
 select {
 case <-b.exit:
  return
 default:
  close(b.exit)
  b.Lock()
  b.topics = make(map[string][]chan interface{})
  b.Unlock()
 }
 return
}

This closes the entire message queue. The line b.topics = make(map[string][]chan interface{}) is the important one: it clears all topics and subscribers so that nothing left over causes a conflict the next time the message queue is used.
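
The select with a default branch is what makes a second call to close harmless. Here is that pattern in isolation, as a sketch with a hypothetical `stopper` type. Like the close method above, it is not protected against two goroutines calling it at the very same moment.

```go
package main

import "fmt"

// stopper wraps an exit channel that is closed at most once.
type stopper struct {
	exit chan bool
}

// stop closes exit unless it is already closed.
func (s *stopper) stop() {
	select {
	case <-s.exit: // already closed, nothing to do
	default:
		close(s.exit)
	}
}

// stopped reports whether exit has been closed.
func (s *stopper) stopped() bool {
	select {
	case <-s.exit:
		return true
	default:
		return false
	}
}

func main() {
	s := &stopper{exit: make(chan bool)}
	fmt.Println(s.stopped()) // false
	s.stop()
	s.stop()                 // second call is a no-op instead of a panic
	fmt.Println(s.stopped()) // true
}
```

A receive on a closed channel returns immediately, which is why every method of the broker can check `<-b.exit` to find out whether the queue has been shut down.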

setConditions and GetPayLoad

We still need two more methods: one sets the capacity of our message queue, and the other wraps the logic for receiving the messages we subscribed to:

func (b *BrokerImpl)setConditions(capacity int)  {
 b.capacity = capacity
}
func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{
 for val:= range sub{
  if val != nil{
   return val
  }
 }
 return nil
}
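
Here is a sketch of how this loop behaves, using a standalone copy that I named `firstNonNil` so the example can run on its own:

```go
package main

import "fmt"

// firstNonNil mirrors GetPayLoad: it blocks until a non-nil value arrives
// and returns nil only when the channel is closed and drained.
func firstNonNil(sub <-chan interface{}) interface{} {
	for val := range sub {
		if val != nil {
			return val
		}
	}
	return nil
}

func main() {
	ch := make(chan interface{}, 2)
	ch <- nil
	ch <- "hello"
	fmt.Println(firstNonNil(ch)) // prints hello

	close(ch)
	fmt.Println(firstNonNil(ch)) // prints <nil>
}
```

One thing to keep in mind: in the code shown in this article, close() does not close the subscribers' channels, so a consumer blocked in GetPayLoad keeps waiting after the broker is closed.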

Testing

All right, the code came together quickly. Next, let's test it.

Unit testing

Before the formal tests, we should write unit tests first. It's a good habit: only after testing it yourself can you say with confidence that your code works. If you just run the program directly, a lot of bugs will show up.

Our test works as follows: we publish a different message to each topic, and once a subscriber receives its message, it cancels its subscription.

func TestClient(t *testing.T) {
 b := NewClient()
 b.SetConditions(100)
 var wg sync.WaitGroup

 for i := 0; i < 100; i++ {
  topic := fmt.Sprintf("Golang DreamWorks %d", i)
  payload := fmt.Sprintf("asong%d", i)

  ch, err := b.Subscribe(topic)
  if err != nil {
   t.Fatal(err)
  }

  wg.Add(1)
  go func() {
   defer wg.Done()
   // t.Fatal must only be called from the test goroutine, so report with t.Errorf here.
   e := b.GetPayLoad(ch)
   if e != payload {
    t.Errorf("%s expected %s but get %v", topic, payload, e)
    return
   }
   if err := b.Unsubscribe(topic, ch); err != nil {
    t.Error(err)
   }
  }()

  if err := b.Publish(topic, payload); err != nil {
   t.Fatal(err)
  }
 }

 wg.Wait()
}

The test passes, so that part is fine. Next, let's write a couple of programs to test it further.

Test programs

There are two tests.

Test one: use a timer to push messages to a single topic periodically.

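// Single-topic test.
// topic is assumed to be a package-level string declared elsewhere in this file.
func OnceTopic()  {
 m := mq.NewClient()
 // Register Close right after creating the client so it also runs on the early return below.
 defer m.Close()
 m.SetConditions(10)
 ch,err :=m.Subscribe(topic)
 if err != nil{
  fmt.Println("subscribe failed")
  return
 }
 go OncePub(m)
 OnceSub(ch,m)
}

// Timed push
func OncePub(c *mq.Client)  {
 t := time.NewTicker(10 * time.Second)
 defer t.Stop()
 // Block on the ticker; a select with an empty default branch would spin the CPU.
 for range t.C {
  err := c.Publish(topic,"asong How handsome ")
  if err != nil{
   fmt.Println("pub message failed")
  }
 }
}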

//  Accept subscription messages 
func OnceSub(m <-chan interface{},c *mq.Client)  {
 for  {
  val := c.GetPayLoad(m)
  fmt.Printf("get message is %s\n",val)
 }
}

Test two: use a timer to push messages to multiple topics periodically:

// Multiple topic test 
func ManyTopic()  {
 m := mq.NewClient()
 defer m.Close()
 m.SetConditions(10)
 top := ""
 for i:=0;i<10;i++{
  top = fmt.Sprintf("Golang DreamWorks _%02d",i)
  go Sub(m,top)
 }
 ManyPub(m)
}

func ManyPub(c *mq.Client)  {
 t := time.NewTicker(10 * time.Second)
 defer t.Stop()
 // Block on the ticker; a select with an empty default branch would spin the CPU.
 for range t.C {
  for i:= 0;i<10;i++{
   // Push a different message to each topic
   top := fmt.Sprintf("Golang DreamWorks _%02d",i)
   payload := fmt.Sprintf("asong How handsome _%02d",i)
   err := c.Publish(top,payload)
   if err != nil{
    fmt.Println("pub message failed")
   }
  }
 }
}

func Sub(c *mq.Client,top string)  {
 ch,err := c.Subscribe(top)
 if err != nil{
  fmt.Printf("sub top:%s failed\n",top)
  return // without a channel there is nothing to receive from
 }
 for  {
  val := c.GetPayLoad(ch)
  if val != nil{
   fmt.Printf("%s get message is %s\n",top,val)
  }
 }
}

Summary

In the end I helped my sister solve her problem. She was so happy that she gave me a kiss. Ah, no, a compliment. Being praised is embarrassing.

Did you follow all of this? It doesn't matter if you didn't: download the source code and read it through carefully, and it will be easy to understand.

This article also lays the groundwork for learning Kafka. Once you have a good grasp of it, learning Kafka next will be much easier.

GitHub address: https://github.com/asong2020/Golang_Dream/tree/master/code_demo/queue

Copyright notice
This article was written by [Golang DreamWorks]. Please include the original link when reposting. Thank you.
https://yzsam.com/2022/188/202207071653298194.html