golint grace

This commit is contained in:
astaxie 2015-09-10 16:35:40 +08:00
parent 01a5e54264
commit 65fb7ce391
4 changed files with 58 additions and 46 deletions

View File

@ -4,7 +4,7 @@ import "net"
type graceConn struct { type graceConn struct {
net.Conn net.Conn
server *graceServer server *Server
} }
func (c graceConn) Close() error { func (c graceConn) Close() error {

View File

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// Package grace use to hot reload
// Description: http://grisha.org/blog/2014/06/03/graceful-restart-in-golang/ // Description: http://grisha.org/blog/2014/06/03/graceful-restart-in-golang/
// //
// Usage: // Usage:
@ -52,26 +53,36 @@ import (
) )
const ( const (
PRE_SIGNAL = iota // PreSignal is the position to add filter before signal
POST_SIGNAL PreSignal = iota
// PostSignal is the position to add filter after signal
PostSignal
STATE_INIT // StateInit represent the application inited
STATE_RUNNING StateInit
STATE_SHUTTING_DOWN // StateRunning represent the application is running
STATE_TERMINATE StateRunning
// StateShuttingDown represent the application is shutting down
StateShuttingDown
// StateTerminate represent the application is killed
StateTerminate
) )
var ( var (
regLock *sync.Mutex regLock *sync.Mutex
runningServers map[string]*graceServer runningServers map[string]*Server
runningServersOrder []string runningServersOrder []string
socketPtrOffsetMap map[string]uint socketPtrOffsetMap map[string]uint
runningServersForked bool runningServersForked bool
DefaultReadTimeOut time.Duration // DefaultReadTimeOut is the HTTP read timeout
DefaultWriteTimeOut time.Duration DefaultReadTimeOut time.Duration
// DefaultWriteTimeOut is the HTTP Write timeout
DefaultWriteTimeOut time.Duration
// DefaultMaxHeaderBytes is the Max HTTP Herder size, default is 0, no limit
DefaultMaxHeaderBytes int DefaultMaxHeaderBytes int
DefaultTimeout time.Duration // DefaultTimeout is the shutdown server's timeout. default is 60s
DefaultTimeout time.Duration
isChild bool isChild bool
socketOrder string socketOrder string
@ -81,7 +92,7 @@ func init() {
regLock = &sync.Mutex{} regLock = &sync.Mutex{}
flag.BoolVar(&isChild, "graceful", false, "listen on open fd (after forking)") flag.BoolVar(&isChild, "graceful", false, "listen on open fd (after forking)")
flag.StringVar(&socketOrder, "socketorder", "", "previous initialization order - used when more than one listener was started") flag.StringVar(&socketOrder, "socketorder", "", "previous initialization order - used when more than one listener was started")
runningServers = make(map[string]*graceServer) runningServers = make(map[string]*Server)
runningServersOrder = []string{} runningServersOrder = []string{}
socketPtrOffsetMap = make(map[string]uint) socketPtrOffsetMap = make(map[string]uint)
@ -91,7 +102,7 @@ func init() {
} }
// NewServer returns a new graceServer. // NewServer returns a new graceServer.
func NewServer(addr string, handler http.Handler) (srv *graceServer) { func NewServer(addr string, handler http.Handler) (srv *Server) {
regLock.Lock() regLock.Lock()
defer regLock.Unlock() defer regLock.Unlock()
if !flag.Parsed() { if !flag.Parsed() {
@ -105,23 +116,23 @@ func NewServer(addr string, handler http.Handler) (srv *graceServer) {
socketPtrOffsetMap[addr] = uint(len(runningServersOrder)) socketPtrOffsetMap[addr] = uint(len(runningServersOrder))
} }
srv = &graceServer{ srv = &Server{
wg: sync.WaitGroup{}, wg: sync.WaitGroup{},
sigChan: make(chan os.Signal), sigChan: make(chan os.Signal),
isChild: isChild, isChild: isChild,
SignalHooks: map[int]map[os.Signal][]func(){ SignalHooks: map[int]map[os.Signal][]func(){
PRE_SIGNAL: map[os.Signal][]func(){ PreSignal: map[os.Signal][]func(){
syscall.SIGHUP: []func(){}, syscall.SIGHUP: []func(){},
syscall.SIGINT: []func(){}, syscall.SIGINT: []func(){},
syscall.SIGTERM: []func(){}, syscall.SIGTERM: []func(){},
}, },
POST_SIGNAL: map[os.Signal][]func(){ PostSignal: map[os.Signal][]func(){
syscall.SIGHUP: []func(){}, syscall.SIGHUP: []func(){},
syscall.SIGINT: []func(){}, syscall.SIGINT: []func(){},
syscall.SIGTERM: []func(){}, syscall.SIGTERM: []func(){},
}, },
}, },
state: STATE_INIT, state: StateInit,
Network: "tcp", Network: "tcp",
} }
srv.Server = &http.Server{} srv.Server = &http.Server{}
@ -137,13 +148,13 @@ func NewServer(addr string, handler http.Handler) (srv *graceServer) {
return return
} }
// refer http.ListenAndServe // ListenAndServe refer http.ListenAndServe
func ListenAndServe(addr string, handler http.Handler) error { func ListenAndServe(addr string, handler http.Handler) error {
server := NewServer(addr, handler) server := NewServer(addr, handler)
return server.ListenAndServe() return server.ListenAndServe()
} }
// refer http.ListenAndServeTLS // ListenAndServeTLS refer http.ListenAndServeTLS
func ListenAndServeTLS(addr string, certFile string, keyFile string, handler http.Handler) error { func ListenAndServeTLS(addr string, certFile string, keyFile string, handler http.Handler) error {
server := NewServer(addr, handler) server := NewServer(addr, handler)
return server.ListenAndServeTLS(certFile, keyFile) return server.ListenAndServeTLS(certFile, keyFile)

View File

@ -11,10 +11,10 @@ type graceListener struct {
net.Listener net.Listener
stop chan error stop chan error
stopped bool stopped bool
server *graceServer server *Server
} }
func newGraceListener(l net.Listener, srv *graceServer) (el *graceListener) { func newGraceListener(l net.Listener, srv *Server) (el *graceListener) {
el = &graceListener{ el = &graceListener{
Listener: l, Listener: l,
stop: make(chan error), stop: make(chan error),
@ -46,17 +46,17 @@ func (gl *graceListener) Accept() (c net.Conn, err error) {
return return
} }
func (el *graceListener) Close() error { func (gl *graceListener) Close() error {
if el.stopped { if gl.stopped {
return syscall.EINVAL return syscall.EINVAL
} }
el.stop <- nil gl.stop <- nil
return <-el.stop return <-gl.stop
} }
func (el *graceListener) File() *os.File { func (gl *graceListener) File() *os.File {
// returns a dup(2) - FD_CLOEXEC flag *not* set // returns a dup(2) - FD_CLOEXEC flag *not* set
tl := el.Listener.(*net.TCPListener) tl := gl.Listener.(*net.TCPListener)
fl, _ := tl.File() fl, _ := tl.File()
return fl return fl
} }

View File

@ -15,7 +15,8 @@ import (
"time" "time"
) )
type graceServer struct { // Server embedded http.Server
type Server struct {
*http.Server *http.Server
GraceListener net.Listener GraceListener net.Listener
SignalHooks map[int]map[os.Signal][]func() SignalHooks map[int]map[os.Signal][]func()
@ -30,19 +31,19 @@ type graceServer struct {
// Serve accepts incoming connections on the Listener l, // Serve accepts incoming connections on the Listener l,
// creating a new service goroutine for each. // creating a new service goroutine for each.
// The service goroutines read requests and then call srv.Handler to reply to them. // The service goroutines read requests and then call srv.Handler to reply to them.
func (srv *graceServer) Serve() (err error) { func (srv *Server) Serve() (err error) {
srv.state = STATE_RUNNING srv.state = StateRunning
err = srv.Server.Serve(srv.GraceListener) err = srv.Server.Serve(srv.GraceListener)
log.Println(syscall.Getpid(), "Waiting for connections to finish...") log.Println(syscall.Getpid(), "Waiting for connections to finish...")
srv.wg.Wait() srv.wg.Wait()
srv.state = STATE_TERMINATE srv.state = StateTerminate
return return
} }
// ListenAndServe listens on the TCP network address srv.Addr and then calls Serve // ListenAndServe listens on the TCP network address srv.Addr and then calls Serve
// to handle requests on incoming connections. If srv.Addr is blank, ":http" is // to handle requests on incoming connections. If srv.Addr is blank, ":http" is
// used. // used.
func (srv *graceServer) ListenAndServe() (err error) { func (srv *Server) ListenAndServe() (err error) {
addr := srv.Addr addr := srv.Addr
if addr == "" { if addr == "" {
addr = ":http" addr = ":http"
@ -83,7 +84,7 @@ func (srv *graceServer) ListenAndServe() (err error) {
// CA's certificate. // CA's certificate.
// //
// If srv.Addr is blank, ":https" is used. // If srv.Addr is blank, ":https" is used.
func (srv *graceServer) ListenAndServeTLS(certFile, keyFile string) (err error) { func (srv *Server) ListenAndServeTLS(certFile, keyFile string) (err error) {
addr := srv.Addr addr := srv.Addr
if addr == "" { if addr == "" {
addr = ":https" addr = ":https"
@ -131,9 +132,9 @@ func (srv *graceServer) ListenAndServeTLS(certFile, keyFile string) (err error)
// getListener either opens a new socket to listen on, or takes the acceptor socket // getListener either opens a new socket to listen on, or takes the acceptor socket
// it got passed when restarted. // it got passed when restarted.
func (srv *graceServer) getListener(laddr string) (l net.Listener, err error) { func (srv *Server) getListener(laddr string) (l net.Listener, err error) {
if srv.isChild { if srv.isChild {
var ptrOffset uint = 0 var ptrOffset uint
if len(socketPtrOffsetMap) > 0 { if len(socketPtrOffsetMap) > 0 {
ptrOffset = socketPtrOffsetMap[laddr] ptrOffset = socketPtrOffsetMap[laddr]
log.Println("laddr", laddr, "ptr offset", socketPtrOffsetMap[laddr]) log.Println("laddr", laddr, "ptr offset", socketPtrOffsetMap[laddr])
@ -157,7 +158,7 @@ func (srv *graceServer) getListener(laddr string) (l net.Listener, err error) {
// handleSignals listens for os Signals and calls any hooked in function that the // handleSignals listens for os Signals and calls any hooked in function that the
// user had registered with the signal. // user had registered with the signal.
func (srv *graceServer) handleSignals() { func (srv *Server) handleSignals() {
var sig os.Signal var sig os.Signal
signal.Notify( signal.Notify(
@ -170,7 +171,7 @@ func (srv *graceServer) handleSignals() {
pid := syscall.Getpid() pid := syscall.Getpid()
for { for {
sig = <-srv.sigChan sig = <-srv.sigChan
srv.signalHooks(PRE_SIGNAL, sig) srv.signalHooks(PreSignal, sig)
switch sig { switch sig {
case syscall.SIGHUP: case syscall.SIGHUP:
log.Println(pid, "Received SIGHUP. forking.") log.Println(pid, "Received SIGHUP. forking.")
@ -187,11 +188,11 @@ func (srv *graceServer) handleSignals() {
default: default:
log.Printf("Received %v: nothing i care about...\n", sig) log.Printf("Received %v: nothing i care about...\n", sig)
} }
srv.signalHooks(POST_SIGNAL, sig) srv.signalHooks(PostSignal, sig)
} }
} }
func (srv *graceServer) signalHooks(ppFlag int, sig os.Signal) { func (srv *Server) signalHooks(ppFlag int, sig os.Signal) {
if _, notSet := srv.SignalHooks[ppFlag][sig]; !notSet { if _, notSet := srv.SignalHooks[ppFlag][sig]; !notSet {
return return
} }
@ -204,12 +205,12 @@ func (srv *graceServer) signalHooks(ppFlag int, sig os.Signal) {
// shutdown closes the listener so that no new connections are accepted. it also // shutdown closes the listener so that no new connections are accepted. it also
// starts a goroutine that will serverTimeout (stop all running requests) the server // starts a goroutine that will serverTimeout (stop all running requests) the server
// after DefaultTimeout. // after DefaultTimeout.
func (srv *graceServer) shutdown() { func (srv *Server) shutdown() {
if srv.state != STATE_RUNNING { if srv.state != StateRunning {
return return
} }
srv.state = STATE_SHUTTING_DOWN srv.state = StateShuttingDown
if DefaultTimeout >= 0 { if DefaultTimeout >= 0 {
go srv.serverTimeout(DefaultTimeout) go srv.serverTimeout(DefaultTimeout)
} }
@ -224,26 +225,26 @@ func (srv *graceServer) shutdown() {
// serverTimeout forces the server to shutdown in a given timeout - whether it // serverTimeout forces the server to shutdown in a given timeout - whether it
// finished outstanding requests or not. if Read/WriteTimeout are not set or the // finished outstanding requests or not. if Read/WriteTimeout are not set or the
// max header size is very big a connection could hang // max header size is very big a connection could hang
func (srv *graceServer) serverTimeout(d time.Duration) { func (srv *Server) serverTimeout(d time.Duration) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Println("WaitGroup at 0", r) log.Println("WaitGroup at 0", r)
} }
}() }()
if srv.state != STATE_SHUTTING_DOWN { if srv.state != StateShuttingDown {
return return
} }
time.Sleep(d) time.Sleep(d)
log.Println("[STOP - Hammer Time] Forcefully shutting down parent") log.Println("[STOP - Hammer Time] Forcefully shutting down parent")
for { for {
if srv.state == STATE_TERMINATE { if srv.state == StateTerminate {
break break
} }
srv.wg.Done() srv.wg.Done()
} }
} }
func (srv *graceServer) fork() (err error) { func (srv *Server) fork() (err error) {
regLock.Lock() regLock.Lock()
defer regLock.Unlock() defer regLock.Unlock()
if runningServersForked { if runningServersForked {