commit e80be6c6cbfbbbca2bbbc83ea00ae4535bffc0a7 Author: andrey1s Date: Mon Apr 26 14:50:30 2021 +0300 first commit diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..e95735b --- /dev/null +++ b/.drone.yml @@ -0,0 +1,13 @@ +kind: pipeline +name: default + +steps: +- name: test + image: golang + commands: + - go test + +- name: golangci-lint + image: golangci/golangci-lint:v1.26 + commands: + - golangci-lint run diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f4d432a --- /dev/null +++ b/.gitignore @@ -0,0 +1,17 @@ +# ---> Go +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..672df6e --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,31 @@ +linters-settings: + dupl: + threshold: 100 + funlen: + lines: 100 + statements: 50 + goconst: + min-len: 2 + min-occurrences: 2 + gocyclo: + min-complexity: 15 + golint: + min-confidence: 0 + govet: + check-shadowing: true + lll: + line-length: 140 + maligned: + suggest-new: true + misspell: + locale: US + +linters: + enable-all: true + +issues: + # Excluding configuration per-path, per-linter, per-text and per-source + exclude-rules: + - path: _test\.go + linters: + - gomnd diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f03940c --- /dev/null +++ b/LICENSE @@ -0,0 +1,19 @@ +MIT License Copyright (c) 2020 4devs + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice (including the next +paragraph) shall be included in all copies or substantial portions of the +Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS +OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF +OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..48af379 --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +# daemon + +[![Build Status](https://drone.gitoa.ru/api/badges/go-4devs/daemon/status.svg)](https://drone.gitoa.ru/go-4devs/daemon) +[![Go Report Card](https://goreportcard.com/badge/gitoa.ru/go-4devs/daemon)](https://goreportcard.com/report/gitoa.ru/go-4devs/daemon) +[![GoDoc](https://godoc.org/gitoa.ru/go-4devs/daemon?status.svg)](http://godoc.org/gitoa.ru/go-4devs/daemon) diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..7dd8399 --- /dev/null +++ b/doc.go @@ -0,0 +1,4 @@ +/* +Package daemon for the run job background and manage them. +*/ +package daemon diff --git a/error.go b/error.go new file mode 100644 index 0000000..0cf5cd3 --- /dev/null +++ b/error.go @@ -0,0 +1,53 @@ +package daemon + +import "time" + +// StopJob stop job. +func StopJob(err error) error { + return &stop{e: err} +} + +// IsStoppedJob check stopped job. +func IsStoppedJob(err error) bool { + _, ok := err.(*stop) + return ok +} + +type stop struct { + e error +} + +// Is check type. +func (s *stop) Is(err error) bool { + _, ok := err.(*stop) + return ok +} + +// Error base error interface. +func (s *stop) Error() string { + return s.e.Error() +} + +// GetDelayedJob get delay job. +func GetDelayedJob(err error) (time.Duration, bool) { + if d, ok := err.(*delay); ok { + return d.d, ok + } + + return 0, false +} + +// DelayJob update delay next Run job. +func DelayJob(d time.Duration, err error) error { + return &delay{d: d, e: err} +} + +type delay struct { + d time.Duration + e error +} + +// Error base error interface. +func (d *delay) Error() string { + return d.e.Error() +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..412999c --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module gitoa.ru/go-4devs/daemon + +go 1.14 diff --git a/job.go b/job.go new file mode 100644 index 0000000..df4382b --- /dev/null +++ b/job.go @@ -0,0 +1,219 @@ +package daemon + +import ( + "context" + "reflect" + "runtime" + "strings" + "time" +) + +// Option configure job. +type Option func(*Job) + +// Job run job by frequency. +type Job struct { + name string + run Run + stop Run + sem chan struct{} + err chan error + timer Timer + delay time.Duration + freq func(time.Time) time.Duration + handleErr func(error) +} + +// WithName sets job name. +func WithName(name string) Option { + return func(j *Job) { j.name = name } +} + +// WithStop sets stop handle for job. +func WithStop(stop Run) Option { + return func(j *Job) { j.stop = stop } +} + +// WithTimer sets time,r to job. +func WithTimer(timer Timer) Option { + return func(j *Job) { j.timer = timer } +} + +// WithDelay sets delay Run job. +func WithDelay(delay time.Duration) Option { + return func(j *Job) { + j.timer.Reset(delay) + j.delay = delay + } +} + +// WithFreq sets frequency Run job.. +func WithFreq(freq time.Duration) Option { + return func(j *Job) { + j.freq = func(time.Time) time.Duration { + return freq + } + } +} + +// WithSchedule set delay and frequency Run job. +func WithSchedule(next func(time.Time) time.Duration) Option { + return func(j *Job) { + j.freq = next + j.delay = next(time.Now()) + j.timer.Reset(j.delay) + } +} + +// WithRunMiddleware added middleware for the run job. +func WithRunMiddleware(fn ...Handle) Option { + return func(j *Job) { + run := j.run + j.run = func(ctx context.Context) error { + return chain(fn...)(ctx, run) + } + } +} + +// WithStopMiddleware added middleware for the stop job. +func WithStopMiddleware(fn ...Handle) Option { + return func(j *Job) { + stop := j.stop + j.stop = func(ctx context.Context) error { + return chain(fn...)(ctx, stop) + } + } +} + +// WithHandleErr add error hanler. +func WithHandleErr(fn func(error)) Option { + return func(j *Job) { + j.handleErr = fn + } +} + +// Run init function for the change state. +type Run func(ctx context.Context) error + +// Handle middleware interface. +type Handle func(ctx context.Context, next Run) error + +func stopJob(ctx context.Context) error { + return nil +} + +func handleErr(error) {} + +// NewJob creates new job. +func NewJob(run Run, opts ...Option) *Job { + j := Job{ + delay: time.Nanosecond, + sem: make(chan struct{}, 1), + err: make(chan error, 1), + timer: NewTicker(time.Nanosecond), + freq: func(time.Time) time.Duration { + return time.Second + }, + name: getFuncName(run), + stop: stopJob, + run: run, + handleErr: handleErr, + } + j = j.With(opts...) + + return &j +} + +// HandleErr handle returned error. +func (j *Job) HandleErr(err error) { + j.handleErr(err) +} + +// Do run job. +func (j *Job) Do(ctx context.Context) <-chan error { + <-j.timer.Tick() + j.sem <- struct{}{} + err := j.run(ctx) + <-j.sem + + switch tr := err.(type) { + case *stop: + case *delay: + j.timer.Reset(tr.d) + default: + j.timer.Reset(j.freq(time.Now())) + } + j.err <- err + + return j.err +} + +// Stop job. +func (j *Job) Stop(ctx context.Context) error { + return j.stop(ctx) +} + +// String gets job name. +func (j *Job) String() string { + return j.name +} + +// With configure job. +func (j Job) With(opts ...Option) Job { + for _, o := range opts { + o(&j) + } + + return j +} + +func getFuncName(i interface{}) string { + callerName := runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() + if callerName == "" { + return "-" + } + + callerName = strings.NewReplacer("(*", "", ")", "").Replace(callerName) + lastIndex := strings.LastIndex(callerName, "/") + + if lastIndex != -1 { + callerName = callerName[lastIndex+1:] + } + + return callerName +} + +func chain(handleFunc ...Handle) Handle { + n := len(handleFunc) + if n > 1 { + lastI := n - 1 + + return func(ctx context.Context, next Run) error { + var ( + chainHandler Run + curI int + ) + + chainHandler = func(currentCtx context.Context) error { + if curI == lastI { + return next(currentCtx) + } + curI++ + err := handleFunc[curI](currentCtx, chainHandler) + curI-- + + return err + } + + return handleFunc[0](ctx, chainHandler) + } + } + + if n == 1 { + return handleFunc[0] + } + + return func(ctx context.Context, next Run) error { + return next(ctx) + } +} diff --git a/manager.go b/manager.go new file mode 100644 index 0000000..c944799 --- /dev/null +++ b/manager.go @@ -0,0 +1,62 @@ +package daemon + +import ( + "context" + "errors" + "sync" +) + +// Manager run jobs. +type Manager struct { + sync.WaitGroup + close chan struct{} + opts []Option +} + +// New creates new manager and configure them. +func New(opts ...Option) *Manager { + m := &Manager{ + opts: opts, + close: make(chan struct{}), + } + + return m +} + +// Do runs job. +func (m *Manager) Do(ctx context.Context, j *Job, opts ...Option) { + m.Add(1) + job := j.With(append(m.opts, opts...)...) + + go func() { + defer func() { + j.HandleErr(job.Stop(ctx)) + m.Done() + }() + + for { + select { + case <-m.close: + return + case <-ctx.Done(): + return + case err := <-job.Do(ctx): + if err != nil { + if errors.Is(err, &stop{}) { + return + } + + j.HandleErr(err) + } + } + } + }() +} + +// Close jobs. +func (m *Manager) Close() error { + close(m.close) + m.Wait() + + return nil +} diff --git a/manager_example_test.go b/manager_example_test.go new file mode 100644 index 0000000..ed44102 --- /dev/null +++ b/manager_example_test.go @@ -0,0 +1,260 @@ +package daemon_test + +import ( + "context" + "errors" + "fmt" + "log" + "sync/atomic" + "time" + + "gitoa.ru/go-4devs/daemon" +) + +var ErrJob = errors.New("some reason") + +func ExampleManager() { + m := daemon.New() + j := daemon.NewJob(func(ctx context.Context) error { + // do some job + return daemon.StopJob(nil) + }, daemon.WithName("awesome job")) + + m.Do(context.Background(), j, + // set frequency run job + daemon.WithFreq(time.Minute), + // set delay for first run job + daemon.WithDelay(time.Second), + // set handler if run job return err + daemon.WithHandleErr(func(err error) { + log.Println(err) + }), + ) + m.Wait() + // Output: +} + +func ExampleManager_withClose() { + m := daemon.New() + + defer func() { + _ = m.Close() + }() + + j := daemon.NewJob(func(ctx context.Context) error { + fmt.Println("do some job;") + return daemon.StopJob(nil) + }, daemon.WithName("awesome job")) + + m.Do(context.Background(), j, daemon.WithFreq(time.Microsecond)) + // some blocked process + time.Sleep(time.Second) + // Output: do some job; +} + +func ExampleManager_withOptions() { + ctx := context.Background() + + middlewareRun := func(ctx context.Context, next daemon.Run) error { + fmt.Println("do some before run all job;") + + err := next(ctx) + + fmt.Println("do some after run all job;") + + return err + } + middlewareStop := func(ctx context.Context, next daemon.Run) error { + fmt.Println("do some before close all job;") + + err := next(ctx) + + fmt.Println("do some after close all job;") + + return err + } + m := daemon.New( + daemon.WithRunMiddleware(middlewareRun), + daemon.WithStopMiddleware(middlewareStop), + daemon.WithHandleErr(func(err error) { + // do some if close return err + log.Println(err) + }), + ) + + j := daemon.NewJob(func(ctx context.Context) error { + fmt.Println("do some job;") + return daemon.StopJob(nil) + }, daemon.WithName("awesome job")) + j2 := daemon.NewJob(func(ctx context.Context) error { + fmt.Println("do some job2;") + return daemon.StopJob(nil) + }, daemon.WithName("awesome job2")) + + m.Do(ctx, j, daemon.WithFreq(time.Minute), daemon.WithDelay(time.Second)) + m.Do(ctx, j2, daemon.WithFreq(time.Nanosecond)) + m.Wait() + // Output: + // do some before run all job; + // do some job2; + // do some after run all job; + // do some before close all job; + // do some after close all job; + // do some before run all job; + // do some job; + // do some after run all job; + // do some before close all job; + // do some after close all job; +} + +func ExampleNewJob() { + ctx := context.Background() + m := daemon.New(func(j *daemon.Job) { + daemon.WithRunMiddleware(func(ctx context.Context, next daemon.Run) error { + fmt.Printf("running job: %s\n", j) + return daemon.StopJob(next(ctx)) + })(j) + }) + j := daemon.NewJob(func(ctx context.Context) error { + // do some + return nil + }, daemon.WithName("my awesome job")) + + m.Do(ctx, j) + + m.Wait() + // Output: running job: my awesome job +} + +func ExampleNewJob_withClose() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + m := daemon.New() + + j := daemon.NewJob(func(ctx context.Context) error { + fmt.Println("do some long job;") + return daemon.StopJob(nil) + }, daemon.WithStop(func(ctx context.Context) error { + fmt.Println("do some close job;") + return nil + })) + + m.Do(ctx, j) + + m.Wait() + // Output: + // do some long job; + // do some close job; +} + +func ExampleJob_withMiddleware() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + m := daemon.New() + + j := daemon.NewJob( + func(ctx context.Context) error { + fmt.Println("do some job;") + return daemon.StopJob(nil) + }, + daemon.WithStop(func(ctx context.Context) error { + fmt.Println("do some close job;") + return nil + }), + daemon.WithRunMiddleware(func(ctx context.Context, next daemon.Run) error { + fmt.Println("do some before run func;") + err := next(ctx) + fmt.Println("do some after run func;") + return err + }), + daemon.WithStopMiddleware(func(ctx context.Context, next daemon.Run) error { + fmt.Println("do some before close func;") + err := next(ctx) + fmt.Println("do some after close func;") + return err + }), + ) + + m.Do(ctx, j) + + m.Wait() + // Output: + // do some before run func; + // do some job; + // do some after run func; + // do some before close func; + // do some close job; + // do some after close func; +} + +func ExampleJob_option() { + var cnt int32 + + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + m := daemon.New() + j := daemon.NewJob(func(ctx context.Context) error { + if cnt == 2 { + return daemon.StopJob(nil) + } + atomic.AddInt32(&cnt, 1) + fmt.Println("do some") + return nil + }, + // set freq run job + daemon.WithFreq(time.Microsecond), + // set delay to start job + daemon.WithDelay(time.Nanosecond), + ) + + m.Do(ctx, j) + m.Wait() + // Output: + //do some + //do some +} + +func ExampleNewJob_stop() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + m := daemon.New() + + var i int32 + + j := daemon.NewJob(func(ctx context.Context) error { + atomic.AddInt32(&i, 1) + fmt.Print("do some:", i, " ") + return daemon.StopJob(ErrJob) + }) + + m.Do(ctx, j) + + m.Wait() + // Output: do some:1 +} + +func ExampleJob_delay() { + var i int32 + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + m := daemon.New() + j := daemon.NewJob(func(ctx context.Context) error { + if i == 3 { + return daemon.StopJob(nil) + } + atomic.AddInt32(&i, 1) + fmt.Print("do some:", i, " ") + return daemon.DelayJob(time.Second/2, ErrJob) + }) + + m.Do(ctx, j) + + m.Wait() + // Output: do some:1 do some:2 do some:3 +} diff --git a/manager_test.go b/manager_test.go new file mode 100644 index 0000000..afaef13 --- /dev/null +++ b/manager_test.go @@ -0,0 +1,164 @@ +package daemon_test + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + "gitoa.ru/go-4devs/daemon" +) + +//nolint: gochecknoglobals +var ( + ctx = context.Background() + errJob = errors.New("error job") + jobName = "job name" + jobNameOption = daemon.WithName(jobName) +) + +func createJob(count *int32, d time.Duration, err error, opts ...daemon.Option) *daemon.Job { + opts = append(opts, jobNameOption) + + return daemon.NewJob(func(ctx context.Context) error { + if d > 0 { + <-time.After(d) + } + atomic.AddInt32(count, 1) + return err + }, opts...) +} + +func TestDoJobSuccess(t *testing.T) { + t.Parallel() + + var ( + cnt int32 + cnt2 int32 + runm int32 + clm int32 + ) + + m := daemon.New() + + m.Do(ctx, createJob(&cnt, 0, nil), daemon.WithFreq(time.Second/3)) + + mwRun := func(ctx context.Context, next daemon.Run) error { + atomic.AddInt32(&runm, 1) + return next(ctx) + } + mwStop := func(ctx context.Context, next daemon.Run) error { + atomic.AddInt32(&clm, 1) + return next(ctx) + } + m.Do(ctx, createJob(&cnt2, 0, nil, + daemon.WithRunMiddleware(mwRun, mwRun, mwRun), + daemon.WithStopMiddleware(mwStop, mwStop, mwStop), + ), daemon.WithFreq(time.Second/10)) + time.AfterFunc(time.Second, func() { + requireNil(t, m.Close()) + }) + m.Wait() + + requireTrue(t, cnt > 2) + requireTrue(t, cnt2 >= 10) + requireTrue(t, cnt2*3 == runm) + requireTrue(t, clm == 3) +} + +func requireNil(t *testing.T, ex interface{}) { + t.Helper() + + if ex != nil { + t.Fatal("expect nil") + } +} + +func requireTrue(t *testing.T, ex bool) { + t.Helper() + + if !ex { + t.Fatal("expect true") + } +} + +func TestDoJobStop(t *testing.T) { + t.Parallel() + + m := daemon.New() + + var cnt int32 + + m.Do(ctx, createJob(&cnt, 0, daemon.StopJob(nil)), daemon.WithFreq(time.Nanosecond)) + m.Wait() + requireTrue(t, cnt == 1) +} + +func TestDoJobDelay(t *testing.T) { + t.Parallel() + + var cnt int32 + + m := daemon.New() + + m.Do(ctx, createJob(&cnt, 0, daemon.DelayJob(time.Second/3, nil)), daemon.WithFreq(time.Second)) + time.AfterFunc(time.Second, func() { + requireNil(t, m.Close()) + }) + m.Wait() + requireTrue(t, cnt > 2) +} + +func TestDoJobSkipErr(t *testing.T) { + t.Parallel() + + var cnt int32 + + m := daemon.New() + + m.Do(ctx, createJob(&cnt, 0, errJob), daemon.WithFreq(time.Second/3)) + time.AfterFunc(time.Second, func() { + requireNil(t, m.Close()) + }) + m.Wait() + requireTrue(t, cnt > 2) +} + +func TestDoJobRetryErr(t *testing.T) { + t.Parallel() + + var cnt int32 + + m := daemon.New() + + m.Do(ctx, createJob(&cnt, time.Millisecond, errJob, daemon.Retry(3, daemon.StopJob)), daemon.WithFreq(time.Nanosecond)) + m.Wait() + requireTrue(t, cnt == 3) +} + +func TestDoJobName(t *testing.T) { + t.Parallel() + + j := daemon.NewJob(func(ctx context.Context) error { + return nil + }) + requireTrue(t, j.String() == "daemon_test.TestDoJobName.func1") + + jn := daemon.NewJob(func(ctx context.Context) error { + return nil + }, jobNameOption) + requireTrue(t, jn.String() == "job name") +} + +func TestDoManagerRetryErr(t *testing.T) { + t.Parallel() + + var cnt int32 + + m := daemon.New(daemon.Retry(3, daemon.StopJob)) + + m.Do(ctx, createJob(&cnt, 0, errJob), daemon.WithFreq(time.Second/5)) + m.Wait() + requireTrue(t, cnt == 3) +} diff --git a/middleware.go b/middleware.go new file mode 100644 index 0000000..2608369 --- /dev/null +++ b/middleware.go @@ -0,0 +1,31 @@ +package daemon + +import "context" + +// Retry set retry job and change return after max retry. +func Retry(max uint8, handleRetry func(err error) error) Option { + var retry uint8 + + return func(j *Job) { + WithRunMiddleware(func(ctx context.Context, next Run) error { + if err := next(ctx); err != nil { + retry++ + if retry >= max { + return handleRetry(err) + } + return err + } + retry = 0 + return nil + })(j) + } +} + +// RunOnce run once and stopped job. +func RunOnce() Option { + return func(j *Job) { + WithRunMiddleware(func(ctx context.Context, next Run) error { + return StopJob(next(ctx)) + })(j) + } +} diff --git a/timer.go b/timer.go new file mode 100644 index 0000000..e8e9037 --- /dev/null +++ b/timer.go @@ -0,0 +1,42 @@ +package daemon + +import "time" + +// Timer for the Run job. +type Timer interface { + Tick() <-chan time.Time + Reset(d time.Duration) + Stop() +} + +// NewTicker create new ticker based on time.ticker. +func NewTicker(freq time.Duration) Timer { + return &ticker{ + freq: freq, + ticker: time.NewTicker(freq), + } +} + +type ticker struct { + freq time.Duration + ticker *time.Ticker +} + +// Tick time. +func (t *ticker) Tick() <-chan time.Time { + return t.ticker.C +} + +// Stop timer. +func (t *ticker) Stop() { + t.ticker.Stop() +} + +// Reset timer. +func (t *ticker) Reset(freq time.Duration) { + if t.freq != freq && freq > 0 { + t.ticker.Stop() + t.freq = freq + t.ticker = time.NewTicker(freq) + } +}