add configure provider by preview providers
This commit is contained in:
106
client.go
106
client.go
@@ -4,14 +4,82 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
func New(namespace, appName string, providers []Provider) *Client {
|
||||
return &Client{
|
||||
func Must(namespace, appName string, providers ...interface{}) *Client {
|
||||
client, err := New(namespace, appName, providers...)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return client
|
||||
}
|
||||
|
||||
func New(namespace, appName string, providers ...interface{}) (*Client, error) {
|
||||
client := &Client{
|
||||
namespace: namespace,
|
||||
appName: appName,
|
||||
providers: providers,
|
||||
providers: make([]Provider, len(providers)),
|
||||
}
|
||||
|
||||
for idx, prov := range providers {
|
||||
switch current := prov.(type) {
|
||||
case Provider:
|
||||
client.providers[idx] = current
|
||||
case Factory:
|
||||
client.providers[idx] = &provider{
|
||||
factory: func(ctx context.Context) (Provider, error) {
|
||||
return current(ctx, client)
|
||||
},
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("provier[%d]: %w %T", idx, ErrUnknowType, prov)
|
||||
}
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
type provider struct {
|
||||
mu sync.Mutex
|
||||
done uint32
|
||||
provider Provider
|
||||
factory func(ctx context.Context) (Provider, error)
|
||||
}
|
||||
|
||||
func (p *provider) init(ctx context.Context) (err error) {
|
||||
if atomic.LoadUint32(&p.done) == 0 {
|
||||
if !p.mu.TryLock() {
|
||||
return fmt.Errorf("%w", ErrInitFactory)
|
||||
}
|
||||
defer atomic.StoreUint32(&p.done, 1)
|
||||
defer p.mu.Unlock()
|
||||
p.provider, err = p.factory(ctx)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *provider) Watch(ctx context.Context, key Key, callback WatchCallback) error {
|
||||
if err := p.init(ctx); err != nil {
|
||||
return fmt.Errorf("init read:%w", err)
|
||||
}
|
||||
|
||||
if watch, ok := p.provider.(WatchProvider); ok {
|
||||
return watch.Watch(ctx, key, callback)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *provider) Read(ctx context.Context, key Key) (Variable, error) {
|
||||
if err := p.init(ctx); err != nil {
|
||||
return Variable{}, fmt.Errorf("init read:%w", err)
|
||||
}
|
||||
|
||||
return p.provider.Read(ctx, key)
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
@@ -47,7 +115,7 @@ func (c *Client) Variable(ctx context.Context, name string) (Variable, error) {
|
||||
|
||||
for _, provider := range c.providers {
|
||||
variable, err = provider.Read(ctx, key)
|
||||
if err == nil || !errors.Is(err, ErrVariableNotFound) {
|
||||
if err == nil || !(errors.Is(err, ErrVariableNotFound) || errors.Is(err, ErrInitFactory)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -59,36 +127,22 @@ func (c *Client) Variable(ctx context.Context, name string) (Variable, error) {
|
||||
return variable, nil
|
||||
}
|
||||
|
||||
func NewWatch(namespace, appName string, providers []Provider) *WatchClient {
|
||||
cl := WatchClient{
|
||||
Client: New(namespace, appName, providers),
|
||||
}
|
||||
func (c *Client) Watch(ctx context.Context, name string, callback WatchCallback) error {
|
||||
key := c.key(name)
|
||||
|
||||
for _, provider := range providers {
|
||||
if watch, ok := provider.(WatchProvider); ok {
|
||||
cl.providers = append(cl.providers, watch)
|
||||
for idx, prov := range c.providers {
|
||||
provider, ok := prov.(WatchProvider)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return &cl
|
||||
}
|
||||
|
||||
type WatchClient struct {
|
||||
*Client
|
||||
providers []WatchProvider
|
||||
}
|
||||
|
||||
func (wc *WatchClient) Watch(ctx context.Context, name string, callback WatchCallback) error {
|
||||
key := wc.key(name)
|
||||
|
||||
for _, provider := range wc.providers {
|
||||
err := provider.Watch(ctx, key, callback)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrVariableNotFound) {
|
||||
if errors.Is(err, ErrVariableNotFound) || errors.Is(err, ErrInitFactory) {
|
||||
continue
|
||||
}
|
||||
|
||||
return fmt.Errorf("client: failed watch by provider %s: %w", provider.Name(), err)
|
||||
return fmt.Errorf("client: failed watch by provider[%d]: %w", idx, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ import (
|
||||
"gitoa.ru/go-4devs/config/test"
|
||||
)
|
||||
|
||||
func ExampleNew() {
|
||||
func ExampleClient_Value() {
|
||||
ctx := context.Background()
|
||||
_ = os.Setenv("FDEVS_CONFIG_LISTEN", "8080")
|
||||
_ = os.Setenv("FDEVS_CONFIG_HOST", "localhost")
|
||||
@@ -51,14 +51,18 @@ func ExampleNew() {
|
||||
// read json config
|
||||
jsonConfig := test.ReadFile("config.json")
|
||||
|
||||
providers := []config.Provider{
|
||||
config, err := config.New(test.Namespace, test.AppName,
|
||||
arg.New(),
|
||||
env.New(),
|
||||
etcd.NewProvider(etcdClient),
|
||||
vault.NewSecretKV2(vaultClient),
|
||||
json.New(jsonConfig),
|
||||
)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
|
||||
return
|
||||
}
|
||||
config := config.New(test.Namespace, test.AppName, providers)
|
||||
|
||||
dsn, err := config.Value(ctx, "example:dsn")
|
||||
if err != nil {
|
||||
@@ -120,7 +124,7 @@ func ExampleNew() {
|
||||
// replace env host by args: gitoa.ru
|
||||
}
|
||||
|
||||
func ExampleNewWatch() {
|
||||
func ExampleClient_Watch() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
@@ -151,12 +155,17 @@ func ExampleNewWatch() {
|
||||
}
|
||||
}()
|
||||
|
||||
providers := []config.Provider{
|
||||
watcher, err := config.New(test.Namespace, test.AppName,
|
||||
watcher.New(time.Microsecond, env.New()),
|
||||
watcher.New(time.Microsecond, yaml.NewFile("test/fixture/config.yaml")),
|
||||
watcher.New(time.Microsecond, yaml.NewWatch("test/fixture/config.yaml")),
|
||||
etcd.NewProvider(etcdClient),
|
||||
)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
|
||||
return
|
||||
}
|
||||
watcher := config.NewWatch(test.Namespace, test.AppName, providers)
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
|
||||
@@ -196,3 +205,90 @@ func ExampleNewWatch() {
|
||||
// update env variable: FDEVS_CONFIG_EXAMPLE_ENABLE , old: true new: false
|
||||
// update etcd variable: fdevs/config/example_db_dsn , old: pgsql://user@pass:127.0.0.1:5432 new: mysql://localhost:5432
|
||||
}
|
||||
|
||||
func ExampleClient_Value_factory() {
|
||||
ctx := context.Background()
|
||||
_ = os.Setenv("FDEVS_CONFIG_LISTEN", "8080")
|
||||
_ = os.Setenv("FDEVS_CONFIG_HOST", "localhost")
|
||||
|
||||
args := os.Args
|
||||
|
||||
defer func() {
|
||||
os.Args = args
|
||||
}()
|
||||
|
||||
os.Args = []string{"main.go", "--config-json=config.json", "--config-yaml=test/fixture/config.yaml"}
|
||||
|
||||
config, err := config.New(test.Namespace, test.AppName,
|
||||
arg.New(),
|
||||
env.New(),
|
||||
config.Factory(func(ctx context.Context, cfg config.ReadConfig) (config.Provider, error) {
|
||||
val, err := cfg.Value(ctx, "config-json")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed read config file:%w", err)
|
||||
}
|
||||
jsonConfig := test.ReadFile(val.String())
|
||||
|
||||
return json.New(jsonConfig), nil
|
||||
}),
|
||||
config.Factory(func(ctx context.Context, cfg config.ReadConfig) (config.Provider, error) {
|
||||
val, err := cfg.Value(ctx, "config-yaml")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed read config file:%w", err)
|
||||
}
|
||||
|
||||
provader, err := yaml.NewFile(val.String())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed init by file %v:%w", val.String(), err)
|
||||
}
|
||||
|
||||
return provader, nil
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
port, err := config.Value(ctx, "listen")
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
title, err := config.Value(ctx, "app.name.title")
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
yamlTitle, err := config.Value(ctx, "app/title")
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
cfgValue, err := config.Value(ctx, "cfg")
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
cfg := test.Config{}
|
||||
_ = cfgValue.Unmarshal(&cfg)
|
||||
|
||||
fmt.Printf("listen from env: %d\n", port.Int())
|
||||
fmt.Printf("title from json: %v\n", title.String())
|
||||
fmt.Printf("title from yaml: %v\n", yamlTitle.String())
|
||||
fmt.Printf("struct from json: %+v\n", cfg)
|
||||
// Output:
|
||||
// listen from env: 8080
|
||||
// title from json: config title
|
||||
// title from yaml: yaml title
|
||||
// struct from json: {Duration:21m0s Enabled:true}
|
||||
}
|
||||
|
||||
2
error.go
2
error.go
@@ -5,4 +5,6 @@ import "errors"
|
||||
var (
|
||||
ErrVariableNotFound = errors.New("variable not found")
|
||||
ErrInvalidValue = errors.New("invalid value")
|
||||
ErrUnknowType = errors.New("unknow type")
|
||||
ErrInitFactory = errors.New("init factory")
|
||||
)
|
||||
|
||||
@@ -4,16 +4,16 @@ import "context"
|
||||
|
||||
type Provider interface {
|
||||
Read(ctx context.Context, key Key) (Variable, error)
|
||||
NamedProvider
|
||||
}
|
||||
|
||||
type WatchCallback func(ctx context.Context, oldVar, newVar Variable)
|
||||
|
||||
type WatchProvider interface {
|
||||
Watch(ctx context.Context, key Key, callback WatchCallback) error
|
||||
NamedProvider
|
||||
}
|
||||
|
||||
type NamedProvider interface {
|
||||
Name() string
|
||||
type ReadConfig interface {
|
||||
Value(ctx context.Context, name string) (Value, error)
|
||||
}
|
||||
|
||||
type Factory func(ctx context.Context, cfg ReadConfig) (Provider, error)
|
||||
|
||||
@@ -47,7 +47,7 @@ type Provider struct {
|
||||
func (p *Provider) Watch(ctx context.Context, key config.Key, callback config.WatchCallback) error {
|
||||
oldVar, err := p.Provider.Read(ctx, key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: failed watch variable: %w", p.Provider.Name(), err)
|
||||
return fmt.Errorf("failed watch variable: %w", err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
|
||||
@@ -1,57 +0,0 @@
|
||||
package yaml
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
|
||||
"gitoa.ru/go-4devs/config"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
func WithFileKeyFactory(f func(context.Context, config.Key) []string) FileOption {
|
||||
return func(p *File) {
|
||||
p.key = f
|
||||
}
|
||||
}
|
||||
|
||||
type FileOption func(*File)
|
||||
|
||||
func NewFile(name string, opts ...FileOption) *File {
|
||||
f := File{
|
||||
file: name,
|
||||
key: keyFactory,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&f)
|
||||
}
|
||||
|
||||
return &f
|
||||
}
|
||||
|
||||
type File struct {
|
||||
file string
|
||||
key func(context.Context, config.Key) []string
|
||||
}
|
||||
|
||||
func (p *File) Name() string {
|
||||
return "yaml_file"
|
||||
}
|
||||
|
||||
func (p *File) Read(ctx context.Context, key config.Key) (config.Variable, error) {
|
||||
in, err := ioutil.ReadFile(p.file)
|
||||
if err != nil {
|
||||
return config.Variable{}, fmt.Errorf("yaml_file: read error: %w", err)
|
||||
}
|
||||
|
||||
var n yaml.Node
|
||||
if err = yaml.Unmarshal(in, &n); err != nil {
|
||||
return config.Variable{}, fmt.Errorf("yaml_file: unmarshal error: %w", err)
|
||||
}
|
||||
|
||||
data := node{Node: &n}
|
||||
k := p.key(ctx, key)
|
||||
|
||||
return data.read(p.Name(), k)
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
|
||||
"gitoa.ru/go-4devs/config"
|
||||
@@ -17,22 +18,34 @@ func keyFactory(ctx context.Context, key config.Key) []string {
|
||||
return strings.Split(key.Name, "/")
|
||||
}
|
||||
|
||||
func NewFile(name string, opts ...Option) (*Provider, error) {
|
||||
in, err := ioutil.ReadFile(name)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("yaml_file: read error: %w", err)
|
||||
}
|
||||
|
||||
return New(in, opts...)
|
||||
}
|
||||
|
||||
func New(yml []byte, opts ...Option) (*Provider, error) {
|
||||
var data yaml.Node
|
||||
if err := yaml.Unmarshal(yml, &data); err != nil {
|
||||
return nil, fmt.Errorf("yaml: unmarshal err: %w", err)
|
||||
}
|
||||
|
||||
return create(opts...).With(&data), nil
|
||||
}
|
||||
|
||||
func create(opts ...Option) *Provider {
|
||||
p := Provider{
|
||||
key: keyFactory,
|
||||
data: node{Node: &data},
|
||||
key: keyFactory,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&p)
|
||||
}
|
||||
|
||||
return &p, nil
|
||||
return &p
|
||||
}
|
||||
|
||||
type Option func(*Provider)
|
||||
@@ -52,6 +65,13 @@ func (p *Provider) Read(ctx context.Context, key config.Key) (config.Variable, e
|
||||
return p.data.read(p.Name(), k)
|
||||
}
|
||||
|
||||
func (p *Provider) With(data *yaml.Node) *Provider {
|
||||
return &Provider{
|
||||
key: p.key,
|
||||
data: node{Node: data},
|
||||
}
|
||||
}
|
||||
|
||||
type node struct {
|
||||
*yaml.Node
|
||||
}
|
||||
|
||||
42
provider/yaml/watch.go
Normal file
42
provider/yaml/watch.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package yaml
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
|
||||
"gitoa.ru/go-4devs/config"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
func NewWatch(name string, opts ...Option) *Watch {
|
||||
f := Watch{
|
||||
file: name,
|
||||
prov: create(opts...),
|
||||
}
|
||||
|
||||
return &f
|
||||
}
|
||||
|
||||
type Watch struct {
|
||||
file string
|
||||
prov *Provider
|
||||
}
|
||||
|
||||
func (p *Watch) Name() string {
|
||||
return "yaml_watch"
|
||||
}
|
||||
|
||||
func (p *Watch) Read(ctx context.Context, key config.Key) (config.Variable, error) {
|
||||
in, err := ioutil.ReadFile(p.file)
|
||||
if err != nil {
|
||||
return config.Variable{}, fmt.Errorf("yaml_file: read error: %w", err)
|
||||
}
|
||||
|
||||
var n yaml.Node
|
||||
if err = yaml.Unmarshal(in, &n); err != nil {
|
||||
return config.Variable{}, fmt.Errorf("yaml_file: unmarshal error: %w", err)
|
||||
}
|
||||
|
||||
return p.prov.With(&n).Read(ctx, key)
|
||||
}
|
||||
@@ -11,6 +11,7 @@
|
||||
},
|
||||
"cfg": {
|
||||
"duration": 1260000000000,
|
||||
"enabled": true
|
||||
"enabled": true,
|
||||
"type":"json"
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
app:
|
||||
title: yaml title
|
||||
name:
|
||||
var:
|
||||
- test
|
||||
@@ -10,3 +11,4 @@ time_var: "2020-01-02T15:04:05Z"
|
||||
cfg:
|
||||
duration: 21m
|
||||
enabled: true
|
||||
type: yaml
|
||||
|
||||
Reference in New Issue
Block a user