
Cypher task design and task locking mechanism for homogeneous and heterogeneous graphs

2022-07-04 16:41:00 Ma Chao's blog


Problem background

Large-scale, repeated, and concurrent write operations make the graph database service accumulate a backlog of write requests, degrading its performance or even bringing it down. The TASK locking mechanism is therefore essential: it must guarantee that the same write task never runs twice at the same time. The checkpoint mechanism guarantees the consistency and completeness of data synchronization. Finally, a TASK that holds too much data in memory, especially when processing large volumes, puts the graph database service at risk of crashing; the data chunking scheme avoids this problem.

CYPHER-TASK Design

Homogeneous graph

• Each task must acquire the lock before running its data-building logic, and must release the lock when it ends, whether or not the build logic succeeded.
• [NODE-TASK] acquires the lock, updates node_check_point, and records the previous node_check_point in rel_check_point for the downstream task.
• [REL-TASK] rolls back node_check_point on failure and, on success, synchronizes the task state by setting rel_check_point=node_check_point.

# TASK execution order
[NODE-TASK]->[REL-TASK]
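The checkpoint hand-off between NODE-TASK and REL-TASK can be modeled as a small state machine. The Python sketch below is a hypothetical in-memory model (the `Checkpoint` class and the function names are illustrative, not part of ONgDB or APOC); `failed_batches` stands for the `SUM(batch.failed)` value the Cypher tasks compute, and the assignments mirror the checkpoint UPDATE statements in the complete tasks.

```python
from dataclasses import dataclass

EPOCH = "1900-01-01 00:00:00"  # default value of both checkpoint columns


@dataclass
class Checkpoint:
    """Hypothetical in-memory stand-in for one ONGDB_TASK_CHECK_POINT row."""
    node_check_point: str = EPOCH
    rel_check_point: str = EPOCH


def node_task(cp: Checkpoint, now: str, failed_batches: int) -> str:
    """NODE-TASK: build nodes updated since node_check_point; advance it on success."""
    window_start = cp.node_check_point
    if failed_batches < 1:
        cp.rel_check_point = window_start  # REL-TASK will start from the old value
        cp.node_check_point = now
    return window_start


def rel_task(cp: Checkpoint, failed_batches: int) -> str:
    """REL-TASK: build relationships updated since rel_check_point, then roll back or sync."""
    window_start = cp.rel_check_point
    if failed_batches > 0:
        cp.node_check_point = window_start  # the next run redoes this window
    else:
        cp.rel_check_point = cp.node_check_point  # both tasks agree again
    return window_start


cp = Checkpoint()
node_task(cp, "2022-07-04 10:00:00", failed_batches=0)
rel_task(cp, failed_batches=0)
print(cp.node_check_point, cp.rel_check_point)  # 2022-07-04 10:00:00 2022-07-04 10:00:00
node_task(cp, "2022-07-05 10:00:00", failed_batches=0)
rel_task(cp, failed_batches=2)  # relationship build fails: roll back
print(cp.node_check_point, cp.rel_check_point)  # 2022-07-04 10:00:00 2022-07-04 10:00:00
```

A failed REL-TASK therefore leaves both checkpoints at the start of the failed window, so the next NODE-TASK reloads exactly the rows that were not fully processed.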

Heterogeneous graph

• Each task must acquire the lock before running its data-building logic, and must release the lock when it ends, whether or not the build logic succeeded.
• [FROM-NODE-TASK] acquires the lock, updates node_check_point, and records the previous node_check_point in rel_check_point for the downstream tasks.
• [TO-NODE-TASK] only rolls back node_check_point on failure.
• [REL-TASK] rolls back node_check_point on failure and, on success, synchronizes the task state by setting rel_check_point=node_check_point.

# TASK execution order
[FROM-NODE-TASK]->[TO-NODE-TASK]->[REL-TASK]
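In the heterogeneous flow, a failure in any of the three tasks leaves both checkpoints at the start of the current window, so the next run repeats the whole window. The hypothetical Python model below (all names are illustrative) mirrors the checkpoint UPDATE statements of the three tasks and replays a run in which TO-NODE-TASK fails, followed by a clean retry.

```python
from dataclasses import dataclass


@dataclass
class Checkpoint:
    """Hypothetical in-memory stand-in for one ONGDB_TASK_CHECK_POINT row."""
    node_check_point: str
    rel_check_point: str


def from_node_task(cp: Checkpoint, now: str, failed: int) -> None:
    if failed < 1:  # success: keep the old window start, then advance
        cp.rel_check_point = cp.node_check_point
        cp.node_check_point = now


def to_node_task(cp: Checkpoint, failed: int) -> None:
    if failed > 0:  # TO-NODE-TASK only ever rolls back
        cp.node_check_point = cp.rel_check_point


def rel_task(cp: Checkpoint, failed: int) -> None:
    if failed > 0:
        cp.node_check_point = cp.rel_check_point  # roll back
    else:
        cp.rel_check_point = cp.node_check_point  # synchronize


def run_dag(cp: Checkpoint, now: str, failures: dict) -> None:
    """Run [FROM-NODE-TASK]->[TO-NODE-TASK]->[REL-TASK]; `failures` maps task -> failed batch count."""
    from_node_task(cp, now, failures.get("from", 0))
    to_node_task(cp, failures.get("to", 0))
    rel_task(cp, failures.get("rel", 0))


cp = Checkpoint("2022-07-04 10:00:00", "2022-07-04 10:00:00")
run_dag(cp, "2022-07-05 10:00:00", {"to": 3})  # TO-NODE-TASK fails
print(cp.node_check_point, cp.rel_check_point)  # 2022-07-04 10:00:00 2022-07-04 10:00:00
run_dag(cp, "2022-07-06 10:00:00", {})  # clean retry
print(cp.node_check_point, cp.rel_check_point)  # 2022-07-06 10:00:00 2022-07-06 10:00:00
```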

check-point table design

The task status table stores the state of the node TASKs and relationship TASKs, passes that state along the whole task flow, and thereby ensures data consistency and completeness.

CREATE TABLE `ONGDB_TASK_CHECK_POINT` (
`huid` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment primary key',
`hcode` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Task code: HGRAPHTASK(FromLabel)-[RelType]->(ToLabel)',
`from` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Name',
`relationship` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Relationship type',
`to` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'hcode of MSTR_ORG',
`node_check_point` datetime DEFAULT '1900-01-01 00:00:00' COMMENT 'Checkpoint time available to the node TASKs (editable) and to the relationship TASK [a complete graph data DAG-TASK must include both node and relationship build TASKs]',
`rel_check_point` datetime DEFAULT '1900-01-01 00:00:00' COMMENT 'Holds the node_check_point value from before its last update',
`description` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Detailed description of the checkpoint task',
`overall_data_split_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'Full-data sync CYPHER: data chunking script',
`overall_data_timezone_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'Full-data sync CYPHER: sync script without a time range',
`hcreatetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
`hupdatetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
`create_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Created by',
`update_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Updated by',
`hisvalid` int(11) NOT NULL DEFAULT '1' COMMENT 'Logical deletion flag: 0 = invalid; 1 = valid',
`src` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'ART' COMMENT 'Data source tag',
PRIMARY KEY (`huid`) USING BTREE,
UNIQUE KEY `unique_key_02` (`hcode`) USING BTREE COMMENT 'Unique index',
UNIQUE KEY `unique_key_01` (`from`,`to`,`relationship`) USING BTREE COMMENT 'Unique index',
KEY `updateTime` (`hupdatetime`) USING BTREE,
KEY `name` (`from`) USING BTREE,
KEY `hisvalid` (`hisvalid`) USING BTREE,
KEY `type` (`relationship`) USING BTREE,
KEY `check_point` (`node_check_point`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=742715632 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='ONgDB DAG TASK checkpoint table';
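To experiment with the checkpoint bookkeeping without MySQL, a reduced copy of the table can be created in SQLite. This is a sketch under assumptions: only the columns the Cypher tasks read or write are kept, SQLite types stand in for the MySQL ones, and `task_hcode` / `register_task` are hypothetical helpers that assemble the `HGRAPHTASK(FromLabel)-[RelType]->(ToLabel)` code described in the `hcode` column comment. Because the tasks filter on `hupdatetime>=checkpoint`, a freshly registered task starting from the 1900-01-01 default performs a full sync; the unique keys reject a duplicate registration.

```python
import sqlite3

# Reduced, SQLite-flavoured version of ONGDB_TASK_CHECK_POINT (assumed subset of columns).
DDL = """
CREATE TABLE ONGDB_TASK_CHECK_POINT (
    huid INTEGER PRIMARY KEY AUTOINCREMENT,
    hcode TEXT NOT NULL UNIQUE,
    "from" TEXT NOT NULL,
    relationship TEXT NOT NULL,
    "to" TEXT NOT NULL,
    node_check_point TEXT DEFAULT '1900-01-01 00:00:00',
    rel_check_point TEXT DEFAULT '1900-01-01 00:00:00',
    UNIQUE ("from", "to", relationship)
)
"""


def task_hcode(from_label: str, rel_type: str, to_label: str) -> str:
    """Hypothetical helper: build the hcode string used as the task key."""
    return f"HGRAPHTASK({from_label})-[{rel_type}]->({to_label})"


def register_task(conn: sqlite3.Connection, from_label: str, rel_type: str, to_label: str) -> str:
    """Hypothetical helper: insert a task row; checkpoints fall back to their defaults."""
    hcode = task_hcode(from_label, rel_type, to_label)
    conn.execute(
        'INSERT INTO ONGDB_TASK_CHECK_POINT (hcode, "from", relationship, "to") VALUES (?, ?, ?, ?)',
        (hcode, from_label, rel_type, to_label),
    )
    return hcode


conn = sqlite3.connect(":memory:")
conn.execute(DDL)
hcode = register_task(conn, "HORGGuaranteeV001", "guarantee", "HORGGuaranteeV001")
row = conn.execute(
    "SELECT node_check_point, rel_check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?", (hcode,)
).fetchone()
print(hcode, row)
# HGRAPHTASK(HORGGuaranteeV001)-[guarantee]->(HORGGuaranteeV001) ('1900-01-01 00:00:00', '1900-01-01 00:00:00')

try:
    register_task(conn, "HORGGuaranteeV001", "guarantee", "HORGGuaranteeV001")
except sqlite3.IntegrityError as exc:
    print("duplicate rejected:", exc)
```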

task-lock table design

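【ONGDB_TASK_CHECK_POINT_LOCK】 The task status lock table locks the task state and guarantees that each TASK runs as a single instance. Each row is identified by the task's hcode, and its task_lock column is 0 while the task is idle and 1 while it is running.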
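The article does not show the lock table's DDL; the queries only rely on an `hcode` key and a `task_lock` flag. The SQLite sketch below assumes exactly those two columns and uses hypothetical helper names. It acquires the lock with a single conditional UPDATE (compare-and-set) and treats "exactly one row changed" as success, which is the same test the Cypher performs with `WHERE lock.count>0`. The article's MySQL statement wraps the same condition in a self-join subquery; the plain form is used here for brevity.

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit, like the JDBC calls
conn.execute(
    "CREATE TABLE ONGDB_TASK_CHECK_POINT_LOCK ("
    " hcode TEXT PRIMARY KEY,"
    " task_lock INTEGER NOT NULL DEFAULT 0)"  # assumed schema: 0 = free, 1 = held
)


def try_acquire(conn: sqlite3.Connection, hcode: str) -> bool:
    """Compare-and-set: flip task_lock 0 -> 1; True only if this call changed the row."""
    cur = conn.execute(
        "UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=1 WHERE hcode=? AND task_lock=0",
        (hcode,),
    )
    return cur.rowcount == 1


def release(conn: sqlite3.Connection, hcode: str) -> None:
    """Unconditionally mark the task as free again."""
    conn.execute("UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?", (hcode,))


HCODE = "HGRAPHTASK(HORGGuaranteeV001)-[guarantee]->(HORGGuaranteeV001)"
conn.execute("INSERT INTO ONGDB_TASK_CHECK_POINT_LOCK (hcode) VALUES (?)", (HCODE,))
print(try_acquire(conn, HCODE))  # True: lock taken
print(try_acquire(conn, HCODE))  # False: a concurrent run is refused
release(conn, HCODE)
print(try_acquire(conn, HCODE))  # True: free again after release
```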

Task module breakdown

Data chunking

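Control the amount of data loaded into memory so that the build does not take up too much heap memory and the graph database keeps running reliably. If no rows changed since the checkpoint, the chunk list would presumably be empty and UNWIND would produce no rows, so none of the later steps, including the lock release, would run; appending the default block [0,1] guarantees at least one row.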

// Data chunking: starting from the checkpoint, split the ID range into blocks of the given size [append a default block [0,1] so the lock-release step is always reached]
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value
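`olab.ids.batch` comes from the ONgDB lab plugin and its exact boundary handling is not documented here. The Python sketch below assumes it splits the inclusive range [min, max] into consecutive blocks of at most `size` IDs and returns an empty list when there is no range; `id_batches` and `blocks_with_default` are hypothetical names. `blocks_with_default` mimics the `apoc.coll.union(..., [[0,1]])` call: a union has set semantics, so the sentinel is added only if absent, and block order does not matter to the query.

```python
from typing import List, Optional


def id_batches(min_id: Optional[int], max_id: Optional[int], size: int) -> List[List[int]]:
    """Assumed behaviour of olab.ids.batch: split inclusive [min_id, max_id] into blocks of <= size IDs."""
    if min_id is None or max_id is None or min_id > max_id:
        return []  # nothing changed since the checkpoint
    blocks = []
    lo = min_id
    while lo <= max_id:
        hi = min(lo + size - 1, max_id)
        blocks.append([lo, hi])
        lo = hi + 1
    return blocks


def blocks_with_default(min_id: Optional[int], max_id: Optional[int], size: int) -> List[List[int]]:
    """Mimic apoc.coll.union(olab.ids.batch(min, max, size), [[0, 1]]): add the sentinel block once."""
    blocks = id_batches(min_id, max_id, size)
    if [0, 1] not in blocks:
        blocks.append([0, 1])
    return blocks


print(blocks_with_default(1, 25000, 10000))
# [[1, 10000], [10001, 20000], [20001, 25000], [0, 1]]
print(blocks_with_default(None, None, 10000))
# [[0, 1]]
```

Because the node and relationship builds use MERGE, rows that happen to fall into the sentinel block are simply merged again.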

Task status rollback

Roll back the task state of the node-building step: the next run of the node/relationship build then restarts from the rollback point, beginning again with the node TASK.

// When any batch failed (batchFailedSize>0), roll back node_check_point to rel_check_point
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=rel_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row;'],'',{}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
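Each data block runs its own `apoc.periodic.iterate` call, so the task adds up the `failed` counts of all blocks before deciding whether to roll back. The sketch below reproduces that decision against a reduced SQLite copy of the checkpoint table; the dictionaries are assumed to have the shape of the `batch` map yielded by `apoc.periodic.iterate` (only `failed` is read), and `rollback_if_failed` is a hypothetical name.

```python
import sqlite3


def rollback_if_failed(conn: sqlite3.Connection, hcode: str, batch_results: list) -> int:
    """Sum failed batches over all data blocks (like SUM(batch.failed)); roll back if any failed."""
    batch_failed_size = sum(b["failed"] for b in batch_results)
    if batch_failed_size > 0:
        conn.execute(
            "UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=rel_check_point WHERE hcode=?",
            (hcode,),
        )
    return batch_failed_size


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ONGDB_TASK_CHECK_POINT (hcode TEXT PRIMARY KEY, node_check_point TEXT, rel_check_point TEXT)"
)
HCODE = "HGRAPHTASK(HBondOrg)-[Issuance of securities]->(HEventBond)"
# State right after a successful FROM-NODE-TASK: node checkpoint advanced, old value kept in rel_check_point.
conn.execute(
    "INSERT INTO ONGDB_TASK_CHECK_POINT VALUES (?, ?, ?)",
    (HCODE, "2022-07-05 10:00:00", "2022-07-04 10:00:00"),
)

blocks = [  # one result per data block (assumed shape)
    {"total": 1, "committed": 1, "failed": 0},
    {"total": 1, "committed": 0, "failed": 1},
]
print(rollback_if_failed(conn, HCODE, blocks))  # 1
print(conn.execute("SELECT node_check_point FROM ONGDB_TASK_CHECK_POINT").fetchone()[0])  # 2022-07-04 10:00:00
```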

Task status synchronization

Synchronize the relationship TASK's checkpoint with the node TASK's checkpoint.

// batchFailedSize>0: roll back the task state [if any relationship-building batch failed] [rollback: set node_check_point to the current rel_check_point]
// batchFailedSize<=0: synchronize the node TASK and relationship TASK states [on success, set rel_check_point=node_check_point]
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
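The two branches of `apoc.do.case` above reduce to one decision. The hypothetical Python helper below applies the same two UPDATE statements to a reduced SQLite copy of the checkpoint table, with `raw_check_point` playing the role of `rawCheckPoint` (the rel_check_point value read at the start of the relationship TASK).

```python
import sqlite3


def finish_rel_task(conn: sqlite3.Connection, hcode: str, raw_check_point: str, batch_failed_size: int):
    """Hypothetical helper: roll back or synchronize, then return (node_check_point, rel_check_point)."""
    if batch_failed_size > 0:
        # Roll back: the next node TASK restarts from the relationship window start.
        conn.execute(
            "UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?",
            (raw_check_point, hcode),
        )
    else:
        # Synchronize: the relationship TASK has caught up with the node TASK.
        conn.execute(
            "UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?",
            (hcode,),
        )
    return conn.execute(
        "SELECT node_check_point, rel_check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?", (hcode,)
    ).fetchone()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ONGDB_TASK_CHECK_POINT (hcode TEXT PRIMARY KEY, node_check_point TEXT, rel_check_point TEXT)"
)
HCODE = "HGRAPHTASK(HBondOrg)-[Issuance of securities]->(HEventBond)"
conn.execute(
    "INSERT INTO ONGDB_TASK_CHECK_POINT VALUES (?, ?, ?)",
    (HCODE, "2022-07-05 10:00:00", "2022-07-04 10:00:00"),
)
print(finish_rel_task(conn, HCODE, "2022-07-04 10:00:00", 0))
# ('2022-07-05 10:00:00', '2022-07-05 10:00:00')
```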

Task status lock

【Graph data build task status lock】【Guarantees that only one TASK of a given relationship DAG runs at any moment】

• Acquire the lock

// Acquire the task lock (only succeeds while task_lock=0 for this hcode; otherwise no row is returned and the query ends here)
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock

• Release the lock

// Release the lock [at the end of the TASK] [the default data block guarantees this step is reached]
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;
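Because a Cypher statement has no try/finally, the article relies on the default data block to make sure execution reaches the release step. If an earlier CALL raises an error, however, the statement aborts and the lock, already committed in MySQL by its own JDBC call, presumably stays set until it is reset by hand. A client that drives the tasks from application code could wrap each run in a context manager instead. The Python sketch below is one way to do that, using SQLite as a stand-in and assuming a lock table with only `hcode` and `task_lock` columns; `task_lock` and `LockNotAcquired` are hypothetical names.

```python
import sqlite3
from contextlib import contextmanager


class LockNotAcquired(Exception):
    """Raised when another run of the same task already holds the lock."""


@contextmanager
def task_lock(conn: sqlite3.Connection, hcode: str):
    """Hypothetical helper: acquire the task lock, always release it on exit."""
    cur = conn.execute(
        "UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=1 WHERE hcode=? AND task_lock=0", (hcode,)
    )
    if cur.rowcount != 1:
        raise LockNotAcquired(hcode)  # not acquired, so there is nothing to release
    try:
        yield
    finally:
        # Runs on success and on error, so an acquired lock is always released.
        conn.execute("UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?", (hcode,))


conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE ONGDB_TASK_CHECK_POINT_LOCK (hcode TEXT PRIMARY KEY, task_lock INTEGER NOT NULL DEFAULT 0)"
)
HCODE = "HGRAPHTASK(HBondOrg)-[Issuance of securities]->(HEventBond)"
conn.execute("INSERT INTO ONGDB_TASK_CHECK_POINT_LOCK (hcode) VALUES (?)", (HCODE,))

try:
    with task_lock(conn, HCODE):
        raise RuntimeError("simulated JDBC failure inside the task")
except RuntimeError:
    pass
print(conn.execute("SELECT task_lock FROM ONGDB_TASK_CHECK_POINT_LOCK").fetchone()[0])  # 0
```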

Complete implementation examples

Homogeneous graph

Node TASK

// =========================== Acquire the lock and execute the TASK ===========================
// Acquire the task lock (only succeeds while task_lock=0 for this hcode; otherwise no row is returned and the query ends here)
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Read the checkpoint time [when running the full data set, change CHECK_POINT to the earliest time] [if the data volume exceeds the heap memory limit, the data chunking scheme must be used]
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment IDs (huid) of the rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuaranteeV001 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking: split the ID range into blocks of the given size [append a default block [0,1] so the lock-release step is always reached]
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Define the SQL query that fetches each block of data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT
hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Build the nodes in batches
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV001 {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// When no batch failed (every operation succeeded), update the checkpoint [set node_check_point to the current system time] [set rel_check_point to the node_check_point value from before the update]
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize<1,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$rawCheckPoint,\'HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)\']) YIELD row RETURN row;'],'',{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// Release the lock [at the end of the TASK] [the default data block guarantees this step is reached]
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
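The `REPLACE(...)` step in the task above builds the data query for `apoc.periodic.iterate` as a string: that query runs in its own context, so the author inlines the checkpoint and the block bounds as literals by replacing the placeholder text `check_point,batchMin,batchMax`. This is also why `check_point` was wrapped in single quotes with `apoc.text.join` earlier. The Python sketch below reproduces the substitution on a shortened template (the JDBC URL and column list are abbreviated; `inline_params` is a hypothetical name).

```python
def inline_params(template: str, check_point: str, batch_min: int, batch_max: int) -> str:
    """Mimic REPLACE(template, 'check_point,batchMin,batchMax', check_point+','+batchMin+','+batchMax)."""
    quoted = "'" + check_point + "'"  # same effect as apoc.text.join(['\'', cp, '\''], '')
    return template.replace("check_point,batchMin,batchMax", f"{quoted},{batch_min},{batch_max}")


TEMPLATE = (
    "CALL apoc.load.jdbc('jdbc:mysql://...', "
    "'SELECT hcode,name FROM HORGGuaranteeV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?', "
    "[check_point,batchMin,batchMax])"
)
print(inline_params(TEMPLATE, "2022-07-04 10:00:00", 1, 10000))
# CALL apoc.load.jdbc('jdbc:mysql://...', 'SELECT hcode,name FROM HORGGuaranteeV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?', ['2022-07-04 10:00:00',1,10000])
```

As an alternative design, `apoc.periodic.iterate` accepts a `params` map in its configuration, which should let the inner query reference parameters such as `$checkPoint` instead of inlined literals; that variant is not what the article uses.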

Relationship TASK

// =========================== Acquire the lock and execute the TASK ===========================
// Acquire the task lock (only succeeds while task_lock=0 for this hcode; otherwise no row is returned and the query ends here)
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Read the checkpoint time [when running the full data set, change CHECK_POINT to the earliest time] [if the data volume exceeds the heap memory limit, the data chunking scheme must be used]
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment IDs (huid) of the rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuarantee_GuarV001 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
// Data chunking: split the ID range into blocks of the given size [append a default block [0,1] so the lock-release step is always reached]
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,rawCheckPoint
// Define the SQL query that fetches each block of data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// Build the relationships in batches
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV001 {hcode:row.from}),(to:HORGGuaranteeV001 {hcode:row.to}) MERGE (from)-[r: guarantee ]->(to) SET r+={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
// batchFailedSize>0: roll back the task state [if any relationship-building batch failed] [rollback: set node_check_point to the current rel_check_point]
// batchFailedSize<=0: synchronize the node TASK and relationship TASK states [on success, set rel_check_point=node_check_point]
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
// Release the lock [at the end of the TASK] [the default data block guarantees this step is reached]
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;

Heterogeneous graph

The TASK execution order for heterogeneous graphs is (FROM-TASK)->(TO-TASK)->(REL-TASK). (TO-TASK) is only responsible for rolling back node_check_point; it neither updates node_check_point nor synchronizes the task state.

Node TASKs

• FROM node TASK

// =========================== Acquire the lock and execute the TASK ===========================
// Acquire the task lock (only succeeds while task_lock=0 for this hcode; otherwise no row is returned and the query ends here)
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Read the checkpoint time [when running the full data set, change CHECK_POINT to the earliest time] [if the data volume exceeds the heap memory limit, the data chunking scheme must be used]
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment IDs (huid) of the rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,' Issuance of securities ']) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking: split the ID range into blocks of the given size [append a default block [0,1] so the lock-release step is always reached]
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Define the SQL query that fetches each block of data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT org_hcode AS hcode,org_name AS name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\' Issuance of securities \'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Build the nodes in batches
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HBondOrg {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// When no batch failed (every operation succeeded), update the checkpoint [set node_check_point to the current system time] [set rel_check_point to the node_check_point value from before the update]
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize<1,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row;'],'',{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// Release the lock [at the end of the TASK] [the default data block guarantees this step is reached]
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;

• TO node TASK

// =========================== Acquire the lock and execute the TASK ===========================
// Acquire the task lock (only succeeds while task_lock=0 for this hcode; otherwise no row is returned and the query ends here)
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Read the checkpoint time [when running the full data set, change CHECK_POINT to the earliest time] [if the data volume exceeds the heap memory limit, the data chunking scheme must be used]
// TO-NODE-TASK reads rel_check_point: FROM-NODE-TASK has already advanced node_check_point, while rel_check_point still holds the start of the current window
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment IDs (huid) of the rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,' Issuance of securities ']) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking: split the ID range into blocks of the given size [append a default block [0,1] so the lock-release step is always reached]
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Define the SQL query that fetches each block of data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT hcode,name,data_source,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\' Issuance of securities \'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Build the nodes in batches
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HEventBond {hcode:row.hcode}) SET n+=row', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// When any batch failed (batchFailedSize>0), roll back node_check_point to rel_check_point
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=rel_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row;'],'',{}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// Release the lock [at the end of the TASK] [the default data block guarantees this step is reached]
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;

Relationship TASK

// =========================== Acquire the lock and execute the TASK ===========================
// Acquire the task lock (only succeeds while task_lock=0 for this hcode; otherwise no row is returned and the query ends here)
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Read the checkpoint time [when running the full data set, change CHECK_POINT to the earliest time] [if the data volume exceeds the heap memory limit, the data chunking scheme must be used]
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// Data blocking: get the minimum and maximum auto-increment ID (huid) of the rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,' Issuance of securities ']) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
// Data blocking: split [min, max] into blocks of the specified size 【add a default block so that the lock can still be released when no rows have changed】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,rawCheckPoint
// Build the SQL statement that fetches one block of data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT hcode AS `to`,org_hcode AS `from`,data_source,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\' Issuance of securities \'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// Build the relationships in batches (the SQL aliases org_hcode AS `from` and hcode AS `to`, so the rows are matched on row.from and row.to)
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HBondOrg {hcode:row.from}),(to:HEventBond {hcode:row.to}) MERGE (from)-[r:`Issuance of securities`]->(to) SET r+=olab.reset.map(row,[\'from\',\'to\'])', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
// batchFailedSize>0: roll back the task state 【if any relationship-building batch fails, the task state is rolled back】【rollback: set node_check_point to the current rel_check_point】
// batchFailedSize<=0: 【on success, synchronize the node TASK and relationship TASK states】 by setting rel_check_point=node_check_point
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
// Release the lock 【the TASK always releases its lock when it finishes】【a default block is added during data blocking so that this release step is always reached】
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;
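The data blocking step can be modeled outside the database to see why the default `[0,1]` block matters. When no rows changed since the checkpoint, `MIN(huid)` and `MAX(huid)` are NULL, the block list is presumably empty, and `UNWIND` over an empty list yields no rows, so nothing after it, including the lock release, would run. In this Python sketch `id_batches` is a hypothetical stand-in for `olab.ids.batch` (its exact boundary convention is an assumption); it produces inclusive, non-overlapping blocks that fit the `huid>=? AND huid<=?` filter in the data SQL.

```python
from typing import List, Optional

Block = List[int]  # [batchMin, batchMax], both inclusive


def id_batches(min_id: Optional[int], max_id: Optional[int], size: int) -> List[Block]:
    """Hypothetical stand-in for olab.ids.batch(min, max, size).

    Splits the inclusive range [min_id, max_id] into consecutive, non-overlapping
    inclusive blocks of at most `size` IDs.
    """
    if size <= 0:
        raise ValueError("size must be positive")
    if min_id is None or max_id is None:
        # SELECT MIN(huid), MAX(huid) over zero rows yields NULL, NULL
        return []
    blocks: List[Block] = []
    start = min_id
    while start <= max_id:
        end = min(start + size - 1, max_id)
        blocks.append([start, end])
        start = end + 1
    return blocks


def blocks_with_default(min_id: Optional[int], max_id: Optional[int], size: int) -> List[Block]:
    """Mirror of apoc.coll.union(olab.ids.batch(min, max, size), [[0, 1]])."""
    blocks = id_batches(min_id, max_id, size)
    if [0, 1] not in blocks:  # a union keeps each element only once
        blocks.append([0, 1])
    return blocks
```

The default block can pick up rows whose huid is 0 or 1 a second time; because relationships are written with MERGE and `SET r+=`, reprocessing them should leave the graph unchanged.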

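The end of the relationship task chooses between rollback and synchronization using only the two checkpoint columns. A minimal Python model of that decision follows. It assumes checkpoints are the `'%Y-%m-%d %H:%i:%s'` strings the task reads, and that `raw_check_point` is the `rel_check_point` value read at the start of the task (`rawCheckPoint` in the Cypher). `CheckPoint` and `finish_rel_task` are hypothetical names.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class CheckPoint:
    """The two ONGDB_TASK_CHECK_POINT columns the tasks update (hypothetical model)."""
    node_check_point: str
    rel_check_point: str


def finish_rel_task(cp: CheckPoint, batch_failed_size: int, raw_check_point: str) -> CheckPoint:
    """Return the checkpoint row after the relationship TASK finishes."""
    if batch_failed_size > 0:
        # Rollback: node_check_point goes back to the relationship checkpoint read at
        # the start, so the next node/relationship cycle re-processes the same window.
        return replace(cp, node_check_point=raw_check_point)
    # Success: the relationships have caught up with the nodes.
    return replace(cp, rel_check_point=cp.node_check_point)
```

In both branches the two checkpoints end up equal, so the next cycle starts the node TASK and the relationship TASK from the same point in time.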
Remarks

Breaking the graph data tasks down this way shows that CYPHER scripts like these could be generated automatically. Designing and building a system that extracts graph data automatically would be very valuable: covering everything from graph schema design to data model recommendation and automatic graph data extraction, it could free up a great deal of data engineers' time.
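As a small illustration of this idea, the parts of each task that depend only on its `hcode` (the lock prologue and epilogue) can already be produced from a template. This Python sketch assumes the caller supplies the task body as Cypher text that ends by projecting the variables named in `returns`. `wrap_with_lock`, `cypher_string` and the example JDBC URL are hypothetical; the two SQL statements are copied from the tasks above.

```python
# SQL copied from the lock acquire/release steps of the tasks.
ACQUIRE_SQL = (
    "UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN "
    "(SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock "
    "WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode "
    "SET updateLock.task_lock=1"
)
RELEASE_SQL = "UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?"


def cypher_string(value: str) -> str:
    """Render a Python string as a single-quoted Cypher string literal."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def wrap_with_lock(jdbc_url: str, hcode: str, body: str, returns: str) -> str:
    """Surround a task body with the lock acquire/release calls (hypothetical generator)."""
    url, code = cypher_string(jdbc_url), cypher_string(hcode)
    acquire = (
        f"CALL apoc.load.jdbcUpdate({url},{cypher_string(ACQUIRE_SQL)},[{code}]) "
        "YIELD row AS lock WHERE lock.count>0 WITH lock"
    )
    release = (
        f"CALL apoc.load.jdbcUpdate({url},{cypher_string(RELEASE_SQL)},[{code}]) "
        f"YIELD row AS releaseLock RETURN releaseLock,{returns};"
    )
    return "\n".join([acquire, body, release])


if __name__ == "__main__":
    print(wrap_with_lock(
        "jdbc:mysql://localhost:3306/analytics_graph_data",  # hypothetical URL
        "HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)",
        "WITH 1 AS value",  # placeholder task body
        "value",
    ))
```

A full generator would also template the checkpoint query, the data SQL and the MERGE statement per `hcode`, in the same way.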

# Graph database plug-in packages (download)
https://github.com/neo4j-contrib/neo4j-apoc-procedures
https://github.com/ongdb-contrib/ongdb-lab-apoc

• Related articles

• Implementing graph data construction tasks based on check-point[2]
• Task state rollback and data blocking based on the check-point mechanism[3]

References

[1] TOC: CYPHER-TASK design and TASK locking mechanism for isomorphic and heterogeneous graphs
[2] Implementing graph data construction tasks based on check-point: https://yc-ma.blog.csdn.net/article/details/112055402
[3] Task state rollback and data blocking based on the check-point mechanism: https://yc-ma.blog.csdn.net/article/details/112200819

Original site

Copyright notice
This article was written by [Ma Chao's blog]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/185/202207041452570466.html