Browse Source

first commit

master
andrey1s 3 years ago
commit
e80be6c6cb
  1. 13
      .drone.yml
  2. 17
      .gitignore
  3. 31
      .golangci.yml
  4. 19
      LICENSE
  5. 5
      README.md
  6. 4
      doc.go
  7. 53
      error.go
  8. 3
      go.mod
  9. 219
      job.go
  10. 62
      manager.go
  11. 260
      manager_example_test.go
  12. 164
      manager_test.go
  13. 31
      middleware.go
  14. 42
      timer.go

13
.drone.yml

@ -0,0 +1,13 @@
kind: pipeline
name: default
steps:
- name: test
image: golang
commands:
- go test
- name: golangci-lint
image: golangci/golangci-lint:v1.26
commands:
- golangci-lint run

17
.gitignore

@ -0,0 +1,17 @@
# ---> Go
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/

31
.golangci.yml

@ -0,0 +1,31 @@
linters-settings:
dupl:
threshold: 100
funlen:
lines: 100
statements: 50
goconst:
min-len: 2
min-occurrences: 2
gocyclo:
min-complexity: 15
golint:
min-confidence: 0
govet:
check-shadowing: true
lll:
line-length: 140
maligned:
suggest-new: true
misspell:
locale: US
linters:
enable-all: true
issues:
# Excluding configuration per-path, per-linter, per-text and per-source
exclude-rules:
- path: _test\.go
linters:
- gomnd

19
LICENSE

@ -0,0 +1,19 @@
MIT License Copyright (c) 2020 4devs
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:
The above copyright notice and this permission notice (including the next
paragraph) shall be included in all copies or substantial portions of the
Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF
OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

5
README.md

