当前位置:网站首页>Jafka source analysis processor
Jafka source analysis processor
2022-07-06 22:33:00 【Full stack programmer webmaster】
Hello everyone , I meet you again , I'm the king of the whole stack .
Jafka Acceptor Accept client And the connection request after establishment ,Acceptor Will Socket Connect to Processor To deal with .Processor Proceed through the following processing steps client Processing of requests :
1. Read client request .
2. basis client Different request types , Call the corresponding processing function to process .
Processor Read client Request is an interesting thing , There are two things to consider : First of all , Request rules (Processor The request must be parsed according to certain rules ). second , How to make sure that the reading of a request has ended ( Because it is a non blocking connection , It is likely that the first read operation read part of the requested data , Second to second N Only after reading once can the whole client Request to read complete ). Let's analyze it in detail below client Format of request .
client The request first includes a int, The int Indicate this client Requested size (size). And then , The request includes one or two byte(short) Request type for ( Request types include :CreaterRequest、DeleterRequest、FetchRequest、MultiFetchRequest、MultiProducerRequest、OffsetRequest and ProducerRequest. Then each request type has a fixed format . The figure below illustrates ProducerRequest The format of :
After knowing the above format , Question two ( How to ensure that a request has been read ) It's very easy Got it .
First of all to “ Request length ” Allocate one 4byte Of ByteBuffer, Until it's time to Buffer Read full , Otherwise, the length has not been read .“ Request length ” After reading , Assign a to the request “ Request length ” The size of ByteBuffer, Until it's time to Buffer If it is full, it means that one request is read . After reading , basis “ Request type ” Call the corresponding processing function (Handler) To deal with . stay jafka in , Two of the above Buffer In the class BoundedByteBufferReceive Declaration and management .Processor Received Acceptor The distribution of socket After connection . Would be socke Connect to create a BoundedByteBufferReceive And with socket Connect to bind . Whenever it's time to socket Connect “ Can be read ” when . take BoundedByteBufferReceive Take it out and continue reading based on the last reading . Until a request is completely read , The detailed process is shown in the following code (Processor.read) Seen :
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = channelFor(key);
Receive request = null;
request = new BoundedByteBufferReceive(maxRequestSize);
key.attach(request);
} else {
request = (Receive) key.attachment();
}
int read = request.readFrom(socketChannel);
stats.recordBytesRead(read);
if (read < 0) {
close(key);
} else if (request.complete()) {
Send maybeResponse = handle(key, request);
key.attach(null);
// if there is a response, send it, otherwise do nothing
if (maybeResponse != null) {
key.attach(maybeResponse);
key.interestOps(SelectionKey.OP_WRITE);
}
} else {
// more reading to be done
key.interestOps(SelectionKey.OP_READ);
getSelector().wakeup();
if (logger.isTraceEnabled()) {
logger.trace("reading request not been done. " + request);
}
}
}BoundedByteBufferReceive.readFrom For example, the following : Mainly apply for two Buffer And constantly read data .
public int readFrom(ReadableByteChannel channel) throws IOException {
expectIncomplete();
int read = 0;
if (sizeBuffer.remaining() > 0) {
read += Utils.read(channel, sizeBuffer);
}
if (contentBuffer == null && !sizeBuffer.hasRemaining()) {
sizeBuffer.rewind();
int size = sizeBuffer.getInt();
if (size <= 0) {
throw new InvalidRequestException(...);
}
if (size > maxRequestSize) {
final String msg = "Request of length %d is not valid, it is larger than the maximum size of %d bytes.";
throw new InvalidRequestException(format(msg, size, maxRequestSize));
}
contentBuffer = byteBufferAllocate(size);
}
//
if (contentBuffer != null) {
read = Utils.read(channel, contentBuffer);
//
if (!contentBuffer.hasRemaining()) {
contentBuffer.rewind();
setCompleted();
}
}
return read;
}After reading ,Processor Can parse “ Request type ”, Call different... According to different request types Handler Processing corresponds to the request .
Copyright notice : This article is the original article of the blogger , Blog , Do not reprint without permission .
Publisher : Full stack programmer stack length , Reprint please indicate the source :https://javaforall.cn/116982.html Link to the original text :https://javaforall.cn
边栏推荐
- POJ 1258 Agri-Net
- Void keyword
- Inno setup packaging and signing Guide
- 2022-07-05 use TPCC to conduct sub query test on stonedb
- 【雅思口语】安娜口语学习记录part1
- i. Mx6ull build boa server details and some of the problems encountered
- 剑指offer刷题记录1
- Chapter 3: detailed explanation of class loading process (class life cycle)
- General implementation and encapsulation of go diversified timing tasks
- Report on technological progress and development prospects of solid oxide fuel cells in China (2022 Edition)
猜你喜欢

基於 QEMUv8 搭建 OP-TEE 開發環境

Crawler obtains real estate data

Web APIs DOM 时间对象

Balanced Multimodal Learning via On-the-fly Gradient Modulation(CVPR2022 oral)

Self made j-flash burning tool -- QT calls jlinkarm DLL mode

将MySQL的表数据纯净方式导出

Aardio - 利用customPlus库+plus构造一个多按钮组件

网络基础入门理解

signed、unsigned关键字

3DMAX assign face map
随机推荐
OpenCV VideoCapture. Get() parameter details
UE4蓝图学习篇(四)--流程控制ForLoop和WhileLoop
Learn the principle of database kernel from Oracle log parsing
雅思口语的具体步骤和时间安排是什么样的?
BasicVSR_PlusPlus-master测试视频、图片
Leetcode exercise - Sword finger offer 26 Substructure of tree
Inno Setup 打包及签名指南
How to confirm the storage mode of the current system by program?
Pit encountered by handwritten ABA
重磅新闻 | Softing FG-200获得中国3C防爆认证 为客户现场测试提供安全保障
Aardio - integrate variable values into a string of text through variable names
Adavit -- dynamic network with adaptive selection of computing structure
Spatial domain and frequency domain image compression of images
case 关键字后面的值有什么要求吗?
将MySQL的表数据纯净方式导出
新手程序员该不该背代码?
npm无法安装sharp
Dealing with the crash of QT quick project in offscreen mode
Leetcode question brushing (XI) -- sequential questions brushing 51 to 55
Heavyweight news | softing fg-200 has obtained China 3C explosion-proof certification to provide safety assurance for customers' on-site testing