Skip to content

Commit

Permalink
Auto config caching
Browse files Browse the repository at this point in the history
  • Loading branch information
z4kn4fein committed Nov 11, 2024
1 parent 14abad0 commit 25bcb4b
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 33 deletions.
150 changes: 127 additions & 23 deletions sdk/sdk_auto_registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/puzpuzpuz/xsync/v3"
"io"
"net/http"
"net/url"
"time"
)

Expand All @@ -38,6 +39,8 @@ type sdkConfigModel struct {
}

type autoRegistrar struct {
options *optionsModel
cacheKey string
sdkClients *xsync.MapOf[string, Client]
httpClient *http.Client
ctx context.Context
Expand All @@ -53,6 +56,16 @@ type autoRegistrar struct {

func newAutoRegistrar(conf *config.Config, metricsReporter metrics.Reporter, statusReporter status.Reporter, externalCache configcat.ConfigCache, log log.Logger) (*autoRegistrar, error) {
regLog := log.WithPrefix("auto-sdk-registrar").WithLevel(conf.AutoSDK.Log.GetLevel())
var transport = http.DefaultTransport.(*http.Transport)
if !conf.GlobalOfflineConfig.Enabled && conf.HttpProxy.Url != "" {
proxyUrl, err := url.Parse(conf.HttpProxy.Url)
if err != nil {
regLog.Errorf("failed to parse proxy url: %s", conf.HttpProxy.Url)
} else {
transport.Proxy = http.ProxyURL(proxyUrl)
regLog.Reportf("using HTTP proxy: %s", conf.HttpProxy.Url)
}
}
ctx, cancel := context.WithCancel(context.Background())
registrar := &autoRegistrar{
conf: conf,
Expand All @@ -62,24 +75,35 @@ func newAutoRegistrar(conf *config.Config, metricsReporter metrics.Reporter, sta
externalCache: externalCache,
log: regLog,
Publisher: pubsub.NewPublisher[string](),
httpClient: http.DefaultClient,
ctx: ctx,
ctxCancel: cancel,
httpClient: &http.Client{
Timeout: 30 * time.Second,
Transport: transport,
},
ctx: ctx,
ctxCancel: cancel,
cacheKey: "configcat-proxy-conf/" + conf.AutoSDK.Key,
}

timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*15)
defer timeoutCancel()
autoConfig, err := registrar.fetchConfig(timeoutCtx)
autoConfig, err := registrar.getConfig(timeoutCtx)
if err != nil {
regLog.Errorf("%v", err)
return nil, err
}
registrar.options = &autoConfig.Options
for sdkId, sdkModel := range autoConfig.SDKs {
sdkConfig := registrar.buildSdkConfig(sdkId, sdkModel, &autoConfig.Options)
sdkConfig := registrar.buildSdkConfig(sdkId, sdkModel)
statusReporter.RegisterSdk(sdkId, sdkConfig)
registrar.sdkClients.Store(sdkId, registrar.buildSdkClient(sdkId, sdkConfig))
}
registrar.poller = time.NewTicker(time.Duration(conf.AutoSDK.PollInterval) * time.Second)
var interval time.Duration
if conf.GlobalOfflineConfig.Enabled {
interval = time.Duration(conf.GlobalOfflineConfig.CachePollInterval) * time.Second
} else {
interval = time.Duration(conf.AutoSDK.PollInterval) * time.Second
}
registrar.poller = time.NewTicker(interval)
go registrar.run()

return registrar, nil
Expand Down Expand Up @@ -117,7 +141,7 @@ func (r *autoRegistrar) run() {
for {
select {
case <-r.poller.C:
autoConfig, err := r.fetchConfig(r.ctx)
autoConfig, err := r.getConfig(r.ctx)
if err != nil {
r.log.Errorf("%v", err)
} else {
Expand All @@ -128,6 +152,11 @@ func (r *autoRegistrar) run() {
toDeleteKeys := utils.Except(existingKeys, remoteKeys)

r.deleteSdkClients(toDeleteKeys)
if r.shouldUpdateOptions(&autoConfig.Options) {
r.options = &autoConfig.Options
resetKeys := utils.Except(remoteKeys, toAddKeys)
r.resetSdkClients(resetKeys, autoConfig)
}
r.addSdkClients(toAddKeys, autoConfig)
}
case <-r.ctx.Done():
Expand All @@ -136,10 +165,57 @@ func (r *autoRegistrar) run() {
}
}

func (r *autoRegistrar) fetchConfig(ctx context.Context) (*proxyConfigModel, error) {
url := r.conf.AutoSDK.BaseUrl + "/v1/proxy/config"
r.log.Debugf("fetching remote configuration from %s?key=%s", url, r.conf.AutoSDK.Key)
request, err := http.NewRequestWithContext(ctx, "GET", url, nil)
func (r *autoRegistrar) getConfig(ctx context.Context) (*proxyConfigModel, error) {
if r.conf.GlobalOfflineConfig.Enabled {
if r.externalCache == nil {
return nil, fmt.Errorf("could not load auto sdk configuration: offline mode is enabled without an external cache")
}
cached, err := r.externalCache.Get(ctx, r.cacheKey)
if err != nil {
return nil, fmt.Errorf("could not read a valid auto sdk configuration from cache: %v", err)
}
if len(cached) == 0 {
return nil, fmt.Errorf("no valid auto sdk configuration found in cache")
}
return r.parseConfig(cached)
}
fetched, err := r.fetchConfig(ctx)
if err != nil {
if r.externalCache == nil {
return nil, err
}
r.log.Errorf("could not fetch auto sdk configuration, falling back to cache: %v", err)
cached, err := r.externalCache.Get(ctx, r.cacheKey)
if err != nil {
return nil, fmt.Errorf("could not read a valid auto sdk configuration from cache: %v", err)
}
if len(cached) == 0 {
return nil, fmt.Errorf("no valid auto sdk configuration found in cache")
}
return r.parseConfig(cached)
}
if r.externalCache != nil {
err = r.externalCache.Set(ctx, r.cacheKey, fetched)
if err != nil {
r.log.Errorf("could not write the auto sdk configuration to cache: %v", err)
}
}
return r.parseConfig(fetched)
}

func (r *autoRegistrar) parseConfig(body []byte) (*proxyConfigModel, error) {
parsed := proxyConfigModel{}
if err := json.Unmarshal(body, &parsed); err != nil {
return nil, fmt.Errorf("error during parsing auto sdk configuration: %v", err)
}
r.log.Debugf("auto sdk configuration loaded, got %d SDK keys", len(parsed.SDKs))
return &parsed, nil
}

func (r *autoRegistrar) fetchConfig(ctx context.Context) ([]byte, error) {
apiUrl := r.conf.AutoSDK.BaseUrl + "/v1/proxy/config"
r.log.Debugf("fetching remote configuration from %s?key=%s", apiUrl, r.conf.AutoSDK.Key)
request, err := http.NewRequestWithContext(ctx, "GET", apiUrl, nil)
if err != nil {
return nil, fmt.Errorf("error during fetching remote configuration: %v", err)
}
Expand All @@ -150,35 +226,38 @@ func (r *autoRegistrar) fetchConfig(ctx context.Context) (*proxyConfigModel, err
if err != nil {
return nil, fmt.Errorf("error during fetching remote configuration: %v", err)
}
defer response.Body.Close()
defer func() {
_ = response.Body.Close()
}()
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("error during fetching remote configuration, status code: %d", response.StatusCode)
}
body, err := io.ReadAll(response.Body)
if err != nil {
return nil, fmt.Errorf("error during reading configuration response: %v", err)
}
parsedResponse := proxyConfigModel{}
if err := json.Unmarshal(body, &parsedResponse); err != nil {
return nil, fmt.Errorf("error during parsing configuration response: %v", err)
}
r.log.Debugf("remote configuration fetch was successful, got %d SDK keys", len(parsedResponse.SDKs))
return &parsedResponse, nil
return body, nil
}

func (r *autoRegistrar) buildSdkConfig(sdkId string, sdkModel *sdkConfigModel, opts *optionsModel) *config.SDKConfig {
func (r *autoRegistrar) buildSdkConfig(sdkId string, sdkModel *sdkConfigModel) *config.SDKConfig {
sdkConfig := &config.SDKConfig{
BaseUrl: opts.BaseUrl,
BaseUrl: r.options.BaseUrl,
Key: sdkModel.SDKKey,
PollInterval: opts.PollInterval,
DataGovernance: opts.DataGovernance,
PollInterval: r.options.PollInterval,
DataGovernance: r.options.DataGovernance,
Log: r.conf.AutoSDK.Log,
}
if localSdkConfig, ok := r.conf.SDKs[sdkId]; ok {
sdkConfig.DefaultAttrs = localSdkConfig.DefaultAttrs
sdkConfig.Offline = localSdkConfig.Offline
sdkConfig.Log = localSdkConfig.Log
}
if r.conf.GlobalOfflineConfig.Enabled && !sdkConfig.Offline.Enabled {
sdkConfig.Offline.Enabled = true
sdkConfig.Offline.UseCache = true
sdkConfig.Offline.CachePollInterval = r.conf.GlobalOfflineConfig.CachePollInterval
sdkConfig.Offline.Log = r.conf.GlobalOfflineConfig.Log
}
return sdkConfig
}

Expand Down Expand Up @@ -219,7 +298,7 @@ func (r *autoRegistrar) addSdkClients(sdkIds []string, config *proxyConfigModel)
r.log.Debugf("adding %d SDK clients", len(sdkIds))
for _, sdkId := range sdkIds {
if sdkModel, ok := config.SDKs[sdkId]; ok {
sdkConfig := r.buildSdkConfig(sdkId, sdkModel, &config.Options)
sdkConfig := r.buildSdkConfig(sdkId, sdkModel)
if _, loaded := r.sdkClients.LoadOrCompute(sdkId, func() Client {
return r.buildSdkClient(sdkId, sdkConfig)
}); !loaded {
Expand All @@ -229,3 +308,28 @@ func (r *autoRegistrar) addSdkClients(sdkIds []string, config *proxyConfigModel)
}
}
}

func (r *autoRegistrar) resetSdkClients(sdkIds []string, config *proxyConfigModel) {
if len(sdkIds) == 0 {
r.log.Debugf("no SDK clients to reset")
return
}

r.log.Debugf("resetting %d SDK clients", len(sdkIds))
for _, sdkId := range sdkIds {
if sdkModel, ok := config.SDKs[sdkId]; ok {
sdkConfig := r.buildSdkConfig(sdkId, sdkModel)
sdkClient := r.buildSdkClient(sdkId, sdkConfig)
if existing, loaded := r.sdkClients.LoadAndStore(sdkId, sdkClient); loaded {
existing.Close()
r.Publish(sdkId)
}
}
}
}

func (r *autoRegistrar) shouldUpdateOptions(options *optionsModel) bool {
return r.options.BaseUrl != options.BaseUrl ||
r.options.PollInterval != options.PollInterval ||
r.options.DataGovernance != options.DataGovernance
}
6 changes: 4 additions & 2 deletions stream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ func (s *server) run() {
func (s *server) handleSdkId(sdkId string) {
sdkClient := s.sdkRegistrar.GetSdkOrNil(sdkId)
if sdkClient != nil {
s.streams.LoadOrCompute(sdkId, func() Stream {
if str, loaded := s.streams.LoadOrCompute(sdkId, func() Stream {
return NewStream(sdkId, sdkClient, s.metrics, s.log, s.serverType)
})
}); loaded {
str.ResetSdk(sdkClient)
}
} else {
if str, ok := s.streams.LoadAndDelete(sdkId); ok {
str.Close()
Expand Down
30 changes: 22 additions & 8 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"github.com/configcat/configcat-proxy/model"
"github.com/configcat/configcat-proxy/sdk"
"hash/maphash"
"sync/atomic"
)

type Stream interface {
CanEval(key string) bool
IsInValidState() bool
CreateConnection(key string, user model.UserAttrs) *Connection
CloseConnection(conn *Connection, key string)
ResetSdk(client sdk.Client)
Close()
Closed() <-chan struct{}
}
Expand All @@ -29,7 +31,7 @@ type connClosed struct {
}

type stream struct {
sdkClient sdk.Client
sdkClient atomic.Value
sdkConfigChanged chan struct{}
stop chan struct{}
log log.Logger
Expand All @@ -50,13 +52,14 @@ func NewStream(sdkId string, sdkClient sdk.Client, metrics metrics.Reporter, log
connClosed: make(chan *connClosed),
stop: make(chan struct{}),
sdkConfigChanged: make(chan struct{}, 1),
sdkClient: sdkClient,
sdkClient: atomic.Value{},
log: log.WithPrefix("stream-" + sdkId),
serverType: serverType,
sdkId: sdkId,
metrics: metrics,
seed: maphash.MakeSeed(),
}
s.sdkClient.Store(sdkClient)
sdkClient.Subscribe(s.sdkConfigChanged)
go s.run()
return s
Expand Down Expand Up @@ -91,7 +94,7 @@ func (s *stream) run() {
}

func (s *stream) CanEval(key string) bool {
keys := s.sdkClient.Keys()
keys := s.sdkClient.Load().(sdk.Client).Keys()
for _, k := range keys {
if k == key {
return true
Expand All @@ -101,7 +104,7 @@ func (s *stream) CanEval(key string) bool {
}

func (s *stream) IsInValidState() bool {
return s.sdkClient.IsInValidState()
return s.sdkClient.Load().(sdk.Client).IsInValidState()
}

func (s *stream) CreateConnection(key string, user model.UserAttrs) *Connection {
Expand All @@ -128,9 +131,20 @@ func (s *stream) CloseConnection(conn *Connection, key string) {
}
}

func (s *stream) ResetSdk(client sdk.Client) {
select {
case <-s.stop:
return
default:
old := s.sdkClient.Swap(client).(sdk.Client)
old.Unsubscribe(s.sdkConfigChanged)
client.Subscribe(s.sdkConfigChanged)
}
}

func (s *stream) Close() {
close(s.stop)
s.sdkClient.Unsubscribe(s.sdkConfigChanged)
s.sdkClient.Load().(sdk.Client).Unsubscribe(s.sdkConfigChanged)
s.log.Reportf("shutdown complete")
}

Expand All @@ -141,13 +155,13 @@ func (s *stream) Closed() <-chan struct{} {
func (s *stream) addConnection(established *connEstablished) {
bucket, ok := s.channels[established.key]
if !ok {
ch := createChannel(established, s.sdkClient)
ch := createChannel(established, s.sdkClient.Load().(sdk.Client))
bucket = map[uint64]channel{established.conn.discriminator: ch}
s.channels[established.key] = bucket
}
ch, ok := bucket[established.conn.discriminator]
if !ok {
ch = createChannel(established, s.sdkClient)
ch = createChannel(established, s.sdkClient.Load().(sdk.Client))
bucket[established.conn.discriminator] = ch
}
ch.AddConnection(established.conn)
Expand Down Expand Up @@ -179,7 +193,7 @@ func (s *stream) notifyConnections() {
sent := 0
for key, bucket := range s.channels {
for _, ch := range bucket {
count := ch.Notify(s.sdkClient, key)
count := ch.Notify(s.sdkClient.Load().(sdk.Client), key)
sent += count
if s.metrics != nil {
s.metrics.AddSentMessageCount(count, s.sdkId, s.serverType, key)
Expand Down

0 comments on commit 25bcb4b

Please sign in to comment.