当前位置:网站首页>基于check-point机制的任务状态回滚和数据分块任务
基于check-point机制的任务状态回滚和数据分块任务
2022-07-04 14:53:00 【马超的博客】
基于check-point机制的任务状态回滚和数据分块任务 - 问题背景
- 节点TASK
- 关系TASK
- 资料
- 备注
问题背景
基于check-point实现图数据构建任务针对这篇文章提出的方案增加了数据分块操作与任务状态回滚操作。 数据分块:控制加载到内存的数据量,避免占用过多堆内存保证图数据库可靠运行。 任务状态回滚:回滚到构建节点的任务状态,下一次构建节点关系时从回滚点开始操作【构建任务分为节点TASK和关系TASK,任务回滚操作是在关系TASK中进行回滚】。
节点TASK
大致为七步
- 获取检查点时间
- 数据分块-从数据库获取检查点之后最大最小自增ID
- 数据分块-从检查点开始按照指定数据块大小执行数据分块
- 按照指定数据块大小执行数据分块
- 定义SQL获取数据方式
- 批量迭代执行构建任务
- 更新任务状态-当操作失败的数据包数量小于1时【即操作全部执行成功】则更新检查点【更新node_check_point为系统时间】【rel_check_point设置为更新前node_check_point的值】
// 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// 数据分块-从数据库获取检查点之后最大最小自增ID
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuaranteeV003 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// 数据分块-从检查点开始按照指定数据块大小执行数据分块
WITH olab.ids.batch(min,max,100000) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// 定义SQL获取数据方式
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT
hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV003 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// 批量迭代执行节点构建
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV003 {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// 当操作失败的数据包数量小于1时【即操作全部执行成功】则更新检查点【更新node_check_point为系统时间】【rel_check_point设置为更新前node_check_point的值】
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
WHERE batchFailedSize<1
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?',[currentTime,rawCheckPoint,'HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row RETURN row,batchFailedSize,currentTime,rawCheckPoint;关系TASK
大致为七步
- 获取检查点时间
- 数据分块-从数据库获取检查点之后最大最小自增ID
- 数据分块-从检查点开始按照指定数据块大小执行数据分块
- 按照指定数据块大小执行数据分块
- 定义SQL获取数据方式
- 批量迭代执行构建任务
- 更新任务状态-任务状态回滚【当任意一个批量构建关系的任务失败时回滚任务状态】【回滚:设置node_check_point等于当前的rel_check_point】
// 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// 数据分块-从数据库获取检查点之后最大最小自增ID
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuarantee_GuarV003 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
// 数据分块-从检查点开始按照指定数据块大小执行数据分块
WITH olab.ids.batch(min,max,100000) AS value,check_point,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,rawCheckPoint
// 定义SQL获取数据方式
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV003 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// 批量迭代执行节点构建
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV003 {hcode:row.from}),(to:HORGGuaranteeV003 {hcode:row.to}) MERGE (from)-[r:担保]->(to) SET r+={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
// 任务状态回滚【当任意一个批量构建关系的任务失败时回滚任务状态】【回滚:设置node_check_point等于当前的rel_check_point】
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
WHERE batchFailedSize>0
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?',[rawCheckPoint,'HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row RETURN row,batchFailedSize,rawCheckPoint;资料
上述TASK中提到的过程和函数可以从下面的链接下载:
https://github.com/ongdb-contrib/ongdb-lab-apoc
https://github.com/neo4j-contrib/neo4j-apoc-procedures备注
上述方案在【基于check-point实现图数据构建任务】图数据构建任务基础上补充了任务回滚策略和数据分块操作,对于任务TASK的可用性和性能起到了极大的增强作用。
边栏推荐
- Model fusion -- stacking principle and Implementation
- Penetration test --- database security: detailed explanation of SQL injection into database principle
- Research Report on surgical otorhinolaryngology equipment industry - market status analysis and development prospect prediction
- 科普达人丨一文看懂阿里云的秘密武器“神龙架构”
- [Chongqing Guangdong education] National Open University spring 2019 1248 public sector human resource management reference questions
- Some fields of the crawler that should be output in Chinese are output as none
- Recommend 10 excellent mongodb GUI tools
- Book of night sky 53 "stone soup" of Apache open source community
- Hair growth shampoo industry Research Report - market status analysis and development prospect forecast
- Review of Weibo hot search in 2021 and analysis of hot search in the beginning of the year
猜你喜欢

I let the database lock the table! Almost fired!

Working group and domain analysis of Intranet
![[tutorial] yolov5_ DeepSort_ The whole process of pytoch target tracking and detection](/img/a7/92d670776e3fd3d5add3aa144617c7.jpg)
[tutorial] yolov5_ DeepSort_ The whole process of pytoch target tracking and detection

Big God explains open source buff gain strategy live broadcast
![[hcie TAC] question 5 - 1](/img/e0/1b546de7628695ebed422ae57942e4.jpg)
[hcie TAC] question 5 - 1

Ten clothing stores have nine losses. A little change will make you buy every day

@EnableAspectAutoJAutoProxy_ Exposeproxy property

Interface fonctionnelle, référence de méthode, Widget de tri de liste implémenté par lambda

Anta is actually a technology company? These operations fool netizens

DIY a low-cost multi-functional dot matrix clock!
随机推荐
How can floating point numbers be compared with 0?
China tall oil fatty acid market trend report, technical dynamic innovation and market forecast
Interface test - knowledge points and common interview questions
ECCV 2022放榜了:1629篇论文中选,录用率不到20%
Scientific research cartoon | what else to do after connecting with the subjects?
Logstash ~ detailed explanation of logstash configuration (logstash.yml)
Selenium element interaction
Essential basic knowledge of digital image processing
Research Report on surgical otorhinolaryngology equipment industry - market status analysis and development prospect prediction
Research Report on market supply and demand and strategy of tetramethylpyrazine industry in China
China Indonesia adhesive market trend report, technological innovation and market forecast
Unity animation day05
Working group and domain analysis of Intranet
The vscode waveform curve prompts that the header file cannot be found (an error is reported if the header file exists)
Unity script API - GameObject game object, object object
Feature extraction and detection 15-akaze local matching
Software Engineer vs Hardware Engineer
C language: implementation of daffodil number function
I let the database lock the table! Almost fired!
Market trend report, technical innovation and market forecast of tetrabromophthalate (pht4 diol) in China