How can you not love the amazing design of xxl-job
2022-07-04 09:38:00 【Second brother learns Java】
Contents
- The underlying communication layer
- Overall communication process
- Stunning design
The underlying communication layer
xxl-job communicates over Netty HTTP. Although it also supports Mina, Jetty, and Netty TCP, Netty HTTP is the option hard-coded in the code.
Overall communication process
Taking the case where the scheduler notifies an executor to run a task as an example, I drew the following activity diagram:
[Figure: activity diagram]
Stunning design
After reading through the code for the whole flow, I find the design genuinely ingenious: it applies Netty and multithreading fluently.
The highlights of the design are summarized below:
| Using the dynamic proxy pattern to hide communication details
xxl-job defines two interfaces, ExecutorBiz and AdminBiz. ExecutorBiz encapsulates operations such as heartbeat, pause, and trigger execution; AdminBiz encapsulates callback, registration, and deregistration. The implementation classes of these interfaces contain no communication-related code.
The getObject() method of the XxlRpcReferenceBean class generates a proxy class, and this proxy class performs the remote communication.
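To make the idea concrete, here is a minimal sketch of how a JDK dynamic proxy can turn an ordinary interface call into a remote request. It is not the xxl-rpc implementation: DemoExecutorBiz, RemoteProxyDemo, and createProxy are hypothetical names, and the network transport is faked by building a string where a real client would serialize and send the request.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical business interface standing in for ExecutorBiz; not the real xxl-job API.
interface DemoExecutorBiz {
    String beat();
    String run(String jobParam);
}

class RemoteProxyDemo {

    // Builds a proxy that turns every interface call into a "request".
    // Assumption: a real RPC client would serialize the request and send it
    // to the address; here the transport is faked by formatting a string.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, String address) {
        InvocationHandler handler = (proxy, method, args) -> {
            // toString/hashCode/equals are answered locally instead of "remotely"
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString": return "proxy for " + iface.getSimpleName();
                    case "hashCode": return System.identityHashCode(proxy);
                    default:         return proxy == args[0]; // equals
                }
            }
            String request = iface.getSimpleName() + "." + method.getName()
                    + Arrays.toString(args == null ? new Object[0] : args);
            return "sent " + request + " to " + address;
        };
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        // The caller only sees the interface; the communication lives in the handler.
        DemoExecutorBiz biz = createProxy(DemoExecutorBiz.class, "127.0.0.1:9999");
        System.out.println(biz.run("job-1"));
        System.out.println(biz.beat());
    }
}
```

The caller never touches sockets or serialization, which is the same separation the article describes between the Biz interfaces and the generated proxy.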
| Fully asynchronous processing
After the executor receives a message and deserializes it, it does not run the task code synchronously. Instead, it stores the task information in a LinkedBlockingQueue, and an asynchronous thread takes the task information from this queue and executes it.
The task's result is not returned synchronously once processing finishes either. It is placed in the callback thread's blocking queue and returned asynchronously.
The benefit is that Netty worker threads spend less time on each request, which improves throughput.
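The hand-off described above can be sketched with two blocking queues and one worker thread. This is only an illustration under simplifying assumptions: AsyncJobQueueDemo and its methods are hypothetical names, jobs and results are plain strings, and "executing" a job just means prefixing it with "done:".

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class AsyncJobQueueDemo {
    // Tasks waiting to be executed (filled by the I/O thread)
    private final LinkedBlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    // Results waiting to be reported back (drained by a callback thread)
    private final LinkedBlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();
    private final Thread worker;

    AsyncJobQueueDemo() {
        worker = new Thread(() -> {
            try {
                while (true) {
                    String job = triggerQueue.take();     // blocks until a job arrives
                    callbackQueue.put("done:" + job);     // stand-in for running the job
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();       // exit quietly on shutdown
            }
        }, "job-worker");
        worker.setDaemon(true);
        worker.start();
    }

    // Called on the I/O thread: only enqueues, so it returns immediately.
    boolean submit(String job) {
        return triggerQueue.offer(job);
    }

    // Called by the callback side; returns null on timeout or interruption.
    String pollResult(long timeoutMillis) {
        try {
            return callbackQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void shutdown() {
        worker.interrupt();
    }

    public static void main(String[] args) {
        AsyncJobQueueDemo demo = new AsyncJobQueueDemo();
        demo.submit("job-1");                       // the "Netty thread" is free again here
        System.out.println(demo.pollResult(1000));
        demo.shutdown();
    }
}
```

Because submit only enqueues, the thread that received the request is released right away, and a slow job delays only the worker.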
| Wrapping the asynchronous processing
The asynchronous processing is wrapped so that the code reads like a synchronous call.
Let's look at the scheduler. This is the code in the XxlJobTrigger class that triggers task execution:
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // A lot of asynchronous processing happens underneath, but the result comes back synchronously
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }
    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());
    runResult.setMsg(runResultSB.toString());
    return runResult;
}
As mentioned above, ExecutorBiz.run goes through the dynamic proxy and communicates with the executor. The executor also processes the task asynchronously before returning its result, yet the run method we see here waits synchronously for that result.
Here is how xxl-job obtains the result synchronously: after the scheduler sends a message to the executor, the calling thread blocks. Once the executor has finished and returned the result, the blocked thread is woken up and receives the return value at the call site.
The dynamic proxy code is as follows:
// Trigger call in proxy class
if (CallType.SYNC == callType) {
    // future-response set
    XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
    try {
        // do invoke
        client.asyncSend(finalAddress, xxlRpcRequest);
        // future get
        XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
        if (xxlRpcResponse.getErrorMsg() != null) {
            throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
        }
        return xxlRpcResponse.getResult();
    } catch (Exception e) {
        logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
        throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
    } finally{
        // future-response remove
        futureResponse.removeInvokerFuture();
    }
}
The XxlRpcFutureResponse class implements the thread waiting and wake-up logic:
// Set the result and wake up the waiting thread
public void setResponse(XxlRpcResponse response) {
    this.response = response;
    synchronized (lock) {
        done = true;
        lock.notifyAll();
    }
}
@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (!done) {
        synchronized (lock) {
            try {
                if (timeout < 0) {
                    // Block the thread until it is notified
                    lock.wait();
                } else {
                    long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?
                            timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
                    lock.wait(timeoutMillis);
                }
            } catch (InterruptedException e) {
                throw e;
            }
        }
    }
    if (!done) {
        throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() +", request:" + request.toString());
    }
    return response;
}
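For comparison, the same block-then-wake idea can be written as a guarded wait, where the done flag is re-checked in a loop while holding the lock. The Object.wait documentation recommends this form because a thread may wake up without having been notified. The sketch below is not the xxl-rpc source: SimpleFutureResponse, FutureResponseDemo, and await are hypothetical names, and the response is a plain generic value.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified counterpart of XxlRpcFutureResponse.
class SimpleFutureResponse<T> {
    private final Object lock = new Object();
    private boolean done = false;   // guarded by lock
    private T response;             // guarded by lock

    // Called by the I/O thread when the reply arrives.
    void setResponse(T response) {
        synchronized (lock) {
            this.response = response;
            done = true;
            lock.notifyAll();
        }
    }

    // Called by the business thread; blocks until the reply arrives or time runs out.
    T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (lock) {
            while (!done) {                       // re-check after every wake-up
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    throw new TimeoutException("no response within " + timeout + " " + unit);
                }
                // wait(0) would block forever, so wait at least 1 ms
                lock.wait(Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
            }
            return response;
        }
    }
}

class FutureResponseDemo {
    // Waits for the response and reports timeout or interruption as a plain string.
    static String await(SimpleFutureResponse<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        SimpleFutureResponse<String> future = new SimpleFutureResponse<>();
        // Simulates the I/O thread delivering the executor's reply a little later
        new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                return;
            }
            future.setResponse("pong");
        }).start();
        System.out.println(await(future, 2000));
    }
}
```

Checking the flag and waiting inside the same synchronized block also means a reply that arrives between the check and the wait cannot be missed.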
Some readers may ask: when the scheduler receives a result, how does it know which thread to wake up?
Every remote call generates a UUID request id, and this id travels with the call from start to finish. It is like a key: when you get home, you use it to open your own door.
With the request id as the key, the scheduler can find the corresponding XxlRpcFutureResponse, then call its setResponse method to set the return value and wake up the thread.
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
    // Look up the XxlRpcFutureResponse by requestId
    final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
    if (futureResponse == null) {
        return;
    }
    if (futureResponse.getInvokeCallback()!=null) {
        // callback type
        try {
            executeResponseCallback(new Runnable() {
                @Override
                public void run() {
                    if (xxlRpcResponse.getErrorMsg() != null) {
                        futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                    } else {
                        futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                    }
                }
            });
        }catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    } else {
        // setResponse calls notifyAll on the lock, waking the waiting thread
        futureResponse.setResponse(xxlRpcResponse);
    }
    // do remove
    futureResponsePool.remove(requestId);
}
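The request-id lookup can be sketched on its own with a concurrent map. This is an illustration rather than the xxl-rpc code: PendingRequests is a hypothetical class, and it uses the JDK's CompletableFuture in place of XxlRpcFutureResponse to keep the example short.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry that matches replies to waiting callers by request id.
class PendingRequests {
    private final ConcurrentMap<String, CompletableFuture<String>> pending =
            new ConcurrentHashMap<>();

    // Caller side: register the future before sending and get the id to put in the request.
    String register(CompletableFuture<String> future) {
        String requestId = UUID.randomUUID().toString();
        pending.put(requestId, future);
        return requestId;
    }

    // I/O side: find the waiting future by id, complete it, and forget it.
    // Returns false when nobody is waiting for this id (unknown or already answered).
    boolean complete(String requestId, String result) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future == null) {
            return false;
        }
        return future.complete(result);
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        String firstId = requests.register(first);
        String secondId = requests.register(second);

        // Replies may arrive in any order; each id opens only its own door.
        requests.complete(secondId, "result of second call");
        requests.complete(firstId, "result of first call");

        System.out.println(first.join());
        System.out.println(second.join());
    }
}
```

Removing the entry in the same step as the lookup keeps the map from growing and makes a duplicate reply for the same id a harmless no-op.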