1. 程式人生 > >期中專案設計文件

期中專案設計文件

計網期中專案設計文件

寫一個網路應用LFTP,該應用支援網際網路中的兩臺計算機進行大檔案傳輸

需求點實現

1. 使用UDP協議傳輸,但要求像TCP一樣完全可靠

使用rdt3.0停等協議效率太慢了,因此我們使用回退N步(GBN)的流水線協議,允許傳送方傳送多個分組,而不需要等待確認。
UDP是不可靠傳輸的,它所收到的資料是無序的,也有可能在中途丟包。而在GBN協議中,接收方丟棄所有失序分組,確保其像TCP一樣可靠。

GBN

執行時如上述所示,傳送方一直在傳送資料包,同時啟動一個定時器,只有當接收到回覆的ACK包時,才取消定時器。接收方如果收到是所期待的包,給傳送方回一個ACK包,ACK的值為當前所收到的包的序列號。如果接收的包不是所期待的包,則說明發生了丟包,則給傳送方回一個ACK包,ACK的值為期待的資料包的序列號減1。由於傳送方沒有收到ACK包,則觸發超時事件,導致重發。

也有可能是ACK包丟了,但這沒有影響,因為傳送方會根據下一個收到的ACK包來調整。比如ACK2丟失了,但收到了ACK3,說明資料包2沒有丟。

實現方法:

傳送方的實現方法參考以下的FSM圖。

FSM

傳送方通過維護一個視窗來控制傳送。基序號(base)是最早未確認分組的序號,下一個序號(nextseqnum)是最小的未使用序號。N是流水線是最大能允許未確認分組的數量。通過維護這個視窗,傳送端可以控制傳送速率,也可以方便地實現重發分組的功能。

base

程式進入一個迴圈, 傳送端在兩個狀態下進行轉換, 第一個狀態是接受收到接收方傳送的ACK包, 當此時沒有收到 ACK包則進入狀態2, 狀態2是向接收方傳送資料。這樣就可以保證不用停等協議, 我能夠一直在傳送狀態, 即可以傳送多個包。

對於狀態1(接收ACK包):

當接收到ACK包, 首先把收到的資料包解包, 得到確認的包號值 ,判斷如果確認了最後一個包,則說明已經傳送完了, 跳出迴圈, 結束傳輸,

message, client_address = server_socket.recvfrom(BUF_SIZE)

unpacked_message = pkt_struct.unpack(message)

if (newBase == lastSendPacketNum + 1):
    mytimer.cancel()
    break

得到最後一個包時通過傳送時,用一個變數lastSendPacketNum存檔案傳送的最後一個包號,然後判斷收到的ACK包是否是確認最後一個包即可。

if str(data) != "b''":  # b''表示檔案讀完
    end_flag = 0
    #rnwd傳送方沒用到
    client_socket.sendto(pkt_struct.pack(*(nextseqnum, ack, end_flag, 1, data)), server_address)
else:
    end_flag = 1  # 傳送的結束標誌為1,表示檔案已傳送完畢
    lastSendPacketNum = nextseqnum
    # rnwd傳送方沒用到
    print ("=============================" +  str(lastSendPacketNum) + "============================== ")
    client_socket.sendto(pkt_struct.pack(*(nextseqnum, ack, end_flag, 1 , 'end'.encode('utf-8'))), server_address)
    break

當收到確認包後,對確認的包進行刪除, 把從base到確認的包 之間的包全部從緩衝區刪除, 更新base值,
同時會更新cwnd值, 程式碼在下面的阻塞控制實現可以看到。

如果緩衝區為0 停止定時器, 否則啟動計數器

#如果緩衝區為0  停止定時器, 否則啟動定時器
if (base == nextseqnum):
    mytimer.cancel()
else:
    mytimer.cancel()
    mytimer = threading.Timer(MAX_TIME_OUT, timeout, [base, nextseqnum, sendBuffer,client_socket, lastSendPacketNum, server_address])
    mytimer.start()

