当前位置:网站首页>Go+mysql+redis+vue3 simple chat room, bullet 5: synchronize messages to MySQL using message queues and scheduled tasks
Go+mysql+redis+vue3 simple chat room, bullet 5: synchronize messages to MySQL using message queues and scheduled tasks
2022-07-26 20:56:00 【whynogome】
After sending the message , We need to save chat records to the database . Cold data with infrequent access such as chat records , Save in mysql Data is routine . But when the number of users increases , Message sending frequency increases , Will link frequently in a short time mysql, Large concurrency will cause mysql The block
We can use reids The message queue acts as an intermediate buffer , First save the chat records of users in the queue , During server idle time , Using scheduled tasks , Synchronizing data to mysql Then you can .redis It's memory based , Can bear more than mysql Much larger concurrency
go Connect redis
stay db Under the table of contents , establish redis.go file , obtain redis Connect
package db
import (
"github.com/garyburd/redigo/redis"
"project1/config"
)
//RedisPool It's a redis Connection pool , When using , Need to use get After getting a connection , Reuse
var RedisPool *redis.Pool
func init(){
addr := config.ConfigFile.Section("redis").Key("Addr").String()
// Create connection pool
RedisPool = &redis.Pool{
MaxIdle:15,// Initial number of connection pools
MaxActive:0,// Maximum number of connection pools , by 0 Show custom , Set on demand
IdleTimeout:300,// Connection closing time 300 second (300 Seconds do not use automatic shutdown )
Dial:func()(redis.Conn,error){
return redis.Dial("tcp",addr)
},
}
}
Use stream
Record first redis Use stream Common commands
- Insert data into a queue , If the queue does not exist, it will be created automatically
redis.Do(“XADD”,“chatMsg”,“*”,“json”,“{1:666}”)- Create a message consumption group under a queue , And specify that the consumer reads data from the queue id.
id by 0, To start from scratch .id by $ Indicates starting from the end of the queue , Accept only new news
redis.Do(“XGROUP”,“CREATE”,“chatMsg”,“group1”,“1655188517335-0”)- Query a queue , Specify the consumption group information of the name
//redis.Do(“XINFO”,“GROUPS”,“group1”)- Read messages under a consumption group
id by >, It means that only the latest news is read , After success , Consumer group last-delivered-id Move back a bit , The message will be in c1( A consumer under this consumer group , Does not exist will automatically create ) Of pending in , until ack, from pending Remove . Return when there is no message nil
id Not for >, Means read c1 pending( To be processed ) The news of , Incoming messages from id Start , Read the specified number . Pass on 0 It means reading from the first data . Only return when there is no message chatMsg character string .
//redis.Do(“XREADGROUP”,“GROUP”,“group1”,“c1”,“COUNT”,“1”,“STREAMS”,“chatMsg”,“>”)- Mark the consumer group message as processed
//redis.Do(“XACK”,“chatMsg”,“group1”,“1655188528552-0”)
In addition to the above common commands , In order to prevent stream More and more big , We can specify stream Maximum length of , The old data will be automatically deleted when it exceeds
It can also be in ack After success , Call the delete command , Directly from stream Delete the specified message
stay service newly build redis-mq.go file
package service
import (
"encoding/json"
"fmt"
"project1/db"
"project1/models"
)
// Put the chat information in the message queue , Insert into mysql database
func IntoMsgToMysql(){
// Message queue name
streamName := "chatMsg"
// Name of consumption group
// Message queue names and consumers among consumers , It can insert and read commands to create by self-determination , Create consumer groups , It needs to be executed separately
// At present, our project does not have the requirement of multithreading to read message queues , So create a consumption group , Directly in reids Execute the command line , news id Set to $, Not in the code
groupName := "group1"
// A consumer name in the consumer group
groupMember := "c1"
// from redis Connection pool , Get a link
redis := db.RedisPool.Get()
defer redis.Close()
// First read the consumer pending The messages in the , Read once 100 strip
// For compatibility with service interruption , Lead to pending The news in is consumed if it doesn't come , So check first pending Is there any news in the news
if val,err := redis.Do("XREADGROUP","GROUP",groupName,groupMember,"COUNT","100","STREAMS",streamName,0);err == nil{
// The data read from the message queue is an interface type data , Inside is the interface of slice type , There are multiple levels of nesting
// So we customize a recursive function interfaceToNormal, Used to traverse and transform the returned data , Returns a string slice
back := make([]string,0)
data := interfaceToNormal(val,back)
// natural data The data format is a string of slice type , as follows
//[chatMsg 1655188547271-0 data {"username":" Lao Wang ","userid":"2","msg":" Lao Wang : launched "}]
//chatMsg Is a regular output , The last three values represent a piece of data , If there are multiple pieces of data , With 3 One for a group and cycle back
// from pending When reading data in , If there is no value, a chatmsg character string
// The number of returned slices is 1, Indicates that there is no data waiting for consumption , Read again from the message queue
if len(data) < 4{
if val2,err2 := redis.Do("XREADGROUP","GROUP",groupName,groupMember,"COUNT","100","STREAMS",streamName,">");err2 == nil{
// When there is no data in the queue , return nil.pending When there is no data in , return [chatMsg]
if val2 == nil{
Mqlog("Success:stream msg is finish","info")
return
}
data = interfaceToNormal(val2,back)
}else{
Mqlog("get stream fail step2:"+err2.Error(),"error")
return
}
}
// Divide the number of slices by 3 Value , by data Medium data volume , Loop through according to this value data
//go In language , Operations between integers , The return will only be an integer
dataNum := len(data)/3
for i:=1;i<=dataNum;i++{
// Get a piece of information data, Position in the slice
dataKey := 3*i
// Get a piece of information Queue messages id, Position in the slice
dataTimeKey := dataKey-2
// Put the information data The character string is first converted to []byte
dataByte := []byte(data[dataKey])
// And then []byte Value to map Format
dataMap := make(map[string]string)
merr := json.Unmarshal(dataByte,&dataMap)
if merr != nil{
Mqlog("map Convert exceptions ","error")
}
dataMap["created_at"] = data[dataTimeKey][0:10]
// towards mysql insert data
if back := models.AddMsg(dataMap);back == "ok"{
// Insert the success , perform ACK
if _,ackErr := redis.Do("XACK","chatMsg","group1",data[dataTimeKey]);ackErr != nil{
Mqlog(ackErr.Error(),"error")
}
}else{
Mqlog(back,"error")
}
}
}else{
Mqlog("get stream fail step1:"+err.Error(),"error")
return
}
}
// Multi slice -> Interface -> section ..... Data conversion
func interfaceToNormal(data interface{
},back []string) []string{
for _,val := range data.([]interface{
}){
type1 := fmt.Sprintf("%T",val)
if type1 == "[]interface {}"{
back = interfaceToNormal(val.([]interface{
}),back)
}else{
var typeName int64
if type1 == "[]byte"{
typeName = 1
}else{
typeName = 2
}
back = append(back,anyToString(typeName,val))
}
}
return back
}
func anyToString(typeName int64,info interface{
}) string{
back := ""
if typeName == 1 {
data := info.([]byte)
back = string(data)
}else if typeName == 2{
data := info.([]uint8)
back = string(data)
}
return back
}
Next, we need to establish a scheduled task , Call in the scheduled task IntoMsgToMysql The method can
stay service Create under directory gron.go file
package service
import (
"fmt"
"github.com/roylee0704/gron"
"github.com/roylee0704/gron/xtime"
)
// Timing task
//go The main process needs to keep running , Otherwise, the scheduled task will exit with the main process
func GronAddMysql(){
c := gron.New()
c.AddFunc(gron.Every(1*xtime.Minute),func(){
IntoMsgToMysql()})
c.Start()
}
// Use cases
type printJob struct{
Msg string}
func (p printJob) Run() {
fmt.Println(p.Msg)
}
func test() {
var (
// schedules
daily = gron.Every(1 * xtime.Day)
weekly = gron.Every(1 * xtime.Week)
monthly = gron.Every(30 * xtime.Day)
yearly = gron.Every(365 * xtime.Day)
// contrived jobs
purgeTask = func() {
fmt.Println("purge aged records") }
printFoo = printJob{
"Foo"}
printBar = printJob{
"Bar"}
)
c := gron.New()
//At Specify a specific time to start executing scheduled tasks
c.Add(daily.At("12:30"), printFoo)
//AddFunc Incoming method required ,Add Need to pass in the implementation Run Object of method
c.AddFunc(weekly, func() {
fmt.Println("Every week") })
c.Start()
// Jobs may also be added to a running Gron
c.Add(monthly, printBar)
c.AddFunc(yearly, purgeTask)
// Stop Gron (running jobs are not halted).
c.Stop()
}
And then we have main.go In file , Before the route runs , Start the scheduled task
package main
import (
"encoding/gob"
"project1/models"
"project1/routes"
"project1/service"
)
func main(){
// Turn on timed tasks
service.GronAddMysql()
// Set up session It is equivalent to serializing the object , When the object is of advanced type , Like custom struct or map[string]interface{}{} when , You need to declare
// Here we use gob.register Declare a custom struct, Used later in session Save variables of this type in
gob.Register(models.User{
})
// start-up http service
r := routes.NewRouter()
r.Run(":8080")
}
边栏推荐
- Beginner experience of safety testing
- Correlation analysis between [machine learning] variables
- Shell comprehensive application cases, archive files
- 韩国计划每年砸1万亿韩元,研发半导体材料及设备
- 20220726
- 没有网络怎么配置传奇SF登陆器自动读取列表
- 数据块的存储系统中缓存的作用是什么?
- 创建一个自身类的静态对象变量,究竟会如何执行?
- 分组卷积(Group Converlution)
- [OBS] solve the problem of OBS pushing two RTMP streams + timestamp
猜你喜欢
![[experiment sharing] CCIE BGP routing black hole experiment]](/img/c8/7ccb879ad7c739d3573637fd14f4e1.png)
[experiment sharing] CCIE BGP routing black hole experiment]

