Task state rollback and data chunking based on the check-point mechanism
2022-07-04 14:53:00 [马超的博客 (Ma Chao's Blog)]
- Problem background
- Node TASK
- Relationship TASK
- References
- Notes
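Problem background
This post extends the approach proposed in the earlier article "Implementing graph data build tasks based on check-point" (基于check-point实现图数据构建任务) with two operations: data chunking and task state rollback.
- Data chunking: limits how much data is loaded into memory at once, so the task does not occupy too much heap and the graph database keeps running reliably.
- Task state rollback: rolls the state back to that of the node build, so the next relationship build starts from the rollback point. [The build is split into a node TASK and a relationship TASK; the rollback is performed in the relationship TASK.]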
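Node TASK
The task has roughly seven steps:
- Get the checkpoint time
- Data chunking: get the minimum and maximum auto-increment ID after the checkpoint from the database
- Data chunking: starting from the checkpoint, split the data into chunks of the specified size
- Process the data chunk by chunk at the specified chunk size
- Define the SQL used to fetch the data
- Run the build task iteratively in batches
- Update the task state: when the number of failed batches is less than 1 [i.e. every operation succeeded], update the checkpoints [set node_check_point to the system time] [set rel_check_point to the value node_check_point had before the update]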
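The chunking steps above can be sketched outside the database. The following is a minimal Python sketch, assuming that `olab.ids.batch(min,max,size)` returns consecutive inclusive `[batchMin, batchMax]` pairs covering the ID range; this is inferred from how the TASK indexes each pair and filters with `huid>=? AND huid<=?`, and the real function may differ in detail. `id_batches` is a hypothetical helper name, not part of any library.

```python
from typing import Iterator, Tuple


def id_batches(min_id: int, max_id: int, size: int) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (batch_min, batch_max) ranges that cover [min_id, max_id].

    Assumption: this mirrors what olab.ids.batch appears to return in the TASK.
    """
    if size < 1:
        raise ValueError("size must be at least 1")
    start = min_id
    while start <= max_id:
        # The last chunk is cut off at max_id so no ID outside the range is requested.
        end = min(start + size - 1, max_id)
        yield start, end
        start = end + 1


if __name__ == "__main__":
    # Each pair would be bound to the huid>=? AND huid<=? filter of one SQL query.
    for batch_min, batch_max in id_batches(1, 250000, 100000):
        print(batch_min, batch_max)
```

Because every range is bounded by the chunk size, each SQL query returns at most that many rows by ID, which is what keeps the heap usage of a single load under control.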
// Get the checkpoint time [for a full load, set CHECK_POINT to the earliest timestamp] [data chunking is required when the data volume exceeds the heap memory limit]
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment ID after the checkpoint from the database
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuaranteeV003 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking: starting from the checkpoint, split the ID range into chunks of the specified size
WITH olab.ids.batch(min,max,100000) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Define the SQL used to fetch the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT
hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV003 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Build the nodes iteratively in batches
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV003 {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
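// When the number of failed batches is less than 1 [i.e. every operation succeeded], update the checkpoints [set node_check_point to the system time] [set rel_check_point to the value node_check_point had before the update]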
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
WHERE batchFailedSize<1
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?',[currentTime,rawCheckPoint,'HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row RETURN row,batchFailedSize,currentTime,rawCheckPoint;
Relationship TASK
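The task has roughly seven steps:
- Get the checkpoint time
- Data chunking: get the minimum and maximum auto-increment ID after the checkpoint from the database
- Data chunking: starting from the checkpoint, split the data into chunks of the specified size
- Process the data chunk by chunk at the specified chunk size
- Define the SQL used to fetch the data
- Run the build task iteratively in batches
- Update the task state: task state rollback [roll the task state back when any relationship batch fails] [rollback: set node_check_point to the current rel_check_point]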
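The checkpoint updates of the two TASKs can be read as a small state transition over the pair (node_check_point, rel_check_point). The sketch below is a Python illustration of that logic only, not the author's implementation; `CheckPoint`, `after_node_task`, and `after_rel_task` are hypothetical names, and timestamps are kept as plain strings in the `%Y-%m-%d %H:%i:%s` layout used by the queries.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CheckPoint:
    """One row of ONGDB_TASK_CHECK_POINT, reduced to the two checkpoint columns."""
    node_check_point: str
    rel_check_point: str


def after_node_task(cp: CheckPoint, failed_batches: int, current_time: str) -> CheckPoint:
    """Node TASK: advance the checkpoints only when no batch failed."""
    if failed_batches < 1:
        # rel_check_point takes the value node_check_point had before the update.
        return CheckPoint(node_check_point=current_time,
                          rel_check_point=cp.node_check_point)
    return cp


def after_rel_task(cp: CheckPoint, failed_batches: int) -> CheckPoint:
    """Relationship TASK: roll node_check_point back when any batch failed."""
    if failed_batches > 0:
        return CheckPoint(node_check_point=cp.rel_check_point,
                          rel_check_point=cp.rel_check_point)
    return cp


if __name__ == "__main__":
    cp = CheckPoint("2022-07-01 00:00:00", "2022-06-30 00:00:00")
    cp = after_node_task(cp, failed_batches=0, current_time="2022-07-04 14:00:00")
    cp = after_rel_task(cp, failed_batches=2)
    print(cp)
```

After a failed relationship build, both checkpoints point at the same earlier time, so the next node TASK and relationship TASK both reprocess the rows updated since that point.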
// Get the checkpoint time [for a full load, set CHECK_POINT to the earliest timestamp] [data chunking is required when the data volume exceeds the heap memory limit]
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment ID after the checkpoint from the database
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuarantee_GuarV003 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
// Data chunking: starting from the checkpoint, split the ID range into chunks of the specified size
WITH olab.ids.batch(min,max,100000) AS value,check_point,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,rawCheckPoint
// Define the SQL used to fetch the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV003 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// Build the relationships iteratively in batches
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV003 {hcode:row.from}),(to:HORGGuaranteeV003 {hcode:row.to}) MERGE (from)-[r:担保]->(to) SET r+={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
// Task state rollback [roll the task state back when any relationship batch fails] [rollback: set node_check_point to the current rel_check_point]
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
WHERE batchFailedSize>0
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?',[rawCheckPoint,'HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row RETURN row,batchFailedSize,rawCheckPoint;
References
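The procedures and functions used in the TASKs above can be downloaded from the following links:
https://github.com/ongdb-contrib/ongdb-lab-apoc
https://github.com/neo4j-contrib/neo4j-apoc-procedures
Notes
The approach above builds on the graph data build task from "Implementing graph data build tasks based on check-point" (基于check-point实现图数据构建任务) and adds a task rollback strategy and data chunking, which substantially improves the availability and performance of the TASKs.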