当前位置:网站首页>Bifrost 位点管理 之 异构中间件实现难点(1)
Bifrost 位点管理 之 异构中间件实现难点(1)
2020-11-09 23:53:00 【布哦】
Bifrost ---- 面向生产环境的 MySQL 同步到 Redis,MongoDB 等服务的异构中间件
源码下载 (你的点击 star 就是对 Bifrost 最大的支持!!!): Github Gitee
现在行里有不少的数据同步的工具,比如 datalink, 阿里的DTS , otter 等工具,甚至很多企业都在 otter 等开源工具里进行二次开发,实更了很多功能。
也和不少的开发人员,聊过类似工具的架构设计
大多数的开发人员架构设计 都是先用 canal binlog解析出来,每个表一个topic,然后再起额外的topic 消费线程,进行数据转换,再写入相对应的目标库里。其实这个设计没有毛病,而且看上去解偶设计非常得好。
咱在这里也先不讨论说一个表一个topic,等太多topic造成 kafka 磁盘IO性能下降明显 等问题。
只是有不少开发人员,对 kafka 集群的数据认为是绝对安全,并没有考滤到 kafka 数据集群崩溃等不可控的情况下造成数据丢失,
假如Kafka等工具出现严重不可恢复的情况下,怎么通知canal该从哪一个位点开始进行重新解析呢?当然完全重新来,也不是不可以,再全量一次。比如用阿里的datax,再来一次全量。
Bifrost 在开发设计的时候,考滤到性能,还有数据位点安全,还有资源最小化等多情况下,全局采用当前进程里的内存队列(当然在出现同步阻塞的时候,会自动启动文件队列功能)。全局采用内存队列,减少网络延迟,还有减少硬件资源的消耗。
Bifrost 全局有多个地方有对位点进行保存管理:
1. Bristol 模块里,在解析出 query,commit 事件的时候,对记录一次位点信息,保存到一个变量中,row_event,map_event 等其他事件的位点,并不会记录,这和binlog数据有关,map_event紧跟着row_event,row_event的解析又依懒map_event,假如 map_event之后,连接出错了,Bristol 需要自动重连等原因,假如重连接,没有map_event事件的数据,不就解析出错了么?
binlog.go
func (parser *eventParser) saveBinlog(event *EventReslut) {
switch event.Header.EventType {
case QUERY_EVENT,XID_EVENT:
if event.BinlogFileName == "" {
return
}
parser.binlogDump.Lock()
parser.binlogFileName = event.BinlogFileName
parser.binlogPosition = event.Header.LogPos
parser.binlogTimestamp = event.Header.Timestamp
parser.binlogDump.Unlock()
break
case ROTATE_EVENT:
parser.binlogDump.Lock()
parser.currentBinlogFileName = event.BinlogFileName
parser.binlogDump.Unlock()
default:
break
}
}
2. 每个数据源列表数据里,每3秒,调用 Bristol 模块,拿Bristol 最后一次解析的 query,commit事件的位点.
1).将位点保存到 Db数据源在内存里对应的变量中,用于界面显示显示,还有正常关闭的时候,将配置文件刷到db.bifrost文件中
2). 将DB数据源位点保存到 leveldb中,每隔2秒刷一次leveldb数据到磁盘
func (db *db) saveBinlog(){
FileName,Position,Timestamp := db.binlogDump.GetBinlog()
if FileName == ""{
return
}
//db.Lock()
//保存位点,这个位点在重启 配置文件恢复的时候
//一个db有可能有多个channel,数据顺序不用担心,因为实际在重启的时候 会根据最小的 ToServerList 的位点进行自动替换
db.binlogDumpFileName,db.binlogDumpPosition,db.binlogDumpTimestamp = FileName,Position,Timestamp
if db.DBBinlogKey == nil{
db.DBBinlogKey = getDBBinlogkey(db)
}
//db.Unlock()
index := strings.IndexAny(FileName, ".")
BinlogFileNum,_ := strconv.Atoi(FileName[index+1:])
saveBinlogPosition(db.DBBinlogKey,BinlogFileNum,db.binlogDumpPosition)
}
func saveBinlogPositionToStorageFromCache() {
for {
time.Sleep(2 * time.Second)
for _, t := range TmpPositioin {
t.Lock()
for k, v := range t.Data {
Val, _ := json.Marshal(v)
storage.PutKeyVal([]byte(k) , Val)
}
t.Data = make(map[string]positionStruct,0)
t.Unlock()
}
}
}
3. 每个数据同步队列对应最后同步成功和最后进入队列的2个位点
1). 最后同步成功的位点
这个就是字段上的意思 ,最后同步到目标库的位点
2). 最后进入队列的位点
每次写数据到队列的时候,都会更新最后一条更新到当前队列的数据位点
不管是 最后同步成功 还是 最后进入队列的位点 ,都会和数据源位点一样,一份存到内存变量中,另一份存到leveldb中,每隔2秒进行刷到磁盘
以上 除了 Bristol 模块为了解决 mysql 连接异常,造成的自动重连的位点,不会被保存到磁盘,另外2种位点都会刷到磁盘,而且还会刷到leveldb中,为了解决中途断点或者被kill -9 等非正常退出的情况下,数据恢复的时候,才知道是从哪一个位点开始重新解析!
位点恢复
即然每个同步都保存了有相对应的位点信息,那肯定 就能算出一个,所有同步中,最小的位点,然后进行从算出来最小的位点进行重新开始解析。实际有,也有个别情况,是不需要参与位点大写计算的或者说,有个别情况,在两者之前位点比较的时候取更大位点的
1). 不管数据源的位点,还是同步队列的最后成功的位点,或者 最后进入队列的位点,首先配置文件中的位点数据和leveldb中取出来位点数据做对比,取大值。
刷到leveldb的数据 是每2秒刷一次,数据理论上是延迟的,除非非正常关闭的时候,leveldb 前2位的位点数据已经进入到 磁盘,但是同步文件中的数据没刷到db.bifrost 文件的时候进程就被强制退出了。所以这里取两者的大值
2). 数据同步队列的 最后成功的位点 和 最后进入队列的位点 相同步的时候,放弃位点比较。
这里解决的问题是 假如有2个表配置了同步到 ClickHouse 或者 Redis 等目标库,Table1 更新了一条数据当前位点是 1000,然后再也没更新过数据了,以后都是Table2 这个表的数据更新,当Table2 同步数据的位点到了 3000 的时候,重启了Bfirost,那这个时候,应该是按3000这个位点开始重新解析,对不对?而不是从1000这个位点开始对吧?
会进行 保存 最后进入队列的位点 的位点,就是为了解决这个问题的
3). 假如所有同步队列 的 最后成功的位点 和 最后进入队列的位点 都是相同的情况下,直接取 数据源对应的最后的位点.
相关源码:
server/recovery.go
func recoveryData(data map[string]dbSaveInfo,isStop bool){}
为了实现这个数据位点安全及性能,在位点管理上,做了不少事情 ,并不是代表这个方案好,而是我们在设计系统上,需要考滤极端情况下,数据怎么恢复,重新拉取一次全量任务,其实也是很不错的选择!
GitHub : https://github.com/brokercap/Bifrost
版权声明
本文为[布哦]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4233862/blog/4710366
边栏推荐
- Python提示AttributeError 或者DeprecationWarning: This module was deprecated解决方法
- 会展云技术解读 | 面对突发事故,APP 如何做好崩溃分析与性能监控?
- DB engines database ranking in November: PostgreSQL holds the top spot in the same period
- 接缝雕刻算法:一种看似不可能的图像大小调整方法
- 2020-11-09:谈谈布隆过滤器和布谷鸟过滤器的相同点和不同点?
- Technical point 5: XML language
- Analysis steps of commodity background management
- 异常:Invalid or unexpected token
- jt-京淘项目
- 利用尾巴作为时间序列进行处理来识别鲸鱼
猜你喜欢
How to greatly improve the performance of larravel framework under php7? Install stone!
Guest interview: Wang Jian
How much is the cost of CRM system?
函数计算进阶-IP查询工具开发
Fear of reconstruction? I'm too late to tell you how to refactor. Now I'm here
SQL case conversion, remove the space before and after
将Map中对应的key和value赋值到对象中
SSL证书和公网IP地址如何影响SEO
Must see! RDS database all in one
接缝雕刻算法:一种看似不可能的图像大小调整方法
随机推荐
win7+vs2015+cuda10.2配置TensorRT7.0
No space left on device
Brief analysis of LinkedList source code
将Map中对应的key和value赋值到对象中
Prometheus installation configuration
CUDA_共享内存、访存机制、访问优化
爱康国宾怒斥国信证券报告失实,已发律师函
What can CRM system help enterprises do?
Postman (1) -- function introduction
面试官:缓存穿透、缓存雪崩和缓存击穿是什么?
异常:Invalid or unexpected token
当我们开发一个接口时需要注意些什么
剑指offer之打印超过数组一半的数字
手把手教你使用容器服务 TKE 集群审计排查问题
Visit 2020 PG Technology Conference
【CentOS7操作系统安全加固系列】第(2)篇
C + + game development
技术点5:XML语言
Python中[:]与[::]的用法
What is the architecture of a live broadcast system worth more than $1 billion?