Go-Zero Microservices in Practice (IX): The Ultimate Optimization of Seckill Performance
2022-07-04 15:01:00 【kevwan】
In the previous article we introduced a message queue to smooth out the traffic spike of the flash sale (seckill), and Kafka seemed to handle it well. There are still several hidden risks, though. If we don't deal with them, the sale may suffer from message backlog, consumption lag, data inconsistency, or even service crashes once buying starts, and the consequences are easy to imagine. In this article we will tackle these risks together.
Bulk data aggregation
In the SeckillOrder method, every flash-sale purchase request currently sends one message to Kafka. If ten million users rush to buy at the same time, then even with our various rate-limiting strategies in place, millions of messages may hit Kafka in an instant, producing a huge amount of network I/O and disk I/O. As you know, Kafka is a log-based messaging system; although its writes are mostly sequential I/O, it may still struggle when this many messages are written at once.
How do we solve this? The answer is to aggregate messages. Previously, every single message cost one network I/O and one disk I/O; after aggregation, say batching 100 messages into a single send to Kafka, those 100 messages cost only one network I/O and one disk I/O, which is a huge improvement to Kafka's overall throughput and performance. This is essentially the idea of coalescing small packets, also known as batching, and you can see it everywhere: inserting rows into MySQL with a single multi-row SQL statement instead of looping over individual inserts, Redis pipelining, and so on.
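To make the batching intuition concrete, here is a minimal sketch of the MySQL case mentioned above. It is an illustration only, not code from this project: the orders(uid, pid) table and the *sql.DB handle are assumptions.

```go
package main

import (
	"database/sql"
	"errors"
	"strings"
)

// insertOrdersBatch inserts all rows with a single multi-row INSERT statement,
// so N rows cost one round trip to MySQL instead of N separate statements.
func insertOrdersBatch(db *sql.DB, uids, pids []int64) error {
	if len(uids) != len(pids) {
		return errors.New("uids and pids must have the same length")
	}
	if len(uids) == 0 {
		return nil
	}
	// Build "INSERT INTO orders(uid, pid) VALUES (?, ?), (?, ?), ..." once.
	placeholders := make([]string, 0, len(uids))
	args := make([]interface{}, 0, len(uids)*2)
	for i := range uids {
		placeholders = append(placeholders, "(?, ?)")
		args = append(args, uids[i], pids[i])
	}
	query := "INSERT INTO orders(uid, pid) VALUES " + strings.Join(placeholders, ", ")
	_, err := db.Exec(query, args...)
	return err
}
```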
So how do we aggregate, and what is the aggregation policy? The policy has two dimensions: the number of aggregated messages and the aggregation time. For example, send to Kafka once 100 messages have been aggregated; this count is configurable. But what if 100 messages never accumulate? That is where the aggregation time comes in, which is also configurable: say the window is 1 second, then no matter how many messages are currently buffered, we send them to Kafka once 1 second has elapsed. The count and the time window are combined with OR semantics, i.e. a flush happens as soon as either condition is met.
Here we provide a tool, Batcher, for aggregating data in batches. It is defined as follows:
```go
type Batcher struct {
	opts options

	Do       func(ctx context.Context, val map[string][]interface{})
	Sharding func(key string) int
	chans    []chan *msg
	wait     sync.WaitGroup
}
```
- Do method: executed once an aggregation condition is met; the val parameter holds the aggregated data
- Sharding method: shards by key, so messages with the same key are written to the same channel and handled by the same goroutine

In the merge method there are two conditions that trigger the Do method: the number of aggregated messages reaching the configured size, and the configured timer firing.
The implementation is fairly straightforward; here is the full code:
```go
type msg struct {
	key string
	val interface{}
}

type Batcher struct {
	opts options

	Do       func(ctx context.Context, val map[string][]interface{})
	Sharding func(key string) int
	chans    []chan *msg
	wait     sync.WaitGroup
}

func New(opts ...Option) *Batcher {
	b := &Batcher{}
	for _, opt := range opts {
		opt.apply(&b.opts)
	}
	b.opts.check()
	b.chans = make([]chan *msg, b.opts.worker)
	for i := 0; i < b.opts.worker; i++ {
		b.chans[i] = make(chan *msg, b.opts.buffer)
	}
	return b
}

func (b *Batcher) Start() {
	if b.Do == nil {
		log.Fatal("Batcher: Do func is nil")
	}
	if b.Sharding == nil {
		log.Fatal("Batcher: Sharding func is nil")
	}
	b.wait.Add(len(b.chans))
	for i, ch := range b.chans {
		go b.merge(i, ch)
	}
}

func (b *Batcher) Add(key string, val interface{}) error {
	ch, msg := b.add(key, val)
	select {
	case ch <- msg:
	default:
		return ErrFull
	}
	return nil
}

func (b *Batcher) add(key string, val interface{}) (chan *msg, *msg) {
	sharding := b.Sharding(key) % b.opts.worker
	ch := b.chans[sharding]
	msg := &msg{key: key, val: val}
	return ch, msg
}

func (b *Batcher) merge(idx int, ch <-chan *msg) {
	defer b.wait.Done()

	var (
		msg        *msg
		count      int
		closed     bool
		lastTicker = true
		interval   = b.opts.interval
		vals       = make(map[string][]interface{}, b.opts.size)
	)
	if idx > 0 {
		interval = time.Duration(int64(idx) * (int64(b.opts.interval) / int64(b.opts.worker)))
	}
	ticker := time.NewTicker(interval)
	for {
		select {
		case msg = <-ch:
			if msg == nil {
				closed = true
				break
			}
			count++
			vals[msg.key] = append(vals[msg.key], msg.val)
			if count >= b.opts.size {
				break
			}
			continue
		case <-ticker.C:
			if lastTicker {
				ticker.Stop()
				ticker = time.NewTicker(b.opts.interval)
				lastTicker = false
			}
		}
		if len(vals) > 0 {
			ctx := context.Background()
			b.Do(ctx, vals)
			vals = make(map[string][]interface{}, b.opts.size)
			count = 0
		}
		if closed {
			ticker.Stop()
			return
		}
	}
}

func (b *Batcher) Close() {
	for _, ch := range b.chans {
		ch <- nil
	}
	b.wait.Wait()
}
```
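The code above relies on an options type, With* constructors, and an ErrFull error that the excerpt does not show. Below is a minimal sketch of what they could look like, matching the WithSize/WithBuffer/WithWorker/WithInterval calls used later; the field names and defaults are assumptions for illustration, not the project's actual code.

```go
package batcher

import (
	"errors"
	"runtime"
	"time"
)

// ErrFull is returned by Add when the target channel is full.
var ErrFull = errors.New("batcher: channel is full")

// options is one plausible shape of the Batcher configuration.
type options struct {
	size     int           // flush once this many messages are aggregated
	buffer   int           // capacity of each msg channel
	worker   int           // number of shards / merge goroutines
	interval time.Duration // flush on this interval even if size is not reached
}

// check fills in assumed defaults for unset fields.
func (o *options) check() {
	if o.size <= 0 {
		o.size = 100
	}
	if o.buffer <= 0 {
		o.buffer = 100
	}
	if o.worker <= 0 {
		o.worker = runtime.NumCPU()
	}
	if o.interval <= 0 {
		o.interval = time.Second
	}
}

type Option interface {
	apply(*options)
}

type funcOption func(*options)

func (f funcOption) apply(o *options) { f(o) }

func WithSize(size int) Option     { return funcOption(func(o *options) { o.size = size }) }
func WithBuffer(buffer int) Option { return funcOption(func(o *options) { o.buffer = buffer }) }
func WithWorker(worker int) Option { return funcOption(func(o *options) { o.worker = worker }) }
func WithInterval(d time.Duration) Option {
	return funcOption(func(o *options) { o.interval = d })
}
```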
To use it, create a Batcher and then define its Sharding and Do methods. In Sharding we shard by ProductID, so the aggregation for different products is handled by different goroutines; in Do we send the aggregated data to Kafka. The definitions are as follows:
```go
b := batcher.New(
	batcher.WithSize(batcherSize),
	batcher.WithBuffer(batcherBuffer),
	batcher.WithWorker(batcherWorker),
	batcher.WithInterval(batcherInterval),
)
b.Sharding = func(key string) int {
	pid, _ := strconv.ParseInt(key, 10, 64)
	return int(pid) % batcherWorker
}
b.Do = func(ctx context.Context, val map[string][]interface{}) {
	var msgs []*KafkaData
	for _, vs := range val {
		for _, v := range vs {
			msgs = append(msgs, v.(*KafkaData))
		}
	}
	kd, err := json.Marshal(msgs)
	if err != nil {
		logx.Errorf("Batcher.Do json.Marshal msgs: %v error: %v", msgs, err)
	}
	if err = s.svcCtx.KafkaPusher.Push(string(kd)); err != nil {
		logx.Errorf("KafkaPusher.Push kd: %s error: %v", string(kd), err)
	}
}
s.batcher = b
s.batcher.Start()
```
In the SeckillOrder method, instead of posting a message to Kafka on every request, we now add the message to the Batcher via its Add method, and only post to Kafka once the aggregation conditions are met.
```go
err = l.batcher.Add(strconv.FormatInt(in.ProductId, 10), &KafkaData{Uid: in.UserId, Pid: in.ProductId})
if err != nil {
	logx.Errorf("l.batcher.Add uid: %d pid: %d error: %v", in.UserId, in.ProductId, err)
}
```
Reduce message consumption latency
With the batching idea and the Batcher tool we improved performance, but mainly on the producer side. When consuming data in batches, we still process the records one by one, serially. Is there a way to speed up consumption and reduce latency? There are two options:
- Increase the number of consumers
- Increase the parallelism of message processing within a single consumer
In Kafka, a Topic can be configured with multiple Partitions, and data is written across the partitions evenly or in whatever way the producer specifies. On the consumption side, Kafka stipulates that a partition can only be consumed by one consumer within the same group. Why design it this way? My understanding is that if multiple consumers consumed the same partition at the same time, updating the consumption offset would require locking, which would badly hurt performance. So when the number of consumers is smaller than the number of partitions, we can add consumers to increase processing capacity; once the number of consumers exceeds the number of partitions, adding more consumers is pointless.
When we cannot add more consumers, we can raise the parallelism of message processing inside a single consumer, i.e. consume data with multiple goroutines. Let's see how to do that.
In the Service we define msgsChan, a slice of channels; the length of the slice determines how many goroutines process data in parallel. It is initialized as follows:
```go
func NewService(c config.Config) *Service {
	s := &Service{
		c:          c,
		ProductRPC: product.NewProduct(zrpc.MustNewClient(c.ProductRPC)),
		OrderRPC:   order.NewOrder(zrpc.MustNewClient(c.OrderRPC)),
		msgsChan:   make([]chan *KafkaData, chanCount),
	}
	for i := 0; i < chanCount; i++ {
		ch := make(chan *KafkaData, bufferCount)
		s.msgsChan[i] = ch
		s.waiter.Add(1)
		go s.consume(ch)
	}
	return s
}
```
After the data is consumed from Kafka, it is dispatched to these channels. Note that the messages are sharded by product id, which guarantees that within the same consumer, messages for the same product are processed serially; serial processing per product avoids the data races that concurrent processing would otherwise cause.
```go
func (s *Service) Consume(_ string, value string) error {
	logx.Infof("Consume value: %s\n", value)
	var data []*KafkaData
	if err := json.Unmarshal([]byte(value), &data); err != nil {
		return err
	}
	for _, d := range data {
		s.msgsChan[d.Pid%chanCount] <- d
	}
	return nil
}
```
We define chanCount goroutines that process data simultaneously, each channel with a capacity of bufferCount. The method that processes the data in parallel is consume, shown below:
```go
func (s *Service) consume(ch chan *KafkaData) {
	defer s.waiter.Done()

	for {
		m, ok := <-ch
		if !ok {
			log.Fatal("seckill rmq exit")
		}
		fmt.Printf("consume msg: %+v\n", m)
		p, err := s.ProductRPC.Product(context.Background(), &product.ProductItemRequest{ProductId: m.Pid})
		if err != nil {
			logx.Errorf("s.ProductRPC.Product pid: %d error: %v", m.Pid, err)
			return
		}
		if p.Stock <= 0 {
			logx.Errorf("stock is zero pid: %d", m.Pid)
			return
		}
		_, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid})
		if err != nil {
			logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err)
			return
		}
		_, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1})
		if err != nil {
			logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err)
		}
	}
}
```
How to avoid overselling
When the flash sale starts, a huge number of users hit the buy button on the product detail page, which generates a flood of concurrent requests querying the stock. As soon as a request finds stock remaining, the system deducts the stock, generates the actual order, and carries out the follow-up processing. If the request finds no stock left, it simply returns, and the user will usually keep clicking the buy button and querying the stock again. In short, there are three operations in this stage: checking stock, deducting stock, and processing the order. Since every flash-sale request queries the stock, and deduction and order processing only run after stock is found, the heaviest concurrent pressure in this stage falls on the stock check.

To support this many highly concurrent stock-check requests, we need to keep the stock in Redis. Can stock deduction and order processing then be left to MySQL? Order processing can indeed be done in the database, but stock deduction cannot be handed directly to MySQL. By the time we reach the actual order processing, the request pressure is already much lower and the database can fully handle it. So why can't the deduction be done directly in the database? Because once a request finds stock remaining, it is eligible to buy; it will place an order and decrement the stock by one, and deducting stock directly in the database at that point can lead to overselling.

Why does deducting stock directly in the database cause overselling? Because the database is comparatively slow and cannot update the remaining stock in time, a large number of stock-check requests may read a stale stock value and place orders, so the number of orders ends up larger than the actual stock and we oversell. Therefore the deduction must happen directly in Redis: when the stock check finds remaining stock, we immediately decrement the stock in Redis, and to prevent other requests from reading a stale value, the stock check and the deduction must be atomic.
We use a Redis hash to store the stock: total is the total stock, and seckill is the quantity already sold. To make the stock check and the deduction atomic, we run a Lua script: it returns 1 when the sold count plus one does not exceed the total, meaning the purchase succeeds, and 0 otherwise, meaning it fails. The code is as follows:
```go
const (
	luaCheckAndUpdateScript = `
local counts = redis.call("HMGET", KEYS[1], "total", "seckill")
local total = tonumber(counts[1])
local seckill = tonumber(counts[2])
if seckill + 1 <= total then
    redis.call("HINCRBY", KEYS[1], "seckill", 1)
    return 1
end
return 0
`
)

func (l *CheckAndUpdateStockLogic) CheckAndUpdateStock(in *product.CheckAndUpdateStockRequest) (*product.CheckAndUpdateStockResponse, error) {
	val, err := l.svcCtx.BizRedis.EvalCtx(l.ctx, luaCheckAndUpdateScript, []string{stockKey(in.ProductId)})
	if err != nil {
		return nil, err
	}
	if val.(int64) == 0 {
		return nil, status.Errorf(codes.ResourceExhausted, fmt.Sprintf("insufficient stock: %d", in.ProductId))
	}
	return &product.CheckAndUpdateStockResponse{}, nil
}

func stockKey(pid int64) string {
	return fmt.Sprintf("stock:%d", pid)
}
```
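The Lua script above assumes the stock hash already exists in Redis. Below is a minimal sketch of how it might be seeded before the sale starts; the helper name, where it is called from, and the initial quantity are assumptions for illustration, while the key layout and the total/seckill fields come from the code above. It assumes go-zero's redis client (the same kind of handle as BizRedis).

```go
import (
	"fmt"
	"strconv"

	"github.com/zeromicro/go-zero/core/stores/redis"
)

// initStock is an illustrative helper (not project code) that seeds the
// per-product stock hash read by luaCheckAndUpdateScript.
func initStock(rds *redis.Redis, pid, total int64) error {
	key := fmt.Sprintf("stock:%d", pid) // same layout as stockKey above
	// total: the quantity available in this flash sale
	if err := rds.Hset(key, "total", strconv.FormatInt(total, 10)); err != nil {
		return err
	}
	// seckill: the already-sold counter; starts at 0 and is only ever
	// incremented by the Lua script's HINCRBY
	return rds.Hset(key, "seckill", "0")
}
```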
The corresponding seckill-rmq code is modified as follows:
```go
func (s *Service) consume(ch chan *KafkaData) {
	defer s.waiter.Done()

	for {
		m, ok := <-ch
		if !ok {
			log.Fatal("seckill rmq exit")
		}
		fmt.Printf("consume msg: %+v\n", m)
		_, err := s.ProductRPC.CheckAndUpdateStock(context.Background(), &product.CheckAndUpdateStockRequest{ProductId: m.Pid})
		if err != nil {
			logx.Errorf("s.ProductRPC.CheckAndUpdateStock pid: %d error: %v", m.Pid, err)
			return
		}
		_, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid})
		if err != nil {
			logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err)
			return
		}
		_, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1})
		if err != nil {
			logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err)
		}
	}
}
```
So far we have seen how to check and deduct stock with an atomic Lua script. There is another way to guarantee the atomicity of the check and the deduction: distributed locks.
Distributed locks can be implemented in many ways, for example on top of Redis or etcd. There are plenty of articles about Redis-based distributed locks; search for them if you are interested. Here is a brief introduction to implementing a distributed lock with etcd. To simplify distributed locks, leader election, and distributed transactions, the etcd community provides a package called concurrency that helps us use distributed locks easily and correctly. Its usage is very simple; the main steps are:
- First, create a Session via concurrency.NewSession, which essentially creates a Lease with a TTL of 10 seconds
- With the Session object, create a Mutex via concurrency.NewMutex, which wraps the Lease, the key prefix, and so on
- Then call the Mutex's Lock method to try to acquire the lock
- Finally, release the lock via the Mutex's Unlock method
```go
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
	log.Fatal(err)
}
defer cli.Close()

session, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
if err != nil {
	log.Fatal(err)
}
defer session.Close()

mux := concurrency.NewMutex(session, "lock")
if err := mux.Lock(context.Background()); err != nil {
	log.Fatal(err)
}
if err := mux.Unlock(context.Background()); err != nil {
	log.Fatal(err)
}
```
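As a hedged sketch of how such a lock could be applied to this scenario, the helper below serializes a per-product critical section (for example, a check-and-deduct done in the database rather than Redis). The key layout "/seckill/lock/<pid>" and the deduct callback are illustrative assumptions, not code from the project.

```go
package main

import (
	"context"
	"fmt"

	"go.etcd.io/etcd/client/v3/concurrency"
)

// withProductLock acquires an etcd mutex keyed by product id, runs the given
// critical section, and releases the lock afterwards.
func withProductLock(session *concurrency.Session, pid int64, deduct func(ctx context.Context) error) error {
	mux := concurrency.NewMutex(session, fmt.Sprintf("/seckill/lock/%d", pid))
	ctx := context.Background()
	if err := mux.Lock(ctx); err != nil { // blocks until the lock is acquired
		return err
	}
	defer mux.Unlock(ctx) // release the lock even if deduct fails
	return deduct(ctx)
}
```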
Conclusion
This article continued optimizing the flash-sale feature. On the Kafka producer side we optimized delivery by aggregating messages into batches; batching is an idea used constantly in real development, and I hope you can apply it flexibly. On the consumer side we raised the processing parallelism to improve throughput, another common performance optimization. Finally, we looked at the causes of overselling, gave the corresponding solutions, and introduced etcd-based distributed locks. Data races are common in distributed services and can usually be resolved with distributed locks, but introducing a distributed lock inevitably costs some performance, so decide whether you really need one based on your actual situation.
I hope this article helps you. Thanks for reading.

New posts every Monday and Thursday.

Source code: https://github.com/zhoushuguang/lebron
Project address
https://github.com/zeromicro/go-zero
https://gitee.com/kevwan/go-zero
You are welcome to use go-zero and star the project to support us!
WeChat group: follow the 『Microservice Practice』 official account and click "Community Group" to get the group QR code.