1. 程式人生 > >Go語言併發程式設計(三)

Go語言併發程式設計(三)

Telnet迴音伺服器

Telnet協議是TCP/IP協議族中的一種。它允許使用者(Telnet客戶端)通過一個協商過程與一個遠端裝置進行通訊。本例將使用一部分Telnet協議與伺服器進行通訊。

伺服器的網路庫為了完整展示自己的程式碼實現了完整的收發過程,一般比較傾向於使用傳送任意封包返回原資料的邏輯。這個過程類似於對著大山高喊,大山把你的聲音原樣返回的過程。也就是迴音(Echo)。本節使用Go語言中的socket、goroutine和通道編寫一個簡單的Telnet協議的迴音伺服器。

迴音伺服器的程式碼分為4個部分,分別是接受連線、會話處理、Telnet命令處理和程式入口。

1.接受連線

迴音伺服器能同時服務於多個連線。要接受連線就需要先建立偵聽器,偵聽器需要一個偵聽地址和協議型別。就像你想賣東西,需要先確認賣什麼東西,賣東西的型別就是協議型別,然後需要一個店面,店面位於街區的某個位置,這就是偵聽器的地址。一個伺服器可以開啟多個偵聽器,就像一個街區可以有多個店面。街區上的編號對應的就是地址中的埠號,如圖1-2所示。

圖1-2   IP和埠號

  • 主機 IP:一般為一個 IP 地址或者域名,127.0.0.1 表示本機地址。
  • 埠號:16 位無符號整型值,一共有 65536 個有效埠號。

通過地址和協議名建立偵聽器後,可以使用偵聽器響應客戶端連線。響應連線是一個不斷迴圈的過程,就像到銀行辦理業務時,一般是排隊處理,前一個人辦理完後,輪到下一個人辦理。

我們把每個客戶端連線處理業務的過程叫做會話。在會話中處理的操作和接受連線的業務並不衝突可以同時進行。就像銀行有 3 個視窗,喊號器會將使用者分配到不同的櫃檯。這裡的喊號器就是 Accept 操作,視窗的數量就是 CPU 的處理能力。因此,使用 goroutine 可以輕鬆實現會話處理和接受連線的併發執行。

如圖1-3清晰地展現了這一過程。

圖1-3   socket處理過程

Go語言中可以根據實際會話數量建立多個goroutine,並自動的排程它們的處理。

server.go

package main

import (
    "fmt"
    "net"
)

// 服務邏輯, 傳入地址和退出的通道
func server(address string, exitChan chan int) {

    // 根據給定地址進行偵聽
    l, err := net.Listen("tcp", address)

    // 如果偵聽發生錯誤, 列印錯誤並退出
    if err != nil {
        fmt.Println(err.Error())
        exitChan <- 1
    }

    // 列印偵聽地址, 表示偵聽成功
    fmt.Println("listen: " + address)

    // 延遲關閉偵聽器
    defer l.Close()

    // 偵聽迴圈
    for {

        // 新連線沒有到來時, Accept是阻塞的
        conn, err := l.Accept()

        // 發生任何的偵聽錯誤, 列印錯誤並退出伺服器
        if err != nil {
            fmt.Println(err.Error())
            continue
        }

        // 根據連線開啟會話, 這個過程需要並行執行
        go handleSession(conn, exitChan)
    }
}

  

程式碼說明如下:

  • 第9行,接受連線的入口,address為傳入的地址,退出伺服器使用exitChan的通道控制。往exitChan寫入一個整型值時,程序將以整型值作為程式返回值來結束伺服器。
  • 第12行,使用net包的Listen()函式進行偵聽。這個函式需要提供兩個引數,第一個引數為協議型別,本例需要做的是TCP連線,因此填入“tcp”;address為地址,格式為“主機:埠號”。
  • 第15行,如果偵聽發生錯誤,通過第17行,往exitChan中寫入非0值結束伺服器,同時列印偵聽錯誤。
  • 第24行,使用defer,將偵聽器的結束延遲呼叫。
  • 第27行,偵聽開始後,開始進行連線接受,每次接受連線後需要繼續接受新的連線,周而復始。
  • 第30行,伺服器接受了一個連線。在沒有連線時,Accept()函式呼叫後會一直阻塞。連線到來時,返回conn和錯誤變數,conn的型別是*tcp.Conn。
  • 第33行,某些情況下,連線接受會發生錯誤,不影響伺服器邏輯,這時重新進行新連線接受。
  • 第39行,每個連線會生成一個會話。這個會話的處理與接受邏輯需要並行執行,彼此不干擾。

2.會話處理

每個連線的會話就是一個接收資料的迴圈。當沒有資料時,呼叫reader.ReadString會發生阻塞,等待資料的到來。一旦資料到來,就可以進行各種邏輯處理。

迴音伺服器的基本邏輯是“收到什麼返回什麼”,reader.ReadString可以一直讀取socket連線中的資料直到碰到期望的結尾符。這種期望的結尾符也叫定界符,一般用於將TCP封包中的邏輯資料拆分開。下例中使用的定界符是回車換行符(“\r\n”),HTTP協議也是使用同樣的定界符。使用reader.ReadString()函式可以將封包簡單地拆分開。

如圖1-4所示為Telnet資料處理過程。

圖1-4   Telnet資料處理過程

迴音伺服器需要將收到的有效資料通過socket傳送回去。

session.go

package main

import (
    "bufio"
    "fmt"
    "net"
    "strings"
)

