当前位置:网站首页>xxl-job惊艳的设计,怎能叫人不爱
xxl-job惊艳的设计,怎能叫人不爱
2022-07-04 09:32:00 【二哥学Java】
目录
- 通信底层介绍
- 通信整体流程
- 惊艳的设计
通信底层介绍
xxl-job 使用 netty http 的方式进行通信,虽然也支持 Mina,jetty,netty tcp 等方式,但是代码里面固定写死的是 netty http。
通信整体流程
我以调度器通知执行器执行任务为例,绘制的活动图:
活动图
惊艳的设计
看完了整个处理流程代码,设计上可以说独具匠心,将 netty,多线程的知识运用得行云流水。
我现在就将这些设计上出彩的点总结如下:
| 使用动态代理模式,隐藏通信细节
xxl-job 定义了两个接口 ExecutorBiz,AdminBiz,ExecutorBiz 接口中封装了向心跳,暂停,触发执行等操作,AdminBiz 封装了回调,注册,取消注册操作,接口的实现类中,并没有通信相关的处理。
XxlRpcReferenceBean 类的 getObject() 方法会生成一个代理类,这个代理类会进行远程通信。
| 全异步处理
执行器收到消息进行反序列化,并没有同步执行任务代码,而是将任务信息存储在 LinkedBlockingQueue 中,异步线程从这个队列中获取任务信息,然后执行。
而任务的处理结果,也不是说处理完之后,同步返回的,也是放到回调线程的阻塞队列中,异步的将处理结果返回回去。
这样处理的好处就是减少了 netty 工作线程的处理时间,提升了吞吐量。
| 对异步处理的包装
对异步处理进行了包装,代码看起来是同步调用的。
我们看下调度器,XxlJobTrigger 类触发任务执行的代码:
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz =
XxlJobScheduler.getExecutorBiz(address);
//这里面做了很多异步处理,最终同步得到处理结果
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}
ExecutorBiz.run 方法我们说过了,是走的动态代理,和执行器进行通信,执行器执行结果也是异步处理完,才返回的,而这里看到的 run 方法是同步等待处理结果返回。
我们看下xxl-job是如何同步获取处理结果的:调度器向执行器发出消息后,该线程阻塞。等到执行器处理完毕后,将处理结果返回,唤醒被阻塞的线程,调用处拿到返回值。
动态代理代码如下:
//代理类中的触发调用
if (CallType.SYNC == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// do invoke
client.asyncSend(finalAddress, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (
xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(
xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
} finally{
// future-response remove
futureResponse.removeInvokerFuture();
}
}
XxlRpcFutureResponse 类中实现了线程的等待,和线程唤醒的处理:
//返回结果,唤醒线程
public void setResponse(XxlRpcResponse response) {
this.response = response;
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (!done) {
synchronized (lock) {
try {
if (timeout < 0) {
//线程阻塞
lock.wait();
} else {
long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?
timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
lock.wait(timeoutMillis);
}
} catch (InterruptedException e) {
throw e;
}
}
}
if (!done) {
throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() +", request:" + request.toString());
}
return response;
}
有的同学可能会问了,调度器接收到返回结果,怎么确定唤醒哪个线程呢?
每一次远程调用,都会生成 uuid 的请求 id,这个 id 是在整个调用过程中一直传递的,就像一把钥匙,在你回家的的时候,拿着它就带开门。
这里拿着请求 id 这把钥匙,就能找到对应的 XxlRpcFutureResponse,然后调用 setResponse 方法,设置返回值,唤醒线程。
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
// 通过requestId找到XxlRpcFutureResponse,
final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
if (futureResponse == null) {
return;
}
if (
futureResponse.getInvokeCallback()!=null) {
// callback type
try {
executeResponseCallback(new Runnable() {
@Override
public void run() {
if (
xxlRpcResponse.getErrorMsg() != null) {
futureResponse.getInvokeCallback().onFailure(new XxlRpcException(
xxlRpcResponse.getErrorMsg()));
} else {
futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
}
}
});
}catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else {
// 里面调用lock的notify方法
futureResponse.setResponse(xxlRpcResponse);
}
// do remove
futureResponsePool.remove(requestId);
}
边栏推荐
- Explanation of closures in golang
- Four common methods of copying object attributes (summarize the highest efficiency)
- 【leetcode】29. Divide two numbers
- Global and Chinese market of wheel hubs 2022-2028: Research Report on technology, participants, trends, market size and share
- 2022-2028 research and trend analysis report on the global edible essence industry
- Simulate EF dbcontext with MOQ - mocking EF dbcontext with MOQ
- HMS core helps baby bus show high-quality children's digital content to global developers
- Report on investment analysis and prospect trend prediction of China's MOCVD industry Ⓤ 2022 ~ 2028
- [C Advanced] file operation (2)
- Global and Chinese market of planar waveguide optical splitter 2022-2028: Research Report on technology, participants, trends, market size and share
猜你喜欢
MySQL foundation 02 - installing MySQL in non docker version
How should PMP learning ideas be realized?
Sort out the power node, Mr. Wang he's SSM integration steps
SSM online examination system source code, database using mysql, online examination system, fully functional, randomly generated question bank, supporting a variety of question types, students, teache
At the age of 30, I changed to Hongmeng with a high salary because I did these three things
[C Advanced] file operation (2)
2022-2028 global intelligent interactive tablet industry research and trend analysis report
Opencv environment construction (I)
PHP book borrowing management system, with complete functions, supports user foreground management and background management, and supports the latest version of PHP 7 x. Database mysql
2022-2028 research and trend analysis report on the global edible essence industry
随机推荐
Write a jison parser from scratch (5/10): a brief introduction to the working principle of jison parser syntax
Four common methods of copying object attributes (summarize the highest efficiency)
Get the source code in the mask with the help of shims
什么是权限?什么是角色?什么是用户?
The 14th five year plan and investment risk analysis report of China's hydrogen fluoride industry 2022 ~ 2028
Global and Chinese market of bipolar generators 2022-2028: Research Report on technology, participants, trends, market size and share
Problems encountered by scan, scanf and scanln in golang
The child container margin top acts on the parent container
Research and investment strategy report of China's electronic hydrogen peroxide industry (2022 Edition)
Lauchpad X | 模式
Report on the development trend and prospect trend of high purity zinc antimonide market in the world and China Ⓕ 2022 ~ 2027
Luogu deep foundation part 1 Introduction to language Chapter 4 loop structure programming (2022.02.14)
法向量点云旋转
Multilingual Wikipedia website source code development part II
Reading notes on how to connect the network - tcp/ip connection (II)
Svg image quoted from CodeChina
Launpad | basic knowledge
Solution to null JSON after serialization in golang
Global and Chinese markets of water heaters in Saudi Arabia 2022-2028: Research Report on technology, participants, trends, market size and share
2022-2028 global gasket plate heat exchanger industry research and trend analysis report