Difficulties in heterogeneous middleware implementation of Bifrost site management (1)
2020-11-09 23:53:00 【Boo】
Source download (your click on Star is the biggest support for Bifrost!): GitHub / Gitee
There are already many data synchronization tools out there, such as DataLink, Alibaba's DTS, and otter; quite a few companies have even done secondary development on otter and other open-source tools and improved many of their features.

I have discussed the architecture of similar tools with many developers. Most of their designs first use canal to parse the binlog, publish one Kafka topic per table, and then run a separate consumer thread per topic that transforms the data and writes it to the corresponding target library. There is nothing wrong with this design as such, and the decoupling looks very clean.

I am not going to dwell on the problems of one topic per table, such as the noticeable disk I/O degradation Kafka suffers when the number of topics grows too large.

The point is that many developers treat the data in the Kafka cluster as absolutely safe, and never plan for the data loss caused by an unrecoverable cluster crash. If Kafka is damaged beyond recovery, how do you tell canal which binlog position to resume parsing from? Of course, starting all over with one more full synchronization, for example with Alibaba's DataX, is not impossible, either.
When designing Bifrost, performance, binlog position safety, and minimizing resource usage were all tested and weighed in many scenarios. Bifrost uses an in-process memory queue throughout (of course, when synchronization blocks, a file-backed queue is started automatically). Using the memory queue globally removes network latency and reduces the consumption of hardware resources.
Bifrost saves and manages binlog positions in several places:
1. In the Bristol module, the position is recorded into a variable only when QUERY and commit (XID) events are parsed; ROW_EVENT, TABLE_MAP_EVENT, and other events are not recorded. This is tied to how binlog data works: a TABLE_MAP_EVENT immediately precedes its ROW_EVENT, and parsing the ROW_EVENT depends on that TABLE_MAP_EVENT. If the connection errored right after a TABLE_MAP_EVENT and Bristol automatically reconnected at that point, the TABLE_MAP_EVENT data would be missing, and how could the ROW_EVENT then be parsed without error? So the saved position must never land between the two.
binlog.go

```go
func (parser *eventParser) saveBinlog(event *EventReslut) {
	switch event.Header.EventType {
	case QUERY_EVENT, XID_EVENT:
		// Only QUERY and XID (commit) events update the saved position.
		if event.BinlogFileName == "" {
			return
		}
		parser.binlogDump.Lock()
		parser.binlogFileName = event.BinlogFileName
		parser.binlogPosition = event.Header.LogPos
		parser.binlogTimestamp = event.Header.Timestamp
		parser.binlogDump.Unlock()
	case ROTATE_EVENT:
		parser.binlogDump.Lock()
		parser.currentBinlogFileName = event.BinlogFileName
		parser.binlogDump.Unlock()
	default:
	}
}
```
2. Every 3 seconds, each data source calls the Bristol module to fetch the position of the last QUERY/commit event it parsed.

1). The position is saved into the corresponding in-memory variable of the Db data source, for display in the UI; on a normal shutdown, the configuration is flushed to the db.bifrost file.

2). DB data source positions are also saved into leveldb, and the leveldb data is flushed to disk every 2 seconds.
```go
func (db *db) saveBinlog() {
	FileName, Position, Timestamp := db.binlogDump.GetBinlog()
	if FileName == "" {
		return
	}
	//db.Lock()
	// Save the position; it is used to restore from the configuration file on restart.
	// One db may have several channels. The ordering of the data does not matter here,
	// because on restart the position is replaced by the smallest one in ToServerList anyway.
	db.binlogDumpFileName, db.binlogDumpPosition, db.binlogDumpTimestamp = FileName, Position, Timestamp
	if db.DBBinlogKey == nil {
		db.DBBinlogKey = getDBBinlogkey(db)
	}
	//db.Unlock()
	index := strings.IndexAny(FileName, ".")
	BinlogFileNum, _ := strconv.Atoi(FileName[index+1:])
	saveBinlogPosition(db.DBBinlogKey, BinlogFileNum, db.binlogDumpPosition)
}

func saveBinlogPositionToStorageFromCache() {
	for {
		time.Sleep(2 * time.Second)
		for _, t := range TmpPositioin {
			t.Lock()
			for k, v := range t.Data {
				Val, _ := json.Marshal(v)
				storage.PutKeyVal([]byte(k), Val)
			}
			t.Data = make(map[string]positionStruct, 0)
			t.Unlock()
		}
	}
}
```
3. Each data synchronization queue keeps two positions: the last successfully synchronized position and the last position that entered the queue.

1). The last successfully synchronized position

This is exactly what the name says: the position of the last row synchronized into the target library.

2). The last position that entered the queue

Every time data is written into the queue, this is updated to the position of the current queue entry.

Both the last successfully synchronized position and the last queued position, like the data source position, are kept in two copies: one in an in-memory variable and one in leveldb, which is flushed to disk every 2 seconds.
Of the positions above, only the one the Bristol module uses for automatic reconnection after an abnormal MySQL connection is not saved to disk; the other two kinds are all flushed to disk via leveldb. This is what makes it possible, after a mid-run crash or a kill -9, to know which position to resume parsing from during data recovery!
Position recovery
Since every synchronization has its own position information, you could certainly compute the smallest position across all synchronizations and restart parsing from it. In practice, however, there are special cases where some positions must be excluded from the minimum calculation, and cases where the larger of two positions must be taken instead.
1). Whether it is the data source position, a queue's last successfully synchronized position, or a queue's last queued position, first take the value from the configuration file and the value from leveldb, compare them, and keep the larger one.
The data flushed to leveldb is written every 2 seconds, so in theory it lags slightly, unless the shutdown was abnormal: leveldb may already hold the positions from 2 seconds ago while the process was killed before the synchronization data could be flushed to the db.bifrost file. Hence the larger of the two is taken.
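The "take the larger value" rule can be sketched like this. `binlogPos`, `fileNum`, and `maxPos` are illustrative helpers, not Bifrost's actual API, but the file-number extraction mirrors the `strings.IndexAny`/`strconv.Atoi` pattern in `saveBinlog` above:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// binlogPos is a hypothetical pair of binlog file name and byte offset.
type binlogPos struct {
	File string
	Pos  uint32
}

// fileNum extracts the numeric suffix of a binlog file name,
// e.g. "mysql-bin.000120" -> 120.
func fileNum(name string) int {
	i := strings.IndexAny(name, ".")
	n, _ := strconv.Atoi(name[i+1:])
	return n
}

// maxPos returns the more advanced of two positions: compare file numbers
// first, then the offset within the file.
func maxPos(a, b binlogPos) binlogPos {
	if fileNum(a.File) != fileNum(b.File) {
		if fileNum(a.File) > fileNum(b.File) {
			return a
		}
		return b
	}
	if a.Pos >= b.Pos {
		return a
	}
	return b
}

func main() {
	fromConfigFile := binlogPos{"mysql-bin.000119", 4096}
	fromLevelDB := binlogPos{"mysql-bin.000120", 154}
	fmt.Println(maxPos(fromConfigFile, fromLevelDB).File)
}
```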
2). When a queue's last successfully synchronized position equals its last queued position, that queue is dropped from the position comparison.
The problem this solves: suppose two tables are configured to synchronize to a target library such as ClickHouse or Redis. Table1 updated one row when the position was 1000 and was never updated again; later, Table2 kept updating, and its synchronized position had reached 3000 when Bifrost was restarted. At that point, parsing should resume from position 3000, not from 1000, right?
Persisting the last queued position is what makes solving this problem possible.
3). If, for every synchronization queue, the last successfully synchronized position equals the last queued position, the data source's own last position is used directly.
Related source code:

server/recovery.go

```go
func recoveryData(data map[string]dbSaveInfo, isStop bool) {}
```
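The recovery rules 2) and 3) above can be sketched as follows, with positions reduced to a single integer for brevity; `queueState` and `recoveryStart` are hypothetical helpers, not the real `recoveryData` implementation:

```go
package main

import "fmt"

// queueState is a hypothetical summary of one sync queue: the position of the
// last row successfully written to the target, and the position of the last
// row that entered the queue.
type queueState struct {
	LastSynced uint32 // last position confirmed in the target library
	LastQueued uint32 // last position pushed into the queue
}

// recoveryStart picks the position to resume parsing from: queues whose
// last-synced and last-queued positions match are fully drained and are
// skipped; among the rest, the smallest last-synced position wins. If every
// queue is drained, the data source position itself is used.
func recoveryStart(source uint32, queues []queueState) uint32 {
	start := uint32(0)
	found := false
	for _, q := range queues {
		if q.LastSynced == q.LastQueued {
			continue // drained: nothing pending, do not drag the start back
		}
		if !found || q.LastSynced < start {
			start = q.LastSynced
			found = true
		}
	}
	if !found {
		return source
	}
	return start
}

func main() {
	// Table1 drained at 1000; Table2 still has rows queued beyond 3000.
	queues := []queueState{{1000, 1000}, {3000, 3500}}
	fmt.Println(recoveryStart(4000, queues)) // resume from 3000, not 1000
}
```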
A great deal of work went into position management to achieve both safety and performance for these positions. This is not to say the scheme is ideal; rather, when designing a system, you must consider how data will be recovered in extreme cases, and re-running a full synchronization task is also a perfectly good choice!
GitHub : https://github.com/brokercap/Bifrost
Copyright notice
This article was written by [Boo]. Please include a link to the original when reposting. Thank you.