[Stanford CS144 Computer Networking Project] Lab1: StreamReassembler
2022-06-13 01:12:00 【Altair_ alpha】
The class implemented in this lab is StreamReassembler, a reassembler for byte-stream fragments. The network is unreliable: packets may be lost, and there is no guarantee that they arrive in the order they were sent. We therefore need a component that takes fragments arriving out of order and reconstructs them into a contiguous stream in the correct order. Reassembly relies on the sequence numbers defined by the TCP protocol: every byte of data has a unique number, and each packet carries in its header the number of its first byte (along with the data length), which tells us where the packet's data sits in the whole stream. In later labs we will deal with this sequence number (seqnum) in the higher-level classes, and we will also be responsible for maintaining it when sending data. In this lab, however, it can simply be treated as an index, and we only need to handle the receiving side correctly. The core function to implement is:
void push_substring(const std::string &data, const uint64_t index, const bool eof);
Like ByteStream, StreamReassembler has a capacity limit: the total amount of data it holds, that is, the bytes already written in order into the ByteStream plus the bytes stored temporarily because earlier data has not yet arrived, must not exceed this value.
The core logic of StreamReassembler is simple: if the index of the arriving data immediately follows the index of the last assembled byte, or is the start of the stream (index = 0), write the data into the ByteStream; otherwise, store it temporarily and write it once all preceding bytes have been written. The implementation still has many details to get right, mainly around two points: data may overlap (for example, bytes 1-3 arrive first and then bytes 2-4), and the capacity limit. Fortunately, the lab document guarantees that overlapping data is consistent, so bytes at an index that has already been received can simply be discarded. We also need to handle the EOF flag.
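To make the core logic concrete, here is a minimal sketch that ignores the capacity limit entirely. The class ToyReassembler and all of its members are hypothetical and not part of the CS144 starter code: it appends assembled bytes to a std::string instead of a ByteStream, keeps early arrivals in a std::map, and, relying on the lab's guarantee that overlapping bytes agree, simply drops any prefix that has already been assembled.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical, capacity-free illustration of the reassembly idea (not the lab's API).
class ToyReassembler {
  public:
    // Accept the bytes `data` whose first byte has stream index `index`.
    void push(const std::string &data, uint64_t index) {
        if (index + data.size() <= next_) {
            return;  // every byte was already assembled: discard
        }
        if (index > next_) {
            // Arrived early: stash it, keeping the longer copy if the key repeats.
            std::string &slot = pending_[index];
            if (data.size() > slot.size()) {
                slot = data;
            }
            return;
        }
        append(data.substr(next_ - index));  // skip the already-assembled prefix
        // Drain stashed fragments that now start at or before next_.
        while (!pending_.empty() && pending_.begin()->first <= next_) {
            const uint64_t start = pending_.begin()->first;
            const std::string frag = pending_.begin()->second;
            pending_.erase(pending_.begin());
            if (start + frag.size() > next_) {
                append(frag.substr(next_ - start));
            }
        }
        // Invariant: everything still stashed starts strictly after next_.
        assert(pending_.empty() || pending_.begin()->first > next_);
    }

    const std::string &assembled() const { return assembled_; }
    uint64_t next_index() const { return next_; }

  private:
    void append(const std::string &bytes) {
        assembled_ += bytes;
        next_ += bytes.size();
    }

    uint64_t next_ = 0;                        // plays the role of _wait_index
    std::string assembled_;                    // stands in for the ByteStream
    std::map<uint64_t, std::string> pending_;  // staging area: index -> data
};
```

For example, pushing "cd" at index 2 and then "ab" at index 0 yields "abcd"; a later "bcdef" at index 1 contributes only "ef". The real StreamReassembler additionally has to trim every write and every stashed fragment to the capacity, which is where most of the complexity below comes from.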
Let me walk through my approach step by step. First, two things are fairly obvious:
- A data structure is needed for the staging area. I used a map whose keys are indices and whose values are the data, which makes lookup easy.
- The index of the byte currently being waited for must be maintained. I called it _wait_index.
When push_substring is called, compare the index of the newly arrived data with _wait_index:
- If it is less than _wait_index, discard the part of the data before _wait_index, because those bytes have already been written. If nothing remains afterwards, the packet carries no new data and nothing needs to be done; otherwise, the remaining data is exactly the data being waited for, so write it immediately. The capacity limit must be respected here; I use a function truncate_data to trim the data.
- If it is equal to _wait_index, write it immediately, again using truncate_data.
- If it is greater than _wait_index, later data has arrived early, so store it temporarily. The capacity limit must also be respected when staging; I use a function checked_insert for this.
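The three cases can be captured in a small decision function. This is a hypothetical helper (Action, Decision and classify are illustration names, not part of the lab code); it only decides what to do and computes start_pos, the offset of the first not-yet-written byte, without touching any stream.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper mirroring the three cases described above.
enum class Action { Ignore, WriteNow, Stash };

struct Decision {
    Action action;
    std::size_t start_pos;  // offset into the data where the new bytes begin
};

Decision classify(uint64_t index, std::size_t len, uint64_t wait_index) {
    if (index > wait_index) {
        return {Action::Stash, 0};  // later data arrived early: keep it for now
    }
    const std::size_t start_pos = static_cast<std::size_t>(wait_index - index);
    if (start_pos >= len) {
        return {Action::Ignore, start_pos};  // every byte was already written
    }
    return {Action::WriteNow, start_pos};  // drop the written prefix, write the rest
}
```

With _wait_index = 5, a segment of 3 bytes at index 0 is ignored, a 4-byte segment at index 3 is written starting from offset 2, and anything starting at index 6 or later is stashed.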
If a write occurs, check whether any staged data can now be written; if that triggers another write, keep checking in a loop. Updating the staging area is handled by the update_waitmap function. The implementation guarantees that the index of every item in the staging area is always greater than or equal to _wait_index. In other words, it never wastes space storing data that has already been written, so to find data that can be written next, we only need to look up the key equal to _wait_index. The EOF logic is relatively simple: just maintain an _eof_index. The code for push_substring can then be written as follows:
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
    if (eof) {
        _eof_index = index + data.size();  // one past the last byte's index
    }
    // If the incoming data's index is smaller than the waiting index, truncate it at the front
    // (we can't erase data that has already been written to the end of the output stream,
    // and as the document says, both substrings are guaranteed to be the same in this case)
    size_t start_pos = index < _wait_index ? _wait_index - index : 0;
    if (start_pos >= data.size()) {
        // no new bytes, but this segment may still complete the stream (e.g. an empty EOF segment)
        if (empty() && _wait_index >= _eof_index) {
            _output.end_input();
        }
        return;
    }
    string write_data = data.substr(start_pos);
    size_t moved_index = index + start_pos;  // stream index of write_data's first byte
    if (moved_index > _wait_index) {
        checked_insert(write_data, moved_index);
    } else if (moved_index == _wait_index) {
        // truncate relative to moved_index, where write_data actually starts
        write_data = truncate_data(write_data, moved_index);
        size_t write_size = _output.write(write_data);  // should equal the truncated data size
        _wait_index += write_size;
        // try to reassemble as much data as possible
        while (true) {
            update_waitmap();
            auto it = _wait_map.find(_wait_index);
            if (it == _wait_map.end()) {
                break;
            }
            write_size = _output.write(it->second);
            _wait_index += write_size;
            _wait_map.erase(it);
        }
    }
    // if all data in the wait buffer has been assembled (up to _eof_index),
    // it's OK to close the output stream
    if (empty() && _wait_index == _eof_index) {
        _output.end_input();
    }
}
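The EOF handling in push_substring can be isolated into a tiny sketch. EofTracker is a hypothetical name, and the initial value of eof_index is an assumption: the header file is not shown here, so this sketch uses the largest uint64_t value to mean "no EOF seen yet" (an initial value of 0 would make an empty reassembler look finished immediately).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>

// Hypothetical stand-alone version of the EOF bookkeeping in push_substring.
struct EofTracker {
    // Assumed sentinel: "EOF not seen yet" (the real header may differ).
    uint64_t eof_index = std::numeric_limits<uint64_t>::max();

    // Called for every segment; an EOF segment fixes where the stream ends.
    void on_segment(uint64_t index, std::size_t len, bool eof) {
        if (eof) {
            eof_index = index + len;  // one past the last byte's index
        }
    }

    // The output may be closed once every byte before eof_index has been
    // written and nothing is left in the staging area.
    bool should_close(uint64_t wait_index, std::size_t unassembled) const {
        return unassembled == 0 && wait_index >= eof_index;
    }
};
```

Note that the EOF segment does not have to be the last one to arrive: if it shows up early, eof_index is recorded and the stream is only closed once the gap before it has been filled and the staging area is empty.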
Now let's go through the three helper functions truncate_data, checked_insert and update_waitmap one by one.
First consider truncate_data, the function that trims data according to the capacity, whether the data is about to be written to the byte stream or inserted into the staging area. There are two constraints:
- Bytes buffered in the byte stream + bytes in the staging area + size of the new data <= capacity. This follows directly from the definition of capacity.
- Bytes whose index is greater than or equal to capacity + the number of bytes already read must be discarded. This matters for inserts into the staging area. For example, with a capacity of 8, first insert 'abc' at index 0, then 'ghX' at index 6: the latter should be trimmed to 'gh'. If 'ghX' were stored, the remaining capacity would be 2, so unless a read occurs, the 'f' among the 3 missing bytes in between (e.g. 'def') would never get a chance to be written, and storing 'ghX' would be pointless.
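The two constraints, and the 'abc' / 'ghX' example, can be checked with a free-function sketch. truncate_to_capacity and its parameter list are hypothetical: the real truncate_data reads bytes_read and buffer_size from _output and computes the staged byte count itself. The sketch also returns early when index lies outside the window, which avoids unsigned underflow in first_unacceptable - index.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical free-function version of truncate_data; the stream state is
// passed in explicitly instead of being read from _output and the wait map.
std::string truncate_to_capacity(const std::string &data, uint64_t index, std::size_t capacity,
                                 uint64_t bytes_read, std::size_t buffer_size, std::size_t unassembled) {
    // Constraint 2: bytes at index >= bytes_read + capacity can never be accepted.
    const uint64_t first_unacceptable = bytes_read + capacity;
    if (index >= first_unacceptable) {
        return "";
    }
    std::size_t keep = std::min<uint64_t>(data.size(), first_unacceptable - index);
    // Constraint 1: buffered bytes + staged bytes + new bytes must fit in the capacity.
    const std::size_t used = buffer_size + unassembled;
    const std::size_t free_space = used < capacity ? capacity - used : 0;
    keep = std::min(keep, free_space);
    return data.substr(0, keep);
}
```

With capacity 8, 'abc' buffered and nothing read, 'ghX' at index 6 becomes "gh". Once the application has read 'a' (bytes_read = 1, buffer_size = 2), the window extends to index 8 and all of 'ghX' fits.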
The implementation of truncate_data is as follows:
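string StreamReassembler::truncate_data(const string &data, uint64_t index) {
    // bytes at or beyond _capacity + bytes_read can never be accepted;
    // returning early also avoids unsigned underflow in the subtraction below
    const size_t first_unacceptable = _capacity + _output.bytes_read();
    if (index >= first_unacceptable) {
        return "";
    }
    size_t trunc_size = min(data.size(), static_cast<size_t>(first_unacceptable - index));
    trunc_size = min(trunc_size, _capacity - _output.buffer_size() - unassembled_bytes());
    return data.substr(0, trunc_size);
}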
Next, consider checked_insert, which puts data into the staging area. Its last step is of course to call truncate_data to cut the data down to a length that satisfies the capacity limit. Before that, however, since the staging area may already contain items that overlap the new data, the following cases are handled to reduce duplicate storage:
- If the new data completely covers an item, remove the item and insert the new data.
- If an item completely covers the new data, there is no need to insert it.
- If an item partially overlaps the new data, remove the item from the staging area and merge it into the new data (there are two cases: the item sticks out before the new data, or after it).
Scan all items in the staging area according to these rules, then apply truncate_data and insert the result. The implementation is as follows:
void StreamReassembler::checked_insert(const string &data, uint64_t index) {
    string ins_data = data;
    // check and merge data against _wait_map's content (ranges are inclusive)
    size_t ins_start = index, ins_end = index + ins_data.size() - 1;
    for (auto it = _wait_map.begin(); it != _wait_map.end();) {
        const string &elem_data = it->second;
        size_t elem_start = it->first, elem_end = elem_start + elem_data.size() - 1;
        // insert data overlaps with current element 'e'
        if (ins_start <= elem_end && elem_start <= ins_end) {
            // insert data completely covers 'e', erase 'e'
            if (ins_start <= elem_start && ins_end >= elem_end) {
                it = _wait_map.erase(it);
            }
            // insert data is completely covered by 'e', clear data (do not insert)
            else if (elem_start <= ins_start && elem_end >= ins_end) {
                ins_data.clear();
                ++it;
            }
            // insert data partially overlaps with 'e', merge 'e' into data
            else {
                if (ins_start <= elem_start) {
                    // 'e' sticks out on the right: append its tail
                    ins_data += elem_data.substr(ins_end + 1 - elem_start, elem_end - ins_end);
                    ins_end = elem_end;
                } else {
                    // 'e' sticks out on the left: prepend its head
                    ins_data.insert(0, elem_data.substr(0, ins_start - elem_start));
                    index = ins_start = elem_start;
                }
                it = _wait_map.erase(it);
            }
        } else {
            ++it;
        }
    }
    // if data isn't empty after checking, trim it to the capacity and insert it
    if (!ins_data.empty()) {
        string trunc_data = truncate_data(ins_data, index);
        if (!trunc_data.empty()) {  // truncation may leave nothing to store
            _wait_map.insert(make_pair(index, trunc_data));
        }
    }
}
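checked_insert works with inclusive ranges (end = start + size - 1), which requires the matching + 1 and - 1 adjustments in the substr calls. As an alternative formulation, not the author's code, the same overlap test and merge can be written with half-open ranges [start, end). Fragment, overlaps and merge are hypothetical names; like checked_insert, merge assumes that bytes present in both fragments are identical, as the lab guarantees.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical fragment type using half-open ranges [start, end()).
struct Fragment {
    uint64_t start;
    std::string data;
    uint64_t end() const { return start + data.size(); }
};

// True if the two fragments share at least one byte (merely touching is not overlap).
bool overlaps(const Fragment &a, const Fragment &b) {
    return a.start < b.end() && b.start < a.end();
}

// Merge two fragments that overlap or touch. Bytes present in both are
// assumed identical, so the copy from the earlier fragment is kept.
Fragment merge(const Fragment &a, const Fragment &b) {
    const Fragment &lo = a.start <= b.start ? a : b;
    const Fragment &hi = a.start <= b.start ? b : a;
    assert(hi.start <= lo.end());  // precondition: no gap between them
    Fragment out = lo;
    if (hi.end() > lo.end()) {
        out.data += hi.data.substr(lo.end() - hi.start);  // append hi's tail
    }
    return out;
}
```

One merge covers all three overlap cases: if one fragment contains the other, the result is simply the larger one; otherwise the tail of the later fragment is appended. Unlike checked_insert, which only merges items that actually overlap, this merge also accepts fragments that merely touch.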
Finally, consider update_waitmap. Since the loop that checks whether staged data can be written to the output already lives in push_substring, this function only needs to update the staging area after new data has been written (which moves _wait_index to the right). For example, suppose _wait_index was 2 and the staging area holds data at 3-5, 4-7 and 8-10, and a new write moves _wait_index to 6. Then the first item, 3-5, should be deleted; 4-7 should be changed to 6-7 (it will then be written in the next loop iteration); and 8-10 stays unchanged. The implementation is as follows:
void StreamReassembler::update_waitmap() {
    for (auto it = _wait_map.begin(); it != _wait_map.end();) {
        size_t index = it->first;
        if (index < _wait_index) {
            string data = it->second;
            // erase anyway as we're either discarding it or modifying both key and value
            it = _wait_map.erase(it);
            if (index + data.size() > _wait_index) {
                // keep only the part not written yet, re-keyed at _wait_index
                data = data.substr(_wait_index - index);
                index = _wait_index;
                checked_insert(data, index);
            }
        } else {
            ++it;
        }
    }
}
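The worked example from the update_waitmap description can be checked with a small pure-function sketch. trim_below and StagingArea are hypothetical names. Unlike update_waitmap, which edits _wait_map in place and re-inserts trimmed items through checked_insert (and therefore also applies the capacity check), this version ignores the capacity and simply keeps the longer fragment if two land on the same key. In the test data, index i holds the letter 'a' + i, so 3-5 is "def", 4-7 is "efgh" and 8-10 is "ijk".

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

using StagingArea = std::map<uint64_t, std::string>;

// Hypothetical pure-function version of update_waitmap: returns a new staging
// area in which no fragment starts before wait_index.
StagingArea trim_below(const StagingArea &staged, uint64_t wait_index) {
    StagingArea out;
    // If two fragments end up under the same key, keep the longer one.
    auto keep_longer = [&out](uint64_t start, const std::string &data) {
        std::string &slot = out[start];
        if (data.size() > slot.size()) {
            slot = data;
        }
    };
    for (const auto &[start, data] : staged) {
        if (start >= wait_index) {
            keep_longer(start, data);  // not affected by the write
        } else if (start + data.size() > wait_index) {
            keep_longer(wait_index, data.substr(wait_index - start));  // cut the written front
        }
        // otherwise the fragment lies entirely before wait_index: drop it
    }
    return out;
}
```

Running it on {3: "def", 4: "efgh", 8: "ijk"} with wait_index 6 drops "def", turns "efgh" into "gh" at index 6, and leaves "ijk" at index 8, matching the description.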
Full code links:
stream_reassembler.hh
stream_reassembler.cc
[Screenshot: all tests passing]
