Jafka source analysis: Processor
2022-07-06 22:33:00 【Full stack programmer webmaster】
Jafka's Acceptor accepts client connection requests. Once a connection is established, the Acceptor hands the socket connection to a Processor. The Processor handles a client request in the following steps:
1. Read the client request.
2. Call the handler function that corresponds to the client's request type.
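Step 2 is essentially a lookup from request type to handler. The following is a minimal sketch of that idea and not Jafka's actual code: the class RequestDispatcher, its methods, and the numeric type id used in main are hypothetical names chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical dispatcher: maps a request type id to a handler function. */
final class RequestDispatcher {
    private final Map<Short, Function<ByteBuffer, String>> handlers = new HashMap<>();

    /** Registers the handler to call for the given request type id. */
    void register(short type, Function<ByteBuffer, String> handler) {
        handlers.put(type, handler);
    }

    /** Looks up the handler for an already parsed type id and passes it the request body. */
    String dispatch(short type, ByteBuffer body) {
        Function<ByteBuffer, String> handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalArgumentException("no handler for request type " + type);
        }
        return handler.apply(body);
    }

    public static void main(String[] args) {
        RequestDispatcher dispatcher = new RequestDispatcher();
        // Type id 1 is an arbitrary value for this demo, not a real Jafka request key.
        dispatcher.register((short) 1, body -> "handled " + body.remaining() + " bytes");
        System.out.println(dispatcher.dispatch((short) 1, ByteBuffer.wrap(new byte[] {1, 2, 3})));
    }
}
```

A map keeps the dispatch step independent of how many request types exist; a switch statement over the type id would work equally well for a small, fixed set.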
Reading a client request in the Processor is an interesting problem, with two things to consider. First, the request format: the Processor must parse the request according to fixed rules. Second, how to determine that a request has been read completely: because the connection is non-blocking, the first read may return only part of the request data, and the whole request may be complete only after the second, or even the Nth, read. The client request format is analyzed in detail below.
A client request begins with an int that gives the size of the request. Next comes a 2-byte (short) request type. The request types are CreaterRequest, DeleterRequest, FetchRequest, MultiFetchRequest, MultiProducerRequest, OffsetRequest and ProducerRequest. Each request type then has its own fixed format. The figure below illustrates the format of ProducerRequest:
[Figure: format of ProducerRequest]
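The outer layout described above (a size, then a type, then a type-specific body) can be sketched as follows. This is an illustration under stated assumptions, not Jafka's code: the class RequestFrame is hypothetical, the body is treated as opaque bytes, and the size field is assumed to count the 2-byte type plus the body.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper illustrating the frame layout: [int size][short type][body]. */
final class RequestFrame {
    final short type;
    final byte[] body;

    RequestFrame(short type, byte[] body) {
        this.type = type;
        this.body = body;
    }

    /** Encodes a frame. Assumption: size covers the type field plus the body. */
    static ByteBuffer encode(short type, byte[] body) {
        int size = 2 + body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + size);
        buf.putInt(size);
        buf.putShort(type);
        buf.put(body);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    /** Decodes one complete frame starting at the buffer's current position. */
    static RequestFrame decode(ByteBuffer buf) {
        int size = buf.getInt();
        if (size < 2 || size > buf.remaining()) {
            throw new IllegalArgumentException("invalid or incomplete frame, size=" + size);
        }
        short type = buf.getShort();
        byte[] body = new byte[size - 2];
        buf.get(body);
        return new RequestFrame(type, body);
    }

    public static void main(String[] args) {
        // Type id 7 is arbitrary; it does not correspond to a real Jafka request type.
        ByteBuffer wire = encode((short) 7, "hello".getBytes(StandardCharsets.UTF_8));
        RequestFrame frame = decode(wire);
        System.out.println(frame.type + " " + new String(frame.body, StandardCharsets.UTF_8));
    }
}
```

Because the size comes first, a reader never has to understand the body to know where a request ends; that property is what the rest of the article relies on.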
With this format known, the second question (how to tell that a request has been read completely) becomes easy to answer.
First, a 4-byte ByteBuffer is allocated for the request length; until this buffer is full, the length has not been read completely. Once the length has been read, a ByteBuffer of exactly that size is allocated for the request; when this buffer is full, one whole request has been read. After that, the handler function (Handler) matching the request type is called.
In Jafka, both buffers are declared and managed in the class BoundedByteBufferReceive. After the Processor receives a socket connection from the Acceptor, it creates a BoundedByteBufferReceive for that connection and binds it to the connection. Whenever the connection becomes readable, the Processor takes out the BoundedByteBufferReceive and continues reading from where the previous read stopped, until a request has been read completely. The detailed process is shown in the following code (Processor.read):
private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = channelFor(key);
    Receive request = null;
    if (key.attachment() == null) {
        // no request in progress: start a new receive and bind it to the key
        request = new BoundedByteBufferReceive(maxRequestSize);
        key.attach(request);
    } else {
        // resume the partially read request
        request = (Receive) key.attachment();
    }
    int read = request.readFrom(socketChannel);
    stats.recordBytesRead(read);
    if (read < 0) {
        // end of stream: close the connection
        close(key);
    } else if (request.complete()) {
        Send maybeResponse = handle(key, request);
        key.attach(null);
        // if there is a response, send it, otherwise do nothing
        if (maybeResponse != null) {
            key.attach(maybeResponse);
            key.interestOps(SelectionKey.OP_WRITE);
        }
    } else {
        // more reading to be done
        key.interestOps(SelectionKey.OP_READ);
        getSelector().wakeup();
        if (logger.isTraceEnabled()) {
            logger.trace("reading request not been done. " + request);
        }
    }
}

BoundedByteBufferReceive.readFrom is shown below. It mainly allocates the two buffers and keeps reading data into them.
public int readFrom(ReadableByteChannel channel) throws IOException {
    expectIncomplete();
    int read = 0;
    // has the 4-byte request size been read yet?
    if (sizeBuffer.remaining() > 0) {
        read += Utils.read(channel, sizeBuffer);
    }
    // size is known but the content buffer has not been allocated yet
    if (contentBuffer == null && !sizeBuffer.hasRemaining()) {
        sizeBuffer.rewind();
        int size = sizeBuffer.getInt();
        if (size <= 0) {
            throw new InvalidRequestException(...);
        }
        if (size > maxRequestSize) {
            final String msg = "Request of length %d is not valid, it is larger than the maximum size of %d bytes.";
            throw new InvalidRequestException(format(msg, size, maxRequestSize));
        }
        contentBuffer = byteBufferAllocate(size);
    }
    // if the content buffer exists, read into it
    if (contentBuffer != null) {
        // note: plain assignment, so the returned count covers only this content read
        read = Utils.read(channel, contentBuffer);
        // the request is complete once the content buffer is full
        if (!contentBuffer.hasRemaining()) {
            contentBuffer.rewind();
            setCompleted();
        }
    }
    return read;
}

Once a request has been read completely, the Processor parses the request type and calls the Handler for that type to process the request.
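To see the resume-across-reads behaviour in isolation, here is a self-contained sketch modelled on the readFrom logic above. It is a simplified stand-in, not Jafka's class: FramedReceive and TrickleChannel are hypothetical names, the error handling is reduced to a single IOException, and byte counts are accumulated with += in both phases. TrickleChannel hands out at most a few bytes per read, imitating a non-blocking socket that delivers a request in pieces.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

/** Simplified, hypothetical stand-in for BoundedByteBufferReceive. */
final class FramedReceive {
    private final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    private final int maxRequestSize;
    private ByteBuffer contentBuffer;
    private boolean complete;

    FramedReceive(int maxRequestSize) { this.maxRequestSize = maxRequestSize; }

    boolean complete() { return complete; }

    ByteBuffer content() { return contentBuffer; }

    /** Reads whatever is available; returns bytes read in this call, or -1 at end of stream. */
    int readFrom(ReadableByteChannel channel) throws IOException {
        if (complete) throw new IllegalStateException("receive already complete");
        int total = 0;
        if (sizeBuffer.hasRemaining()) {          // phase 1: the 4-byte length
            int n = channel.read(sizeBuffer);
            if (n < 0) return -1;
            total += n;
        }
        if (contentBuffer == null && !sizeBuffer.hasRemaining()) {
            sizeBuffer.rewind();
            int size = sizeBuffer.getInt();
            if (size <= 0 || size > maxRequestSize) {
                throw new IOException("invalid request size: " + size);
            }
            contentBuffer = ByteBuffer.allocate(size);
        }
        if (contentBuffer != null) {              // phase 2: the request itself
            int n = channel.read(contentBuffer);
            if (n < 0) return -1;
            total += n;
            if (!contentBuffer.hasRemaining()) {
                contentBuffer.rewind();
                complete = true;
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] wire = {0, 0, 0, 5, 0, 7, 1, 2, 3}; // size=5, then 5 bytes of content
        FramedReceive receive = new FramedReceive(1024);
        ReadableByteChannel channel = new TrickleChannel(wire, 3);
        int calls = 0;
        while (!receive.complete()) {
            receive.readFrom(channel);
            calls++;
        }
        System.out.println("complete after " + calls + " reads");
    }
}

/** Hypothetical test channel that returns at most `chunk` bytes per read. */
final class TrickleChannel implements ReadableByteChannel {
    private final ByteBuffer src;
    private final int chunk;

    TrickleChannel(byte[] data, int chunk) {
        this.src = ByteBuffer.wrap(data);
        this.chunk = chunk;
    }

    @Override public int read(ByteBuffer dst) {
        if (!src.hasRemaining()) return -1;
        int n = Math.min(chunk, Math.min(src.remaining(), dst.remaining()));
        for (int i = 0; i < n; i++) dst.put(src.get());
        return n;
    }

    @Override public boolean isOpen() { return true; }

    @Override public void close() { }
}
```

The state that survives between calls lives entirely in the two buffers, which is why attaching one such object to each connection is enough to resume a partially read request.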
Copyright notice: This is an original article by the blogger. Do not reprint without permission.
Publisher: Full stack programmer webmaster. When reprinting, please credit the source: https://javaforall.cn/116982.html Original link: https://javaforall.cn