当前位置:网站首页>Nebula Importer 数据导入实践
Nebula Importer 数据导入实践
2022-07-04 17:29:00 【InfoQ】
前言
- 需要将来自 Kafka、Pulsar 平台的流式数据, 导入 Nebula Graph 数据库
- 需要从关系型数据库(如 MySQL)或者分布式文件系统(如 HDFS)中读取批式数据
- 需要将大批量数据生成 Nebula Graph 能识别的 SST 文件
- Importer 适用于将本地 CSV 文件的内容导入至 Nebula Graph 中
- 在不同的 Nebula Graph 集群之间迁移数据
- 在同一个 Nebula Graph 集群内不同图空间之间迁移数据
- Nebula Graph 与其他数据源之间迁移数据
- 结合 Nebula Algorithm 进行图计算
- 在不同的 Nebula Graph 集群之间迁移数据
- 在同一个 Nebula Graph 集群内不同图空间之间迁移数据
- Nebula Graph 与其他数据源之间迁移数据

Nebula Importer 的使用
[[email protected] importer]# lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
Thread(s) per core: 2
Core(s) per socket: 8
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 85
Model name: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
Stepping: 7
CPU MHz: 2499.998
BogoMIPS: 4999.99
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 1024K
L3 cache: 36608K
NUMA node0 CPU(s): 0-15
Disk:SSD
Memory: 128G
集群环境
- Nebula Version:v2.6.1
- 部署方式:RPM
- 集群规模:三副本,六节点
数据规模
---------+--------------------------+-----------+
| "Space" | "vertices" | 559191827 |
+---------+--------------------------+-----------+
| "Space" | "edges" | 722490436 |
+---------+--------------------------+-----------+
Importer 配置
# Graph版本,连接2.x时设置为v2。
version: v2
description: Relation Space import data
# 是否删除临时生成的日志和错误数据文件。
removeTempFiles: false
clientSettings:
# nGQL语句执行失败的重试次数。
retry: 3
# Nebula Graph客户端并发数。
concurrency: 5
# 每个Nebula Graph客户端的缓存队列大小。
channelBufferSize: 1024
# 指定数据要导入的Nebula Graph图空间。
space: Relation
# 连接信息。
connection:
user: root
password: ******
address: 10.0.XXX.XXX:9669,10.0.XXX.XXX:9669
postStart:
# 配置连接Nebula Graph服务器之后,在插入数据之前执行的一些操作。
commands: |
# 执行上述命令后到执行插入数据命令之间的间隔。
afterPeriod: 1s
preStop:
# 配置断开Nebula Graph服务器连接之前执行的一些操作。
commands: |
# 错误等日志信息输出的文件路径。
logPath: /mnt/csv_file/prod_relation/err/test.log
....
50 03 15 * * /mnt/csv_file/importer/nebula-importer -config /mnt/csv_file/importer/rel.yaml >> /root/rel.log
2022/05/15 03:50:11 [INFO] statsmgr.go:62: Tick: Time(10.00s), Finished(1952500), Failed(0), Read Failed(0), Latency AVG(4232us), Batches Req AVG(4582us), Rows AVG(195248.59/s)
2022/05/15 03:50:16 [INFO] statsmgr.go:62: Tick: Time(15.00s), Finished(2925600), Failed(0), Read Failed(0), Latency AVG(4421us), Batches Req AVG(4761us), Rows AVG(195039.12/s)
2022/05/15 03:50:21 [INFO] statsmgr.go:62: Tick: Time(20.00s), Finished(3927400), Failed(0), Read Failed(0), Latency AVG(4486us), Batches Req AVG(4818us), Rows AVG(196367.10/s)
2022/05/15 03:50:26 [INFO] statsmgr.go:62: Tick: Time(25.00s), Finished(5140500), Failed(0), Read Failed(0), Latency AVG(4327us), Batches Req AVG(4653us), Rows AVG(205619.44/s)
2022/05/15 03:50:31 [INFO] statsmgr.go:62: Tick: Time(30.00s), Finished(6080800), Failed(0), Read Failed(0), Latency AVG(4431us), Batches Req AVG(4755us), Rows AVG(202693.39/s)
2022/05/15 03:50:36 [INFO] statsmgr.go:62: Tick: Time(35.00s), Finished(7087200), Failed(0), Read Failed(0), Latency AVG(4461us), Batches Req AVG(4784us), Rows AVG(202489.00/s)
实时性
一些注意点
- 关于并发度,问题中有提到,这个 concurrency 指定为你的 cpu cores 就可以, 表示起多少个 client 连接 Nebula Server。 实际操作中,要去 trade off 导入速度和服务器压力的影响。在我们这边测试,如果并发过高,会导致磁盘 IO 过高,引发设置的一些告警,不建议一下把并发拉太高,可以根据实际业务测试下做权衡。
- Importer 并不能断点续传,如果出现错误,需要手动处理。在实际操作中,我们会程序分析 Importer 的 log,根据情况处理,如果哪部分数据出现了非预期的错误,会告警通知,人工介入,防止出现意外。
- Hive 生成表之后传输到 Nebula Server, 这部分任务 实际耗时是和 Hadoop 资源情况密切相关的,有可能会出现资源不够导致 Hive 和 CSV 表生成时间滞缓,而 Importer 正常在跑的情况,这部分需要提前做好预判。我们这边是根据hive任务结束时间和 Importer 任务开始时间做对比,判断是否需要 Importer 的进程正常运行。
边栏推荐
- Caché WebSocket
- How is the entered query SQL statement executed?
- Scala基础教程--15--递归
- Mxnet implementation of googlenet (parallel connection network)
- Pb extended DLL development (super chapter) (VII)
- IBM WebSphere MQ retrieving messages
- 力扣刷题日记/day7/2022.6.29
- 线上MySQL的自增id用尽怎么办?
- 神经网络物联网是什么意思通俗的解释
- 工厂从自动化到数字孪生,图扑能干什么?
猜你喜欢
.NET ORM框架HiSql实战-第二章-使用Hisql实现菜单管理(增删改查)
Behind the ultra clear image quality of NBA Live Broadcast: an in-depth interpretation of Alibaba cloud video cloud "narrowband HD 2.0" technology
How is the entered query SQL statement executed?
MXNet对GoogLeNet的实现(并行连结网络)
Imitation of numpy 2
My colleagues quietly told me that flying Book notification can still play like this
力扣刷题日记/day7/6.30
Wireshark抓包TLS协议栏显示版本不一致问题
输入的查询SQL语句,是如何执行的?
Scala基础教程--18--集合(二)
随机推荐
物联网应用技术的就业前景和现状
Scala基础教程--19--Actor
Scala basic tutorial -- 13 -- advanced function
Li Kou brush question diary /day7/6.30
如何使用 wget 和 curl 下载文件
Scala basic tutorial -- 18 -- set (2)
.NET ORM框架HiSql实战-第二章-使用Hisql实现菜单管理(增删改查)
[mathematical basis of machine learning] (I) linear algebra (Part 1 +)
DeFi生态NFT流动性挖矿系统开发搭建
Digital "new" operation and maintenance of energy industry
Rookie post station management system based on C language
Imitation of numpy 2
使用FTP
VMware Tools和open-vm-tools的安装与使用:解决虚拟机不全屏和无法传输文件的问题
Scala基础教程--14--隐式转换
【云端心声 建议征集】云商店焕新升级:提有效建议,赢海量码豆、华为AI音箱2!
力扣刷题日记/day5/2022.6.27
Wireshark抓包TLS协议栏显示版本不一致问题
力扣刷题日记/day8/7.1
Li Chi's work and life summary in June 2022