Implement graph data construction task based on check point
2022-07-04 16:40:00 【Ma Chao's blog】
Problem background
Graph data schema
Structure design of checkpoint record table
TASK Design
Node TASK
Relationship TASK
Remarks
Problem background
When extracting graph data from a relational database, one scenario that has to be handled is the processing of newly added data, where both the dependency between task states and the dependency between data are important. Seen from the perspective of a tool that extracts graph data automatically, the scripts below can be generated automatically once the schema has been designed, so this design integrates directly with such a tool. In an existing Airflow scheduling system (the scheduling logic can also be implemented by hand or with another scheduler; the design idea in this article still applies), TASKs and DAGs can be designed to process incremental data and keep the online data continuously updated. When building TASKs, node TASKs and relationship TASKs are designed according to the characteristics of graph data and scheduled within the same DAG. (A DAG can be designed as the processing flow for one kind of business data.) The case below shows the build design for guarantee relationship graph data.
Graph data schema
[Figure: schema of the guarantee relationship graph]
Structure design of checkpoint record table
The checkpoint record table records the processing status of tasks and connects the task status of node TASKs and relationship TASKs. The scheduling system is only responsible for executing logic and for periodic scheduling; it cannot directly express the dependency between TASK states, which needs an additional implementation, and data dependencies need one as well. In this case, the data dependency between TASKs is realized through ONgDB, and the state dependency between TASKs is realized through MySQL.
CREATE TABLE `ONGDB_TASK_CHECK_POINT` (
`huid` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment primary key',
`hcode` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Code: HGRAPHTASK(FromLabel)-[RelType]->(ToLabel)',
`from` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Name',
`relationship` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Relationship type',
`to` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'hcode of MSTR_ORG',
`node_check_point` datetime(0) NULL DEFAULT '1900-01-01 00:00:00' COMMENT 'Checkpoint time: the node TASK can read and update it, the relationship TASK can read it [a complete graph data DAG-TASK must include both node and relationship build TASKs]',
`rel_check_point` datetime(0) NULL DEFAULT '1900-01-01 00:00:00' COMMENT 'Stores the node_check_point value from before the update',
`from_update_check` int(2) NOT NULL DEFAULT 0 COMMENT 'Whether from has updated the checkpoint: 0-no, 1-yes [not needed when from and to have the same label]',
`to_update_check` int(2) NOT NULL DEFAULT 0 COMMENT 'Whether to has updated the checkpoint: 0-no, 1-yes [not needed when from and to have the same label]',
`description` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Description of the checkpoint task',
`overall_data_split_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT 'CYPHER for synchronizing full data: data-splitting scheme script',
`overall_data_timezone_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT 'CYPHER for synchronizing full data: synchronization script without a time range',
`hcreatetime` datetime(0) NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
`hupdatetime` datetime(0) NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP(0) COMMENT 'Update time',
`create_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Created by',
`update_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Updated by',
`hisvalid` int(11) NOT NULL DEFAULT 1 COMMENT 'Logical deletion flag: 0-invalid; 1-valid',
`src` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'ART' COMMENT 'Data source tag',
PRIMARY KEY (`huid`) USING BTREE,
UNIQUE INDEX `unique_key_02`(`hcode`) USING BTREE COMMENT 'Unique index',
UNIQUE INDEX `unique_key_01`(`from`, `to`, `relationship`) USING BTREE COMMENT 'Unique index',
INDEX `updateTime`(`hupdatetime`) USING BTREE,
INDEX `name`(`from`) USING BTREE,
INDEX `hisvalid`(`hisvalid`) USING BTREE,
INDEX `type`(`relationship`) USING BTREE,
INDEX `check_point`(`node_check_point`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 742715628 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = 'ONgDB DAG TASK checkpoint table' ROW_FORMAT = Dynamic;
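The handoff between the two checkpoint columns can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not the author's implementation: it uses an in-memory SQLite table with only three of the columns, and the names `task_check_point`, `run_node_task`, `read_checkpoints` and the `HCODE` value are hypothetical. It assumes the rule given by the column comments: after a node TASK run with no failed batches, `node_check_point` moves to the time the run started and `rel_check_point` keeps the previous `node_check_point`.

```python
import sqlite3

HCODE = "HGRAPHTASK(Label)-[REL]->(Label)"  # hypothetical task code


def create_table():
    # Trimmed-down stand-in for ONGDB_TASK_CHECK_POINT (SQLite, not MySQL).
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_check_point ("
        " hcode TEXT PRIMARY KEY,"
        " node_check_point TEXT NOT NULL DEFAULT '1900-01-01 00:00:00',"
        " rel_check_point TEXT NOT NULL DEFAULT '1900-01-01 00:00:00')"
    )
    return conn


def read_checkpoints(conn, hcode):
    # Returns (node_check_point, rel_check_point) for one task code.
    return conn.execute(
        "SELECT node_check_point, rel_check_point"
        " FROM task_check_point WHERE hcode = ?",
        (hcode,),
    ).fetchone()


def run_node_task(conn, hcode, current_time, failed_batches):
    # Read the checkpoint at the start of the run.
    previous_check_point, _ = read_checkpoints(conn, hcode)
    # ... the node build itself would happen here ...
    if failed_batches > 0:
        return False  # leave both checkpoints untouched so the run can be retried
    conn.execute(
        "UPDATE task_check_point"
        " SET node_check_point = ?, rel_check_point = ? WHERE hcode = ?",
        (current_time, previous_check_point, hcode),
    )
    conn.commit()
    return True


conn = create_table()
conn.execute("INSERT INTO task_check_point (hcode) VALUES (?)", (HCODE,))
first = run_node_task(conn, HCODE, "2022-07-04 16:00:00", failed_batches=0)
second = run_node_task(conn, HCODE, "2022-07-04 17:00:00", failed_batches=2)
third = run_node_task(conn, HCODE, "2022-07-04 18:00:00", failed_batches=0)
print(read_checkpoints(conn, HCODE))
```

The failed second run changes nothing, so the third run starts again from the 16:00 checkpoint and then hands that value to `rel_check_point`.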
TASK Design
TASKs are designed with one CYPHER statement as the unit: every CYPHER statement is a complete TASK. Each run of a TASK reads the most recently recorded checkpoint time. The checkpoint time used by the relationship build TASK must be consistent with the checkpoint time used by the node TASK, to prevent data from being missed because of a time difference.
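The effect of a time difference between the two TASKs can be illustrated with a small Python sketch. The rows, timestamps and the helper `rows_since` are made up for illustration; the only thing taken from the design is the incremental filter `hupdatetime >= checkpoint`.

```python
def rows_since(rows, check_point):
    # Same filter as `WHERE hupdatetime >= ?`. Timestamps in the fixed
    # 'YYYY-MM-DD HH:MM:SS' format compare chronologically as strings.
    return [r["id"] for r in rows if r["hupdatetime"] >= check_point]


# Hypothetical relationship rows waiting in the relational database.
relationship_rows = [
    {"id": "r1", "hupdatetime": "2022-07-04 15:30:00"},
    {"id": "r2", "hupdatetime": "2022-07-04 16:30:00"},
]

previous_node_check_point = "2022-07-04 15:00:00"  # window the node TASK just loaded
updated_node_check_point = "2022-07-04 16:00:00"   # value written after the node TASK

# Relationship TASK using the same window as the node TASK: nothing is missed.
same_window = rows_since(relationship_rows, previous_node_check_point)
# Relationship TASK using the already-advanced checkpoint: r1 is skipped.
later_window = rows_since(relationship_rows, updated_node_check_point)

print(same_window, later_window)
```

Reading the checkpoint that the node TASK started from keeps both TASKs on the same time window, which is the consistency requirement stated above.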
Node TASK
A node TASK has roughly four steps:
- Get the checkpoint time
- Define how the SQL retrieves the data
- Run the build task in batch iterations
- Update the task status
// Get the checkpoint time (for a full-data run, set CHECK_POINT to the earliest time) (if the data volume exceeds the heap memory limit, the data-splitting scheme must be used)
CALL apoc.load.jdbcParams('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[ guarantee ]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Define how the SQL retrieves the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT
hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV003 WHERE hupdatetime>=?\',[check_point])','check_point',check_point) AS sqlData,currentTime,rawCheckPoint
// Build the nodes in batch iterations
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV003 {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// Update the checkpoint only when the number of failed batches is less than 1, i.e. all operations succeeded (node_check_point is set to the system time; rel_check_point is set to the node_check_point value from before the update)
WHERE batch.failed<1
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?',[currentTime,rawCheckPoint,'HGRAPHTASK(HORGGuaranteeV003)-[ guarantee ]->(HORGGuaranteeV003)']) YIELD row RETURN row,batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime;
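The string handling in the node TASK is easy to misread, so here is a minimal Python sketch of what the `apoc.text.join` and `REPLACE` steps appear to do: the checkpoint is wrapped in single quotes and substituted into the text of the inner statement, because the first argument of `apoc.periodic.iterate` is a statement string. The function names, the `<jdbc-url>` placeholder and the shortened SQL are hypothetical and only mirror the shape of the statement above.

```python
def quote_check_point(check_point):
    # Mirrors apoc.text.join(['\'', row.check_point, '\''], ''):
    # wrap the timestamp in single quotes so it becomes a string literal.
    return "'" + check_point + "'"


def build_inner_statement(template, check_point):
    # Mirrors REPLACE(template, 'check_point', quoted_value).
    # Every occurrence of the placeholder is replaced, so the word
    # 'check_point' must not appear anywhere else in the template.
    return template.replace("check_point", quote_check_point(check_point))


# Shortened stand-in for the inner statement; <jdbc-url> is a placeholder.
TEMPLATE = (
    "CALL apoc.load.jdbc('<jdbc-url>', "
    "'SELECT hcode FROM HORGGuaranteeV003 WHERE hupdatetime>=?', "
    "[check_point])"
)

statement = build_inner_statement(TEMPLATE, "2022-07-04 16:00:00")
print(statement)
```

The resulting text passes the checkpoint as a JDBC parameter (`?`), so the timestamp never has to be spliced into the SQL itself.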
Relationship TASK
A relationship TASK has roughly three steps:
- Get the checkpoint time (the relationship TASK does not update the task status; it depends on the task status of the node TASK)
- Define how the SQL retrieves the data
- Run the build task in batch iterations
// Get the checkpoint time (for a full-data run, set CHECK_POINT to the earliest time) (if the data volume exceeds the heap memory limit, the data-splitting scheme must be used)
CALL apoc.load.jdbcParams('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[ guarantee ]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point
// Define how the SQL retrieves the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV003 WHERE hupdatetime>=?\',[check_point])','check_point',check_point) AS sqlData
// Build the relationships in batch iterations
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV003 {hcode:row.from}),(to:HORGGuaranteeV003 {hcode:row.to}) MERGE (from)-[r: guarantee ]->(to) SET r+={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;
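The data dependency on the node TASK comes from the `MATCH ... MERGE` pattern: a relationship is only written when both endpoint nodes already exist, and merging the same pair again updates the existing relationship instead of duplicating it. The following Python sketch models that behaviour with plain dictionaries. It is a simplified model for illustration, not ONgDB itself, and `build_relationships` and the sample rows are hypothetical.

```python
def build_relationships(nodes, rows):
    """Model of MATCH (from), (to) MERGE (from)-[r]->(to) SET r += {...}.

    nodes: set of hcode values already present in the graph.
    rows:  dicts with 'from', 'to' and any relationship properties.
    Returns (relationships keyed by (from, to), rows that were skipped).
    """
    relationships = {}
    skipped = []
    for row in rows:
        if row["from"] not in nodes or row["to"] not in nodes:
            # MATCH finds no endpoint pair, so nothing is created for this row.
            skipped.append(row)
            continue
        # MERGE: reuse the relationship for this pair if it already exists.
        props = relationships.setdefault((row["from"], row["to"]), {})
        # SET r += {...}: add or overwrite the listed properties.
        props.update({k: v for k, v in row.items() if k not in ("from", "to")})
    return relationships, skipped


nodes = {"A", "B"}
rows = [
    {"from": "A", "to": "B", "guarantee_detail_size": 1},
    {"from": "A", "to": "B", "guarantee_detail_size": 2},  # same pair again
    {"from": "A", "to": "C", "guarantee_detail_size": 1},  # node C not loaded
]
relationships, skipped = build_relationships(nodes, rows)
print(relationships, skipped)
```

Because repeated rows only overwrite properties, re-reading an older time window in the relationship TASK is harmless, while a row whose endpoint has not been loaded yet is silently dropped, which is why the node TASK has to run first.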
Remarks
The above mainly shows how to build data between nodes with the same label, which can be understood as building a homogeneous graph. In the construction of heterogeneous graphs, the dependency on task state differs somewhat: the task status must not be modified twice. See the next installment for details.