当前位置:网站首页>【斯坦福計網CS144項目】Lab1: StreamReassembler
【斯坦福計網CS144項目】Lab1: StreamReassembler
2022-06-13 01:10:00 【Altair_alpha】
本節實現的類是 StreamReassembler,即字節流碎片的重組器。我們知道網絡環境是不可靠的,其中傳輸的數據包既不保證不丟失,也不保證按發送順序到達,因此需要有一個工具負責將這樣亂序碎片化到達的流重組成按正確順序連續排列的數據。重組的依據就是 TCP 協議規定的序列號,即每個字節的數據有一個獨特的編號,數據包會在包頭中帶上本包第一個字節數據的編號(和數據的長度),以錶示本包的數據在整個流中的比特置。在後面幾節更上層的類的實現中會講到這個序列號叫做 seqnum,我們也要自己負責在發送數據時維護它,不過在本節實驗中,只需要認為它是一個 index,做好接收時的處理即可。需要實現的核心函數如下:
void push_substring(const std::string &data, const uint64_t index, const bool eof);
與 ByteStream 類似,StreamReassembler 也有一個容量(capacity)限額,其存儲的所有數據,包括已經按順序排好寫入 ByteStream 的和因為前面的數據沒到而暫存起來的兩部分加起來不能超過這個值。
StreamReassembler 的核心邏輯很簡單:如果到達的數據的 index 與已組裝好的最後一字節數據的 index 相連,或者是流的開頭(index = 0),就將數據寫入一個 ByteStream 中;否則將數據暫存起來,等待其前一字節的數據到達寫入後再將其自身寫入。但是在實現上還是有不少需要考慮的細節,主要集中在 數據可能重疊(例如,先到來1-3的數據又到來2-4的數據)和 容量限制 兩點上。比較好的是文檔告訴我們重疊的數據保證是一致的,所以對於後到達的重複 index 數據直接丟弃即可。另外,還要考慮設置 EOF 的邏輯。
下面從邏輯上講解我的思路。首先不難想到:
- 應該添加一個暫存區的數據結構,我用了
map,鍵值分別為 index 和數據,方便查找。 - 應該維護一個當前等待的數據的 index,我稱為
_wait_index。
當調用 push_substring 時,考慮新到達數據的 index:
- 如果小於
_wait_index,則應該將_wait_index之前的數據丟掉,因為那些已經被寫入過了。如果丟完後沒有數據了,說明這個包沒有新數據,無需做任何事;否則剩下的數據正好是等待的新數據,執行寫入,注意需要考慮容量限制,我用truncate_data函數錶示對數據的裁剪過程。 - 如果等於
_wait_index,執行寫入,同樣要使用truncate_data。 - 如果大於
_wait_index,說明到來的是後面的數據,先將它們暫存起來。暫存時也需要考慮容量限制,我用checked_insert函數錶示。
如果發生了寫入,則應該檢查是否有暫存的數據現在能够被寫入了;如果觸發了新的寫入則應該繼續循環檢查。對暫存區的檢查更新我寫在 update_waitmap 函數中。實現中會保證所有暫存項的 index 隨時都大於等於 _wait_index,也就是不會存儲已經寫入過的數據浪費空間,因此在查找是否有新數據能被寫入時只要尋找等於 _wait_index 的即可。EOF 的邏輯比較簡單,維護一個 eof_index 即可。於是可以寫出 push_substring 的代碼如下:
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
if (eof) {
_eof_index = index + data.size(); // one over last byte's index
}
// If the incoming data's index is smaller than waiting index, truncate it at front
// (we can't erase data that's already been written to the end of output stream,
// and as the document says both substring in this case should be the same)
size_t start_pos = index < _wait_index ? _wait_index - index : 0;
if (start_pos >= data.size()) {
if (empty() && _wait_index >= _eof_index) {
_output.end_input();
}
return;
}
string write_data = data.substr(start_pos);
size_t moved_index = index + start_pos;
if (moved_index > _wait_index) {
checked_insert(write_data, index);
} else if (moved_index == _wait_index) {
write_data = truncate_data(write_data, index);
size_t write_size = _output.write(write_data); // write_size should be equal to trucated data size
_wait_index += write_size;
// try to reassemble as much data as possible
while (true) {
update_waitmap();
auto it = _wait_map.find(_wait_index);
if (it == _wait_map.end()) {
break;
}
write_size = _output.write((*it).second);
_wait_index += write_size;
_wait_map.erase(it);
}
}
// if all data in wait buffer has been assembled (including eof byte)
// it's ok to close the output stream
if (empty() && _wait_index == _eof_index) {
_output.end_input();
}
}
下面需要對 truncate_data,checked_insert 和 update_waitmap 三個函數逐個攻破。
首先考慮 truncate_data,也就是對即將寫入字節流 / 插入暫存區的數據根據容量進行裁剪的函數。有兩個條件:
- 字節流容量 + 暫存區容量 + 數據大小 <= 容量。由容量的定義自然得到。
- index > 容量 + 已經被讀取字節數的數據應該被丟弃。這條是針對插入暫存區的情况,舉個例子說明:容量為8,首先插入 index 0: ‘abc’,然後插入 index 6: ‘ghX’,那麼應該被裁剪成 ‘gh’,因為如果插入了 ghX,剩餘容量為2,那麼如果不發生讀取,中間等待的3個字節(如 def )中的 f 將永遠沒有機會被寫入,存儲 ghX 也就沒有意義了。
實現如下:
string StreamReassembler::truncate_data(const string &data, uint64_t index) {
size_t trunc_size = min(data.size(), _capacity + _output.bytes_read() - index);
trunc_size = min(trunc_size, _capacity - _output.buffer_size() - unassembled_bytes());
return data.substr(0, trunc_size);
接著考慮 checked_insert,也就是將數據置入暫存區的處理。最後一步當然是調用上面的 truncate_data 裁剪成符合容量要求的長度,不過在此之前,因為暫存區中可能已經存在和當前數據存在重疊的項,應該考慮以下可以縮减重複存儲的情况:
- 如果新數據完全覆蓋了某個項,則可以把該項移除,插入新數據。
- 如果某個項完全覆蓋了新數據,則無需插入。
- 如果某個項和新數據部分重疊,將該項從暫存區移除並和新數據合並(分新數據在前和該項數據在前兩種情况)。
對暫存區中所有項按以上規則掃描一遍,再進行 truncate_data 後就可以執行插入了。實現如下:
void StreamReassembler::checked_insert(const string &data, uint64_t index) {
string ins_data = data;
// check and truncate data according to _wait_map's content
size_t ins_start = index, ins_end = index + ins_data.size() - 1;
for (auto it = _wait_map.begin(); it != _wait_map.end();) {
const string &elem_data = (*it).second;
size_t elem_start = (*it).first, elem_end = elem_start + elem_data.size() - 1;
// insert data overlaps with current element 'e'
if (ins_start <= elem_end && elem_start <= ins_end) {
// insert data completely covers 'e', erase 'e'
if (ins_start <= elem_start && ins_end >= elem_end) {
it = _wait_map.erase(it);
}
// insert data is completely covered by 'e', clear data (do not insert)
else if (elem_start <= ins_start && elem_end >= ins_end) {
ins_data.clear();
++it;
}
// insert data partially overlaps with 'e', merge 'e' into data
else {
if (ins_start <= elem_start) {
ins_data += elem_data.substr(ins_end + 1 - elem_start, elem_end - ins_end);
} else {
index = elem_start;
ins_data.insert(0, elem_data.substr(0, ins_start - elem_start));
}
it = _wait_map.erase(it);
}
} else {
++it;
}
}
// if data isn't empty after checking, perform the insert
if (!ins_data.empty()) {
_wait_map.insert(make_pair(index, truncate_data(ins_data, index)));
}
}
最後考慮 update_waitmap。由於循環尋找是否有能寫入輸出的暫存區數據的邏輯已經在 push_substring 中完成,這裏只要負責完成因為新數據插入(導致 _wait_index 右移)而需要對暫存區進行的更新。具體來說,例如原來的 _wait_index 是 2,暫存區中有 3-5,4-7 和 8-10 的數據,新數據的插入使 _wait_index 變為 6,那麼第一項 3-5 應該被删除,4-7 應該被修改為 6-7(然後會在下一次循環被寫入),8-10 不變。實現如下:
void StreamReassembler::update_waitmap() {
for (auto it = _wait_map.begin(); it != _wait_map.end();) {
size_t index = (*it).first;
if (index < _wait_index) {
string data = (*it).second;
// erase anyway as we're either discarding it or modifying both key and value
it = _wait_map.erase(it);
if (index + data.size() > _wait_index) {
data = data.substr(_wait_index - index);
index = _wait_index;
checked_insert(data, index);
}
} else {
++it;
}
}
}
完整代碼鏈接:
stream_reassembler.hh
stream_reassembler.cc
通關截圖

边栏推荐
- Et5.0 value type generation
- MySQL exception: com mysql. jdbc. PacketTooBigException: Packet for query is too large(4223215 > 4194304)
- 在国企做软件测试工程师是一种什么样的体验:每天过的像打仗一样
- Detailed explanation of Joseph problem
- 软件测试的几种分类,一看就明了
- Tkinter library installation
- redis
- Study notes on the introduction paper of face recognition deep facial expression recognition: a survey
- Binary tree -- using hierarchical sequence and middle sequence to determine a tree
- Physical orbit simulation
猜你喜欢

Memory learning book reference

Binary tree -- using hierarchical sequence and middle sequence to determine a tree

redis

Leetcode-11- container with the most water (medium)
![[Latex] 插入圖片](/img/0b/3304aaa03d3fea3ebb93b0348c3131.png)
[Latex] 插入圖片

Several categories of software testing are clear at a glance

Jenkins持续集成操作

Jenkins continuous integration operation

使用Pygame创建一个简单游戏界面

sort
随机推荐
Jenkins continuous integration operation
Pytorch's leafnode understanding
五篇经典好文,值得一看
[JS component] custom paging
生态聚合NFT来袭,Metaverse Ape引领Web 3.0元宇宙新范式革命
Traditional machine learning classification model predicts the rise and fall of stock prices under more than 40 indicators
Leetcode-17- letter combination of phone number (medium)
Alexnet implements image classification of caltech101 dataset (pytorch Implementation)
Five classic articles worth reading (2)
Argparse command line passes list type parameter
[JS component] create a custom horizontal and vertical scroll bar following the steam style
[JS component] simulation framework
Et5.0 simply transform referencecollectorieditor
Most elements leetcode
Alexnet实现Caltech101数据集图像分类(pytorch实现)
Several categories of software testing are clear at a glance
ES6解构赋值
np. Understanding of axis in concatenate
[JS component] browse progress bar
What kind of experience is it to be a software test engineer in a state-owned enterprise: every day is like a war