對於狀態2:(傳送資料包
當沒有收到包, 就會進入傳送資料包的狀態。。狀態實際實現中,將socket的接收方式設定為非阻塞,一旦沒有資料,則會丟擲一個錯誤,我們在捕獲的錯誤中實現這個

注意此時傳送方發包不需要先把原來的包確認,才能傳送包, 這就實現了允許傳送方傳送多個分組,而不需要等待確認
當滿足以上條件, 表明傳送方可以傳送分組, 所以傳送一個包, 傳送前,把傳送包加入到快取區 這裡面儲存的是傳送但未接受的包。
當base == nextseqnum 說明此時緩衝區為空, 這是應該啟動定時器, 來檢視是否丟包

sendBuffer[nextseqnum] = pkt_struct.pack(*(nextseqnum, ack, end_flag, 1, data))
#當base和nextseqnum相等時, 開始計時
if (base == nextseqnum) :
    mytimer = threading.Timer(MAX_TIME_OUT, timeout,
                              [base, nextseqnum, sendBuffer, client_socket, lastSendPacketNum, server_address])
    mytimer.start()

如果發生超時了,則要跳到處理超時的函式。重發base到nextseqnum - 1範圍內的包,即已經發送了但卻未確認的包。

#重新發送緩衝區裡的所有包
print("重新 send packet:" + str(base) + "~" + str(nextseqnum - 1))
for i in range (base, nextseqnum + 1):
    try :
        client_socket.sendto(sendBuffer[i], server_address)
        print ("重新發送packet: " + str(i))
    except:
        #如果緩衝區的包已經發完, 將停止傳送
        mytimer.cancel()

重發的包也有可能丟,因此也要重新設定定時器。

#重新啟動定時器
mytimer = threading.Timer(MAX_TIME_OUT, timeout, [base, nextseqnum, sendBuffer, client_socket, lastSendPacketNum, server_address])
mytimer.start()

接收方的動作則比較簡單,接收方維護一個expected值就行了。如果收到的序列號為expected,則更新expected,返回當前序列號的ACK包,否則不更新expected,返回值為expected的ACK包。

2. 實現流控制

流控制,是讓傳送方的傳送速率不要太快,讓接收方來得及接受。
接收端維護一個接收快取,每次處理資料之後,接收端把當前快取的空閒空間的大小(rwnd)返回給傳送端。傳送端跟蹤兩個變數,LastByteSent和LastByteAcked,這兩個值的差就是主機A傳送到連線中但未被確認的資料量。傳送端要將未確認的資料量控制在rwnd以內,以確保傳送端不會使主機B的接收快取溢位。

receive

實現方法

我們在接收端使用一個佇列來模擬接收快取。因為我們用的python的socket介面,實際上socket.recvfrom(size)這個方法是從UDP接收快取中獲取size個位元組的資料,快取已經是有一個實現了的,通過sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)我們可以知道它的大小為65536。但是這個應用中我們忽視這個底層真正的快取,我們把真正的快取裡的內容當作是仍在網路中傳輸的資料包。

在流控制中,接收方還要多做一些工作。當接收方接收到資料包,先儲存在快取中,當沒有收到資料包了,就從快取中取出資料處理資料包。處理完資料包回覆ACK包時要把當前的rwnd值傳送給傳送方。rwnd值儲存在資料包的頭部資訊中。

rwnd = RCV_BUFFER_SIZE - buff.qsize()

傳送方通過收到ACK包中的rwnd,來限制其傳送速度。在傳送方傳送資料包的狀態中,在傳送之前加一個判斷,倘若已傳送但還未確認的資料量大於rwnd值,則說明此次傳送有可能讓接收方的接收快取溢位,因此不讓發。

  if nextseqnum - base > rwnd: 
    continue

因此傳送方除了維護視窗之外還要維護rwnd值。

3. 實現阻塞控制

參照TCP的擁塞控制演算法,應用實現了慢啟動和擁塞避免。
通過維護一個變數擁塞視窗(cwnd),對傳送方的傳送流量的速率進行了限制。加上前面的流量控制,傳送端必須滿足

LastByteSent - LastByteAcked <= min {cwnd, rwnd } 

何時擁塞:當出現丟包的時候就假設出現網路擁塞的情況。

慢啟動
設定初始cwnd為1,表示一個RTT內傳送一個數據包。開始時應用向接收端傳送一個數據包並等待確認,當收到確認包後,將cwnd值翻倍,指數增長。當檢測到擁塞時,將慢啟動閾值(ssthresh)設為cwnd值的一半,當到達或超過ssthresh值時,結束慢啟動進行擁塞避免模式。

slow
擁塞避免
此時距離擁塞可能並不遙遠,因此不能每過一個RTT將cwnd的值翻番,而採用每次只增加1、當再次出現擁塞時,ssthresh的值被更新為原來的cwnd值的一半,然後將cwnd設為1,進入慢啟動階段。

