- 1. Overview
- 2. How RPC communication works
- 2.1 Actor system
- 2.2 Transport system
- 3. Summary
- 4. References
According to the Baidu Baike definition, RPC generally refers to remote procedure call (Remote Procedure Call, RPC).
Spark is a distributed framework, and a distributed framework inevitably involves communication between nodes. Spark components on different nodes communicate with each other point-to-point through RPC.
According to material found online, from version 2.0 onward Spark uses its own RPC communication logic, modeled on Akka. The rest of this article analyzes the RPC communication mechanism of Spark 2.0 and later.
2.1 Actor system
2.1.1 Class design diagram
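In Spark's RPC communication mechanism, RpcEnv is the top-level abstract class that defines the RPC environment. Its only implementation is the NettyRpcEnv class.
RpcEnv is instantiated by instantiating NettyRpcEnv. During instantiation the fields of NettyRpcEnv are initialized, which prepares the RPC communication environment.
Field initialization
transportConf: SparkTransportConf.fromSparkConf builds a TransportConf instance, which is assigned to NettyRpcEnv's transportConf field;
The number of connections established between each pair of RPC entities is fixed at 1;
RPC server and client threads: the value of spark.rpc.io.threads is used if it is set, otherwise numUsableCores. If that value is > 0 it is used as the core count, otherwise the number of available CPUs is used. In either case the result is capped at 8. Explicit spark.rpc.io.serverThreads / spark.rpc.io.clientThreads settings take precedence;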
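The thread-count rule can be restated as a small, self-contained Java sketch. It mirrors defaultNumThreads in the SparkTransportConf excerpt. The class name RpcThreadDefaults is hypothetical, and only the cap of 8 is taken from the text.

```java
// Hypothetical restatement of SparkTransportConf.defaultNumThreads.
final class RpcThreadDefaults {
    // Cap on the default number of RPC server/client threads (8, as described above).
    static final int MAX_DEFAULT_NETTY_THREADS = 8;

    private RpcThreadDefaults() {}

    static int defaultNumThreads(int numUsableCores) {
        // Use numUsableCores when it is positive, otherwise the JVM's processor count...
        int availableCores = numUsableCores > 0
                ? numUsableCores
                : Runtime.getRuntime().availableProcessors();
        // ...and never exceed the cap.
        return Math.min(availableCores, MAX_DEFAULT_NETTY_THREADS);
    }

    public static void main(String[] args) {
        System.out.println(defaultNumThreads(4));  // 4
        System.out.println(defaultNumThreads(32)); // 8
        System.out.println(defaultNumThreads(0));  // availableProcessors(), at most 8
    }
}
```

With this rule, an executor that passes numUsableCores = 4 gets 4 threads. A 32-core machine with numUsableCores = 0 still gets only 8.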
private[netty] class NettyRpcEnv(
    val conf: SparkConf,
    javaSerializerInstance: JavaSerializerInstance,
    host: String,
    securityManager: SecurityManager,
    numUsableCores: Int) extends RpcEnv(conf) with Logging {

  // RPC configuration: holds the network I/O settings from SparkConf whose keys start with spark.rpc
  private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    // Number of connections between each pair of RPC entities, fixed at 1
    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
    "rpc",
    // Number of RPC server and client threads
    conf.getInt("spark.rpc.io.threads", numUsableCores))

  // Instantiates the Dispatcher directly; numUsableCores sizes the message-dispatch thread pool
  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

  private val streamManager = new NettyStreamManager(this)

  // Context used to create the TransportServer and TransportClientFactory and to set up
  // Netty channel pipelines with a TransportChannelHandler
  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

  private val clientFactory = transportContext.createClientFactory(createClientBootstraps())

  @volatile private var server: TransportServer = _

  // Maps each remote [[RpcAddress]] to its [[Outbox]]: there is one Outbox per remote address,
  // shared by all RpcEndpointRefs that point at endpoints on that address.
  // When we connect to a remote [[RpcAddress]], we just put messages into its [[Outbox]]
  // to implement a non-blocking `send` method.
  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
  // ...
}
object SparkTransportConf {
  // Upper bound on the default number of Netty threads
  private val MAX_DEFAULT_NETTY_THREADS = 8

  // Builds a TransportConf from the settings in SparkConf; here module = "rpc"
  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
    val conf = _conf.clone
    // Work out the number of RPC server and client threads and, unless already set,
    // store it in the configuration as the server and client thread counts
    val numThreads = defaultNumThreads(numUsableCores)
    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)
    // Create the TransportConf backed by SparkConf, exposing its lookups through ConfigProvider
    new TransportConf(module, new ConfigProvider {
      override def get(name: String): String = conf.get(name)
      override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
      override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
        conf.getAll.toMap.asJava.entrySet()
      }
    })
  }

  // Number of RPC server and client threads:
  // numUsableCores if > 0, otherwise the number of available CPUs, capped at 8 in both cases
  private def defaultNumThreads(numUsableCores: Int): Int = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
  }
}
// Transport configuration class: holds the network I/O settings from SparkConf
// whose keys start with spark.<module> (spark.rpc for RPC)
public class TransportConf {

  private final String SPARK_NETWORK_IO_MODE_KEY;
  private final String SPARK_NETWORK_IO_BACKLOG_KEY;
  private final String SPARK_NETWORK_IO_LAZYFD_KEY;
  // ... (one final String field for each of the other keys assigned below)

  private final ConfigProvider conf;

  private final String module;

  public TransportConf(String module, ConfigProvider conf) {
    this.module = module;
    this.conf = conf;
    this.SPARK_NETWORK_IO_MODE_KEY = this.getConfKey("io.mode");
    this.SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = this.getConfKey("io.preferDirectBufs");
    this.SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = this.getConfKey("io.connectionTimeout");
    this.SPARK_NETWORK_IO_BACKLOG_KEY = this.getConfKey("io.backLog");
    this.SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY = this.getConfKey("io.numConnectionsPerPeer");
    this.SPARK_NETWORK_IO_SERVERTHREADS_KEY = this.getConfKey("io.serverThreads");
    this.SPARK_NETWORK_IO_CLIENTTHREADS_KEY = this.getConfKey("io.clientThreads");
    this.SPARK_NETWORK_IO_RECEIVEBUFFER_KEY = this.getConfKey("io.receiveBuffer");
    this.SPARK_NETWORK_IO_SENDBUFFER_KEY = this.getConfKey("io.sendBuffer");
    this.SPARK_NETWORK_SASL_TIMEOUT_KEY = this.getConfKey("sasl.timeout");
    this.SPARK_NETWORK_IO_MAXRETRIES_KEY = this.getConfKey("io.maxRetries");
    this.SPARK_NETWORK_IO_RETRYWAIT_KEY = this.getConfKey("io.retryWait");
    this.SPARK_NETWORK_IO_LAZYFD_KEY = this.getConfKey("io.lazyFD");
    this.SPARK_NETWORK_VERBOSE_METRICS = this.getConfKey("io.enableVerboseMetrics");
  }

  // Builds the full key "spark." + module + "." + suffix, e.g. spark.rpc.io.mode
  private String getConfKey(String suffix) {
    return "spark." + this.module + "." + suffix;
  }
  // ...
}
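The Java sketch below is a hypothetical miniature showing how the `module` argument becomes concrete keys and how setIfMissing leaves user settings alone. MiniTransportConf is not a Spark class. A plain Map stands in for SparkConf, and Map.putIfAbsent plays the role of setIfMissing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of SparkTransportConf.fromSparkConf + TransportConf.getConfKey.
final class MiniTransportConf {
    private final String module;
    private final Map<String, String> conf;

    MiniTransportConf(String module, Map<String, String> sparkConf, int numThreads) {
        this.module = module;
        // Copy the settings, like `_conf.clone`, so the caller's map is not modified.
        this.conf = new HashMap<>(sparkConf);
        // Like setIfMissing: fill in the default only when the key is absent.
        this.conf.putIfAbsent(key("io.serverThreads"), Integer.toString(numThreads));
        this.conf.putIfAbsent(key("io.clientThreads"), Integer.toString(numThreads));
    }

    // Same shape as getConfKey: "spark." + module + "." + suffix
    String key(String suffix) {
        return "spark." + module + "." + suffix;
    }

    String get(String suffix, String defaultValue) {
        return conf.getOrDefault(key(suffix), defaultValue);
    }

    public static void main(String[] args) {
        Map<String, String> sparkConf = new HashMap<>();
        sparkConf.put("spark.rpc.io.clientThreads", "2");
        MiniTransportConf rpc = new MiniTransportConf("rpc", sparkConf, 8);
        System.out.println(rpc.key("io.mode"));               // spark.rpc.io.mode
        System.out.println(rpc.get("io.serverThreads", "?")); // 8: default filled in
        System.out.println(rpc.get("io.clientThreads", "?")); // 2: user value kept
    }
}
```

Passing a different module name changes only the `spark.<module>.` prefix of every key.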
The RpcEndpoint trait is the top-level parent of every node in the RPC communication mechanism.
RpcEndpointRef is a reference to an RpcEndpoint and acts as that RpcEndpoint's client. Nodes communicate by calling the target node's RpcEndpointRef object, which locates its RpcEndpoint and delivers the message to it.
NettyRpcEndpointRef is a subclass of RpcEndpointRef that overrides many of its methods. Every RpcEndpointRef that gets instantiated is in fact a NettyRpcEndpointRef object.
Obtaining the receiving node's RpcEndpointRef object
On the sending node, NettyRpcEnv#asyncSetupEndpointRefByURI(uri: String) is called with the receiving node's URI to obtain the receiving node's reference object asynchronously.
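Here is a rough Java sketch of the lookup flow. It assumes the endpoint URI has the form spark://name@host:port, with the endpoint name in the user-info part. The class EndpointLookupSketch is hypothetical. A local Set stands in for the remote RpcEndpointVerifier answering CheckExistence; in Spark that check is itself an ask() to the remote RpcEnv.

```java
import java.net.URI;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch of asyncSetupEndpointRefByURI: parse the URI, then ask whether the name exists.
final class EndpointLookupSketch {
    static final class Ref {
        final String name;
        final String host;
        final int port;

        Ref(String name, String host, int port) {
            this.name = name;
            this.host = host;
            this.port = port;
        }
    }

    // Assumed URI form: spark://<endpoint name>@<host>:<port>
    static Ref parse(String uri) {
        URI u = URI.create(uri);
        if (!"spark".equals(u.getScheme()) || u.getUserInfo() == null
                || u.getHost() == null || u.getPort() < 0) {
            throw new IllegalArgumentException("Invalid Spark URL: " + uri);
        }
        return new Ref(u.getUserInfo(), u.getHost(), u.getPort());
    }

    // `remoteNames` stands in for the endpoints the remote verifier knows about.
    static CompletableFuture<Ref> asyncSetupEndpointRef(String uri, Set<String> remoteNames) {
        Ref ref = parse(uri);
        return CompletableFuture
                .supplyAsync(() -> remoteNames.contains(ref.name)) // the CheckExistence "ask"
                .thenCompose(found -> {
                    if (found) {
                        return CompletableFuture.completedFuture(ref);
                    }
                    CompletableFuture<Ref> failed = new CompletableFuture<>();
                    failed.completeExceptionally(
                            new IllegalStateException("Cannot find endpoint: " + uri));
                    return failed;
                });
    }

    public static void main(String[] args) {
        Set<String> remote = Set.of("HeartbeatReceiver");
        Ref ok = asyncSetupEndpointRef("spark://HeartbeatReceiver@192.168.1.10:7077", remote).join();
        System.out.println(ok.name + " @ " + ok.host + ":" + ok.port);
        try {
            asyncSetupEndpointRef("spark://Missing@192.168.1.10:7077", remote).join();
        } catch (CompletionException e) {
            System.out.println("lookup failed: " + e.getCause().getMessage());
        }
    }
}
```

Like the Scala version, the caller gets a future right away. The reference is only handed out once the remote side has confirmed that an endpoint with that name exists.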
private[spark] abstract class RpcEnv(conf: SparkConf) {
  // ...
  def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]
  // ...
}

private[netty] class NettyRpcEnv(
    val conf: SparkConf,
    javaSerializerInstance: JavaSerializerInstance,
    host: String,
    securityManager: SecurityManager,
    numUsableCores: Int) extends RpcEnv(conf) with Logging {

  // Asynchronously obtains a node's reference object from the node's URI
  def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
    val addr = RpcEndpointAddress(uri)
    val endpointRef = new NettyRpcEndpointRef(conf, addr, this)
    // Reference to the RpcEndpointVerifier running in the remote RpcEnv
    val verifier = new NettyRpcEndpointRef(
      conf, RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME), this)
    // Ask the verifier whether an endpoint with this name exists there
    verifier.ask[Boolean](RpcEndpointVerifier.CheckExistence(endpointRef.name)).flatMap { find =>
      if (find) {
        Future.successful(endpointRef)
      } else {
        Future.failed(new RpcEndpointNotFoundException(uri))
      }
    }(ThreadUtils.sameThread)
  }
  // ...
}

The sending node sends messages to the receiving node through RpcEndpointRef
The sending node sends messages by calling send() or ask() on the receiving node's reference object, RpcEndpointRef.
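The difference between send() and ask() can be illustrated within a single JVM. In the hypothetical Java sketch below, SketchEndpoint plays the role of RpcEndpoint (receive / receiveAndReply) and SketchEndpointRef plays the role of RpcEndpointRef. CompletableFuture.orTimeout stands in for RpcTimeout, and no network is involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for RpcEndpoint.
interface SketchEndpoint {
    // Like receive: handles one-way messages, no reply.
    void receive(Object message);

    // Like receiveAndReply: handles request messages and answers through `reply`.
    void receiveAndReply(Object message, Consumer<Object> reply);
}

// Hypothetical stand-in for RpcEndpointRef, delivering in-process instead of over the network.
final class SketchEndpointRef {
    private final SketchEndpoint endpoint;

    SketchEndpointRef(SketchEndpoint endpoint) {
        this.endpoint = endpoint;
    }

    // Fire and forget: the message goes to receive(), nothing comes back.
    void send(Object message) {
        CompletableFuture.runAsync(() -> endpoint.receive(message));
    }

    // Request/reply: the message goes to receiveAndReply(); the future fails with a
    // TimeoutException if no reply arrives in time, or with the handler's exception.
    <T> CompletableFuture<T> ask(Object message, long timeoutMillis, Class<T> type) {
        CompletableFuture<Object> reply = new CompletableFuture<>();
        CompletableFuture
                .runAsync(() -> endpoint.receiveAndReply(message, reply::complete))
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        reply.completeExceptionally(error);
                    }
                });
        return reply.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).thenApply(type::cast);
    }
}

final class SendAskDemo {
    public static void main(String[] args) {
        SketchEndpointRef ref = new SketchEndpointRef(new SketchEndpoint() {
            @Override
            public void receive(Object message) {
                System.out.println("one-way: " + message);
            }

            @Override
            public void receiveAndReply(Object message, Consumer<Object> reply) {
                reply.accept("pong:" + message);
            }
        });
        ref.send("hello");
        System.out.println(ref.ask("ping", 1000, String.class).join()); // pong:ping
    }
}
```

An endpoint that never calls `reply` makes the ask() future fail with a TimeoutException. This mirrors how an unanswered ask() in Spark ends when its RpcTimeout expires.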
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {

  // Sends a one-way asynchronous message: fire and forget.
  // Handled by RpcEndpoint.receive() on the receiving node
  def send(message: Any): Unit

  // Sends a message and returns a Future that receives the reply within the timeout.
  // Handled by RpcEndpoint.receiveAndReply() on the receiving node
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
  // ...
}

private[netty] class NettyRpcEndpointRef(
    @transient private val conf: SparkConf,
    private val endpointAddress: RpcEndpointAddress,
    @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) {

  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
    nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout)
  }

  override def send(message: Any): Unit = {
    require(message != null, "Message is null")
    nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
  }
  // ...
}

The receiving node handles the message
private[spark] trait RpcEndpoint {
  // ...

  // Handles messages sent with send(); no reply is needed
  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

  // Handles messages sent with ask(); a reply is required
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }

  // The concrete handling logic lives in RpcEndpoint subclasses, and each subclass implements it differently
  // ...
}

Message flow from RpcEndpointRef to RpcEndpoint
As shown in the figure below, a message-sending node (sender node) sends a message to a message-receiving node (recipient node) as follows:
1. The sender calls ask() on the receiver's reference, a NettyRpcEndpointRef object (rpcEndRef), to send the message;
2. NettyRpcEndpointRef wraps the message in a RequestMessage and passes it to NettyRpcEnv's ask() (or send());
3. NettyRpcEnv compares the receiver's address with its own address to decide whether the receiver is local or remote;
4. For a message to the local node: every node registers with the message dispatcher (Dispatcher) when it is instantiated, so the Dispatcher caches information on all nodes that can receive messages (the list of receiving nodes). In dispatcher.postLocalMessage(), the receiving node is first looked up by name in that list, and the message is then put into the receiving node's inbox (by calling Inbox.post());
5. For a message to a remote node: if the node reference already holds a communication client, the message is sent directly through that client. Otherwise the Outbox for the receiver's address is looked up (or created) and the message is sent through it;
At this point the message has been transferred from the sending node to the receiving node.
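The routing decision in the steps above can be modeled as follows. This hypothetical Java sketch (RoutingSketch is not a Spark class) uses a list as the local inbox and keeps one outbox per remote address, created with putIfAbsent as in postToOutbox. It leaves out the branch where the reference already holds a client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the local/remote decision in NettyRpcEnv.send and postToOutbox.
final class RoutingSketch {
    // Stand-in for Outbox: just remembers what it was asked to send.
    static final class Outbox {
        final String address;
        final List<String> pending = new ArrayList<>();

        Outbox(String address) {
            this.address = address;
        }

        synchronized void send(String message) {
            pending.add(message);
        }
    }

    private final String localAddress;
    final List<String> localInbox = new ArrayList<>(); // stands in for the Dispatcher path
    final Map<String, Outbox> outboxes = new ConcurrentHashMap<>();

    RoutingSketch(String localAddress) {
        this.localAddress = localAddress;
    }

    void send(String receiverAddress, String message) {
        if (receiverAddress.equals(localAddress)) {
            // Local receiver: hand the message to the dispatcher (here: a list).
            synchronized (localInbox) {
                localInbox.add(message);
            }
            return;
        }
        // Remote receiver: reuse or create the single Outbox for that address.
        Outbox outbox = outboxes.get(receiverAddress);
        if (outbox == null) {
            Outbox fresh = new Outbox(receiverAddress);
            Outbox existing = outboxes.putIfAbsent(receiverAddress, fresh);
            // If another thread won the race, use its Outbox and discard ours.
            outbox = (existing == null) ? fresh : existing;
        }
        outbox.send(message);
    }

    public static void main(String[] args) {
        RoutingSketch env = new RoutingSketch("host-a:7077");
        env.send("host-a:7077", "local message");
        env.send("host-b:7077", "remote 1");
        env.send("host-b:7077", "remote 2");
        System.out.println(env.localInbox);                          // [local message]
        System.out.println(env.outboxes.get("host-b:7077").pending); // [remote 1, remote 2]
    }
}
```

The get-then-putIfAbsent shape avoids creating a new Outbox on every call. It also guarantees that concurrent senders end up sharing one Outbox per address.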
private[netty] class NettyRpcEndpointRef(
    @transient private val conf: SparkConf,
    private val endpointAddress: RpcEndpointAddress,
    @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) {

  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
    nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout)
  }

  override def send(message: Any): Unit = {
    require(message != null, "Message is null")
    nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
  }
  // ...
}

private[netty] class NettyRpcEnv(
    val conf: SparkConf,
    javaSerializerInstance: JavaSerializerInstance,
    host: String,
    securityManager: SecurityManager,
    numUsableCores: Int) extends RpcEnv(conf) with Logging {

  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
  // ...

  private[netty] def send(message: RequestMessage): Unit = {
    val remoteAddr = message.receiver.address
    if (remoteAddr == address) {
      // Message to a local RPC endpoint: hand it to the dispatcher as a one-way message
      try {
        dispatcher.postOneWayMessage(message)
      } catch {
        case e: RpcEnvStoppedException => logDebug(e.getMessage)
      }
    } else {
      // Message to a remote RPC endpoint: send it over RPC through the outbox
      postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this)))
    }
  }

  private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
    val promise = Promise[Any]()
    val remoteAddr = message.receiver.address
    // ... (onFailure / onSuccess, which complete `promise`, are omitted)
    try {
      if (remoteAddr == address) {
        // Message to the local node: call the dispatcher's postLocalMessage method
        val p = Promise[Any]()
        // ... (when p completes, its result is passed on to `promise`)
        dispatcher.postLocalMessage(message, p)
      } else {
        // Message to a remote node: send it over RPC through the outbox
        val rpcMessage = RpcOutboxMessage(message.serialize(this),
          onFailure,
          (client, response) => onSuccess(deserialize[Any](client, response)))
        postToOutbox(message.receiver, rpcMessage)
        promise.future.failed.foreach {
          case _: TimeoutException => rpcMessage.onTimeout()
          case _ =>
        }(ThreadUtils.sameThread)
      }
      // ... (timeout scheduling omitted)
    } catch {
      case NonFatal(e) =>
        onFailure(e)
    }
    promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
  }

  // Sends a message to a remote node
  private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
    if (receiver.client != null) {
      // A communication client already exists: send the message directly through it
      message.sendWith(receiver.client)
    } else {
      // No communication client: go through the Outbox for the receiver's address
      require(receiver.address != null,
        "Cannot send message to client endpoint with no listen address.")
      // Look up the Outbox for this address, creating it if absent
      val targetOutbox = {
        val outbox = outboxes.get(receiver.address)
        if (outbox == null) {
          val newOutbox = new Outbox(this, receiver.address)
          val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
          if (oldOutbox == null) {
            newOutbox
          } else {
            oldOutbox
          }
        } else {
          outbox
        }
      }
      if (stopped.get) {
        // It's possible that we put `targetOutbox` after stopping. So we need to clean it.
        outboxes.remove(receiver.address)
        targetOutbox.stop()
      } else {
        // Send the message through the outbox
        targetOutbox.send(message)
      }
    }
  }
}
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {

  // Node information held by the dispatcher: bundles the node name, node, node reference and inbox
  private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    val inbox = new Inbox(ref, endpoint)
  }

  // Node name -> EndpointData, so a node's information can be found by its name
  private val endpoints: ConcurrentMap[String, EndpointData] =
    new ConcurrentHashMap[String, EndpointData]

  // Queue of receiving nodes (recipients) registered with the dispatcher that have messages to process
  private val receivers = new LinkedBlockingQueue[EndpointData]

  private var stopped = false
  // ...

  // Delivers a message to a local node
  def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {
    val rpcCallContext =
      new LocalNettyRpcCallContext(message.senderAddress, p)
    val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
    // Take the receiving node's name from the message and post the message
    postMessage(message.receiver.name, rpcMessage, (e) => p.tryFailure(e))
  }

  def postOneWayMessage(message: RequestMessage): Unit = {
    // Take the receiving node's name from the message, wrap the content and post it
    postMessage(message.receiver.name, OneWayMessage(message.senderAddress, message.content),
      (e) => throw e)
  }

  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      // Look up the receiving node in the dispatcher's cache by name
      val data = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        // Put the message into the receiving node's inbox and queue the node for processing
        data.inbox.post(message)
        receivers.offer(data)
        None
      }
    }
    // We don't need to call `onStop` in the `synchronized` block
    error.foreach(callbackIfStopped)
  }
  // ...
}
NettyRpcEnv has a dispatcher field that is initialized with new Dispatcher, so the Dispatcher is instantiated as part of instantiating the RpcEnv.
Dispatcher has two inner classes, EndpointData and MessageLoop. EndpointData bundles a communication node's information: the node, its reference, its name and its inbox. MessageLoop describes the logic a pool thread runs to hand queued messages to their nodes for processing.
Inner classes
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {

  // Communication node information: bundles the node name, node object, node reference and inbox
  private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    val inbox = new Inbox(ref, endpoint)
  }

  // Message loop run by each pool thread: takes receiving nodes from the blocking queue in FIFO order
  // (a node is queued at registration and whenever a message is posted to it) and has the node
  // process its messages
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            // Take the next receiving node from the blocking queue (FIFO); blocks while it is empty
            val data = receivers.take()
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            // Let the receiving node process the messages in its inbox
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case _: InterruptedException => // exit
        case t: Throwable =>
          try {
            // Re-submit a MessageLoop so that Dispatcher will still work if
            // UncaughtExceptionHandler decides to not kill JVM.
            threadpool.execute(new MessageLoop)
          } finally {
            throw t
          }
      }
    }
  }
  // ...
}
The message dispatcher caches registered communication nodes in ConcurrentHashMaps and a LinkedBlockingQueue. The LinkedBlockingQueue holds the EndpointData of nodes that have messages waiting: a node is queued once when it registers (for its OnStart message) and again each time a message is posted to it. These caches are filled when a communication node registers with the dispatcher.
The message-dispatch thread pool is initialized when the Dispatcher is instantiated, and each of its threads runs a message loop that dispatches messages for processing. The thread count is the value of spark.rpc.netty.dispatcher.numThreads if that parameter is set. Otherwise it is the larger of 2 and the number of available CPU cores.
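Below is a hypothetical Java sketch of the dispatcher pool sizing, with a Map standing in for SparkConf. Note the contrast with the Netty I/O threads: there the core count is capped with a min against 8, while here it is raised to at least 2 with a max. The daemon thread factory only imitates the "dispatcher-event-loop" naming of ThreadUtils.newDaemonFixedThreadPool. Its exact naming scheme is an assumption, not Spark's implementation.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical restatement of how Dispatcher sizes its "dispatcher-event-loop" pool.
final class DispatcherThreads {
    static int numThreads(Map<String, String> conf, int numUsableCores) {
        int availableCores = numUsableCores > 0
                ? numUsableCores
                : Runtime.getRuntime().availableProcessors();
        String configured = conf.get("spark.rpc.netty.dispatcher.numThreads");
        // An explicit setting wins; otherwise use at least 2 threads.
        return configured != null ? Integer.parseInt(configured) : Math.max(2, availableCores);
    }

    // Fixed pool of daemon threads named prefix-0, prefix-1, ... (naming scheme is an assumption).
    static ExecutorService newDaemonFixedThreadPool(int n, String prefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
        return Executors.newFixedThreadPool(n, factory);
    }

    public static void main(String[] args) {
        System.out.println(numThreads(Map.of(), 1)); // 2
        System.out.println(numThreads(Map.of(), 6)); // 6
        System.out.println(numThreads(Map.of("spark.rpc.netty.dispatcher.numThreads", "3"), 16)); // 3
        ExecutorService pool = newDaemonFixedThreadPool(2, "dispatcher-event-loop");
        String name = CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool).join();
        System.out.println(name); // dispatcher-event-loop-0
        pool.shutdown();
    }
}
```

Daemon threads do not keep the JVM alive, which matches their role as background message loops.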
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {

  // Node name -> EndpointData; filled when a communication node registers with the dispatcher
  private val endpoints: ConcurrentMap[String, EndpointData] =
    new ConcurrentHashMap[String, EndpointData]

  // Node object -> node reference; filled when a communication node registers with the dispatcher
  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
    new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

  // Blocking queue of receiving nodes; a node is queued at registration and on every posted message
  private val receivers = new LinkedBlockingQueue[EndpointData]

  // True if the dispatcher has been stopped. Once stopped, every posted message is rejected immediately.
  private var stopped = false

  // Thread pool that dispatches messages for processing
  private val threadpool: ThreadPoolExecutor = {
    // Available CPU cores: numUsableCores if > 0, otherwise the number of processors available to the JVM
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    // Thread count: spark.rpc.netty.dispatcher.numThreads if set, otherwise max(2, available cores)
    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
      math.max(2, availableCores))
    // Create a fixed-size daemon thread pool with that many threads
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
    for (i <- 0 until numThreads) {
      // Each thread runs a message loop that dispatches messages for processing
      pool.execute(new MessageLoop)
    }
    pool
  }

  // A poison endpoint that tells a MessageLoop to exit its message loop
  private val PoisonPill = new EndpointData(null, null, null)
  // ...
}

Registering a communication node with the dispatcher
Registering a node mainly does the following:
caches the mapping from node name to EndpointData in the dispatcher, in a ConcurrentHashMap;
caches the mapping from node object to node reference in the dispatcher, in a ConcurrentHashMap;
queues the receiving node in the dispatcher's LinkedBlockingQueue;
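The three registration steps can be sketched in Java as follows. RegistrySketch is hypothetical. The endpoint is an arbitrary Object and its reference is reduced to a spark://name@host:port string, whereas Spark creates a NettyRpcEndpointRef.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of Dispatcher.registerRpcEndpoint.
final class RegistrySketch {
    static final class EndpointData {
        final String name;
        final Object endpoint;
        final String ref;

        EndpointData(String name, Object endpoint, String ref) {
            this.name = name;
            this.endpoint = endpoint;
            this.ref = ref;
        }
    }

    final Map<String, EndpointData> endpoints = new ConcurrentHashMap<>(); // name -> data
    final Map<Object, String> endpointRefs = new ConcurrentHashMap<>();    // endpoint -> reference
    final LinkedBlockingQueue<EndpointData> receivers = new LinkedBlockingQueue<>();
    private boolean stopped = false;

    synchronized String register(String name, Object endpoint, String localAddress) {
        if (stopped) {
            throw new IllegalStateException("RpcEnv has been stopped");
        }
        String ref = "spark://" + name + "@" + localAddress; // simplified reference
        // 1. name -> EndpointData; a duplicate name is rejected.
        if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, ref)) != null) {
            throw new IllegalArgumentException("There is already an RpcEndpoint called " + name);
        }
        EndpointData data = endpoints.get(name);
        // 2. endpoint -> reference.
        endpointRefs.put(data.endpoint, data.ref);
        // 3. Queue the node so a message loop will process its OnStart message.
        receivers.offer(data);
        return ref;
    }

    synchronized void stop() {
        stopped = true;
    }

    public static void main(String[] args) {
        RegistrySketch dispatcher = new RegistrySketch();
        System.out.println(dispatcher.register("Master", new Object(), "10.0.0.1:7077"));
        try {
            dispatcher.register("Master", new Object(), "10.0.0.1:7077");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(dispatcher.receivers.size()); // 1
    }
}
```

putIfAbsent makes the duplicate-name check and the insertion a single atomic step. The surrounding synchronized block additionally orders registration against stop().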
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {

  private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    val inbox = new Inbox(ref, endpoint)
  }

  private val endpoints: ConcurrentMap[String, EndpointData] =
    new ConcurrentHashMap[String, EndpointData]
  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
    new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
  private val receivers = new LinkedBlockingQueue[EndpointData]
  private var stopped = false

  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      // Wrap the node information and record the node name -> EndpointData mapping
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      // Record the node object -> node reference mapping
      val data = endpoints.get(name)
      endpointRefs.put(data.endpoint, data.ref)
      // Queue the node in the blocking queue
      receivers.offer(data) // for the OnStart message
    }
    // Return the node reference
    endpointRef
  }
  // ...
}
A message is sent by putting it into the receiving node's inbox.
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {

  // Posts a message to every registered node
  def postToAll(message: InboxMessage): Unit = {
    val iter = endpoints.keySet().iterator()
    while (iter.hasNext) {
      val name = iter.next
      postMessage(name, message, (e) => {
        e match {
          case e: RpcEnvStoppedException => logDebug(s"Message $message dropped. ${e.getMessage}")
          case e: Throwable => logWarning(s"Message $message dropped. ${e.getMessage}")
        }
      })
    }
  }

  // Posts a message that arrived from a remote node
  def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
    // Wrap the content together with a context for replying to the remote sender
    val rpcCallContext =
      new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
    val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
    postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
  }

  // Posts a message that came from the local node
  def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {
    // Wrap the content together with a context that completes the local promise
    val rpcCallContext =
      new LocalNettyRpcCallContext(message.senderAddress, p)
    val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
    postMessage(message.receiver.name, rpcMessage, (e) => p.tryFailure(e))
  }

  // Posts a one-way message that expects no reply
  def postOneWayMessage(message: RequestMessage): Unit = {
    postMessage(message.receiver.name, OneWayMessage(message.senderAddress, message.content),
      (e) => throw e)
  }

  /**
   * Posts a message to the named node.
   *
   * @param endpointName name of the receiving node
   * @param message the message to post
   * @param callbackIfStopped called with the error if the message cannot be delivered
   *                          (the RpcEnv has stopped or the node does not exist)
   */
  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      // Look up the node's information in the cache by node name
      val data = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        // Put the message into the node's inbox
        data.inbox.post(message)
        // Append the node's EndpointData to the tail of the receivers queue
        receivers.offer(data)
        None
      }
    }
    // Invoke the callback only if the message could not be posted
    error.foreach(callbackIfStopped)
  }
  // ...
}
Threads from the pool run the message loop. Each loop thread takes a receiving node from the blocking queue and has that node process the messages in its inbox. When the blocking queue is empty, the thread blocks until a new node is put into the queue, then unblocks and continues with that node.
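The interplay between postMessage, the blocking queue and the message-loop threads can be reproduced with java.util.concurrent. In this hypothetical sketch an "inbox" is a concurrent queue whose messages are simply recorded. The PoisonPill trick (take it, put it back, exit) follows the MessageLoop excerpt.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of Dispatcher.postMessage + MessageLoop.
final class MessageLoopSketch {
    static final class EndpointData {
        final String name;
        final ConcurrentLinkedQueue<String> inbox = new ConcurrentLinkedQueue<>();
        final List<String> handled = Collections.synchronizedList(new ArrayList<>());
        EndpointData(String name) { this.name = name; }
        // Stand-in for Inbox.process: handle whatever is queued right now.
        void process() {
            String message;
            while ((message = inbox.poll()) != null) {
                handled.add(message);
            }
        }
    }

    // Marker telling a loop to exit, like Dispatcher's PoisonPill.
    private static final EndpointData POISON_PILL = new EndpointData(null);
    final Map<String, EndpointData> endpoints = new ConcurrentHashMap<>();
    private final LinkedBlockingQueue<EndpointData> receivers = new LinkedBlockingQueue<>();
    private final ExecutorService pool;

    MessageLoopSketch(int numThreads) {
        pool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            pool.execute(this::messageLoop);
        }
    }

    void register(String name) { endpoints.put(name, new EndpointData(name)); }

    // Like postMessage: put the message in the inbox, then queue the node for a loop thread.
    void postMessage(String endpointName, String message) {
        EndpointData data = endpoints.get(endpointName);
        if (data == null) {
            throw new IllegalArgumentException("Could not find " + endpointName + ".");
        }
        data.inbox.add(message);
        receivers.offer(data);
    }

    private void messageLoop() {
        try {
            while (true) {
                EndpointData data = receivers.take(); // blocks while the queue is empty
                if (data == POISON_PILL) {
                    receivers.offer(POISON_PILL);     // put it back for the other loops
                    return;
                }
                data.process();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();       // exit
        }
    }

    void stop() {
        receivers.offer(POISON_PILL);
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        MessageLoopSketch dispatcher = new MessageLoopSketch(3);
        dispatcher.register("a");
        dispatcher.register("b");
        for (int i = 0; i < 10; i++) {
            dispatcher.postMessage(i % 2 == 0 ? "a" : "b", "m" + i);
        }
        dispatcher.stop();
        System.out.println(dispatcher.endpoints.get("a").handled.size()); // 5
        System.out.println(dispatcher.endpoints.get("b").handled.size()); // 5
    }
}
```

The pill is offered after all earlier entries, and the queue is FIFO. Every loop therefore finishes its pending work before it sees the pill, and the pill is then passed on until all loops have exited.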
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {

  // Whether a node with this name is registered
  def verify(name: String): Boolean = {
    endpoints.containsKey(name)
  }

  // The thread pool is initialized when the Dispatcher is instantiated; its threads run the message loop
  private val threadpool: ThreadPoolExecutor = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
      math.max(2, availableCores))
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
    for (i <- 0 until numThreads) {
      pool.execute(new MessageLoop)
    }
    pool
  }

  // Message loop run by each pool thread: takes receiving nodes from the blocking queue in FIFO order
  // (nodes are queued at registration and whenever a message is posted to them) and has the node
  // process its messages
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            val data = receivers.take()
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case _: InterruptedException => // exit
        case t: Throwable =>
          try {
            // Re-submit a MessageLoop so that Dispatcher will still work if
            // UncaughtExceptionHandler decides to not kill JVM.
            threadpool.execute(new MessageLoop)
          } finally {
            throw t
          }
      }
    }
  }

  /** A poison endpoint that indicates MessageLoop should exit its message loop. */
  private val PoisonPill = new EndpointData(null, null, null)
  // ...
}
1. An RpcEnv (RPC communication environment) is bound to one message dispatcher;
2. A communication node must register with the message dispatcher when it is instantiated, and the dispatcher caches the information of all available communication nodes;
3. The message dispatcher provides node registration and unregistration, posting messages into a receiving node's inbox, and dispatching messages to receiving nodes for processing;
4. The message dispatcher is a communication intermediary: the sender hands a message to the dispatcher, and the dispatcher puts it into the receiving node's inbox;
5. The message dispatcher maintains a thread pool whose threads all run the message loop. Each thread takes receiving nodes from the blocking queue in FIFO order (nodes are queued at registration and whenever a message is posted to them) and calls the node's message-processing logic;
Inbox
As shown in the figure below, an Inbox holds the node object and the node reference object, which identify the node the inbox belongs to. The inbox caches the node's messages in its messages field, which is a list of InboxMessage.
An Inbox provides two capabilities: caching messages, and calling the node to process them. Both take effect only when invoked through the dispatcher.
Instantiating Inbox
When an Inbox is instantiated, its message queue is initialized and an OnStart message is added to it.
Every node registered with the message dispatcher is bound to an inbox whose first message is OnStart. After registration completes, a message-loop thread from the dispatcher's pool dispatches that message to the registered node, where it is finally handled by the node's onStart method.
private[netty] class Inbox(
    val endpointRef: NettyRpcEndpointRef,
    val endpoint: RpcEndpoint)
  extends Logging {

  inbox =>  // Give this an alias so we can use it more clearly in closures.

  // Initialize the message queue
  protected val messages = new java.util.LinkedList[InboxMessage]()

  /** True if the inbox (and its associated endpoint) is stopped. */
  private var stopped = false

  /** Allow multiple threads to process messages at the same time. */
  private var enableConcurrent = false

  /** The number of threads processing messages for this inbox. */
  private var numActiveThreads = 0

  // Add OnStart as the first message in the inbox's queue.
  // Once the node has been instantiated and registered with the dispatcher (registration binds the
  // inbox to the node and queues OnStart), a message-loop thread from the dispatcher's pool
  // processes OnStart automatically
  inbox.synchronized {
    messages.add(OnStart)
  }
  // ...
}
Message caching is provided by a LinkedList;
When the inbox receives a message sent through the dispatcher, it appends the message to the message list;
private[netty] class Inbox(
    val endpointRef: NettyRpcEndpointRef,
    val endpoint: RpcEndpoint)
  extends Logging {

  inbox =>  // Give this an alias so we can use it more clearly in closures.

  // Provides the message cache
  protected val messages = new java.util.LinkedList[InboxMessage]()

  def post(message: InboxMessage): Unit = inbox.synchronized {
    if (stopped) {
      // We already put "OnStop" into "messages", so we should drop further messages
      onDrop(message)
    } else {
      // Append the new message to the message list
      messages.add(message)
    }
  }

  protected def onDrop(message: InboxMessage): Unit = {
    logWarning(s"Drop $message because $endpointRef is stopped")
  }
  // ...
}
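Inbox messages are processed in FIFO order, one at a time from front to back;
Each message is processed by a single thread;
Each message is handled by the corresponding method of the node that owns the inbox;
Message types map to handler methods as follows:
RpcMessage: receiveAndReply; OneWayMessage: receive; OnStart: onStart; OnStop: onStop; RemoteProcessConnected: onConnected; RemoteProcessDisconnected: onDisconnected; RemoteProcessConnectionError: onNetworkError;
After an OnStart message is processed, the switch for multi-threaded processing is turned on (unless the node is a ThreadSafeRpcEndpoint), so several threads may then process the inbox's messages at the same time;
After an OnStop message is processed, the node that handled it is removed from the dispatcher's node-reference mapping (dispatcher.removeRpcEndpointRef);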
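A single-threaded Java model of the Inbox rules above follows. It assumes the endpoint is not a ThreadSafeRpcEndpoint, so OnStart enables concurrency. InboxSketch is hypothetical: messages are plain strings and "processing" only records them in order. Multi-threaded processing and the removeRpcEndpointRef call are not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical single-threaded model of Inbox (endpoint assumed not to be a ThreadSafeRpcEndpoint).
final class InboxSketch {
    static final String ON_START = "OnStart";
    static final String ON_STOP = "OnStop";

    private final LinkedList<String> messages = new LinkedList<>();
    private boolean stopped = false;
    private boolean enableConcurrent = false;
    final List<String> processed = new ArrayList<>(); // what the endpoint saw, in order
    final List<String> dropped = new ArrayList<>();   // what onDrop saw

    InboxSketch() {
        messages.add(ON_START); // OnStart is always the first message
    }

    synchronized void post(String message) {
        if (stopped) {
            dropped.add(message); // OnStop is already queued, so later messages are dropped
        } else {
            messages.add(message);
        }
    }

    synchronized void stop() {
        if (!stopped) {
            enableConcurrent = false;
            stopped = true;
            messages.add(ON_STOP);
        }
    }

    synchronized boolean concurrentEnabled() {
        return enableConcurrent;
    }

    // Drains the queue in FIFO order, one message at a time.
    void process() {
        while (true) {
            String message;
            synchronized (this) {
                message = messages.poll();
            }
            if (message == null) {
                return;
            }
            processed.add(message); // "call" the endpoint's handler
            if (message.equals(ON_START)) {
                synchronized (this) {
                    if (!stopped) {
                        enableConcurrent = true; // from now on several threads could process
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        InboxSketch inbox = new InboxSketch();
        inbox.post("a");
        inbox.process();
        System.out.println(inbox.processed + " " + inbox.concurrentEnabled()); // [OnStart, a] true
        inbox.stop();
        inbox.post("late");
        inbox.process();
        System.out.println(inbox.processed + " " + inbox.dropped); // [OnStart, a, OnStop] [late]
    }
}
```

Because stop() queues OnStop and flips `stopped` in the same synchronized block, OnStop is guaranteed to be the last message the endpoint sees.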
private[netty] class Inbox(
    val endpointRef: NettyRpcEndpointRef,
    val endpoint: RpcEndpoint)
  extends Logging {

  inbox =>  // Give this an alias so we can use it more clearly in closures.

  // Provides the message cache
  protected val messages = new java.util.LinkedList[InboxMessage]()

  def process(dispatcher: Dispatcher): Unit = {
    var message: InboxMessage = null
    inbox.synchronized {
      if (!enableConcurrent && numActiveThreads != 0) {
        return
      }
      // Take the message at the head of the queue
      message = messages.poll()
      if (message != null) {
        // This thread is now processing the inbox, so the active-thread count goes up by 1
        numActiveThreads += 1
      } else {
        return
      }
    }
    while (true) {
      // Every message is handled by the corresponding method of the node that owns the inbox
      safelyCall(endpoint) {
        message match {
          case RpcMessage(_sender, content, context) =>
            try {
              endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
                throw new SparkException(s"Unsupported message $message from ${_sender}")
              })
            } catch {
              case e: Throwable =>
                context.sendFailure(e)
                // Throw the exception -- this exception will be caught by the safelyCall function.
                // The endpoint's onError function will be called.
                throw e
            }

          case OneWayMessage(_sender, content) =>
            endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
              throw new SparkException(s"Unsupported message $message from ${_sender}")
            })

          case OnStart =>
            endpoint.onStart()
            // Allow multiple threads to process the inbox's messages at the same time
            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
              inbox.synchronized {
                if (!stopped) {
                  enableConcurrent = true
                }
              }
            }

          case OnStop =>
            val activeThreads = inbox.synchronized { inbox.numActiveThreads }
            assert(activeThreads == 1,
              s"There should be only a single active thread but found $activeThreads threads.")
            // Remove the current node from the dispatcher's node-reference mapping
            dispatcher.removeRpcEndpointRef(endpoint)
            endpoint.onStop()
            assert(isEmpty, "OnStop should be the last message")

          case RemoteProcessConnected(remoteAddress) =>
            endpoint.onConnected(remoteAddress)

          case RemoteProcessDisconnected(remoteAddress) =>
            endpoint.onDisconnected(remoteAddress)

          case RemoteProcessConnectionError(cause, remoteAddress) =>
            endpoint.onNetworkError(cause, remoteAddress)
        }
      }

      inbox.synchronized {
        // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
        // every time.
        if (!enableConcurrent && numActiveThreads != 1) {
          // If we are not the only one worker, exit
          numActiveThreads -= 1
          return
        }
        message = messages.poll()
        if (message == null) {
          numActiveThreads -= 1
          return
        }
      }
    }
  }
  // ...
}
The outbox caches OutboxMessage messages in a LinkedList and holds a communication client (a TransportClient object). It provides the ability to send messages.
The outbox is used only for messages sent to remote nodes (nodes other than the local one). Through its communication client the outbox keeps a connection to the remote address, and it finally calls OutboxMessage.sendWith() to process each message.
Instantiating Outbox
When the node reference does not hold a communication client, an outbox for the node's address is created if none exists yet and cached in the RpcEnv's outboxes map.
private[netty] class NettyRpcEnv(
    val conf: SparkConf,
    javaSerializerInstance: JavaSerializerInstance,
    host: String,
    securityManager: SecurityManager,
    numUsableCores: Int) extends RpcEnv(conf) with Logging {

  private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
    if (receiver.client != null) {
      message.sendWith(receiver.client)
    } else {
      // The node reference holds no communication client, so an outbox is needed;
      // outboxes are cached in the RpcEnv, keyed by remote address
      require(receiver.address != null,
        "Cannot send message to client endpoint with no listen address.")
      val targetOutbox = {
        val outbox = outboxes.get(receiver.address)
        if (outbox == null) {
          val newOutbox = new Outbox(this, receiver.address)
          val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
          if (oldOutbox == null) {
            newOutbox
          } else {
            oldOutbox
          }
        } else {
          outbox
        }
      }
      if (stopped.get) {
        // It's possible that we put `targetOutbox` after stopping. So we need to clean it.
        outboxes.remove(receiver.address)
        targetOutbox.stop()
      } else {
        targetOutbox.send(message)
      }
    }
  }
  // ...
}
First, the message is appended to the outbox's message queue;
Next, the outbox makes sure its communication client is not null, creating a client for the remote address if necessary;
Then, the messages in the queue are processed one by one in FIFO order (each through OutboxMessage.sendWith()) until the queue is empty, which ends message processing;
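The outbox algorithm can be sketched in Java as below. OutboxSketch is hypothetical. In particular, it connects synchronously through a Supplier, whereas Spark's Outbox submits the connection to nettyEnv.clientConnectionExecutor and drains again once the client is ready. The network-failure path is also omitted.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of Outbox.send/drainOutbox; the connection is made synchronously here.
final class OutboxSketch {
    interface Client {
        void send(String message);
    }

    private final LinkedList<String> messages = new LinkedList<>();
    private final Supplier<Client> connect; // stands in for nettyEnv.createClient(address)
    private Client client = null;
    private boolean draining = false;

    OutboxSketch(Supplier<Client> connect) {
        this.connect = connect;
    }

    // 1. Queue the message, then try to drain.
    void send(String message) {
        synchronized (this) {
            messages.add(message);
        }
        drainOutbox();
    }

    private void drainOutbox() {
        String message;
        synchronized (this) {
            // 2. Make sure there is a client (Spark connects asynchronously instead).
            if (client == null) {
                client = connect.get();
            }
            if (draining) {
                return; // another thread is already draining the queue
            }
            message = messages.poll();
            if (message == null) {
                return;
            }
            draining = true;
        }
        // 3. Send messages in FIFO order until the queue is empty.
        while (true) {
            Client current;
            synchronized (this) {
                current = client;
            }
            current.send(message); // OutboxMessage.sendWith(client) in Spark
            synchronized (this) {
                message = messages.poll();
                if (message == null) {
                    draining = false;
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> wire = new ArrayList<>();
        OutboxSketch outbox = new OutboxSketch(() -> {
            System.out.println("connecting");
            return wire::add;
        });
        outbox.send("m1");
        outbox.send("m2");
        System.out.println(wire); // [m1, m2]
    }
}
```

The `draining` flag means only one thread writes to the client at a time. Other senders just enqueue and return, and their messages are picked up by the draining thread.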
private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {
outbox => // Give this an alias so we can use it more clearly in closures.
private val messages = new java.util.LinkedList[OutboxMessage]
private var client: TransportClient = null
private var connectFuture: java.util.concurrent.Future[Unit] = null
private var stopped = false
private var draining = false
// Send a message through the outbox
def send(message: OutboxMessage): Unit = {
val dropped = synchronized {
if (stopped) {
true
} else {
// Append the message to the outbox's message queue
messages.add(message)
false
}
}
if (dropped) {
message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
} else {
drainOutbox()
}
}
// Drain the message queue: send every buffered message
private def drainOutbox(): Unit = {
var message: OutboxMessage = null
synchronized {
if (stopped) {
return
}
if (connectFuture != null) {
// A connection to the remote address is being established, so just exit
return
}
if (client == null) {
// No client and no connect task yet: launch the connect task
launchConnectTask()
return
}
if (draining) {
// Another thread is already draining the queue, so just exit
return
}
// Take the message at the head of the queue
message = messages.poll()
if (message == null) {
return
}
// Mark that this thread is now draining the queue
draining = true
}
while (true) {
// Send messages one by one until the queue is empty
try {
val _client = synchronized { client }
if (_client != null) {
message.sendWith(_client)
} else {
assert(stopped == true)
}
} catch {
case NonFatal(e) =>
handleNetworkFailure(e)
return
}
synchronized {
if (stopped) {
return
}
message = messages.poll()
if (message == null) {
// The queue is empty: stop draining
draining = false
return
}
}
}
}
private def launchConnectTask(): Unit = {
// Create the client on NettyRpcEnv's clientConnectionExecutor thread pool
connectFuture = nettyEnv.clientConnectionExecutor.submit(new Callable[Unit] {
override def call(): Unit = {
try {
// Establish a connection to the remote address
val _client = nettyEnv.createClient(address)
outbox.synchronized {
// Bind the new client to this outbox
client = _client
if (stopped) {
closeClient()
}
}
} catch {
case ie: InterruptedException =>
// exit
return
case NonFatal(e) =>
outbox.synchronized { connectFuture = null }
handleNetworkFailure(e)
return
}
// The connection is established: clear connectFuture
outbox.synchronized { connectFuture = null }
// Client created successfully: drain the queued messages
drainOutbox()
}
})
}
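The asynchronous connection step of launchConnectTask() can be sketched as follows. Everything here is a hypothetical simplification: Client and LazyConnectOutbox are illustrative types, a Supplier<Client> stands in for nettyEnv.createClient(address), failed messages are merely collected instead of being passed to onFailure, and, unlike Spark, messages are sent while holding the lock to keep the code short.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of Outbox.launchConnectTask(): the client is created asynchronously
// on a separate executor, and queued messages are flushed once it is ready.
final class LazyConnectOutbox {
    interface Client {
        void send(String message);
    }

    private final LinkedList<String> messages = new LinkedList<>();
    private final List<String> failed = new ArrayList<>();
    private final ExecutorService connectionExecutor; // stand-in for clientConnectionExecutor
    private final Supplier<Client> connector;         // stand-in for nettyEnv.createClient(address)
    private Client client = null;                      // guarded by this
    private Future<?> connectFuture = null;            // guarded by this

    LazyConnectOutbox(ExecutorService connectionExecutor, Supplier<Client> connector) {
        this.connectionExecutor = connectionExecutor;
        this.connector = connector;
    }

    void send(String message) {
        synchronized (this) {
            messages.add(message);
        }
        drain();
    }

    // Simplification: messages are sent while holding the lock.
    private synchronized void drain() {
        if (connectFuture != null) {
            return; // a connection is being created; the connect task drains afterwards
        }
        if (client == null) {
            launchConnectTask();
            return;
        }
        String message;
        while ((message = messages.poll()) != null) {
            client.send(message);
        }
    }

    // Called while holding the lock, so the task cannot clear connectFuture before it is set.
    private void launchConnectTask() {
        connectFuture = connectionExecutor.submit(() -> {
            Client created;
            try {
                created = connector.get();
            } catch (RuntimeException e) {
                synchronized (LazyConnectOutbox.this) {
                    connectFuture = null;
                    failed.addAll(messages); // roughly what handleNetworkFailure does
                    messages.clear();
                }
                return;
            }
            synchronized (LazyConnectOutbox.this) {
                client = created;
                connectFuture = null;
            }
            drain(); // flush whatever was queued while connecting
        });
    }

    synchronized List<String> failedMessages() {
        return new ArrayList<>(failed);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        LazyConnectOutbox outbox =
            new LazyConnectOutbox(executor, () -> message -> System.out.println("sent " + message));
        outbox.send("hello");
        outbox.send("world");
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The ordering detail carried over from the Scala code is that connectFuture is assigned while the lock is held, and that the connect task calls drain() itself, because no other thread may be draining at the moment the connection completes.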
OutboxMessage is the top-level trait for outbox messages and defines the sending and failure-handling behavior. It has two subclasses: OneWayOutboxMessage, which expects no reply and is sent with the TransportClient's send method, and RpcOutboxMessage, which expects a reply and is sent with the TransportClient's sendRpc method;
// Top-level trait for outbox messages: defines sending and failure handling
private[netty] sealed trait OutboxMessage {
def sendWith(client: TransportClient): Unit
def onFailure(e: Throwable): Unit
}
// One-way outbox message: no reply is expected
private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends OutboxMessage
with Logging {
override def sendWith(client: TransportClient): Unit = {
client.send(content)
}
override def onFailure(e: Throwable): Unit = {
e match {
case e1: RpcEnvStoppedException => logDebug(e1.getMessage)
case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1)
}
}
}
// Outbox message sent as an RPC that expects a reply
private[netty] case class RpcOutboxMessage(
content: ByteBuffer,
_onFailure: (Throwable) => Unit,
_onSuccess: (TransportClient, ByteBuffer) => Unit)
extends OutboxMessage with RpcResponseCallback with Logging {
private var client: TransportClient = _
private var requestId: Long = _
override def sendWith(client: TransportClient): Unit = {
this.client = client
this.requestId = client.sendRpc(content, this)
}
def onTimeout(): Unit = {
if (client != null) {
client.removeRpcRequest(requestId)
} else {
logError("Ask timeout before connecting successfully")
}
}
override def onFailure(e: Throwable): Unit = {
_onFailure(e)
}
override def onSuccess(response: ByteBuffer): Unit = {
_onSuccess(client, response)
}
}
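For readers more at home in Java, the two message kinds can be mirrored as below. This is a sketch, not Spark's API: FakeClient is an assumed stand-in for TransportClient exposing only send() and sendRpc(), and the Scala function parameters _onSuccess/_onFailure become Consumer fields. What it shows is that RpcOutboxMessage is its own response callback, which is why sendWith passes `this` to sendRpc.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical Java sketch of the OutboxMessage hierarchy. FakeClient is an assumed
// stand-in for TransportClient that exposes only send() and sendRpc().
final class OutboxMessages {
    interface RpcCallback {
        void onSuccess(ByteBuffer response);
        void onFailure(Throwable e);
    }

    interface FakeClient {
        void send(ByteBuffer message);                          // one-way, no reply
        long sendRpc(ByteBuffer message, RpcCallback callback); // returns a request id
    }

    interface OutboxMessage {
        void sendWith(FakeClient client);
        void onFailure(Throwable e);
    }

    // Fire-and-forget message: failures can only be logged.
    static final class OneWayOutboxMessage implements OutboxMessage {
        final ByteBuffer content;

        OneWayOutboxMessage(ByteBuffer content) {
            this.content = content;
        }

        @Override public void sendWith(FakeClient client) {
            client.send(content);
        }

        @Override public void onFailure(Throwable e) {
            System.err.println("Failed to send one-way RPC: " + e);
        }
    }

    // Request/response message: it registers itself as the callback for the reply.
    static final class RpcOutboxMessage implements OutboxMessage, RpcCallback {
        final ByteBuffer content;
        private final Consumer<ByteBuffer> successHandler;
        private final Consumer<Throwable> failureHandler;
        long requestId = -1;

        RpcOutboxMessage(ByteBuffer content, Consumer<ByteBuffer> successHandler,
                         Consumer<Throwable> failureHandler) {
            this.content = content;
            this.successHandler = successHandler;
            this.failureHandler = failureHandler;
        }

        @Override public void sendWith(FakeClient client) {
            requestId = client.sendRpc(content, this);
        }

        @Override public void onSuccess(ByteBuffer response) {
            successHandler.accept(response);
        }

        @Override public void onFailure(Throwable e) {
            failureHandler.accept(e);
        }
    }

    public static void main(String[] args) {
        // An echo client: sendRpc replies immediately with the request payload.
        FakeClient echo = new FakeClient() {
            @Override public void send(ByteBuffer message) {
                System.out.println("one-way sent");
            }
            @Override public long sendRpc(ByteBuffer message, RpcCallback callback) {
                callback.onSuccess(message);
                return 1L;
            }
        };
        new OneWayOutboxMessage(ByteBuffer.wrap(new byte[] {1})).sendWith(echo);
        RpcOutboxMessage ask = new RpcOutboxMessage(
            ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)),
            reply -> System.out.println("reply: " + StandardCharsets.UTF_8.decode(reply)),
            error -> System.err.println("ask failed: " + error));
        ask.sendWith(echo);
    }
}
```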
2.2 Transport System
The client/server communication model is defined when NettyRpcEnv initializes its transportContext field:
private[netty] class NettyRpcEnv(
val conf: SparkConf,
javaSerializerInstance: JavaSerializerInstance,
host: String,
securityManager: SecurityManager,
numUsableCores: Int) extends RpcEnv(conf) with Logging {
private val transportContext = new TransportContext(transportConf,
new NettyRpcHandler(dispatcher, this, streamManager))
public class TransportContext {
private final TransportConf conf;
private final RpcHandler rpcHandler;
private final boolean closeIdleConnections;
private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;
public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
this(conf, rpcHandler, false);
// Constructor: binds the fields
public TransportContext(
TransportConf conf,
RpcHandler rpcHandler,
boolean closeIdleConnections) {
this.conf = conf;
this.rpcHandler = rpcHandler;
this.closeIdleConnections = closeIdleConnections;
TransportServer is instantiated while the driver's execution environment is being created, as part of SparkEnv initialization; the server itself is created by TransportContext#createServer(). The call chain is:
SparkEnv#createDriverEnv() —> SparkEnv#create() —> RpcEnv#create() —> NettyRpcEnvFactory#create()
—> NettyRpcEnv#startServer() —> TransportContext#createServer()
object SparkEnv extends Logging {
private def create(
conf: SparkConf,
executorId: String,
bindAddress: String,
advertiseAddress: String,
port: Option[Int],
isLocal: Boolean,
numUsableCores: Int,
ioEncryptionKey: Option[Array[Byte]],
listenerBus: LiveListenerBus = null,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, numUsableCores, !isDriver)
private[spark] object RpcEnv {
def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
numUsableCores: Int,
clientMode: Boolean): RpcEnv = {
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
numUsableCores, clientMode)
new NettyRpcEnvFactory().create(config)
private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
def create(config: RpcEnvConfig): RpcEnv = {
val sparkConf = config.conf
// Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
// KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
val javaSerializerInstance =
new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)
if (!config.clientMode) {
// Server mode: start the TransportServer, retrying on other ports if the bind fails
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
} catch {
case NonFatal(e) =>
nettyEnv.shutdown()
throw e
}
}
nettyEnv
}
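Utils.startServiceOnPort wraps startNettyRpcEnv in a retry loop. A simplified, hypothetical Java version of that loop is below; to our understanding the real helper also wraps port numbers into the 1024-65535 range and reads its retry limit from spark.port.maxRetries, neither of which is modeled here. ServiceStarter is an illustrative interface, not a Spark type.

```java
import java.net.BindException;

// Simplified, hypothetical sketch of the retry loop behind Utils.startServiceOnPort.
final class PortRetry {
    // Starts a service on the given port and returns the port actually bound.
    interface ServiceStarter {
        int start(int port) throws BindException;
    }

    static int startServiceOnPort(int startPort, ServiceStarter starter, int maxRetries)
            throws BindException {
        for (int offset = 0; ; offset++) {
            // Port 0 lets the OS choose an ephemeral port, so there is nothing to increment.
            int tryPort = (startPort == 0) ? 0 : startPort + offset;
            try {
                return starter.start(tryPort);
            } catch (BindException e) {
                if (offset >= maxRetries) {
                    throw e; // give up after maxRetries additional attempts
                }
                System.err.println("Service could not bind on port " + tryPort + ". Retrying.");
            }
        }
    }

    public static void main(String[] args) throws BindException {
        // Pretend ports 7077 and 7078 are already in use.
        int port = startServiceOnPort(7077, p -> {
            if (p < 7079) {
                throw new BindException("Address already in use");
            }
            return p;
        }, 16);
        System.out.println("started on " + port); // started on 7079
    }
}
```

Only bind failures trigger a retry; any other exception propagates immediately, which matches the NettyRpcEnvFactory code shutting the environment down and rethrowing.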
private[netty] class NettyRpcEnv(
val conf: SparkConf,
javaSerializerInstance: JavaSerializerInstance,
host: String,
securityManager: SecurityManager,
numUsableCores: Int) extends RpcEnv(conf) with Logging {
def startServer(bindAddress: String, port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
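} else {
java.util.Collections.emptyList()
}
server = transportContext.createServer(bindAddress, port, bootstraps)
// Register the RpcEndpointVerifier endpoint with the dispatcher; remote RpcEnvs use it
// to check whether an endpoint exists
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}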
While the TransportServer is being created, a Netty ServerBootstrap is created, a pipeline-initialization callback is set on it, and it is bound to the listening socket port;
When the pipeline is initialized, a TransportChannelHandler is created and added to the SocketChannel's pipeline;
public class TransportContext {
private final RpcHandler rpcHandler;
public TransportServer createServer(
String host, int port, List<TransportServerBootstrap> bootstraps) {
return new TransportServer(this, host, port, rpcHandler, bootstraps);
public TransportChannelHandler initializePipeline(
SocketChannel channel,
RpcHandler channelRpcHandler) {
try {
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
// Define the message/data processing flow of the pipeline
channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
.addLast("handler", channelHandler);
return channelHandler;
} catch (RuntimeException e) {
logger.error("Error while initializing Netty pipeline", e);
throw e;
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler, conf.maxChunksBeingTransferred());
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), closeIdleConnections);
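The frame decoder sits in front of the message decoder because TCP delivers a byte stream, not messages: one read may contain half a frame or several frames. A minimal sketch of length-prefixed framing is below, assuming (as we understand Spark's MessageEncoder and TransportFrameDecoder to do) an 8-byte big-endian length that counts the length field itself. LengthFraming and FrameDecoder are hypothetical names, and the real decoder works on Netty ByteBufs without this copying.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of length-prefixed framing. It assumes an 8-byte big-endian frame
// length that counts the length field itself.
final class LengthFraming {
    static final int LENGTH_SIZE = 8;

    static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(LENGTH_SIZE + payload.length);
        buf.putLong(LENGTH_SIZE + payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Accumulates bytes that may arrive split or merged across socket reads and
    // emits only complete frames, which is the job of the frame-decoder stage.
    static final class FrameDecoder {
        private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

        List<byte[]> feed(byte[] chunk) {
            pending.write(chunk, 0, chunk.length);
            byte[] data = pending.toByteArray();
            List<byte[]> frames = new ArrayList<>();
            int pos = 0;
            while (data.length - pos >= LENGTH_SIZE) {
                long frameLength = ByteBuffer.wrap(data, pos, LENGTH_SIZE).getLong();
                if (frameLength < LENGTH_SIZE || frameLength > Integer.MAX_VALUE) {
                    throw new IllegalStateException("Invalid frame length: " + frameLength);
                }
                if (data.length - pos < frameLength) {
                    break; // the rest of this frame has not arrived yet
                }
                frames.add(Arrays.copyOfRange(data, pos + LENGTH_SIZE, pos + (int) frameLength));
                pos += (int) frameLength;
            }
            pending.reset();
            pending.write(data, pos, data.length - pos); // keep the incomplete tail
            return frames;
        }
    }

    public static void main(String[] args) {
        byte[] a = encode("hello".getBytes(StandardCharsets.UTF_8));
        byte[] b = encode("rpc".getBytes(StandardCharsets.UTF_8));
        byte[] stream = new byte[a.length + b.length];
        System.arraycopy(a, 0, stream, 0, a.length);
        System.arraycopy(b, 0, stream, a.length, b.length);
        FrameDecoder decoder = new FrameDecoder();
        // Deliver the two frames split at an arbitrary byte boundary.
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 0, 10)).size()); // 0
        for (byte[] frame : decoder.feed(Arrays.copyOfRange(stream, 10, stream.length))) {
            System.out.println(new String(frame, StandardCharsets.UTF_8)); // hello, then rpc
        }
    }
}
```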
public class TransportServer implements Closeable {
public TransportServer(
TransportContext context,
String hostToBind,
int portToBind,
RpcHandler appRpcHandler,
List<TransportServerBootstrap> bootstraps) {
this.context = context;
this.conf = context.getConf();
this.appRpcHandler = appRpcHandler;
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
boolean shouldClose = true;
try {
init(hostToBind, portToBind);
shouldClose = false;
} finally {
if (shouldClose) {
// init() failed: release whatever was already allocated
JavaUtils.closeQuietly(this);
}
}
private void init(String hostToBind, int portToBind) {
IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
conf.getModuleName() + "-boss");
EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
conf.getModuleName() + "-server");
// Create a pooled ByteBuf allocator; on the server side thread-local caching is allowed (allowCache = true)
PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
// Create and configure the Netty server bootstrap (ServerBootstrap)
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, allocator)
.option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
.childOption(ChannelOption.ALLOCATOR, allocator);
this.metrics = new NettyMemoryMetrics(
allocator, conf.getModuleName() + "-server", conf);
if (conf.backLog() > 0) {
bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
}
if (conf.receiveBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
}
if (conf.sendBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
}
// Set the pipeline-initialization callback on the bootstrap
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
RpcHandler rpcHandler = appRpcHandler;
for (TransportServerBootstrap bootstrap : bootstraps) {
rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
}
context.initializePipeline(ch, rpcHandler);
}
});
// Bind the bootstrap to the listening socket address
InetSocketAddress address = hostToBind == null ?
new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
channelFuture = bootstrap.bind(address);
// Wait until the bind has completed before reading the actual port
channelFuture.syncUninterruptibly();
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port: {}", port);
}
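The bind-then-read-port sequence at the end of init() can be reproduced with plain java.nio, which makes it easy to try without Netty. This is a hypothetical sketch: BindAndReport is an illustrative class, ServerSocketChannel.bind blocks until the socket is bound (the role played by channelFuture.syncUninterruptibly()), and the close-on-failure flag imitates the shouldClose pattern of the TransportServer constructor.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

// Plain java.nio sketch (no Netty): bind a server socket, then read the port that was
// actually bound from the channel's local address.
final class BindAndReport {
    static ServerSocketChannel bind(String hostToBind, int portToBind) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        boolean bound = false;
        try {
            // Mirrors ChannelOption.SO_REUSEADDR, which the excerpt enables except on Windows.
            boolean isWindows = System.getProperty("os.name").startsWith("Windows");
            server.setOption(StandardSocketOptions.SO_REUSEADDR, !isWindows);
            InetSocketAddress address = hostToBind == null
                ? new InetSocketAddress(portToBind)
                : new InetSocketAddress(hostToBind, portToBind);
            server.bind(address); // returns only once the socket is bound
            bound = true;
            return server;
        } finally {
            if (!bound) {
                server.close(); // like shouldClose: do not leak a half-initialized server
            }
        }
    }

    static int boundPort(ServerSocketChannel server) throws IOException {
        return ((InetSocketAddress) server.getLocalAddress()).getPort();
    }

    public static void main(String[] args) throws IOException {
        // Port 0 asks the OS for any free port; the real port is only known after bind.
        try (ServerSocketChannel server = bind("127.0.0.1", 0)) {
            System.out.println("Server started on port: " + boundPort(server));
        }
    }
}
```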
Outbox#send() —> Outbox#drainOutbox() —> Outbox#launchConnectTask() —> NettyRpcEnv#createClient() —> TransportClientFactory#createClient();
When the outbox drains its message queue and no TransportClient is bound to it, it calls NettyRpcEnv#createClient(), which ultimately instantiates a TransportClient through the TransportClientFactory;
private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {
outbox => // Give this an alias so we can use it more clearly in closures.
private val messages = new java.util.LinkedList[OutboxMessage]
private var client: TransportClient = null
// Send a message through the outbox
def send(message: OutboxMessage): Unit = {
if (dropped) {
message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
} else {
// Drain the message queue: send every buffered message
private def drainOutbox(): Unit = {
var message: OutboxMessage = null
synchronized {
if (client == null) {
// No client yet: launch the connect task
private def launchConnectTask(): Unit = {
// Create the client on NettyRpcEnv's clientConnectionExecutor thread pool
connectFuture = nettyEnv.clientConnectionExecutor.submit(new Callable[Unit] {
override def call(): Unit = {
try {
// Establish a connection to the remote address
val _client = nettyEnv.createClient(address)
} catch {
private[netty] class NettyRpcEnv(
val conf: SparkConf,
javaSerializerInstance: JavaSerializerInstance,
host: String,
securityManager: SecurityManager,
numUsableCores: Int) extends RpcEnv(conf) with Logging {
private val clientFactory = transportContext.createClientFactory(createClientBootstraps())
private[netty] def createClient(address: RpcAddress): TransportClient = {
// Create the client through the TransportClientFactory
clientFactory.createClient(address.host, address.port)
TransportContext#initializePipeline() —> TransportContext#createChannelHandler() —> new TransportClient(channel, responseHandler);
When TransportContext initializes the communication pipeline, it builds a TransportChannelHandler and adds it to the SocketChannel; while building the TransportChannelHandler, it first constructs a TransportClient through the TransportClient constructor;
public class TransportContext {
private final RpcHandler rpcHandler;
public TransportChannelHandler initializePipeline(
SocketChannel channel,
RpcHandler channelRpcHandler) {
try {
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
.addLast("handler", channelHandler);
return channelHandler;
} catch (RuntimeException e) {
logger.error("Error while initializing Netty pipeline", e);
throw e;
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler, conf.maxChunksBeingTransferred());
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), closeIdleConnections);
Steps to obtain a client object:
1. TransportClientFactory keeps a ConcurrentHashMap<SocketAddress, TransportClientFactory.ClientPool> that maps each remote address to a client pool; a pool holds several clients, and one of them is picked at random each time a client is needed;
2. To obtain a client, the factory first looks up the ClientPool for the address in the ConcurrentHashMap, then takes a random client from that ClientPool;
3. If that client is non-null and active, the factory updates the last-request time of the TransportResponseHandler held by the client's TransportChannelHandler, and returns the client;
4. If the client is null or inactive, a new client is created, stored in the client pool, and returned;
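The four steps can be sketched in Java as follows. This is a hypothetical, simplified model rather than TransportClientFactory itself: Client is an illustrative interface, a Function<String, Client> stands in for the private createClient(InetSocketAddress), addresses are strings, the slot array is an AtomicReferenceArray (a safe-publication choice of this sketch), and the random index comes from ThreadLocalRandom instead of a shared Random.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Function;

// Hypothetical sketch of TransportClientFactory's per-address connection pool.
final class ClientPoolSketch {
    interface Client {
        boolean isActive();
    }

    // One fixed-size pool per remote address, with one lock per slot.
    static final class ClientPool {
        final AtomicReferenceArray<Client> clients;
        final Object[] locks;

        ClientPool(int size) {
            clients = new AtomicReferenceArray<>(size);
            locks = new Object[size];
            for (int i = 0; i < size; i++) {
                locks[i] = new Object();
            }
        }
    }

    private final ConcurrentMap<String, ClientPool> connectionPool = new ConcurrentHashMap<>();
    private final int numConnectionsPerPeer;
    private final Function<String, Client> connector; // stand-in for opening a new connection

    ClientPoolSketch(int numConnectionsPerPeer, Function<String, Client> connector) {
        this.numConnectionsPerPeer = numConnectionsPerPeer;
        this.connector = connector;
    }

    Client createClient(String address) {
        // Steps 1-2: find (or create) the pool for this address, then pick a random slot.
        ClientPool pool = connectionPool.computeIfAbsent(
            address, a -> new ClientPool(numConnectionsPerPeer));
        int index = ThreadLocalRandom.current().nextInt(numConnectionsPerPeer);

        // Step 3: fast path, reuse an active cached client without locking.
        // (Spark also refreshes the handler's last-request time here; omitted.)
        Client cached = pool.clients.get(index);
        if (cached != null && cached.isActive()) {
            return cached;
        }

        // Step 4: slow path, lock only this slot, re-check, then (re)connect.
        synchronized (pool.locks[index]) {
            cached = pool.clients.get(index);
            if (cached != null && cached.isActive()) {
                return cached; // another thread reconnected while we waited
            }
            Client fresh = connector.apply(address);
            pool.clients.set(index, fresh);
            return fresh;
        }
    }

    public static void main(String[] args) {
        ClientPoolSketch factory = new ClientPoolSketch(1, address -> () -> true);
        Client first = factory.createClient("worker-1:7078");
        Client second = factory.createClient("worker-1:7078");
        System.out.println(first == second); // true: the active client is reused
    }
}
```

With the RPC default of one connection per peer (spark.rpc.io.numConnectionsPerPeer = 1, set in the NettyRpcEnv excerpt), the random choice always picks slot 0. Locking a single slot rather than the whole pool lets connections to other slots or addresses proceed in parallel.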
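Steps to create a new client object:
1. Build the client Bootstrap and configure it;
2. Set the pipeline-initialization callback on the bootstrap. This callback calls TransportContext's initializePipeline method to initialize the Channel's pipeline (it builds a TransportChannelHandler and adds it to the SocketChannel; while building the TransportChannelHandler, it first constructs a TransportClient through the TransportClient constructor);
When the TransportClient object client is created, the SocketChannel object channel is stored as one of its fields. Initializing the channel defines how messages/data flow through the pipeline, including encoding and decoding, and the last processing step in the pipeline is the TransportChannelHandler object;
In other words, every message or piece of data passed through the SocketChannel is processed last by the TransportChannelHandler class;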
public class TransportClientFactory implements Closeable {
public TransportClient createClient(String remoteHost, int remotePort) throws IOException, InterruptedException {
// Wrap the remote address (unresolved) to use as the connection-pool key
InetSocketAddress unresolvedAddress = InetSocketAddress.createUnresolved(remoteHost, remotePort);
// Look up the client pool for this address in the connection pool
TransportClientFactory.ClientPool clientPool = (TransportClientFactory.ClientPool)this.connectionPool.get(unresolvedAddress);
// No pool for this address yet: create one in the connection pool
if (clientPool == null) {
this.connectionPool.putIfAbsent(unresolvedAddress, new TransportClientFactory.ClientPool(this.numConnectionsPerPeer));
clientPool = (TransportClientFactory.ClientPool)this.connectionPool.get(unresolvedAddress);
// Pick a random slot in the pool and take its client
int clientIndex = this.rand.nextInt(this.numConnectionsPerPeer);
TransportClient cachedClient = clientPool.clients[clientIndex];
if (cachedClient != null && cachedClient.isActive()) {
// Refresh the last-request time of the TransportResponseHandler inside the client's
// TransportChannelHandler, so the connection is not closed as idle
TransportChannelHandler handler = (TransportChannelHandler)cachedClient.getChannel().pipeline().get(TransportChannelHandler.class);
synchronized(handler) {
handler.getResponseHandler().updateTimeOfLastRequest();
}
// Return the cached client if it is still active (it may have timed out in the meantime)
if (cachedClient.isActive()) {
logger.trace("Returning cached connection to {}: {}", cachedClient.getSocketAddress(), cachedClient);
return cachedClient;
}
}
long preResolveHost = System.nanoTime();
InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000L;
if (hostResolveTimeMs > 2000L) {
logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
} else {
logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
synchronized(clientPool.locks[clientIndex]) {
cachedClient = clientPool.clients[clientIndex];
if (cachedClient != null) {
if (cachedClient.isActive()) {
logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
return cachedClient;
logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
// The cached client is null or inactive: create a new client and store it in the pool
clientPool.clients[clientIndex] = this.createClient(resolvedAddress);
return clientPool.clients[clientIndex];
// Create a new communication client
private TransportClient createClient(InetSocketAddress address) throws IOException, InterruptedException {
logger.debug("Creating new connection to {}", address);
// Build and configure the client Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(this.workerGroup)
.channel(this.socketChannelClass)
// Disable Nagle's Algorithm since we don't want packets to wait
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
.option(ChannelOption.ALLOCATOR, pooledAllocator);
if (this.conf.receiveBuf() > 0) {
bootstrap.option(ChannelOption.SO_RCVBUF, this.conf.receiveBuf());
if (this.conf.sendBuf() > 0) {
bootstrap.option(ChannelOption.SO_SNDBUF, this.conf.sendBuf());
final AtomicReference<TransportClient> clientRef = new AtomicReference();
final AtomicReference<Channel> channelRef = new AtomicReference();
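// Set the pipeline-initialization callback on the bootstrap
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) {
// Initialize the pipeline: build a TransportChannelHandler (which first constructs the
// TransportClient) and add it to the SocketChannel's pipeline
TransportChannelHandler clientHandler = TransportClientFactory.this.context.initializePipeline(ch);
// Keep the client and channel so they can be used once connect() completes
clientRef.set(clientHandler.getClient());
channelRef.set(ch);
}
});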
long preConnect = System.nanoTime();
// Connect to the remote server with the bootstrap
ChannelFuture cf = bootstrap.connect(address);
if (!cf.await((long)this.conf.connectionTimeoutMs())) {
throw new IOException(String.format("Connecting to %s timed out (%s ms)", address, this.conf.connectionTimeoutMs()));
} else if (cf.cause() != null) {
throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
} else {
TransportClient client = (TransportClient)clientRef.get();
Channel channel = (Channel)channelRef.get();
assert client != null : "Channel future completed successfully with null client";
long preBootstrap = System.nanoTime();
logger.debug("Connection to {} successful, running bootstraps...", address);
try {
Iterator var12 = this.clientBootstraps.iterator();
while(var12.hasNext()) {
// Run each client bootstrap (e.g. authentication) on the new TransportClient
TransportClientBootstrap clientBootstrap = (TransportClientBootstrap)var12.next();
clientBootstrap.doBootstrap(client, channel);
}
} catch (Exception var15) {
long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000L;
logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", var15);
// Close the half-initialized client before propagating the failure
client.close();
throw Throwables.propagate(var15);
}
long postBootstrap = System.nanoTime();
logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)", new Object[]{
address, (postBootstrap - preConnect) / 1000000L, (postBootstrap - preBootstrap) / 1000000L});
return client;
2.2.3. TransportClient sends messages to the TransportServer of the target node
Through the Channel bound to the TransportClient object, the message is wrapped and written into the communication pipeline;
public class TransportClient implements Closeable {
public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", NettyUtils.getRemoteAddress(this.channel));
long requestId = requestId();
this.handler.addRpcRequest(requestId, callback);
// Listener attached to the channel write of this RPC request
TransportClient.RpcChannelListener listener = new TransportClient.RpcChannelListener(requestId, callback);
// Wrap the message in an RpcRequest, write it to the pipeline, and attach the listener
this.channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(listener);
return requestId;
public void send(ByteBuffer message) {
// Wrap the message in a OneWayMessage and write it to the pipeline (no reply expected)
this.channel.writeAndFlush(new OneWayMessage(new NioManagedBuffer(message)));
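sendRpc registers the callback under a request id before writing the request, and the reply later finds its callback by that id (TransportRequestHandler answers with an RpcResponse or RpcFailure carrying req.requestId). A hypothetical, network-free Java sketch of that correlation is below; payloads are Strings, the map name mirrors the response handler's table of outstanding RPCs, and ids come from a simple counter, whereas Spark generates them differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of request/response correlation between sendRpc() and the
// response handler. Payloads are Strings instead of ByteBuffers; no network is involved.
final class RpcCorrelation {
    interface RpcResponseCallback {
        void onSuccess(String response);
        void onFailure(Throwable e);
    }

    // Stand-in for the response handler's table of outstanding RPCs.
    private final Map<Long, RpcResponseCallback> outstandingRpcs = new ConcurrentHashMap<>();
    // Assumption: a simple counter; Spark generates request ids differently.
    private final AtomicLong nextId = new AtomicLong();

    long sendRpc(String message, RpcResponseCallback callback) {
        long requestId = nextId.incrementAndGet();
        // Register before writing, so even an immediate reply finds its callback.
        outstandingRpcs.put(requestId, callback);
        // A real client would now write an RpcRequest(requestId, message) to the channel.
        return requestId;
    }

    // Called when an RpcResponse carrying requestId arrives.
    void handleResponse(long requestId, String body) {
        RpcResponseCallback callback = outstandingRpcs.remove(requestId);
        if (callback == null) {
            System.err.println("Ignoring response for RPC " + requestId + " (not outstanding)");
            return;
        }
        callback.onSuccess(body);
    }

    // Called when an RpcFailure carrying requestId arrives.
    void handleFailure(long requestId, String error) {
        RpcResponseCallback callback = outstandingRpcs.remove(requestId);
        if (callback != null) {
            callback.onFailure(new RuntimeException(error));
        }
    }

    int outstanding() {
        return outstandingRpcs.size();
    }

    public static void main(String[] args) {
        RpcCorrelation client = new RpcCorrelation();
        long id = client.sendRpc("ping", new RpcResponseCallback() {
            @Override public void onSuccess(String response) { System.out.println("got " + response); }
            @Override public void onFailure(Throwable e) { System.out.println("failed: " + e.getMessage()); }
        });
        client.handleResponse(id, "pong"); // prints "got pong"
    }
}
```

Removing the callback when the reply arrives guarantees each callback fires at most once, so a late or duplicate response for the same id is ignored.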
When the server sets up its bootstrap, it initializes the pipeline, and the TransportChannelHandler is defined as the last stage of the pipeline's data flow; everything communicated through the pipeline is therefore processed last by the TransportChannelHandler object;
For RPC request messages, NettyRpcHandler#receive() processes the message and finally hands it to the Dispatcher for further processing;
public class TransportChannelHandler extends ChannelInboundHandlerAdapter {
private final TransportClient client;
private final TransportResponseHandler responseHandler;
private final TransportRequestHandler requestHandler;
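// Classify the message and hand it to the matching handler
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
if (request instanceof RequestMessage) {
// Request message: handled by the request handler
this.requestHandler.handle((RequestMessage)request);
} else if (request instanceof ResponseMessage) {
// Response message: handled by the response handler
this.responseHandler.handle((ResponseMessage)request);
} else {
// Anything else is passed on to the next handler
ctx.fireChannelRead(request);
}
}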
public class TransportRequestHandler extends MessageHandler<RequestMessage> {
public void handle(RequestMessage request) {
if (request instanceof ChunkFetchRequest) {
} else if (request instanceof RpcRequest) {
} else if (request instanceof OneWayMessage) {
} else if (request instanceof StreamRequest) {
} else {
if (!(request instanceof UploadStream)) {
throw new IllegalArgumentException("Unknown request type: " + request);
private void processRpcRequest(final RpcRequest req) {
try {
this.rpcHandler.receive(this.reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
public void onSuccess(ByteBuffer response) {
TransportRequestHandler.this.respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
}
public void onFailure(Throwable e) {
TransportRequestHandler.this.respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
}
});
} catch (Exception var6) {
logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, var6);
this.respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(var6)));
} finally {
// Release the request body buffer
req.body().release();
}
}
private[netty] class NettyRpcHandler(
dispatcher: Dispatcher,
nettyEnv: NettyRpcEnv,
streamManager: StreamManager) extends RpcHandler with Logging {
// A variable to track the remote RpcEnv addresses of all clients
private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()
override def receive(
client: TransportClient,
message: ByteBuffer,
callback: RpcResponseCallback): Unit = {
val messageToDispatch = internalReceive(client, message)
// Hand the message to the Dispatcher's postRemoteMessage() method
dispatcher.postRemoteMessage(messageToDispatch, callback)
// The Dispatcher handles the remote RPC message
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
val rpcCallContext =
new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
1. The client writes the message into the communication pipeline;
2. The pipeline processes the message through its defined stages, and in the last stage the TransportChannelHandler classifies the message;
3. After classification, an RPC request message is handled by TransportRequestHandler, which calls NettyRpcHandler#receive();
4. NettyRpcHandler#receive() passes the message to the Dispatcher, which takes care of the remaining processing;
A message sent to the local node is handed directly to the Dispatcher, which delivers it to the inbox of the target endpoint;
A message sent to a remote node first goes into the outbox for that node and is then pushed into the communication channel by the TransportClient. The pipeline encodes and decodes the message through a series of stages, and finally the TransportChannelHandler classifies it: an RPC request message is passed by TransportRequestHandler to NettyRpcHandler, which hands it to the Dispatcher, which delivers it to the inbox of the target endpoint;
To process each endpoint's inbox queue, the Dispatcher maintains a thread pool. Its threads repeatedly poll the queue of receiving endpoints, take the message at the head of that endpoint's inbox, and call the endpoint to process it.
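The Dispatcher's inbox/thread-pool arrangement can be sketched as below. This is a hypothetical simplification, not Spark's Dispatcher: endpoints are Consumer<String>, each take() from the receivers queue processes a single message, and none of the Inbox logic (enableConcurrent, numActiveThreads) that serializes endpoints that are not thread-safe is modeled, so with several threads one endpoint may see its messages concurrently and out of order. Shutdown uses a sentinel ("poison pill") that each loop puts back so the other loops also exit.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified model of the Dispatcher: per-endpoint inboxes plus a shared
// queue of endpoints with pending work, drained by a fixed pool of message-loop threads.
final class MiniDispatcher {
    static final class EndpointData {
        final String name;
        final Consumer<String> endpoint;
        final ConcurrentLinkedQueue<String> inbox = new ConcurrentLinkedQueue<>();

        EndpointData(String name, Consumer<String> endpoint) {
            this.name = name;
            this.endpoint = endpoint;
        }
    }

    // Sentinel that tells message-loop threads to exit.
    private static final EndpointData POISON_PILL = new EndpointData("poison-pill", m -> { });

    private final Map<String, EndpointData> endpoints = new ConcurrentHashMap<>();
    private final LinkedBlockingQueue<EndpointData> receivers = new LinkedBlockingQueue<>();
    private final ExecutorService threadpool;

    MiniDispatcher(int numThreads) {
        threadpool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            threadpool.execute(this::messageLoop);
        }
    }

    void registerEndpoint(String name, Consumer<String> endpoint) {
        endpoints.put(name, new EndpointData(name, endpoint));
    }

    // Same path for local messages and for remote ones already decoded by the transport layer.
    void postMessage(String endpointName, String message) {
        EndpointData data = endpoints.get(endpointName);
        if (data == null) {
            throw new IllegalArgumentException("Could not find endpoint " + endpointName);
        }
        data.inbox.add(message); // 1. append to the endpoint's inbox
        receivers.offer(data);   // 2. signal that this endpoint has work
    }

    private void messageLoop() {
        try {
            while (true) {
                EndpointData data = receivers.take(); // blocks until some endpoint has work
                if (data == POISON_PILL) {
                    receivers.offer(POISON_PILL); // put it back so the other loops exit too
                    return;
                }
                String message = data.inbox.poll();
                if (message != null) {
                    data.endpoint.accept(message);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Lets already-posted messages be processed, then stops the message loops.
    void stop() throws InterruptedException {
        receivers.offer(POISON_PILL);
        threadpool.shutdown();
        threadpool.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        MiniDispatcher dispatcher = new MiniDispatcher(2);
        dispatcher.registerEndpoint("echo", m -> System.out.println("echo received " + m));
        dispatcher.postMessage("echo", "hello");
        dispatcher.stop();
    }
}
```

Because the receivers queue is FIFO and the pill is offered after all earlier posts, every message posted before stop() is taken and processed before the loops see the pill.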
