package controllers import ( "io/ioutil" "math/rand" "net/http" "time" "github.com/astaxie/beego" "github.com/gorilla/websocket" ) const ( // Time allowed to write a message to the client. writeWait = 10 * time.Second // Time allowed to read the next message from the client. readWait = 60 * time.Second // Send pings to client with this period. Must be less than readWait. pingPeriod = (readWait * 9) / 10 // Maximum message size allowed from client. maxMessageSize = 512 ) func init() { rand.Seed(time.Now().UTC().UnixNano()) go h.run() } // connection is an middleman between the websocket connection and the hub. type connection struct { username string // The websocket connection. ws *websocket.Conn // Buffered channel of outbound messages. send chan []byte } // readPump pumps messages from the websocket connection to the hub. func (c *connection) readPump() { defer func() { h.unregister <- c c.ws.Close() }() c.ws.SetReadLimit(maxMessageSize) c.ws.SetReadDeadline(time.Now().Add(readWait)) for { op, r, err := c.ws.NextReader() if err != nil { break } switch op { case websocket.OpPong: c.ws.SetReadDeadline(time.Now().Add(readWait)) case websocket.OpText: message, err := ioutil.ReadAll(r) if err != nil { break } h.broadcast <- []byte(c.username + "_" + time.Now().Format("15:04:05") + ":" + string(message)) } } } // write writes a message with the given opCode and payload. func (c *connection) write(opCode int, payload []byte) error { c.ws.SetWriteDeadline(time.Now().Add(writeWait)) return c.ws.WriteMessage(opCode, payload) } // writePump pumps messages from the hub to the websocket connection. func (c *connection) writePump() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() c.ws.Close() }() for { select { case message, ok := <-c.send: if !ok { c.write(websocket.OpClose, []byte{}) return } if err := c.write(websocket.OpText, message); err != nil { return } case <-ticker.C: if err := c.write(websocket.OpPing, []byte{}); err != nil { return } } } } type hub struct { // Registered connections. connections map[*connection]bool // Inbound messages from the connections. broadcast chan []byte // Register requests from the connections. register chan *connection // Unregister requests from connections. unregister chan *connection } var h = &hub{ broadcast: make(chan []byte, maxMessageSize), register: make(chan *connection, 1), unregister: make(chan *connection, 1), connections: make(map[*connection]bool), } func (h *hub) run() { for { select { case c := <-h.register: h.connections[c] = true case c := <-h.unregister: delete(h.connections, c) close(c.send) case m := <-h.broadcast: for c := range h.connections { select { case c.send <- m: default: close(c.send) delete(h.connections, c) } } } } } type WSController struct { beego.Controller } func (this *WSController) Get() { ws, err := websocket.Upgrade(this.Ctx.ResponseWriter, this.Ctx.Request.Header, nil, 1024, 1024) if _, ok := err.(websocket.HandshakeError); ok { http.Error(this.Ctx.ResponseWriter, "Not a websocket handshake", 400) return } else if err != nil { return } c := &connection{send: make(chan []byte, 256), ws: ws, username: randomString(10)} h.register <- c go c.writePump() c.readPump() } func randomString(l int) string { bytes := make([]byte, l) for i := 0; i < l; i++ { bytes[i] = byte(randInt(65, 90)) } return string(bytes) } func randInt(min int, max int) int { return min + rand.Intn(max-min) }