1. 程式人生 > Go websocket 做訊息推送(視訊彈幕的簡單實現原理)

Go websocket 做訊息推送(視訊彈幕的簡單實現原理)

  • server.go
package main

import (
	"log"
	"net/http"
	"time"

	"github.com/gorilla/websocket"

	"socket/impl"
)

// upGrader turns incoming HTTP requests into websocket connections.
//
// NOTE(review): CheckOrigin accepts every request regardless of its Origin
// header, so any web page can open a socket to this server. That is
// convenient for a demo; restrict it before exposing the endpoint publicly.
var upGrader = websocket.Upgrader{
	CheckOrigin: func(_ *http.Request) bool { return true },
}

func wsHandler(w http.ResponseWriter, r *http.Request) {

	var (
		wsConn *websocket.
Conn err error data []byte conn *impl.Connection ) if wsConn, err = upGrader.Upgrade(w, r, nil); err != nil { return } if conn, err = impl.InitConnection(wsConn); err != nil { goto ERR } go func() { for { if err := conn.WriteMessage([]byte("heartbeat")); err != nil { return
} time.Sleep(1 * time.Second) } }() for { if data, err = conn.ReadMessage(); err != nil { goto ERR } if err = conn.WriteMessage(data); err != nil { goto ERR } } ERR: conn.Close() } func main() { http.HandleFunc("/ws", wsHandler) http.ListenAndServe("0.0.0.0:7777", nil) }
  • connection.go
package impl

import (
	"errors"
	"github.com/gorilla/websocket"
	"sync"
)

// Connection wraps a websocket connection with buffered inbound and outbound
// message queues and a close signal, so callers exchange messages through
// channels instead of touching the socket directly.
type Connection struct {
	// wsConn is the underlying websocket connection.
	wsConn *websocket.Conn

	// inChan carries messages read from the socket towards ReadMessage;
	// outChan carries messages from WriteMessage towards the socket.
	inChan, outChan chan []byte

	// closeChan is closed by Close to tell every blocked goroutine to stop.
	closeChan chan byte

	// mutex guards isClosed so that closeChan is closed only once.
	mutex    sync.Mutex
	isClosed bool
}

// InitConnection wraps wsConn in a Connection and starts its read and write
// goroutines.
//
// It returns an error, and a nil Connection, when wsConn is nil; previously a
// nil socket was accepted and the background goroutines would crash the
// process on first use. On success the returned error is always nil.
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
	if wsConn == nil {
		return nil, errors.New("websocket connection is nil")
	}

	conn = &Connection{
		wsConn:    wsConn,
		inChan:    make(chan []byte, 1000),
		outChan:   make(chan []byte, 1000),
		closeChan: make(chan byte, 1),
	}

	// Start the reader goroutine (socket -> inChan).
	go conn.readLoop()
	// Start the writer goroutine (outChan -> socket).
	go conn.writeLoop()

	return conn, nil
}

//API
// ReadMessage blocks until a message read from the socket is available on the
// inbound queue, or until the connection has been closed, in which case it
// returns a "connection is closed" error and no data.
func (conn *Connection) ReadMessage() (data []byte, err error) {
	select {
	case msg := <-conn.inChan:
		return msg, nil
	case <-conn.closeChan:
		return nil, errors.New("connection is closed")
	}
}

// WriteMessage queues data for the write goroutine. It blocks while the
// outbound queue is full and returns a "connection is closed" error if the
// connection has been closed instead.
func (conn *Connection) WriteMessage(data []byte) (err error) {
	select {
	case conn.outChan <- data:
		return nil
	case <-conn.closeChan:
		return errors.New("connection is closed")
	}
}

// Close shuts down the underlying socket and closes closeChan to signal every
// goroutine waiting on this connection. It may be called any number of times
// and from several goroutines.
func (conn *Connection) Close() {
	// Per the original author's note, closing the websocket connection is
	// thread-safe and may be repeated, so it is done outside the lock.
	conn.wsConn.Close()

	// Closing an already-closed channel panics, so the isClosed flag (guarded
	// by the mutex) makes sure closeChan is closed exactly once.
	conn.mutex.Lock()
	defer conn.mutex.Unlock()

	if conn.isClosed {
		return
	}
	close(conn.closeChan)
	conn.isClosed = true
}

// readLoop moves messages from the socket into inChan until a read fails or
// the connection is closed, then closes the connection. The websocket message
// type is discarded; only the payload is forwarded.
func (conn *Connection) readLoop() {
	defer conn.Close()

	for {
		_, payload, readErr := conn.wsConn.ReadMessage()
		if readErr != nil {
			return
		}

		// The send may block when inChan (capacity 1000) is full; watching
		// closeChan as well lets the loop exit once the connection is closed.
		select {
		case conn.inChan <- payload:
		case <-conn.closeChan:
			return
		}
	}
}

// writeLoop drains outChan and sends each payload to the socket as a text
// message until a write fails or the connection is closed, then closes the
// connection.
func (conn *Connection) writeLoop() {
	defer conn.Close()

	for {
		var payload []byte

		// Wait for the next outbound message or for the close signal.
		select {
		case payload = <-conn.outChan:
		case <-conn.closeChan:
			return
		}

		if writeErr := conn.wsConn.WriteMessage(websocket.TextMessage, payload); writeErr != nil {
			return
		}
	}
}