当前位置:网站首页>Cypher task design and task locking mechanism of isomorphic and heterogeneous graphs
Cypher task design and task locking mechanism of isomorphic and heterogeneous graphs
2022-07-04 16:41:00 【Ma Chao's blog】
@TOC[1]
The problem background
Large scale repeated and concurrent write operations will cause the graph database service to accumulate a large number of write requests , Lead to service performance degradation or even downtime . therefore TASK The design of locking mechanism is very important , It must be ensured that the write task cannot be repeated at the same time ; The design of checkpoint mechanism ensures the consistency and integrity of data synchronization ;TASK Taking up too much system memory, especially when processing a large amount of data, the graph database service will have the risk of downtime , The design of data blocking scheme avoids this problem very well .
CYPHER-TASK Design
Isomorphic graph
• Each task needs to acquire the lock and then execute the data construction logic , Whether or not the build logic is successfully executed TASK The lock must be released at the end •[NODE-TASK] Responsible for locking node_check-point Update and follow-up tasks rel_check_point Sync •[REL-TASK] be responsible for node_check-point Rollback and task state synchronization rel_check_point=node_check_point
# TASK Execute the process
[NODE-TASK]->[REL-TASK]
Heterogeneous graph
• Each task needs to acquire the lock and then execute the data construction logic , Whether or not the build logic is successfully executed TASK The lock must be released at the end •[FROM-NODE-TASK] Responsible for locking node_check-point Update and follow-up tasks rel_check_point Sync •[TO-NODE-TASK] be responsible for node_check-point Roll back of •[REL-TASK] be responsible for node_check-point Rollback and task state synchronization rel_check_point=node_check_point
# TASK Execute the process
[FROM-NODE-TASK]->[TO-NODE-TASK]->[REL-TASK]
check-point Table structure design
The task status table is responsible for saving nodes TASK And relationship TASK Task status , Realize the transfer of task status on the whole task flow , At the same time, ensure data consistency and integrity .
CREATE TABLE `ONGDB_TASK_CHECK_POINT` (
`huid` bigint(20) NOT NULL AUTO_INCREMENT COMMENT ' Since the primary key ',
`hcode` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT ' Code :HGRAPHTASK(FromLabel)-[RelType]->(ToLabel)',
`from` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT ' name ',
`relationship` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT ' Association type ',
`to` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'MSTR_ORG Of hcode',
`node_check_point` datetime DEFAULT '1900-01-01 00:00:00' COMMENT ' The node can get the checkpoint time, which can be changed , Relationship TASK You can get the checkpoint time 【 A complete graph data DAG-TASK Must include nodes and relationship building TASK】',
`rel_check_point` datetime DEFAULT '1900-01-01 00:00:00' COMMENT ' Before saving the update node_check_point Value ',
`description` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT ' A specific description of the checkpoint task ',
`overall_data_split_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT ' Synchronize full data CYPHER: Data blocking scheme script ',
`overall_data_timezone_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT ' Synchronize full data CYPHER: Synchronization script without setting time range ',
`hcreatetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT ' Creation time ',
`hupdatetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ' Update time ',
`create_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT ' founder ',
`update_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT ' Updated by ',
`hisvalid` int(11) NOT NULL DEFAULT '1' COMMENT ' Logical deletion mark :0- Invalid ;1- It works ',
`src` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'ART' COMMENT ' Data source tag ',
PRIMARY KEY (`huid`) USING BTREE,
UNIQUE KEY `unique_key_02` (`hcode`) USING BTREE COMMENT ' unique index ',
UNIQUE KEY `unique_key_01` (`from`,`to`,`relationship`) USING BTREE COMMENT ' unique index ',
KEY `updateTime` (`hupdatetime`) USING BTREE,
KEY `name` (`from`) USING BTREE,
KEY `hisvalid` (`hisvalid`) USING BTREE,
KEY `type` (`relationship`) USING BTREE,
KEY `check_point` (`node_check_point`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=742715632 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='ONgDB DAG TASK Checklist ';
task-lock Table structure design
【ONGDB_TASK_CHECK_POINT_LOCK】 The task status lock table is responsible for locking the task status , Guarantee TASK Uniqueness of runtime .
Task module deconstruction
Data chunking
Control the amount of data loaded into memory , Avoid occupying too much heap memory to ensure the reliable operation of the graph database .
// Data chunking - Perform data blocking according to the specified data block size from the checkpoint 【 Set a default block , Ensure that the lock can be released smoothly 】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value
Task status rollback
Rollback to the task state of the build node , The next time the node relationship is built, the operation task starts from the rollback point and runs from the node TASK Start .
// When the number of failed packets is greater than 0 when , Roll back node_check_point
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=rel_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row;'],'',{}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
Task status synchronization
Relationship TASK-CHECK-POINT And nodes TASK-CHECK-POINT State synchronization .
// batchFailedSize>0 Then the task state rolls back 【 Roll back the task status when any task of batch building relationship fails 】【 Roll back : Set up node_check_point Equal to the current rel_check_point】
// batchFailedSize<=0【 If the execution is successful, the node TASK And relationships TASK State synchronization 】 If it is executed normally, it will be updated rel_check_point=node_check_point
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
Task status lock
【 Figure data construction task status lock 】【 Guarantee the relationship at a certain moment DAG in TASK Uniqueness of operation 】.• Get the lock
// Get the task lock and lock the task
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
• Release the lock
// Release the lock 【TASK End the operation of releasing lock 】【 Set a default block at the data block , Ensure the smooth implementation of the lock release operation 】
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;
Complete implementation case
Isomorphic graph
node TASK
// =========================== Acquire the lock and execute TASK===========================
// Get the task lock and lock the task
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Get checkpoint time 【 Modify when running full data CHECK_POINT The time point of is the earliest time 】【 If the amount of data is higher than the heap memory limit, the data blocking scheme must be used 】
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking - The maximum and minimum self increment after obtaining checkpoints from the database ID
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuaranteeV001 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking - Perform data blocking according to the specified data block size from the checkpoint 【 Set a default block , Ensure that the lock can be released smoothly 】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Definition SQL How to get data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT
hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Batch iteration execution node construction
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV001 {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// When the number of failed packets is less than 1 when 【 That is, all operations are successfully executed 】 Then update the checkpoint 【 to update node_check_point Is the system time 】【rel_check_point Set to before update node_check_point Value 】
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize<1,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$rawCheckPoint,\'HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)\']) YIELD row RETURN row;'],'',{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// Release the lock 【TASK End the operation of releasing lock 】【 Set a default block at the data block , Ensure the smooth implementation of the lock release operation 】
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
Relationship TASK
// =========================== Acquire the lock and execute TASK===========================
// Get the task lock and lock the task
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Get checkpoint time 【 Modify when running full data CHECK_POINT The time point of is the earliest time 】【 If the amount of data is higher than the heap memory limit, the data blocking scheme must be used 】
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// Data chunking - The maximum and minimum self increment after obtaining checkpoints from the database ID
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuarantee_GuarV001 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
// Data chunking - Perform data blocking according to the specified data block size from the checkpoint 【 Set a default block , Ensure that the lock can be released smoothly 】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,rawCheckPoint
// Definition SQL How to get data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// Batch iteration execution node construction
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV001 {hcode:row.from}),(to:HORGGuaranteeV001 {hcode:row.to}) MERGE (from)-[r: guarantee ]->(to) SET r+={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
// batchFailedSize>0 Then the task state rolls back 【 Roll back the task status when any task of batch building relationship fails 】【 Roll back : Set up node_check_point Equal to the current rel_check_point】
// batchFailedSize<=0【 If the execution is successful, the node TASK And relationships TASK State synchronization 】 If it is executed normally, it will be updated rel_check_point=node_check_point
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
// Release the lock 【TASK End the operation of releasing lock 】【 Set a default block at the data block , Ensure the smooth implementation of the lock release operation 】
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;
Heterogeneous graph
Of heterogeneous graphs TASK Execute the process :(FROM-TASK)->(TO-TASK)->(REL-TASK), among (TO-TASK) Only responsible for node_check_point Roll back of , Not responsible for node_check_point Update operation and task status synchronization operation .
node TASK
•FROM node TASK
// =========================== Acquire the lock and execute TASK===========================
// Get the task lock and lock the task
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Get checkpoint time 【 Modify when running full data CHECK_POINT The time point of is the earliest time 】【 If the amount of data is higher than the heap memory limit, the data blocking scheme must be used 】
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking - The maximum and minimum self increment after obtaining checkpoints from the database ID
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,' Issuance of securities ']) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking - Perform data blocking according to the specified data block size from the checkpoint 【 Set a default block , Ensure that the lock can be released smoothly 】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Definition SQL How to get data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT org_hcode AS hcode,org_name AS name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\' Issuance of securities \'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Batch iteration execution node construction
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HBondOrg {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// When the number of failed packets is less than 1 when 【 That is, all operations are successfully executed 】 Then update the checkpoint 【 to update node_check_point Is the system time 】【rel_check_point Set to before update node_check_point Value 】
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize<1,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row;'],'',{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// Release the lock 【TASK End the operation of releasing lock 】【 Set a default block at the data block , Ensure the smooth implementation of the lock release operation 】
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
•TO node TASK
// =========================== Acquire the lock and execute TASK===========================
// Get the task lock and lock the task
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Get checkpoint time 【 Modify when running full data CHECK_POINT The time point of is the earliest time 】【 If the amount of data is higher than the heap memory limit, the data blocking scheme must be used 】
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking - The maximum and minimum self increment after obtaining checkpoints from the database ID
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,' Issuance of securities ']) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking - Perform data blocking according to the specified data block size from the checkpoint 【 Set a default block , Ensure that the lock can be released smoothly 】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Definition SQL How to get data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT hcode,name,data_source,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\' Issuance of securities \'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Batch iteration execution node construction
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HEventBond {hcode:row.hcode}) SET n+=row', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// When the number of failed packets is greater than 0 when , Roll back node_check_point
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=rel_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row;'],'',{}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// Release the lock 【TASK End the operation of releasing lock 】【 Set a default block at the data block , Ensure the smooth implementation of the lock release operation 】
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
Relationship TASK
// =========================== Acquire the lock and execute TASK===========================
// Get the task lock and lock the task
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Get checkpoint time 【 Modify when running full data CHECK_POINT The time point of is the earliest time 】【 If the amount of data is higher than the heap memory limit, the data blocking scheme must be used 】
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// Data chunking - The maximum and minimum self increment after obtaining checkpoints from the database ID
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,' Issuance of securities ']) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
// Data chunking - Perform data blocking according to the specified data block size from the checkpoint 【 Set a default block , Ensure that the lock can be released smoothly 】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,rawCheckPoint
UNWIND value AS bactIdList
WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,rawCheckPoint
// Definition SQL How to get data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT hcode AS `to`,org_hcode AS `from`,data_source,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\' Issuance of securities \'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// Batch iteration execution node construction
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HBondOrg {hcode:row.org_hcode}),(to:HEventBond {hcode:row.hcode}) MERGE (from)-[r: Issuance of securities ]->(to) SET r+=olab.reset.map(row,[\'from\',\'to\'])', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
// batchFailedSize>0 Then the task state rolls back 【 Roll back the task status when any task of batch building relationship fails 】【 Roll back : Set up node_check_point Equal to the current rel_check_point】
// batchFailedSize<=0【 If the execution is successful, the node TASK And relationships TASK State synchronization 】 If it is executed normally, it will be updated rel_check_point=node_check_point
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
// Release the lock 【TASK End the operation of releasing lock 】【 Set a default block at the data block , Ensure the smooth implementation of the lock release operation 】
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;
remarks
Through the deconstruction analysis of graph data task , You can imagine CYPHER The automatic generation of scripts is completely achievable . It is very valuable to further design and implement a system to automatically extract graph data . From the graph data schema Design , To data model recommendation and graph data automatic extraction , It can greatly liberate the energy of data engineers .
# Figure database related plug-in package download
https://github.com/neo4j-contrib/neo4j-apoc-procedures
https://github.com/ongdb-contrib/ongdb-lab-apoc
• Related articles
be based on check-point Realize the task of graph data construction be based on check-point The task state rollback and data blocking tasks of the mechanism
References
[1]
TOC: Isomorphic graph and heterogeneous graph CYPHER-TASK Design and TASK Locking mechanism [2]
be based on check-point Realize the task of graph data construction : https://yc-ma.blog.csdn.net/article/details/112055402[3]
be based on check-point The task state rollback and data blocking tasks of the mechanism : https://yc-ma.blog.csdn.net/article/details/112200819
边栏推荐
- Sql实现Split
- A trap used by combinelatest and a debouncetime based solution
- Intranet penetrating FRP: hidden communication tunnel technology
- 科普达人丨一文看懂阿里云的秘密武器“神龙架构”
- DC-2靶场搭建及渗透实战详细过程(DC靶场系列)
- Hidden communication tunnel technology: intranet penetration tool NPS
- Final consistency of MESI cache in CPU -- why does CPU need cache
- Change the mouse pointer on ngclick - change the mouse pointer on ngclick
- Opencv learning -- arithmetic operation of image of basic operation
- .Net 应用考虑x64生成
猜你喜欢
error: ‘connect‘ was not declared in this scope connect(timer, SIGNAL(timeout()), this, SLOT(up
I let the database lock the table! Almost fired!
Common knowledge of unity Editor Extension
What is torch NN?
Model fusion -- stacking principle and Implementation
How was MP3 born?
[tutorial] yolov5_ DeepSort_ The whole process of pytoch target tracking and detection
《吐血整理》保姆级系列教程-玩转Fiddler抓包教程(2)-初识Fiddler让你理性认识一下
时钟轮在 RPC 中的应用
Intranet penetrating FRP: hidden communication tunnel technology
随机推荐
Application and Optimization Practice of redis in vivo push platform
Variable cannot have type 'void'
The 17 year growth route of Zhang Liang, an open source person, can only be adhered to if he loves it
The vscode waveform curve prompts that the header file cannot be found (an error is reported if the header file exists)
Redis: SDS source code analysis
TypeError: not enough arguments for format string
China's roof ladder market trend report, technological innovation and market forecast
js中的数组筛选fliter
Hair growth shampoo industry Research Report - market status analysis and development prospect forecast
Final consistency of MESI cache in CPU -- why does CPU need cache
Research Report on market supply and demand and strategy of China's plastics and polymer industry
I let the database lock the table! Almost fired!
Oracle监听器Server端与Client端配置实例
基于check-point机制的任务状态回滚和数据分块任务
嵌入式软件架构设计-函数调用
Common knowledge of unity Editor Extension
线程池的使用和原理
Stew in disorder
Understand Alibaba cloud's secret weapon "dragon architecture" in the article "science popularization talent"
Qt---error: ‘QObject‘ is an ambiguous base of ‘MyView‘