Implementing Graph Data Construction Tasks Based on Check-points
2022-07-04 14:54:00 [Ma Chao's Blog]
Problem Background
Graph Data Schema
Checkpoint Table Design
TASK Design
Node TASK
Relationship TASK
Notes
Problem Background
When extracting graph data from a relational database, one scenario that has to be handled is newly added data, and here the dependency between task states and the dependency between data sets are both critical. From the point of view of an automated graph-data extraction tool, automatically generated scripts can plug directly into the implementation below (that is, once the schema is designed, the scripts shown here can be generated automatically), so this design integrates seamlessly with such a tool. On an existing Airflow scheduler (you can also implement the scheduling logic yourself or use another scheduler; the design ideas here carry over), Tasks and DAGs can be designed to process incremental data and keep the online graph continuously updated. Following the characteristics of graph data, the TASKs are split into node TASKs and relationship TASKs, scheduled within the same DAG; a DAG can correspond to the processing pipeline of one class of business data. The example below shows the construction of guarantee-relationship graph data.
Graph Data Schema
Guarantee-relationship schema (schema diagram).
Checkpoint Table Design
The checkpoint table records the processing state of each task and links the state of the node TASK to that of the relationship TASK. The scheduler is responsible for execution logic and periodic scheduling, but state dependencies between TASKs cannot be expressed in it directly and require extra support, as do data dependencies. In this case the data dependency between TASKs is handled through ONgDB itself, while the state dependency between TASKs is handled through MySQL.
CREATE TABLE `ONGDB_TASK_CHECK_POINT` (
`huid` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment primary key',
`hcode` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Task code: HGRAPHTASK(FromLabel)-[RelType]->(ToLabel)',
`from` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'From label name',
`relationship` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Relationship type',
`to` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'To label name',
`node_check_point` datetime(0) NULL DEFAULT '1900-01-01 00:00:00' COMMENT 'Checkpoint time updated by the node TASK and read by the relationship TASK [a complete graph-data DAG must contain both a node TASK and a relationship TASK]',
`rel_check_point` datetime(0) NULL DEFAULT '1900-01-01 00:00:00' COMMENT 'Value of node_check_point before the last update',
`from_update_check` int(2) NOT NULL DEFAULT 0 COMMENT 'Whether the from side has updated its checkpoint: 0 = no, 1 = yes [not needed when from and to use the same label]',
`to_update_check` int(2) NOT NULL DEFAULT 0 COMMENT 'Whether the to side has updated its checkpoint: 0 = no, 1 = yes [not needed when from and to use the same label]',
`description` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Description of this checkpoint task',
`overall_data_split_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT 'CYPHER for a full data sync: data-chunking script',
`overall_data_timezone_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT 'CYPHER for a full data sync: sync script without a time-range filter',
`hcreatetime` datetime(0) NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
`hupdatetime` datetime(0) NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP(0) COMMENT 'Update time',
`create_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Created by',
`update_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Updated by',
`hisvalid` int(11) NOT NULL DEFAULT 1 COMMENT 'Logical delete flag: 0 = invalid; 1 = valid',
`src` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'ART' COMMENT 'Data source tag',
PRIMARY KEY (`huid`) USING BTREE,
UNIQUE INDEX `unique_key_02`(`hcode`) USING BTREE COMMENT 'Unique index',
UNIQUE INDEX `unique_key_01`(`from`, `to`, `relationship`) USING BTREE COMMENT 'Unique index',
INDEX `updateTime`(`hupdatetime`) USING BTREE,
INDEX `name`(`from`) USING BTREE,
INDEX `hisvalid`(`hisvalid`) USING BTREE,
INDEX `type`(`relationship`) USING BTREE,
INDEX `check_point`(`node_check_point`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 742715628 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = 'ONgDB DAG TASK checkpoint table' ROW_FORMAT = Dynamic;
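Before the first run, the DAG's task needs a row in this table so that the scripts below can look it up by hcode. A minimal registration for the guarantee task might look like the sketch below; the description and the create_by/update_by values are illustrative only, while the hcode follows the HGRAPHTASK(FromLabel)-[RelType]->(ToLabel) convention used in the scripts that follow.
-- Register the guarantee-graph checkpoint row (illustrative values);
-- node_check_point and rel_check_point keep their 1900-01-01 defaults,
-- so the first scheduled run picks up all historical data.
INSERT INTO `ONGDB_TASK_CHECK_POINT`
  (`hcode`, `from`, `relationship`, `to`, `description`, `create_by`, `update_by`)
VALUES
  ('HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)',
   'HORGGuaranteeV003', '担保', 'HORGGuaranteeV003',
   'Guarantee relationship graph DAG checkpoint', 'dev', 'dev');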
TASK Design
A TASK is designed around a single CYPHER statement; each CYPHER script is a complete TASK. Every time a TASK runs, it reads the checkpoint time recorded by the previous run. When the relationship-building TASK runs, its checkpoint must also be consistent with the node TASK's checkpoint time, to prevent data from being missed because of the time gap between the two.
Node TASK
Roughly four steps:
- Read the checkpoint time
- Define the SQL used to fetch the data
- Run the build in batched iterations
- Update the task state
// Read the checkpoint time [when running a full load, just set the CHECK_POINT to the earliest time] [if the data volume exceeds the heap-memory limit, the data-chunking scheme must be used]
CALL apoc.load.jdbcParams('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Define the SQL used to fetch the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT
hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV003 WHERE hupdatetime>=?\',[check_point])','check_point',check_point) AS sqlData,currentTime,rawCheckPoint
// Build the nodes in batched iterations
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV003 {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// When the number of failed batches is less than 1 (i.e. all operations succeeded), update the checkpoint: set node_check_point to the current system time and set rel_check_point to the value node_check_point had before the update
WHERE batch.failed<1
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?',[currentTime,rawCheckPoint,'HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row RETURN row,batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime;
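As the comment at the top of the script notes, a full reload only requires rolling the checkpoint back to the earliest time. A sketch of that manual reset, using the same hcode as above (run it against the checkpoint database only when a full rebuild is intended):
-- Roll the checkpoints back so the next scheduled run reloads everything
UPDATE ONGDB_TASK_CHECK_POINT
SET node_check_point = '1900-01-01 00:00:00',
    rel_check_point  = '1900-01-01 00:00:00'
WHERE hcode = 'HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)';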
Relationship TASK
Roughly three steps:
- Read the checkpoint time (the relationship TASK does not update the task state itself; it relies on the node TASK's state)
- Define the SQL used to fetch the data
- Run the build in batched iterations
// Read the checkpoint time [when running a full load, just set the CHECK_POINT to the earliest time] [if the data volume exceeds the heap-memory limit, the data-chunking scheme must be used]
CALL apoc.load.jdbcParams('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point
// Define the SQL used to fetch the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV003 WHERE hupdatetime>=?\',[check_point])','check_point',check_point) AS sqlData
// Build the relationships in batched iterations
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV003 {hcode:row.from}),(to:HORGGuaranteeV003 {hcode:row.to}) MERGE (from)-[r:担保]->(to) SET r+={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;
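After both TASKs have run, the state handshake can be spot-checked in the checkpoint table: node_check_point should have advanced to the node TASK's run time, while rel_check_point should hold the previous node checkpoint that the relationship TASK just consumed. A simple check, using the same hcode as above:
-- Inspect the checkpoint handshake after a DAG run
SELECT hcode, node_check_point, rel_check_point, hupdatetime
FROM ONGDB_TASK_CHECK_POINT
WHERE hcode = 'HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)';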
Notes
The example above builds data between nodes that share the same label, which can be thought of as constructing a homogeneous graph. Constructing a heterogeneous graph differs somewhat in how the task-state dependency is handled: the task state must not be modified a second time. That will be covered in a follow-up post.