// Copyright (c) 2014, Suryandaru Triandana // All rights reserved. // // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. package util import ( "fmt" "sync" "sync/atomic" "time" ) type buffer struct { b []byte miss int } // BufferPool is a 'buffer pool'. type BufferPool struct { pool [6]chan []byte size [5]uint32 sizeMiss [5]uint32 sizeHalf [5]uint32 baseline [4]int baseline0 int mu sync.RWMutex closed bool closeC chan struct{} get uint32 put uint32 half uint32 less uint32 equal uint32 greater uint32 miss uint32 } func (p *BufferPool) poolNum(n int) int { if n <= p.baseline0 && n > p.baseline0/2 { return 0 } for i, x := range p.baseline { if n <= x { return i + 1 } } return len(p.baseline) + 1 } // Get returns buffer with length of n. func (p *BufferPool) Get(n int) []byte { if p == nil { return make([]byte, n) } p.mu.RLock() defer p.mu.RUnlock() if p.closed { return make([]byte, n) } atomic.AddUint32(&p.get, 1) poolNum := p.poolNum(n) pool := p.pool[poolNum] if poolNum == 0 { // Fast path. select { case b := <-pool: switch { case cap(b) > n: if cap(b)-n >= n { atomic.AddUint32(&p.half, 1) select { case pool <- b: default: } return make([]byte, n) } else { atomic.AddUint32(&p.less, 1) return b[:n] } case cap(b) == n: atomic.AddUint32(&p.equal, 1) return b[:n] default: atomic.AddUint32(&p.greater, 1) } default: atomic.AddUint32(&p.miss, 1) } return make([]byte, n, p.baseline0) } else { sizePtr := &p.size[poolNum-1] select { case b := <-pool: switch { case cap(b) > n: if cap(b)-n >= n { atomic.AddUint32(&p.half, 1) sizeHalfPtr := &p.sizeHalf[poolNum-1] if atomic.AddUint32(sizeHalfPtr, 1) == 20 { atomic.StoreUint32(sizePtr, uint32(cap(b)/2)) atomic.StoreUint32(sizeHalfPtr, 0) } else { select { case pool <- b: default: } } return make([]byte, n) } else { atomic.AddUint32(&p.less, 1) return b[:n] } case cap(b) == n: atomic.AddUint32(&p.equal, 1) return b[:n] default: atomic.AddUint32(&p.greater, 1) if uint32(cap(b)) >= atomic.LoadUint32(sizePtr) { select { case pool <- b: default: } } } default: atomic.AddUint32(&p.miss, 1) } if size := atomic.LoadUint32(sizePtr); uint32(n) > size { if size == 0 { atomic.CompareAndSwapUint32(sizePtr, 0, uint32(n)) } else { sizeMissPtr := &p.sizeMiss[poolNum-1] if atomic.AddUint32(sizeMissPtr, 1) == 20 { atomic.StoreUint32(sizePtr, uint32(n)) atomic.StoreUint32(sizeMissPtr, 0) } } return make([]byte, n) } else { return make([]byte, n, size) } } } // Put adds given buffer to the pool. func (p *BufferPool) Put(b []byte) { if p == nil { return } p.mu.RLock() defer p.mu.RUnlock() if p.closed { return } atomic.AddUint32(&p.put, 1) pool := p.pool[p.poolNum(cap(b))] select { case pool <- b: default: } } func (p *BufferPool) Close() { if p == nil { return } p.mu.Lock() if !p.closed { p.closed = true p.closeC <- struct{}{} } p.mu.Unlock() } func (p *BufferPool) String() string { if p == nil { return "" } return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}", p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss) } func (p *BufferPool) drain() { ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: for _, ch := range p.pool { select { case <-ch: default: } } case <-p.closeC: close(p.closeC) for _, ch := range p.pool { close(ch) } return } } } // NewBufferPool creates a new initialized 'buffer pool'. func NewBufferPool(baseline int) *BufferPool { if baseline <= 0 { panic("baseline can't be <= 0") } p := &BufferPool{ baseline0: baseline, baseline: [...]int{baseline / 4, baseline / 2, baseline * 2, baseline * 4}, closeC: make(chan struct{}, 1), } for i, cap := range []int{2, 2, 4, 4, 2, 1} { p.pool[i] = make(chan []byte, cap) } go p.drain() return p }