first commit
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
andrey1s
2021-04-27 08:22:11 +03:00
commit b5f9c60818
23 changed files with 1214 additions and 0 deletions

220
priority/closer.go Normal file
View File

@@ -0,0 +1,220 @@
package priority
import (
"context"
"log"
"os"
"os/signal"
"sort"
"sync"
"time"
)
// defaults.
const (
First = 250
Normal = 100
Last = 5
)
// Option configure priority closer.
type Option func(*Closer)
// WithHandleError configure priority error handler.
func WithHandleError(he func(error)) Option {
return func(async *Closer) {
async.handler = he
}
}
// WithTimeout configure priority error handler.
func WithTimeout(d time.Duration) Option {
return func(async *Closer) {
async.timeout = d
}
}
// New create new closer with options.
func New(opts ...Option) *Closer {
a := &Closer{}
for _, o := range opts {
o(a)
}
return a
}
// Closer close by priority.
type Closer struct {
sync.Mutex
fnc map[uint8][]func() error
priority prioritySlice
sumPriority uint
once sync.Once
handler func(error)
timeout time.Duration
len int
done chan struct{}
}
// Add adds closed func by normal priority.
func (c *Closer) Add(f ...func() error) {
c.AddByPriority(Normal, f...)
}
// AddLast add closer which execute at the end.
func (c *Closer) AddLast(f ...func() error) {
c.AddByPriority(Last, f...)
}
// AddFirst add closer which execute at the begin.
func (c *Closer) AddFirst(f ...func() error) {
c.AddByPriority(First, f...)
}
// AddByPriority add close by priority 255 its close first 0 - last.
func (c *Closer) AddByPriority(priority uint8, f ...func() error) {
if len(f) == 0 {
return
}
c.Lock()
if c.fnc == nil {
c.fnc = make(map[uint8][]func() error)
}
if len(c.fnc[priority]) == 0 {
c.priority = append(c.priority, priority)
sort.Sort(c.priority)
c.sumPriority += uint(priority)
}
c.len += len(f)
c.fnc[priority] = append(c.fnc[priority], f...)
c.Unlock()
}
// Wait wait signal or cancel context.
func (c *Closer) Wait(ctx context.Context, sig ...os.Signal) {
go func() {
ch := make(chan os.Signal, 1)
if len(sig) > 0 {
signal.Notify(ch, sig...)
defer signal.Stop(ch)
}
select {
case <-ch:
case <-ctx.Done():
}
_ = c.Close()
}()
<-c.wait()
}
// Close closes all func by priority.
func (c *Closer) Close() error {
c.once.Do(func() {
start := time.Now()
c.Lock()
eh := func(err error) {
log.Print(err)
}
if c.handler != nil {
eh = c.handler
}
w := &wait{
timeout: c.timeout,
priority: c.sumPriority,
}
funcs := c.fnc
c.fnc = nil
c.Unlock()
errs := make(chan error, c.len)
go func() {
defer close(c.wait())
for i := 0; i < cap(errs); i++ {
if err := <-errs; err != nil {
eh(err)
}
}
}()
for _, p := range c.priority {
var wg sync.WaitGroup
wg.Add(len(funcs[p]))
for _, f := range funcs[p] {
go func(f func() error) {
errs <- f()
wg.Done()
}(f)
}
w.done(start, uint(p), &wg)
}
<-c.wait()
})
return nil
}
// SetTimeout before close func.
func (c *Closer) SetTimeout(t time.Duration) {
c.Lock()
c.timeout = t
c.Unlock()
}
// SetErrHandler before close func.
func (c *Closer) SetErrHandler(e func(error)) {
c.Lock()
c.handler = e
c.Unlock()
}
func (c *Closer) wait() chan struct{} {
c.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
c.Unlock()
return c.done
}
type wait struct {
timeout time.Duration
priority uint
}
func (w *wait) done(start time.Time, priority uint, wg *sync.WaitGroup) {
done := make(chan struct{})
go func() {
defer close(done)
wg.Wait()
}()
select {
case <-w.after(start, priority):
case <-done:
}
}
func (w *wait) after(start time.Time, priority uint) <-chan time.Time {
timeout := (w.timeout - time.Since(start)) / time.Duration(w.priority) * time.Duration(priority)
w.priority -= priority
if timeout <= 0 {
return nil
}
return time.After(timeout)
}
type prioritySlice []uint8
func (p prioritySlice) Len() int { return len(p) }
func (p prioritySlice) Less(i, j int) bool { return p[i] > p[j] }
func (p prioritySlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

View File

@@ -0,0 +1,112 @@
package priority_test
import (
"context"
"fmt"
"io"
"net"
"net/http"
"syscall"
"time"
"gitoa.ru/go-4devs/closer/priority"
)
type closer struct {
msg string
}
// nolint: forbidigo
func (c *closer) Close() error {
fmt.Print(c.msg)
return nil
}
func OpenDB() io.Closer {
return &closer{msg: "close db. "}
}
func NewConsumer() io.Closer {
return &closer{msg: "close consumer. "}
}
func ExampleCloser_Wait() {
time.AfterFunc(time.Second/2, func() {
_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
})
ctx := context.Background()
cl := priority.New(
priority.WithTimeout(time.Second*5),
priority.WithHandleError(func(e error) {
fmt.Print(e)
}),
)
db := OpenDB()
cl.AddLast(db.Close)
// your consumer events
consumer := NewConsumer()
cl.AddFirst(consumer.Close)
s := http.Server{}
// your http listeners
listener, _ := net.Listen("tcp", "127.0.0.1:0")
go func() {
if err := s.Serve(listener); err != nil {
_ = cl.Close()
}
}()
cl.Add(func() error {
fmt.Print("stop server. ")
return s.Shutdown(ctx)
})
cl.Wait(ctx, syscall.SIGTERM, syscall.SIGINT)
// Output: close consumer. stop server. close db.
}
func ExampleCloser_Close() {
cl := priority.New()
cl.Add(func() error {
fmt.Print("normal stop. ")
return nil
}, func() error {
time.Sleep(time.Millisecond)
fmt.Print("long normal stop. ")
return nil
})
cl.AddFirst(func() error {
fmt.Print("first stop. ")
return nil
})
cl.AddLast(func() error {
fmt.Print("last stop. ")
return nil
})
cl.AddByPriority(priority.First+1, func() error {
fmt.Print("run before first. ")
return nil
})
cl.AddByPriority(priority.Normal-1, func() error {
fmt.Print("run after normal. ")
return nil
})
cl.Close()
// Output: run before first. first stop. normal stop. long normal stop. run after normal. last stop.
}

63
priority/closer_test.go Normal file
View File

@@ -0,0 +1,63 @@
package priority_test
import (
"context"
"syscall"
"testing"
"time"
"gitoa.ru/go-4devs/closer/priority"
"gitoa.ru/go-4devs/closer/test"
)
func TestPriority_Close(t *testing.T) {
t.Parallel()
cl := priority.Closer{}
closed := &test.Closed{}
go cl.Wait(context.Background())
cl.Add()
cl.Add(closed.CloseFnc("one", time.Microsecond))
cl.AddLast(closed.CloseFnc("last", time.Microsecond))
cl.AddFirst(closed.CloseFnc("first", time.Millisecond))
cl.AddByPriority(priority.Normal, closed.CloseFnc("one", time.Microsecond), closed.CloseFnc("one", time.Microsecond))
test.RequireNil(t, cl.Close())
closed.TestEqual(t, []string{"first", "one", "one", "one", "last"})
}
func TestPriority_Wait_Timeout(t *testing.T) {
t.Parallel()
closed := &test.Closed{}
cl := priority.New(priority.WithTimeout(time.Second/5), priority.WithHandleError(func(error) {}))
cl.Add(closed.CloseFnc("one", 0))
cl.AddByPriority(priority.First, closed.CloseFnc("first", time.Second))
cl.AddByPriority(priority.Last, closed.CloseFnc("last", 0))
cl.AddLast(func() error {
return test.ErrClose
})
time.AfterFunc(time.Second/3, func() {
test.RequireNil(t, syscall.Kill(syscall.Getpid(), syscall.SIGTERM))
})
cl.Wait(context.Background(), syscall.SIGTERM)
closed.TestEqual(t, []string{"one", "last", "first"})
}
func TestPriority_Wait(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
cl := priority.New(priority.WithHandleError(func(err error) {
test.RequireError(t, err, test.ErrClose)
}))
cl.AddLast(func() error {
return test.ErrClose
})
cl.Wait(ctx)
}