The High-Performance Graph Computing System Plato in Practice at Nebula Graph

2022-06-10 08:44:00 NebulaGraph

This article was first published on the Nebula Graph Community official account.

1. Introduction to graph computing

1.1 Graph database vs. graph computing

Graph databases target OLTP scenarios and emphasize create, read, update, and delete operations; a single query usually touches only a small part of the whole graph. Graph computing targets OLAP scenarios and usually analyzes and computes over the whole graph.

1.2 Distributed architecture of graph computing systems

By distributed architecture, graph computing systems are divided into single-machine and distributed systems.

The advantage of a single-machine graph computing system is its simple model: there is no need to consider distributed communication or graph partitioning. However, it is limited by the resources of a single machine and cannot analyze larger-scale graph data.

A distributed graph computing platform partitions the graph data across multiple machines so that it can process larger-scale graph data, but this inevitably introduces the overhead of distributed communication.

1.3 Graph partitioning

There are two main ways to partition a graph: edge-cut (Edge-Cut) and vertex-cut (Vertex-Cut).

Edge-cut: the data of each vertex is stored on only one machine, but some edges are cut and distributed across multiple machines.
As shown in figure (a), the data of vertex A is stored only on machine 1 and the data of vertex B only on machine 2. Edge AB is stored on both machine 1 and machine 2. Because vertices A and B are on different machines, iterative computation incurs communication overhead.

Vertex-cut: each edge is stored on only one machine, but some vertices may be split and distributed across multiple machines.
As shown in figure (b), edge AB is stored on machine 1, edge BC on machine 2, and edge CD on machine 3, while vertex B is assigned to machines 1 and 2 and vertex C to machines 2 and 3. Because vertices are stored on multiple machines, keeping vertex data consistent also incurs communication overhead.
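
The two partitioning schemes can be illustrated with a minimal Python sketch. The function names and the way placements are passed in (`owner`, `edge_owner`) are assumptions made for illustration, not part of any system described here. The sketch reproduces the placements of figures (a) and (b) and shows which edges or vertices end up on more than one machine.

```python
from collections import defaultdict

def edge_cut(edges, owner):
    """Edge-cut: each vertex lives on exactly one machine (given by `owner`).
    An edge whose endpoints live on different machines is stored on both."""
    placement = defaultdict(set)          # edge -> machines holding it
    for src, dst in edges:
        placement[(src, dst)].update({owner[src], owner[dst]})
    return dict(placement)

def vertex_cut(edge_owner):
    """Vertex-cut: each edge lives on exactly one machine (given by `edge_owner`).
    A vertex is replicated on every machine that holds one of its edges."""
    replicas = defaultdict(set)           # vertex -> machines holding a copy
    for (src, dst), machine in edge_owner.items():
        replicas[src].add(machine)
        replicas[dst].add(machine)
    return dict(replicas)

# Figure (a): A on machine 1, B on machine 2, so edge AB is cut.
cut = edge_cut([("A", "B")], {"A": 1, "B": 2})

# Figure (b): AB on machine 1, BC on machine 2, CD on machine 3.
replicas = vertex_cut({("A", "B"): 1, ("B", "C"): 2, ("C", "D"): 3})
```

Any entry with more than one machine marks a place where the partitioning forces network traffic: a cut edge under edge-cut, a replicated vertex under vertex-cut.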

1.4 Computing model

The programming model is aimed at developers of graph computing applications. It can be divided into vertex-centric, edge- or path-centric, and subgraph-centric programming models.

The computing model is a concern for developers of graph computing systems. It mainly comes in synchronous and asynchronous execution models. The more common ones are the BSP model (Bulk Synchronous Parallel computing model) and the GAS model.

BSP model: the computation of the BSP model consists of a series of iteration steps, each called a superstep. Systems that use the BSP model include Pregel, Hama, and Giraph.
The BSP model has a vertical and a horizontal structure. Vertically, the BSP model consists of a series of serial supersteps. Horizontally (as shown in the figure), a superstep is divided into three phases:

  • Local computation phase: each processor computes only on the data stored in its local memory.
  • Global communication phase: data is exchanged between machine nodes.
  • Barrier synchronization phase: wait for all communication to finish.
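
The three phases can be sketched as a loop. The example below is a single-process simulation written for illustration only: it computes hop-count shortest paths, the worker assignment (`owner`) is an arbitrary round-robin, and none of the names come from Pregel, Giraph, or Plato.

```python
def bsp_sssp(edges, source, num_workers=2):
    """Hop-count shortest paths, simulated as BSP supersteps in one process."""
    vertices = sorted({v for e in edges for v in e})
    owner = {v: i % num_workers for i, v in enumerate(vertices)}
    out = {v: [] for v in vertices}
    for src, dst in edges:
        out[src].append(dst)

    dist = {v: float("inf") for v in vertices}
    inbox = {w: [] for w in range(num_workers)}
    inbox[owner[source]].append((source, 0))
    supersteps = 0

    while any(inbox.values()):
        # 1. Local computation: each worker only touches the vertices it owns.
        outbox = []
        for w in range(num_workers):
            for v, d in inbox[w]:
                if d < dist[v]:
                    dist[v] = d
                    outbox.extend((n, d + 1) for n in out[v])
        # 2. Global communication: route every message to the owning worker.
        inbox = {w: [] for w in range(num_workers)}
        for v, d in outbox:
            inbox[owner[v]].append((v, d))
        # 3. Barrier: the next superstep starts only after all messages arrived.
        supersteps += 1
    return dist, supersteps

dist, supersteps = bsp_sssp([("A", "B"), ("B", "C"), ("A", "C"), ("C", "D")], "A")
```

Messages produced in one superstep become visible only in the next one, which is what makes the model synchronous.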

GAS model: the GAS model was proposed in the PowerGraph system. It is divided into an information gathering phase (Gather), an application phase (Apply), and a distribution phase (Scatter).

  • Gather phase: collects information from neighboring vertices.
  • Apply phase: processes the collected information locally and updates the vertex.
  • Scatter phase: sends the new information to neighbors.
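
As a rough illustration of the three phases, here is PageRank written in a Gather/Apply/Scatter style. This is a single-process sketch, not PowerGraph's API: the function name, the `contrib` dictionary that stands in for messages along out-edges, and the default parameters are all assumptions.

```python
def gas_pagerank(edges, iterations=20, damping=0.85):
    """PageRank expressed as repeated Gather / Apply / Scatter phases."""
    vertices = sorted({v for e in edges for v in e})
    in_nbrs = {v: [] for v in vertices}
    out_deg = {v: 0 for v in vertices}
    for src, dst in edges:
        in_nbrs[dst].append(src)
        out_deg[src] += 1

    n = len(vertices)
    rank = {v: 1.0 / n for v in vertices}
    # Value each vertex offers to its out-neighbors (result of the last Scatter).
    contrib = {v: rank[v] / out_deg[v] if out_deg[v] else 0.0 for v in vertices}

    for _ in range(iterations):
        # Gather: sum the contributions of the in-neighbors.
        gathered = {v: sum(contrib[u] for u in in_nbrs[v]) for v in vertices}
        # Apply: update the vertex value locally.
        rank = {v: (1 - damping) / n + damping * gathered[v] for v in vertices}
        # Scatter: publish the new contribution along the out-edges.
        contrib = {v: rank[v] / out_deg[v] if out_deg[v] else 0.0 for v in vertices}
    return rank

ranks = gas_pagerank([("A", "B"), ("B", "C"), ("C", "A")])
```

On a three-vertex cycle every vertex keeps the same rank, which makes the example easy to check by hand.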

2. Introduction to the Gemini graph computing system

Gemini is an influential system in industry. Its main techniques include CSR/CSC, push/pull, master and mirror, sparse and dense graphs, overlapping communication with computation, chunk-based partitioning, and NUMA-aware sub-partitioning.

Gemini uses edge-cut to partition graph data in a chunk-based manner, and it supports NUMA architectures. For the partitioned data, CSR stores the outgoing edges and CSC stores the incoming edges. During iterative computation, sparse graphs use push to update a vertex's outgoing neighbors, and dense graphs use pull to fetch information from a vertex's incoming neighbors.
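
The relationship between the two storage formats and the two update directions can be sketched as follows. This is an illustrative single-machine sketch with made-up function names, not Gemini's or Plato's code: CSR indexes outgoing neighbors by source, CSC indexes incoming neighbors by destination, push walks CSR from the active vertices, and pull walks CSC from every vertex.

```python
def build_csr(num_vertices, edges):
    """Return (offsets, neighbors); out-neighbors of v are
    neighbors[offsets[v]:offsets[v + 1]]."""
    offsets = [0] * (num_vertices + 1)
    for src, _ in edges:
        offsets[src + 1] += 1
    for v in range(num_vertices):          # prefix sum over the degree counts
        offsets[v + 1] += offsets[v]
    neighbors = [0] * len(edges)
    cursor = offsets[:-1].copy()           # next free slot per source vertex
    for src, dst in edges:
        neighbors[cursor[src]] = dst
        cursor[src] += 1
    return offsets, neighbors

def build_csc(num_vertices, edges):
    """CSC is the CSR of the reversed graph: in-neighbors indexed by destination."""
    return build_csr(num_vertices, [(dst, src) for src, dst in edges])

def push(csr, values, active):
    """Push: each active vertex adds its value to its outgoing neighbors."""
    offsets, nbrs = csr
    acc = [0.0] * (len(offsets) - 1)
    for u in active:
        for v in nbrs[offsets[u]:offsets[u + 1]]:
            acc[v] += values[u]
    return acc

def pull(csc, values, active):
    """Pull: each vertex reads the values of its active incoming neighbors."""
    offsets, nbrs = csc
    active = set(active)
    return [float(sum(values[u] for u in nbrs[offsets[v]:offsets[v + 1]] if u in active))
            for v in range(len(offsets) - 1)]

edges = [(0, 1), (0, 2), (1, 2), (2, 0)]
csr, csc = build_csr(3, edges), build_csc(3, edges)
values, active = [1.0, 2.0, 4.0], [0, 2]
pushed, pulled = push(csr, values, active), pull(csc, values, active)
```

Both directions produce the same result. Push only visits the edges of active vertices, which is cheap when few vertices are active, while pull scans every vertex's incoming edges but never has two writers updating the same vertex.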

If an edge is cut, the vertex at one end of the edge is the master and the vertex at the other end is a mirror. A mirror is also called a placeholder. During a pull computation, the mirror vertices on each machine pull the information of their incoming-neighbor master vertices, perform one computation, and then, under the BSP computing model, synchronize the result to the master vertices over the network. During a push computation, the master vertices on each machine first synchronize their information to their mirror vertices, and the mirrors then update their outgoing neighbors.

In the communication phase of BSP, each machine Node_i sends to its next machine Node_i+1, and the last machine sends to the first. While sending, each machine also receives information from Node_i-1 and performs local computation immediately after receiving it. Overlapping communication with computation hides the communication time and improves overall efficiency.
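
The ring order can be simulated in a few lines. The sketch below assumes a ring all-gather, where each block is forwarded around the ring until every node has seen it; this forwarding scheme and the function name are assumptions for illustration, and Gemini's actual message schedule may differ. "Computation" is reduced to appending the received block to the node's local view.

```python
def ring_allgather(blocks):
    """Simulate a ring exchange among len(blocks) nodes.

    In each round node i sends one block to node (i + 1) % n and receives one
    from node (i - 1) % n. The received block is processed immediately and
    forwarded in the next round.
    """
    n = len(blocks)
    seen = [[b] for b in blocks]        # blocks each node has processed so far
    in_flight = list(blocks)            # block each node sends in the next round
    for _ in range(n - 1):
        received = [in_flight[(i - 1) % n] for i in range(n)]
        for i in range(n):
            seen[i].append(received[i])  # compute as soon as the block arrives
        in_flight = received
    return seen

views = ring_allgather(["b0", "b1", "b2"])
```

In a real system the send, the receive, and the local computation of a round run concurrently, which is where the overlap of communication and computation comes from.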

For more details, see the paper "Gemini: A Computation-Centric Distributed Graph Processing System".

3. The Plato graph computing system and its integration with Nebula Graph

3.1 Introduction to the Plato graph computing system

Plato is an industrial-grade graph computing system open-sourced by Tencent and implemented based on the Gemini paper. Plato can run on general x86 clusters, such as Kubernetes clusters and Yarn clusters. At the file system level, Plato provides a variety of interfaces to support mainstream file systems, such as HDFS and Ceph.

3.2 Integration with Nebula Graph

We did secondary development on top of Plato so that it can access Nebula Graph as a data source.

3.2.1 Nebula Graph as an input and output data source

We added a new data source to Plato that supports Nebula Graph as both input and output: Plato reads data directly from Nebula Graph for graph computing and writes the results directly back to Nebula Graph.

The Nebula Graph storage layer provides a scan interface for partitions, which makes it easy to scan out vertex and edge data in batches:

ScanEdgeIter scanEdgeWithPart(std::string spaceName,
                                  int32_t partID,
                                  std::string edgeName,
                                  std::vector<std::string> propNames,
                                  int64_t limit = DEFAULT_LIMIT,
                                  int64_t startTime = DEFAULT_START_TIME,
                                  int64_t endTime = DEFAULT_END_TIME,
                                  std::string filter = "",
                                  bool onlyLatestVersion = false,
                                  bool enableReadFromFollower = true);

ScanVertexIter scanVertexWithPart(std::string spaceName,
                                      int32_t partId,
                                      std::string tagName,
                                      std::vector<std::string> propNames,
                                      int64_t limit = DEFAULT_LIMIT,
                                      int64_t startTime = DEFAULT_START_TIME,
                                      int64_t endTime = DEFAULT_END_TIME,
                                      std::string filter = "",
                                      bool onlyLatestVersion = false,
                                      bool enableReadFromFollower = true);

In practice, we first obtain the partition distribution of the specified space and assign the scan task of each partition to the nodes of the Plato cluster. Each node then further assigns its partition scan tasks to the threads running on that node, so that data is read quickly and in parallel. After the graph computation finishes, the results are written to Nebula Graph in parallel through the Nebula client.
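
A minimal sketch of such a two-level assignment is shown below. The article does not say which assignment policy is used, so the round-robin policy, the function name, and the parameters are assumptions for illustration only.

```python
def assign_scan_tasks(partition_ids, num_nodes, threads_per_node):
    """Round-robin partitions over nodes, then over each node's threads.

    Returns plan[node][thread] -> list of partition IDs to scan.
    """
    per_node = {n: [] for n in range(num_nodes)}
    for i, part in enumerate(partition_ids):
        per_node[i % num_nodes].append(part)

    plan = {n: {t: [] for t in range(threads_per_node)} for n in range(num_nodes)}
    for n, parts in per_node.items():
        for j, part in enumerate(parts):
            plan[n][j % threads_per_node].append(part)
    return plan

# Hypothetical space with 10 partitions, scanned by 3 nodes with 2 threads each.
plan = assign_scan_tasks(list(range(1, 11)), num_nodes=3, threads_per_node=2)
```

Each thread would then call the partition scan interface for the partitions in its list, so every partition is read exactly once.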

3.2.2 Distributed ID encoder

Gemini and Plato require vertex IDs to increase continuously from 0, but the vertex IDs of most real-world data do not meet this requirement, especially since Nebula Graph has supported string-type IDs since version 2.0.

Therefore, before computing, we need to convert the original IDs from int or string type into ints that increase continuously from 0. Plato provides a single-machine ID encoder, that is, every machine in the Plato cluster redundantly stores the mapping for all IDs. When the number of vertices is large, storing the ID mapping table alone takes hundreds of GB of memory on each machine. We therefore need a distributed ID encoder that splits the ID mapping into multiple shards stored separately.

We hash the original IDs to scatter them across machines, and allocate globally continuous IDs starting from 0 in parallel. Once the ID mapping is generated, each machine holds part of the ID mapping table. The edge data is then hashed by source vertex and by destination vertex in turn and sent to the corresponding machine for encoding; the resulting data can be used for computation. When the computation finishes, the data has to be mapped back to the business IDs, in a process similar to the one above.
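
The encoding scheme can be sketched in a single process, with one dictionary per "machine" standing in for a mapping shard. Everything here is an illustrative assumption rather than Plato's implementation: the CRC32 hash, the prefix-sum offsets used to keep the global IDs dense, and all function names.

```python
import zlib

def owner_of(raw_id, num_machines):
    # Stable hash, so every machine routes the same ID to the same owner.
    return zlib.crc32(str(raw_id).encode("utf-8")) % num_machines

def build_mapping(raw_ids, num_machines):
    """Return one mapping shard per machine; global IDs are dense from 0."""
    shards = [[] for _ in range(num_machines)]
    for raw in sorted(set(raw_ids), key=str):
        shards[owner_of(raw, num_machines)].append(raw)
    mapping, offset = [], 0
    for ids in shards:
        # Each machine numbers its own IDs starting at its prefix-sum offset.
        mapping.append({raw: offset + i for i, raw in enumerate(ids)})
        offset += len(ids)
    return mapping

def encode_edges(edges, mapping):
    n = len(mapping)
    # First pass: route by source vertex and encode the source.
    half = [(mapping[owner_of(s, n)][s], d) for s, d in edges]
    # Second pass: route by destination vertex and encode the destination.
    return [(s, mapping[owner_of(d, n)][d]) for s, d in half]

def decode_ids(encoded_ids, mapping):
    """Map encoded IDs back to the original business IDs."""
    reverse = {enc: raw for shard in mapping for raw, enc in shard.items()}
    return [reverse[e] for e in encoded_ids]

edges = [("alice", "bob"), ("bob", "carol"), ("carol", "alice")]
mapping = build_mapping([v for e in edges for v in e], num_machines=2)
encoded = encode_edges(edges, mapping)
```

Each shard only holds the IDs hashed to its machine, so the memory cost of the mapping table is divided across the cluster instead of being replicated on every machine.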

3.2.3 Additional algorithms

We added algorithms such as sssp, apsp, jaccard similarity, and triangle counting to Plato, and added Nebula Graph data source support to the input and output of every algorithm. The currently supported algorithms are:

File name                 Algorithm name                      Category
apsp.cc                   All-pairs shortest path             Path
sssp.cc                   Single-source shortest path         Path
tree_stat.cc              Tree depth / width                  Graph features
nstepdegrees.cc           n-step degree                       Graph features
hyperanf.cc               Average graph distance estimation   Graph features
triangle_count.cc         Triangle counting                   Graph features
kcore.cc                  -                                   Node centrality
pagerank.cc               PageRank                            Node centrality
bnc.cc                    Betweenness                         Node centrality
cnc.cc                    Closeness centrality                Node centrality
cgm.cc                    Connected components                Community detection
lpa.cc                    Label propagation                   Community detection
hanp.cc                   HANP                                Community detection
metapath_randomwalk.cc    -                                   Graph representation learning
node2vec_randomwalk.cc    -                                   Graph representation learning
fast_unfolding.cc         Louvain                             Clustering
infomap_simple.cc         -                                   Clustering
jaccard_similarity.cc     -                                   Similarity
mutual.cc                 -                                   Other
torch.cc                  -                                   Other
bfs.cc                    Breadth-first traversal             Other

4. Deploying, installing, and running Plato

4.1 Cluster deployment

Plato uses MPI for inter-process communication. When deploying Plato on a cluster, install Plato in the same directory on every machine, or use NFS. For the procedure, see: https://mpitutorial.com/tutorials/running-an-mpi-cluster-within-a-lan/

4.2 Scripts and configuration files for running an algorithm

scripts/run_pagerank_local.sh

#!/bin/bash

PROJECT="$(cd "$(dirname "$0")" && pwd)/.."

MAIN="./bazel-bin/example/pagerank" # process name

WNUM=3
WCORES=8

#INPUT=${INPUT:="$PROJECT/data/graph/v100_e2150_ua_c3.csv"}
INPUT=${INPUT:="nebula:${PROJECT}/scripts/nebula.conf"}
#OUTPUT=${OUTPUT:='hdfs://192.168.8.149:9000/_test/output'}
OUTPUT=${OUTPUT:="nebula:$PROJECT/scripts/nebula.conf"}
IS_DIRECTED=${IS_DIRECTED:=true}  # let plato auto add reversed edge or not
NEED_ENCODE=${NEED_ENCODE:=true}
VTYPE=${VTYPE:=uint32}

ALPHA=-1
PART_BY_IN=false

EPS=${EPS:=0.0001}
DAMPING=${DAMPING:=0.8}
ITERATIONS=${ITERATIONS:=5}

export MPIRUN_CMD=${MPIRUN_CMD:="${PROJECT}/3rd/mpich-3.2.1/bin/mpiexec.hydra"}

PARAMS+=" --threads ${WCORES}"
PARAMS+=" --input ${INPUT} --output ${OUTPUT} --is_directed=${IS_DIRECTED} --need_encode=${NEED_ENCODE} --vtype=${VTYPE}"
PARAMS+=" --iterations ${ITERATIONS} --eps ${EPS} --damping ${DAMPING}"

# env for JAVA && HADOOP
export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/amd64/server:${LD_LIBRARY_PATH}

# env for hadoop
export CLASSPATH=${HADOOP_HOME}/etc/hadoop:`find ${HADOOP_HOME}/share/hadoop/ | awk '{path=path":"$0}END{print path}'`
export LD_LIBRARY_PATH="${HADOOP_HOME}/lib/native":${LD_LIBRARY_PATH}

chmod 777 ./${MAIN}
${MPIRUN_CMD} -n ${WNUM} -f ${PROJECT}/scripts/cluster ./${MAIN} ${PARAMS}
exit $?

Parameter description

  • The INPUT and OUTPUT parameters specify the input and output data sources of the algorithm, respectively. Local CSV files, HDFS files, and Nebula Graph are currently supported. When the input or output data source is Nebula Graph, INPUT and OUTPUT take the form nebula:/path/to/nebula.conf.
  • WNUM is the total number of processes running across all machines in the cluster. We recommend running either one process per machine or as many processes as the machine has NUMA nodes. WCORES is the number of threads per process; the recommended maximum is the number of hardware threads of the machine.

scripts/nebula.conf

## read/write
--retry=3 # number of retries when connecting to Nebula Graph
--space=sf30 # name of the space to read from or write to

## read from nebula
--meta_server_addrs=192.168.8.94:9559 # address of the Nebula Graph metad service
--edge=LIKES # name of the edge to read
#--edge_data_field # name of the edge property to read as the edge weight
--read_batch_size=10000 # batch size of each scan

## write to nebula
--graph_server_addrs=192.168.8.94:9669 # address of the Nebula Graph graphd service
--user=root # login user name for the graphd service
--password=nebula # login password for the graphd service
# insert or update
--mode=insert # mode used when writing back to Nebula Graph: insert/update
--tag=pagerank # name of the tag written back to Nebula Graph
--prop=pr # name of the tag property written back to Nebula Graph
--type=double # type of the tag property written back to Nebula Graph
--write_batch_size=1000 # batch size when writing back
--err_file=/home/plato/err.txt # file that stores the data that failed to be written back

scripts/cluster

The cluster file lists the IP addresses of the cluster machines on which the algorithm will run:

192.168.15.3
192.168.15.5
192.168.15.6

The above describes how Plato is applied in Nebula Graph. This feature is currently integrated into the Nebula Graph Enterprise Edition. If you are using the open-source edition of Nebula Graph, you need to integrate Plato yourself according to your own needs.


