[Stanford Computer Networking CS144 project] Lab1: StreamReassembler

2022-06-13 01:12:00 Altair_ alpha

The class implemented in this lab is StreamReassembler, the reassembler of byte-stream fragments. We know that the network is unreliable: transmitted packets may be lost, and there is no guarantee that they arrive in the order they were sent. We therefore need a tool that reassembles these out-of-order fragments into data laid out contiguously in the correct order. Reassembly is based on the sequence number defined by the TCP protocol: every byte of data has a unique number, and each packet carries the number of its first byte in its header (along with the length of the data) to indicate where its payload sits in the whole stream. In the next few labs this sequence number appears as seqnum in the higher-level classes, and we will also be responsible for maintaining it when sending data. In this lab, however, we can simply treat it as an index and concentrate on the receiving side. The core function to implement is:

void push_substring(const std::string &data, const uint64_t index, const bool eof);

Like ByteStream, StreamReassembler has a capacity limit. All the data it holds, that is, the bytes already written in order to the ByteStream plus the bytes staged temporarily because earlier data has not arrived yet, must not exceed this value in total.

The core logic of StreamReassembler is simple: if the index of the arriving data directly follows the index of the last assembled byte, or the data is the beginning of the stream (index = 0), write it into the ByteStream; otherwise stage it and wait until the preceding bytes have been written. There are still many details to handle in the implementation, mainly around two points: data may overlap (for example, bytes 1-3 arrive first and then bytes 2-4) and the capacity limit. Fortunately, the lab document guarantees that overlapping data is consistent, so bytes that arrive later for an index we already hold can simply be discarded. We also need to handle the EOF logic.
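To make the core idea concrete before the details, here is a minimal sketch that ignores both the capacity limit and EOF. It is not the implementation described in this post: `ToyReassembler`, `push`, `assembled`, `pending` and `next_index` are hypothetical names, and it stages bytes one at a time, which is simple but wasteful.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical toy: no capacity limit, no EOF, one map entry per staged byte.
struct ToyReassembler {
    std::string assembled;             // stands in for the output ByteStream
    std::map<uint64_t, char> pending;  // staged bytes keyed by stream index
    uint64_t next_index = 0;           // index of the next byte we are waiting for

    void push(const std::string &data, uint64_t index) {
        for (std::size_t i = 0; i < data.size(); ++i) {
            const uint64_t pos = index + i;
            if (pos >= next_index) {
                // emplace keeps the first copy when the same index arrives twice
                pending.emplace(pos, data[i]);
            }
        }
        // Drain every byte that is now contiguous with the assembled prefix.
        auto it = pending.find(next_index);
        while (it != pending.end()) {
            assembled.push_back(it->second);
            pending.erase(it);
            it = pending.find(++next_index);
        }
    }
};
```

Pushing "bcd" at index 1 leaves the output empty; pushing "abc" at index 0 afterwards releases "abcd" in one go, and the duplicated bytes are silently dropped.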


Let me walk through my approach. First, two things are fairly obvious:

  1. We need a data structure for the staging area. I used a map whose key is the index and whose value is the data, which makes lookups easy.
  2. We need to keep track of the index of the next byte we are waiting for. I call it _wait_index.

When push_substring is called, compare the index of the newly arrived data with _wait_index:

  1. If it is less than _wait_index, drop the part of the data that lies before _wait_index, because those bytes have already been written. If nothing is left after dropping, the segment contains no new data and there is nothing to do. Otherwise the remaining data starts exactly at the byte we are waiting for, so write it. The capacity limit has to be respected here; the truncate_data function performs this trimming.
  2. If it is equal to _wait_index, write the data, again via truncate_data.
  3. If it is greater than _wait_index, later data has arrived early, so stage it. Staging must respect the capacity limit as well; the checked_insert function handles this.
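The three cases above can be sketched as a small pure function. This is only an illustration under assumptions: `Action`, `Decision` and `classify` are hypothetical names that do not appear in the real class, and capacity is deliberately left out.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

enum class Action { Ignore, WriteNow, Stage };

struct Decision {
    Action action;
    std::string payload;  // the bytes that are still new
    uint64_t index;       // stream index of payload[0]
};

// Hypothetical helper: decide what to do with a segment, given the next awaited index.
Decision classify(const std::string &data, uint64_t index, uint64_t wait_index) {
    // Bytes before wait_index were already written, so skip them.
    const uint64_t skip = index < wait_index ? wait_index - index : 0;
    if (skip >= data.size()) {
        return {Action::Ignore, "", wait_index};  // nothing new in this segment
    }
    const uint64_t start = index + skip;
    const Action action = (start == wait_index) ? Action::WriteNow : Action::Stage;
    return {action, data.substr(skip), start};
}
```

Note that cases 1 and 2 collapse into the same branch once the already-written prefix has been cut off, which is also how the full function is organized.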

If a write happens, check whether any staged data can now be written; if that triggers another write, keep checking in a loop. Updating the staging area is done in the update_waitmap function. The implementation guarantees that every index in the staging area is greater than or equal to _wait_index; in other words, no space is wasted on data that has already been written. So to find staged data that can be written, it is enough to look for an entry whose key equals _wait_index. The EOF logic is relatively simple: maintaining an _eof_index is enough. With that, push_substring can be written as follows:

void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
    if (eof) {
        _eof_index = index + data.size();  // one past the last byte's index
    }
    // If the incoming data's index is smaller than the waiting index, truncate it at the front
    // (we can't erase data that has already been written to the output stream,
    // and as the document says, overlapping substrings are guaranteed to be identical)
    size_t start_pos = index < _wait_index ? _wait_index - index : 0;
    if (start_pos >= data.size()) {
        // nothing new in this segment, but it may be the one that carries eof
        if (empty() && _wait_index >= _eof_index) {
            _output.end_input();
        }
        return;
    }
    string write_data = data.substr(start_pos);
    size_t moved_index = index + start_pos;  // stream index of write_data[0]

    if (moved_index > _wait_index) {
        checked_insert(write_data, moved_index);
    } else if (moved_index == _wait_index) {
        write_data = truncate_data(write_data, moved_index);
        size_t write_size = _output.write(write_data);  // write_size should equal the truncated data size
        _wait_index += write_size;

        // try to reassemble as much data as possible
        while (true) {
            update_waitmap();
            auto it = _wait_map.find(_wait_index);
            if (it == _wait_map.end()) {
                break;
            }
            write_size = _output.write((*it).second);
            _wait_index += write_size;
            _wait_map.erase(it);
        }
    }
    // if all data in the wait buffer has been assembled (including the eof byte)
    // it's ok to close the output stream
    if (empty() && _wait_index == _eof_index) {
        _output.end_input();
    }
}
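The EOF handling in the function above can be isolated into a tiny sketch. `EofTracker`, `on_segment` and `should_close` are hypothetical names, and initializing the index to the maximum value is an assumption of this sketch: the post does not show how _eof_index is initialized.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical helper that mirrors the _eof_index bookkeeping.
struct EofTracker {
    // Assumption: "unknown" is represented by the largest possible index.
    uint64_t eof_index = std::numeric_limits<uint64_t>::max();

    void on_segment(uint64_t index, uint64_t size, bool eof) {
        if (eof) {
            eof_index = index + size;  // one past the last byte of the stream
        }
    }

    // Close only when nothing is staged and every byte up to eof_index was written.
    bool should_close(uint64_t wait_index, bool staging_empty) const {
        return staging_empty && wait_index >= eof_index;
    }
};
```

The point of recording an index rather than a flag is that the segment carrying eof may arrive before earlier segments, so the stream must stay open until the gap is filled.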

Next, let's go through the three functions truncate_data, checked_insert and update_waitmap one by one.

First consider truncate_data, the function that trims data about to be written to the byte stream or inserted into the staging area so that it fits the capacity. There are two conditions:

  1. Bytes buffered in the ByteStream + bytes in the staging area + data size <= capacity. This follows directly from the definition of capacity.
  2. Bytes whose index is >= capacity + the number of bytes already read should be discarded. This matters when inserting into the staging area. For example, with a capacity of 8, first insert 'abc' at index 0, then insert 'ghX' at index 6. The latter should be trimmed to 'gh'. If 'ghX' were stored whole, the remaining capacity would be 2, so unless a read happens, the last of the 3 bytes still missing in the middle (say 'def'), namely 'f', would never get a chance to be written, and storing 'ghX' would be pointless.
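The two conditions can be checked on the example with a free-function sketch in which all state is passed in explicitly. `truncate_to_window` and its parameters are hypothetical stand-ins for the member variables and ByteStream accessors used by the real class.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical free-function version of the trimming rule.
// buffered: bytes sitting unread in the ByteStream; staged: bytes in the staging area.
std::string truncate_to_window(const std::string &data, uint64_t index, uint64_t capacity,
                               uint64_t bytes_read, uint64_t buffered, uint64_t staged) {
    const uint64_t window_end = bytes_read + capacity;  // first index that cannot fit
    if (index >= window_end) {
        return "";  // entirely outside the window
    }
    assert(buffered + staged <= capacity);  // invariant the caller is expected to hold
    uint64_t keep = std::min<uint64_t>(data.size(), window_end - index);  // condition 2
    keep = std::min(keep, capacity - buffered - staged);                  // condition 1
    return data.substr(0, keep);
}
```

With capacity 8 and 'abc' buffered, `truncate_to_window("ghX", 6, 8, 0, 3, 0)` keeps only "gh". Once the three buffered bytes have been read, the window slides to index 11 and the same call with `bytes_read = 3, buffered = 0` keeps all of "ghX".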

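The implementation is as follows:

string StreamReassembler::truncate_data(const string &data, uint64_t index) {
    const uint64_t window_end = _capacity + _output.bytes_read();  // first index that cannot fit
    if (index >= window_end) {
        return {};  // avoids unsigned underflow in the subtraction below
    }
    size_t trunc_size = min<size_t>(data.size(), window_end - index);
    trunc_size = min(trunc_size, _capacity - _output.buffer_size() - unassembled_bytes());
    return data.substr(0, trunc_size);
}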

Next consider checked_insert, the process of putting data into the staging area. The last step is of course to call truncate_data above to trim the data to a length that satisfies the capacity limit. Before that, though, the staging area may already contain items that overlap the new data, so the following cases can be used to avoid duplicate storage:

  1. If the new data completely covers an item, remove the item and insert the new data.
  2. If an item completely covers the new data, there is no need to insert.
  3. If an item partially overlaps the new data, remove the item from the staging area and merge it into the new data (there are two sub-cases: the new data comes first, or the item comes first).
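All three cases amount to taking the union of two overlapping byte ranges. The sketch below shows that union on its own; `Segment`, `end_of` and `merge_overlapping` are hypothetical names, and unlike the listing in this post it uses half-open ends (one past the last byte) instead of inclusive ones.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

struct Segment {
    uint64_t start;    // stream index of data[0]
    std::string data;
};

// One past the index of the last byte (half-open end).
inline uint64_t end_of(const Segment &s) { return s.start + s.data.size(); }

// Hypothetical helper. Precondition: the segments overlap or touch,
// and they agree on every byte they share.
Segment merge_overlapping(const Segment &a, const Segment &b) {
    const Segment &first = a.start <= b.start ? a : b;
    const Segment &second = a.start <= b.start ? b : a;
    assert(second.start <= end_of(first));
    Segment merged = first;
    if (end_of(second) > end_of(first)) {
        // Append only the tail of `second` that `first` does not already hold.
        merged.data += second.data.substr(end_of(first) - second.start);
    }
    return merged;
}
```

When one segment fully covers the other, the result is simply the larger one, which corresponds to cases 1 and 2; case 3 is the branch that appends a tail.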

Scan all items in the staging area according to these rules, then apply truncate_data and insert the result. The implementation is as follows:

void StreamReassembler::checked_insert(const string &data, uint64_t index) {
    string ins_data = data;
    // check and truncate data according to _wait_map's content
    size_t ins_start = index, ins_end = index + ins_data.size() - 1;
    for (auto it = _wait_map.begin(); it != _wait_map.end();) {
        const string &elem_data = (*it).second;
        size_t elem_start = (*it).first, elem_end = elem_start + elem_data.size() - 1;
        // insert data overlaps with current element 'e'
        if (ins_start <= elem_end && elem_start <= ins_end) {
            // insert data completely covers 'e', erase 'e'
            if (ins_start <= elem_start && ins_end >= elem_end) {
                it = _wait_map.erase(it);
            }
            // insert data is completely covered by 'e', clear data (do not insert)
            else if (elem_start <= ins_start && elem_end >= ins_end) {
                ins_data.clear();
                ++it;
            }
            // insert data partially overlaps with 'e', merge 'e' into data
            else {
                if (ins_start <= elem_start) {
                    ins_data += elem_data.substr(ins_end + 1 - elem_start, elem_end - ins_end);
                } else {
                    index = elem_start;
                    ins_data.insert(0, elem_data.substr(0, ins_start - elem_start));
                }
                it = _wait_map.erase(it);
            }
        } else {
            ++it;
        }
    }
    // if data isn't empty after checking, perform the insert
    if (!ins_data.empty()) {
        _wait_map.insert(make_pair(index, truncate_data(ins_data, index)));
    }
}

Finally, consider update_waitmap. The loop that looks for staged data that can be written to the output already lives in push_substring, so this function only has to update the staging area after newly written data has moved _wait_index to the right. Concretely, suppose _wait_index was 2 and the staging area holds data for 3-5, 4-7 and 8-10. New data arrives and moves _wait_index to 6. Then the first item 3-5 should be deleted, 4-7 should be trimmed to 6-7 (and will be written in the next loop iteration), and 8-10 stays unchanged. The implementation is as follows:

void StreamReassembler::update_waitmap() {
    for (auto it = _wait_map.begin(); it != _wait_map.end();) {
        size_t index = (*it).first;
        if (index < _wait_index) {
            string data = (*it).second;
            // erase anyway as we're either discarding it or modifying both key and value
            it = _wait_map.erase(it);
            if (index + data.size() > _wait_index) {
                data = data.substr(_wait_index - index);
                index = _wait_index;
                checked_insert(data, index);
            }
        } else {
            ++it;
        }
    }
}
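The 3-5 / 4-7 / 8-10 example can be replayed on a plain map with the sketch below. It uses a different technique from the listing above: instead of erasing in place and reinserting through checked_insert, it rebuilds the map, which keeps the example self-contained. `drop_written_prefix` is a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical helper: remove every staged byte that lies before wait_index.
void drop_written_prefix(std::map<uint64_t, std::string> &staged, uint64_t wait_index) {
    std::map<uint64_t, std::string> kept;
    for (const auto &item : staged) {
        const uint64_t start = item.first;
        const std::string &data = item.second;
        if (start >= wait_index) {
            kept.emplace(start, data);  // untouched
        } else if (start + data.size() > wait_index) {
            // Keep only the unwritten tail; if several items trim to the same key,
            // keep the longest tail.
            std::string tail = data.substr(wait_index - start);
            std::string &slot = kept[wait_index];
            if (tail.size() > slot.size()) {
                slot = tail;
            }
        }
        // else: the item was fully written already, so drop it
    }
    staged.swap(kept);
}
```

Taking the stream to be "abcdefghijk", the staged items are 3:"def", 4:"efgh" and 8:"ijk". After the call with `wait_index = 6` the map holds 6:"gh" and 8:"ijk", matching the description above.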

Full code links:
stream_reassembler.hh
stream_reassembler.cc

Screenshot of the passing tests (image not included)

Original site

Copyright notice
This article was written by [Altair_ alpha]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/164/202206130109083500.html