Task state rollback and data chunking based on the check-point mechanism
2022-07-04 16:40:00 【Ma Chao's blog】
- Background
- Node TASK
- Relationship TASK
- Resources
- Remarks
Background
This article extends the scheme from "Graph data construction tasks based on check-point" with two operations: data chunking and task state rollback. Data chunking limits how much data is loaded into memory at once, so the job does not occupy too much heap memory and the graph database keeps running reliably. Task state rollback returns the task to the state of the node build, so the next relationship build starts again from the rollback point. (The construction task is split into a node TASK and a relationship TASK; the rollback is performed in the relationship TASK.)
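The chunking idea can be sketched outside the database. The Python below is a minimal model, assuming that `olab.ids.batch(min,max,size)` returns a list of inclusive `[start,end]` ID pairs, which is how the queries in this post consume its result. `id_batches` is a hypothetical name used for illustration and is not part of ongdb-lab-apoc.

```python
from typing import List, Optional


def id_batches(min_id: Optional[int], max_id: Optional[int], size: int) -> List[List[int]]:
    """Split the inclusive ID range [min_id, max_id] into chunks of at most `size` IDs."""
    if size < 1:
        raise ValueError("size must be at least 1")
    # MIN()/MAX() yield NULL when no rows changed since the checkpoint.
    if min_id is None or max_id is None or min_id > max_id:
        return []
    batches = []
    start = min_id
    while start <= max_id:
        end = min(start + size - 1, max_id)
        batches.append([start, end])
        start = end + 1
    return batches


print(id_batches(1, 250000, 100000))
# → [[1, 100000], [100001, 200000], [200001, 250000]]
```

Each pair then bounds one `huid>=? AND huid<=?` query, so only one chunk of rows is pulled into memory at a time.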
Node TASK
The task has roughly seven steps:
- Get the checkpoint time
- Data chunking: get the minimum and maximum auto-increment ID of the rows updated since the checkpoint
- Data chunking: split that ID range into blocks, starting from the checkpoint
- Size each block according to the specified block size
- Define the SQL that fetches the data
- Run the build task in batched iterations
- Update the task status: when the number of failed batches is less than 1 (that is, every operation succeeded), update the checkpoint (set node_check_point to the system time and rel_check_point to the value node_check_point had before the update)
// Get the checkpoint time (for a full load, set CHECK_POINT to the earliest time; if the data volume exceeds the heap memory limit, the data chunking scheme must be used)
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[ guarantee ]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment ID of the rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuaranteeV003 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking: split the ID range into blocks of the specified size, starting from the checkpoint
WITH olab.ids.batch(min,max,100000) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Define the SQL that fetches the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT
hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV003 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Build the nodes in batched iterations
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV003 {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// When the number of failed batches is less than 1 (that is, every operation succeeded), update the checkpoint (set node_check_point to the system time and rel_check_point to the previous node_check_point value)
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
WHERE batchFailedSize<1
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?',[currentTime,rawCheckPoint,'HGRAPHTASK(HORGGuaranteeV003)-[ guarantee ]->(HORGGuaranteeV003)']) YIELD row RETURN row,batchFailedSize,currentTime,rawCheckPoint;
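The step that defines the SQL writes the checkpoint and ID bounds into the query text with `REPLACE`, since the statement handed to `apoc.periodic.iterate` is a string and the values have to be present in it. The Python below is a minimal sketch of that substitution. `inline_params`, the shortened query, and the JDBC URL are hypothetical stand-ins, not the author's code.

```python
def inline_params(template: str, check_point: str, batch_min: int, batch_max: int) -> str:
    """Replace the placeholder argument list with concrete values, as REPLACE does in the Cypher."""
    placeholder = "check_point,batchMin,batchMax"
    # check_point is expected to arrive already wrapped in single quotes.
    values = f"{check_point},{batch_min},{batch_max}"
    return template.replace(placeholder, values)


# Hypothetical, shortened template: the JDBC URL and column list are placeholders.
template = (
    "CALL apoc.load.jdbc('jdbc:mysql://example-host/example_db', "
    "'SELECT hcode FROM HORGGuaranteeV003 "
    "WHERE hupdatetime>=? AND huid>=? AND huid<=?', "
    "[check_point,batchMin,batchMax])"
)

query = inline_params(template, "'2022-07-01 00:00:00'", 1, 100000)
print(query)
```

This is why the first query wraps the checkpoint in single quotes with `apoc.text.join`: after substitution the timestamp must still be a valid string literal inside the parameter list.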
Relationship TASK
The task has roughly seven steps:
- Get the checkpoint time
- Data chunking: get the minimum and maximum auto-increment ID of the rows updated since the checkpoint
- Data chunking: split that ID range into blocks, starting from the checkpoint
- Size each block according to the specified block size
- Define the SQL that fetches the data
- Run the build task in batched iterations
- Update the task status by rolling it back: if any batch of the relationship build fails, roll back the task state (rollback: set node_check_point equal to the current rel_check_point)
// Get the checkpoint time (for a full load, set CHECK_POINT to the earliest time; if the data volume exceeds the heap memory limit, the data chunking scheme must be used)
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[ guarantee ]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment ID of the rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuarantee_GuarV003 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
// Data chunking: split the ID range into blocks of the specified size, starting from the checkpoint
WITH olab.ids.batch(min,max,100000) AS value,check_point,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,rawCheckPoint
// Define the SQL that fetches the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV003 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// Build the relationships in batched iterations
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV003 {hcode:row.from}),(to:HORGGuaranteeV003 {hcode:row.to}) MERGE (from)-[r: guarantee ]->(to) SET r+={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
// Task state rollback: if any batch of the relationship build fails, roll back the task state (rollback: set node_check_point equal to the current rel_check_point)
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
WHERE batchFailedSize>0
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?',[rawCheckPoint,'HGRAPHTASK(HORGGuaranteeV003)-[ guarantee ]->(HORGGuaranteeV003)']) YIELD row RETURN row,batchFailedSize,rawCheckPoint;
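How the two TASKs move the checkpoints can be modelled in a few lines. The Python below is a sketch under stated assumptions: `CheckPoint` stands in for one row of ONGDB_TASK_CHECK_POINT, and `finish_node_task` and `finish_rel_task` are hypothetical names for the final step of each query above.

```python
from dataclasses import dataclass


@dataclass
class CheckPoint:
    node_check_point: str
    rel_check_point: str


def finish_node_task(cp: CheckPoint, failed_batches: int, current_time: str) -> bool:
    """Advance both checkpoints only when no batch failed."""
    if failed_batches >= 1:
        return False
    previous = cp.node_check_point
    cp.node_check_point = current_time  # nodes are built up to the task start time
    cp.rel_check_point = previous       # relationships still start from the old point
    return True


def finish_rel_task(cp: CheckPoint, failed_batches: int) -> bool:
    """Roll node_check_point back to rel_check_point when any batch failed."""
    if failed_batches > 0:
        cp.node_check_point = cp.rel_check_point
        return True
    return False


cp = CheckPoint("2022-07-01 00:00:00", "2022-06-30 00:00:00")
finish_node_task(cp, 0, "2022-07-04 16:00:00")
finish_rel_task(cp, 2)  # two failed batches trigger the rollback
print(cp)
```

After the rollback both checkpoints are `2022-07-01 00:00:00`, so the next node TASK run reloads everything updated since that time and, once it succeeds, the relationship TASK resumes from the same point.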
Resources
The procedures and functions used in the TASKs above can be downloaded from the following links:
https://github.com/ongdb-contrib/ongdb-lab-apoc
https://github.com/neo4j-contrib/neo4j-apoc-procedures
Remarks
This scheme supplements the graph data construction task from "Graph data construction tasks based on check-point" with a task rollback strategy and data chunking, which greatly improves the availability and performance of the TASK.