Spark RPC
2022-07-27 15:36:00 【wankunde】
RpcEndpoint, RpcEndpointRef and NettyRpcEndpointRef: the RPC call interfaces
SparkEnv: holds all the environment information of a running Spark instance
RpcEnv <-- new NettyRpcEnvFactory().create(RpcEnvConfig())
RpcEndpoint and RpcEndpointRef are the upper-level RPC programming interfaces we use when writing code:
- An RpcEndpoint represents one RPC communication endpoint. It is registered with rpcEnv.setupEndpoint() so that others can find it and communicate with it.
- To communicate with another RpcEndpoint, call rpcEnv.setupEndpointRef() to establish a connection to that remote RpcEndpoint.
- Communication with the remote endpoint then goes through the send (one-way) and ask (expects a reply) methods of the RpcEndpointRef.
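To make the register/look-up steps and the difference between send and ask concrete, here is a minimal in-process Java sketch. MiniRpcEnv, EndpointRef and Endpoint are hypothetical stand-ins for RpcEnv, RpcEndpointRef and RpcEndpoint, not Spark classes: there is no network, serialization or Inbox, only registration by name, lookup, and the two calling styles.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class MiniRpcDemo {
    // Hypothetical stand-in for RpcEndpoint: receive() handles send(), receiveAndReply() handles ask().
    interface Endpoint {
        void receive(Object msg);
        Object receiveAndReply(Object msg);
    }

    // Hypothetical stand-in for RpcEnv: an in-process registry of named endpoints.
    static class MiniRpcEnv {
        private final Map<String, Endpoint> endpoints = new ConcurrentHashMap<>();

        // Register an endpoint so that others can find it by name.
        void setupEndpoint(String name, Endpoint endpoint) {
            if (endpoints.putIfAbsent(name, endpoint) != null) {
                throw new IllegalArgumentException("Endpoint already registered: " + name);
            }
        }

        // Look up a registered endpoint and return a reference for talking to it.
        EndpointRef setupEndpointRef(String name) {
            Endpoint target = endpoints.get(name);
            if (target == null) {
                throw new IllegalStateException("Endpoint not found: " + name);
            }
            return new EndpointRef(target);
        }
    }

    // Hypothetical stand-in for RpcEndpointRef.
    static class EndpointRef {
        private final Endpoint target;

        EndpointRef(Endpoint target) {
            this.target = target;
        }

        // One-way: delivered asynchronously, the caller gets nothing back.
        void send(Object msg) {
            CompletableFuture.runAsync(() -> target.receive(msg));
        }

        // Request/reply: the caller gets a future completed with the endpoint's reply.
        CompletableFuture<Object> ask(Object msg) {
            return CompletableFuture.supplyAsync(() -> target.receiveAndReply(msg));
        }
    }

    static String demo() {
        MiniRpcEnv env = new MiniRpcEnv();
        CompletableFuture<Object> oneWay = new CompletableFuture<>();
        env.setupEndpoint("echo", new Endpoint() {
            @Override
            public void receive(Object msg) {
                oneWay.complete(msg);
            }

            @Override
            public Object receiveAndReply(Object msg) {
                return "echo:" + msg;
            }
        });

        EndpointRef ref = env.setupEndpointRef("echo");
        ref.send("hello");
        Object reply = ref.ask("ping").orTimeout(5, TimeUnit.SECONDS).join();
        return oneWay.orTimeout(5, TimeUnit.SECONDS).join() + " " + reply;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "hello echo:ping"
    }
}
```

The point of the split is the same as in Spark: send never waits for anything, while ask hands back a future the caller can block on or chain.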
Example 1:
test("send a message remotely") {
  @volatile var message: String = null
  // Set up an RpcEndpoint using env
  env.setupEndpoint("send-remotely", new RpcEndpoint {
    override val rpcEnv = env
    override def receive: PartialFunction[Any, Unit] = {
      case msg: String => message = msg
    }
  })
  val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
  // Use anotherEnv to look up the RpcEndpointRef
  val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "send-remotely")
  try {
    rpcEndpointRef.send("hello")
    eventually(timeout(5.seconds), interval(10.milliseconds)) {
      assert("hello" === message)
    }
  } finally {
    anotherEnv.shutdown()
    anotherEnv.awaitTermination()
  }
}
Examples:
- RpcEndpointAddress: created from a URI of the form spark://<endpoint name>@<host>:<port>, such as spark://<endpoint name>@<host>:54075
- NettyRpcEndpointRef: created from the address above, such as NettyRpcEndpointRef(spark://<endpoint name>@<host>:54075)
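The spark://<endpoint name>@<host>:<port> form maps directly onto the parts of a java.net.URI: the endpoint name sits in the user-info part. The sketch below shows that parsing, assuming validation rules similar to (but not copied from) Spark's RpcEndpointAddress; EndpointAddressSketch is a hypothetical class.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical sketch: parse "spark://<endpoint name>@<host>:<port>" with java.net.URI.
public final class EndpointAddressSketch {
    final String name;
    final String host;
    final int port;

    private EndpointAddressSketch(String name, String host, int port) {
        this.name = name;
        this.host = host;
        this.port = port;
    }

    static EndpointAddressSketch parse(String sparkUrl) {
        URI uri;
        try {
            uri = new URI(sparkUrl);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid Spark URL: " + sparkUrl, e);
        }
        String path = uri.getPath();
        boolean valid = "spark".equals(uri.getScheme())
            && uri.getUserInfo() != null            // the endpoint name
            && uri.getHost() != null
            && uri.getPort() >= 0                   // getPort() returns -1 when no port is given
            && (path == null || path.isEmpty())     // no path, query or fragment allowed
            && uri.getQuery() == null
            && uri.getFragment() == null;
        if (!valid) {
            throw new IllegalArgumentException("Invalid Spark URL: " + sparkUrl);
        }
        return new EndpointAddressSketch(uri.getUserInfo(), uri.getHost(), uri.getPort());
    }

    @Override
    public String toString() {
        return "spark://" + name + "@" + host + ":" + port;
    }

    public static void main(String[] args) {
        EndpointAddressSketch addr = parse("spark://send-remotely@localhost:54075");
        System.out.println(addr.name + " " + addr.host + " " + addr.port); // send-remotely localhost 54075
    }
}
```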
Example 2: sending an RPC through an RpcEndpointRef
// 1. Instantiate an EndpointRef pointing at the remote RpcEndpointVerifier
val verifier = new NettyRpcEndpointRef(
  conf, RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME), this)
// 2. RpcEndpointVerifier.CheckExistence(endpointRef.name) is a case class instance; in principle
//    anything serializable can be sent as a message
// 3. ask() sends the message; it takes care of wrapping the message, making the request
//    abortable, and handling timeouts
verifier.ask[Boolean](RpcEndpointVerifier.CheckExistence(endpointRef.name)).flatMap { find =>
  if (find) {
    Future.successful(endpointRef)
  } else {
    Future.failed(new RpcEndpointNotFoundException(uri))
  }
}(ThreadUtils.sameThread)
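The flatMap above turns a Boolean reply into either the endpoint ref or a failed future. The same shape in Java uses CompletableFuture.thenCompose. In this hypothetical sketch the remote CheckExistence call is simulated by a set lookup, and the "ref" is represented only by its URI string.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class VerifyThenRefSketch {
    // Hypothetical exception playing the role of RpcEndpointNotFoundException.
    static class EndpointNotFoundException extends RuntimeException {
        EndpointNotFoundException(String uri) {
            super("Cannot find endpoint: " + uri);
        }
    }

    // Hypothetical remote check, playing the role of ask[Boolean](CheckExistence(name)).
    static CompletableFuture<Boolean> askCheckExistence(Set<String> registered, String name) {
        return CompletableFuture.supplyAsync(() -> registered.contains(name));
    }

    // Equivalent of the Scala flatMap: a true reply yields the ref, false yields a failed future.
    static CompletableFuture<String> setupEndpointRef(Set<String> registered, String name, String uri) {
        return askCheckExistence(registered, name).thenCompose(found -> {
            if (found) {
                return CompletableFuture.completedFuture(uri);
            }
            return CompletableFuture.<String>failedFuture(new EndpointNotFoundException(uri));
        });
    }

    public static void main(String[] args) {
        Set<String> registered = Set.of("send-remotely");
        System.out.println(setupEndpointRef(registered, "send-remotely",
            "spark://send-remotely@localhost:54075").join());
        System.out.println(setupEndpointRef(registered, "missing", "spark://missing@localhost:54075")
            .handle((ref, error) -> ref != null ? ref : "failed: " + error)
            .join());
    }
}
```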
NettyRpcEnv: sending a message
// NettyRpcEnv
private[netty] def askAbortable[T: ClassTag](
    message: RequestMessage, timeout: RpcTimeout): AbortableRpcFuture[T] = {
  val promise = Promise[Any]()
  val remoteAddr = message.receiver.address
  def onFailure(e: Throwable): Unit = {
    if (!promise.tryFailure(e)) {
      e match {
        case e : RpcEnvStoppedException => logDebug(s"Ignored failure: $e")
        case _ => logWarning(s"Ignored failure: $e")
      }
    }
  }
  def onSuccess(reply: Any): Unit = reply match {
    case RpcFailure(e) => onFailure(e)
    case rpcReply =>
      if (!promise.trySuccess(rpcReply)) {
        logWarning(s"Ignored message: $reply")
      }
  }
  def onAbort(reason: String): Unit = {
    onFailure(new RpcAbortException(reason))
  }
  try {
    if (remoteAddr == address) {
      val p = Promise[Any]()
      p.future.onComplete {
        case Success(response) => onSuccess(response)
        case Failure(e) => onFailure(e)
      }(ThreadUtils.sameThread)
      dispatcher.postLocalMessage(message, p)
    } else {
      // 1. Wrap the RPC message and define the callbacks invoked when processing succeeds or fails
      val rpcMessage = RpcOutboxMessage(message.serialize(this),
        onFailure,
        (client, response) => onSuccess(deserialize[Any](client, response)))
      // 2. Put the message into the receiver's Outbox
      postToOutbox(message.receiver, rpcMessage)
      // 3. Once the message has been processed, onSuccess or onFailure completes the promise,
      //    and promise.future is what the caller gets back
      // 4. On timeout or abort, rpcMessage.onTimeout()/onAbort() are called locally to
      //    remove the pending request
      promise.future.failed.foreach {
        case _: TimeoutException => rpcMessage.onTimeout()
        case _: RpcAbortException => rpcMessage.onAbort()
        case _ =>
      }(ThreadUtils.sameThread)
    }
    val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
      override def run(): Unit = {
        onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +
          s"in ${timeout.duration}"))
      }
    }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
    promise.future.onComplete { v =>
      timeoutCancelable.cancel(true)
    }(ThreadUtils.sameThread)
  } catch {
    case NonFatal(e) =>
      onFailure(e)
  }
  new AbortableRpcFuture[T](
    promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread),
    onAbort)
}
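The core of askAbortable is a promise that is completed by whichever happens first: the reply, a failure, or the scheduled timeout. Later completions are ignored (trySuccess/tryFailure return false), and completing the promise cancels the timeout task. A sketch of that pattern with CompletableFuture and a ScheduledExecutorService, assuming the transport is simply another future; AskWithTimeoutSketch is hypothetical and leaves out local dispatch, abort and RpcFailure handling.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AskWithTimeoutSketch {
    // Daemon scheduler so pending timeout tasks never keep the JVM alive.
    private static final ScheduledExecutorService TIMEOUT_SCHEDULER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "ask-timeout");
            t.setDaemon(true);
            return t;
        });

    // transportReply is a hypothetical stand-in for "the reply coming back through the Outbox".
    static CompletableFuture<Object> ask(CompletableFuture<Object> transportReply, long timeoutMs) {
        CompletableFuture<Object> promise = new CompletableFuture<>();

        // onSuccess/onFailure: only the first completion wins (like trySuccess/tryFailure);
        // a later result is logged and ignored.
        transportReply.whenComplete((reply, error) -> {
            boolean won = (error == null)
                ? promise.complete(reply)
                : promise.completeExceptionally(error);
            if (!won) {
                System.err.println("Ignored late result: " + (error == null ? reply : error));
            }
        });

        // Fail the promise if nothing has completed it within the timeout.
        ScheduledFuture<?> timeoutTask = TIMEOUT_SCHEDULER.schedule(
            () -> promise.completeExceptionally(
                new TimeoutException("Cannot receive any reply in " + timeoutMs + " ms")),
            timeoutMs, TimeUnit.MILLISECONDS);

        // Whatever completes the promise first, the pending timeout task is cancelled.
        promise.whenComplete((value, error) -> timeoutTask.cancel(true));
        return promise;
    }

    public static void main(String[] args) {
        System.out.println(ask(CompletableFuture.completedFuture("pong"), 1000).join()); // pong
        ask(new CompletableFuture<>(), 100)
            .handle((value, error) -> "failed with " + error)
            .thenAccept(System.out::println)
            .join();
    }
}
```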
RpcOutboxMessage
RpcOutboxMessage contains the message body to send, the callback that handles a successful response, and the callback that handles a failure. RpcOutboxMessage is also an RpcResponseCallback: when it calls client.sendRpc() it passes itself as the callback, so the client later calls back into its own onSuccess/onFailure methods.
private[netty] case class RpcOutboxMessage(
    content: ByteBuffer,
    _onFailure: (Throwable) => Unit,
    _onSuccess: (TransportClient, ByteBuffer) => Unit)
  extends OutboxMessage with RpcResponseCallback with Logging {
  private var client: TransportClient = _
  private var requestId: Long = _
  override def sendWith(client: TransportClient): Unit = {
    this.client = client
    // Send the message body and register this object itself as the response callback
    this.requestId = client.sendRpc(content, this)
  }
  private[netty] def removeRpcRequest(): Unit = {
    if (client != null) {
      client.removeRpcRequest(requestId)
    } else {
      logError("Ask terminated before connecting successfully")
    }
  }
  def onTimeout(): Unit = {
    removeRpcRequest()
  }
  def onAbort(): Unit = {
    removeRpcRequest()
  }
  override def onFailure(e: Throwable): Unit = {
    _onFailure(e)
  }
  override def onSuccess(response: ByteBuffer): Unit = {
    _onSuccess(client, response)
  }
}
// TransportClient
/**
 * Sends an opaque message to the RpcHandler on the server-side. The callback will be invoked
 * with the server's response or upon any failure.
 *
 * @param message The message to send.
 * @param callback Callback to handle the RPC's reply.
 * @return The RPC's id.
 */
public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
  if (logger.isTraceEnabled()) {
    logger.trace("Sending RPC to {}", getRemoteAddress(channel));
  }
  long requestId = requestId();
  handler.addRpcRequest(requestId, callback);
  RpcChannelListener listener = new RpcChannelListener(requestId, callback);
  channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
    .addListener(listener);
  return requestId;
}
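sendRpc registers the callback under a fresh request id before writing the request, and the response side later uses that id to find and remove the callback. removeRpcRequest (called by RpcOutboxMessage on timeout or abort) only drops the entry, so a late reply is ignored. A simplified, hypothetical Java sketch of that bookkeeping without any networking; Client and OutboxRpc only mimic the roles of TransportClient (with its response handler) and RpcOutboxMessage.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class PendingRpcSketch {
    // Same two methods as the RpcResponseCallback used above.
    interface ResponseCallback {
        void onSuccess(ByteBuffer response);
        void onFailure(Throwable e);
    }

    // Hypothetical client: keeps request id -> callback for every RPC still waiting for a reply.
    static class Client {
        private final AtomicLong nextId = new AtomicLong();
        private final Map<Long, ResponseCallback> outstanding = new ConcurrentHashMap<>();

        long sendRpc(ByteBuffer message, ResponseCallback callback) {
            long requestId = nextId.incrementAndGet();
            outstanding.put(requestId, callback); // register before the request hits the wire
            // A real client would now write (requestId, message) to its network channel.
            return requestId;
        }

        void removeRpcRequest(long requestId) {
            outstanding.remove(requestId);
        }

        // Called when a response frame arrives; unknown ids (timed out, aborted) are ignored.
        boolean handleResponse(long requestId, ByteBuffer body) {
            ResponseCallback callback = outstanding.remove(requestId);
            if (callback == null) {
                return false;
            }
            callback.onSuccess(body);
            return true;
        }

        int numOutstanding() {
            return outstanding.size();
        }
    }

    // Like RpcOutboxMessage: the outgoing message registers itself as the response callback.
    static class OutboxRpc implements ResponseCallback {
        private final ByteBuffer content;
        private Client client;
        private long requestId;
        String reply; // filled in by onSuccess/onFailure

        OutboxRpc(String text) {
            this.content = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
        }

        void sendWith(Client client) {
            this.client = client;
            this.requestId = client.sendRpc(content, this);
        }

        // On timeout, forget the pending request so that a late reply is dropped.
        void onTimeout() {
            if (client != null) {
                client.removeRpcRequest(requestId);
            }
        }

        long requestId() {
            return requestId;
        }

        @Override
        public void onSuccess(ByteBuffer response) {
            reply = StandardCharsets.UTF_8.decode(response).toString();
        }

        @Override
        public void onFailure(Throwable e) {
            reply = "failed: " + e.getMessage();
        }
    }
}
```

Because the map is keyed by request id, replies may arrive in any order and still reach the right callback.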
Outbox
Outbox was introduced mainly to fix message reordering in the original Spark RPC messaging. For example, if messages A and B are sent in that order, the remote endpoint might receive them as B, A. Outbox solves this by caching outgoing messages in a queue and sending them one after another, with only one thread draining the queue at a time, so the sending order is preserved.
Issue: [SPARK-11098][Core] Add Outbox to cache the sending messages to resolve the message disorder issue
Putting a message into the Outbox and actually sending it are two fully asynchronous actions.
Messages are sent at two points:
- When the RPC connection is established: because messages may be enqueued asynchronously while the connection is being set up, there may already be queued messages once it is ready, and these must be sent.
- When a new message arrives: if the connection is already established, the queue is drained right away; if it is not ready yet, nothing is sent.
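The two sending points above can be sketched as follows, assuming a single in-memory "connection" represented by a Consumer. Messages are queued until a connection exists, and a draining flag ensures only one thread sends at a time, so messages leave in the order they were added. OutboxSketch is hypothetical and leaves out connection failures and the asynchronous connect task.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the Outbox idea: queue first, then a single drainer sends in FIFO order.
public class OutboxSketch {
    private final LinkedList<String> messages = new LinkedList<>();
    private Consumer<String> client;   // null until the connection is "established"
    private boolean draining = false;  // true while some thread is sending queued messages

    // Like Outbox.send: enqueue, then try to drain.
    public void send(String message) {
        synchronized (this) {
            messages.add(message);
        }
        drainOutbox();
    }

    // Stands in for the end of the connect task: messages queued meanwhile are flushed now.
    public void onConnected(Consumer<String> client) {
        synchronized (this) {
            this.client = client;
        }
        drainOutbox();
    }

    private void drainOutbox() {
        String message;
        Consumer<String> target;
        synchronized (this) {
            // Not connected yet, or another thread is already draining: leave the messages queued.
            if (client == null || draining) {
                return;
            }
            message = messages.poll();
            if (message == null) {
                return;
            }
            draining = true;
            target = client;
        }
        try {
            while (message != null) {
                target.accept(message); // send outside the lock
                synchronized (this) {
                    message = messages.poll();
                    if (message == null) {
                        draining = false;
                    }
                }
            }
        } catch (RuntimeException e) {
            synchronized (this) {
                draining = false;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        List<String> wire = new ArrayList<>();
        OutboxSketch outbox = new OutboxSketch();
        outbox.send("A");
        outbox.send("B");          // not connected: both stay queued
        outbox.onConnected(wire::add);
        outbox.send("C");
        System.out.println(wire);  // [A, B, C]
    }
}
```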
// Outbox receives a message: the message is wrapped and then cached in a LinkedList
NettyRpcEndpointRef::def send(message: Any)
  NettyRpcEnv::def send(message: RequestMessage)
    NettyRpcEnv::postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage)
      Outbox::send(message)
        messages.add(message)
// Outbox sends messages: walk the message list and send each queued message
Outbox.drainOutbox():
  launchConnectTask()    // if there is no connection yet
  message = messages.poll()
  RpcOutboxMessage.sendWith(client)
    TransportClient.sendRpc(message)    // wraps message in an RpcRequest and sends it through the channel
Inbox
Messages in an Inbox come from two sources: messages sent by a remote EndpointRef, and local messages that the Dispatcher forwards to an Endpoint in the same process.
NettyRpcEnv::def send(message: RequestMessage)    // local message for an Endpoint in the same RpcEnv
NettyRpcHandler::override def receive(client: TransportClient, message: ByteBuffer)    // message received from a remote sender
Adding a message to the mailbox (Inbox):
Dispatcher::def postRemoteMessage() / def postLocalMessage() / def postOneWayMessage()
  Dispatcher::private def postMessage()
    DedicatedMessageLoop::override def post(endpointName: String, message: InboxMessage)
      Inbox::def post(message: InboxMessage)
Processing: traverse the message list and let the endpoint handle each message taken out of it
Inbox::def process(dispatcher: Dispatcher)
  RpcMessage -> endpoint.receiveAndReply(context)
  OneWayMessage -> endpoint.receive
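Inbox processing can be sketched as a queue drained by a loop that routes each message by type: request messages go to receiveAndReply together with a way to reply, and one-way messages go to receive. InboxSketch and its message classes are hypothetical simplifications: a Consumer stands in for RpcCallContext, and there is no threading model or OnStart/OnStop handling.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of Inbox.process: drain the queue and route each message by its type.
public class InboxSketch {
    // Stand-in for RpcMessage: carries a way to send the reply back (instead of RpcCallContext).
    static final class RpcMessage {
        final Object content;
        final Consumer<Object> context;

        RpcMessage(Object content, Consumer<Object> context) {
            this.content = content;
            this.context = context;
        }
    }

    // Stand-in for OneWayMessage: no reply expected.
    static final class OneWayMessage {
        final Object content;

        OneWayMessage(Object content) {
            this.content = content;
        }
    }

    interface Endpoint {
        void receive(Object msg);                                   // handles one-way messages
        void receiveAndReply(Object msg, Consumer<Object> context); // handles request messages
    }

    private final LinkedList<Object> messages = new LinkedList<>();
    private final Endpoint endpoint;

    InboxSketch(Endpoint endpoint) {
        this.endpoint = endpoint;
    }

    synchronized void post(Object message) {
        messages.add(message);
    }

    void process() {
        while (true) {
            Object message;
            synchronized (this) {
                message = messages.poll();
            }
            if (message == null) {
                return;
            }
            if (message instanceof RpcMessage) {
                RpcMessage rpc = (RpcMessage) message;
                endpoint.receiveAndReply(rpc.content, rpc.context);
            } else if (message instanceof OneWayMessage) {
                endpoint.receive(((OneWayMessage) message).content);
            } else {
                throw new IllegalArgumentException("Unknown message: " + message);
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        InboxSketch inbox = new InboxSketch(new Endpoint() {
            @Override
            public void receive(Object msg) {
                log.add("receive:" + msg);
            }

            @Override
            public void receiveAndReply(Object msg, Consumer<Object> context) {
                context.accept("pong:" + msg);
            }
        });
        inbox.post(new OneWayMessage("hello"));
        inbox.post(new RpcMessage("ping", reply -> log.add("reply:" + reply)));
        inbox.process();
        return log; // [receive:hello, reply:pong:ping]
    }
}
```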
Network communication module
Creating the server communication port through rpcEnv
SparkEnv::private def create()
  val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
    securityManager, numUsableCores, !isDriver)
RpcEnv::new NettyRpcEnvFactory().create(config)
NettyRpcEnvFactory::nettyEnv.startServer(config.bindAddress, actualPort)
// Initialize an rpcHandler (NettyRpcHandler); below we will see the only place it is used
NettyRpcEnv::private val transportContext = new TransportContext(transportConf, new NettyRpcHandler(dispatcher, this, streamManager))
NettyRpcEnv::def startServer(bindAddress: String, port: Int)
  server = transportContext.createServer(bindAddress, port, bootstraps)
TransportContext::public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps)
  TransportContext::new TransportServer(this, host, port, rpcHandler, bootstraps)
TransportServer::private void init(String hostToBind, int portToBind)    // the server is started here
// Above we saw how rpcHandler is created and passed down; the code below shows where it is used
// TransportServer
private void init(String hostToBind, int portToBind) {
  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  EventLoopGroup bossGroup =
    NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
  EventLoopGroup workerGroup = bossGroup;
  // Start the server here
  bootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NettyUtils.getServerChannelClass(ioMode))
    .option(ChannelOption.ALLOCATOR, pooledAllocator)
    .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
    .childOption(ChannelOption.ALLOCATOR, pooledAllocator);
  this.metrics = new NettyMemoryMetrics(
    pooledAllocator, conf.getModuleName() + "-server", conf);
  if (conf.backLog() > 0) {
    bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
  }
  if (conf.receiveBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
  }
  if (conf.sendBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
  }
  if (conf.enableTcpKeepAlive()) {
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
  }
  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
      logger.debug("New connection accepted for remote address {}.", ch.remoteAddress());
      // rpcHandler is used here: each bootstrap may wrap it, then the channel pipeline is built with it
      RpcHandler rpcHandler = appRpcHandler;
      for (TransportServerBootstrap bootstrap : bootstraps) {
        rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
      }
      context.initializePipeline(ch, rpcHandler);
    }
  });
  InetSocketAddress address = hostToBind == null ?
    new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
  channelFuture = bootstrap.bind(address);
  channelFuture.syncUninterruptibly();
  port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
  logger.debug("Shuffle server started on port: {}", port);
}
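In initChannel(), each TransportServerBootstrap receives the handler built so far and returns a (possibly wrapped) handler, so a step such as authentication can sit in front of the application's NettyRpcHandler. A hypothetical Java sketch of that wrapping loop, with String messages instead of Netty channels and buffers; Handler, ServerBootstrap and both example bootstraps are illustrative only.

```java
import java.util.List;

// Hypothetical sketch of the bootstrap loop in initChannel(): each bootstrap may wrap the handler.
public class BootstrapChainSketch {
    // Simplified stand-in for RpcHandler.
    interface Handler {
        String receive(String message);
    }

    // Simplified stand-in for TransportServerBootstrap.doBootstrap(channel, rpcHandler).
    interface ServerBootstrap {
        Handler doBootstrap(String channelName, Handler inner);
    }

    static Handler buildHandler(String channelName, Handler appHandler, List<ServerBootstrap> bootstraps) {
        Handler handler = appHandler;
        for (ServerBootstrap bootstrap : bootstraps) {
            handler = bootstrap.doBootstrap(channelName, handler); // later bootstraps wrap earlier ones
        }
        return handler;
    }

    static String demo() {
        Handler app = msg -> "app(" + msg + ")";
        // Hypothetical "auth" step: only messages carrying a token reach the wrapped handler.
        ServerBootstrap auth = (channel, inner) -> msg ->
            msg.startsWith("token:") ? inner.receive(msg.substring("token:".length())) : "rejected";
        // Hypothetical tagging step: prefixes every result with the channel name.
        ServerBootstrap tag = (channel, inner) -> msg -> channel + ":" + inner.receive(msg);

        Handler handler = buildHandler("ch-1", app, List.of(auth, tag));
        return handler.receive("token:hi") + " " + handler.receive("hi");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // ch-1:app(hi) ch-1:rejected
    }
}
```

The last bootstrap in the list ends up outermost, so it sees each message first.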
Outbox::private def launchConnectTask()
  val _client = nettyEnv.createClient(address)
    TransportClientFactory::public TransportClient createClient(String remoteHost, int remotePort)
      TransportContext::public TransportChannelHandler initializePipeline()
// When a message is read from the channel:
TransportChannelHandler::public void channelRead0(ChannelHandlerContext ctx, Message request)
  TransportRequestHandler::public void handle(RequestMessage request)
    TransportRequestHandler::private void processOneWayMessage(OneWayMessage req)
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer());