1. 程式人生 > >CS144學習(2)TCP協議實現

CS144學習(2)TCP協議實現

Lab1-4 分別是完成一個流重組器,TCP接收端,TCP傳送端,TCP連線四個部分,將四個部分組合在一起就是一個完整的TCP端了。之後經過包裝就可以進行TCP的接收和傳送了。 程式碼全部在[github](https://github.com/weijunji/TCP-CPP)上了。 ## Lab1 流重組器 這一個實驗是要實現一個流重組器,傳入資料的片段以及起始位置,之後對其進行重組,並儘快將以及重組完成的資料輸出。 這裡我使用的是紅黑樹來實現,也就是C++的`std::set`來實現。將未重組完成的碎片儲存在紅黑樹中,當新碎片到達時就儘可能地將該碎片與已有的碎片進行合併,保證紅黑樹中沒有重疊的碎片。 這一個實驗的問題就是要考慮的情況有很多,當用`lower_bound()`找到插入位置後,要對前面和後面的碎片判斷能否合併,合併的情況也有很多種,包括部分重疊、正好接上、完全覆蓋等情況;而且一個新的碎片可能會一次覆蓋掉很多碎片。這一部分程式碼我寫的比較混亂,因為寫完之後測試發現有情況沒考慮到,然後就只能打補丁,於是就越來越混亂了。 而儘快輸出這個條件還是很容易的,如果當前到達碎片能夠直接輸出的話,就再判斷一下樹中第一個碎片能否輸出,因為前面保證了不會有重疊碎片,所以可以只對第一個碎片進行判斷。 ## Lab2 TCP接收端 這個實驗是基於上一個的流重組器來實現一個TCP接收端,這一個還是比較簡單的。就是前面流重組器的一些BUG可能會在這個實驗裡面被檢測到,要回去改程式碼。 首先是實現一個`WrappingInt32`,因為TCP的序號是32位的,並且是可能發生溢位的,而在流重組器裡面使用的序列號是64位,因此需要實現函式來進行轉換,將64位的相對序列號根據ISN轉換成32位的絕對序列號。 ```cpp #include "wrapping_integers.hh" using namespace std; WrappingInt32 wrap(uint64_t n, WrappingInt32 isn) { uint64_t res = isn.raw_value() + n; return WrappingInt32{static_cast(res)}; } uint64_t abs(uint64_t a, uint64_t b) { if (a > b) { return a - b; } else { return b - a; } } uint64_t unwrap(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint) { uint64_t pre = checkpoint & 0xffffffff00000000; uint64_t num; if (n.raw_value() >= isn.raw_value()) { num = n.raw_value() - isn.raw_value(); } else { num = 0x0000000100000000; num += n.raw_value(); num -= isn.raw_value(); } uint64_t a = pre + num; uint64_t b = a + 0x0000000100000000; uint64_t c = a - 0x0000000100000000; // b a c if (abs(a, checkpoint) < abs(b, checkpoint)) { if (abs(a, checkpoint) < abs(c, checkpoint)) { return a; } else { return c; } } else { return b; } } ``` 最後就是對流重組器進行一下包裝,計算出`ackno`和`window_size`提供給後面使用,處理一下SYN和FIN標記就行了。 ```cpp #include "tcp_receiver.hh" using namespace std; void TCPReceiver::segment_received(const TCPSegment &seg) { if (!_isn.has_value()) { if (seg.header().syn) { _isn = seg.header().seqno + 1; } else { std::cerr << "Error: connection not build" << std::endl; return; } } bool eof = seg.header().fin; std::string&& payload = std::string(seg.payload().str()); if (seg.header().seqno == _isn.value() - 1 && !seg.header().syn && _reassembler.expect() < 0x0000ffff) { // wrong packet seqno == isn return; } uint64_t index = unwrap(seg.header().seqno + (seg.header().syn ? 1 : 0), _isn.value(), _reassembler.expect()); _reassembler.push_substring(payload, index, eof); } optional TCPReceiver::ackno() const { if (_isn.has_value()) { return { wrap(_reassembler.expect(), _isn.value()) + (_reassembler.stream_out().input_ended() ? 1 : 0) }; } else { return std::nullopt; } } size_t TCPReceiver::window_size() const { return _capacity - _reassembler.stream_out().buffer_size(); } ``` ## Lab3 TCP傳送端 在這個實驗裡面就要考慮到TCP的一些細節了,包括`SYN`和`FIN`包的傳送,`ACK`的處理,超時重傳的實現了。 ### SYN包 之前我還在考慮客戶端和服務端的`SYN`包應該是不同的,應該如何處理;而實際上兩個包是相同的,都是攜帶`SYN`和初始序列號,不同的地方就是服務端的`SYN`要同時對客戶端的`SYN`的包進行ACK。但ACK的處理是在TCP連線部分進行處理的,也就是說在TCP傳送端裡,只要發出一個`SYN`包就行了,剩下的不需要考慮。 而應該在什麼時候發出`SYN`包呢,一開始我是在建構函式中就構造一個`SYN`包放到傳送佇列裡面。這個做法在這個實驗的測試裡面是沒有問題的,但是對於下一個實驗就有問題了。因為服務端一開始是處於`LISTEN`狀態,而這個狀態下不應該有包被髮出。因此,`SYN`包的傳送應該放在`fill_window()`函式中,如果沒有傳送過`SYN`包,就先將`SYN`包傳送出去。 對於一個最簡單的`SYN`包,就只需要將`SYN`位置1,設定初始序列號`seqno`就行了。注意`SYN`是要佔用一個序列號的。 ### FIN包 當傳送流被使用者程式結束後,就可以傳送`FIN`包來關閉一個方向的連線了。這一個包的傳送還是比較簡單的,只要在`fill_window`中判斷是否結束就行了。`FIN`包是可以和資料包一起傳送的,在傳送資料時發現流結束了,就將該資料包的`FIN`標誌置1就行了。`FIN`也是要佔用一個序列號的。當收到對方對`FIN`包的ACK後,就說明順利關閉了。 ### ACK的處理 ACK資料會通過`ack_received`函式來通知傳送端,當收到一個ACK後,就可以將傳送視窗向右滑動,注意判斷一下ACK是不是之前的ACK,避免視窗左移。最後將所有被ACK了的包從等待確認的佇列中移除就行了。 ### 重傳 在這個實驗裡面只要求實現的是超時重傳機制,但我也加入了快速重傳機制。理論上當一個包超時之後就要對其進行重傳,也就是每個包都要有定時器來負責重傳,而這樣的代價是很高的。因此,實現中是對每個TCP連線設定一個定時器,當時間超過RTO後進行重傳,定時器的規則如下: * 當傳送一個包並且定時器為關閉狀態:開啟定時器 * 當傳送一個包並且定時器為開啟狀態:不做任何修改 * 當收到一個ACK並且所有包都被ACK了:關閉定時器 * 當收到一個ACK並且仍有包未被ACK:重開定時器 超時重傳使用的是指數退避演算法,當進行了一次超時重傳後,下一次超時的時間就會翻倍,也就是1RTO,2RTO,4RTO,8RTO… 當定時器超時後,就需要對包進行重傳,在本實驗裡面只要重傳第一個包就行了。這種方法的缺點就是後面丟失的包也要等到第一個被ACK了才能重發,時間會比較長。另一種選擇是重傳所有包,而這種的缺點就是會加大網路負擔。因此,為了解決這種問題,就引入了其他的機制。 #### 快速重傳 快速重傳是指當收到三個重複ACK(不包括第一次ACK)的時候,就立即進行重傳,這種方法是資料驅動而不是時間驅動,可以避免超時重傳的速度較慢的問題。而這仍然存在是重傳一個還是所有的問題。 #### SACK SACK也就是選擇重傳機制,接收端通過SACK來確認已收到的片段,從而對重傳演算法進行優化,可以不用對所有包進行重傳。 而SACK存在接收方Reneging的問題,即接收方有權把已經SACK的資料給丟棄。這種丟棄是不被鼓勵但還是可能發生的。因此,傳送方不能完全依賴SACK,還是要依賴ACK,並維護定時器,如果後續的ACK沒有增長,那麼還是要對已經SACK的資料進行重傳。同時,接收端也永遠不能把SACK的包標記為ACK。 ```cpp #include "tcp_sender.hh" #include "tcp_config.hh" #include #include using namespace std; TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional fixed_isn) : _isn(fixed_isn.value_or(WrappingInt32{random_device()()})) , _initial_retransmission_timeout{retx_timeout} , _stream(capacity) { } uint64_t TCPSender::bytes_in_flight() const { return _next_seqno - _expect_ack; } void TCPSender::fill_window() { if (!_syn_sent) { TCPSegment seg; seg.header().syn = true; seg.header().seqno = wrap(0, _isn); _segments_out.push(seg); _seg_not_ack.push(seg); _next_seqno = 1; _retrans_timer = _tick + _initial_retransmission_timeout; _syn_sent = true; } uint64_t remain = _window_size - bytes_in_flight(); bool send = false; if (_expect_ack != 0) { // SYN received while (remain > 0 && _stream.buffer_size() > 0) { // send segment uint64_t send_bytes = min(remain, TCPConfig::MAX_PAYLOAD_SIZE); string payload = _stream.read(send_bytes); TCPSegment seg; seg.header().seqno = wrap(_next_seqno, _isn); seg.payload() = move(payload); _next_seqno += seg.length_in_sequence_space(); remain = _window_size - bytes_in_flight(); if (_stream.eof() && remain > 0 && !_fin_sent) { seg.header().fin = true; _next_seqno += 1; _fin_sent = true; } _segments_out.push(seg); _seg_not_ack.push(seg); send = true; } } if (_stream.eof() && remain > 0 && !_fin_sent) { // send FIN TCPSegment seg; seg.header().fin = true; seg.header().seqno = wrap(_next_seqno, _isn); _segments_out.push(seg); _seg_not_ack.push(seg); _next_seqno += 1; _fin_sent = true; send = true; } if (send && _retrans_timer == 0) { // open timer _retrans_timer = _tick + _initial_retransmission_timeout; _consecutive_retransmissions = 0; _rto_back_off = 0; } } void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) { _window_size = window_size; _do_back_off = 1; if (_window_size == 0) { _window_size = 1; _do_back_off = 0; } uint64_t ack = unwrap(ackno, _isn, _expect_ack); if (ack <= _next_seqno && ack > _expect_ack) { if (ack == _expect_ack) { _same_ack++; } else { _same_ack = 0; } _expect_ack = ack; if (bytes_in_flight() == 0) { // close timer _retrans_timer = 0; _consecutive_retransmissions = 0; _rto_back_off = 0; } else { // reopen timer _retrans_timer = _tick + _initial_retransmission_timeout; _consecutive_retransmissions = 0; _rto_back_off = 0; } } // remove all acked packets while (!_seg_not_ack.empty()) { TCPSegment seg = _seg_not_ack.front(); if (seg.length_in_sequence_space() + unwrap(seg.header().seqno, _isn, _expect_ack) <= _expect_ack) { _seg_not_ack.pop(); } else { break; } } // faster retransmit if (_same_ack == 3 && !_seg_not_ack.empty()) { // cout << "!! FASTER RETRANSMIT" << endl; _same_ack = 0; TCPSegment seg = _seg_not_ack.front(); _segments_out.push(seg); _consecutive_retransmissions += 1; _rto_back_off += _do_back_off; _retrans_timer += _initial_retransmission_timeout << _rto_back_off; } } void TCPSender::tick(const size_t ms_since_last_tick) { _tick += ms_since_last_tick; if (!_seg_not_ack.empty() && _tick >= _retrans_timer) { // retransmit the first packet // cout << "retransmit" << endl; TCPSegment seg = _seg_not_ack.front(); _segments_out.push(seg); _consecutive_retransmissions += 1; _rto_back_off += _do_back_off; _retrans_timer = _tick + (_initial_retransmission_timeout << _rto_back_off); } } unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmissions; } void TCPSender::send_empty_segment() { TCPSegment seg; seg.header().seqno = wrap(_next_seqno, _isn); _segments_out.push(seg); } ``` ## Lab4 TCP連線 這個實驗就是將之前的傳送端和接收端組合起來,成為一個完整的TCP peer。 主要的工作就是將傳送的資料從傳送端的佇列中取出,再放到傳送佇列中去;傳送`ack`包進行確認;對`RST`進行處理;對連線的關閉和TIME_WAIT狀態進行處理。 當呼叫`connect`函式時,就可以呼叫`fill_window`生成`SYN`包,然後傳送出去。 當收到一個包之後,就將對於資訊交給傳送端和接收端進行處理,然後進行ACK,當傳送佇列有包時直接附帶ACK就行了,如果沒有就要生成一個空包進行ACK,注意當接收的包只是一個ACK包而沒有任何資料的話就不要進行ACK。當接收端收到所有資料以及`FIN`包之後就會關閉接收端的輸入流。 當連線的輸入流關閉,就可以呼叫傳送端的`end_input`和`fill_window`來生成`FIN`包併發送。 ### 連線的關閉 TCP連線的關閉分為兩種情況,主動關閉和被動關閉。 當傳送流先結束時,就要進行主動關閉,傳送`FIN`進入FIN_WAIT_1狀態,收到`ACK`後進入FIN_WAIT_2狀態,當收到對方的`FIN`包並進行ACK之後,就進入TIME_WAIT狀態,在TIME_WAIT狀態下要等待2MSL(Linux中一般為60s)才能釋放連線。 TIME_WAIT狀態的目的就是保證最後一個ACK包被對方接收到,因為不會對ACK進行ACK,就只能使用這種方式。如果ACK沒有被對方接收到,那麼對方就會重發`FIN`包,這時候就可以再次進行ACK。如果直接釋放而不進行TIME_WAIT的話,那麼下一個使用該埠的連線就可能會收到上一個連線重傳的`FIN`包,從而導致混亂。 當對方先關閉時就是被動關閉,當收到`FIN`並ACK後,就進入CLOSE_WAIT狀態。等到傳送流結束後,傳送`FIN`進入LAST-ACK狀態,收到對方ACK後就可以關閉連線了,當被動關閉時,就不需要TIME_WAIT了。 ```cpp #include "tcp_connection.hh" #include using namespace std; void TCPConnection::send_all_segments() { if (_closed) return; while (!_sender.segments_out().empty()) { TCPSegment& seg = _sender.segments_out().front(); if (_receiver.ackno().has_value()) { seg.header().ack = true; seg.header().ackno = _receiver.ackno().value(); } size_t max_win = numeric_limits().max(); seg.header().win = min(_receiver.window_size(), max_win); _segments_out.push(seg); _sender.segments_out().pop(); } if (_sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && _receiver.stream_out().input_ended()) { if (_linger_after_streams_finish) { _time_wait = true; } } } size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); } size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); } size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); } size_t TCPConnection::time_since_last_segment_received() const { return _ticks - _last_ack_time; } void TCPConnection::segment_received(const TCPSegment &seg) { if (!_syn_sent && !seg.header().syn) return; if (seg.header().rst) { // reset connection _sender.stream_in().set_error(); _receiver.stream_out().set_error(); _linger_after_streams_finish = false; } _last_ack_time = _ticks; _receiver.segment_received(seg); _sender.ack_received(seg.header().ackno, seg.header().win); _sender.fill_window(); _syn_sent = true; if (_receiver.stream_out().input_ended() && !_sender.stream_in().eof()) { // passive close _linger_after_streams_finish = false; } if (!_receiver.ackno().has_value()) { return; // no need for ack } if (_sender.segments_out().empty()) { // generate an empty segment to ack if (_receiver.stream_out().input_ended() && !seg.header().fin) { // no need to ack, server closed and seg not fin } else if (seg.length_in_sequence_space() == 0) { // no need to ack the empty-ack } else { _sender.send_empty_segment(); } } // send with ack send_all_segments(); } bool TCPConnection::active() const { if (_sender.stream_in().error() && _receiver.stream_out().error()) return false; return !(_sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && _receiver.stream_out().input_ended()) || _time_wait; } size_t TCPConnection::write(const string &data) { size_t wrote = _sender.stream_in().write(data); _sender.fill_window(); send_all_segments(); return wrote; } void TCPConnection::tick(const size_t ms_since_last_tick) { _ticks += ms_since_last_tick; _sender.tick(ms_since_last_tick); if (_time_wait && _ticks >= _last_ack_time + _cfg.rt_timeout * 10) { // closed _time_wait = false; _closed = true; } if (_sender.consecutive_retransmissions() > _cfg.MAX_RETX_ATTEMPTS) { // RST _sender.stream_in().set_error(); _receiver.stream_out().set_error(); _linger_after_streams_finish = false; while (!_sender.segments_out().empty()) { // pop all segments _sender.segments_out().pop(); } _sender.send_empty_segment(); TCPSegment& seg = _sender.segments_out().front(); seg.header().rst = true; } send_all_segments(); } void TCPConnection::end_input_stream() { _sender.stream_in().end_input(); _sender.fill_window(); send_all_segments(); } void TCPConnection::connect() { // send SYN if (!_syn_sent) { _sender.fill_window(); _syn_sent = true; TCPSegment& seg = _sender.segments_out().front(); size_t max_win = numeric_limits().max(); seg.header().win = min(_receiver.window_size(), max_win); _segments_out.push(seg); _sender.segments_out().pop(); } } TCPConnection::~TCPConnection() { try { if (active()) { cerr << "Warning: Unclean shutdown of TCPConnection\n"; _sender.stream_in().set_error(); _receiver.stream_out().set_error(); _linger_after_streams_finish = false; while (!_sender.segments_out().empty()) { // pop all segments _sender.segments_out().pop(); } _sender.send_empty_segment(); TCPSegment& seg = _sender.segments_out().front(); seg.header().rst = true; send_all_segments(); } } catch (const exception &e) { std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl; } } ``` ## 測試 至此整個簡單的TCP協議就實現完了,使用這個TCP協議修改Lab0中的webget,然後就可以對網站進行訪問了。使用抓包軟體就可以看到完整的連線建立、資料傳送、連線關閉的過程了。但這裡有一個問題不知道是為什麼,使用webget訪問`cs144.keithw.org`,`bilibili.com`都能正常訪問,但是訪問`www.baidu.com`的時候,就會丟失連線建立之後的一個包,抓包看根本沒收到那個包(tcp previous segment not captured),不知道是我的程式碼有問題還是百度的伺服器使用了什麼特殊的策略。