当前位置:网站首页>How can people not love the amazing design of XXL job
How can people not love the amazing design of XXL job
2022-07-04 09:38:00 【Second brother learns Java】
Catalog
- Introduction to communication bottom
- Overall communication process
- Stunning design
Introduction to communication bottom
xxl-job Use netty http To communicate , Although it also supports Mina,jetty,netty tcp Methods such as , But what is fixed in the code is netty http.
Overall communication process
I take the example that the scheduler notifies the actuator to execute tasks , Activity diagram drawn :

Activity diagrams
Stunning design
After reading the whole process code , The design can be said to have originality , take netty, The knowledge of multithreading is applied smoothly .
I will now summarize these outstanding points in design as follows :
| Use dynamic agent mode , Hide communication details
xxl-job Two interfaces are defined ExecutorBiz,AdminBiz,ExecutorBiz The interface encapsulates the heartbeat , Pause , Trigger execution and other operations ,AdminBiz Encapsulated callback , register , Cancel registration , Interface implementation class , There is no communication related processing .
XxlRpcReferenceBean Class getObject() Method will generate a proxy class , This proxy class will communicate remotely .
| Fully asynchronous processing
The actuator receives the message and deserializes , There is no synchronized execution of the task code , Instead, the task information is stored in LinkedBlockingQueue in , Asynchronous threads get task information from this queue , And then execute .
And the processing result of the task , It doesn't mean that after processing , Synchronize the returned , It is also placed in the blocking queue of the callback thread , Return the processing result asynchronously .
The advantage of this treatment is to reduce netty Processing time of the worker thread , Improved throughput .
| Packaging for asynchronous processing
Asynchronous processing is packaged , The code seems to be called synchronously .
Let's look at the regulator ,XxlJobTrigger Class triggers the code of task execution :
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz =
XxlJobScheduler.getExecutorBiz(address);
// There are a lot of asynchronous processing , Finally, the processing result of synchronization
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}
ExecutorBiz.run The method we mentioned , Is the dynamic proxy of walking , Communicate with the actuator , The execution result of the actuator is also processed asynchronously , Just returned , And what we see here run The method is to synchronously wait for the processing result to return .
Let's take a look at xxl-job How to get processing results synchronously : After the scheduler sends a message to the actuator , The thread is blocked . Wait until the actuator is processed , Return the processing result to , Wake up blocked threads , Get the return value at the call .
The dynamic proxy code is as follows :
// Trigger call in proxy class
if (CallType.SYNC == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// do invoke
client.asyncSend(finalAddress, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (
xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(
xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
} finally{
// future-response remove
futureResponse.removeInvokerFuture();
}
}
XxlRpcFutureResponse Class implements thread waiting , And thread wake-up processing :
// Return results , Wake up the thread
public void setResponse(XxlRpcResponse response) {
this.response = response;
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (!done) {
synchronized (lock) {
try {
if (timeout < 0) {
// Thread blocking
lock.wait();
} else {
long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?
timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
lock.wait(timeoutMillis);
}
} catch (InterruptedException e) {
throw e;
}
}
}
if (!done) {
throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() +", request:" + request.toString());
}
return response;
}
Some students may ask , The dispatcher received the return result , How to determine which thread to wake up ?
Every remote call , Will generate uuid Request id, This id Is passed throughout the call , It's like a key , When you go home , Take it and open the door .
Take the request here id This key , You can find the corresponding XxlRpcFutureResponse, And then call setResponse Method , Set return value , Wake up the thread .
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
// adopt requestId find XxlRpcFutureResponse,
final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
if (futureResponse == null) {
return;
}
if (
futureResponse.getInvokeCallback()!=null) {
// callback type
try {
executeResponseCallback(new Runnable() {
@Override
public void run() {
if (
xxlRpcResponse.getErrorMsg() != null) {
futureResponse.getInvokeCallback().onFailure(new XxlRpcException(
xxlRpcResponse.getErrorMsg()));
} else {
futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
}
}
});
}catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else {
// It calls lock Of notify Method
futureResponse.setResponse(xxlRpcResponse);
}
// do remove
futureResponsePool.remove(requestId);
}
边栏推荐
- QTreeView+自定义Model实现示例
- 《网络是怎么样连接的》读书笔记 - Tcp/IP连接(二)
- Lauchpad X | 模式
- 智能网关助力提高工业数据采集和利用
- Report on the development trend and prospect trend of high purity zinc antimonide market in the world and China Ⓕ 2022 ~ 2027
- Ultimate bug finding method - two points
- 2022-2028 global visual quality analyzer industry research and trend analysis report
- 什么是权限?什么是角色?什么是用户?
- After unplugging the network cable, does the original TCP connection still exist?
- UML sequence diagram [easy to understand]
猜你喜欢

Daughter love in lunch box

Kubernetes CNI 插件之Fabric

Ultimate bug finding method - two points

C语言指针经典面试题——第一弹

回复评论的sql

智慧路灯杆水库区安全监测应用

2022-2028 global elastic strain sensor industry research and trend analysis report

2022-2028 global tensile strain sensor industry research and trend analysis report

Logstack configuration details -- elasticstack (elk) work notes 020

If you can quickly generate a dictionary from two lists
随机推荐
Ultimate bug finding method - two points
UML sequence diagram [easy to understand]
Investment analysis and future production and marketing demand forecast report of China's paper industry Ⓥ 2022 ~ 2028
回复评论的sql
Write a jison parser from scratch (5/10): a brief introduction to the working principle of jison parser syntax
Global and Chinese markets for laser assisted liposuction (LAL) devices 2022-2028: Research Report on technology, participants, trends, market size and share
Pueue data migration from '0.4.0' to '0.5.0' versions
xxl-job惊艳的设计,怎能叫人不爱
How do microservices aggregate API documents? This wave of show~
PMP registration process and precautions
Flutter tips: various fancy nesting of listview and pageview
2022-2028 global industrial gasket plate heat exchanger industry research and trend analysis report
Global and Chinese PCB function test scale analysis and development prospect planning report Ⓑ 2022 ~ 2027
Logstack configuration details -- elasticstack (elk) work notes 020
Trim leading or trailing characters from strings- Trim leading or trailing characters from a string?
《网络是怎么样连接的》读书笔记 - FTTH
Flutter 小技巧之 ListView 和 PageView 的各種花式嵌套
C # use gdi+ to add text to the picture and make the text adaptive to the rectangular area
IIS configure FTP website
Development trend and market demand analysis report of high purity tin chloride in the world and China Ⓔ 2022 ~ 2027