From 6481986c7d752ffb957e9bd26dabc5c38acc844f Mon Sep 17 00:00:00 2001 From: Mohammed Date: Mon, 26 Feb 2024 05:10:45 +0000 Subject: [PATCH 1/2] add visorClients to redis store | add client_type value to dmsgclient conf --- internal/dmsg-discovery/api/api.go | 15 +++++++++++++++ internal/dmsg-discovery/store/redis.go | 16 ++++++++++++++++ internal/dmsg-discovery/store/storer.go | 3 +++ internal/dmsg-discovery/store/testing.go | 21 +++++++++++++++++++++ pkg/disc/entry.go | 4 ++++ pkg/dmsg/client.go | 21 +++++++++++++++------ pkg/dmsg/entity_common.go | 5 +++-- 7 files changed, 77 insertions(+), 8 deletions(-) diff --git a/internal/dmsg-discovery/api/api.go b/internal/dmsg-discovery/api/api.go index f97ac8f29..1c2432fae 100644 --- a/internal/dmsg-discovery/api/api.go +++ b/internal/dmsg-discovery/api/api.go @@ -89,6 +89,7 @@ func New(log logrus.FieldLogger, db store.Storer, m discmetrics.Metrics, testMod r.Post("/dmsg-discovery/entry/{pk}", api.setEntry()) r.Delete("/dmsg-discovery/entry", api.delEntry()) r.Get("/dmsg-discovery/entries", api.allEntries()) + r.Get("/dmsg-discovery/visorEntries", api.allVisorEntries()) r.Delete("/dmsg-discovery/deregister", api.deregisterEntry()) r.Get("/dmsg-discovery/available_servers", api.getAvailableServers()) r.Get("/dmsg-discovery/all_servers", api.getAllServers()) @@ -163,6 +164,20 @@ func (a *API) allEntries() func(w http.ResponseWriter, r *http.Request) { } } +// allVisorEntries returns all visor client entries connected to dmsg +// URI: /dmsg-discovery/visorEntries +// Method: GET +func (a *API) allVisorEntries() func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + entries, err := a.db.AllVisorEntries(r.Context()) + if err != nil { + a.handleError(w, r, err) + return + } + a.writeJSON(w, r, http.StatusOK, entries) + } +} + // deregisterEntry deletes the client entry associated with the PK requested by the network monitor // URI: /dmsg-discovery/deregister/:pk // Method: DELETE diff --git a/internal/dmsg-discovery/store/redis.go b/internal/dmsg-discovery/store/redis.go index 013feea46..ce2203a12 100644 --- a/internal/dmsg-discovery/store/redis.go +++ b/internal/dmsg-discovery/store/redis.go @@ -93,6 +93,13 @@ func (r *redisStore) SetEntry(ctx context.Context, entry *disc.Entry, timeout ti return disc.ErrUnexpected } } + if entry.ClientType == "visor" { + err = r.client.SAdd(ctx, "visorClients", entry.Static.Hex()).Err() + if err != nil { + log.WithError(err).Errorf("Failed to add to visorClients (SAdd) from redis") + return disc.ErrUnexpected + } + } return nil } @@ -107,6 +114,7 @@ func (r *redisStore) DelEntry(ctx context.Context, staticPubKey cipher.PubKey) e // Delete pubkey from servers or clients set stored r.client.SRem(ctx, "servers", staticPubKey.Hex()) r.client.SRem(ctx, "clients", staticPubKey.Hex()) + r.client.SRem(ctx, "visorClients", staticPubKey.Hex()) return nil } @@ -233,3 +241,11 @@ func (r *redisStore) AllEntries(ctx context.Context) ([]string, error) { } return clients, err } + +func (r *redisStore) AllVisorEntries(ctx context.Context) ([]string, error) { + clients, err := r.client.SMembers(ctx, "visorClients").Result() + if err != nil { + return nil, err + } + return clients, err +} diff --git a/internal/dmsg-discovery/store/storer.go b/internal/dmsg-discovery/store/storer.go index 2258f9a60..61fbee3d7 100644 --- a/internal/dmsg-discovery/store/storer.go +++ b/internal/dmsg-discovery/store/storer.go @@ -46,6 +46,9 @@ type Storer interface { // AllEntries returns all clients PKs. AllEntries(ctx context.Context) ([]string, error) + + // AllVisorEntries returns all clients PKs. + AllVisorEntries(ctx context.Context) ([]string, error) } // Config configures the Store object. diff --git a/internal/dmsg-discovery/store/testing.go b/internal/dmsg-discovery/store/testing.go index 69c5e4c63..e619c3f84 100644 --- a/internal/dmsg-discovery/store/testing.go +++ b/internal/dmsg-discovery/store/testing.go @@ -217,3 +217,24 @@ func (ms *MockStore) AllEntries(_ context.Context) ([]string, error) { } return entries, nil } + +// AllVisorEntries implements Storer CountEntries method for MockStore +func (ms *MockStore) AllVisorEntries(_ context.Context) ([]string, error) { + entries := []string{} + + ms.mLock.RLock() + defer ms.mLock.RUnlock() + + clients := arrayFromMap(ms.m) + for _, entryString := range clients { + var e disc.Entry + + err := json.Unmarshal(entryString, &e) + if err != nil { + return nil, disc.ErrUnexpected + } + + entries = append(entries, e.String()) + } + return entries, nil +} diff --git a/pkg/disc/entry.go b/pkg/disc/entry.go index 44b365a18..77a4a2312 100644 --- a/pkg/disc/entry.go +++ b/pkg/disc/entry.go @@ -115,6 +115,9 @@ type Entry struct { // Contains the instance's client meta if it's to be advertised as a DMSG Client. Client *Client `json:"client,omitempty"` + // ClientType the instance's client_type meta if it's to be advertised as a DMSG Client. + ClientType string `json:"client_type,omitempty"` + // Contains the instance's server meta if it's to be advertised as a DMSG Server. Server *Server `json:"server,omitempty"` @@ -131,6 +134,7 @@ func (e *Entry) String() string { res += fmt.Sprintf("\tsignature: %s\n", e.Signature) if e.Client != nil { + res += fmt.Sprintf("\tclient type: %s\n", e.ClientType) indentedStr := strings.Replace(e.Client.String(), "\n\t", "\n\t\t\t", -1) res += fmt.Sprintf("\tentry is registered as client. Related info: \n\t\t%s\n", indentedStr) } diff --git a/pkg/dmsg/client.go b/pkg/dmsg/client.go index 9d199f699..e6ad91a76 100644 --- a/pkg/dmsg/client.go +++ b/pkg/dmsg/client.go @@ -32,10 +32,10 @@ type ClientCallbacks struct { func (sc *ClientCallbacks) ensure() { if sc.OnSessionDial == nil { - sc.OnSessionDial = func(network, addr string) (err error) { return nil } + sc.OnSessionDial = func(network, addr string) (err error) { return nil } //nolint } if sc.OnSessionDisconnect == nil { - sc.OnSessionDisconnect = func(network, addr string, err error) {} + sc.OnSessionDisconnect = func(network, addr string, err error) {} //nolint } } @@ -44,6 +44,7 @@ type Config struct { MinSessions int UpdateInterval time.Duration // Duration between discovery entry updates. Callbacks *ClientCallbacks + ClientType string } // Ensure ensures all config values are set. @@ -108,10 +109,9 @@ func NewClient(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, conf *Conf // Init callback: on set session. c.EntityCommon.setSessionCallback = func(ctx context.Context) error { - if err := c.EntityCommon.updateClientEntry(ctx, c.done); err != nil { + if err := c.EntityCommon.updateClientEntry(ctx, c.done, c.conf.ClientType); err != nil { return err } - // Client is 'ready' once we have successfully updated the discovery entry // with at least one delegated server. c.readyOnce.Do(func() { close(c.ready) }) @@ -120,7 +120,7 @@ func NewClient(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, conf *Conf // Init callback: on delete session. c.EntityCommon.delSessionCallback = func(ctx context.Context) error { - err := c.EntityCommon.updateClientEntry(ctx, c.done) + err := c.EntityCommon.updateClientEntry(ctx, c.done, c.conf.ClientType) return err } @@ -458,7 +458,7 @@ func (ce *Client) dialSession(ctx context.Context, entry *disc.Entry) (cs Client // AllStreams returns all the streams of the current client. func (ce *Client) AllStreams() (out []*Stream) { - fn := func(port uint16, pv netutil.PorterValue) (next bool) { + fn := func(port uint16, pv netutil.PorterValue) (next bool) { //nolint if str, ok := pv.Value.(*Stream); ok { out = append(out, str) return true @@ -485,6 +485,15 @@ func (ce *Client) AllEntries(ctx context.Context) (entries []string, err error) return entries, err } +// AllVisorEntries returns all the entries registered in discovery that are visor +func (ce *Client) AllVisorEntries(ctx context.Context) (entries []string, err error) { + err = netutil.NewDefaultRetrier(ce.log).Do(ctx, func() error { + entries, err = ce.dc.AllEntries(ctx) + return err + }) + return entries, err +} + // ConnectedServersPK return keys of all connected dmsg servers func (ce *Client) ConnectedServersPK() []string { sessions := ce.allClientSessions(ce.porter) diff --git a/pkg/dmsg/entity_common.go b/pkg/dmsg/entity_common.go index 64382ab78..ee0d98438 100644 --- a/pkg/dmsg/entity_common.go +++ b/pkg/dmsg/entity_common.go @@ -225,7 +225,7 @@ func (c *EntityCommon) updateServerEntryLoop(ctx context.Context, addr string, m } } -func (c *EntityCommon) updateClientEntry(ctx context.Context, done chan struct{}) (err error) { +func (c *EntityCommon) updateClientEntry(ctx context.Context, done chan struct{}, clientType string) (err error) { if isClosed(done) { return nil } @@ -245,12 +245,13 @@ func (c *EntityCommon) updateClientEntry(ctx context.Context, done chan struct{} entry, err := c.dc.Entry(ctx, c.pk) if err != nil { entry = disc.NewClientEntry(c.pk, 0, srvPKs) + entry.ClientType = clientType if err := entry.Sign(c.sk); err != nil { return err } return c.dc.PostEntry(ctx, entry) } - + entry.ClientType = clientType entry.Client.DelegatedServers = srvPKs c.log.WithField("entry", entry).Debug("Updating entry.") return c.dc.PutEntry(ctx, c.sk, entry) From 0cc33d6fd1e15db1de8915f5b3ebc50784810ec0 Mon Sep 17 00:00:00 2001 From: Mohammed Date: Mon, 26 Feb 2024 05:32:03 +0000 Subject: [PATCH 2/2] remote useless code --- pkg/disc/entry.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/disc/entry.go b/pkg/disc/entry.go index 77a4a2312..365b270ce 100644 --- a/pkg/disc/entry.go +++ b/pkg/disc/entry.go @@ -134,7 +134,6 @@ func (e *Entry) String() string { res += fmt.Sprintf("\tsignature: %s\n", e.Signature) if e.Client != nil { - res += fmt.Sprintf("\tclient type: %s\n", e.ClientType) indentedStr := strings.Replace(e.Client.String(), "\n\t", "\n\t\t\t", -1) res += fmt.Sprintf("\tentry is registered as client. Related info: \n\t\t%s\n", indentedStr) }