Task State Rollback and Data Chunking Based on the Check-Point Mechanism
2022-07-04 14:53:00 [马超的博客 (Ma Chao's blog)]
- Background
- Node TASK
- Relationship TASK
- Resources
- Notes
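Background
This post extends the scheme proposed in the earlier article 基于check-point实现图数据构建任务 ("Building graph data with a check-point mechanism") with two operations: data chunking and task state rollback.
Data chunking: limits how much data is loaded into memory at once, so the task does not occupy too much heap and the graph database keeps running reliably.
Task state rollback: rolls the task back to the state of the node build, so the next relationship build starts again from the rollback point. [The build is split into a node TASK and a relationship TASK; the rollback is performed in the relationship TASK.]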
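Node TASK
Roughly seven steps:
- Get the check-point time
- Data chunking: get the minimum and maximum auto-increment IDs after the check-point from the database
- Data chunking: starting from the check-point, split the ID range into chunks of the specified size
- Apply the chunking with the specified chunk size
- Define how the SQL fetches the data
- Run the build task iteratively in batches
- Update the task state: when the number of failed batches is less than 1 [i.e. every operation succeeded], update the check-point [set node_check_point to the system time] [set rel_check_point to the value node_check_point had before the update]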
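The two chunking steps above can be sketched in Python. This is a minimal illustration, not the implementation of olab.ids.batch: the function name split_id_range is hypothetical, and the inclusive, non-overlapping [low, high] pairs are an assumption chosen to match the `huid>=? AND huid<=?` filter used by the TASK.

```python
from typing import List


def split_id_range(min_id: int, max_id: int, chunk_size: int) -> List[List[int]]:
    """Split the inclusive range [min_id, max_id] into inclusive [low, high] pairs.

    Hypothetical helper: the real olab.ids.batch may choose boundaries differently.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    chunks = []
    low = min_id
    while low <= max_id:
        # The last chunk is clipped so it never runs past max_id.
        high = min(low + chunk_size - 1, max_id)
        chunks.append([low, high])
        low = high + 1
    return chunks


# Example: IDs 1..250000 split with the chunk size used in the TASK.
chunks = split_id_range(1, 250_000, 100_000)
print(chunks)  # [[1, 100000], [100001, 200000], [200001, 250000]]
```

With a chunk size of 100000, each inner query covers an ID window of at most 100000 values, which bounds the number of rows loaded into memory per chunk.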
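// Get the check-point time [to run a full load, set CHECK_POINT to the earliest point in time] [if the data volume exceeds the heap limit, the data chunking scheme is required]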
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment IDs after the check-point from the database
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuaranteeV003 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking: starting from the check-point, split the ID range into chunks of the specified size
WITH olab.ids.batch(min,max,100000) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Define how the SQL fetches the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT
hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV003 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Build nodes iteratively in batches
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV003 {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
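// When the number of failed batches is less than 1 [i.e. every operation succeeded], update the check-point [set node_check_point to the system time] [set rel_check_point to the value node_check_point had before the update]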
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
WHERE batchFailedSize<1
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?',[currentTime,rawCheckPoint,'HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row RETURN row,batchFailedSize,currentTime,rawCheckPoint;
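The REPLACE call in the TASK above turns the inner statement into a self-contained string: the text `check_point,batchMin,batchMax` inside the parameter list is swapped for the literal values of the current chunk, presumably because the statement is handed to apoc.periodic.iterate as a string and cannot see the outer variables. The Python sketch below mimics that substitution. The shortened template, the `<jdbc-url>` placeholder, and the function name render_inner_query are all hypothetical stand-ins, not part of the original TASK.

```python
PLACEHOLDER = "check_point,batchMin,batchMax"

# Shortened stand-in for the inner statement; the real one selects more columns
# and uses a full JDBC URL instead of the '<jdbc-url>' placeholder.
TEMPLATE = (
    "CALL apoc.load.jdbc('<jdbc-url>', "
    "'SELECT hcode,name FROM HORGGuaranteeV003 "
    "WHERE hupdatetime>=? AND huid>=? AND huid<=?',"
    "[check_point,batchMin,batchMax])"
)


def render_inner_query(template: str, check_point: str, batch_min: int, batch_max: int) -> str:
    """Replace the parameter-name text with literal values for one chunk."""
    # The TASK wraps the check-point in single quotes (apoc.text.join) so that
    # it becomes a string literal once it is pasted into the statement.
    quoted = "'" + check_point + "'"
    values = f"{quoted},{batch_min},{batch_max}"
    return template.replace(PLACEHOLDER, values)


query = render_inner_query(TEMPLATE, "2022-07-01 00:00:00", 1, 100000)
print(query)
```

The rendered statement ends with `['2022-07-01 00:00:00',1,100000])`, so each chunk produces its own inner query with fixed bounds.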
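Relationship TASK
Roughly seven steps:
- Get the check-point time
- Data chunking: get the minimum and maximum auto-increment IDs after the check-point from the database
- Data chunking: starting from the check-point, split the ID range into chunks of the specified size
- Apply the chunking with the specified chunk size
- Define how the SQL fetches the data
- Run the build task iteratively in batches
- Update the task state: task state rollback [roll back when any batch of the relationship build fails] [rollback: set node_check_point equal to the current rel_check_point]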
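// Get the check-point time [to run a full load, set CHECK_POINT to the earliest point in time] [if the data volume exceeds the heap limit, the data chunking scheme is required]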
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment IDs after the check-point from the database
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuarantee_GuarV003 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
// Data chunking: starting from the check-point, split the ID range into chunks of the specified size
WITH olab.ids.batch(min,max,100000) AS value,check_point,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,rawCheckPoint
// Define how the SQL fetches the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV003 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// Build relationships iteratively in batches
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV003 {hcode:row.from}),(to:HORGGuaranteeV003 {hcode:row.to}) MERGE (from)-[r:担保]->(to) SET r+={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
// Task state rollback [roll back when any batch of the relationship build fails] [rollback: set node_check_point equal to the current rel_check_point]
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
WHERE batchFailedSize>0
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?',[rawCheckPoint,'HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row RETURN row,batchFailedSize,rawCheckPoint;
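How the two check-point columns interact across the node TASK and the relationship TASK can be sketched as a small state model. This is an illustration under assumptions: the CheckPoint class and the two functions are hypothetical names standing in for the ONGDB_TASK_CHECK_POINT row and the two UPDATE statements, and the timestamps are made up.

```python
from dataclasses import dataclass


@dataclass
class CheckPoint:
    """Stand-in for one row of ONGDB_TASK_CHECK_POINT."""
    node_check_point: str
    rel_check_point: str


def finish_node_task(cp: CheckPoint, failed_batches: int, now: str) -> None:
    """Node TASK: advance the check-point only if no batch failed."""
    if failed_batches < 1:
        # rel_check_point keeps the value node_check_point had before the update.
        cp.rel_check_point = cp.node_check_point
        cp.node_check_point = now


def finish_rel_task(cp: CheckPoint, failed_batches: int) -> None:
    """Relationship TASK: roll node_check_point back if any batch failed."""
    if failed_batches > 0:
        cp.node_check_point = cp.rel_check_point


cp = CheckPoint("2022-07-01 00:00:00", "2022-06-30 00:00:00")

# The node build succeeds: node_check_point moves forward and
# rel_check_point remembers the previous node_check_point.
finish_node_task(cp, failed_batches=0, now="2022-07-02 00:00:00")

# The relationship build fails: node_check_point is rolled back.
finish_rel_task(cp, failed_batches=3)
print(cp)
```

After the rollback both columns hold 2022-07-01 00:00:00, so the next node TASK reloads nodes updated since that time, and the following relationship TASK again starts from the same point.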
Resources
The procedures and functions used in the TASKs above can be downloaded from the links below:
https://github.com/ongdb-contrib/ongdb-lab-apoc
https://github.com/neo4j-contrib/neo4j-apoc-procedures
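Notes
The scheme above builds on the graph data construction task from 基于check-point实现图数据构建任务 by adding a task rollback strategy and data chunking. Rollback lets a failed relationship build be retried from a known state, and chunking keeps each load within the heap limit, which improves both the availability and the performance of the TASKs.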