Browse Source

add stop watch

pull/4/head
andrey 4 months ago
parent
commit
085589a938
  1. 1
      error.go
  2. 2
      provider.go
  3. 9
      provider/watcher/provider.go
  4. 15
      provider/watcher/provider_test.go

1
error.go

@ -7,4 +7,5 @@ var (
ErrInvalidValue = errors.New("invalid value") ErrInvalidValue = errors.New("invalid value")
ErrUnknowType = errors.New("unknow type") ErrUnknowType = errors.New("unknow type")
ErrInitFactory = errors.New("init factory") ErrInitFactory = errors.New("init factory")
ErrStopWatch = errors.New("stop watch")
) )

2
provider.go

@ -11,7 +11,7 @@ type NamedProvider interface {
Provider Provider
} }
type WatchCallback func(ctx context.Context, oldVar, newVar Value) type WatchCallback func(ctx context.Context, oldVar, newVar Value) error
type WatchProvider interface { type WatchProvider interface {
Watch(ctx context.Context, callback WatchCallback, path ...string) error Watch(ctx context.Context, callback WatchCallback, path ...string) error

9
provider/watcher/provider.go

@ -2,6 +2,7 @@ package watcher
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"log/slog" "log/slog"
"time" "time"
@ -61,7 +62,13 @@ func (p *Provider) Watch(ctx context.Context, callback config.WatchCallback, key
if err != nil { if err != nil {
p.logger(ctx, "get value%v:%v", key, err.Error()) p.logger(ctx, "get value%v:%v", key, err.Error())
} else if !newVar.IsEquals(oldVar) { } else if !newVar.IsEquals(oldVar) {
callback(ctx, oldVar, newVar) if err := callback(ctx, oldVar, newVar); err != nil {
if errors.Is(err, config.ErrStopWatch) {
return
}
p.logger(ctx, "callback %v:%v", key, err)
}
oldVar = newVar oldVar = newVar
} }
case <-ctx.Done(): case <-ctx.Done():

15
provider/watcher/provider_test.go

@ -31,7 +31,11 @@ func (p *provider) Value(context.Context, ...string) (config.Value, error) {
func TestWatcher(t *testing.T) { func TestWatcher(t *testing.T) {
t.Parallel() t.Parallel()
ctx := context.Background() ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer func() {
cancel()
}()
prov := &provider{} prov := &provider{}
w := watcher.New(time.Second, prov) w := watcher.New(time.Second, prov)
@ -42,14 +46,19 @@ func TestWatcher(t *testing.T) {
err := w.Watch( err := w.Watch(
ctx, ctx,
func(ctx context.Context, oldVar, newVar config.Value) { func(ctx context.Context, oldVar, newVar config.Value) error {
atomic.AddInt32(&cnt, 1) atomic.AddInt32(&cnt, 1)
wg.Done() wg.Done()
if atomic.LoadInt32(&cnt) == 2 {
return config.ErrStopWatch
}
return nil
}, },
"tmpname", "tmpname",
) )
require.NoError(t, err)
wg.Wait() wg.Wait()
require.NoError(t, err)
require.Equal(t, int32(2), cnt) require.Equal(t, int32(2), cnt)
} }

Loading…
Cancel
Save