1. 程式人生 > >kcp源碼走讀

kcp源碼走讀

row slice 字符數 方式 回來 數據包 重復 false cover

我看的是golang版本的kcp源碼,下載地址:https://github.com/skywind3000/kcp

一個發送數據接收數據的基本流程如下
//發送端有一段數據buffer需要發送,於是調用send函數

kcp1.Send(buf.Bytes())//send函數將buffer分片成kcp的數據包格式,存在待發送隊列中

kcp1.flush()//將發送隊列中的數據通過下層協議(UDP)進行發送

//kcp2接收到下層協議(UDP)傳進來的數據底層數據buffer

kcp2.Input(buffer[:hr], true, false)//調用Input函數將buffer轉換成kcp的數據包格式

hr 
= int32(kcp2.Recv(buffer[:10]))//kcp2將接收的kcp數據包還原成kcp1發送的buffer數據


 
func (kcp *KCP) Recv(buffer []byte) (n int)

    // merge fragment
    count := 0
    for k := range kcp.rcv_queue {
        seg := &kcp.rcv_queue[k]
        copy(buffer, seg.data)
        buffer = buffer[len(seg.data):]
        n += len(seg.data)
        count++
        kcp.delSegment(*seg)
        if seg.frg == 0 {
            break
        }
    }
    if count > 0 {
        kcp.rcv_queue = kcp.remove_front(kcp.rcv_queue, count)
    }

count = 0
    for k := range kcp.rcv_buf {
        seg := &kcp.rcv_buf[k]
        if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
            kcp.rcv_nxt++
            count++
        } else {
            break
        }
    }

    if count > 0 {
        kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
        kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
    }
// fast recover
    if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover {
        // ready to send back IKCP_CMD_WINS in ikcp_flush
        // tell remote my window size
        kcp.probe |= IKCP_ASK_TELL
    }


recv函數將接收消息隊列中的數據包還原成原來的消息格式,通過buffer返回給調用者

還會把rcv_buf中的與接收序號相匹配的數據拷貝到rcv_queue中。這裏註意到在Input->parse_data函數中有同樣的處理,這裏之所以需要重復處理是因為kcp.rcv_queue的大小可能會發生改變,len(kcp.rcv_queue) < int(kcp.rcv_wnd)條件有可能重新成立。

fast_recover標識的意思是快速告知對端我又有窗口大小空出來了,因為在Input函數中有可能窗口會滿了,此時發送給對端的是窗口滿消息,而在recv過後,因為取走了消息,可用接收窗口又變大了,此時需要快速告知對端可以繼續發消息了。

func (kcp *KCP) Send(buffer []byte) int {
    var count int
    if len(buffer) == 0 {
        return -1
    }

    // append to previous segment in streaming mode (if possible)
    if kcp.stream != 0 {
        n := len(kcp.snd_queue)
        if n > 0 {
            seg := &kcp.snd_queue[n-1]
            if len(seg.data) < int(kcp.mss) {
                capacity := int(kcp.mss) - len(seg.data)
                extend := capacity
                if len(buffer) < capacity {
                    extend = len(buffer)
                }

                // grow slice, the underlying cap is guaranteed to
                // be larger than kcp.mss
                oldlen := len(seg.data)
                seg.data = seg.data[:oldlen+extend]
                copy(seg.data[oldlen:], buffer)
                buffer = buffer[extend:]
            }
        }

        if len(buffer) == 0 {
            return 0
        }
    }

    if len(buffer) <= int(kcp.mss) {
        count = 1
    } else {
        count = (len(buffer) + int(kcp.mss) - 1) / int(kcp.mss)
    }

    if count > 255 {
        return -2
    }

    if count == 0 {
        count = 1
    }

    for i := 0; i < count; i++ {
        var size int
        if len(buffer) > int(kcp.mss) {
            size = int(kcp.mss)
        } else {
            size = len(buffer)
        }
        seg := kcp.newSegment(size)
        copy(seg.data, buffer[:size])
        if kcp.stream == 0 { // message mode
            seg.frg = uint8(count - i - 1)
        } else { // stream mode
            seg.frg = 0
        }
        kcp.snd_queue = append(kcp.snd_queue, seg)
        buffer = buffer[size:]
    }
    return 0
}

send函數主要功能是把用戶需要發送的字符數組轉化成kcp的數據包。如果用戶的數據超過一個MSS,還會對數據進行分片。這裏有兩種分片的方式,消息方式和流方式。消息方式把用戶數據分片,為每個分片設置ID,將分片後的數據一個一個地存入發送隊列種,接收方通過id解析回原來的包,消息方式一個分片的數據量可能不能達到MSS(最大分片大小)。流方式則是會檢測每個發送隊列裏的分片是否達到最大mss,如果沒有達到就會用新的數據填充分片。流方式的網絡速度優於消息方式,但是流方式接收方接收時是一個分片一個分片地接收,而消息方式kcp接收函數會自己把原本屬於一個數據的分片重組回來。

kcp源碼走讀