当前位置:网站首页>Nebula Importer 数据导入实践
Nebula Importer 数据导入实践
2022-07-05 01:09:00 【NebulaGraph】
前言
Nebula 目前作为较为成熟的产品,已经有着很丰富的生态。数据导入的维度而言就已经提供了多种选择。有大而全的Nebula Exchange,小而精简的Nebula Importer, 还有为 Spark / Flink 引擎提供的Nebula Spark Connector 和 Nebula Flink Connector。
在众多的导入方式中,究竟哪种较为方便呢?
使用场景介绍:
- Nebula Exchange
- 需要将来自 Kafka、Pulsar 平台的流式数据, 导入 Nebula Graph 数据库
- 需要从关系型数据库(如 MySQL)或者分布式文件系统(如 HDFS)中读取批式数据
- 需要将大批量数据生成 Nebula Graph 能识别的 SST 文件
- Nebula Importer
- Importer 适用于将本地 CSV 文件的内容导入至 Nebula Graph 中
- Nebula Spark Connector:
- 在不同的 Nebula Graph 集群之间迁移数据
- 在同一个 Nebula Graph 集群内不同图空间之间迁移数据
- Nebula Graph 与其他数据源之间迁移数据
- 结合 Nebula Algorithm 进行图计算
- Nebula Flink Connector
- 在不同的 Nebula Graph 集群之间迁移数据
- 在同一个 Nebula Graph 集群内不同图空间之间迁移数据
- Nebula Graph 与其他数据源之间迁移数据
以上摘自 Nebula 官方文档:https://docs.nebula-graph.com.cn/2.6.2/1.introduction/1.what-is-nebula-graph/
总体来说,Exchange 大而全,可以和大部分的存储引擎结合,导入到 Nebula 中,但是需要部署Spark 环境。
Importer 使用简单,所需依赖较少,但需要自己提前生成数据文件,配置好 schema 一劳永逸,但是不支持断点续传,适合数据量中等。
Spark / Flink Connector 需要和流数据结合。
不同的场景选择不同的工具,如果作为新人使用 Nebula 在导入数据时,建议使用 Nebula Importer 工具,简单快速上手。
Nebula Importer 的使用
在我们接触 Nebula Graph 初期,当时生态不够完善, 加上只有部分业务迁移到 Nebula Graph 上,我们对 Nebula Graph 数据的导入不管全量还是增量都是采用 Hive 表推到 Kafka,消费 Kafka 批量写入 Nebula Graph 的方式。后来随着越来越多的数据和业务切换到 Nebula Graph,导入的数据效率问题愈发严峻,导入时长的增加,使得业务高峰期时仍然在全量的数据导入,这是不可接受的。
针对以上问题,在尝试 Nebula Spark Connector 和 Nebula Importer 之后,由便于维护和迁移多方面考虑,采用 Hive table -> CSV -> Nebula Server -> Nebula Importer
的方式进行全量的导入,整体耗时时长也有较大的提升。
Nebula Importor 的相关配置
系统环境
[[email protected] importer]# lscpuArchitecture: x86_64CPU op-mode(s): 32-bit, 64-bitByte Order: Little EndianCPU(s): 16On-line CPU(s) list: 0-15Thread(s) per core: 2Core(s) per socket: 8Socket(s): 1NUMA node(s): 1Vendor ID: GenuineIntelCPU family: 6Model: 85Model name: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHzStepping: 7CPU MHz: 2499.998BogoMIPS: 4999.99Hypervisor vendor: KVMVirtualization type: fullL1d cache: 32KL1i cache: 32KL2 cache: 1024KL3 cache: 36608KNUMA node0 CPU(s): 0-15Disk:SSDMemory: 128G
集群环境
- Nebula Version:v2.6.1
- 部署方式:RPM
- 集群规模:三副本,六节点
数据规模
+---------+--------------------------+-----------+| "Space" | "vertices" | 559191827 |+---------+--------------------------+-----------+| "Space" | "edges" | 722490436 |+---------+--------------------------+-----------+
Importer 配置
# Graph版本,连接2.x时设置为v2。version: v2description: Relation Space import data# 是否删除临时生成的日志和错误数据文件。removeTempFiles: falseclientSettings: # nGQL语句执行失败的重试次数。 retry: 3 # Nebula Graph客户端并发数。 concurrency: 5 # 每个Nebula Graph客户端的缓存队列大小。 channelBufferSize: 1024 # 指定数据要导入的Nebula Graph图空间。 space: Relation # 连接信息。 connection: user: root password: ****** address: 10.0.XXX.XXX:9669,10.0.XXX.XXX:9669 postStart: # 配置连接Nebula Graph服务器之后,在插入数据之前执行的一些操作。 commands: | # 执行上述命令后到执行插入数据命令之间的间隔。 afterPeriod: 1s preStop: # 配置断开Nebula Graph服务器连接之前执行的一些操作。 commands: |# 错误等日志信息输出的文件路径。 logPath: /mnt/csv_file/prod_relation/err/test.log....
由于篇幅 只展示些全局相关的配置,点边相关的配置较多,不再展开,详情可以参考GitHub。
设置 Crontab,Hive 生成表之后传输到 Nebula Server,在夜间流量较低的时候跑起 Nebula Importer 任务:
50 03 15 * * /mnt/csv_file/importer/nebula-importer -config /mnt/csv_file/importer/rel.yaml >> /root/rel.log
总共耗时 2h,在六点左右完成全量数据的导入。
部分 log 如下,导入速度最高维持在 200,000/s 左右:
2022/05/15 03:50:11 [INFO] statsmgr.go:62: Tick: Time(10.00s), Finished(1952500), Failed(0), Read Failed(0), Latency AVG(4232us), Batches Req AVG(4582us), Rows AVG(195248.59/s)2022/05/15 03:50:16 [INFO] statsmgr.go:62: Tick: Time(15.00s), Finished(2925600), Failed(0), Read Failed(0), Latency AVG(4421us), Batches Req AVG(4761us), Rows AVG(195039.12/s)2022/05/15 03:50:21 [INFO] statsmgr.go:62: Tick: Time(20.00s), Finished(3927400), Failed(0), Read Failed(0), Latency AVG(4486us), Batches Req AVG(4818us), Rows AVG(196367.10/s)2022/05/15 03:50:26 [INFO] statsmgr.go:62: Tick: Time(25.00s), Finished(5140500), Failed(0), Read Failed(0), Latency AVG(4327us), Batches Req AVG(4653us), Rows AVG(205619.44/s)2022/05/15 03:50:31 [INFO] statsmgr.go:62: Tick: Time(30.00s), Finished(6080800), Failed(0), Read Failed(0), Latency AVG(4431us), Batches Req AVG(4755us), Rows AVG(202693.39/s)2022/05/15 03:50:36 [INFO] statsmgr.go:62: Tick: Time(35.00s), Finished(7087200), Failed(0), Read Failed(0), Latency AVG(4461us), Batches Req AVG(4784us), Rows AVG(202489.00/s)
然后在七点,根据时间戳,重新消费 Kafka 导入当天凌晨到七点的增量数据, 防止 T+1 的全量数据覆盖当天的增量数据。
50 07 15 * * python3 /mnt/code/consumer_by_time/relation_consumer_by_timestamp.py
增量的消费大概耗时 10-15min。
实时性
根据 MD5 对比之后得到的增量数据,导入Kafka中,实时消费 Kafka 的数据,确保数据的延迟不超过 1 分钟。
另外长时间的实时可能会有非预期的数据问题出现而未发现,所以每 30 天会导入一次全量数据,上面介绍的 Importer 导入。然后给 Space 的点边添加 TTL=35 天确保未及时更新的数据会被 Filter 和后续回收。
一些注意点
论坛帖子 https://discuss.nebula-graph.com.cn/t/topic/361 这里提到了关于 CSV 导入常遇到的问题,大家可以参考下。另外根据经验这边有几点建议:
- 关于并发度,问题中有提到,这个 concurrency 指定为你的 cpu cores 就可以, 表示起多少个 client 连接 Nebula Server。 实际操作中,要去 trade off 导入速度和服务器压力的影响。在我们这边测试,如果并发过高,会导致磁盘 IO 过高,引发设置的一些告警,不建议一下把并发拉太高,可以根据实际业务测试下做权衡。
- Importer 并不能断点续传,如果出现错误,需要手动处理。在实际操作中,我们会程序分析 Importer 的 log,根据情况处理,如果哪部分数据出现了非预期的错误,会告警通知,人工介入,防止出现意外。
- Hive 生成表之后传输到 Nebula Server, 这部分任务 实际耗时是和 Hadoop 资源情况密切相关的,有可能会出现资源不够导致 Hive 和 CSV 表生成时间滞缓,而 Importer 正常在跑的情况,这部分需要提前做好预判。我们这边是根据hive任务结束时间和 Importer 任务开始时间做对比,判断是否需要 Importer 的进程正常运行。
交流图数据库技术?加入 Nebula 交流群请先填写下你的 Nebula 名片,Nebula 小助手会拉你进群~~
边栏推荐
- Liangzai's first program life and annual summary in 2022
- Basic operations of database and table ----- create index
- [Yocto RM]10 - Images
- What if the programmer's SQL data script coding ability is weak and Bi can't do it?
- Safety learning week4
- 【大型电商项目开发】性能压测-性能监控-堆内存与垃圾回收-39
- 【FPGA教程案例9】基于vivado核的时钟管理器设计与实现
- 程序员SQL数据脚本编码能力弱,BI做不出来怎么办?
- 【大型电商项目开发】性能压测-优化-中间件对性能的影响-40
- [development of large e-commerce projects] performance pressure test - Performance Monitoring - heap memory and garbage collection -39
猜你喜欢
【Unity】InputSystem
[wave modeling 2] three dimensional wave modeling and wave generator modeling matlab simulation
各大主流编程语言性能PK,结果出乎意料
抓包整理外篇——————状态栏[ 四]
Basic operation of database and table ----- phased test II
Database performance optimization tool
SAP UI5 应用的主-从-从(Master-Detail-Detail)布局模式的实现步骤
Expose testing outsourcing companies. You may have heard such a voice about outsourcing
多模输入事件分发机制详解
“薪资倒挂”、“毕业生平替” 这些现象说明测试行业已经...
随机推荐
NPM install error forced installation
整理混乱的头文件,我用include what you use
Pandora IOT development board learning (RT thread) - Experiment 4 buzzer + motor experiment [key external interrupt] (learning notes)
Playwright recording
Getting started with Paxos
Grabbing and sorting out external articles -- status bar [4]
Check if this is null - checking if this is null
Open3d uses GICP to register point clouds
Global and Chinese markets of emergency rescue vessels (errv) 2022-2028: Research Report on technology, participants, trends, market size and share
Redis(1)之Redis简介
SAP UI5 应用开发教程之一百零七 - SAP UI5 OverflowToolbar 容器控件介绍的试读版
College degree, what about 33 year old Baoma? I still sell and test, and my monthly income is 13K+
Behind the cluster listing, to what extent is the Chinese restaurant chain "rolled"?
Identifiers and keywords
How to use words to describe breaking change in Spartacus UI of SAP e-commerce cloud
Basic operations of database and table ----- create index
Global and Chinese market of portable CNC cutting machines 2022-2028: Research Report on technology, participants, trends, market size and share
Reasons and solutions of redis cache penetration and avalanche
当产业互联网时代真正发展完善之后,将会在每一个场景见证巨头的诞生
Postman automatically fills headers