Spark RPC
2022-07-27 15:36:00 【wankunde】
RpcEndpoint, RpcEndpointRef and NettyRpcEndpointRef: the RPC call interface
SparkEnv: holds all the environment information of a running Spark instance
RpcEnv: created by new NettyRpcEnvFactory().create(config), where config is an RpcEnvConfig
RpcEndpoint and RpcEndpointRef are the upper-level RPC programming interfaces we use when writing code:
- An RpcEndpoint represents one RPC communication endpoint. We register it with rpcEnv.setupEndpoint() so that others can find us and communicate with us.
- When we want to communicate with another RpcEndpoint, we call rpcEnv.setupEndpointRef() to establish a connection to that remote RpcEndpoint.
- We then communicate with the remote endpoint through the send and ask methods of RpcEndpointRef.
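The register-then-look-up flow above can be sketched without Spark as a small in-process Java model. ToyRpcEnv, ToyEndpoint and ToyEndpointRef are hypothetical names that only mirror the shape of the API; there is no network here, send delivers synchronously (Spark delivers it asynchronously through an Inbox), and receiveAndReply simply returns its reply instead of answering through an RpcCallContext.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-process stand-ins for RpcEndpoint / RpcEndpointRef / RpcEnv.
interface ToyEndpoint {
  // One-way messages, like RpcEndpoint.receive
  void receive(Object msg);

  // Request/response messages, like RpcEndpoint.receiveAndReply (simplified: returns the reply)
  Object receiveAndReply(Object msg);
}

final class ToyEndpointRef {
  private final ToyEndpoint target;

  ToyEndpointRef(ToyEndpoint target) {
    this.target = target;
  }

  // One-way, no reply; delivered synchronously in this sketch
  void send(Object msg) {
    target.receive(msg);
  }

  // Request/response: the reply comes back as a future, like RpcEndpointRef.ask
  <T> CompletableFuture<T> ask(Object msg, Class<T> replyType) {
    return CompletableFuture.supplyAsync(() -> replyType.cast(target.receiveAndReply(msg)));
  }
}

final class ToyRpcEnv {
  private final Map<String, ToyEndpoint> endpoints = new ConcurrentHashMap<>();

  // Like rpcEnv.setupEndpoint(name, endpoint): make the endpoint findable by name
  ToyEndpointRef setupEndpoint(String name, ToyEndpoint endpoint) {
    if (endpoints.putIfAbsent(name, endpoint) != null) {
      throw new IllegalArgumentException("Endpoint already registered: " + name);
    }
    return new ToyEndpointRef(endpoint);
  }

  // Like rpcEnv.setupEndpointRef(address, name), but without any network address
  ToyEndpointRef setupEndpointRef(String name) {
    ToyEndpoint endpoint = endpoints.get(name);
    if (endpoint == null) {
      throw new IllegalStateException("Cannot find endpoint: " + name);
    }
    return new ToyEndpointRef(endpoint);
  }
}
```

Looking up an unregistered name fails immediately here, which loosely corresponds to the RpcEndpointNotFoundException path in Spark.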
Example 1: sending a message to a remote endpoint
test("send a message remotely") {
  @volatile var message: String = null
  // Set up a RpcEndpoint using env
  env.setupEndpoint("send-remotely", new RpcEndpoint {
    override val rpcEnv = env

    override def receive: PartialFunction[Any, Unit] = {
      case msg: String => message = msg
    }
  })

  val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
  // Use anotherEnv to find out the RpcEndpointRef
  val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "send-remotely")
  try {
    rpcEndpointRef.send("hello")
    eventually(timeout(5.seconds), interval(10.milliseconds)) {
      assert("hello" === message)
    }
  } finally {
    anotherEnv.shutdown()
    anotherEnv.awaitTermination()
  }
}
For example:
- RpcEndpointAddress: created from a URI of the form spark://name@host:port, such as spark://<name>@<host>:54075
- NettyRpcEndpointRef: created from the address above, such as NettyRpcEndpointRef(spark://<name>@<host>:54075)
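To make the address format concrete, here is a hedged Java sketch that splits such a URI with java.net.URI, where the endpoint name travels in the user-info part before the '@'. EndpointAddress is a hypothetical class, not Spark's RpcEndpointAddress, and it performs only the basic checks shown.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical parser for "spark://name@host:port"; not Spark's actual code.
final class EndpointAddress {
  final String name;
  final String host;
  final int port;

  private EndpointAddress(String name, String host, int port) {
    this.name = name;
    this.host = host;
    this.port = port;
  }

  static EndpointAddress parse(String sparkUrl) {
    final URI uri;
    try {
      uri = new URI(sparkUrl);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Invalid Spark URL: " + sparkUrl, e);
    }
    // The endpoint name is the user-info component; getPort() is -1 when absent
    String name = uri.getUserInfo();
    if (!"spark".equals(uri.getScheme()) || name == null
        || uri.getHost() == null || uri.getPort() < 0) {
      throw new IllegalArgumentException("Invalid Spark URL: " + sparkUrl);
    }
    return new EndpointAddress(name, uri.getHost(), uri.getPort());
  }

  @Override
  public String toString() {
    return "spark://" + name + "@" + host + ":" + port;
  }
}
```

For instance, parsing spark://send-remotely@localhost:54075 yields the name send-remotely, host localhost and port 54075.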
Example 2: sending an RPC through an RpcEndpointRef
// 1. Instantiate an EndpointRef that points to the remote RpcEndpointVerifier
val verifier = new NettyRpcEndpointRef(
  conf, RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME), this)
// 2. RpcEndpointVerifier.CheckExistence(endpointRef.name) is a case class instance; in principle,
//    any serializable object can be sent as a message
// 3. ask() sends the message; it takes care of wrapping the message, making the request
//    abortable, and handling timeouts
verifier.ask[Boolean](RpcEndpointVerifier.CheckExistence(endpointRef.name)).flatMap { find =>
  if (find) {
    Future.successful(endpointRef)
  } else {
    Future.failed(new RpcEndpointNotFoundException(uri))
  }
}(ThreadUtils.sameThread)
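The ask-then-flatMap step above can be mirrored in Java with CompletableFuture.thenCompose, which plays the role of Scala's Future.flatMap. This is a sketch under that assumption: VerifyThenResolve and EndpointNotFoundException are hypothetical names, and the Boolean future stands in for the verifier's reply.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical exception standing in for RpcEndpointNotFoundException
final class EndpointNotFoundException extends RuntimeException {
  EndpointNotFoundException(String uri) {
    super("Cannot find endpoint: " + uri);
  }
}

final class VerifyThenResolve {
  // Turn the Boolean "does it exist?" reply into either the ref or a failed future
  static <R> CompletableFuture<R> resolve(
      CompletableFuture<Boolean> existsReply, R endpointRef, String uri) {
    return existsReply.thenCompose(found -> {
      if (found) {
        return CompletableFuture.completedFuture(endpointRef);
      }
      CompletableFuture<R> failed = new CompletableFuture<>();
      failed.completeExceptionally(new EndpointNotFoundException(uri));
      return failed;
    });
  }
}
```

As in the Scala version, the caller gets a single future and never blocks while waiting for the verifier.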
How NettyRpcEnv sends a message
// NettyRpcEnv
private[netty] def askAbortable[T: ClassTag](
    message: RequestMessage, timeout: RpcTimeout): AbortableRpcFuture[T] = {
  val promise = Promise[Any]()
  val remoteAddr = message.receiver.address

  def onFailure(e: Throwable): Unit = {
    if (!promise.tryFailure(e)) {
      e match {
        case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")
        case _ => logWarning(s"Ignored failure: $e")
      }
    }
  }

  def onSuccess(reply: Any): Unit = reply match {
    case RpcFailure(e) => onFailure(e)
    case rpcReply =>
      if (!promise.trySuccess(rpcReply)) {
        logWarning(s"Ignored message: $reply")
      }
  }

  def onAbort(reason: String): Unit = {
    onFailure(new RpcAbortException(reason))
  }

  try {
    if (remoteAddr == address) {
      val p = Promise[Any]()
      p.future.onComplete {
        case Success(response) => onSuccess(response)
        case Failure(e) => onFailure(e)
      }(ThreadUtils.sameThread)
      dispatcher.postLocalMessage(message, p)
    } else {
      // 1. Wrap the RPC message together with the callbacks for successful and failed processing
      val rpcMessage = RpcOutboxMessage(message.serialize(this),
        onFailure,
        (client, response) => onSuccess(deserialize[Any](client, response)))
      // 2. Put the message into the Outbox
      postToOutbox(message.receiver, rpcMessage)
      // 3. Once the message is processed, onSuccess or onFailure is called; both complete
      //    the promise whose future is returned at the end of this method
      // 4. Timeout and abort are both triggered locally; either one removes the pending request
      promise.future.failed.foreach {
        case _: TimeoutException => rpcMessage.onTimeout()
        case _: RpcAbortException => rpcMessage.onAbort()
        case _ =>
      }(ThreadUtils.sameThread)
    }

    val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
      override def run(): Unit = {
        onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +
          s"in ${timeout.duration}"))
      }
    }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
    promise.future.onComplete { v =>
      timeoutCancelable.cancel(true)
    }(ThreadUtils.sameThread)
  } catch {
    case NonFatal(e) =>
      onFailure(e)
  }

  new AbortableRpcFuture[T](
    promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread),
    onAbort)
}
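The core of askAbortable is a promise that is completed by whichever comes first: a reply, a failure, or the timeout task. A minimal Java sketch of that race, assuming CompletableFuture in place of Scala's Promise: complete and completeExceptionally return false when the future is already done, just like trySuccess and tryFailure. PendingAsk is a hypothetical name, and where Spark logs the ignored completions this sketch only counts them.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the promise/timeout race; not Spark's code.
final class PendingAsk<T> {
  private final CompletableFuture<T> promise = new CompletableFuture<>();
  // Completions that lost the race (Spark logs "Ignored failure" / "Ignored message")
  final AtomicInteger ignored = new AtomicInteger();

  PendingAsk(ScheduledExecutorService scheduler, long timeoutMillis) {
    // Like timeoutScheduler.schedule(...): fail the promise if no reply arrives in time
    ScheduledFuture<?> timeoutTask = scheduler.schedule(
        () -> onFailure(new TimeoutException(
            "Cannot receive any reply in " + timeoutMillis + " ms")),
        timeoutMillis, TimeUnit.MILLISECONDS);
    // Like promise.future.onComplete { _ => timeoutCancelable.cancel(true) };
    // cancel(false) is enough here because nothing needs to be interrupted
    promise.whenComplete((value, error) -> timeoutTask.cancel(false));
  }

  // Like onSuccess: only the first completion wins (promise.trySuccess)
  void onSuccess(T reply) {
    if (!promise.complete(reply)) {
      ignored.incrementAndGet();
    }
  }

  // Like onFailure: only the first completion wins (promise.tryFailure)
  void onFailure(Throwable e) {
    if (!promise.completeExceptionally(e)) {
      ignored.incrementAndGet();
    }
  }

  CompletableFuture<T> future() {
    return promise;
  }
}
```

Because completion is first-wins, a late reply after a timeout (or a late timeout after a reply) is harmless: it is simply dropped.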
RpcOutboxMessage
RpcOutboxMessage holds the message body to be sent, a callback for handling a successful response, and a callback for handling a failure. RpcOutboxMessage is also an RpcResponseCallback: when it calls client.sendRpc, it passes itself as the callback, so the response ends up in its own onSuccess or onFailure method.
private[netty] case class RpcOutboxMessage(
    content: ByteBuffer,
    _onFailure: (Throwable) => Unit,
    _onSuccess: (TransportClient, ByteBuffer) => Unit)
  extends OutboxMessage with RpcResponseCallback with Logging {

  private var client: TransportClient = _
  private var requestId: Long = _

  override def sendWith(client: TransportClient): Unit = {
    this.client = client
    // Send the message itself, registering this object as the callback
    this.requestId = client.sendRpc(content, this)
  }

  private[netty] def removeRpcRequest(): Unit = {
    if (client != null) {
      client.removeRpcRequest(requestId)
    } else {
      logError("Ask terminated before connecting successfully")
    }
  }

  def onTimeout(): Unit = {
    removeRpcRequest()
  }

  def onAbort(): Unit = {
    removeRpcRequest()
  }

  override def onFailure(e: Throwable): Unit = {
    _onFailure(e)
  }

  override def onSuccess(response: ByteBuffer): Unit = {
    _onSuccess(client, response)
  }
}
// TransportClient
/**
 * Sends an opaque message to the RpcHandler on the server-side. The callback will be invoked
 * with the server's response or upon any failure.
 *
 * @param message The message to send.
 * @param callback Callback to handle the RPC's reply.
 * @return The RPC's id.
 */
public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
  if (logger.isTraceEnabled()) {
    logger.trace("Sending RPC to {}", getRemoteAddress(channel));
  }

  long requestId = requestId();
  handler.addRpcRequest(requestId, callback);

  RpcChannelListener listener = new RpcChannelListener(requestId, callback);
  channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
    .addListener(listener);

  return requestId;
}
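The callback bookkeeping behind sendRpc and RpcOutboxMessage boils down to a map from request id to callback. The Java sketch below is a simplified, hypothetical model (PendingRequests, ReplyCallback and SelfCallbackMessage are not Spark classes, and there is no channel): the message registers itself as the callback for its own request, a reply is routed by id at most once, and removing an id, as removeRpcRequest does on timeout or abort, turns a late reply into a no-op.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, network-free model of request-id bookkeeping.
interface ReplyCallback {
  void onSuccess(ByteBuffer response);
  void onFailure(Throwable e);
}

final class PendingRequests {
  private final AtomicLong nextId = new AtomicLong();
  private final Map<Long, ReplyCallback> outstanding = new ConcurrentHashMap<>();

  // Like sendRpc + addRpcRequest: allocate an id and remember whom to call back
  long register(ReplyCallback callback) {
    long id = nextId.incrementAndGet();
    outstanding.put(id, callback);
    return id;
  }

  // Route a reply to the callback registered under its id, at most once
  void handleResponse(long requestId, ByteBuffer body) {
    ReplyCallback callback = outstanding.remove(requestId);
    if (callback != null) {
      callback.onSuccess(body);
    } // unknown or already removed id (e.g. after a timeout): dropped
  }

  void handleFailure(long requestId, Throwable e) {
    ReplyCallback callback = outstanding.remove(requestId);
    if (callback != null) {
      callback.onFailure(e);
    }
  }

  // Like removeRpcRequest: called on timeout/abort so that a late reply is ignored
  void remove(long requestId) {
    outstanding.remove(requestId);
  }

  int size() {
    return outstanding.size();
  }
}

// Like RpcOutboxMessage: the message is its own callback
final class SelfCallbackMessage implements ReplyCallback {
  final String content;
  long requestId;
  String reply;     // set by onSuccess
  Throwable error;  // set by onFailure

  SelfCallbackMessage(String content) {
    this.content = content;
  }

  // Like sendWith(client): register this object as the callback for its own request
  void sendWith(PendingRequests client) {
    requestId = client.register(this);
  }

  @Override
  public void onSuccess(ByteBuffer response) {
    reply = StandardCharsets.UTF_8.decode(response).toString();
  }

  @Override
  public void onFailure(Throwable e) {
    error = e;
  }
}
```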
Outbox
The Outbox was introduced mainly to fix message reordering in the original Spark RPC messaging. For example, a sender sends message A and then message B, but the remote endpoint receives them in the order B, A. The Outbox caches outgoing messages, and only one thread at a time drains the queue and sends the messages in turn, which preserves their order.
Issue: [SPARK-11098][Core] Add Outbox to cache the sending messages to resolve the message disorder issue
Putting a message into the Outbox and actually sending it are two completely asynchronous actions.
Messages are sent at two points:
- When the RPC connection is established: because messages may be stored asynchronously, some may already be waiting by the time the connection is ready, so they need to be sent.
- When a new message arrives: if the connection is already established, sending starts; if it is not ready yet, nothing is sent.
// Outbox receives a message, wraps it, then caches it in a LinkedList
NettyRpcEndpointRef::def send(message: Any)
  NettyRpcEnv::def send(message: RequestMessage)
    NettyRpcEnv::postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage)
      Outbox::send(message)
        messages.add(message)
// Outbox sends messages: poll the message list and send each message found
Outbox.drainOutbox():
  launchConnectTask()  // if there is no connection yet
  message = messages.poll()
  RpcOutboxMessage.sendWith(client)
    TransportClient.sendRpc(message)  // wraps the message in an RpcRequest and writes it to the channel
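The ordering guarantee can be illustrated with a small single-process Java model of the Outbox: a FIFO queue plus a draining flag, so that only one caller sends at a time and nothing is sent until the connection is ready. ToyOutbox, onConnected and the transport callback are hypothetical stand-ins for Outbox, the connect task and TransportClient; error handling and stopping are left out.

```java
import java.util.LinkedList;
import java.util.function.Consumer;

// Hypothetical, simplified model of the Outbox draining logic.
final class ToyOutbox {
  private final LinkedList<String> messages = new LinkedList<>();
  private final Consumer<String> transport; // stands in for TransportClient
  private boolean connected = false;
  private boolean draining = false;

  ToyOutbox(Consumer<String> transport) {
    this.transport = transport;
  }

  // Like Outbox.send: enqueue first, then try to drain
  void send(String message) {
    synchronized (this) {
      messages.add(message);
    }
    drainOutbox();
  }

  // Like the end of the connect task: once connected, flush whatever piled up
  void onConnected() {
    synchronized (this) {
      connected = true;
    }
    drainOutbox();
  }

  private void drainOutbox() {
    String message;
    synchronized (this) {
      // Not connected yet (Spark would launch a connect task here),
      // or another thread is already draining: leave the messages queued
      if (!connected || draining) {
        return;
      }
      message = messages.poll();
      if (message == null) {
        return;
      }
      draining = true;
    }
    while (true) {
      transport.accept(message); // like message.sendWith(client), outside the lock
      synchronized (this) {
        message = messages.poll();
        if (message == null) {
          draining = false;
          return;
        }
      }
    }
  }
}
```

Sending outside the lock keeps slow network writes from blocking callers that only want to enqueue, while the draining flag still guarantees a single sender.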
Inbox
Messages reach the Inbox from two sources: messages sent by a remote EndpointRef, and messages that the Dispatcher forwards to an Endpoint in the current process.
NettyRpcEnv::def send(message: RequestMessage) // forwards a local message to the current Endpoint
NettyRpcHandler::override def receive(client: TransportClient, message: ByteBuffer) // receives a message from a remote sender
Adding a message to the Inbox:
Dispatcher::def postRemoteMessage() / def postLocalMessage() / def postOneWayMessage()
  Dispatcher::private def postMessage()
    DedicatedMessageLoop::override def post(endpointName: String, message: InboxMessage)
      Inbox::def post(message: InboxMessage)
Processing: poll the message list and let the endpoint handle each message:
def process(dispatcher: Dispatcher)
  RpcMessage -> endpoint.receiveAndReply(context)
  OneWayMessage -> endpoint.receive
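Inbox processing can be sketched in Java as a loop that polls messages in arrival order and routes them by type. The classes below are hypothetical simplifications named after Spark's message types: in Spark, receiveAndReply is given an RpcCallContext and replies through it, whereas here it simply returns the reply value and a Consumer delivers it.

```java
import java.util.LinkedList;
import java.util.function.Consumer;

// Hypothetical simplified message types
abstract class InboxMsg {}

final class OneWayMsg extends InboxMsg {
  final Object content;

  OneWayMsg(Object content) {
    this.content = content;
  }
}

final class RpcMsg extends InboxMsg {
  final Object content;
  final Consumer<Object> replyTo; // stands in for the RpcCallContext

  RpcMsg(Object content, Consumer<Object> replyTo) {
    this.content = content;
    this.replyTo = replyTo;
  }
}

interface InboxEndpoint {
  void receive(Object msg);
  Object receiveAndReply(Object msg);
}

final class ToyInbox {
  private final LinkedList<InboxMsg> messages = new LinkedList<>();
  private final InboxEndpoint endpoint;

  ToyInbox(InboxEndpoint endpoint) {
    this.endpoint = endpoint;
  }

  // Like Inbox.post: just enqueue
  synchronized void post(InboxMsg message) {
    messages.add(message);
  }

  // Like Inbox.process: poll in arrival order and route by message type
  void process() {
    while (true) {
      InboxMsg message;
      synchronized (this) {
        message = messages.poll();
      }
      if (message == null) {
        return;
      }
      if (message instanceof RpcMsg) {
        RpcMsg rpc = (RpcMsg) message;
        rpc.replyTo.accept(endpoint.receiveAndReply(rpc.content));
      } else if (message instanceof OneWayMsg) {
        endpoint.receive(((OneWayMsg) message).content);
      }
    }
  }
}
```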
Network communication module
Creating the server communication port through RpcEnv:
SparkEnv::private def create()
  val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
      securityManager, numUsableCores, !isDriver)
    RpcEnv::new NettyRpcEnvFactory().create(config)
      NettyRpcEnvFactory::nettyEnv.startServer(config.bindAddress, actualPort)
// Initialize an rpcHandler; where it is used is shown below
NettyRpcEnv::private val transportContext = new TransportContext(transportConf, new NettyRpcHandler(dispatcher, this, streamManager))
NettyRpcEnv::def startServer(bindAddress: String, port: Int)
  server = transportContext.createServer(bindAddress, port, bootstraps)
    TransportContext::public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps)
      TransportContext::new TransportServer(this, host, port, rpcHandler, bootstraps)
        TransportServer::private void init(String hostToBind, int portToBind) // the server is started here
// Above: how rpcHandler is created and handed to TransportServer; below: where it is used
// TransportServer
private void init(String hostToBind, int portToBind) {

  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  EventLoopGroup bossGroup =
    NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
  EventLoopGroup workerGroup = bossGroup;

  // The server is configured here and started by bind() below
  bootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NettyUtils.getServerChannelClass(ioMode))
    .option(ChannelOption.ALLOCATOR, pooledAllocator)
    .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
    .childOption(ChannelOption.ALLOCATOR, pooledAllocator);

  this.metrics = new NettyMemoryMetrics(
    pooledAllocator, conf.getModuleName() + "-server", conf);

  if (conf.backLog() > 0) {
    bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
  }

  if (conf.receiveBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
  }

  if (conf.sendBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
  }

  if (conf.enableTcpKeepAlive()) {
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
  }

  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
      logger.debug("New connection accepted for remote address {}.", ch.remoteAddress());

      // rpcHandler is used here: each bootstrap may wrap it, then it is installed in the pipeline
      RpcHandler rpcHandler = appRpcHandler;
      for (TransportServerBootstrap bootstrap : bootstraps) {
        rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
      }
      context.initializePipeline(ch, rpcHandler);
    }
  });

  InetSocketAddress address = hostToBind == null ?
      new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
  channelFuture = bootstrap.bind(address);
  channelFuture.syncUninterruptibly();

  port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
  logger.debug("Shuffle server started on port: {}", port);
}
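The loop in initChannel that passes rpcHandler through every TransportServerBootstrap is a decorator chain: each bootstrap can wrap the handler it receives (Spark uses this hook for SASL and authentication). The hypothetical Java sketch below uses illustrative Handler and ServerBootstrapHook interfaces rather than Spark's, and shows that the last bootstrap in the list ends up as the outermost wrapper.

```java
import java.util.List;

// Illustrative interfaces only; not Spark's RpcHandler / TransportServerBootstrap
interface Handler {
  String receive(String message);
}

interface ServerBootstrapHook {
  // May return the same handler or a new one that wraps it
  Handler doBootstrap(Handler inner);
}

final class HandlerChain {
  // Same loop shape as: for (bootstrap : bootstraps) rpcHandler = bootstrap.doBootstrap(ch, rpcHandler)
  static Handler build(Handler appHandler, List<ServerBootstrapHook> bootstraps) {
    Handler handler = appHandler;
    for (ServerBootstrapHook bootstrap : bootstraps) {
      handler = bootstrap.doBootstrap(handler);
    }
    return handler;
  }
}
```

With an application handler plus an "auth" and a "log" bootstrap, in that order, a message first passes through "log", then "auth", and finally reaches the application handler.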
Outbox::private def launchConnectTask()
  val _client = nettyEnv.createClient(address)
    TransportClientFactory::public TransportClient createClient(String remoteHost, int remotePort)
      TransportContext::public TransportChannelHandler initializePipeline()
TransportChannelHandler::public void channelRead0(ChannelHandlerContext ctx, Message request)
  TransportRequestHandler::public void handle(RequestMessage request)
    TransportRequestHandler::private void processOneWayMessage(OneWayMessage req)
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer());