Jafka source analysis: Processor
2022-07-06 22:33:00 【Full stack programmer webmaster】
Jafka's Acceptor accepts connection requests from clients. Once a connection is established, the Acceptor hands the socket connection over to a Processor. The Processor handles a client request in the following steps:
1. Read the client request.
2. Call the handler function that corresponds to the client's request type.
Reading a client request in the Processor is an interesting problem, and there are two things to consider. First, the request format: the Processor must parse requests according to fixed rules. Second, how to tell that a request has been read completely: because the connection is non-blocking, the first read may return only part of the request data, and the whole client request may only be available after the second, or the N-th, read. The format of a client request is analyzed in detail below.
A client request starts with an int that gives the size of the request. Next comes a 2-byte (short) request type. The request types include CreaterRequest, DeleterRequest, FetchRequest, MultiFetchRequest, MultiProducerRequest, OffsetRequest and ProducerRequest. After that, each request type has its own fixed format. The figure below illustrates the format of ProducerRequest:
[Figure: ProducerRequest format]
With this format in mind, the second question (how to tell that a request has been read completely) is easy to answer.
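The framing described above can be sketched from the sender's side. This is a minimal illustration and not Jafka code: the class name RequestFrame and the numeric type value are hypothetical, and the sketch assumes that the size field counts the 2-byte type plus the body.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Jafka): builds one length-prefixed request frame.
final class RequestFrame {
    private RequestFrame() {}

    /**
     * Layout: [int size][short requestType][payload].
     * Assumption: size counts the type field plus the payload, not the size field itself.
     */
    static ByteBuffer encode(short requestType, byte[] payload) {
        int size = 2 + payload.length;               // 2-byte type + body
        ByteBuffer frame = ByteBuffer.allocate(4 + size);
        frame.putInt(size);                          // size prefix
        frame.putShort(requestType);                 // request type
        frame.put(payload);                          // type-specific body
        frame.flip();                                // switch the buffer to read mode
        return frame;
    }

    public static void main(String[] args) {
        // The type value 0 is a placeholder, not a documented Jafka constant.
        ByteBuffer frame = encode((short) 0, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("frame bytes = " + frame.remaining());
        System.out.println("size field  = " + frame.getInt(0));
        System.out.println("type field  = " + frame.getShort(4));
    }
}
```

Because the receiver learns the size first, it knows exactly how many more bytes belong to the current request, regardless of how the bytes are split across reads.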
First, a 4-byte ByteBuffer is allocated for the request length. Until that buffer is full, the length has not been read completely. Once the length has been read, a ByteBuffer of exactly that size is allocated for the request, and when this second buffer is full, one complete request has been read. The handler function (Handler) that matches the request type is then called to process it.
In Jafka, both buffers are declared and managed by the class BoundedByteBufferReceive. When the Processor receives a socket connection from the Acceptor, it creates a BoundedByteBufferReceive for that connection and binds it to the connection. Whenever the socket connection becomes readable, the Processor takes out the BoundedByteBufferReceive and continues reading from where the previous read stopped, until a request has been read completely. The detailed process is shown in the following code (Processor.read):
    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = channelFor(key);
        Receive request = null;
        if (key.attachment() == null) {
            // first read on this connection: create the receive state and bind it to the key
            request = new BoundedByteBufferReceive(maxRequestSize);
            key.attach(request);
        } else {
            // a request is partially read: continue with the attached state
            request = (Receive) key.attachment();
        }
        int read = request.readFrom(socketChannel);
        stats.recordBytesRead(read);
        if (read < 0) {
            close(key);
        } else if (request.complete()) {
            Send maybeResponse = handle(key, request);
            key.attach(null);
            // if there is a response, send it, otherwise do nothing
            if (maybeResponse != null) {
                key.attach(maybeResponse);
                key.interestOps(SelectionKey.OP_WRITE);
            }
        } else {
            // more reading to be done
            key.interestOps(SelectionKey.OP_READ);
            getSelector().wakeup();
            if (logger.isTraceEnabled()) {
                logger.trace("reading request not been done. " + request);
            }
        }
    }
BoundedByteBufferReceive.readFrom is shown below. It mainly allocates the two buffers and keeps reading data into them.
    public int readFrom(ReadableByteChannel channel) throws IOException {
        expectIncomplete();
        int read = 0;
        // step 1: fill the 4-byte size buffer
        if (sizeBuffer.remaining() > 0) {
            read += Utils.read(channel, sizeBuffer);
        }
        // step 2: once the size is known, validate it and allocate the content buffer
        if (contentBuffer == null && !sizeBuffer.hasRemaining()) {
            sizeBuffer.rewind();
            int size = sizeBuffer.getInt();
            if (size <= 0) {
                throw new InvalidRequestException(...);
            }
            if (size > maxRequestSize) {
                final String msg = "Request of length %d is not valid, it is larger than the maximum size of %d bytes.";
                throw new InvalidRequestException(format(msg, size, maxRequestSize));
            }
            contentBuffer = byteBufferAllocate(size);
        }
        // step 3: fill the content buffer
        if (contentBuffer != null) {
            // note: this assignment replaces the byte count from step 1
            read = Utils.read(channel, contentBuffer);
            // the request is complete once the content buffer is full
            if (!contentBuffer.hasRemaining()) {
                contentBuffer.rewind();
                setCompleted();
            }
        }
        return read;
    }
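To see why keeping the two buffers across calls matters, here is a self-contained sketch that feeds a frame to a receiver a few bytes at a time. It is a simplified stand-in, not Jafka's class: FramedReceive and TrickleChannel are hypothetical names, the error handling is reduced to a plain IOException, and the chunk size of 3 only simulates partial non-blocking reads.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Simplified stand-in for the two-buffer idea; the names are illustrative, not Jafka's.
final class FramedReceive {
    private final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    private final int maxRequestSize;
    private ByteBuffer contentBuffer;   // allocated only after the size is known
    private boolean complete;

    FramedReceive(int maxRequestSize) { this.maxRequestSize = maxRequestSize; }

    /** Reads what is available now; returns the bytes consumed by this call, or -1 at end of stream. */
    int readFrom(ReadableByteChannel channel) throws IOException {
        if (complete) throw new IllegalStateException("request already complete");
        int total = 0;
        if (sizeBuffer.hasRemaining()) {
            int n = channel.read(sizeBuffer);
            if (n < 0) return -1;
            total += n;
        }
        if (contentBuffer == null && !sizeBuffer.hasRemaining()) {
            int size = sizeBuffer.getInt(0);        // absolute read, position untouched
            if (size <= 0 || size > maxRequestSize) {
                throw new IOException("invalid request size: " + size);
            }
            contentBuffer = ByteBuffer.allocate(size);
        }
        if (contentBuffer != null) {
            int n = channel.read(contentBuffer);
            if (n < 0) return -1;
            total += n;
            if (!contentBuffer.hasRemaining()) {
                contentBuffer.rewind();             // ready for the handler to parse
                complete = true;
            }
        }
        return total;
    }

    boolean complete() { return complete; }
    ByteBuffer content() { return contentBuffer; }
}

/** Test channel that hands out at most `chunk` bytes per read, like a slow non-blocking socket. */
final class TrickleChannel implements ReadableByteChannel {
    private final ByteBuffer source;
    private final int chunk;

    TrickleChannel(byte[] data, int chunk) {
        this.source = ByteBuffer.wrap(data);
        this.chunk = chunk;
    }

    @Override public int read(ByteBuffer dst) {
        if (!source.hasRemaining()) return -1;
        int n = Math.min(chunk, Math.min(source.remaining(), dst.remaining()));
        for (int i = 0; i < n; i++) dst.put(source.get());
        return n;
    }

    @Override public boolean isOpen() { return true; }
    @Override public void close() {}
}

final class FramedReceiveDemo {
    public static void main(String[] args) throws IOException {
        // Frame: size = 5 (2-byte type + 3-byte body), placeholder type 7, body {1, 2, 3}.
        byte[] frame = ByteBuffer.allocate(9).putInt(5).putShort((short) 7)
                .put(new byte[]{1, 2, 3}).array();
        FramedReceive receive = new FramedReceive(1024);
        TrickleChannel channel = new TrickleChannel(frame, 3);
        int calls = 0;
        while (!receive.complete()) {
            receive.readFrom(channel);
            calls++;
        }
        System.out.println("read calls needed: " + calls);
        System.out.println("content bytes: " + receive.content().remaining());
    }
}
```

Unlike the Jafka method above, this sketch returns the sum of the bytes read into both buffers in a call. That is a deliberate simplification for the demo, not a description of Jafka's behavior.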
Once a request has been read completely, the Processor parses the request type and calls the Handler that corresponds to that type to process the request.
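The dispatch step can be sketched as a lookup from the 2-byte request type to a handler. This is only an illustration of the idea under stated assumptions: RequestDispatcher, the handler signature and the type codes 0 and 1 are hypothetical and do not reflect Jafka's actual handler classes or constants.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical dispatcher (not Jafka's API): maps a request type code to a handler.
final class RequestDispatcher {
    private final Map<Short, Function<ByteBuffer, String>> handlers = new HashMap<>();

    void register(short requestType, Function<ByteBuffer, String> handler) {
        handlers.put(requestType, handler);
    }

    /** Expects the fully read content buffer: [short requestType][body]. */
    String dispatch(ByteBuffer content) {
        short requestType = content.getShort();      // consumes the first two bytes
        Function<ByteBuffer, String> handler = handlers.get(requestType);
        if (handler == null) {
            throw new IllegalArgumentException("unknown request type: " + requestType);
        }
        return handler.apply(content.slice());       // the handler sees only the body
    }

    public static void main(String[] args) {
        RequestDispatcher dispatcher = new RequestDispatcher();
        // Type codes 0 and 1 are placeholders chosen for this example.
        dispatcher.register((short) 0, body -> "produce, body bytes = " + body.remaining());
        dispatcher.register((short) 1, body -> "fetch, body bytes = " + body.remaining());

        ByteBuffer content = ByteBuffer.allocate(5);
        content.putShort((short) 1).put(new byte[]{9, 9, 9});
        content.flip();
        System.out.println(dispatcher.dispatch(content));
    }
}
```

A map keeps the sketch short. A switch over the type code would work equally well when the set of request types is fixed.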
Copyright notice: This is an original article by the blogger. Do not reprint without permission.
Publisher: Full stack programmer webmaster. When reprinting, please indicate the source: https://javaforall.cn/116982.html Original link: https://javaforall.cn