實現方法

書上慢啟動的圖,實際上是一種停等協議,它是一開始先發一個數據包,然後等待確認,再一次性發2個,再等待2個確認全部確認完才開始發4個。因為我們是流水線地傳送,因此作一些變換:在慢啟動階段,只要收到一個ACK包,就將cwnd加1,這實際上的效果是將速率翻倍了。

同樣像流控制一樣,在傳送資料包之前,檢測在網路上的包數量是否大於rwnd值,如果是的話,就不允許傳送。

 elif nextseqnum - base > cwnd:
    #print("受擁塞控制限制,傳送速率拒絕傳送")
    continue

每次收到ACK包時,就維護cwnd的值。若cwnd>ssthresh,則說明在擁塞避免狀態,這裡不能直接將cwnd加1,而是維護一個變數add_num,一輪結束了才將cwnd加1。若cwnd<=ssthresh,則說明在慢啟動狀態,將cwnd加1。

if cwnd > ssthresh:
    if add >= cwnd:
        cwnd += 1
        add_num = 0
    else:
        add_num += 1
else:
    cwnd += 1

當進入超時函數了,說明出現了擁塞狀態。此時要改變ssthresh為當前cwnd值的一半,將cwnd變為1,回到慢啟動狀態。
實際實現中,因為超時是用另外一個執行緒來處理的,在主執行緒中也會修改cwnd值,因此要避免兩者衝突,要為變數加互斥鎖。

 #當進入超時操作時 阻塞控制 使得cwnd為1, ssthresh 為當時的cwnd的一半, 利用互斥鎖使得更改ssthresh不會衝突
    if mutex.acquire(1):
        ssthresh = cwnd / 2
        #防止ssthresh變為負數
        if (ssthresh < 1) :
            ssthresh = 1
        cwnd = 1
        print("更新cwnd值為" + str(cwnd), "  更新ssthresh值為" + str(ssthresh))
        mutex.release()

這樣,傳送方要想傳送資料包,得滿足3個條件:

  1. 未確認的分組數不能超過N
  2. 未確認的分組數不能超過rwnd
  3. 未確認的分組數不能超過cwnd

4. 服務端要支援多個客戶端同時線上

使用python的threading包。當檢測到有使用者連線到伺服器時,為使用者建立一個程序。

# 建立新的執行緒,處理客戶端的請求
new_thread = threading.Thread(target=serve_client, args=(client_address, message))
new_thread.start()

這樣,對不同的客戶端,會使用不同的執行緒去作相應的處理。

5. 程式要提供必要的出錯反饋資訊

當get檔案不存在時將返回給接收方錯誤資訊

if cmd == 'lget':
    # 檔案不存在,並告知客戶端
    if os.path.exists(SERVER_FOLDER + large_file_name) is False:
        server_socket.sendto('fileNotExists'.encode('utf-8'), client_address)
        # 關閉socket
        server_socket.close()
        return

6.相關的細節

1. 資料包的格式

根據以上設計,資料包得儲存一個Seq值,一個ACK值,一個結束標記值,一個rwnd值,還有1024個位元組的資料。

pkt_struct = struct.Struct('IIII1024s')

實際的方式採用python的struct包,可方便地打包和解包。

server_socket.sendto(pkt_struct.pack(*(nextseqnum, ack, end_flag, 1, data)), client_address)
unpacked_data = pkt_struct.unpack(packed_data)

2. 建立連線的方式

模仿TCP連線,在資料傳送之前要進行三次握手。對於傳送方應該要傳送兩次資料包, 第一次傳送我的命令字串給接收方這是第一次握手,然後接收方傳送允許,這是第二次握手, 第三次 傳送ACK 等待接收方的確認,這是第三次握手。

3. 超時時隔的確定

同樣參考TCP連線。在建立連線的時候先估計RTT的值EstimatedRTT,顯然超時時隔應該大於等於EstimatedRTT,否則將造成不必要的重傳。
根據以下公式確定超時時隔。
T i m e o u t I n t e r v a l = E s t i m a t e d R T T + 4 D e v R T T TimeoutInterval = EstimatedRTT + 4 * DevRTT
DevRTT是偏離值。
D e v R T T = S a m e p l e R T T E s t i m a t e d R T T DevRTT = |SamepleRTT - EstimatedRTT|