// 連線的會話邏輯
func handleSession(conn net.Conn, exitChan chan int) {

    fmt.Println("Session started:")

    // 建立一個網路連線資料的讀取器
    reader := bufio.NewReader(conn)

    // 接收資料的迴圈
    for {

        // 讀取字串, 直到碰到回車返回
        str, err := reader.ReadString('\n')

        // 資料讀取正確
        if err == nil {

            // 去掉字串尾部的回車
            str = strings.TrimSpace(str)

            // 處理Telnet指令
            if !processTelnetCommand(str, exitChan) {
                conn.Close()
                break
            }

            // Echo邏輯, 發什麼資料, 原樣返回
            conn.Write([]byte(str + "\r\n"))

        } else {
            // 發生錯誤
            fmt.Println("Session closed")
            conn.Close()
            break
        }
    }

}

  

程式碼說明如下:

  • 第11行是會話入口,傳入連線和退出用的通道。handleSession()函式被併發執行。
  • 第16行,使用bufio包的NewReader()方法,建立一個網路資料讀取器,這個Reader將輸入資料的讀取過程進行封裝,方便我們迅速獲取到需要的資料。
  • 第19行,會話處理開始時,從socket連線,通過reader讀取器讀取封包,處理封包後需要繼續讀取從網路傳送過來的下一個封包,因此需要一個會話處理迴圈。
  • 第22行,使用reader.ReadString()方法進行封包讀取。內部會自動處理粘包過程,直到下一個回車符到達後返回資料。這裡認為封包來自Telnet,每個指令以回車換行符(“\r\n”)結尾。
  • 第25行,資料讀取正常時,返回err為nil。如果發生連線斷開、接收錯誤等網路錯誤時,err就不是nil了。
  • 第28行,reader.ReadString讀取返回的字串尾部帶有回車符,使用strings.TrimSpace()函式將尾部帶的回車和空白符去掉。
  • 第31行,將str字串傳入Telnet指令處理函式processTelnetCommand()中,同時傳入退出控制通道exitChan。當這個函式返回false時,表示需要關閉當前連線。
  • 第32行和第33行,關閉當前連線並退出會話接受迴圈。
  • 第37行,將有效資料通過conn的Write()方法寫入,同時在字串尾部添加回車換行符(“\r\n”),資料將被socket傳送給連線方。
  • 第41~43行,處理當reader.ReadString()函式返回錯誤時,列印錯誤資訊並關閉連線,退出會話並接收迴圈。

3.Telnet命令處理

Telnet是一種協議。在作業系統中可以在命令列使用Telnet命令發起TCP連線。我們一般用Telnet來連線TCP伺服器,鍵盤輸入一行字元回車後,即被髮送到伺服器上。

在下例中,定義了以下兩個特殊控制指令,用以實現一些功能:

  • 輸入“@close”退出當前連線會話。
  • 輸入“@shutdown”終止伺服器執行。

Telnet命令處理:

telnet.go

package main

import (
    "fmt"
    "strings"
)

func processTelnetCommand(str string, exitChan chan int) bool {

    // @close指令表示終止本次會話
    if strings.HasPrefix(str, "@close") {

        fmt.Println("Session closed")

        // 告訴外部需要斷開連線
        return false

        // @shutdown指令表示終止服務程序
    } else if strings.HasPrefix(str, "@shutdown") {

        fmt.Println("Server shutdown")

        // 往通道中寫入0, 阻塞等待接收方處理
        exitChan <- 0

        // 告訴外部需要斷開連線
        return false
    }

    // 列印輸入的字串
    fmt.Println(str)

    return true

}

  

程式碼說明如下:

  • 第8行,處理Telnet命令的函式入口,傳入有效字元並退出通道。
  • 第11~16行,當輸入字串中包含“@close”字首時,在第16行返回false,表示需要關閉當前會話。
  • 第19~27行,當輸入字串中包含“@shutdown”字首時,第24行將0寫入exitChan,表示結束伺服器。
  • 第31行,沒有特殊的控制字元時,列印輸入的字串。

4.程式入口
Telnet迴音處理主流程:

main.go

package main

import (
	"os"
)

func main() {

	// 建立一個程式結束碼的通道
	exitChan := make(chan int)

	// 將伺服器併發執行
	go server("127.0.0.1:7001", exitChan)

	// 通道阻塞,等待接收返回值
	code := <-exitChan

	// 標記程式返回值,並退出
	os.Exit(code)
}

  

程式碼說明如下:

  • 第10行,建立一個整型的無緩衝通道作為退出訊號。
  • 第13行,接受連線的過程可以併發操作,使用go將server()函式開啟goroutine。
  • 第16行,從exitChan中取出返回值。如果取不到資料就一直阻塞。
  • 第19行,將程式返回值傳入os.Exit()函式中並終止程式。

編譯所有程式碼並執行,命令列提示如下:

listen: 127.0.0.1:7001

  

此時,socket偵聽成功。在作業系統中的命令列中輸入:

telnet 127.0.0.1 7001

  

嘗試連線本地的7001埠。接下來進入測試伺服器的流程。

5.測試輸入字串

在Telnet連線後,輸入字串hello,Telnet命令列顯示如下:

$ telnet 127.0.0.1 7001
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
hello
hello

  

伺服器顯示如下:

listen: 127.0.0.1:7001
Session started:
hello

  

客戶端輸入的字串會在伺服器中顯示,同時客戶端也會收到自己發給伺服器的內容,這就是一次迴音。

6.測試關閉會話
當輸入@close時,Telnet命令列顯示如下:

@close
Connection closed by foreign host

  

伺服器顯示如下:

Session closed

  

此時,客戶端Telnet與伺服器斷開連線。

7.測試關閉伺服器

測試關閉伺服器
當輸入@shutdown時,Telnet命令列顯示如下:

@shutdown
Connection closed by foreign host.

  

伺服器顯示如下:

Server shutdown

  

此時伺服器會自動關閉。