1. 程式人生 > >go實現tcp遊戲伺服器sibo——伺服器實現篇

go實現tcp遊戲伺服器sibo——伺服器實現篇

package server

import (
    "net"
    "time"
    "sync"
    "crypto/tls"
    log "github.com/sirupsen/logrus"
    "runtime"
    "bufio"
    "strings"
    "io"
    "sibo/protocol"
    "sibo/share"
    "errors"
    "fmt"
    "sibo/proto"
    "github.com/robfig/cron"
    "path"
    "github.com/lestrrat/go-file-rotatelogs"
"github.com/rifflock/lfshook" ) var ErrServerClosed = errors.New("sibo: Server closed") var shutdownPollInterval = 500 * time.Millisecond var schuduleInterval = 1 * time.Minute // 讀寫緩衝區大小 const ( ReaderBufferSize = 1024 WriterBufferSize = 1024 ) type Server struct { ln net.Listener readTimeout time.Duration writeTimeout time.Duration mu sync.RWMutex activeSession map
[ISession]struct{} // 全域性連線管理器 doneChan chan struct{} // inShutdown int32 // 監聽server的關閉狀態 // TLSConfig for creating tls tcp connection. tlsConfig *tls.Config // tls配置 waitGroup *sync.WaitGroup cron *cron.Cron // 定時任務 scheduleSaveDuration time.Duration // 記憶體資料儲存到資料庫的定時任務的週期 } func
NewServer() *Server { return &Server{ waitGroup: &sync.WaitGroup{}, cron: cron.New(), scheduleSaveDuration: schuduleInterval, } } // 返回伺服器地址 func (s *Server) Address() net.Addr { s.mu.RLock() defer s.mu.RUnlock() if s.ln == nil { return nil } return s.ln.Addr() } // 設定定時任務週期 func (s *Server) SetScheduleSaveDuration(t time.Duration) { s.scheduleSaveDuration = t } func (s *Server) Serve(network, address string) (err error) { var ln net.Listener if s.tlsConfig == nil { ln, err = net.Listen("tcp", address) } else { ln, err = tls.Listen("tcp", address, s.tlsConfig) } s.cron.Start() s.scheduleSavePlayer2DB() // start save job return s.serveListener(ln) } func (s *Server) serveListener(ln net.Listener) error { var tempDelay time.Duration s.mu.Lock() s.ln = ln if s.activeSession == nil { s.activeSession = make(map[ISession]struct{}) } s.mu.Unlock() // 死迴圈監聽來自client的連線 for { conn, e := ln.Accept() if e != nil { select { case <-s.doneChan: return ErrServerClosed default: } if ne, ok := e.(net.Error); ok && ne.Temporary() { // temporary error, retry in one second if tempDelay == 0 { tempDelay = 5 * time.Millisecond } else { tempDelay *= 2 } if max := 1 * time.Second; tempDelay > max { tempDelay = max } log.Printf("sibo: Accept error: %v; retrying in %v", e, tempDelay) time.Sleep(tempDelay) continue } log.Println("closed listener") return e } tempDelay = 0 if tc, ok := conn.(*net.TCPConn); ok { tc.SetKeepAlive(true) tc.SetKeepAlivePeriod(3 * time.Minute) } s.mu.Lock() session := NewSession(conn) s.activeSession[session] = struct{}{} s.mu.Unlock() go s.serveSession(session) // 每個連線對應一個goroutine } } func (s *Server) serveSession(session ISession) { log.Println("serve session -> ", session.SessionId()) player := NewPlayer(session) // 客戶端與伺服器斷開連線時執行 defer func() { // execute when client disconnect from server if err := recover(); err != nil { const size = 64 << 10 buf := make([]byte, size) ss := runtime.Stack(buf, false) if ss > size { ss = size } buf = buf[:ss] log.Printf("serving %s panic error: %s, stack:\n %s", session.Conn().RemoteAddr(), err, buf) } s.mu.Lock() delete(s.activeSession, session) s.mu.Unlock() session.Close(func() { log.Println("exec on session close.") /*player.Logout(func() { log.Println("exec on player logout.") })*/ }) }() if tlsConn, ok := session.Conn().(*tls.Conn); ok { if d := s.readTimeout; d != 0 { session.Conn().SetReadDeadline(time.Now().Add(d)) } if d := s.writeTimeout; d != 0 { session.Conn().SetWriteDeadline(time.Now().Add(d)) } if err := tlsConn.Handshake(); err != nil { log.Printf("sibo: TLS handshake error from %s: %v", session.Conn().RemoteAddr(), err) return } } r := bufio.NewReaderSize(session.Conn(), ReaderBufferSize) // client與server建立連線後,迴圈接受來自客戶端的請求資料 for { t0 := time.Now() if s.readTimeout != 0 { session.Conn().SetReadDeadline(t0.Add(s.readTimeout)) } req, err := session.ReadRequest(r) if err != nil { if err == io.EOF { // client close conn //player.SaveAll() log.Printf("client has closed this connection: %s", session.Conn().RemoteAddr().String()) } else if strings.Contains(err.Error(), "use of closed network connection") { // (伺服器主動關閉conn)conn已被conn.Close()關閉,從conn讀取會報這個錯誤 log.Printf("sibo: connection %s is closed", session.Conn().RemoteAddr().String()) } else { log.Printf("sibo: failed to read request: %v", err) } player.Logout(func() { log.Println("player logout on session close") }) /*session.Close(func() { player.SaveAll() log.Println("session close : " + session.SessionId()) })*/ return } if s.writeTimeout != 0 { session.Conn().SetWriteDeadline(t0.Add(s.writeTimeout)) } // 業務邏輯用單獨的goroutine處理以防請求被阻塞(此處後期可使用類似fasthttp的goroutine池來優化) go func() { if req.IsHeartbeat() { req.SetMessageType(protocol.Response) session.SendResponse(req) return } res, err := s.handleRequest(player, req) if err != nil { log.Printf("sibo: failed to handle request: %v", err) } if !req.IsOneway() { session.SendResponse(res) } protocol.FreeMsg(req) protocol.FreeMsg(res) }() } } // 請求處理(業務邏輯) func (s *Server) handleRequest(player IPlayer, req *protocol.Message) (res *protocol.Message, err error) { res = req.Clone() res.SetMessageType(protocol.Response) codec, ok := share.Codecs[req.SerializeType()] if !ok { err = fmt.Errorf("can not find codec for %d", req.SerializeType()) handleError(res, err) return res, err } compression, ok := share.Compression[req.CompressType()] if !ok { err = fmt.Errorf("can not find compress type for %d", req.CompressType()) handleError(res, err) return res, err } payload, err := compression.Decompress(req.Payload) if err != nil { err = fmt.Errorf("UnZip error for compress type %d", req.CompressType()) handleError(res, err) return res, err } mmId := req.ModuleMessageID() if _, ok := proto.RequestMap[mmId]; !ok { handleError(res, errors.New("messageID not exist")) } request := proto.RequestMap[mmId]() err = codec.Decode(payload, request) if err != nil { return handleError(res, err) } responsePayload, err := ProcessorMap[mmId].Process(player, request) if !req.IsOneway() { data, err := codec.Encode(responsePayload) if err != nil { handleError(res, err) return res, err } payload, err := compression.Compress(data) if err != nil { handleError(res, err) return res, err } res.Payload = payload } return res, nil } // 關閉伺服器 func (s *Server) Close() error { s.mu.Lock() s.closeDoneChan() var err error if s.ln != nil { err = s.ln.Close() } s.mu.Unlock() s.cron.Stop() // stop schedule job s.saveAllPlayer2DB() // 儲存資料 s.closeSessions() //關閉所有連線 s.waitGroup.Wait() return err } // 關閉所有已建立的連線 func (s *Server) closeSessions() { s.mu.Lock() defer s.mu.Unlock() for session := range s.activeSession { session.Close(func() { log.Println("session close on by server: " + session.SessionId()) }) delete(s.activeSession, session) } } // 定時任務儲存玩家資料 func (s *Server) scheduleSavePlayer2DB() { //spec := "0 */2 * * * ?" s.cron.Schedule(cron.Every(s.scheduleSaveDuration), new(SavePlayerJob)) } // 儲存玩家資料到資料庫,shutdown graceful func (s *Server) saveAllPlayer2DB() { if PlayerId2PlayerMap.Len() > 0 { for _, player := range PlayerId2PlayerMap.Values() { //player, ok := PlayerId2PlayerMap.Get(k) s.waitGroup.Add(1) go func(player IPlayer) { // 此處每個玩家使用一個goroutine,玩家數多可能有問題,可修改為一個goroutine處理多個玩家 defer s.waitGroup.Done() log.Println("task start") //time.Sleep(2 * time.Second) 模仿資料儲存 player.SaveAll() log.Println("task end") }(player) } PlayerId2PlayerMap.Clear() } } // 這裡使用了一點程式碼的trick將chan當作sync.Cond使用 func (s *Server) closeDoneChan() { if s.doneChan == nil { s.doneChan = make(chan struct{}) } select { case <-s.doneChan: // Already closed. Don't close again. default: // Safe to close here. We're the only closer, guarded // by s.mu. close(s.doneChan) } } // 日誌配置 func (s *Server) ConfigLocalFilesystemLogger(logPath, logFileName string, maxAge time.Duration, rotationTime time.Duration) { baseLogPaht := path.Join(logPath, logFileName) writer, err := rotatelogs.New( baseLogPaht+".%Y%m%d%H%M", rotatelogs.WithLinkName(baseLogPaht), // 生成軟鏈,指向最新日誌檔案 rotatelogs.WithMaxAge(maxAge), // 檔案最大儲存時間 rotatelogs.WithRotationTime(rotationTime), // 日誌切割時間間隔 ) if err != nil { log.Errorf("config local file system logger error. %+v", err) } lfHook := lfshook.NewHook(lfshook.WriterMap{ log.DebugLevel: writer, // 為不同級別設定不同的輸出目的 log.InfoLevel: writer, log.WarnLevel: writer, log.ErrorLevel: writer, log.FatalLevel: writer, log.PanicLevel: writer, }) log.AddHook(lfHook) } func handleError(res *protocol.Message, err error) (*protocol.Message, error) { res.SetMessageStatusType(protocol.Error) return res, err }

原始碼地址