当前位置:网站首页>【斯坦福计网CS144项目】Lab1: StreamReassembler
【斯坦福计网CS144项目】Lab1: StreamReassembler
2022-06-13 01:09:00 【Altair_alpha】
本节实现的类是 StreamReassembler,即字节流碎片的重组器。我们知道网络环境是不可靠的,其中传输的数据包既不保证不丢失,也不保证按发送顺序到达,因此需要有一个工具负责将这样乱序碎片化到达的流重组成按正确顺序连续排列的数据。重组的依据就是 TCP 协议规定的序列号,即每个字节的数据有一个独特的编号,数据包会在包头中带上本包第一个字节数据的编号(和数据的长度),以表示本包的数据在整个流中的位置。在后面几节更上层的类的实现中会讲到这个序列号叫做 seqnum,我们也要自己负责在发送数据时维护它,不过在本节实验中,只需要认为它是一个 index,做好接收时的处理即可。需要实现的核心函数如下:
void push_substring(const std::string &data, const uint64_t index, const bool eof);
与 ByteStream 类似,StreamReassembler 也有一个容量(capacity)限额,其存储的所有数据,包括已经按顺序排好写入 ByteStream 的和因为前面的数据没到而暂存起来的两部分加起来不能超过这个值。
StreamReassembler 的核心逻辑很简单:如果到达的数据的 index 与已组装好的最后一字节数据的 index 相连,或者是流的开头(index = 0),就将数据写入一个 ByteStream 中;否则将数据暂存起来,等待其前一字节的数据到达写入后再将其自身写入。但是在实现上还是有不少需要考虑的细节,主要集中在 数据可能重叠(例如,先到来1-3的数据又到来2-4的数据)和 容量限制 两点上。比较好的是文档告诉我们重叠的数据保证是一致的,所以对于后到达的重复 index 数据直接丢弃即可。另外,还要考虑设置 EOF 的逻辑。
下面从逻辑上讲解我的思路。首先不难想到:
- 应该添加一个暂存区的数据结构,我用了
map
,键值分别为 index 和数据,方便查找。 - 应该维护一个当前等待的数据的 index,我称为
_wait_index
。
当调用 push_substring
时,考虑新到达数据的 index:
- 如果小于
_wait_index
,则应该将_wait_index
之前的数据丢掉,因为那些已经被写入过了。如果丢完后没有数据了,说明这个包没有新数据,无需做任何事;否则剩下的数据正好是等待的新数据,执行写入,注意需要考虑容量限制,我用truncate_data
函数表示对数据的裁剪过程。 - 如果等于
_wait_index
,执行写入,同样要使用truncate_data
。 - 如果大于
_wait_index
,说明到来的是后面的数据,先将它们暂存起来。暂存时也需要考虑容量限制,我用checked_insert
函数表示。
如果发生了写入,则应该检查是否有暂存的数据现在能够被写入了;如果触发了新的写入则应该继续循环检查。对暂存区的检查更新我写在 update_waitmap
函数中。实现中会保证所有暂存项的 index 随时都大于等于 _wait_index
,也就是不会存储已经写入过的数据浪费空间,因此在查找是否有新数据能被写入时只要寻找等于 _wait_index
的即可。EOF 的逻辑比较简单,维护一个 eof_index
即可。于是可以写出 push_substring
的代码如下:
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
if (eof) {
_eof_index = index + data.size(); // one over last byte's index
}
// If the incoming data's index is smaller than waiting index, truncate it at front
// (we can't erase data that's already been written to the end of output stream,
// and as the document says both substring in this case should be the same)
size_t start_pos = index < _wait_index ? _wait_index - index : 0;
if (start_pos >= data.size()) {
if (empty() && _wait_index >= _eof_index) {
_output.end_input();
}
return;
}
string write_data = data.substr(start_pos);
size_t moved_index = index + start_pos;
if (moved_index > _wait_index) {
checked_insert(write_data, index);
} else if (moved_index == _wait_index) {
write_data = truncate_data(write_data, index);
size_t write_size = _output.write(write_data); // write_size should be equal to trucated data size
_wait_index += write_size;
// try to reassemble as much data as possible
while (true) {
update_waitmap();
auto it = _wait_map.find(_wait_index);
if (it == _wait_map.end()) {
break;
}
write_size = _output.write((*it).second);
_wait_index += write_size;
_wait_map.erase(it);
}
}
// if all data in wait buffer has been assembled (including eof byte)
// it's ok to close the output stream
if (empty() && _wait_index == _eof_index) {
_output.end_input();
}
}
下面需要对 truncate_data
,checked_insert
和 update_waitmap
三个函数逐个攻破。
首先考虑 truncate_data
,也就是对即将写入字节流 / 插入暂存区的数据根据容量进行裁剪的函数。有两个条件:
- 字节流容量 + 暂存区容量 + 数据大小 <= 容量。由容量的定义自然得到。
- index > 容量 + 已经被读取字节数的数据应该被丢弃。这条是针对插入暂存区的情况,举个例子说明:容量为8,首先插入 index 0: ‘abc’,然后插入 index 6: ‘ghX’,那么应该被裁剪成 ‘gh’,因为如果插入了 ghX,剩余容量为2,那么如果不发生读取,中间等待的3个字节(如 def )中的 f 将永远没有机会被写入,存储 ghX 也就没有意义了。
实现如下:
string StreamReassembler::truncate_data(const string &data, uint64_t index) {
size_t trunc_size = min(data.size(), _capacity + _output.bytes_read() - index);
trunc_size = min(trunc_size, _capacity - _output.buffer_size() - unassembled_bytes());
return data.substr(0, trunc_size);
接着考虑 checked_insert
,也就是将数据置入暂存区的处理。最后一步当然是调用上面的 truncate_data
裁剪成符合容量要求的长度,不过在此之前,因为暂存区中可能已经存在和当前数据存在重叠的项,应该考虑以下可以缩减重复存储的情况:
- 如果新数据完全覆盖了某个项,则可以把该项移除,插入新数据。
- 如果某个项完全覆盖了新数据,则无需插入。
- 如果某个项和新数据部分重叠,将该项从暂存区移除并和新数据合并(分新数据在前和该项数据在前两种情况)。
对暂存区中所有项按以上规则扫描一遍,再进行 truncate_data
后就可以执行插入了。实现如下:
void StreamReassembler::checked_insert(const string &data, uint64_t index) {
string ins_data = data;
// check and truncate data according to _wait_map's content
size_t ins_start = index, ins_end = index + ins_data.size() - 1;
for (auto it = _wait_map.begin(); it != _wait_map.end();) {
const string &elem_data = (*it).second;
size_t elem_start = (*it).first, elem_end = elem_start + elem_data.size() - 1;
// insert data overlaps with current element 'e'
if (ins_start <= elem_end && elem_start <= ins_end) {
// insert data completely covers 'e', erase 'e'
if (ins_start <= elem_start && ins_end >= elem_end) {
it = _wait_map.erase(it);
}
// insert data is completely covered by 'e', clear data (do not insert)
else if (elem_start <= ins_start && elem_end >= ins_end) {
ins_data.clear();
++it;
}
// insert data partially overlaps with 'e', merge 'e' into data
else {
if (ins_start <= elem_start) {
ins_data += elem_data.substr(ins_end + 1 - elem_start, elem_end - ins_end);
} else {
index = elem_start;
ins_data.insert(0, elem_data.substr(0, ins_start - elem_start));
}
it = _wait_map.erase(it);
}
} else {
++it;
}
}
// if data isn't empty after checking, perform the insert
if (!ins_data.empty()) {
_wait_map.insert(make_pair(index, truncate_data(ins_data, index)));
}
}
最后考虑 update_waitmap
。由于循环寻找是否有能写入输出的暂存区数据的逻辑已经在 push_substring
中完成,这里只要负责完成因为新数据插入(导致 _wait_index
右移)而需要对暂存区进行的更新。具体来说,例如原来的 _wait_index
是 2,暂存区中有 3-5,4-7 和 8-10 的数据,新数据的插入使 _wait_index
变为 6,那么第一项 3-5 应该被删除,4-7 应该被修改为 6-7(然后会在下一次循环被写入),8-10 不变。实现如下:
void StreamReassembler::update_waitmap() {
for (auto it = _wait_map.begin(); it != _wait_map.end();) {
size_t index = (*it).first;
if (index < _wait_index) {
string data = (*it).second;
// erase anyway as we're either discarding it or modifying both key and value
it = _wait_map.erase(it);
if (index + data.size() > _wait_index) {
data = data.substr(_wait_index - index);
index = _wait_index;
checked_insert(data, index);
}
} else {
++it;
}
}
}
完整代码链接:
stream_reassembler.hh
stream_reassembler.cc
通关截图
边栏推荐
- Mathematical knowledge arrangement: extremum & maximum, stagnation point, Lagrange multiplier
- Common skills for quantitative investment - indicators Chapter 3: detailed explanation of RSI indicators, their code implementation and drawing
- Characteristics of transactions - persistence (implementation principle)
- What kind of experience is it to be a software test engineer in a state-owned enterprise: every day is like a war
- 304. Merge two ordered arrays
- How many steps are appropriate for each cycle of deep learning?
- Most elements leetcode
- [backtrader source code analysis 7] analysis of the functions for calculating mean value, variance and standard deviation in mathsupport in backtrader (with low gold content)
- Leetcode-13- Roman numeral to integer (simple)
- Common skills of quantitative investment -- Drawing Part 1: Drawing stock closing price curve and ochl candle chart
猜你喜欢
Rasa对话机器人之HelpDesk (三)
What kind of experience is it to be a software test engineer in a state-owned enterprise: every day is like a war
工作与生活
How to choose stocks? Which indicator strategy is reliable? Quantitative analysis and comparison of strategic returns of BBI, MTM, obv, CCI and priceosc indicators
[JS component library] drag sorting component
ArrayList underlying source code
MySQL index
leetcode 142. Circular linked list II
Undirected graph -- computing the degree of a node in compressed storage
Illustrator tutorial, how to add dashes and arrows in illustrator?
随机推荐
Et5.0 value type generation
Minimum spanning tree problem
spiral matrix visit Search a 2D Matrix
Go simple read database
Matrix fast power
. The way to prove the effect of throwing exceptions on performance in. Net core
How many steps are appropriate for each cycle of deep learning?
5G工业网关在煤矿行业的应用优势
Traditional machine learning classification model predicts the rise and fall of stock prices under more than 40 indicators
Tangent and tangent plane
Liu Hui and introduction to nine chapter arithmetic and island arithmetic
What kind of experience is it to be a software test engineer in a state-owned enterprise: every day is like a war
leetcode. 349. intersection of two arrays
FLIP动画实现思路
[Latex] 插入图片
Tree - delete all leaf nodes
Wal mechanism of MySQL
Three column simple Typecho theme lanstar/ Blue Star Typecho theme
Understanding of the detach() function of pytorch
MySQL performance analysis - explain