diff --git a/go.mod b/go.mod index 7d749f31bf..6789961d89 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/robert-nix/ansihtml v1.0.1 github.com/shibukawa/configdir v0.0.0-20170330084843-e180dbdc8da0 github.com/sirupsen/logrus v1.9.3 - github.com/skycoin/dmsg v1.3.21 + github.com/skycoin/dmsg v1.3.22-0.20240502214137-b684f7064155 github.com/skycoin/skycoin v0.27.1 github.com/skycoin/skycoin-service-discovery v0.0.0-20240306165129-2af10aca698e github.com/skycoin/skywire-services v0.0.0-20240403004908-50ccbbf07004 diff --git a/go.sum b/go.sum index 32bc798c2e..accb274e21 100644 --- a/go.sum +++ b/go.sum @@ -481,6 +481,8 @@ github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/skycoin/dmsg v1.3.21 h1:31Jx5pPAUDNUxZmLUORSspvo9OTdtXOBY++Tzb/jotM= github.com/skycoin/dmsg v1.3.21/go.mod h1:INEDx+ECwCGQWw/Kd0QcLmSWMhbeRRcfkxj+xATQGFg= +github.com/skycoin/dmsg v1.3.22-0.20240502214137-b684f7064155 h1:yy/bheBsI2PXGgl3ose3avsBGSJnI0ukrCTdbB92pSc= +github.com/skycoin/dmsg v1.3.22-0.20240502214137-b684f7064155/go.mod h1:INEDx+ECwCGQWw/Kd0QcLmSWMhbeRRcfkxj+xATQGFg= github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6 h1:1Nc5EBY6pjfw1kwW0duwyG+7WliWz5u9kgk1h5MnLuA= github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:UXghlricA7J3aRD/k7p/zBObQfmBawwCxIVPVjz2Q3o= github.com/skycoin/skycoin v0.27.1 h1:HatxsRwVSPaV4qxH6290xPBmkH/HgiuAoY2qC+e8C9I= diff --git a/pkg/transport/network/stcpr.go b/pkg/transport/network/stcpr.go index 7186e18c94..3e882ca59f 100644 --- a/pkg/transport/network/stcpr.go +++ b/pkg/transport/network/stcpr.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net" + "time" "github.com/skycoin/skywire-utilities/pkg/cipher" "github.com/skycoin/skywire-utilities/pkg/netutil" @@ -89,6 +90,17 @@ func (c *stcprClient) serve() { c.log.Errorf("Failed to bind STCPR: %v", err) return } + // simple heartbeat process for stcpr + go func() { + for { + time.Sleep(5 * time.Minute) + if err := c.ar.BindSTCPR(context.Background(), port); err != nil { + c.log.Errorf("Failed to bind STCPR: %v", err) + continue + } + c.log.Infof("STCPR rebinded in heartbeating process") + } + }() c.log.Debugf("Successfully bound stcpr to port %s", port) c.acceptTransports(lis) } diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsg/client.go b/vendor/github.com/skycoin/dmsg/pkg/dmsg/client.go index 679629a5c2..0973245a77 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsg/client.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsg/client.go @@ -60,7 +60,7 @@ func (c *Config) Ensure() { func DefaultConfig() *Config { conf := &Config{ MinSessions: DefaultMinSessions, - UpdateInterval: DefaultUpdateInterval, + UpdateInterval: DefaultUpdateInterval * 5, } return conf } @@ -153,6 +153,8 @@ func (ce *Client) Serve(ctx context.Context) { } }(cancellabelCtx) + updateEntryLoopOnce := new(sync.Once) + for { if isClosed(ce.done) { return @@ -253,6 +255,10 @@ func (ce *Client) Serve(ctx context.Context) { ce.serveWait() } } + + // Only start the update entry loop once we have at least one session established. + updateEntryLoopOnce.Do(func() { go ce.updateClientEntryLoop(cancellabelCtx, ce.done, ce.conf.ClientType) }) + // We dial all servers and wait for error or done signal. select { case <-ce.done: diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsg/entity_common.go b/vendor/github.com/skycoin/dmsg/pkg/dmsg/entity_common.go index ee0d984387..4779984ee8 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsg/entity_common.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsg/entity_common.go @@ -251,12 +251,50 @@ func (c *EntityCommon) updateClientEntry(ctx context.Context, done chan struct{} } return c.dc.PostEntry(ctx, entry) } + + // Whether the client's CURRENT delegated servers is the same as what would be advertised. + sameSrvPKs := cipher.SamePubKeys(srvPKs, entry.Client.DelegatedServers) + + // No update is needed if delegated servers has no delta, and an entry update is not due. + if _, due := c.updateIsDue(); sameSrvPKs && !due { + return nil + } + entry.ClientType = clientType entry.Client.DelegatedServers = srvPKs c.log.WithField("entry", entry).Debug("Updating entry.") return c.dc.PutEntry(ctx, c.sk, entry) } +func (c *EntityCommon) updateClientEntryLoop(ctx context.Context, done chan struct{}, clientType string) { + t := time.NewTimer(c.updateInterval) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-t.C: + if lastUpdate, due := c.updateIsDue(); !due { + t.Reset(c.updateInterval - time.Since(lastUpdate)) + continue + } + + c.sessionsMx.Lock() + err := c.updateClientEntry(ctx, done, clientType) + c.sessionsMx.Unlock() + + if err != nil { + c.log.WithError(err).Warn("Failed to update discovery entry.") + } + + // Ensure we trigger another update within given 'updateInterval'. + t.Reset(c.updateInterval) + } + } +} + func (c *EntityCommon) delEntry(ctx context.Context) (err error) { entry, err := c.dc.Entry(ctx, c.pk) diff --git a/vendor/modules.txt b/vendor/modules.txt index d143f01e34..3e25ac7ad1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -486,7 +486,7 @@ github.com/shirou/gopsutil/process ## explicit; go 1.13 github.com/sirupsen/logrus github.com/sirupsen/logrus/hooks/syslog -# github.com/skycoin/dmsg v1.3.21 +# github.com/skycoin/dmsg v1.3.22-0.20240502214137-b684f7064155 ## explicit; go 1.21 github.com/skycoin/dmsg/cmd/dmsg-discovery/commands github.com/skycoin/dmsg/cmd/dmsg-server/commands