This commit is contained in:
71
provider/watcher/provider.go
Normal file
71
provider/watcher/provider.go
Normal file
@@ -0,0 +1,71 @@
|
||||
package watcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"gitoa.ru/go-4devs/config"
|
||||
)
|
||||
|
||||
var (
|
||||
_ config.Provider = (*Provider)(nil)
|
||||
_ config.WatchProvider = (*Provider)(nil)
|
||||
)
|
||||
|
||||
func New(duration time.Duration, provider config.Provider, opts ...Option) *Provider {
|
||||
p := &Provider{
|
||||
Provider: provider,
|
||||
ticker: time.NewTicker(duration),
|
||||
logger: func(_ context.Context, msg string) {
|
||||
log.Print(msg)
|
||||
},
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(p)
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func WithLogger(l func(context.Context, string)) Option {
|
||||
return func(p *Provider) {
|
||||
p.logger = l
|
||||
}
|
||||
}
|
||||
|
||||
type Option func(*Provider)
|
||||
|
||||
type Provider struct {
|
||||
config.Provider
|
||||
ticker *time.Ticker
|
||||
logger func(context.Context, string)
|
||||
}
|
||||
|
||||
func (p *Provider) Watch(ctx context.Context, key config.Key, callback config.WatchCallback) error {
|
||||
oldVar, err := p.Provider.Read(ctx, key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: failed watch variable: %w", p.Provider.Name(), err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-p.ticker.C:
|
||||
newVar, err := p.Provider.Read(ctx, key)
|
||||
if err != nil {
|
||||
p.logger(ctx, err.Error())
|
||||
} else if !newVar.IsEquals(oldVar) {
|
||||
callback(ctx, oldVar, newVar)
|
||||
oldVar = newVar
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
58
provider/watcher/provider_test.go
Normal file
58
provider/watcher/provider_test.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package watcher_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"gitoa.ru/go-4devs/config"
|
||||
"gitoa.ru/go-4devs/config/provider/watcher"
|
||||
"gitoa.ru/go-4devs/config/value"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
cnt int32
|
||||
}
|
||||
|
||||
func (p *provider) Name() string {
|
||||
return "test"
|
||||
}
|
||||
|
||||
func (p *provider) Read(ctx context.Context, k config.Key) (config.Variable, error) {
|
||||
p.cnt++
|
||||
|
||||
return config.Variable{
|
||||
Name: "tmpname",
|
||||
Value: value.JString(fmt.Sprint(p.cnt)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func TestWatcher(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
prov := &provider{}
|
||||
|
||||
w := watcher.New(time.Second, prov)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
|
||||
var cnt int32
|
||||
|
||||
err := w.Watch(
|
||||
ctx,
|
||||
config.Key{Name: "tmpname"},
|
||||
func(ctx context.Context, oldVar, newVar config.Variable) {
|
||||
atomic.AddInt32(&cnt, 1)
|
||||
wg.Done()
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
wg.Wait()
|
||||
|
||||
require.Equal(t, int32(2), cnt)
|
||||
}
|
||||
Reference in New Issue
Block a user