How can anyone not love the amazing design of xxl-job
2022-07-04 09:38:00 【Second brother learns Java】
Contents
- Introduction to the underlying communication layer
- Overall communication process
- Stunning design
Introduction to the underlying communication layer
xxl-job communicates over Netty HTTP. It also supports Mina, Jetty, and Netty TCP, but Netty HTTP is what is hard-coded in the source.
Overall communication process
Taking the case where the scheduler notifies an executor to run a task as the example, I drew the following activity diagram:
[Figure: activity diagram]
Stunning design
After reading through the code for the whole process, I would say the design is genuinely original: it applies Netty and multithreading fluently.
The standout design points are summarized below:
| Use the dynamic proxy pattern to hide communication details
xxl-job defines two interfaces, ExecutorBiz and AdminBiz. ExecutorBiz encapsulates operations such as heartbeat, pause, and trigger execution; AdminBiz encapsulates callback, registration, and deregistration. The implementation classes of these interfaces contain no communication-related handling.
The getObject() method of the XxlRpcReferenceBean class generates a proxy class, and it is this proxy that performs the remote communication.
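To make the idea concrete, here is a minimal sketch of a JDK dynamic proxy that hides "communication" behind a plain interface. It is not the xxl-job implementation: DemoBiz, ProxySketch, createProxy, and the SENT list are hypothetical names invented for illustration, and the handler only records the request instead of sending it over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ProxySketch {

    // Hypothetical business interface; it stands in for something like ExecutorBiz.
    interface DemoBiz {
        String run(String param);
    }

    // Records what would have been sent over the wire (illustration only).
    static final List<String> SENT = new ArrayList<>();

    // Builds a proxy for an interface. Every call lands in the handler, which is
    // where a real RPC client would serialize the request and send it to `address`.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, String address) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString/hashCode/equals are answered locally, not "sent".
                switch (method.getName()) {
                    case "hashCode": return System.identityHashCode(proxy);
                    case "equals":   return proxy == args[0];
                    default:         return iface.getSimpleName() + "@" + address;
                }
            }
            SENT.add(method.getName() + Arrays.toString(args) + " -> " + address);
            // Placeholder result; assumes the interface method returns String.
            return "sent " + method.getName() + " to " + address;
        };
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        DemoBiz biz = createProxy(DemoBiz.class, "127.0.0.1:9999");
        // The caller sees an ordinary method call; the transport is invisible.
        System.out.println(biz.run("job-1"));
        System.out.println(SENT);
    }
}
```

The caller only depends on the interface, so the transport behind the handler can change without touching business code.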
| Fully asynchronous processing
After the executor receives and deserializes a message, it does not run the task code synchronously. Instead, it stores the task information in a LinkedBlockingQueue; an asynchronous thread takes the task information from this queue and then executes it.
The task's result is not returned synchronously once processing finishes, either. It is also placed in the blocking queue of a callback thread, and the result is returned asynchronously.
The advantage of this approach is that it shortens the time the Netty worker threads spend on each request, which improves throughput.
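The hand-off described above can be sketched roughly as follows. This is an illustration under assumptions, not xxl-job code: AsyncHandOffSketch, accept, and nextResult are hypothetical names, tasks are plain strings, and "executing" a task just prefixes it with "done:".

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class AsyncHandOffSketch {

    // Tasks accepted by the I/O thread, waiting to be executed.
    private final BlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    // Results waiting to be reported back by a separate callback thread.
    private final BlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();

    AsyncHandOffSketch() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String task = triggerQueue.take();  // blocks until a task arrives
                    callbackQueue.put("done:" + task);  // "execute", then hand off the result
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();     // stop quietly when interrupted
            }
        }, "job-worker");
        worker.setDaemon(true);
        worker.start();
    }

    // What the I/O thread would call: enqueue and return immediately.
    boolean accept(String task) {
        return triggerQueue.offer(task);
    }

    // What a callback thread would call; returns null on timeout or interruption.
    String nextResult(long timeoutMillis) {
        try {
            return callbackQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        AsyncHandOffSketch executor = new AsyncHandOffSketch();
        executor.accept("job-1");                       // returns without waiting for the job
        System.out.println(executor.nextResult(1000));
    }
}
```

Because accept only enqueues, the thread that received the request is free again almost immediately, which is the throughput benefit the text refers to.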
| Wrapping the asynchronous processing
The asynchronous processing is wrapped so that the calling code looks like a synchronous call.
Let's look at the scheduler side. This is the code in the XxlJobTrigger class that triggers task execution:
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // A lot of asynchronous processing happens inside, but the result is obtained synchronously
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }
    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());
    runResult.setMsg(runResultSB.toString());
    return runResult;
}
As mentioned above, the ExecutorBiz.run method goes through the dynamic proxy to communicate with the executor. The executor also processes the task asynchronously before returning its result, yet the run method we see here waits synchronously for that result to come back.
Let's see how xxl-job obtains the result synchronously. After the scheduler sends a message to the executor, the calling thread blocks. Once the executor has finished and returned the result, the blocked thread is woken up and receives the return value at the call site.
The dynamic proxy code is as follows:
// Trigger call in proxy class
if (CallType.SYNC == callType) {
    // future-response set
    XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
    try {
        // do invoke
        client.asyncSend(finalAddress, xxlRpcRequest);
        // future get
        XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
        if (xxlRpcResponse.getErrorMsg() != null) {
            throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
        }
        return xxlRpcResponse.getResult();
    } catch (Exception e) {
        logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
        throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
    } finally{
        // future-response remove
        futureResponse.removeInvokerFuture();
    }
}
The XxlRpcFutureResponse class implements both the thread waiting and the thread wake-up:
// Set the result and wake up the waiting thread
public void setResponse(XxlRpcResponse response) {
    this.response = response;
    synchronized (lock) {
        done = true;
        lock.notifyAll();
    }
}
@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (!done) {
        synchronized (lock) {
            try {
                if (timeout < 0) {
                    // Block the thread until notified
                    lock.wait();
                } else {
                    long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?
                            timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
                    lock.wait(timeoutMillis);
                }
            } catch (InterruptedException e) {
                throw e;
            }
        }
    }
    if (!done) {
        throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() +", request:" + request.toString());
    }
    return response;
}
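As a standalone illustration of the same wait/notify idea, here is a minimal sketch with hypothetical names (BlockingResultSketch, complete, await). It is not the xxl-rpc class. It differs from the code quoted above in one deliberate way: the done flag is checked inside the lock and in a loop, which is the usual Java idiom for guarding against spurious wakeups and against a notification that arrives just before the wait begins.

```java
final class BlockingResultSketch<T> {

    private final Object lock = new Object();
    private boolean done = false;  // guarded by lock
    private T value;               // guarded by lock

    // Called by the thread that receives the response.
    void complete(T result) {
        synchronized (lock) {
            value = result;
            done = true;
            lock.notifyAll();      // wake every thread blocked in await()
        }
    }

    // Called by the requesting thread; blocks until complete() runs or the timeout expires.
    // Throws IllegalStateException on timeout or interruption (a simplification).
    T await(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (lock) {
            while (!done) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    throw new IllegalStateException("no response within " + timeoutMillis + " ms");
                }
                try {
                    lock.wait(remaining);  // releases the lock while waiting
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
            }
            return value;
        }
    }

    public static void main(String[] args) {
        BlockingResultSketch<String> future = new BlockingResultSketch<>();
        new Thread(() -> future.complete("pong")).start();  // simulated response thread
        System.out.println(future.await(1000));
    }
}
```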
Some readers may ask: when the scheduler receives a returned result, how does it know which thread to wake up?
Every remote call generates a UUID as its request id, and this id is carried through the entire call. It works like a key: when you get home, you use it to open the door.
With the request id as the key, the corresponding XxlRpcFutureResponse can be found. Calling its setResponse method then sets the return value and wakes up the thread.
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
    // Look up the XxlRpcFutureResponse by requestId
    final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
    if (futureResponse == null) {
        return;
    }
    if (futureResponse.getInvokeCallback()!=null) {
        // callback type
        try {
            executeResponseCallback(new Runnable() {
                @Override
                public void run() {
                    if (xxlRpcResponse.getErrorMsg() != null) {
                        futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                    } else {
                        futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                    }
                }
            });
        }catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    } else {
        // setResponse calls lock.notifyAll() internally
        futureResponse.setResponse(xxlRpcResponse);
    }
    // do remove
    futureResponsePool.remove(requestId);
}
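The request-id lookup can be sketched in isolation as follows. This is a simplified illustration, not the xxl-rpc code: PendingRequestsSketch, register, and onResponse are hypothetical names, and the JDK's CompletableFuture is swapped in for XxlRpcFutureResponse to keep the example short.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class PendingRequestsSketch {

    // requestId -> future that the calling thread is waiting on.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Caller side: register a future under the request id before sending the request.
    CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Receiver side: the response carries the same id, so the matching future
    // is found, removed from the pool, and completed. Returns false for an
    // unknown id (for example, a request that was already cleaned up).
    boolean onResponse(String requestId, String result) {
        CompletableFuture<String> future = pending.remove(requestId);
        return future != null && future.complete(result);
    }

    public static void main(String[] args) {
        PendingRequestsSketch pool = new PendingRequestsSketch();
        String idA = UUID.randomUUID().toString();
        String idB = UUID.randomUUID().toString();
        CompletableFuture<String> a = pool.register(idA);
        CompletableFuture<String> b = pool.register(idB);

        pool.onResponse(idB, "result-B");      // responses may arrive in any order
        System.out.println(a.isDone());        // the caller of A is still waiting
        System.out.println(b.join());          // the caller of B gets its own result
    }
}
```

Because each waiting caller is filed under its own id, responses can arrive in any order and still reach the right thread.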