Implementing Graph-Data Construction Tasks with Check-Points
2022-07-04 14:54:00 [Ma Chao's blog (马超的博客)]
Background
Graph data schema
Check-point record table design
TASK design
Node TASK
Relationship TASK
Remarks
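Background
When graph data is extracted from a relational database, one scenario that must be handled is newly added data, and here the dependencies between task states and between data sets matter a great deal. From the point of view of a tool that extracts graph data automatically, script generation can plug into the implementation below: once the schema is designed, scripts like the ones shown here can be generated automatically, so the design integrates seamlessly with such a tool.
In an existing Airflow scheduling system (you can also implement the scheduling logic yourself or use another scheduler; the design still applies), Tasks and DAGs can be designed to process incremental data and keep production data continuously updated. Following the shape of graph data, the TASKs are split into node TASKs and relationship TASKs, which are scheduled within the same DAG. A DAG can be designed as the processing flow for one category of business data. The example below shows the construction design for guarantee-relationship graph data.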
Graph data schema
The guarantee-relationship schema (figure).
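Check-point record table design
The check-point record table records the processing state of each task and links the task states of the node TASK and the relationship TASK. The scheduler is responsible only for execution logic and periodic scheduling; it cannot directly express state dependencies between TASKs, so these need an additional mechanism, and so do data dependencies. In this example, data dependencies between TASKs are handled through ONgDB, and state dependencies between TASKs are handled through MySQL.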
CREATE TABLE `ONGDB_TASK_CHECK_POINT` (
`huid` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment primary key',
`hcode` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Code: HGRAPHTASK(FromLabel)-[RelType]->(ToLabel)',
`from` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Name',
`relationship` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Relationship type',
`to` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'hcode of MSTR_ORG',
`node_check_point` datetime(0) NULL DEFAULT '1900-01-01 00:00:00' COMMENT 'Check-point time that the node TASK can read and update and the relationship TASK can read [a complete graph-data DAG-TASK must include both node and relationship construction TASKs]',
`rel_check_point` datetime(0) NULL DEFAULT '1900-01-01 00:00:00' COMMENT 'Holds the value of node_check_point before the update',
`from_update_check` int(2) NOT NULL DEFAULT 0 COMMENT 'Whether from has updated the check point: 0-no, 1-yes [not needed when from and to have the same label]',
`to_update_check` int(2) NOT NULL DEFAULT 0 COMMENT 'Whether to has updated the check point: 0-no, 1-yes [not needed when from and to have the same label]',
`description` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Detailed description of this check-point task',
`overall_data_split_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT 'CYPHER for syncing the full data set: data-splitting script',
`overall_data_timezone_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT 'CYPHER for syncing the full data set: sync script without a time range',
`hcreatetime` datetime(0) NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
`hupdatetime` datetime(0) NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP(0) COMMENT 'Update time',
`create_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Created by',
`update_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Updated by',
`hisvalid` int(11) NOT NULL DEFAULT 1 COMMENT 'Logical-delete flag: 0-invalid; 1-valid',
`src` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'ART' COMMENT 'Data source flag',
PRIMARY KEY (`huid`) USING BTREE,
UNIQUE INDEX `unique_key_02`(`hcode`) USING BTREE COMMENT 'Unique index',
UNIQUE INDEX `unique_key_01`(`from`, `to`, `relationship`) USING BTREE COMMENT 'Unique index',
INDEX `updateTime`(`hupdatetime`) USING BTREE,
INDEX `name`(`from`) USING BTREE,
INDEX `hisvalid`(`hisvalid`) USING BTREE,
INDEX `type`(`relationship`) USING BTREE,
INDEX `check_point`(`node_check_point`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 742715628 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = 'ONgDB DAG TASK check-point record table' ROW_FORMAT = Dynamic;
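TASK design
Each TASK is designed around a single CYPHER statement: every CYPHER statement is one complete TASK. Each time a TASK runs, it first reads the check-point time recorded by the previous run. A relationship-building TASK must use the same check-point time as the node TASK, which prevents data from being missed because of a time gap.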
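The check-point handoff described above can be illustrated with a minimal Python sketch. It is not the author's implementation: it swaps MySQL for an in-memory SQLite database, keeps only three columns of `ONGDB_TASK_CHECK_POINT`, and the helper names (`open_demo_db`, `read_check_points`, `commit_node_task`) and the sample `hcode` value are hypothetical.

```python
import sqlite3

HCODE = "HGRAPHTASK(Demo)-[REL]->(Demo)"  # hypothetical task code


def open_demo_db():
    """Create an in-memory stand-in for ONGDB_TASK_CHECK_POINT (three columns only)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE ONGDB_TASK_CHECK_POINT ("
        " hcode TEXT PRIMARY KEY,"
        " node_check_point TEXT NOT NULL DEFAULT '1900-01-01 00:00:00',"
        " rel_check_point TEXT NOT NULL DEFAULT '1900-01-01 00:00:00')"
    )
    conn.execute("INSERT INTO ONGDB_TASK_CHECK_POINT (hcode) VALUES (?)", (HCODE,))
    return conn


def read_check_points(conn, hcode):
    """Return (node_check_point, rel_check_point) for one task."""
    return conn.execute(
        "SELECT node_check_point, rel_check_point"
        " FROM ONGDB_TASK_CHECK_POINT WHERE hcode = ?",
        (hcode,),
    ).fetchone()


def commit_node_task(conn, hcode, run_started_at):
    """After a fully successful node run, advance node_check_point and
    hand its previous value to rel_check_point."""
    previous_node_cp, _ = read_check_points(conn, hcode)
    conn.execute(
        "UPDATE ONGDB_TASK_CHECK_POINT"
        " SET node_check_point = ?, rel_check_point = ? WHERE hcode = ?",
        (run_started_at, previous_node_cp, hcode),
    )
    conn.commit()
    return previous_node_cp


conn = open_demo_db()
# The node task loads rows changed since the old node_check_point, then commits.
window_start_nodes = commit_node_task(conn, HCODE, "2022-07-04 14:00:00")
# The relationship task runs afterwards and reads rel_check_point.
_, window_start_rels = read_check_points(conn, HCODE)
print(window_start_nodes, window_start_rels)
```

Both tasks end up scanning from the same starting time, which is the property the design relies on: the relationship task never starts from a later point than the node task that ran just before it.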
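Node TASK
Roughly four steps:
- Fetch the check-point time
- Define the SQL that fetches the data
- Run the construction task iteratively in batches
- Update the task state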
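// Fetch the check-point time [for a full load, set CHECK_POINT to the earliest time] [if the data volume exceeds the heap memory limit, the data-splitting scheme must be used]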
CALL apoc.load.jdbcParams('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// Define the SQL that fetches the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT
hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV003 WHERE hupdatetime>=?\',[check_point])','check_point',check_point) AS sqlData,currentTime,rawCheckPoint
// Build the nodes iteratively in batches
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV003 {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
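// Update the check point only when the number of failed batches is below 1 (i.e. every operation succeeded): set node_check_point to the system time and rel_check_point to the previous value of node_check_point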
WHERE batch.failed<1
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?',[currentTime,rawCheckPoint,'HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row RETURN row,batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime;
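The `REPLACE(...)` step above is plain string substitution: the quoted check-point time is spliced into the text of the inner `apoc.load.jdbc` call before `apoc.periodic.iterate` runs it. A minimal Python sketch of that substitution, assuming a shortened statement, a placeholder JDBC URL, and a hypothetical helper name `build_inner_statement`:

```python
def build_inner_statement(template: str, check_point: str) -> str:
    """Mimic Cypher's REPLACE(template, 'check_point', quoted_value)."""
    # Wrap the timestamp in single quotes, as apoc.text.join(["'", cp, "'"], '') does.
    quoted = "'" + check_point + "'"
    # Every occurrence of the token is replaced, exactly like REPLACE in Cypher.
    return template.replace("check_point", quoted)


# Shortened stand-in for the inner statement; '<jdbc-url>' is a placeholder.
TEMPLATE = (
    "CALL apoc.load.jdbc('<jdbc-url>', "
    "'SELECT hcode FROM HORGGuaranteeV003 WHERE hupdatetime>=?', "
    "[check_point])"
)

statement = build_inner_statement(TEMPLATE, "2022-07-01 00:00:00")
print(statement)
```

Because every occurrence of the token is replaced, the statement text must not contain `check_point` anywhere else (for example as a column name), or that occurrence would be rewritten too.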
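Relationship TASK
Roughly three steps:
- Fetch the check-point time (the relationship TASK does not update the task state; it relies on the task state of the node TASK)
- Define the SQL that fetches the data
- Run the construction task iteratively in batches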
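// Fetch the check-point time [for a full load, set CHECK_POINT to the earliest time] [if the data volume exceeds the heap memory limit, the data-splitting scheme must be used]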
CALL apoc.load.jdbcParams('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point
// Define the SQL that fetches the data
WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV003 WHERE hupdatetime>=?\',[check_point])','check_point',check_point) AS sqlData
// Build the relationships iteratively in batches
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV003 {hcode:row.from}),(to:HORGGuaranteeV003 {hcode:row.to}) MERGE (from)-[r:担保]->(to) SET r+={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;
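Remarks
The examples above build data between nodes that share the same label, which can be understood as building a homogeneous graph. Building a heterogeneous graph differs somewhat in how task-state dependencies are handled, namely that the task state must not be modified a second time; that will be covered in a follow-up post.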