Skip to content

Commit

Permalink
Merge pull request hyperledger#1113 from kaleido-io/config-listener
Browse files Browse the repository at this point in the history
Dynamic configuration reload
  • Loading branch information
nguyer authored Jan 13, 2023
2 parents 21873ba + f23122e commit b69d4f2
Show file tree
Hide file tree
Showing 17 changed files with 2,633 additions and 1,362 deletions.
10 changes: 7 additions & 3 deletions cmd/firefly.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,15 @@ func init() {

func resetConfig() {
coreconfig.Reset()
namespace.InitConfig()
apiserver.InitConfig()
}

func getRootManager() namespace.Manager {
if _utManager != nil {
return _utManager
}
return namespace.NewNamespaceManager(true)
return namespace.NewNamespaceManager()
}

// Execute is called by the main method of the package
Expand Down Expand Up @@ -142,14 +143,17 @@ func run() error {
mgr.WaitStop()
return nil
case <-resetChan:
// This API that performs a full stop/restart reset, is deprecated
// in favor of selective reload of namespaces based on listening to changes
// in the configuration file.
log.L(rootCtx).Infof("Restarting due to configuration change")
cancelRunCtx()
mgr.WaitStop()
// Must wait for the server to close before we restart
<-ffDone
// Re-read the configuration
resetConfig()
if err := config.ReadConfig(configSuffix, cfgFile); err != nil {
if err = config.ReadConfig(configSuffix, cfgFile); err != nil {
return err
}
case err := <-errChan:
Expand Down Expand Up @@ -186,7 +190,7 @@ func startFirefly(ctx context.Context, cancelCtx context.CancelFunc, mgr namespa
close(ffDone)
}()

if err = mgr.Init(ctx, cancelCtx, resetChan); err != nil {
if err = mgr.Init(ctx, cancelCtx, resetChan, resetConfig); err != nil {
errChan <- err
return
}
Expand Down
14 changes: 7 additions & 7 deletions cmd/firefly_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestShowConfig(t *testing.T) {

func TestExecEngineInitFail(t *testing.T) {
o := &namespacemocks.Manager{}
o.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("splutter"))
o.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("splutter"))
_utManager = o
defer func() { _utManager = nil }()
os.Chdir(configDir)
Expand All @@ -66,7 +66,7 @@ func TestExecEngineInitFail(t *testing.T) {

func TestExecEngineStartFail(t *testing.T) {
o := &namespacemocks.Manager{}
o.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil)
o.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
o.On("Start").Return(fmt.Errorf("bang"))
_utManager = o
defer func() { _utManager = nil }()
Expand All @@ -77,7 +77,7 @@ func TestExecEngineStartFail(t *testing.T) {

func TestExecOkExitSIGINT(t *testing.T) {
o := &namespacemocks.Manager{}
o.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil)
o.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
o.On("Start").Return(nil)
o.On("WaitStop").Return()
_utManager = o
Expand All @@ -93,7 +93,7 @@ func TestExecOkExitSIGINT(t *testing.T) {

func TestExecOkCancel(t *testing.T) {
o := &namespacemocks.Manager{}
init := o.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil)
init := o.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
init.RunFn = func(a mock.Arguments) {
cancelCtx := a[1].(context.CancelFunc)
cancelCtx()
Expand All @@ -111,7 +111,7 @@ func TestExecOkCancel(t *testing.T) {
func TestExecOkRestartThenExit(t *testing.T) {
o := &namespacemocks.Manager{}
initCount := 0
init := o.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil)
init := o.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
init.RunFn = func(a mock.Arguments) {
resetChan := a[2].(chan bool)
initCount++
Expand All @@ -137,7 +137,7 @@ func TestExecOkRestartConfigProblem(t *testing.T) {
assert.NoError(t, err)
defer os.RemoveAll(tmpDir)
var orContext context.Context
init := o.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil)
init := o.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
init.RunFn = func(a mock.Arguments) {
orContext = a[0].(context.Context)
resetChan := a[2].(chan bool)
Expand All @@ -158,7 +158,7 @@ func TestExecOkRestartConfigProblem(t *testing.T) {

func TestAPIServerError(t *testing.T) {
o := &namespacemocks.Manager{}
o.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil)
o.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
o.On("Start").Return(nil)
as := &apiservermocks.Server{}
as.On("Serve", mock.Anything, o).Return(fmt.Errorf("pop"))
Expand Down
2 changes: 1 addition & 1 deletion docs/config_docs_generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

func TestGenerateConfigDocs(t *testing.T) {
// Initialize config of all plugins
namespace.NewNamespaceManager(false)
namespace.InitConfig()
apiserver.InitConfig()
f, err := os.Create(filepath.Join("reference", "config.md"))
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion docs/config_docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (

func TestConfigDocsUpToDate(t *testing.T) {
// Initialize config of all plugins
namespace.NewNamespaceManager(false)
namespace.InitConfig()
apiserver.InitConfig()
generatedConfig, err := config.GenerateConfigMarkdown(context.Background(), configDocHeader, config.GetKnownKeys())
assert.NoError(t, err)
Expand Down
6 changes: 6 additions & 0 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,12 @@ nav_order: 2
|size|Max size of cached validators for data manager|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`<nil>`
|ttl|Time to live of cached validators for data manager|`string`|`<nil>`

## config

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|autoReload|Monitor the configuration file for changes, and automatically add/remove/reload namespaces and plugins|`boolean`|`<nil>`

## cors

|Key|Description|Type|Default Value|
Expand Down
3 changes: 1 addition & 2 deletions internal/apiserver/route_spi_post_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ var spiPostReset = &ffapi.Route{
JSONOutputCodes: []int{http.StatusNoContent},
Extensions: &coreExtensions{
CoreJSONHandler: func(r *ffapi.APIRequest, cr *coreRequest) (output interface{}, err error) {
cr.mgr.Reset(cr.ctx)
return nil, nil
return nil, cr.mgr.Reset(cr.ctx)
},
},
}
2 changes: 1 addition & 1 deletion internal/apiserver/route_spi_post_reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestAdminPostResetConfig(t *testing.T) {
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

mgr.On("Reset", mock.Anything).Return()
mgr.On("Reset", mock.Anything).Return(nil)
r.ServeHTTP(res, req)

assert.Equal(t, 204, res.Result().StatusCode)
Expand Down
4 changes: 2 additions & 2 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ func (e *Ethereum) Init(ctx context.Context, cancelCtx context.CancelFunc, conf
}

if ethconnectConf.GetString(ffresty.HTTPConfigURL) == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "blockchain.ethereum.ethconnect")
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", ethconnectConf)
}
e.client = ffresty.New(e.ctx, ethconnectConf)

e.topic = ethconnectConf.GetString(EthconnectConfigTopic)
if e.topic == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "topic", "blockchain.ethereum.ethconnect")
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "topic", ethconnectConf)
}
e.prefixShort = ethconnectConf.GetString(EthconnectPrefixShort)
e.prefixLong = ethconnectConf.GetString(EthconnectPrefixLong)
Expand Down
3 changes: 3 additions & 0 deletions internal/coreconfig/coreconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ var (
// BroadcastBatchTimeout is the timeout to wait for a batch to fill, before sending
BroadcastBatchTimeout = ffc("broadcast.batch.timeout")

// ConfigAutoReload starts a filesystem listener against the config file, and if it changes analyzes the config file for changes that require individual namespaces to restart
ConfigAutoReload = ffc("config.autoReload")

// CacheEnabled determines whether cache will be enabled or not, default to true
CacheEnabled = ffc("cache.enabled")

Expand Down
2 changes: 2 additions & 0 deletions internal/coremsgs/en_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var (
ConfigGlobalMigrationsDirectory = ffc("config.global.migrations.directory", "The directory containing the numerically ordered migration DDL files to apply to the database", i18n.StringType)
ConfigGlobalShutdownTimeout = ffc("config.global.shutdownTimeout", "The maximum amount of time to wait for any open HTTP requests to finish before shutting down the HTTP server", i18n.TimeDurationType)

ConfigConfigAutoReload = ffc("config.config.autoReload", "Monitor the configuration file for changes, and automatically add/remove/reload namespaces and plugins", i18n.BooleanType)

ConfigLegacyAdmin = ffc("config.admin.enabled", "Deprecated - use spi.enabled instead", i18n.BooleanType)
ConfigSPIAddress = ffc("config.spi.address", "The IP address on which the admin HTTP API should listen", "IP Address "+i18n.StringType)
ConfigSPIEnabled = ffc("config.spi.enabled", "Enables the admin HTTP API", i18n.BooleanType)
Expand Down
2 changes: 2 additions & 0 deletions internal/coremsgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,4 +276,6 @@ var (
MsgUnknownInterfaceFormat = ffe("FF10435", "Unknown interface format: %s", 400)
MsgUnknownNamespace = ffe("FF10436", "Unknown namespace '%s'", 404)
MsgMissingNamespace = ffe("FF10437", "Missing namespace in request", 400)
MsgDeprecatedResetWithAutoReload = ffe("FF10438", "The deprecated reset API cannot be used when dynamic config reload is enabled", 409)
MsgConfigArrayVsRawConfigMismatch = ffe("FF10439", "Error processing configuration - mismatch between raw and processed array lengths")
)
48 changes: 40 additions & 8 deletions internal/namespace/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,16 @@
package namespace

import (
"github.com/hyperledger/firefly-common/pkg/auth/authfactory"
"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly/internal/blockchain/bifactory"
"github.com/hyperledger/firefly/internal/coreconfig"
"github.com/hyperledger/firefly/internal/database/difactory"
"github.com/hyperledger/firefly/internal/dataexchange/dxfactory"
"github.com/hyperledger/firefly/internal/events/eifactory"
"github.com/hyperledger/firefly/internal/identity/iifactory"
"github.com/hyperledger/firefly/internal/sharedstorage/ssfactory"
"github.com/hyperledger/firefly/internal/tokens/tifactory"
"github.com/hyperledger/firefly/pkg/core"
)

Expand All @@ -29,11 +37,27 @@ const (
)

var (
namespaceConfig = config.RootSection("namespaces")
namespacePredefined = namespaceConfig.SubArray(NamespacePredefined)
namespaceConfigSection = config.RootSection("namespaces")
namespacePredefined = namespaceConfigSection.SubArray(NamespacePredefined)

blockchainConfig = config.RootArray("plugins.blockchain")
tokensConfig = config.RootArray("plugins.tokens")
databaseConfig = config.RootArray("plugins.database")
sharedstorageConfig = config.RootArray("plugins.sharedstorage")
dataexchangeConfig = config.RootArray("plugins.dataexchange")
identityConfig = config.RootArray("plugins.identity")
authConfig = config.RootArray("plugins.auth")
eventsConfig = config.RootSection("events") // still at root

// Deprecated configs
deprecatedTokensConfig = config.RootArray("tokens")
deprecatedBlockchainConfig = config.RootSection("blockchain")
deprecatedDatabaseConfig = config.RootSection("database")
deprecatedSharedStorageConfig = config.RootSection("sharedstorage")
deprecatedDataexchangeConfig = config.RootSection("dataexchange")
)

func InitConfig(withDefaults bool) {
func InitConfig() {
namespacePredefined.AddKnownKey(coreconfig.NamespaceName)
namespacePredefined.AddKnownKey(coreconfig.NamespaceDescription)
namespacePredefined.AddKnownKey(coreconfig.NamespacePlugins)
Expand All @@ -53,9 +77,17 @@ func InitConfig(withDefaults bool) {
contractConf.AddKnownKey(coreconfig.NamespaceMultipartyContractFirstEvent, string(core.SubOptsFirstEventOldest))
contractConf.AddKnownKey(coreconfig.NamespaceMultipartyContractLocation)

if withDefaults {
namespaceConfig.AddKnownKey(NamespacePredefined+".0."+coreconfig.NamespaceName, "default")
namespaceConfig.AddKnownKey(NamespacePredefined+".0."+coreconfig.NamespaceDescription, "Default predefined namespace")
namespaceConfig.AddKnownKey(NamespacePredefined+".0."+coreconfig.NamespaceAssetKeyNormalization, "blockchain_plugin")
}
bifactory.InitConfigDeprecated(deprecatedBlockchainConfig)
bifactory.InitConfig(blockchainConfig)
difactory.InitConfigDeprecated(deprecatedDatabaseConfig)
difactory.InitConfig(databaseConfig)
ssfactory.InitConfigDeprecated(deprecatedSharedStorageConfig)
ssfactory.InitConfig(sharedstorageConfig)
dxfactory.InitConfig(dataexchangeConfig)
dxfactory.InitConfigDeprecated(deprecatedDataexchangeConfig)
iifactory.InitConfig(identityConfig)
tifactory.InitConfigDeprecated(deprecatedTokensConfig)
tifactory.InitConfig(tokensConfig)
authfactory.InitConfigArray(authConfig)
eifactory.InitConfig(eventsConfig)
}
Loading

0 comments on commit b69d4f2

Please sign in to comment.