How to Import Billion-Scale Offline CSV Data into Nebula Graph
2022-07-02 15:48:00 [Graph Database NebulaGraph]
This article was first published on the Nebula Graph Community WeChat official account.
This practice comes from our business requirements and planned future expansion. After technology selection we chose the Nebula Graph database, and first needed to verify its batch-import performance in our actual business scenario. The import runs as a distributed Spark on YARN job, with the CSV files stored on HDFS. Here I share my best practices with the Nebula Spark Connector.
1. Nebula Spark Connector: concepts, applicable scenarios, and advantages
I won't elaborate here (the original post only showed screenshots); see the documentation for details: https://docs.nebula-graph.com.cn/nebula-spark-connector/
2. Environment
- Hardware environment
Name | Value | Recommended
Local disk (SSD) | 2 TB | at least 2 TB
CPU | 16 cores × 4 | 128 cores
Memory | 128 GB | 128 GB
- Software environment
Name | Version
Nebula Graph | 3.0.0
Nebula Spark Connector | 3.0.0
Hadoop | 2.7.2U17-10
Spark | 2.4.5U5
- Data volume
Name | Value
Data size | 200 GB
Entities (vertices) | 930 million
Relationships (edges) | 970 million
3. Deployment plan
- Deployment mode: distributed, 3 nodes
- Reference (official documentation): https://docs.nebula-graph.com.cn/3.0.1/4.deployment-and-installation/2.compile-and-install-nebula-graph/deploy-nebula-graph-cluster/
In short, it takes three steps:
- Download and install the Nebula Graph RPM package;
- Batch-modify the configuration files;
- Start the cluster services.
The following commands are run as root; if you are not root, prefix them with sudo.
Download and install the Nebula Graph RPM package
Run the following commands:
wget https://oss-cdn.nebula-graph.com.cn/package/3.0.0/nebula-graph-3.0.0.el7.x86_64.rpm
wget https://oss-cdn.nebula-graph.com.cn/package/3.0.0/nebula-graph-3.0.0.el7.x86_64.rpm.sha256sum.txt
rpm -ivh nebula-graph-3.0.0.el7.x86_64.rpm
Note: the default installation path is /usr/local/nebula/; make sure it has enough disk space.
Batch-modify the configuration files
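cd /usr/local/nebula/etc/
# On every node: point all services at the three metad instances (<ip1>, <ip2>, <ip3> are placeholders for the nodes' intranet IPs)
sed -i 's?--meta_server_addrs=127.0.0.1:9559?--meta_server_addrs=<ip1>:9559,<ip2>:9559,<ip3>:9559?' *.conf
# On each node, set local_ip to that node's own IP (<ip1> on node 1, <ip2> on node 2, <ip3> on node 3)
sed -i 's?--local_ip=127.0.0.1?--local_ip=<ip1>?' *.conf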
Note: the IP addresses are intranet addresses, used for communication within the cluster.
After the services start, add the Storage hosts:
ADD HOSTS 172.x.x.15:9779,172.1x.x.176:9779,172.x.1x.149:9779;
Note: adding Storage hosts is required in v3.x and later; on v2.x this step can be skipped.
Start the cluster services
/usr/local/nebula/scripts/nebula.service start all
This command starts all services. To check whether they started successfully, run:
ps aux|grep nebula
The output should include the following 3 service processes:
/usr/local/nebula/bin/nebula-metad --flagfile /usr/local/nebula/etc/nebula-metad.conf
/usr/local/nebula/bin/nebula-graphd --flagfile /usr/local/nebula/etc/nebula-graphd.conf
/usr/local/nebula/bin/nebula-storaged --flagfile /usr/local/nebula/etc/nebula-storaged.conf
Note: if fewer than 3 processes are running, run /usr/local/nebula/scripts/nebula.service start all a few more times; if that still does not help, use restart instead.
Visualization service
I chose Nebula Graph Studio, which is served at http://n01v:7001 (note: this address is in my own network environment and is not reachable by readers).
- Login address: 10.x.x.1:9669 (any node)
- Username / password: root / nebula
For nGQL commands, see the official documentation: https://docs.nebula-graph.com.cn/3.0.1/2.quick-start/4.nebula-graph-crud
Getting started with Nebula Graph
Register the hosts with the Nebula cluster:
ADD HOSTS 172.x.x.121:9779,,;
Run SHOW HOSTS; to list all hosts and check that the STATUS column shows ONLINE.
Create a space (the equivalent of a database in a traditional RDBMS):
CREATE SPACE mylove (partition_num = 15, replica_factor = 3, vid_type = FIXED_STRING(256));
Notes: the number of partitions is recommended to be about 5 times the number of nodes; the replica factor should be an odd number, usually 3; if the VID is a string, keep its fixed length no longer than necessary, otherwise it takes up too much disk space.
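The sizing rules of thumb above can be captured in a small helper. This is a hypothetical sketch, not part of Nebula: `SpacePlanner` and its methods are illustrative names, the 5× factor and the odd-replica check simply encode the advice above, and `maxVidBytes` assumes you can sample your own IDs to pick the smallest adequate FIXED_STRING length.

```scala
object SpacePlanner {
  // Rule of thumb from the text: partition_num is about 5 x the number of storage nodes
  def partitionNum(storageNodes: Int, perNode: Int = 5): Int = storageNodes * perNode

  // Longest VID in bytes among sample IDs; FIXED_STRING(n) must be at least this long
  def maxVidBytes(sampleIds: Seq[String]): Int = {
    require(sampleIds.nonEmpty, "need at least one sample ID")
    sampleIds.map(_.getBytes("UTF-8").length).max
  }

  // Builds a CREATE SPACE statement and rejects even replica factors
  def createSpace(name: String, storageNodes: Int, replicaFactor: Int, vidLength: Int): String = {
    require(replicaFactor > 0 && replicaFactor % 2 == 1, s"replica_factor should be odd, got $replicaFactor")
    s"CREATE SPACE $name (partition_num = ${partitionNum(storageNodes)}, " +
      s"replica_factor = $replicaFactor, vid_type = FIXED_STRING($vidLength));"
  }
}
```

For example, `SpacePlanner.createSpace("mylove", 3, 3, 256)` reproduces the statement above. The 64-character hex IDs used in the performance tests below measure 64 bytes, so FIXED_STRING(64) would hold them if every VID has that format; that is an assumption about the data, not something the article states.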
Create a tag (the vertex type for entities):
CREATE TAG entity (name string NULL, version string NULL);
Create an edge type (for relations):
CREATE EDGE relation (name string NULL);
When querying, always add LIMIT; otherwise a query can easily bog down the database:
match (v) return v limit 100;
4. (Main topic) Reading CSV files and importing them with the Spark Connector
Two references are useful here:
The official NebulaSparkWriterExample (Scala, JSON input): https://github.com/vesoft-inc/nebula-spark-utils/blob/master/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkWriterExample.scala
A NebulaSparkWriterExample contributed by a community expert (Java, JSON input): https://www.jianshu.com/p/930e0343a28c
Below is my NebulaSparkWriter code:
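import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaEdgeConfig, WriteNebulaVertexConfig}
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object NebulaSparkWriter {
  private val LOG = LoggerFactory.getLogger(this.getClass)
  var ip = ""

  // args(0): "1" = import vertices, "2" = import edges; args(1): IP of a Nebula node
  def main(args: Array[String]): Unit = {
    val part = args(0)
    ip = args(1)
    val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    spark.sparkContext.setLogLevel("WARN") // keep INFO logs from flooding the output
    if ("1".equalsIgnoreCase(part)) writeVertex(spark)
    if ("2".equalsIgnoreCase(part)) writeEdge(spark)
    spark.stop()
  }

  def getNebulaConnectionConfig(): NebulaConnectionConfig = {
    val config = NebulaConnectionConfig
      .builder()
      .withMetaAddress(ip + ":9559")
      .withGraphAddress(ip + ":9669")
      .withTimeout(Integer.MAX_VALUE) // long imports outlive the 1-minute default
      .build()
    config
  }

  def writeVertex(spark: SparkSession): Unit = {
    LOG.info("start to write nebula vertices: 1 entity")
    val df = spark.read.option("sep", "\t").csv("/home/2022/project/origin_file/csv/tag/entity/").toDF("id", "name", "version")
    val config = getNebulaConnectionConfig()
    // Space and tag names assumed from the CREATE SPACE / CREATE TAG statements above
    val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
      .builder()
      .withSpace("mylove")
      .withTag("entity")
      .withVidField("id")
      .withVidAsProp(false)
      .withBatch(1000)
      .build()
    df.coalesce(1400).write.nebula(config, nebulaWriteVertexConfig).writeVertices()
  }

  def writeEdge(spark: SparkSession): Unit = {
    LOG.info("start to write nebula edges: 2 entityRel")
    val df = spark.read.option("sep", "\t").csv("/home/2022/project/origin_file/csv/out/rel/relation/").toDF("src", "dst", "name")
    val config = getNebulaConnectionConfig()
    // Space and edge type names assumed from the CREATE SPACE / CREATE EDGE statements above
    val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
      .builder()
      .withSpace("mylove")
      .withEdge("relation")
      .withSrcIdField("src")
      .withDstIdField("dst")
      .withSrcAsProperty(false)
      .withDstAsProperty(false)
      .withBatch(1000)
      .build()
    df.coalesce(1400).write.nebula(config, nebulaWriteEdgeConfig).writeEdges()
  }
}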
Key details of the NebulaSparkWriter code
Some of the functions used:
- Setting the log level: keeps INFO logs from cluttering the output;
- withTimeout(Integer.MAX_VALUE): make the connection timeout as long as possible. The default is 1 minute; once the number of timeouts exceeds the number of retries, the Spark task fails;
- option("sep", "\t"): specifies the CSV delimiter; otherwise Spark uses its default comma delimiter and each tab-separated line is read as a single column;
- toDF("src", "dst", "name"): assigns a schema (column names) to the dataset, i.e. the Dataset<Row>; without it you cannot specify the VID field;
- withVidField("id"): this method only accepts a column name, so the schema must be defined first;
- withVidAsProp(false): the ID column is used as the VID by default, so there is no need to store it again as a property, which would take up disk space;
- withSrcIdField("src"): sets the source vertex ID field;
- the destination vertex ID field is set in the same way;
- not storing the source vertex ID as a property saves space;
- withDstAsProperty(false): likewise saves space;
- withBatch(1000): the batch size. With WriteMode.UPDATE keep it at 512 or below; with WriteMode.INSERT it can be set larger (1500 with gigabit NICs / 5 Gbps bandwidth / local SSDs);
- coalesce(1500): adjust according to the number of concurrent tasks. Too much data in a single partition easily leads to executor OOM.
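To get a feel for what coalesce(N) and withBatch(n) mean for a single task, here is a back-of-the-envelope sketch. `PartitionSizing` is a hypothetical helper; it treats the 200 G of CSV as 200 GiB of one input and averages it over the partitions. coalesce only merges existing partitions without a shuffle, so real partition sizes are uneven and these figures are averages.

```scala
object PartitionSizing {
  // Average input bytes per partition after coalesce(partitions), rounded up
  def bytesPerPartition(totalBytes: Long, partitions: Int): Long = {
    require(partitions > 0, "partitions must be positive")
    (totalBytes + partitions - 1) / partitions
  }

  // Number of write batches one partition issues for a given withBatch size, rounded up
  def batchesPerPartition(rowsInPartition: Long, batchSize: Int): Long = {
    require(batchSize > 0, "batchSize must be positive")
    (rowsInPartition + batchSize - 1) / batchSize
  }

  def main(args: Array[String]): Unit = {
    val totalBytes = 200L * 1024 * 1024 * 1024 // "200 G" of CSV, read as GiB (assumption)
    for (p <- Seq(200, 1400, 1500)) {
      val mib = bytesPerPartition(totalBytes, p) / (1024.0 * 1024)
      println(f"coalesce($p%d): ~$mib%.1f MiB of CSV per partition")
    }
  }
}
```

With coalesce(1400) each task handles roughly 146 MiB of CSV on average; fewer partitions mean proportionally more data per task, which is where the executor OOM risk mentioned above comes from.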
5. Submitting the job to the Spark cluster
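nohup spark-submit --master yarn --deploy-mode client \
  --class com.xxx.nebula.connector.NebulaSparkWriter \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.memoryOverhead=10g \
  --conf spark.blacklist.enabled=false \
  --conf spark.default.parallelism=1000 \
  --driver-memory 10G --executor-memory 12G --executor-cores 4 --num-executors 180 \
  ./example-3.0-SNAPSHOT.jar 1 <nebula_ip> > run-csv-nebula.log 2>&1 &
NebulaSparkWriter expects two program arguments after the JAR: 1 to import vertices or 2 to import edges, followed by the IP of a Nebula node (<nebula_ip> is a placeholder).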
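The resource flags in this command translate into a few numbers worth checking before submitting. The sketch below uses a hypothetical `SubmitMath` helper; it assumes the default spark.task.cpus = 1 and computes the YARN container size as --executor-memory plus spark.executor.memoryOverhead, before any rounding YARN applies.

```scala
object SubmitMath {
  // Concurrent task slots, assuming the default spark.task.cpus = 1
  def taskSlots(numExecutors: Int, executorCores: Int): Int = numExecutors * executorCores

  // How many rounds ("waves") of tasks are needed to process all partitions
  def waves(partitions: Int, slots: Int): Int = (partitions + slots - 1) / slots

  // Per-executor container size: heap (--executor-memory) + spark.executor.memoryOverhead
  def containerGb(executorMemoryGb: Int, overheadGb: Int): Int = executorMemoryGb + overheadGb

  def main(args: Array[String]): Unit = {
    val slots = taskSlots(180, 4)
    println(s"task slots: $slots, waves for coalesce(1400): ${waves(1400, slots)}")
    println(s"YARN memory requested for executors: ${180 * containerGb(12, 10)} GB")
  }
}
```

With 180 × 4 = 720 task slots, coalesce(1400) runs in two waves of tasks, and the executors request about 3,960 GB of YARN memory in total. Raising the partition count lowers the data per task (and the OOM risk) at the cost of more waves.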
Monitoring with iotop:
Total DISK READ : 26.61 K/s | Total DISK WRITE : 383.77 M/s
Actual DISK READ: 26.61 K/s | Actual DISK WRITE: 431.75 M/s
Monitoring with top:
top - 16:03:01 up 8 days, 28 min, 1 user, load average: 6.16, 6.53, 4.58
Tasks: 205 total, 1 running, 204 sleeping, 0 stopped, 0 zombie
%Cpu(s): 28.3 us, 14.2 sy, 0.0 ni, 56.0 id, 0.6 wa, 0.0 hi, 0.4 si, 0.5 st
KiB Mem : 13186284+total, 1135004 free, 31321240 used, 99406592 buff/cache
KiB Swap: 0 total, 0 free, 0 used. 99641296 avail Mem
27979 root 20 0 39.071g 0.026t 9936 S 564.6 20.8 83:22.03 nebula-storaged
27920 root 20 0 2187476 804036 7672 S 128.2 0.6 17:13.75 nebula-graphd
27875 root 20 0 6484644 1.990g 8588 S 58.5 1.6 14:14.22 nebula-metad
Service optimization
nebula-storaged.conf configuration optimization
Here I modified the following configuration items in nebula-storaged.conf:
# Default number of bytes reserved for a batch operation
# Default block cache size used by BlockBasedTable
# Unit: MB. The server has 128 GB of memory; this is normally set to about 1/3 of it
############## rocksdb Options ##############
# rocksdb DBOptions in JSON; each option name and value is a string, e.g. "option_name":"option_value", separated by commas
# rocksdb ColumnFamilyOptions in JSON; each option name and value is a string, e.g. "option_name":"option_value", separated by commas
# rocksdb BlockBasedTableOptions in JSON; each option name and value is a string, e.g. "option_name":"option_value", separated by commas
# Maximum number of handlers per request
# Heartbeat interval between cluster nodes
# Maximum batch size
# Reduces memory allocation
# Filters data at the lowest storage layer; in production this prevents trouble when querying super nodes
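The block cache comment above (size in MB, about 1/3 of 128 GB) works out as follows. `BlockCacheSizing` is a hypothetical helper, and the 1/3 fraction is the author's rule of thumb rather than a Nebula default.

```scala
object BlockCacheSizing {
  // Block cache size in MB when reserving `fraction` of the server's RAM (the comment suggests 1/3)
  def blockCacheMb(memoryGb: Int, fraction: Double = 1.0 / 3): Long =
    (memoryGb * 1024L * fraction).toLong

  def main(args: Array[String]): Unit =
    println(s"128 GB server -> block cache of about ${blockCacheMb(128)} MB")
}
```

For the 128 GB servers used here that is about 43,690 MB, leaving the remaining memory for the OS page cache and the other Nebula processes on the same node.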
Linux system optimization
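ulimit -c unlimited                              # no limit on core dump size
ulimit -n 130000                                 # raise the max number of open file descriptors
sysctl -w net.ipv4.tcp_slow_start_after_idle=0   # keep the TCP congestion window after idle periods
sysctl -w net.core.somaxconn=2048                # upper bound of the listen() accept queue
sysctl -w net.ipv4.tcp_max_syn_backlog=2048      # queue length for half-open (SYN_RECV) connections
sysctl -w net.core.netdev_max_backlog=3000       # packets queued when the NIC receives faster than the kernel processes them
sysctl -w kernel.core_uses_pid=1                 # append the PID to core dump file names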
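Keep in mind that sysctl -w only changes the running kernel (values are lost on reboot unless also written to /etc/sysctl.conf or /etc/sysctl.d/), and ulimit only affects the current shell and its children. One way to confirm the kernel values took effect is to read them back from /proc/sys, where a key such as net.core.somaxconn maps to /proc/sys/net/core/somaxconn. `SysctlCheck` below is a hypothetical sketch assuming a Linux host; it does not check the ulimit values.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
import scala.util.Try

object SysctlCheck {
  // sysctl key "a.b.c" corresponds to the file /proc/sys/a/b/c
  def procPath(key: String): Path = Paths.get("/proc/sys", key.split('.'): _*)

  // Current kernel value, or None if the key does not exist or cannot be read
  def current(key: String): Option[String] =
    Try(new String(Files.readAllBytes(procPath(key)), StandardCharsets.UTF_8).trim).toOption

  def main(args: Array[String]): Unit = {
    val expected = Seq(
      "net.ipv4.tcp_slow_start_after_idle" -> "0",
      "net.core.somaxconn" -> "2048",
      "net.ipv4.tcp_max_syn_backlog" -> "2048",
      "net.core.netdev_max_backlog" -> "3000",
      "kernel.core_uses_pid" -> "1"
    )
    for ((key, want) <- expected) {
      val got = current(key).getOrElse("<unavailable>")
      val status = if (got == want) "OK" else "MISMATCH"
      println(s"$status $key: expected $want, got $got")
    }
  }
}
```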
6. Verifying the import results
- Vertex (entity) insertion rate: about 27,837 records/s (applies only to this import)
- Edge (relationship) insertion rate: about 26,276 records/s (applies only to this import)
- Better server hardware would give better performance; bandwidth, cross-data-center traffic, disk I/O and even network fluctuations also affect it.
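As a rough cross-check, these rates can be turned into wall-clock import time. The sketch assumes the data-volume counts are 930 million vertices and 970 million edges and that the measured rates held for the whole run; `ImportEstimate` is a hypothetical name.

```scala
object ImportEstimate {
  // Wall-clock hours needed to write `records` at a sustained `ratePerSec`
  def hours(records: Long, ratePerSec: Double): Double = records / ratePerSec / 3600

  def main(args: Array[String]): Unit = {
    // Counts assumed: 930 million vertices and 970 million edges
    val vertexHours = hours(930000000L, 27837)
    val edgeHours = hours(970000000L, 26276)
    println(f"vertices: ~$vertexHours%.2f h, edges: ~$edgeHours%.2f h")
  }
}
```

That comes to roughly 9.3 hours for vertices and 10.3 hours for edges, or about 19.5 hours if the two jobs run one after the other.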
Disk usage after the import:
df -h
Filesystem Size Used Avail Use% Mounted on
/dev/sda1 50G 2.2G 48G 5% /
/dev/sdb1 2.0T 283G 1.6T 16% /usr/local/nebula
tmpfs 13G 0 13G 0% /run/user/62056
7. Performance testing
- Query a vertex by property:
MATCH (v:entity) WHERE v.entity.name == 'Lifespan' RETURN v;
Execution time: 0.002558 s
- One hop:
MATCH (v1:entity)-[e:propertiesRel]->(v2:attribute) WHERE id(v1) == '70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a' RETURN v2 limit 100;
Execution time: 0.003571 s
- Up to two hops:
MATCH p=(v1:entity)-[e:propertiesRel*1..2]->(v2) WHERE id(v1) == '70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a' RETURN p;
Execution time: 0.005143 s
- Fetch all properties of an edge:
FETCH PROP ON propertiesRel '70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a' -> '0000002d2e88d7ba6659db83893dedf3b8678f3f80de4ffe3f8683694b63a256' YIELD properties(edge);
Execution time: 0.001304 s
- Multi-hop traversals from a vertex looked up by name:
match p=(v:entity{name:"Zhang San"})-[e:entityRel|propertiesRel*1]->(v2) return p;
Execution time: 0.02986 s
match p=(v:entity{name:"Zhang San"})-[e:entityRel|propertiesRel*2]->(v2) return p;
Execution time: 0.07937 s
match p=(v:entity{name:"Zhang San"})-[e:entityRel|propertiesRel*3]->(v2) return p;
Execution time: 0.269 s
match p=(v:entity{name:"Zhang San"})-[e:entityRel|propertiesRel*4]->(v2) return p;
Execution time: 3.524859 s
match p=(v:entity{name:"Zhang San"})-[e:entityRel|propertiesRel*1..2]->(v2) return p;
Execution time: 0.072367 s
match p=(v:entity{name:"Zhang San"})-[e:entityRel|propertiesRel*1..3]->(v2) return p;
Execution time: 0.279011 s
match p=(v:entity{name:"Zhang San"})-[e:entityRel|propertiesRel*1..4]->(v2) return p;
Execution time: 3.728018 s
- Shortest path from vertex A_vid to vertex B_vid (bidirectional), returning vertex and edge properties:
FIND SHORTEST PATH WITH PROP FROM "70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a" TO "0000002d2e88d7ba6659db83893dedf3b8678f3f80de4ffe3f8683694b63a256" OVER * BIDIRECT YIELD path AS p;
Execution time: 0.003096 s
- All paths between the same two vertices, filtered on propertiesRel.name:
FIND ALL PATH FROM "70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a" TO "0000002d2e88d7ba6659db83893dedf3b8678f3f80de4ffe3f8683694b63a256" OVER * WHERE propertiesRel.name is not EMPTY or propertiesRel.name >=0 YIELD path AS p;
Execution time: 0.003656 s
8. Problems encountered
1. Guava dependency version conflict
Caused by: java.lang.NoSuchMethodError: com.google.common.base.Stopwatch.createStarted()Lcom/google/common/base/Stopwatch;
Investigation showed that a module we depend on uses Guava 22.0, while the Spark cluster ships with Guava 14.0, so the two versions conflict and the job fails. For tasks running on the Spark cluster, Spark loads its own Guava with higher priority than the one packaged in your JAR.
The dependency calls a relatively new Guava 22.0 method (Stopwatch.createStarted(), as the stack trace shows) that does not exist in 14.0. Since we could not modify that code, there were two options:
- Upgrade the package on the Spark cluster: high risk, and may cause unknown problems.
- Use a Maven plugin to rename (relocate) our own Guava package.
I chose the second option and used the Maven Shade plugin (https://maven.apache.org/plugins/maven-shade-plugin/) to relocate the package, which solved the problem.
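Before or after shading, it helps to confirm which Guava the JVM actually loads. The sketch below is a hypothetical diagnostic, not part of the fix described above: it reports the JAR a class was loaded from and whether a method such as Stopwatch.createStarted exists in that version. Running its main on the cluster (or calling the helpers from inside a Spark task) should show whether the cluster's Guava 14.0 wins.

```scala
object ClasspathProbe {
  // Loads a class without initializing it via the current thread's context class loader;
  // returns None if the class is not on the classpath
  private def load(className: String): Option[Class[_]] =
    try Some(Class.forName(className, false, Thread.currentThread().getContextClassLoader))
    catch { case _: ClassNotFoundException => None }

  // Location (JAR or directory) the class was loaded from, if the JVM reports one
  def locationOf(className: String): Option[String] =
    load(className)
      .flatMap(c => Option(c.getProtectionDomain.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  // True if the class is present and has a public method with this name
  def hasPublicMethod(className: String, methodName: String): Boolean =
    load(className).exists(_.getMethods.exists(_.getName == methodName))

  def main(args: Array[String]): Unit = {
    val stopwatch = "com.google.common.base.Stopwatch"
    println(s"Stopwatch loaded from: ${locationOf(stopwatch).getOrElse("<not found>")}")
    println(s"Stopwatch.createStarted present: ${hasPublicMethod(stopwatch, "createStarted")}")
  }
}
```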
2. Spark blacklist mechanism
Blacklisting behavior can be configured via spark.blacklist.*.
spark.blacklist.enabled defaults to false. If it is set to true, Spark stops scheduling tasks on executors that are on the blacklist. The blacklisting algorithm can be tuned further with the other spark.blacklist.* options; see the Spark configuration documentation for details. The spark-submit command above sets it to false explicitly.
Feedback
- You are welcome to discuss this article with the author on the forum: https://discuss.nebula-graph.com.cn