当前位置:网站首页>go+mysql+redis+vue3简单聊室,第5弹:使用消息队列和定时任务同步消息到mysql
go+mysql+redis+vue3简单聊室,第5弹:使用消息队列和定时任务同步消息到mysql
2022-07-26 19:05:00 【whynogome】
完成消息发送后,我们需要保存聊天记录到数据库中。像聊天记录这种访问不频繁的冷数据,保存在mysql数据是常规做法。但是当用户增多,消息发送频率上升,会在短时间频繁链接mysql,大并发下会造成mysql的阻塞
我们可以使用reids消息队列作为中间缓冲,先把用户的聊天记录保存在队列中,在服务器空闲时段,使用定时任务,在把数据同步到mysql中即可。redis是基于内存的,可以承受比mysql大得多的并发
go连接redis
在db目录下,创建redis.go文件,获取redis连接
package db
import (
"github.com/garyburd/redigo/redis"
"project1/config"
)
//RedisPool是一个redis连接池,使用时,需要使用get获取一个连接后,再使用
var RedisPool *redis.Pool
func init(){
addr := config.ConfigFile.Section("redis").Key("Addr").String()
//创建连接池
RedisPool = &redis.Pool{
MaxIdle:15,//最初连接池数量
MaxActive:0,//最大连接池数量,为0表示自定义,按需设置
IdleTimeout:300,//连接关闭时间 300秒 (300秒不使用自动关闭)
Dial:func()(redis.Conn,error){
return redis.Dial("tcp",addr)
},
}
}
使用stream
先记录下redis使用stream的常用命令
- 向一个队列中插入数据,队列不存在则自动创建
redis.Do(“XADD”,“chatMsg”,“*”,“json”,“{1:666}”)- 创建一个队列下的一个消息消费组,并指定消费者从队列读取数据的id。
id为0,表示从头开始。id为$表示从队列末尾开始,只接受新消息
redis.Do(“XGROUP”,“CREATE”,“chatMsg”,“group1”,“1655188517335-0”)- 查询一个队列下,指定名称的消费组信息
//redis.Do(“XINFO”,“GROUPS”,“group1”)- 读取一个消费组下的消息
id为 >,表示只读取最新消息,成功后,消费组的last-delivered-id向后移动一位,该消息会在c1(该消费组下的一个消费者,不存在会自动创建)的pending中,直到ack,从pending中移除。无消息时返回nil
id不为 >,表示读取c1 pending(待处理)的消息,从传入的消息id开始,读取指定条数。传0表示从第一条数据开始读。无消息时只返回chatMsg字符串。
//redis.Do(“XREADGROUP”,“GROUP”,“group1”,“c1”,“COUNT”,“1”,“STREAMS”,“chatMsg”,“>”)- 将消费组消息标记为已处理
//redis.Do(“XACK”,“chatMsg”,“group1”,“1655188528552-0”)
除了以上常用命令外,为了防止stream越来越大,我们可以指定stream的最大长度,超出时会自动删除老的数据
也可以在ack成功后,调用删除命令,直接从stream中删除指定消息
在service新建redis-mq.go文件
package service
import (
"encoding/json"
"fmt"
"project1/db"
"project1/models"
)
//把消息队列中的聊天信息,插入到mysql数据库
func IntoMsgToMysql(){
//消息队列名称
streamName := "chatMsg"
//消费组名称
//消息队列名和消费者中的消费者,会插入和读取命令自定判断创建,创建消费者组,则需要单独执行
//我们的项目目前没有多线程读取消息队列的需求,所以创建一个消费组就行了,直接在reids命令行执行下,消息id设为$,就不放在代码中了
groupName := "group1"
//消费组中一个消费者名称
groupMember := "c1"
//从redis连接池,获取一个链接
redis := db.RedisPool.Get()
defer redis.Close()
//首先读取消费者pending中的消息,一次读取100条
//为了兼容服务中断,导致pending中的消息没有来的即被消费,所以先检查pending中是否有消息
if val,err := redis.Do("XREADGROUP","GROUP",groupName,groupMember,"COUNT","100","STREAMS",streamName,0);err == nil{
//从消息队列中读取的数据是一个接口类型的数据,里面是切片类型的接口,有多层的嵌套
//所以我们自定义一个递归函数interfaceToNormal,用于遍历和转换返回数据,返回一个字符串切片
back := make([]string,0)
data := interfaceToNormal(val,back)
//正常的data数据格式是一个切片类型的字符串,如下
//[chatMsg 1655188547271-0 data {"username":"老王","userid":"2","msg":"老王:上线了"}]
//chatMsg是常规输出,后面三个值表示一条数据,多条数据的话,以3个为一组往后循环
//从pending中读取数据时,没有值于也会返回一个 chatmsg 字符串
//返回切片数量为1,表示当前无等待消费的数据,从消息队列重新读取
if len(data) < 4{
if val2,err2 := redis.Do("XREADGROUP","GROUP",groupName,groupMember,"COUNT","100","STREAMS",streamName,">");err2 == nil{
//队列无数据时,返回nil。pending中无数据时,返回[chatMsg]
if val2 == nil{
Mqlog("Success:stream msg is finish","info")
return
}
data = interfaceToNormal(val2,back)
}else{
Mqlog("get stream fail step2:"+err2.Error(),"error")
return
}
}
//切片数量除以3的值,为data中数据量,根据该值循环遍历data
//go语言中,整数之间的运算,返回只会是整数
dataNum := len(data)/3
for i:=1;i<=dataNum;i++{
//获取一条信息的data,在切片中的位置
dataKey := 3*i
//获取一条信息的 队列消息id,在切片中的位置
dataTimeKey := dataKey-2
//把信息的data字符先串转换为[]byte
dataByte := []byte(data[dataKey])
//再把[]byte值转换为map格式
dataMap := make(map[string]string)
merr := json.Unmarshal(dataByte,&dataMap)
if merr != nil{
Mqlog("map转换异常","error")
}
dataMap["created_at"] = data[dataTimeKey][0:10]
//向mysql插入数据
if back := models.AddMsg(dataMap);back == "ok"{
//插入成功,执行ACK
if _,ackErr := redis.Do("XACK","chatMsg","group1",data[dataTimeKey]);ackErr != nil{
Mqlog(ackErr.Error(),"error")
}
}else{
Mqlog(back,"error")
}
}
}else{
Mqlog("get stream fail step1:"+err.Error(),"error")
return
}
}
//多层接切片->接口->切片.....数据转换
func interfaceToNormal(data interface{
},back []string) []string{
for _,val := range data.([]interface{
}){
type1 := fmt.Sprintf("%T",val)
if type1 == "[]interface {}"{
back = interfaceToNormal(val.([]interface{
}),back)
}else{
var typeName int64
if type1 == "[]byte"{
typeName = 1
}else{
typeName = 2
}
back = append(back,anyToString(typeName,val))
}
}
return back
}
func anyToString(typeName int64,info interface{
}) string{
back := ""
if typeName == 1 {
data := info.([]byte)
back = string(data)
}else if typeName == 2{
data := info.([]uint8)
back = string(data)
}
return back
}
接下来我们需要建立一个定时任务,在定时任务用调用IntoMsgToMysql方法即可
在service目录下建立gron.go 文件
package service
import (
"fmt"
"github.com/roylee0704/gron"
"github.com/roylee0704/gron/xtime"
)
//定时任务
//go主进程需保持运行状态,否则定时任务会跟随主进程一起退出
func GronAddMysql(){
c := gron.New()
c.AddFunc(gron.Every(1*xtime.Minute),func(){
IntoMsgToMysql()})
c.Start()
}
//使用案例
type printJob struct{
Msg string}
func (p printJob) Run() {
fmt.Println(p.Msg)
}
func test() {
var (
// schedules
daily = gron.Every(1 * xtime.Day)
weekly = gron.Every(1 * xtime.Week)
monthly = gron.Every(30 * xtime.Day)
yearly = gron.Every(365 * xtime.Day)
// contrived jobs
purgeTask = func() {
fmt.Println("purge aged records") }
printFoo = printJob{
"Foo"}
printBar = printJob{
"Bar"}
)
c := gron.New()
//At 指定具体时间开始执行定时任务
c.Add(daily.At("12:30"), printFoo)
//AddFunc需传入方法,Add需传入实现了Run方法的对象
c.AddFunc(weekly, func() {
fmt.Println("Every week") })
c.Start()
// Jobs may also be added to a running Gron
c.Add(monthly, printBar)
c.AddFunc(yearly, purgeTask)
// Stop Gron (running jobs are not halted).
c.Stop()
}
接下来我们在main.go文件中,在路由运行前,开启定时任务即可
package main
import (
"encoding/gob"
"project1/models"
"project1/routes"
"project1/service"
)
func main(){
//开启定时任务
service.GronAddMysql()
// 设置session相当于是把对象序列化了,当对象是高级类型,如自定义的struct或map[string]interface{}{}时,需要先声明
// 此处我们用gob.register声明了一个自定义的struct,用于之后在session中保存该类型的变量
gob.Register(models.User{
})
//启动http服务
r := routes.NewRouter()
r.Run(":8080")
}
边栏推荐
猜你喜欢

