当前位置:网站首页>Jafka来源分析——Processor
Jafka来源分析——Processor
2022-07-06 15:10:00 【全栈程序员站长】
大家好,又见面了,我是全栈君。
Jafka Acceptor接受client而建立后的连接请求,Acceptor会将Socket连接交给Processor进行处理。Processor通过下面的处理步骤进行client请求的处理:
1. 读取client请求。
2. 依据client请求类型的不同,调用对应的处理函数进行处理。
Processor读取client请求是一个比較有意思的事情,须要考虑两个方面的事情:第一,请求规则(Processor须要依照一定的规则进行请求的解析)。第二,怎样确定一次请求的读取已经结束(由于是非堵塞连接,很有可能第一次读操作读取了请求的一部分数据,第二次到第N次读取才干把整个client请求读取完整)。以下我们具体解析一下client请求的格式。
client请求首先包括一个int,该int指明本次client请求的大小(size)。随后,请求包括一个两个byte(short)的请求类型(请求类型包括:CreaterRequest、DeleterRequest、FetchRequest、MultiFetchRequest、MultiProducerRequest、OffsetRequest和ProducerRequest。然后每种请求类型有固定的格式。下图具体说明了ProducerRequest的格式:
知道了上面的格式之后,问题二(怎样确定一次请求已经读取完毕)就非常easy攻克了。
首先为“请求长度”分配一个4byte的ByteBuffer,直到该Buffer读满,否则说明长度一直没有读取完毕。“请求长度”读取完毕后,为请求分配一个“请求长度”大小的ByteBuffer,直到该Buffer读满则说明一次请求读取完毕。读取完毕后,依据“请求类型”调用对应的处理函数(Handler)进行处理。在jafka中,上述的两个Buffer在类BoundedByteBufferReceive中进行声明和管理。Processor接收到Acceptor分配的socket连接后。会为socke连接建立一个BoundedByteBufferReceive并将其与socket连接进行绑定。每当该socket连接“可读”时。将BoundedByteBufferReceive拿出来从上次读取的基础上继续读取。直到一次请求彻底读取完毕,详细过程如以下代码(Processor.read)所看到的:
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = channelFor(key);
Receive request = null;
request = new BoundedByteBufferReceive(maxRequestSize);
key.attach(request);
} else {
request = (Receive) key.attachment();
}
int read = request.readFrom(socketChannel);
stats.recordBytesRead(read);
if (read < 0) {
close(key);
} else if (request.complete()) {
Send maybeResponse = handle(key, request);
key.attach(null);
// if there is a response, send it, otherwise do nothing
if (maybeResponse != null) {
key.attach(maybeResponse);
key.interestOps(SelectionKey.OP_WRITE);
}
} else {
// more reading to be done
key.interestOps(SelectionKey.OP_READ);
getSelector().wakeup();
if (logger.isTraceEnabled()) {
logger.trace("reading request not been done. " + request);
}
}
}
BoundedByteBufferReceive.readFrom的实现详细例如以下:主要是申请两个Buffer并不断的读取数据。
public int readFrom(ReadableByteChannel channel) throws IOException {
expectIncomplete();
int read = 0;
if (sizeBuffer.remaining() > 0) {
read += Utils.read(channel, sizeBuffer);
}
if (contentBuffer == null && !sizeBuffer.hasRemaining()) {
sizeBuffer.rewind();
int size = sizeBuffer.getInt();
if (size <= 0) {
throw new InvalidRequestException(...);
}
if (size > maxRequestSize) {
final String msg = "Request of length %d is not valid, it is larger than the maximum size of %d bytes.";
throw new InvalidRequestException(format(msg, size, maxRequestSize));
}
contentBuffer = byteBufferAllocate(size);
}
//
if (contentBuffer != null) {
read = Utils.read(channel, contentBuffer);
//
if (!contentBuffer.hasRemaining()) {
contentBuffer.rewind();
setCompleted();
}
}
return read;
}
读取完毕后,Processor会解析“请求类型”,依据请求类型的不同调用不同的Handler处理对应于该请求。
版权声明:本文博主原创文章,博客,未经同意不得转载。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/116982.html原文链接:https://javaforall.cn
边栏推荐
- 【编译原理】做了一半的LR(0)分析器
- Crawler obtains real estate data
- 2022-07-05 use TPCC to conduct sub query test on stonedb
- Seata aggregates at, TCC, Saga and XA transaction modes to create a one-stop distributed transaction solution
- signed、unsigned关键字
- pytorch_YOLOX剪枝【附代码】
- 2022年6月国产数据库大事记-墨天轮
- Unity3d minigame-unity-webgl-transform插件转换微信小游戏报错To use dlopen, you need to use Emscripten‘s...问题
- go多样化定时任务通用实现与封装
- 新手程序员该不该背代码?
猜你喜欢
剪映+json解析将视频中的声音转换成文本
Chapter 4: talk about class loader again
Build op-tee development environment based on qemuv8
机试刷题1
Management background --5, sub classification
Attack and defense world miscall
Aardio - 通过变量名将变量值整合到一串文本中
Installation and use of labelimg
硬件开发笔记(十): 硬件开发基本流程,制作一个USB转RS232的模块(九):创建CH340G/MAX232封装库sop-16并关联原理图元器件
Heavyweight news | softing fg-200 has obtained China 3C explosion-proof certification to provide safety assurance for customers' on-site testing
随机推荐
手写ABA遇到的坑
Hardware development notes (10): basic process of hardware development, making a USB to RS232 module (9): create ch340g/max232 package library sop-16 and associate principle primitive devices
(十八)LCD1602实验
Report on technological progress and development prospects of solid oxide fuel cells in China (2022 Edition)
Solve project cross domain problems
剑指offer刷题记录1
Seata聚合 AT、TCC、SAGA 、 XA事务模式打造一站式的分布式事务解决方案
Oracle control file and log file management
The nearest common ancestor of binary (search) tree ●●
将MySQL的表数据纯净方式导出
That's why you can't understand recursion
中国1,4-环己烷二甲醇(CHDM)行业调研与投资决策报告(2022版)
GD32F4XX串口接收中断和闲时中断配置
Seata aggregates at, TCC, Saga and XA transaction modes to create a one-stop distributed transaction solution
12、 Start process
2022-07-05 use TPCC to conduct sub query test on stonedb
pytorch_YOLOX剪枝【附代码】
做国外LEAD2022年下半年几点建议
中国固态氧化物燃料电池技术进展与发展前景报告(2022版)
中国VOCs催化剂行业研究与投资战略报告(2022版)