Cypher task design and task locking mechanism for homogeneous and heterogeneous graphs
2022-07-04 16:41:00 【Ma Chao's blog】
Problem background
Large numbers of repeated, concurrent write operations make the graph database service accumulate a backlog of write requests. This degrades performance and can even bring the service down.

Three mechanisms address these risks:
• The TASK locking mechanism guarantees that the same write task never runs more than once at the same time.
• The checkpoint mechanism guarantees the consistency and completeness of data synchronization.
• The data-chunking scheme keeps memory use under control. A TASK that processes a large amount of data can otherwise take up so much system memory that the graph database risks going down.
CYPHER-TASK Design
Homogeneous graph
• Every task must acquire the lock before running its data-build logic. It must release the TASK lock when it ends, whether or not the build logic succeeded.
• [NODE-TASK] is responsible for updating node_check_point and for synchronizing rel_check_point for the downstream task.
• [REL-TASK] is responsible for rolling back node_check_point and for synchronizing task state: rel_check_point=node_check_point.
# TASK execution flow
[NODE-TASK]->[REL-TASK]
Heterogeneous graph
• Every task must acquire the lock before running its data-build logic. It must release the TASK lock when it ends, whether or not the build logic succeeded.
• [FROM-NODE-TASK] is responsible for updating node_check_point and for synchronizing rel_check_point for the downstream tasks.
• [TO-NODE-TASK] is responsible only for rolling back node_check_point.
• [REL-TASK] is responsible for rolling back node_check_point and for synchronizing task state: rel_check_point=node_check_point.
# TASK execution flow
[FROM-NODE-TASK]->[TO-NODE-TASK]->[REL-TASK]
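The checkpoint hand-off between these tasks can be modelled as a small state machine. The Python sketch below is an illustrative model, not the author's code. It keeps only the two checkpoint columns. Each TASK becomes a function whose `ok` flag stands for "no batch failed". Timestamps are strings in `%Y-%m-%d %H:%i:%s` format. The names `CheckPoint`, `node_task`, `to_node_task` and `rel_task` are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class CheckPoint:
    """The two checkpoint columns of one ONGDB_TASK_CHECK_POINT row."""
    node_check_point: str
    rel_check_point: str


def node_task(cp: CheckPoint, now: str, ok: bool) -> None:
    """[NODE-TASK] / [FROM-NODE-TASK]: on success, advance node_check_point to
    the time read at the start of the run and keep the old value in
    rel_check_point for the downstream tasks. On failure, change nothing."""
    if ok:
        cp.node_check_point, cp.rel_check_point = now, cp.node_check_point


def to_node_task(cp: CheckPoint, ok: bool) -> None:
    """[TO-NODE-TASK]: never advances a checkpoint; on failure it rolls
    node_check_point back to rel_check_point."""
    if not ok:
        cp.node_check_point = cp.rel_check_point


def rel_task(cp: CheckPoint, ok: bool) -> None:
    """[REL-TASK]: on failure roll node_check_point back to rel_check_point;
    on success synchronize rel_check_point = node_check_point."""
    if ok:
        cp.rel_check_point = cp.node_check_point
    else:
        cp.node_check_point = cp.rel_check_point


if __name__ == "__main__":
    t1, t2 = "2022-07-01 00:00:00", "2022-07-02 00:00:00"
    # Homogeneous graph, [NODE-TASK]->[REL-TASK], relationship build fails
    cp = CheckPoint(t1, t1)
    node_task(cp, t2, ok=True)  # CheckPoint(t2, t1)
    rel_task(cp, ok=False)      # CheckPoint(t1, t1): next run restarts at t1
    print(cp)
```

In this model, once the REL-TASK has run, any failure earlier in the run leaves node_check_point equal to rel_check_point. The next run therefore re-reads the same time window, and the MERGE-based builds make that re-read harmless.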
check-point table schema design
The task status table stores the state of node TASKs and relationship TASKs. It carries task state across the whole task flow and guarantees data consistency and completeness.
CREATE TABLE `ONGDB_TASK_CHECK_POINT` (
`huid` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment primary key',
`hcode` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Task code: HGRAPHTASK(FromLabel)-[RelType]->(ToLabel)',
`from` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Name',
`relationship` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Relationship type',
`to` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'hcode of MSTR_ORG',
`node_check_point` datetime DEFAULT '1900-01-01 00:00:00' COMMENT 'Node checkpoint time: read and advanced by node TASKs; the relationship TASK takes its checkpoint from it (a complete graph-data DAG-TASK must include both node and relationship build TASKs)',
`rel_check_point` datetime DEFAULT '1900-01-01 00:00:00' COMMENT 'The node_check_point value saved before its last update',
`description` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Detailed description of the checkpoint task',
`overall_data_split_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'Full-data sync Cypher: data-chunking script',
`overall_data_timezone_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'Full-data sync Cypher: sync script without a time-range filter',
`hcreatetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
`hupdatetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
`create_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Created by',
`update_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Updated by',
`hisvalid` int(11) NOT NULL DEFAULT '1' COMMENT 'Logical delete flag: 0 = invalid; 1 = valid',
`src` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'ART' COMMENT 'Data source tag',
PRIMARY KEY (`huid`) USING BTREE,
UNIQUE KEY `unique_key_02` (`hcode`) USING BTREE COMMENT 'Unique index',
UNIQUE KEY `unique_key_01` (`from`,`to`,`relationship`) USING BTREE COMMENT 'Unique index',
KEY `updateTime` (`hupdatetime`) USING BTREE,
KEY `name` (`from`) USING BTREE,
KEY `hisvalid` (`hisvalid`) USING BTREE,
KEY `type` (`relationship`) USING BTREE,
KEY `check_point` (`node_check_point`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=742715632 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='ONgDB DAG TASK checkpoint table';
task-lock table schema design
The task status lock table ONGDB_TASK_CHECK_POINT_LOCK locks the task state and guarantees that each TASK runs as a single instance. The lock queries below use two of its columns: hcode (the task code) and task_lock (0 = free, 1 = held).
Task module breakdown
Data chunking
Controls how much data is loaded into memory at once. This keeps the task from taking up too much heap memory, so the graph database keeps running reliably.
// Data chunking: split the ID range after the checkpoint into blocks of the given size (a default block [0,1] is always added so that the lock release at the end is still reached)
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value
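A minimal Python sketch of the chunking step follows. It assumes that olab.ids.batch(min,max,size) returns inclusive, non-overlapping [lo, hi] ID pairs and returns an empty list when there is nothing to split. Its exact boundaries are not documented here, so `id_batches` is a hypothetical stand-in.

The sketch also shows why the default block matters. When no row has changed since the checkpoint, MIN(huid)/MAX(huid) return NULL and the block list is empty. UNWIND over an empty list produces no rows, so every later clause, including the lock release, would be skipped.

```python
from typing import List, Optional


def id_batches(min_id: Optional[int], max_id: Optional[int], size: int) -> List[List[int]]:
    """Hypothetical stand-in for olab.ids.batch(min, max, size): split the
    inclusive range [min_id, max_id] into non-overlapping [lo, hi] blocks,
    matching the `huid>=? AND huid<=?` filter used by the SQL."""
    if size < 1:
        raise ValueError("size must be positive")
    if min_id is None or max_id is None:
        # MIN(huid)/MAX(huid) are NULL when no row changed since the checkpoint
        return []
    blocks: List[List[int]] = []
    lo = min_id
    while lo <= max_id:
        hi = min(lo + size - 1, max_id)
        blocks.append([lo, hi])
        lo = hi + 1
    return blocks


def with_default_block(blocks: List[List[int]]) -> List[List[int]]:
    """Like apoc.coll.union(blocks, [[0,1]]): add the default block [0,1]
    unless it is already present, so UNWIND always yields at least one row."""
    return blocks if [0, 1] in blocks else blocks + [[0, 1]]


if __name__ == "__main__":
    print(with_default_block(id_batches(1, 25000, 10000)))
    print(with_default_block(id_batches(None, None, 10000)))
```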
Task status rollback
Rolls the task state back to the node-build stage. The next run of the node/relationship build then starts again from the rollback point, beginning with the node TASK.
// If the number of failed batches is greater than 0, roll back node_check_point
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=rel_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row;'],'',{}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
Task status synchronization
Synchronizes the state of the relationship TASK checkpoint with the node TASK checkpoint.
// batchFailedSize>0: roll back the task state (if any batch of the relationship build fails, set node_check_point to the rel_check_point read at task start)
// batchFailedSize<=0: the run succeeded, so synchronize node TASK and relationship TASK state by setting rel_check_point=node_check_point
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
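The rollback/synchronization decision can be checked against a throw-away database. The sketch below rests on several assumptions:
• SQLite stands in for MySQL.
• The table keeps only hcode and the two checkpoint columns.
• Each `batch` map is reduced to its `failed` count. apoc.periodic.iterate yields one such map per call, and the Cypher sums `batch.failed` over all data blocks.

`make_db`, `finish_rel_task` and `read_check_points` are hypothetical helpers.

```python
import sqlite3
from typing import Dict, List, Tuple

HCODE = "HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)"


def make_db(node_cp: str, rel_cp: str) -> sqlite3.Connection:
    """In-memory stand-in for ONGDB_TASK_CHECK_POINT (only the columns used here)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE ONGDB_TASK_CHECK_POINT ("
        "hcode TEXT PRIMARY KEY, node_check_point TEXT, rel_check_point TEXT)"
    )
    conn.execute("INSERT INTO ONGDB_TASK_CHECK_POINT VALUES (?, ?, ?)", (HCODE, node_cp, rel_cp))
    conn.commit()
    return conn


def finish_rel_task(conn: sqlite3.Connection, hcode: str,
                    batches: List[Dict[str, int]], raw_check_point: str) -> int:
    """Apply the REL-TASK decision. `batches` mimics the per-block `batch`
    maps; `raw_check_point` is the rel_check_point read at task start."""
    batch_failed_size = sum(b["failed"] for b in batches)  # SUM(batch.failed)
    if batch_failed_size > 0:
        # Rollback: the next run restarts from the old checkpoint
        conn.execute(
            "UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point = ? WHERE hcode = ?",
            (raw_check_point, hcode),
        )
    else:
        # Sync: the relationship TASK has caught up with the node TASK
        conn.execute(
            "UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point = node_check_point WHERE hcode = ?",
            (hcode,),
        )
    conn.commit()
    return batch_failed_size


def read_check_points(conn: sqlite3.Connection, hcode: str) -> Tuple[str, str]:
    return conn.execute(
        "SELECT node_check_point, rel_check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode = ?",
        (hcode,),
    ).fetchone()
```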
Task status lock
A status lock for graph-data build tasks. It guarantees that, at any moment, each TASK in the relationship DAG has only one running instance.
• Acquire the lock
// Acquire the task lock
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
• Release the lock
// Release the lock at the end of the TASK (the default data block guarantees that this step is reached)
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;
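The Cypher version releases the lock in the last clause of the same statement. This works as long as the statement runs to the end. Errors inside the batched statements are reported through `batch.failed` instead of being thrown, but an exception elsewhere in the statement would skip the release and leave task_lock at 1. When a TASK is driven from client code, the release can sit in a `finally` block instead.

The Python sketch below assumes a minimal SQLite stand-in for ONGDB_TASK_CHECK_POINT_LOCK with only the two columns the queries use. `task_lock`, `LockBusy` and the helper functions are hypothetical.

It uses the single-table form `UPDATE ... SET task_lock=1 WHERE hcode=? AND task_lock=0`. This performs the same compare-and-set as the article's `UPDATE ... INNER JOIN` query: exactly one affected row means the lock was acquired.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator

HCODE = "HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)"


class LockBusy(Exception):
    """Another run already holds the task lock."""


def make_lock_db(hcode: str) -> sqlite3.Connection:
    # Hypothetical minimal lock table: only the two columns the queries use.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE ONGDB_TASK_CHECK_POINT_LOCK ("
        "hcode TEXT PRIMARY KEY, task_lock INTEGER NOT NULL DEFAULT 0)"
    )
    conn.execute("INSERT INTO ONGDB_TASK_CHECK_POINT_LOCK (hcode) VALUES (?)", (hcode,))
    conn.commit()
    return conn


@contextmanager
def task_lock(conn: sqlite3.Connection, hcode: str) -> Iterator[None]:
    # Acquire: compare-and-set 0 -> 1; one affected row means we own the lock
    cur = conn.execute(
        "UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock = 1 "
        "WHERE hcode = ? AND task_lock = 0",
        (hcode,),
    )
    acquired = cur.rowcount == 1
    conn.commit()
    if not acquired:
        raise LockBusy(hcode)
    try:
        yield
    finally:
        # Release: runs whether the build logic returned normally or raised
        conn.execute(
            "UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock = 0 WHERE hcode = ?",
            (hcode,),
        )
        conn.commit()


def lock_state(conn: sqlite3.Connection, hcode: str) -> int:
    return conn.execute(
        "SELECT task_lock FROM ONGDB_TASK_CHECK_POINT_LOCK WHERE hcode = ?", (hcode,)
    ).fetchone()[0]
```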
Complete implementation examples
Homogeneous graph
Node TASK
// =========================== Acquire the lock and execute TASK===========================
// Acquire the task lock
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Read the checkpoint time (to sync the full data set, set CHECK_POINT back to the earliest time; if the data volume exceeds the heap memory limit, the data-chunking scheme must be used)
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment ID (huid) of the rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuaranteeV001 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking: split the ID range after the checkpoint into blocks of the given size (a default block [0,1] is always added so that the lock release at the end is still reached)
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Define the SQL statement that loads the data for this block
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT
hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Build nodes in batches with apoc.periodic.iterate
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV001 {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// If no batch failed (batchFailedSize<1, i.e. every operation succeeded), update the checkpoint: node_check_point becomes the system time read at task start, and rel_check_point becomes the node_check_point value from before the update
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize<1,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$rawCheckPoint,\'HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)\']) YIELD row RETURN row;'],'',{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// Release the lock at the end of the TASK (the default data block guarantees that this step is reached)
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
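The `REPLACE(...)` step that builds `sqlData` is easy to misread. The statement passed to apoc.periodic.iterate runs as a separate query and cannot see the outer variables check_point, batchMin and batchMax. The author therefore rewrites the literal text `check_point,batchMin,batchMax` in its parameter list into concrete values. The Python sketch mirrors that string substitution; `inline_block_params` and the shortened template are illustrative only.

As an alternative, the APOC documentation describes a `params` entry in the apoc.periodic.iterate config map for passing values as real parameters. The article does not use it.

```python
def inline_block_params(template: str, check_point: str, batch_min: int, batch_max: int) -> str:
    """Mirror the Cypher
    REPLACE(template, 'check_point,batchMin,batchMax', check_point+','+batchMin+','+batchMax):
    the variable names in the parameter list are replaced by literal values."""
    return template.replace(
        "check_point,batchMin,batchMax", f"{check_point},{batch_min},{batch_max}"
    )


# Shortened, illustrative template; the real one embeds the full JDBC URL and SELECT.
TEMPLATE = (
    "CALL apoc.load.jdbc('jdbc:mysql://...', "
    "'SELECT ... WHERE hupdatetime>=? AND huid>=? AND huid<=?', "
    "[check_point,batchMin,batchMax])"
)

if __name__ == "__main__":
    # check_point is already quoted by apoc.text.join(['\'', row.check_point, '\''], '')
    print(inline_block_params(TEMPLATE, "'2022-07-01 00:00:00'", 1, 10000))
```

Because the values are pasted in as text, check_point must already carry its own quotes (hence the apoc.text.join step), while the block bounds are written as plain digits.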
Relationship TASK
// =========================== Acquire the lock and execute TASK===========================
// Acquire the task lock
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Read the checkpoint time (to sync the full data set, set CHECK_POINT back to the earliest time; if the data volume exceeds the heap memory limit, the data-chunking scheme must be used)
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment ID (huid) of the rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuarantee_GuarV001 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
// Data chunking: split the ID range after the checkpoint into blocks of the given size (a default block [0,1] is always added so that the lock release at the end is still reached)
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,rawCheckPoint
// Define the SQL statement that loads the data for this block
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// Build relationships in batches with apoc.periodic.iterate
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV001 {hcode:row.from}),(to:HORGGuaranteeV001 {hcode:row.to}) MERGE (from)-[r: guarantee ]->(to) SET r+={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
// batchFailedSize>0: roll back the task state (if any batch of the relationship build fails, set node_check_point to the rel_check_point read at task start)
// batchFailedSize<=0: the run succeeded, so synchronize node TASK and relationship TASK state by setting rel_check_point=node_check_point
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
// Release the lock at the end of the TASK (the default data block guarantees that this step is reached)
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[ guarantee ]->(HORGGuaranteeV001)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;
Heterogeneous graph
In a heterogeneous graph the TASK execution flow is (FROM-TASK)->(TO-TASK)->(REL-TASK). The (TO-TASK) only rolls back node_check_point. It neither advances node_check_point nor synchronizes task state.
Node TASK
•FROM node TASK
// =========================== Acquire the lock and execute TASK===========================
// Acquire the task lock
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Read the checkpoint time (to sync the full data set, set CHECK_POINT back to the earliest time; if the data volume exceeds the heap memory limit, the data-chunking scheme must be used)
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment ID (huid) of the rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,' Issuance of securities ']) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking: split the ID range after the checkpoint into blocks of the given size (a default block [0,1] is always added so that the lock release at the end is still reached)
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Define the SQL statement that loads the data for this block
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT org_hcode AS hcode,org_name AS name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\' Issuance of securities \'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Build nodes in batches with apoc.periodic.iterate
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HBondOrg {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// If no batch failed (batchFailedSize<1, i.e. every operation succeeded), update the checkpoint: node_check_point becomes the system time read at task start, and rel_check_point becomes the node_check_point value from before the update
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize<1,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row;'],'',{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// Release the lock at the end of the TASK (the default data block guarantees that this step is reached)
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
•TO node TASK
// =========================== Acquire the lock and execute TASK===========================
// Acquire the task lock
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Read the checkpoint time (to sync the full data set, set CHECK_POINT back to the earliest time; if the data volume exceeds the heap memory limit, the data-chunking scheme must be used)
// The TO-NODE-TASK reads rel_check_point: the FROM-NODE-TASK has already moved node_check_point to its own start time and saved the previous value in rel_check_point, so reading node_check_point here would skip the rows the FROM-NODE-TASK just processed
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment ID (huid) of the rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,' Issuance of securities ']) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
// Data chunking: split the ID range after the checkpoint into blocks of the given size (a default block [0,1] is always added so that the lock release at the end is still reached)
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
// Define the SQL statement that loads the data for this block
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT hcode,name,data_source,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\' Issuance of securities \'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
// Build nodes in batches with apoc.periodic.iterate
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HEventBond {hcode:row.hcode}) SET n+=row', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// If the number of failed batches is greater than 0, roll back node_check_point
WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=rel_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row;'],'',{}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
// Release the lock at the end of the TASK (the default data block guarantees that this step is reached)
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
Relationship TASK
// =========================== Acquire the lock and execute TASK===========================
// Acquire the task lock
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
// Read the checkpoint time (to sync the full data set, set CHECK_POINT back to the earliest time; if the data volume exceeds the heap memory limit, the data-chunking scheme must be used)
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
// Data chunking: get the minimum and maximum auto-increment ID (huid) of the rows updated since the checkpoint
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,' Issuance of securities ']) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
// Data chunking: split the ID range after the checkpoint into blocks of the specified size 【a default block [0,1] is added so that the lock can always be released】
WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,rawCheckPoint
UNWIND value AS batchIdList
WITH batchIdList[0] AS batchMin,batchIdList[1] AS batchMax,check_point,rawCheckPoint
// Build the SQL statement that loads one data block
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT hcode AS `to`,org_hcode AS `from`,data_source,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\' Issuance of securities \'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
// Build the relationships in batches (the SQL aliases the columns as `from` and `to`, so the MATCH must read row.`from` and row.`to`)
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HBondOrg {hcode:row.`from`}),(to:HEventBond {hcode:row.`to`}) MERGE (from)-[r:`Issuance of securities`]->(to) SET r+=olab.reset.map(row,[\'from\',\'to\'])', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
// batchFailedSize>0: roll back the task state 【if any batch of the relationship build fails, roll back by setting node_check_point to the current rel_check_point】
// batchFailedSize<=0: 【on success, synchronize the node TASK and relationship TASK states】 by setting rel_check_point=node_check_point
WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
// Release the lock 【release the lock when the TASK ends】【a default block added during data chunking ensures this release step is always reached】
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[ Issuance of securities ]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;
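The chunking and checkpoint logic of the relationship TASK can also be sketched in plain Python. This is a minimal sketch under stated assumptions: `id_batches` is a hypothetical stand-in for `olab.ids.batch` that is assumed to return inclusive `[min, max]` ID ranges (matching the `huid>=? AND huid<=?` filter), the default `[0,1]` block is appended the way `apoc.coll.union` adds it, and `next_checkpoints` mirrors the `apoc.do.case` branch that either rolls `node_check_point` back to the `rel_check_point` read at the start (`rawCheckPoint`) or synchronizes `rel_check_point=node_check_point`.

```python
from typing import Callable, List, Optional, Tuple

DEFAULT_BLOCK = [0, 1]  # mirrors apoc.coll.union(..., [[0,1]]) in the script


def id_batches(min_id: Optional[int], max_id: Optional[int], size: int) -> List[List[int]]:
    """Hypothetical stand-in for olab.ids.batch: inclusive [lo, hi] ID ranges."""
    if min_id is None or max_id is None:  # assumed: no ranges when MIN/MAX are NULL
        return []
    blocks = []
    lo = min_id
    while lo <= max_id:
        hi = min(lo + size - 1, max_id)
        blocks.append([lo, hi])
        lo = hi + 1
    return blocks


def plan_blocks(min_id: Optional[int], max_id: Optional[int], size: int) -> List[List[int]]:
    """Add the default block so the pipeline always yields at least one row."""
    blocks = id_batches(min_id, max_id, size)
    if DEFAULT_BLOCK not in blocks:  # union semantics: no exact duplicates
        blocks.append(list(DEFAULT_BLOCK))
    return blocks


def next_checkpoints(node_cp: str, rel_cp: str, batch_failed_size: int) -> Tuple[str, str]:
    """Return (node_check_point, rel_check_point) after the relationship TASK."""
    if batch_failed_size > 0:
        return rel_cp, rel_cp  # roll back: node_check_point = rawCheckPoint
    return node_cp, node_cp  # sync: rel_check_point = node_check_point


def run_relationship_task(min_id: Optional[int], max_id: Optional[int], size: int,
                          load_block: Callable[[int, int], int],
                          node_cp: str, rel_cp: str) -> Tuple[str, str]:
    """load_block(lo, hi) is a hypothetical loader returning its failed-batch count."""
    batch_failed_size = sum(load_block(lo, hi) for lo, hi in plan_blocks(min_id, max_id, size))
    return next_checkpoints(node_cp, rel_cp, batch_failed_size)
```

Because the script writes relationships with MERGE, re-processing the small default `[0,1]` block on every run should not create duplicate relationships; its only purpose is to keep at least one row flowing to the lock-release step.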
Remarks
Deconstructing the graph data tasks in this way shows that generating these CYPHER scripts automatically is entirely achievable. It would be valuable to go further and design a system that extracts graph data automatically: by covering graph data schema design, data model recommendation and automatic graph data extraction, such a system could save data engineers a great deal of effort.
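As a small illustration of how such scripts could be generated, the hypothetical helpers below rebuild the lock-acquire and lock-release statements from a JDBC URL and a task hcode, reproducing the string patterns used in the relationship TASK script. This is a sketch only: the function names are invented for this example, and the generated text has not been run against ONgDB.

```python
LOCK_TABLE = "ONGDB_TASK_CHECK_POINT_LOCK"


def cypher_str(value: str) -> str:
    """Quote text as a single-quoted Cypher string literal (escape \\ and ')."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def jdbc_update(jdbc_url: str, sql: str, hcode: str) -> str:
    """Render a CALL apoc.load.jdbcUpdate(...) with hcode as the only SQL parameter."""
    return f"CALL apoc.load.jdbcUpdate({cypher_str(jdbc_url)},{cypher_str(sql)},[{cypher_str(hcode)}])"


def acquire_lock_cypher(jdbc_url: str, hcode: str) -> str:
    """Lock-acquire step: conditional UPDATE, continue only if a row changed."""
    sql = (
        f"UPDATE {LOCK_TABLE} updateLock INNER JOIN "
        f"(SELECT task_lock,hcode FROM {LOCK_TABLE} selectLock WHERE task_lock=0 AND hcode=?) selectLock "
        "ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1"
    )
    return jdbc_update(jdbc_url, sql, hcode) + " YIELD row AS lock WHERE lock.count>0 WITH lock"


def release_lock_cypher(jdbc_url: str, hcode: str, returns: list) -> str:
    """Lock-release step that ends the script and returns the given variables."""
    sql = f"UPDATE {LOCK_TABLE} SET task_lock=0 WHERE hcode=?"
    return (jdbc_update(jdbc_url, sql, hcode)
            + " YIELD row AS releaseLock RETURN releaseLock," + ",".join(returns) + ";")
```

The same approach would extend to the checkpoint, chunking and build steps: each is a fixed template parameterized by the table, the task hcode and the node labels or relationship type.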
# Graph database plug-in package downloads
https://github.com/neo4j-contrib/neo4j-apoc-procedures
https://github.com/ongdb-contrib/ongdb-lab-apoc
• Related articles
• Implementing graph data construction tasks based on check-point
• Task state rollback and data chunking tasks based on the check-point mechanism
References
[1] TOC: CYPHER-TASK design and TASK locking mechanism for isomorphic and heterogeneous graphs
[2] Implementing graph data construction tasks based on check-point: https://yc-ma.blog.csdn.net/article/details/112055402
[3] Task state rollback and data chunking tasks based on the check-point mechanism: https://yc-ma.blog.csdn.net/article/details/112200819