Leetcode-300 longest increasing subsequence

每日练习------有一组学员的成绩,将它们按降序排列,要增加一个学员的成绩,将它插入成绩序列,并保持降序
![[JVM series] JVM tuning](/img/6b/f7c402b2ff5fc4f11f9656a7a59873.png)
[JVM series] JVM tuning

7-year-old boy playing chess too fast? The robot actually broke its finger

连接池快速入门

Buu brush inscription 3

St table, weighted and search set
![[基础服务] [数据库] ClickHouse的安装和配置](/img/fe/5c24e4c3dc17a6a96985e4fe97024e.png)
[基础服务] [数据库] ClickHouse的安装和配置

Experiment 6 BGP federal comprehensive experiment
随机推荐
leetcode 链表类
What is the role of cache in the storage system of data blocks?
拦截器、、
Single core A7 plays with face recognition, and NXP "crossover processor" plays a new trick!
GOM登录器配置免费版生成图文教程
09_ UE4 advanced_ Enter the next level and reserve the blood volume
SSM整合实例
[OBS] solve the problem of OBS pushing two RTMP streams + timestamp
Shell function, system function, basename [string / pathname] [suffix] can be understood as taking the file name in the path, dirname file absolute path, and user-defined function
New features of ES6
【问题篇】浏览器get请求带token
TableWidget
Houdini finds the midpoint and connects the points to form a line
serializable接口的作用是什么?
LeetCode链表问题——24.两两交换链表中的结点(一题一文学会链表)
The UK and Germany have successively launched 5g commercial services, and Huawei has become a behind the scenes hero
传惠普/戴尔/微软/亚马逊考虑将部分硬件生产线转到大陆以外
[JVM series] JVM tuning
Leetcode linked list problem - 19. Delete the penultimate node of the linked list (learn the linked list with one question and one article)
arm tz硬件支撑