当前位置:网站首页>How can people not love the amazing design of XXL job
How can people not love the amazing design of XXL job
2022-07-04 09:38:00 【Second brother learns Java】
Catalog
- Introduction to communication bottom
- Overall communication process
- Stunning design
Introduction to communication bottom
xxl-job Use netty http To communicate , Although it also supports Mina,jetty,netty tcp Methods such as , But what is fixed in the code is netty http.
Overall communication process
I take the example that the scheduler notifies the actuator to execute tasks , Activity diagram drawn :

Activity diagrams
Stunning design
After reading the whole process code , The design can be said to have originality , take netty, The knowledge of multithreading is applied smoothly .
I will now summarize these outstanding points in design as follows :
| Use dynamic agent mode , Hide communication details
xxl-job Two interfaces are defined ExecutorBiz,AdminBiz,ExecutorBiz The interface encapsulates the heartbeat , Pause , Trigger execution and other operations ,AdminBiz Encapsulated callback , register , Cancel registration , Interface implementation class , There is no communication related processing .
XxlRpcReferenceBean Class getObject() Method will generate a proxy class , This proxy class will communicate remotely .
| Fully asynchronous processing
The actuator receives the message and deserializes , There is no synchronized execution of the task code , Instead, the task information is stored in LinkedBlockingQueue in , Asynchronous threads get task information from this queue , And then execute .
And the processing result of the task , It doesn't mean that after processing , Synchronize the returned , It is also placed in the blocking queue of the callback thread , Return the processing result asynchronously .
The advantage of this treatment is to reduce netty Processing time of the worker thread , Improved throughput .
| Packaging for asynchronous processing
Asynchronous processing is packaged , The code seems to be called synchronously .
Let's look at the regulator ,XxlJobTrigger Class triggers the code of task execution :
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz =
XxlJobScheduler.getExecutorBiz(address);
// There are a lot of asynchronous processing , Finally, the processing result of synchronization
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}
ExecutorBiz.run The method we mentioned , Is the dynamic proxy of walking , Communicate with the actuator , The execution result of the actuator is also processed asynchronously , Just returned , And what we see here run The method is to synchronously wait for the processing result to return .
Let's take a look at xxl-job How to get processing results synchronously : After the scheduler sends a message to the actuator , The thread is blocked . Wait until the actuator is processed , Return the processing result to , Wake up blocked threads , Get the return value at the call .
The dynamic proxy code is as follows :
// Trigger call in proxy class
if (CallType.SYNC == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// do invoke
client.asyncSend(finalAddress, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (
xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(
xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
} finally{
// future-response remove
futureResponse.removeInvokerFuture();
}
}
XxlRpcFutureResponse Class implements thread waiting , And thread wake-up processing :
// Return results , Wake up the thread
public void setResponse(XxlRpcResponse response) {
this.response = response;
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (!done) {
synchronized (lock) {
try {
if (timeout < 0) {
// Thread blocking
lock.wait();
} else {
long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?
timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
lock.wait(timeoutMillis);
}
} catch (InterruptedException e) {
throw e;
}
}
}
if (!done) {
throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() +", request:" + request.toString());
}
return response;
}
Some students may ask , The dispatcher received the return result , How to determine which thread to wake up ?
Every remote call , Will generate uuid Request id, This id Is passed throughout the call , It's like a key , When you go home , Take it and open the door .
Take the request here id This key , You can find the corresponding XxlRpcFutureResponse, And then call setResponse Method , Set return value , Wake up the thread .
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
// adopt requestId find XxlRpcFutureResponse,
final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
if (futureResponse == null) {
return;
}
if (
futureResponse.getInvokeCallback()!=null) {
// callback type
try {
executeResponseCallback(new Runnable() {
@Override
public void run() {
if (
xxlRpcResponse.getErrorMsg() != null) {
futureResponse.getInvokeCallback().onFailure(new XxlRpcException(
xxlRpcResponse.getErrorMsg()));
} else {
futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
}
}
});
}catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else {
// It calls lock Of notify Method
futureResponse.setResponse(xxlRpcResponse);
}
// do remove
futureResponsePool.remove(requestId);
}
边栏推荐
- 2022-2028 global gasket plate heat exchanger industry research and trend analysis report
- What is uid? What is auth? What is a verifier?
- Kubernetes CNI 插件之Fabric
- IIS configure FTP website
- ArrayBuffer
- Leetcode (Sword finger offer) - 35 Replication of complex linked list
- Reading notes on how to connect the network - hubs, routers and routers (III)
- PHP student achievement management system, the database uses mysql, including source code and database SQL files, with the login management function of students and teachers
- C # use gdi+ to add text to the picture and make the text adaptive to the rectangular area
- Global and Chinese market of wheel hubs 2022-2028: Research Report on technology, participants, trends, market size and share
猜你喜欢

C # use ffmpeg for audio transcoding

el-table单选并隐藏全选框

2022-2028 global intelligent interactive tablet industry research and trend analysis report

Leetcode (Sword finger offer) - 35 Replication of complex linked list

IIS configure FTP website

MySQL foundation 02 - installing MySQL in non docker version

pcl::fromROSMsg报警告Failed to find match for field ‘intensity‘.

Markdown syntax

Hands on deep learning (34) -- sequence model

Daughter love: frequency spectrum analysis of a piece of music
随机推荐
Regular expression (I)
Daughter love: frequency spectrum analysis of a piece of music
MySQL transaction mvcc principle
Trim leading or trailing characters from strings- Trim leading or trailing characters from a string?
Solution to null JSON after serialization in golang
什么是权限?什么是角色?什么是用户?
Write a jison parser from scratch (6/10): parse, not define syntax
[on February 11, 2022, the latest and most fully available script library collection of the whole network, a total of 23]
Global and Chinese trisodium bicarbonate operation mode and future development forecast report Ⓢ 2022 ~ 2027
PHP book borrowing management system, with complete functions, supports user foreground management and background management, and supports the latest version of PHP 7 x. Database mysql
Go context 基本介绍
If you can quickly generate a dictionary from two lists
2022-2028 global probiotics industry research and trend analysis report
What is uid? What is auth? What is a verifier?
Golang Modules
Simulate EF dbcontext with MOQ - mocking EF dbcontext with MOQ
Latex download installation record
C # use gdi+ to add text with center rotation (arbitrary angle)
Multilingual Wikipedia website source code development part II
Development trend and market demand analysis report of high purity tin chloride in the world and China Ⓔ 2022 ~ 2027