This article was first published on the Nebula Graph Community official account.
1. Introduction to graph computing
1.1 Graph database vs. graph computing
A graph database targets OLTP scenarios: it emphasizes creating, reading, updating, and deleting data, and a single query usually touches only a small part of the whole graph. Graph computing targets OLAP scenarios and typically analyzes and computes over the entire graph.
1.2 Distributed architecture of graph computing systems
By architecture, graph computing systems are divided into single-machine systems and distributed systems.
A single-machine graph computing system has a simple model: it does not need to consider distributed communication or graph partitioning. However, it is limited by the resources of one machine and cannot analyze larger-scale graph data.
A distributed graph computing platform partitions the graph data across multiple machines so that it can process larger graphs, but it inevitably introduces distributed communication overhead.
1.3 Graph partitioning
There are two main ways to partition a graph: edge cut (Edge-Cut) and vertex cut (Vertex-Cut).

Edge cut: the data of each vertex is stored on only one machine, but some edges are cut and distributed across multiple machines.
As shown in figure (a), the data of vertex A is stored only on machine 1 and the data of vertex B only on machine 2. Edge AB is stored on both machine 1 and machine 2. Because vertices A and B are on different machines, iterative computation incurs communication overhead.
Vertex cut: each edge is stored on only one machine, but some vertices may be split and distributed across multiple machines.
As shown in figure (b), edge AB is stored on machine 1, edge BC on machine 2, and edge CD on machine 3, while vertex B is assigned to machines 1 and 2 and vertex C to machines 2 and 3. Because these vertices are stored on multiple machines, keeping vertex data consistent also incurs communication overhead.
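The two placements described above can be reproduced with a small sketch. It is only an illustration: the function names and the dictionaries that say which machine owns which vertex or edge are assumptions made for this example, not part of any real system.

```python
from collections import defaultdict

def edge_cut(edges, vertex_owner):
    """Edge cut: each vertex lives on exactly one machine (vertex_owner[v]).
    An edge whose endpoints live on different machines is stored on both."""
    placement = defaultdict(set)          # machine -> edges stored there
    for u, v in edges:
        placement[vertex_owner[u]].add((u, v))
        placement[vertex_owner[v]].add((u, v))
    return dict(placement)

def vertex_cut(edge_owner):
    """Vertex cut: each edge lives on exactly one machine (edge_owner[e]).
    A vertex is replicated on every machine that holds one of its edges."""
    replicas = defaultdict(set)           # vertex -> machines holding a copy
    for (u, v), machine in edge_owner.items():
        replicas[u].add(machine)
        replicas[v].add(machine)
    return dict(replicas)

# Example (a): vertex A on machine 1, vertex B on machine 2.
ec = edge_cut([("A", "B")], {"A": 1, "B": 2})
# Example (b): edge AB on machine 1, BC on machine 2, CD on machine 3.
vc = vertex_cut({("A", "B"): 1, ("B", "C"): 2, ("C", "D"): 3})
print(ec)
print(vc)
```

In the edge-cut result, edge AB appears on both machines; in the vertex-cut result, vertices B and C each have two replicas, which is where the consistency traffic comes from.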
1.4 Computing model
The programming model is aimed at developers of graph computing applications. It can be divided into vertex-centric, edge-centric or path-centric, and subgraph-centric programming models.
The computing model is a concern for developers of graph computing systems. The main ones are the synchronous execution model and the asynchronous execution model. The more common models are the BSP model (Bulk Synchronous Parallel computing model) and the GAS model.
BSP model: the computation of the BSP model consists of a series of iteration steps, and each iteration step is called a superstep. Systems that use the BSP model include Pregel, Hama, and Giraph.
The BSP model has a horizontal and a vertical structure. Vertically, the BSP model consists of a series of serial supersteps. Horizontally (as shown in the figure), a superstep is divided into three phases:
- Local computation phase: each processor computes only on the data stored in its local memory.
- Global communication phase: machine nodes exchange data with each other.
- Barrier synchronization phase: wait for all communication to finish.
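The three phases of a superstep can be sketched as a single-process simulation. This is a minimal sketch, assuming a toy task (propagating the maximum vertex value); the function name, the `owner` map, and the worker layout are all hypothetical, and a real system would run the workers on separate machines.

```python
def bsp_max_value(values, neighbors, owner, num_workers):
    """Propagate the maximum vertex value using BSP supersteps (simulated)."""
    values = dict(values)
    active = set(values)                  # vertices whose value changed last superstep
    supersteps = 0
    while active:
        # 1) Local computation: each worker reads only its own vertices
        #    and buffers the messages it wants to send.
        outbox = [[] for _ in range(num_workers)]
        for v in active:
            for n in neighbors[v]:
                outbox[owner[v]].append((n, values[v]))
        # 2) Global communication: every message is routed to the worker
        #    that owns the target vertex.
        inbox = [[] for _ in range(num_workers)]
        for w in range(num_workers):
            for target, msg in outbox[w]:
                inbox[owner[target]].append((target, msg))
        # 3) Barrier synchronization: nothing is applied until all messages
        #    of this superstep are delivered. Only then are they folded into
        #    local state, so the next superstep starts from a consistent view.
        active = set()
        for w in range(num_workers):
            for target, msg in inbox[w]:
                if msg > values[target]:
                    values[target] = msg
                    active.add(target)
        supersteps += 1
    return values, supersteps

# Path graph 0 - 1 - 2 - 3 split across two simulated workers.
result, steps = bsp_max_value(
    values={0: 3, 1: 6, 2: 2, 3: 1},
    neighbors={0: [1], 1: [0, 2], 2: [1, 3], 3: [2]},
    owner={0: 0, 1: 0, 2: 1, 3: 1},
    num_workers=2,
)
print(result, steps)
```

The computation stops when a superstep produces no updates, which is the usual termination condition in vertex-centric BSP programs.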

GAS model: the GAS model was proposed in the PowerGraph system. It is divided into a gather phase (Gather), an apply phase (Apply), and a scatter phase (Scatter).
- Gather phase: collects information from neighbor vertices.
- Apply phase: processes the collected information locally and updates the vertex.
- Scatter phase: sends the new information to neighbors.
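As an illustration of the three phases, here is a minimal single-machine sketch of single-source shortest path written in gather/apply/scatter style. The function name and the graph encoding are assumptions for this example; PowerGraph's real API is different.

```python
import math

def gas_sssp(out_edges, source):
    """out_edges[u] = [(v, weight), ...]; every vertex must appear as a key."""
    # Build the reverse index so a vertex can gather from its in-neighbors.
    in_edges = {v: [] for v in out_edges}
    for u, targets in out_edges.items():
        for v, w in targets:
            in_edges[v].append((u, w))

    dist = {v: math.inf for v in out_edges}
    dist[source] = 0.0
    active = {v for v, _ in out_edges[source]}
    while active:
        next_active = set()
        for v in active:
            # Gather: collect candidate distances from in-neighbors.
            gathered = min((dist[u] + w for u, w in in_edges[v]), default=math.inf)
            # Apply: update the vertex value locally if it improved.
            if gathered < dist[v]:
                dist[v] = gathered
                # Scatter: notify out-neighbors that new information exists.
                next_active.update(t for t, _ in out_edges[v])
        active = next_active
    return dist

graph = {
    "A": [("B", 1.0), ("C", 4.0)],
    "B": [("C", 2.0)],
    "C": [("D", 1.0)],
    "D": [],
}
distances = gas_sssp(graph, "A")
print(distances)
```

Only vertices that were signalled in the scatter phase are revisited, so work shrinks as the distances converge.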
2. Introduction to the Gemini graph computing system
Gemini is influential in industry. Its main technical points include CSR/CSC, push/pull, master and mirror, sparse and dense graphs, overlapping communication with computation, chunk-based partitioning, and NUMA-aware sub-partitioning.
Gemini uses edge cut to partition graph data in a chunk-based manner and supports the NUMA architecture. For the partitioned data, out-edge information is stored in CSR and in-edge information in CSC. During iterative computation, sparse graphs use push to update a vertex's out-neighbors, and dense graphs use pull to pull information from a vertex's in-neighbors.
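To make the CSR/CSC and push/pull ideas concrete, the following sketch builds both layouts for a tiny graph and runs one propagation step in each mode. It is a simplified illustration with hypothetical function names, not Gemini's implementation, and it ignores partitioning and NUMA entirely.

```python
def build_csr(num_vertices, edges):
    """Return (offsets, targets); the out-neighbors of v are
    targets[offsets[v]:offsets[v + 1]]."""
    offsets = [0] * (num_vertices + 1)
    for src, _ in edges:
        offsets[src + 1] += 1
    for v in range(num_vertices):
        offsets[v + 1] += offsets[v]      # prefix sum of out-degrees
    targets = [0] * len(edges)
    cursor = offsets[:-1]                 # slicing copies: next free slot per vertex
    for src, dst in edges:
        targets[cursor[src]] = dst
        cursor[src] += 1
    return offsets, targets

def build_csc(num_vertices, edges):
    """CSC is the CSR of the reversed graph, so it indexes in-neighbors."""
    return build_csr(num_vertices, [(dst, src) for src, dst in edges])

def push_step(csr, frontier):
    """Push: every active vertex writes to its out-neighbors."""
    offsets, targets = csr
    reached = set()
    for v in frontier:
        reached.update(targets[offsets[v]:offsets[v + 1]])
    return reached

def pull_step(csc, frontier, num_vertices):
    """Pull: every vertex reads from its in-neighbors."""
    offsets, sources = csc
    reached = set()
    for v in range(num_vertices):
        if any(u in frontier for u in sources[offsets[v]:offsets[v + 1]]):
            reached.add(v)
    return reached

edges = [(0, 1), (0, 2), (1, 2), (2, 3)]
csr = build_csr(4, edges)
csc = build_csc(4, edges)
print(csr, csc)
print(push_step(csr, {0}), pull_step(csc, {0}, 4))
```

Both modes reach the same vertices. Push only touches edges of active vertices, which is cheap when few vertices are active, while pull scans every vertex but avoids write conflicts on the destination.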
If an edge is cut, the vertex at one end of the edge is the master and the vertex at the other end is the mirror. The mirror is called a placeholder. During pull computation, the mirror vertices on each machine pull the information of their in-edge neighbor master vertices and perform one computation, and under the BSP computing model the result is synchronized to the master vertex over the network. During push computation, the master vertex on each machine first synchronizes its information to its mirror vertices, and the mirrors then update their out-neighbors.
In the communication phase of BSP, each machine Node_i sends to its next machine Node_i+1, and the last machine sends to the first machine. While sending, each machine also receives information from Node_i-1 and performs local computation immediately after receiving it. Overlapping communication and computation hides the communication time and improves overall efficiency.
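The ring-shaped exchange can be sketched as follows. The first step matches the description above (Node_i sends to Node_i+1 and receives from Node_i-1). Extending it to further steps with growing offsets, so that every pair of nodes eventually exchanges data, is an assumption of this sketch, and the function name is hypothetical.

```python
def ring_exchange(messages):
    """messages[i][j] is the payload node i holds for node j.
    Returns, for each node, the list of (sender, payload) in arrival order."""
    n = len(messages)
    received = [[] for _ in range(n)]
    for step in range(1, n):
        # In step s, node i sends to node (i + s) % n, so node j receives
        # from node (j - s) % n. No two senders target the same node in one step.
        for i in range(n):
            dst = (i + step) % n
            received[dst].append((i, messages[i][dst]))
            # A real system would start local computation on this payload
            # right away while the next transfer is still in flight.
    return received

n = 3
messages = [[f"{i}->{j}" for j in range(n)] for i in range(n)]
arrivals = ring_exchange(messages)
for node, items in enumerate(arrivals):
    print(node, items)
```

Because each node has exactly one incoming and one outgoing transfer per step, the network load stays balanced while computation on already received data proceeds.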
For more details, refer to the paper "Gemini: A Computation-Centric Distributed Graph Processing System".
3. Integrating the Plato graph computing system with Nebula Graph
3.1 Introduction to the Plato graph computing system
Plato is an industrial-grade graph computing system open-sourced by Tencent and implemented based on the Gemini paper. Plato can run on general x86 clusters, such as Kubernetes clusters and Yarn clusters. At the file system level, Plato provides multiple interfaces to support mainstream file systems such as HDFS and Ceph.

3.2 Integration with Nebula Graph
We did secondary development on Plato to connect it to Nebula Graph as a data source.
3.2.1 Nebula Graph as an input and output data source
We added a data source to Plato that supports Nebula Graph as both input and output: data is read directly from Nebula Graph for graph computing, and the results are written directly back to Nebula Graph.
The Nebula Graph storage layer provides a scan interface for partitions, which makes it easy to scan out vertex and edge data in batches:
ScanEdgeIter scanEdgeWithPart(std::string spaceName,
int32_t partID,
std::string edgeName,
std::vector<std::string> propNames,
int64_t limit = DEFAULT_LIMIT,
int64_t startTime = DEFAULT_START_TIME,
int64_t endTime = DEFAULT_END_TIME,
std::string filter = "",
bool onlyLatestVersion = false,
bool enableReadFromFollower = true);
ScanVertexIter scanVertexWithPart(std::string spaceName,
int32_t partId,
std::string tagName,
std::vector<std::string> propNames,
int64_t limit = DEFAULT_LIMIT,
int64_t startTime = DEFAULT_START_TIME,
int64_t endTime = DEFAULT_END_TIME,
std::string filter = "",
bool onlyLatestVersion = false,
bool enableReadFromFollower = true);
In practice, we first get the partition distribution of the specified space and assign the scan task of each partition to the nodes of the Plato cluster. Each node then assigns its partition scan tasks to the threads running on that node, so that data is read quickly and in parallel. After the graph computation is complete, the results are written to Nebula Graph in parallel through the Nebula client.
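The two-level assignment of scan tasks (partitions to nodes, then to threads) can be sketched like this. Round-robin assignment and both function names are assumptions for illustration; the article does not say which policy the integration actually uses.

```python
def assign_round_robin(items, num_buckets):
    """Deal items into num_buckets lists in round-robin order."""
    buckets = [[] for _ in range(num_buckets)]
    for index, item in enumerate(items):
        buckets[index % num_buckets].append(item)
    return buckets

def plan_scan_tasks(part_ids, num_nodes, threads_per_node):
    """Return plan[node][thread] = partition IDs that thread should scan."""
    plan = {}
    for node, node_parts in enumerate(assign_round_robin(part_ids, num_nodes)):
        plan[node] = assign_round_robin(node_parts, threads_per_node)
    return plan

# A space with 10 partitions, scanned by 3 nodes with 2 threads each.
plan = plan_scan_tasks(list(range(1, 11)), num_nodes=3, threads_per_node=2)
for node, threads in plan.items():
    print(node, threads)
```

Each thread would then run its own scan loop over its partitions, for example by calling a per-partition scan interface such as the one shown above.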
3.2.2 Distributed ID encoder
Gemini and Plato require vertex IDs to increase continuously from 0, but most real-world vertex IDs do not meet this requirement, especially since Nebula Graph has supported string IDs since version 2.0.
Therefore, before computing, we need to convert the original IDs from int or string type into ints that increase continuously from 0. Plato has a single-machine ID encoder, that is, every machine in the Plato cluster redundantly stores the mapping of all IDs. When the number of vertices is large, the ID mapping table alone needs hundreds of GB of memory on each machine, so we need a distributed ID mapper that splits the ID mapping into multiple shards stored separately.
We use hashing to scatter the original IDs across machines and allocate global IDs that increase continuously from 0 in parallel. After the ID mapping is generated, each machine holds part of the ID mapping table. The edge data is then hash-partitioned by source vertex and by destination vertex in turn and sent to the corresponding machine for encoding; the resulting data can be used for computation. When the computation finishes, the data needs to be mapped back to business IDs, and the process is similar to the above.
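The encoding scheme can be illustrated with a single-process simulation. This is a sketch under assumptions: CRC32 stands in for whatever hash the real encoder uses, a prefix sum over shard sizes is one possible way to make the global IDs contiguous, and all function names are hypothetical.

```python
import zlib

def machine_of(original_id, num_machines):
    """Deterministic hash placement (Python's built-in hash() is salted per run)."""
    return zlib.crc32(str(original_id).encode("utf-8")) % num_machines

def build_mapping_shards(original_ids, num_machines):
    """Each simulated machine keeps the mapping only for the IDs hashed to it."""
    local = [dict() for _ in range(num_machines)]
    for oid in original_ids:
        shard = local[machine_of(oid, num_machines)]
        shard.setdefault(oid, len(shard))         # local index, deduplicated
    # A prefix sum over shard sizes gives each machine its global ID range,
    # so the encoded IDs are contiguous and start from 0.
    shards, offset = [], 0
    for shard in local:
        shards.append({oid: offset + idx for oid, idx in shard.items()})
        offset += len(shard)
    return shards

def encode_edges(edges, shards):
    """Look up each endpoint on the machine that owns its original ID."""
    n = len(shards)
    return [(shards[machine_of(s, n)][s], shards[machine_of(d, n)][d])
            for s, d in edges]

def decode_id(encoded_id, shards):
    """Map an encoded ID back to the business ID after computation."""
    for shard in shards:
        for oid, eid in shard.items():
            if eid == encoded_id:
                return oid
    raise KeyError(encoded_id)

edges = [("alice", "bob"), ("bob", "carol"), ("carol", "alice")]
vertex_ids = [v for edge in edges for v in edge]
shards = build_mapping_shards(vertex_ids, num_machines=2)
encoded = encode_edges(edges, shards)
print(shards)
print(encoded)
```

A production decoder would keep a reverse table per machine instead of the linear search used in `decode_id`, which is kept simple here for readability.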
3.2.3 Supplementary algorithms
On top of Plato, we added algorithms such as sssp, apsp, jaccard similarity, and triangle counting, and added Nebula Graph data source support to the input and output of each algorithm. The currently supported algorithms are:
| File name | Algorithm name | Category |
|---|---|---|
| apsp.cc | All-pairs shortest path | Path |
| sssp.cc | Single-source shortest path | Path |
| tree_stat.cc | Tree depth / width | Graph features |
| nstepdegrees.cc | n-step degree | Graph features |
| hyperanf.cc | Graph average distance estimation | Graph features |
| triangle_count.cc | Triangle counting | Graph features |
| kcore.cc | | Node centrality |
| pagerank.cc | Pagerank | Node centrality |
| bnc.cc | Betweenness | Node centrality |
| cnc.cc | Closeness centrality | Node centrality |
| cgm.cc | Connected component calculation | Community discovery |
| lpa.cc | Label propagation | Community discovery |
| hanp.cc | HANP | Community discovery |
| metapath_randomwalk.cc | | Graph representation learning |
| node2vec_randomwalk.cc | | Graph representation learning |
| fast_unfolding.cc | Louvain | Clustering |
| infomap_simple.cc | | Clustering |
| jaccard_similarity.cc | | Similarity |
| mutual.cc | | Other |
| torch.cc | | Other |
| bfs.cc | Breadth-first traversal | Other |
4. Plato deployment, installation, and running
4.1 Cluster deployment
Plato uses MPI for inter-process communication. When deploying Plato on a cluster, Plato must be installed in the same directory on every machine, or you can use NFS. For instructions, see: https://mpitutorial.com/tutorials/running-an-mpi-cluster-within-a-lan/
4.2 Scripts and configuration files for running algorithms
scripts/run_pagerank_local.sh
#!/bin/bash
PROJECT="$(cd "$(dirname "$0")" && pwd)/.."
MAIN="./bazel-bin/example/pagerank" # process name
WNUM=3
WCORES=8
#INPUT=${INPUT:="$PROJECT/data/graph/v100_e2150_ua_c3.csv"}
INPUT=${INPUT:="nebula:${PROJECT}/scripts/nebula.conf"}
#OUTPUT=${OUTPUT:='hdfs://192.168.8.149:9000/_test/output'}
OUTPUT=${OUTPUT:="nebula:$PROJECT/scripts/nebula.conf"}
IS_DIRECTED=${IS_DIRECTED:=true} # let plato auto add reversed edge or not
NEED_ENCODE=${NEED_ENCODE:=true}
VTYPE=${VTYPE:=uint32}
ALPHA=-1
PART_BY_IN=false
EPS=${EPS:=0.0001}
DAMPING=${DAMPING:=0.8}
ITERATIONS=${ITERATIONS:=5}
export MPIRUN_CMD=${MPIRUN_CMD:="${PROJECT}/3rd/mpich-3.2.1/bin/mpiexec.hydra"}
PARAMS+=" --threads ${WCORES}"
PARAMS+=" --input ${INPUT} --output ${OUTPUT} --is_directed=${IS_DIRECTED} --need_encode=${NEED_ENCODE} --vtype=${VTYPE}"
PARAMS+=" --iterations ${ITERATIONS} --eps ${EPS} --damping ${DAMPING}"
# env for JAVA && HADOOP
export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/amd64/server:${LD_LIBRARY_PATH}
# env for hadoop
export CLASSPATH=${HADOOP_HOME}/etc/hadoop:`find ${HADOOP_HOME}/share/hadoop/ | awk '{path=path":"$0}END{print path}'`
export LD_LIBRARY_PATH="${HADOOP_HOME}/lib/native":${LD_LIBRARY_PATH}
chmod 777 ./${MAIN}
${MPIRUN_CMD} -n ${WNUM} -f ${PROJECT}/scripts/cluster ./${MAIN} ${PARAMS}
exit $?
Parameter description
- The INPUT and OUTPUT parameters specify the input and output data sources of the algorithm. Local CSV files, HDFS files, and Nebula Graph are currently supported. When the input or output data source is Nebula Graph, INPUT and OUTPUT take the form nebula:/path/to/nebula.conf.
- WNUM is the total number of processes running on all machines in the cluster. It is recommended to run either 1 process or as many processes as NUMA nodes on each machine. WCORES is the number of threads per process, and the recommended maximum is the number of hardware threads of the machine.
scripts/nebula.conf
## read/write
--retry=3 # number of retries when connecting to Nebula Graph
--space=sf30 # name of the space to read from or write to
## read from nebula
--meta_server_addrs=192.168.8.94:9559 # address of the Nebula Graph metad service
--edge=LIKES # name of the edge to read
#--edge_data_field # name of the property to read as the edge weight
--read_batch_size=10000 # batch size of each scan
## write to nebula
--graph_server_addrs=192.168.8.94:9669 # address of the Nebula Graph graphd service
--user=root # login user name for the graphd service
--password=nebula # login password for the graphd service
# insert or update
--mode=insert # mode used when writing back to Nebula Graph: insert/update
--tag=pagerank # name of the tag written back to Nebula Graph
--prop=pr # name of the property of that tag
--type=double # type of the property of that tag
--write_batch_size=1000 # batch size when writing back
--err_file=/home/plato/err.txt # file that stores the data that failed to be written back
scripts/cluster
The cluster file specifies the IPs of the cluster machines on which the algorithm will run.
192.168.15.3
192.168.15.5
192.168.15.6
The above describes how Plato is applied with Nebula Graph. This feature is currently integrated in Nebula Graph Enterprise Edition. If you are using the open source edition of Nebula Graph, you need to integrate Plato yourself according to your own needs.