一年卖7亿,德州扒鸡赶考IPO

How to adjust the abnormal win11 USB drive to normal?

YOLO V1详解

LeetCode每日一练 —— 189. 轮转数组

一家芯片公司倒在了B轮

Detailed explanation of Yolo V2

After working for 13 years, I have a little software testing experience and feelings

IM即时通讯开发如何压缩移动网络下APP的流量消耗

ipad下载的文件在哪里可以找到

Leetcode daily practice - 88. Merge two ordered arrays
随机推荐
千亿酸奶赛道,乳企巨头和新品牌打响拉锯战
Bug 反馈:同步失败
LeetCode每日一练 —— 189. 轮转数组
IM即时通讯开发如何压缩移动网络下APP的流量消耗
Ijcai2022 meeting! Brescia et al. Tutorial of evidential reasoning and learning, describing its latest progress, with 96 slides attached
6种方法帮你搞定SimpleDateFormat类不是线程安全的问题
Solution to the third game of 2022 Niuke multi school league
【OBS】Dropped Frames And General Connection Issues
阿里三面:MQ 消息丢失、重复、积压问题,如何解决?
负载均衡的使用
Detailed explanation of call function in solidity
浅析接口测试
金仓数据库 KingbaseES SQL 语言参考手册 (17. SQL语句: DISCARD 到 DROP LANGUAGE)
MySQL 子查询使用方式
What should we do about the fragmentation of internal information? Try this
开源 | AREX-携程无代码侵入的流量回放实践
【实习经验】日期校验
[binary tree] balance the binary search tree
KVM virtualization
金仓数据库 KingbaseES SQL 语言参考手册 (13. SQL语句:ALTER SYNONYM 到 COMMENT)