This commit is contained in:
134
mw/fallback.go
Normal file
134
mw/fallback.go
Normal file
@@ -0,0 +1,134 @@
|
||||
package mw
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"gitoa.ru/go-4devs/cache"
|
||||
)
|
||||
|
||||
type Fallback func(ctx context.Context, key, value interface{}) error
|
||||
|
||||
type Getter func(ctx context.Context, key interface{}) (interface{}, error)
|
||||
|
||||
// HandleByErr checks if cache return err.
|
||||
func HandleByErr(_ *cache.Item, err error) bool {
|
||||
return err != nil
|
||||
}
|
||||
|
||||
// LockFallback locks run fallback by item key.
|
||||
func LockFallback(fallback Fallback) Fallback {
|
||||
var mu sync.Mutex
|
||||
|
||||
type entry struct {
|
||||
item interface{}
|
||||
err error
|
||||
}
|
||||
|
||||
keys := make(map[interface{}]chan entry)
|
||||
|
||||
return func(ctx context.Context, key, value interface{}) error {
|
||||
mu.Lock()
|
||||
if _, ok := keys[key]; !ok {
|
||||
keys[key] = make(chan entry, 1)
|
||||
mu.Unlock()
|
||||
|
||||
err := fallback(ctx, key, value)
|
||||
keys[key] <- entry{
|
||||
item: value,
|
||||
err: err,
|
||||
}
|
||||
|
||||
defer func() {
|
||||
close(keys[key])
|
||||
delete(keys, key)
|
||||
}()
|
||||
|
||||
return err
|
||||
}
|
||||
mu.Unlock()
|
||||
|
||||
entry := <-keys[key]
|
||||
if entry.err != nil {
|
||||
return entry.err
|
||||
}
|
||||
|
||||
if err := cache.TypeAssert(entry.item, value); err != nil {
|
||||
return fmt.Errorf("%w: assert value", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithFallback sets fallback when cache handle success and set result in cache.
|
||||
func WithFallback(fallback Fallback, isHandleFallback func(*cache.Item, error) bool) cache.Configure {
|
||||
return cache.WithHandleGet(func(ctx context.Context, op string, item *cache.Item, next cache.Provider) error {
|
||||
err := next(ctx, op, item)
|
||||
if isHandleFallback(item, err) {
|
||||
if ferr := fallback(ctx, item.Key.Key, item.Value); ferr != nil {
|
||||
return ferr
|
||||
}
|
||||
|
||||
return next(ctx, cache.OperationSet, item)
|
||||
}
|
||||
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// WithLockGetter sets values from getter when cache handle success and set result in cache.
|
||||
func WithLockGetter(getter Getter, isHandle func(*cache.Item, error) bool) cache.Configure {
|
||||
var mu sync.Mutex
|
||||
|
||||
type entry struct {
|
||||
value interface{}
|
||||
err error
|
||||
}
|
||||
|
||||
keys := make(map[cache.Key]chan entry)
|
||||
|
||||
return cache.WithHandleGet(func(ctx context.Context, op string, item *cache.Item, next cache.Provider) error {
|
||||
if err := next(ctx, op, item); !isHandle(item, err) {
|
||||
return err
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
if _, ok := keys[item.Key]; !ok {
|
||||
keys[item.Key] = make(chan entry, 1)
|
||||
mu.Unlock()
|
||||
value, gerr := getter(ctx, item.Key.Value())
|
||||
keys[item.Key] <- entry{
|
||||
value: value,
|
||||
err: gerr,
|
||||
}
|
||||
|
||||
defer func() {
|
||||
close(keys[item.Key])
|
||||
delete(keys, item.Key)
|
||||
}()
|
||||
if gerr != nil {
|
||||
return gerr
|
||||
}
|
||||
|
||||
if err := cache.TypeAssert(value, item.Value); err != nil {
|
||||
return fmt.Errorf("lock failed assert type: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
mu.Unlock()
|
||||
|
||||
entry := <-keys[item.Key]
|
||||
if entry.err != nil {
|
||||
return entry.err
|
||||
}
|
||||
|
||||
if err := cache.TypeAssert(entry.value, item.Value); err != nil {
|
||||
return fmt.Errorf("lock failed assert type: %w", err)
|
||||
}
|
||||
|
||||
return next(ctx, cache.OperationSet, item)
|
||||
})
|
||||
}
|
||||
221
mw/fallback_test.go
Normal file
221
mw/fallback_test.go
Normal file
@@ -0,0 +1,221 @@
|
||||
package mw_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"gitoa.ru/go-4devs/cache"
|
||||
"gitoa.ru/go-4devs/cache/mw"
|
||||
"gitoa.ru/go-4devs/cache/test"
|
||||
)
|
||||
|
||||
var (
|
||||
errFallback = errors.New("fallback error")
|
||||
errKey = errors.New("unexpected key")
|
||||
)
|
||||
|
||||
func TestWithFallback(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
key1 := "fallback:key1"
|
||||
key2 := "fb:key2"
|
||||
|
||||
prov := test.NewProviderMock(t,
|
||||
test.WithGet(cacheGetMiss),
|
||||
test.WithSet(cacheSetMiss(map[interface{}]test.User{
|
||||
key1: test.NewUser(1),
|
||||
key2: test.NewUser(2),
|
||||
})),
|
||||
)
|
||||
c := cache.New(prov, mw.WithFallback(
|
||||
func(ctx context.Context, key, value interface{}) error {
|
||||
switch key.(string) {
|
||||
case key1:
|
||||
*value.(*test.User) = test.NewUser(1)
|
||||
case key2:
|
||||
*value.(*test.User) = test.NewUser(2)
|
||||
default:
|
||||
t.Errorf("unexpected key: %s", key)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
mw.HandleByErr,
|
||||
))
|
||||
|
||||
var user test.User
|
||||
|
||||
require.Nil(t, c.Get(ctx, key1, &user))
|
||||
require.Equal(t, test.NewUser(1), user)
|
||||
|
||||
require.Nil(t, c.Get(ctx, key2, &user, cache.WithNamespace("namespace", ":")))
|
||||
require.Equal(t, test.NewUser(2), user)
|
||||
}
|
||||
|
||||
func TestLockFallback(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
var (
|
||||
val1, val2, val3 string
|
||||
wg sync.WaitGroup
|
||||
cnt int32
|
||||
)
|
||||
|
||||
fallback := mw.LockFallback(func(ctx context.Context, key, value interface{}) error {
|
||||
time.Sleep(time.Second)
|
||||
atomic.AddInt32(&cnt, 1)
|
||||
*value.(*string) = fmt.Sprintf("value:%v", cnt)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
assert.Nil(t, fallback(ctx, 1, &val1))
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
assert.Nil(t, fallback(ctx, 1, &val2))
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
require.Equal(t, "value:1", val1)
|
||||
require.Equal(t, "value:1", val2)
|
||||
|
||||
assert.Nil(t, fallback(ctx, 1, &val3))
|
||||
require.Equal(t, "value:2", val3)
|
||||
}
|
||||
|
||||
func TestLockFallback_Error(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
var (
|
||||
val1, val2, val3 string
|
||||
wg sync.WaitGroup
|
||||
cnt int32
|
||||
)
|
||||
|
||||
fallback := mw.LockFallback(func(ctx context.Context, key, value interface{}) error {
|
||||
time.Sleep(time.Second)
|
||||
atomic.AddInt32(&cnt, 1)
|
||||
|
||||
return fmt.Errorf("%w:%v", errFallback, cnt)
|
||||
})
|
||||
|
||||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
assert.EqualError(t, fallback(ctx, 1, &val1), "fallback error:1")
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
assert.EqualError(t, fallback(ctx, 1, &val2), "fallback error:1")
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
require.Empty(t, val1)
|
||||
require.Empty(t, val2)
|
||||
|
||||
assert.EqualError(t, fallback(ctx, 1, val3), "fallback error:2")
|
||||
require.Empty(t, val3)
|
||||
}
|
||||
|
||||
func TestWithLockGetter(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
key1 := "getter:key1"
|
||||
|
||||
var cnt int32
|
||||
|
||||
prov := test.NewProviderMock(t,
|
||||
test.WithGet(cacheGetMiss),
|
||||
test.WithSet(cacheSetMiss(
|
||||
map[interface{}]test.User{
|
||||
key1: test.NewUser(1),
|
||||
},
|
||||
)),
|
||||
)
|
||||
c := cache.New(prov,
|
||||
cache.WithDataOption(
|
||||
cache.WithNamespace("gn", ":"),
|
||||
),
|
||||
mw.WithLockGetter(
|
||||
func(ctx context.Context, key interface{}) (interface{}, error) {
|
||||
atomic.AddInt32(&cnt, 1)
|
||||
time.Sleep(time.Second / 2)
|
||||
switch key.(string) {
|
||||
case key1:
|
||||
return test.NewUser(1), nil
|
||||
case "key2":
|
||||
return test.NewUser(2), nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("%w: key '%v'", errKey, key)
|
||||
},
|
||||
mw.HandleByErr,
|
||||
))
|
||||
|
||||
var (
|
||||
user1, user2 test.User
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
|
||||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
require.Nil(t, c.Get(ctx, key1, &user1))
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
require.Nil(t, c.Get(ctx, key1, &user2))
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
require.Equal(t, test.NewUser(1), user1)
|
||||
require.Equal(t, test.NewUser(1), user2)
|
||||
require.Equal(t, int32(1), cnt)
|
||||
}
|
||||
|
||||
func cacheGetMiss(t *testing.T) func(ctx context.Context, item *cache.Item) error {
|
||||
t.Helper()
|
||||
|
||||
return func(ctx context.Context, item *cache.Item) error {
|
||||
return cache.ErrCacheMiss
|
||||
}
|
||||
}
|
||||
|
||||
func cacheSetMiss(items map[interface{}]test.User) func(t *testing.T) func(ctx context.Context, item *cache.Item) error {
|
||||
return func(t *testing.T) func(ctx context.Context, item *cache.Item) error {
|
||||
t.Helper()
|
||||
|
||||
return func(ctx context.Context, item *cache.Item) error {
|
||||
if value, ok := items[item.Key.Key]; ok {
|
||||
require.Equal(t, &value, item.Value)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
t.Errorf("unexpected key %v", item.Key.String())
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
78
mw/gc.go
Normal file
78
mw/gc.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package mw
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitoa.ru/go-4devs/cache"
|
||||
)
|
||||
|
||||
type key struct {
|
||||
key interface{}
|
||||
ctxPrefix string
|
||||
}
|
||||
|
||||
func (k key) Value() interface{} {
|
||||
return k.key
|
||||
}
|
||||
|
||||
func (k key) String() string {
|
||||
return fmt.Sprint(k.ctxPrefix, k.key)
|
||||
}
|
||||
|
||||
// WithClearByContext clear cache if context done.
|
||||
func WithClearByContext(ctxKey interface{}) cache.Configure {
|
||||
operation := func(ctx context.Context, op string, item *cache.Item, next cache.Provider) error {
|
||||
ctxPrefix, ok := ctx.Value(ctxKey).(string)
|
||||
if !ok {
|
||||
return fmt.Errorf("%w: must be unique ctx key", cache.ErrKeyNotValid)
|
||||
}
|
||||
|
||||
k := item.Key.Key
|
||||
item.Key.Key = key{
|
||||
key: k,
|
||||
ctxPrefix: ctxPrefix,
|
||||
}
|
||||
|
||||
return next(ctx, op, item)
|
||||
}
|
||||
|
||||
return cache.WithMiddleware(
|
||||
func(ctx context.Context, op string, item *cache.Item, next cache.Provider) error {
|
||||
if op == cache.OperationSet {
|
||||
go func(ctx context.Context, item *cache.Item) {
|
||||
<-ctx.Done()
|
||||
_ = next(ctx, cache.OperationDelete, item)
|
||||
}(ctx, item)
|
||||
}
|
||||
|
||||
return operation(ctx, op, item, next)
|
||||
})
|
||||
}
|
||||
|
||||
// WithClearByTTL clear cache by key after ttl.
|
||||
func WithClearByTTL() cache.Configure {
|
||||
keys := make(map[cache.Key]*time.Timer)
|
||||
mu := sync.Mutex{}
|
||||
|
||||
return cache.WithHandleSet(func(ctx context.Context, op string, item *cache.Item, next cache.Provider) error {
|
||||
if item.TTL > 0 {
|
||||
go func() {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if t, ok := keys[item.Key]; ok {
|
||||
t.Reset(item.TTL)
|
||||
} else {
|
||||
keys[item.Key] = time.AfterFunc(item.TTL, func() {
|
||||
_ = next(ctx, cache.OperationDelete, item)
|
||||
delete(keys, item.Key)
|
||||
})
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return next(ctx, op, item)
|
||||
})
|
||||
}
|
||||
68
mw/gc_test.go
Normal file
68
mw/gc_test.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package mw_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"gitoa.ru/go-4devs/cache"
|
||||
"gitoa.ru/go-4devs/cache/mw"
|
||||
"gitoa.ru/go-4devs/cache/provider/memory"
|
||||
)
|
||||
|
||||
func TestWithClearByTTL(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
gcMap := cache.New(memory.NewMap(), mw.WithClearByTTL())
|
||||
cacheMap := cache.New(memory.NewMap())
|
||||
|
||||
var (
|
||||
value string
|
||||
err error
|
||||
)
|
||||
|
||||
require.NoError(t, gcMap.Set(ctx, "keys", "value", cache.WithTTL(time.Second/3)))
|
||||
require.NoError(t, cacheMap.Set(ctx, "keys", "value", cache.WithTTL(time.Second/3)))
|
||||
time.Sleep(time.Second)
|
||||
|
||||
err = gcMap.Get(ctx, "keys", &value)
|
||||
require.EqualError(t, err, cache.ErrCacheMiss.Error()+": map")
|
||||
|
||||
err = cacheMap.Get(ctx, "keys", &value)
|
||||
require.EqualError(t, err, cache.ErrCacheExpired.Error()+": map")
|
||||
|
||||
require.NoError(t, gcMap.Set(ctx, "keys", "value", cache.WithTTL(time.Second/2)))
|
||||
time.AfterFunc(time.Second/3, func() {
|
||||
require.NoError(t, gcMap.Set(ctx, "keys", "value", cache.WithTTL(time.Second)))
|
||||
})
|
||||
time.Sleep(time.Second / 2)
|
||||
require.NoError(t, gcMap.Get(ctx, "keys", &value))
|
||||
require.Equal(t, value, "value")
|
||||
}
|
||||
|
||||
func TestWithClearByContext(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
type ctxKey int
|
||||
|
||||
var (
|
||||
requestID ctxKey = 1
|
||||
data string
|
||||
)
|
||||
|
||||
ctx1, cancel1 := context.WithCancel(context.WithValue(context.Background(), requestID, "request1"))
|
||||
ctx2, cancel2 := context.WithCancel(context.WithValue(context.Background(), requestID, "request2"))
|
||||
|
||||
cacheMap := cache.New(memory.NewMap(), mw.WithClearByContext(requestID))
|
||||
|
||||
require.NoError(t, cacheMap.Set(ctx1, "key", "value"))
|
||||
require.EqualError(t, cacheMap.Get(ctx2, "key", &data), "cache miss: map")
|
||||
require.NoError(t, cacheMap.Get(ctx1, "key", &data))
|
||||
cancel1()
|
||||
|
||||
time.Sleep(time.Millisecond)
|
||||
require.EqualError(t, cacheMap.Get(ctx1, "key", &data), "cache miss: map")
|
||||
cancel2()
|
||||
}
|
||||
60
mw/metrics.go
Normal file
60
mw/metrics.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package mw
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"gitoa.ru/go-4devs/cache"
|
||||
)
|
||||
|
||||
// Metrics interface for middleware.
|
||||
type Metrics interface {
|
||||
Hit(label string)
|
||||
Miss(label string)
|
||||
Expired(label string)
|
||||
Err(label string, operation string)
|
||||
Observe(label string, operation string, start time.Time)
|
||||
}
|
||||
|
||||
// LabelName sets static name.
|
||||
func LabelName(name string) func(ctx context.Context, item *cache.Item) string {
|
||||
return func(_ context.Context, _ *cache.Item) string {
|
||||
return name
|
||||
}
|
||||
}
|
||||
|
||||
// LabelPreficKey gets lebale by item prefix.
|
||||
func LabelPreficKey(ctx context.Context, item *cache.Item) string {
|
||||
return item.Key.Prefix
|
||||
}
|
||||
|
||||
// WithMetrics cache middleware metrics.
|
||||
func WithMetrics(m Metrics, labelCallback func(ctx context.Context, item *cache.Item) string) cache.Configure {
|
||||
return cache.WithMiddleware(
|
||||
func(ctx context.Context, op string, item *cache.Item, next cache.Provider) error {
|
||||
label := labelCallback(ctx, item)
|
||||
start := time.Now()
|
||||
err := next(ctx, op, item)
|
||||
m.Observe(label, op, start)
|
||||
if err != nil {
|
||||
switch {
|
||||
case errors.Is(err, cache.ErrCacheMiss):
|
||||
m.Miss(label)
|
||||
case errors.Is(err, cache.ErrCacheExpired):
|
||||
m.Expired(label)
|
||||
default:
|
||||
m.Err(label, op)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if op == cache.OperationGet {
|
||||
m.Hit(label)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
}
|
||||
93
mw/prometheus/metrics.go
Normal file
93
mw/prometheus/metrics.go
Normal file
@@ -0,0 +1,93 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
metrics "github.com/prometheus/client_golang/prometheus"
|
||||
"gitoa.ru/go-4devs/cache/mw"
|
||||
)
|
||||
|
||||
const (
|
||||
labelSet = "label"
|
||||
labelOperation = "operation"
|
||||
)
|
||||
|
||||
//nolint: gochecknoglobals
|
||||
var (
|
||||
hitCount = metrics.NewCounterVec(
|
||||
metrics.CounterOpts{
|
||||
Name: "cache_hit_total",
|
||||
Help: "Counter of hits cache.",
|
||||
},
|
||||
[]string{labelSet},
|
||||
)
|
||||
missCount = metrics.NewCounterVec(
|
||||
metrics.CounterOpts{
|
||||
Name: "cache_miss_total",
|
||||
Help: "Counter of misses cache.",
|
||||
},
|
||||
[]string{labelSet},
|
||||
)
|
||||
expiredCount = metrics.NewCounterVec(
|
||||
metrics.CounterOpts{
|
||||
Name: "cache_expired_total",
|
||||
Help: "Counter of expired items in cache.",
|
||||
},
|
||||
[]string{labelSet},
|
||||
)
|
||||
errorsCount = metrics.NewCounterVec(
|
||||
metrics.CounterOpts{
|
||||
Name: "cache_errors_total",
|
||||
Help: "Counter of errors in cache.",
|
||||
},
|
||||
[]string{labelSet, labelOperation},
|
||||
)
|
||||
responseTime = metrics.NewHistogramVec(
|
||||
metrics.HistogramOpts{
|
||||
Name: "cache_request_duration_seconds",
|
||||
Help: "Histogram of RT for the request cache (seconds).",
|
||||
},
|
||||
[]string{labelSet, labelOperation},
|
||||
)
|
||||
)
|
||||
|
||||
//nolint: gochecknoinits
|
||||
func init() {
|
||||
metrics.MustRegister(
|
||||
hitCount,
|
||||
missCount,
|
||||
expiredCount,
|
||||
errorsCount,
|
||||
responseTime,
|
||||
)
|
||||
}
|
||||
|
||||
var _ mw.Metrics = Metrics{}
|
||||
|
||||
// Metrics prometeus.
|
||||
type Metrics struct{}
|
||||
|
||||
// Miss inc miss error cache.
|
||||
func (m Metrics) Miss(label string) {
|
||||
missCount.WithLabelValues(label).Inc()
|
||||
}
|
||||
|
||||
// Hit increment hit cache.
|
||||
func (m Metrics) Hit(label string) {
|
||||
hitCount.WithLabelValues(label).Inc()
|
||||
}
|
||||
|
||||
// Expired increment error expired.
|
||||
func (m Metrics) Expired(label string) {
|
||||
expiredCount.WithLabelValues(label).Inc()
|
||||
}
|
||||
|
||||
// Err increment base undefined error.
|
||||
func (m Metrics) Err(label string, operation string) {
|
||||
errorsCount.WithLabelValues(label, operation).Inc()
|
||||
}
|
||||
|
||||
// Observe time from start.
|
||||
func (m Metrics) Observe(label string, operation string, start time.Time) {
|
||||
responseTime.WithLabelValues(label, operation).Observe(float64(time.Since(start)) / float64(time.Second))
|
||||
}
|
||||
Reference in New Issue
Block a user