Beego/example/chat/controllers/ws.go

177 lines
3.9 KiB
Go

// Beego (http://beego.me/)
// @description beego is an open-source, high-performance web framework for the Go programming language.
// @link http://github.com/astaxie/beego for the canonical source repository
// @license http://github.com/astaxie/beego/blob/master/LICENSE
// @authors Unknwon
package controllers
import (
"io/ioutil"
"math/rand"
"net/http"
"time"
"github.com/astaxie/beego"
"github.com/gorilla/websocket"
)
const (
	// writeWait bounds how long a single write to the client may take.
	writeWait = 10 * time.Second

	// readWait bounds how long we wait for the next read from the client.
	readWait = 60 * time.Second

	// pingPeriod is the interval between pings sent to the client. It is
	// 90% of readWait so that it always stays below the read deadline.
	pingPeriod = readWait * 9 / 10

	// maxMessageSize is the largest message, in bytes, accepted from a client.
	maxMessageSize = 512
)
// init seeds the shared math/rand source from the current UTC time and
// starts the hub's event loop in a background goroutine.
func init() {
	seed := time.Now().UTC().UnixNano()
	rand.Seed(seed)

	go h.run()
}
// connection is a middleman between one websocket connection and the hub.
type connection struct {
	// username is the name prefixed to every message this client broadcasts.
	username string

	// ws is the underlying websocket connection.
	ws *websocket.Conn

	// send is a buffered channel of outbound messages waiting to be written
	// to ws.
	send chan []byte
}
// readPump pumps messages from the websocket connection to the hub.
//
// It loops until a read fails, then unregisters the connection from the hub
// and closes the socket. Every text message is forwarded to the hub's
// broadcast channel as "<username>_<HH:MM:SS>:<message>".
func (c *connection) readPump() {
	defer func() {
		h.unregister <- c
		c.ws.Close()
	}()

	c.ws.SetReadLimit(maxMessageSize)
	c.ws.SetReadDeadline(time.Now().Add(readWait))

	for {
		op, r, err := c.ws.NextReader()
		if err != nil {
			return
		}

		switch op {
		case websocket.OpPong:
			// Only pongs push the read deadline forward.
			c.ws.SetReadDeadline(time.Now().Add(readWait))
		case websocket.OpText:
			message, err := ioutil.ReadAll(r)
			if err != nil {
				// A bare break would only leave the switch and keep reading
				// from a connection that has already failed; stop the pump.
				return
			}
			h.broadcast <- []byte(c.username + "_" + time.Now().Format("15:04:05") + ":" + string(message))
		}
	}
}
// write sends a single message with the given opCode and payload, after
// setting the write deadline to writeWait from now. It returns whatever
// error WriteMessage reports.
func (c *connection) write(opCode int, payload []byte) error {
	deadline := time.Now().Add(writeWait)
	c.ws.SetWriteDeadline(deadline)

	return c.ws.WriteMessage(opCode, payload)
}
// writePump pumps messages from the hub to the websocket connection.
//
// It forwards every message received on c.send as a text frame and sends a
// ping every pingPeriod. It returns when a write fails or when c.send is
// closed (after sending a close frame); on return the ticker is stopped and
// the websocket is closed.
func (c *connection) writePump() {
	pinger := time.NewTicker(pingPeriod)
	// Deferred calls run last-in-first-out: the ticker stops first, then the
	// socket closes.
	defer c.ws.Close()
	defer pinger.Stop()

	for {
		var (
			opCode  int
			payload []byte
		)

		select {
		case msg, open := <-c.send:
			if !open {
				// The hub closed the channel: tell the peer and stop.
				c.write(websocket.OpClose, []byte{})
				return
			}
			opCode, payload = websocket.OpText, msg
		case <-pinger.C:
			opCode, payload = websocket.OpPing, []byte{}
		}

		if err := c.write(opCode, payload); err != nil {
			return
		}
	}
}
// hub keeps the set of live connections and fans incoming messages out to
// all of them.
type hub struct {
	// connections is the set of registered connections.
	connections map[*connection]bool

	// broadcast carries inbound messages from the connections.
	broadcast chan []byte

	// register carries requests to add a connection.
	register chan *connection

	// unregister carries requests to remove a connection.
	unregister chan *connection
}
// h is the package-level hub shared by every websocket connection. The
// register and unregister channels hold one pending request each; the
// broadcast channel buffers up to maxMessageSize messages.
var h = &hub{
	connections: make(map[*connection]bool),
	register:    make(chan *connection, 1),
	unregister:  make(chan *connection, 1),
	broadcast:   make(chan []byte, maxMessageSize),
}
// run is the hub's event loop; it never returns and is meant to run in its
// own goroutine. It is the only code that touches h.connections, so the map
// needs no lock.
//
//   - register adds a connection to the set.
//   - unregister removes a connection and closes its send channel.
//   - broadcast offers the message to every connection without blocking; a
//     connection whose send buffer is full is dropped and its send channel
//     closed.
func (h *hub) run() {
	for {
		select {
		case c := <-h.register:
			h.connections[c] = true
		case c := <-h.unregister:
			// The broadcast branch may already have dropped this connection
			// and closed its send channel. Closing a closed channel panics,
			// so only close while the connection is still registered.
			if _, ok := h.connections[c]; ok {
				delete(h.connections, c)
				close(c.send)
			}
		case m := <-h.broadcast:
			for c := range h.connections {
				select {
				case c.send <- m:
				default:
					// Send buffer is full: drop the slow consumer.
					close(c.send)
					delete(h.connections, c)
				}
			}
		}
	}
}
// WSController is the controller for the chat websocket endpoint. It embeds
// beego.Controller.
type WSController struct {
	beego.Controller
}
// Get upgrades the request to a websocket and serves it until reading fails.
//
// If the upgrade fails with a websocket.HandshakeError the client gets a 400
// response; any other upgrade error ends the request without further output.
// On success a connection with a random 10-character username is registered
// with the hub, its write pump runs in a new goroutine, and the read pump
// runs on the calling goroutine.
func (wc *WSController) Get() {
	ws, err := websocket.Upgrade(wc.Ctx.ResponseWriter, wc.Ctx.Request.Header, nil, 1024, 1024)
	if err != nil {
		if _, isHandshake := err.(websocket.HandshakeError); isHandshake {
			http.Error(wc.Ctx.ResponseWriter, "Not a websocket handshake", http.StatusBadRequest)
		}
		return
	}

	client := &connection{
		username: randomString(10),
		ws:       ws,
		send:     make(chan []byte, 256),
	}
	h.register <- client

	go client.writePump()
	client.readPump()
}
// randomString returns a string of l characters drawn uniformly from the
// uppercase ASCII letters 'A' through 'Z', using the shared math/rand source.
// l must not be negative.
func randomString(l int) string {
	const alphabetSize = 'Z' - 'A' + 1 // 26; includes 'Z', which the old 65..89 range never produced

	bytes := make([]byte, l)
	for i := range bytes {
		bytes[i] = byte('A' + rand.Intn(alphabetSize))
	}
	return string(bytes)
}
// randInt returns a pseudo-random integer in the half-open range [min, max),
// using the shared math/rand source. If the range is empty (max <= min) it
// returns min instead of letting rand.Intn panic on a non-positive argument.
func randInt(min int, max int) int {
	if max <= min {
		return min
	}
	return min + rand.Intn(max-min)
}