@ -0,0 +1,5 @@
# daemon
[![Build Status](https://drone.gitoa.ru/api/badges/go-4devs/daemon/status.svg)](https://drone.gitoa.ru/go-4devs/daemon)
[![Go Report Card](https://goreportcard.com/badge/gitoa.ru/go-4devs/daemon)](https://goreportcard.com/report/gitoa.ru/go-4devs/daemon)
[![GoDoc](https://godoc.org/gitoa.ru/go-4devs/daemon?status.svg)](http://godoc.org/gitoa.ru/go-4devs/daemon)

4
doc.go

@ -0,0 +1,4 @@
/*
Package daemon for the run job background and manage them.
*/
package daemon

53
error.go

@ -0,0 +1,53 @@
package daemon
import "time"
// StopJob stop job.
func StopJob(err error) error {
return &stop{e: err}
}
// IsStoppedJob check stopped job.
func IsStoppedJob(err error) bool {
_, ok := err.(*stop)
return ok
}
type stop struct {
e error
}
// Is check type.
func (s *stop) Is(err error) bool {
_, ok := err.(*stop)
return ok
}
// Error base error interface.
func (s *stop) Error() string {
return s.e.Error()
}
// GetDelayedJob get delay job.
func GetDelayedJob(err error) (time.Duration, bool) {
if d, ok := err.(*delay); ok {
return d.d, ok
}
return 0, false
}
// DelayJob update delay next Run job.
func DelayJob(d time.Duration, err error) error {
return &delay{d: d, e: err}
}
type delay struct {
d time.Duration
e error
}
// Error base error interface.
func (d *delay) Error() string {
return d.e.Error()
}

3
go.mod

@ -0,0 +1,3 @@
module gitoa.ru/go-4devs/daemon
go 1.14

219
job.go

@ -0,0 +1,219 @@
package daemon
import (
"context"
"reflect"
"runtime"
"strings"
"time"
)
// Option configure job.
type Option func(*Job)
// Job run job by frequency.
type Job struct {
name string
run Run
stop Run
sem chan struct{}
err chan error
timer Timer
delay time.Duration
freq func(time.Time) time.Duration
handleErr func(error)
}
// WithName sets job name.
func WithName(name string) Option {
return func(j *Job) { j.name = name }
}
// WithStop sets stop handle for job.
func WithStop(stop Run) Option {
return func(j *Job) { j.stop = stop }
}
// WithTimer sets time,r to job.
func WithTimer(timer Timer) Option {
return func(j *Job) { j.timer = timer }
}
// WithDelay sets delay Run job.
func WithDelay(delay time.Duration) Option {
return func(j *Job) {
j.timer.Reset(delay)
j.delay = delay
}
}
// WithFreq sets frequency Run job..
func WithFreq(freq time.Duration) Option {
return func(j *Job) {
j.freq = func(time.Time) time.Duration {
return freq
}
}
}
// WithSchedule set delay and frequency Run job.
func WithSchedule(next func(time.Time) time.Duration) Option {
return func(j *Job) {
j.freq = next
j.delay = next(time.Now())
j.timer.Reset(j.delay)
}
}
// WithRunMiddleware added middleware for the run job.
func WithRunMiddleware(fn ...Handle) Option {
return func(j *Job) {
run := j.run
j.run = func(ctx context.Context) error {
return chain(fn...)(ctx, run)
}
}
}
// WithStopMiddleware added middleware for the stop job.
func WithStopMiddleware(fn ...Handle) Option {
return func(j *Job) {
stop := j.stop
j.stop = func(ctx context.Context) error {
return chain(fn...)(ctx, stop)
}
}
}
// WithHandleErr add error hanler.
func WithHandleErr(fn func(error)) Option {
return func(j *Job) {
j.handleErr = fn
}
}
// Run init function for the change state.
type Run func(ctx context.Context) error
// Handle middleware interface.
type Handle func(ctx context.Context, next Run) error
func stopJob(ctx context.Context) error {
return nil
}
func handleErr(error) {}
// NewJob creates new job.
func NewJob(run Run, opts ...Option) *Job {
j := Job{
delay: time.Nanosecond,
sem: make(chan struct{}, 1),
err: make(chan error, 1),
timer: NewTicker(time.Nanosecond),
freq: func(time.Time) time.Duration {
return time.Second
},
name: getFuncName(run),
stop: stopJob,
run: run,
handleErr: handleErr,
}
j = j.With(opts...)
return &j
}
// HandleErr handle returned error.
func (j *Job) HandleErr(err error) {
j.handleErr(err)
}
// Do run job.
func (j *Job) Do(ctx context.Context) <-chan error {
<-j.timer.Tick()
j.sem <- struct{}{}
err := j.run(ctx)
<-j.sem
switch tr := err.(type) {
case *stop:
case *delay:
j.timer.Reset(tr.d)
default:
j.timer.Reset(j.freq(time.Now()))
}
j.err <- err
return j.err
}
// Stop job.
func (j *Job) Stop(ctx context.Context) error {
return j.stop(ctx)
}
// String gets job name.
func (j *Job) String() string {
return j.name
}
// With configure job.
func (j Job) With(opts ...Option) Job {
for _, o := range opts {
o(&j)
}
return j
}
func getFuncName(i interface{}) string {
callerName := runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
if callerName == "" {
return "-"
}
callerName = strings.NewReplacer("(*", "", ")", "").Replace(callerName)
lastIndex := strings.LastIndex(callerName, "/")
if lastIndex != -1 {
callerName = callerName[lastIndex+1:]
}
return callerName
}
func chain(handleFunc ...Handle) Handle {
n := len(handleFunc)
if n > 1 {
lastI := n - 1
return func(ctx context.Context, next Run) error {
var (
chainHandler Run
curI int
)
chainHandler = func(currentCtx context.Context) error {
if curI == lastI {
return next(currentCtx)
}
curI++
err := handleFunc[curI](currentCtx, chainHandler)
curI--
return err
}
return handleFunc[0](ctx, chainHandler)
}
}
if n == 1 {
return handleFunc[0]
}
return func(ctx context.Context, next Run) error {
return next(ctx)
}
}

62
manager.go

@ -0,0 +1,62 @@
package daemon
import (
"context"
"errors"
"sync"
)
// Manager run jobs.
type Manager struct {
sync.WaitGroup
close chan struct{}
opts []Option
}
// New creates new manager and configure them.
func New(opts ...Option) *Manager {
m := &Manager{
opts: opts,
close: make(chan struct{}),
}
return m
}
// Do runs job.
func (m *Manager) Do(ctx context.Context, j *Job, opts ...Option) {
m.Add(1)
job := j.With(append(m.opts, opts...)...)
go func() {
defer func() {
j.HandleErr(job.Stop(ctx))
m.Done()
}()
for {
select {
case <-m.close:
return
case <-ctx.Done():
return
case err := <-job.Do(ctx):
if err != nil {
if errors.Is(err, &stop{}) {
return
}
j.HandleErr(err)
}
}
}
}()
}
// Close jobs.
func (m *Manager) Close() error {
close(m.close)
m.Wait()
return nil
}

260
manager_example_test.go

@ -0,0 +1,260 @@
package daemon_test
import (
"context"
"errors"
"fmt"
"log"
"sync/atomic"
"time"
"gitoa.ru/go-4devs/daemon"
)
var ErrJob = errors.New("some reason")
func ExampleManager() {
m := daemon.New()
j := daemon.NewJob(func(ctx context.Context) error {
// do some job
return daemon.StopJob(nil)
}, daemon.WithName("awesome job"))
m.Do(context.Background(), j,
// set frequency run job
daemon.WithFreq(time.Minute),
// set delay for first run job
daemon.WithDelay(time.Second),
// set handler if run job return err
daemon.WithHandleErr(func(err error) {
log.Println(err)
}),
)
m.Wait()
// Output:
}
func ExampleManager_withClose() {
m := daemon.New()
defer func() {
_ = m.Close()
}()
j := daemon.NewJob(func(ctx context.Context) error {
fmt.Println("do some job;")
return daemon.StopJob(nil)
}, daemon.WithName("awesome job"))
m.Do(context.Background(), j, daemon.WithFreq(time.Microsecond))
// some blocked process
time.Sleep(time.Second)
// Output: do some job;
}
func ExampleManager_withOptions() {
ctx := context.Background()
middlewareRun := func(ctx context.Context, next daemon.Run) error {
fmt.Println("do some before run all job;")
err := next(ctx)
fmt.Println("do some after run all job;")
return err
}
middlewareStop := func(ctx context.Context, next daemon.Run) error {
fmt.Println("do some before close all job;")
err := next(ctx)
fmt.Println("do some after close all job;")
return err
}
m := daemon.New(
daemon.WithRunMiddleware(middlewareRun),
daemon.WithStopMiddleware(middlewareStop),
daemon.WithHandleErr(func(err error) {
// do some if close return err
log.Println(err)
}),
)
j := daemon.NewJob(func(ctx context.Context) error {
fmt.Println("do some job;")
return daemon.StopJob(nil)
}, daemon.WithName("awesome job"))
j2 := daemon.NewJob(func(ctx context.Context) error {
fmt.Println("do some job2;")
return daemon.StopJob(nil)
}, daemon.WithName("awesome job2"))
m.Do(ctx, j, daemon.WithFreq(time.Minute), daemon.WithDelay(time.Second))
m.Do(ctx, j2, daemon.WithFreq(time.Nanosecond))
m.Wait()
// Output:
// do some before run all job;
// do some job2;
// do some after run all job;
// do some before close all job;
// do some after close all job;
// do some before run all job;
// do some job;
// do some after run all job;
// do some before close all job;
// do some after close all job;
}
func ExampleNewJob() {
ctx := context.Background()
m := daemon.New(func(j *daemon.Job) {
daemon.WithRunMiddleware(func(ctx context.Context, next daemon.Run) error {
fmt.Printf("running job: %s\n", j)
return daemon.StopJob(next(ctx))
})(j)
})
j := daemon.NewJob(func(ctx context.Context) error {
// do some
return nil
}, daemon.WithName("my awesome job"))
m.Do(ctx, j)
m.Wait()
// Output: running job: my awesome job
}
func ExampleNewJob_withClose() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
m := daemon.New()
j := daemon.NewJob(func(ctx context.Context) error {
fmt.Println("do some long job;")
return daemon.StopJob(nil)
}, daemon.WithStop(func(ctx context.Context) error {
fmt.Println("do some close job;")
return nil
}))
m.Do(ctx, j)
m.Wait()
// Output:
// do some long job;
// do some close job;
}
func ExampleJob_withMiddleware() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
m := daemon.New()
j := daemon.NewJob(
func(ctx context.Context) error {
fmt.Println("do some job;")
return daemon.StopJob(nil)
},
daemon.WithStop(func(ctx context.Context) error {
fmt.Println("do some close job;")
return nil
}),
daemon.WithRunMiddleware(func(ctx context.Context, next daemon.Run) error {
fmt.Println("do some before run func;")
err := next(ctx)
fmt.Println("do some after run func;")
return err
}),
daemon.WithStopMiddleware(func(ctx context.Context, next daemon.Run) error {
fmt.Println("do some before close func;")
err := next(ctx)
fmt.Println("do some after close func;")
return err
}),
)
m.Do(ctx, j)
m.Wait()
// Output:
// do some before run func;
// do some job;
// do some after run func;
// do some before close func;
// do some close job;
// do some after close func;
}
func ExampleJob_option() {
var cnt int32
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()
m := daemon.New()
j := daemon.NewJob(func(ctx context.Context) error {
if cnt == 2 {
return daemon.StopJob(nil)
}
atomic.AddInt32(&cnt, 1)
fmt.Println("do some")
return nil
},
// set freq run job
daemon.WithFreq(time.Microsecond),
// set delay to start job
daemon.WithDelay(time.Nanosecond),
)
m.Do(ctx, j)
m.Wait()
// Output:
//do some
//do some
}
func ExampleNewJob_stop() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
m := daemon.New()
var i int32
j := daemon.NewJob(func(ctx context.Context) error {
atomic.AddInt32(&i, 1)
fmt.Print("do some:", i, " ")
return daemon.StopJob(ErrJob)
})
m.Do(ctx, j)
m.Wait()
// Output: do some:1
}
func ExampleJob_delay() {
var i int32
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
m := daemon.New()
j := daemon.NewJob(func(ctx context.Context) error {
if i == 3 {
return daemon.StopJob(nil)
}
atomic.AddInt32(&i, 1)
fmt.Print("do some:", i, " ")
return daemon.DelayJob(time.Second/2, ErrJob)
})
m.Do(ctx, j)
m.Wait()
// Output: do some:1 do some:2 do some:3
}

164
manager_test.go

@ -0,0 +1,164 @@
package daemon_test
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"gitoa.ru/go-4devs/daemon"
)
//nolint: gochecknoglobals
var (
ctx = context.Background()
errJob = errors.New("error job")
jobName = "job name"
jobNameOption = daemon.WithName(jobName)
)
func createJob(count *int32, d time.Duration, err error, opts ...daemon.Option) *daemon.Job {
opts = append(opts, jobNameOption)
return daemon.NewJob(func(ctx context.Context) error {
if d > 0 {
<-time.After(d)
}
atomic.AddInt32(count, 1)
return err
}, opts...)
}
func TestDoJobSuccess(t *testing.T) {
t.Parallel()
var (
cnt int32
cnt2 int32
runm int32
clm int32
)
m := daemon.New()
m.Do(ctx, createJob(&cnt, 0, nil), daemon.WithFreq(time.Second/3))
mwRun := func(ctx context.Context, next daemon.Run) error {
atomic.AddInt32(&runm, 1)
return next(ctx)
}
mwStop := func(ctx context.Context, next daemon.Run) error {
atomic.AddInt32(&clm, 1)
return next(ctx)
}
m.Do(ctx, createJob(&cnt2, 0, nil,
daemon.WithRunMiddleware(mwRun, mwRun, mwRun),
daemon.WithStopMiddleware(mwStop, mwStop, mwStop),
), daemon.WithFreq(time.Second/10))
time.AfterFunc(time.Second, func() {
requireNil(t, m.Close())
})
m.Wait()
requireTrue(t, cnt > 2)
requireTrue(t, cnt2 >= 10)
requireTrue(t, cnt2*3 == runm)
requireTrue(t, clm == 3)
}
func requireNil(t *testing.T, ex interface{}) {
t.Helper()
if ex != nil {
t.Fatal("expect nil")
}
}
func requireTrue(t *testing.T, ex bool) {
t.Helper()
if !ex {
t.Fatal("expect true")
}
}
func TestDoJobStop(t *testing.T) {
t.Parallel()
m := daemon.New()
var cnt int32
m.Do(ctx, createJob(&cnt, 0, daemon.StopJob(nil)), daemon.WithFreq(time.Nanosecond))
m.Wait()
requireTrue(t, cnt == 1)
}
func TestDoJobDelay(t *testing.T) {
t.Parallel()
var cnt int32
m := daemon.New()
m.Do(ctx, createJob(&cnt, 0, daemon.DelayJob(time.Second/3, nil)), daemon.WithFreq(time.Second))
time.AfterFunc(time.Second, func() {
requireNil(t, m.Close())
})
m.Wait()
requireTrue(t, cnt > 2)
}
func TestDoJobSkipErr(t *testing.T) {
t.Parallel()
var cnt int32
m := daemon.New()
m.Do(ctx, createJob(&cnt, 0, errJob), daemon.WithFreq(time.Second/3))
time.AfterFunc(time.Second, func() {
requireNil(t, m.Close())
})
m.Wait()
requireTrue(t, cnt > 2)
}
func TestDoJobRetryErr(t *testing.T) {
t.Parallel()
var cnt int32
m := daemon.New()
m.Do(ctx, createJob(&cnt, time.Millisecond, errJob, daemon.Retry(3, daemon.StopJob)), daemon.WithFreq(time.Nanosecond))
m.Wait()
requireTrue(t, cnt == 3)
}
func TestDoJobName(t *testing.T) {
t.Parallel()
j := daemon.NewJob(func(ctx context.Context) error {
return nil
})
requireTrue(t, j.String() == "daemon_test.TestDoJobName.func1")
jn := daemon.NewJob(func(ctx context.Context) error {
return nil
}, jobNameOption)
requireTrue(t, jn.String() == "job name")
}
func TestDoManagerRetryErr(t *testing.T) {
t.Parallel()
var cnt int32
m := daemon.New(daemon.Retry(3, daemon.StopJob))
m.Do(ctx, createJob(&cnt, 0, errJob), daemon.WithFreq(time.Second/5))
m.Wait()
requireTrue(t, cnt == 3)
}

31
middleware.go

@ -0,0 +1,31 @@
package daemon
import "context"
// Retry set retry job and change return after max retry.
func Retry(max uint8, handleRetry func(err error) error) Option {
var retry uint8
return func(j *Job) {
WithRunMiddleware(func(ctx context.Context, next Run) error {
if err := next(ctx); err != nil {
retry++
if retry >= max {
return handleRetry(err)
}
return err
}
retry = 0
return nil
})(j)
}
}
// RunOnce run once and stopped job.
func RunOnce() Option {
return func(j *Job) {
WithRunMiddleware(func(ctx context.Context, next Run) error {
return StopJob(next(ctx))
})(j)
}
}

42
timer.go

@ -0,0 +1,42 @@
package daemon
import "time"
// Timer for the Run job.
type Timer interface {
Tick() <-chan time.Time
Reset(d time.Duration)
Stop()
}
// NewTicker create new ticker based on time.ticker.
func NewTicker(freq time.Duration) Timer {
return &ticker{
freq: freq,
ticker: time.NewTicker(freq),
}
}
type ticker struct {
freq time.Duration
ticker *time.Ticker
}
// Tick time.
func (t *ticker) Tick() <-chan time.Time {
return t.ticker.C
}
// Stop timer.
func (t *ticker) Stop() {
t.ticker.Stop()
}
// Reset timer.
func (t *ticker) Reset(freq time.Duration) {
if t.freq != freq && freq > 0 {
t.ticker.Stop()
t.freq = freq
t.ticker = time.NewTicker(freq)
}
}
Loading…
Cancel
Save