diff --git a/pkg/http/types.go b/pkg/http/types.go index c8982759..f2de3561 100644 --- a/pkg/http/types.go +++ b/pkg/http/types.go @@ -7,6 +7,8 @@ type ServerOptions struct { } type ClientOptions struct { - URL string - PrivateKey string + URL string + PrivateKey string + PublicAddress string + Type string } diff --git a/pkg/http/websocket_server.go b/pkg/http/websocket_server.go index 91e15ba9..206055d2 100644 --- a/pkg/http/websocket_server.go +++ b/pkg/http/websocket_server.go @@ -21,12 +21,19 @@ type ConnectionWrapper struct { mu sync.Mutex } +type WSConnectionParams struct { + ID string + Type string +} + // StartWebSocketServer starts a WebSocket server func StartWebSocketServer( r *mux.Router, path string, messageChan chan []byte, ctx context.Context, + connectCB func(params WSConnectionParams), + disconnectCB func(params WSConnectionParams), ) { var mutex = &sync.Mutex{} @@ -88,7 +95,13 @@ func StartWebSocketServer( log.Error().Msgf("Error upgrading websocket: %s", err.Error()) return } + params := r.URL.Query() + connParams := WSConnectionParams{ + ID: params.Get("ID"), + Type: params.Get("Type"), + } defer conn.Close() + connectCB(connParams) addConnection(conn) log.Debug(). @@ -98,10 +111,14 @@ func StartWebSocketServer( messageType, _, err := conn.ReadMessage() if err != nil { log.Trace().Msgf("Client disconnected: %s", err.Error()) + removeConnection(conn) + disconnectCB(connParams) break } if messageType == websocket.CloseMessage { log.Trace().Msgf("Received close frame from client.") + removeConnection(conn) + disconnectCB(connParams) break } } diff --git a/pkg/jobcreator/controller.go b/pkg/jobcreator/controller.go index f40e85fc..fecce780 100644 --- a/pkg/jobcreator/controller.go +++ b/pkg/jobcreator/controller.go @@ -42,10 +42,13 @@ func NewJobCreatorController( return nil, err } - solverClient, err := solver.NewSolverClient(http.ClientOptions{ - URL: solverUrl, - PrivateKey: options.Web3.PrivateKey, - }) + solverClient, err := solver.NewSolverClient( + http.ClientOptions{ + URL: solverUrl, + PrivateKey: options.Web3.PrivateKey, + Type: "JobCreator", + PublicAddress: web3SDK.GetAddress().String(), + }) if err != nil { return nil, err } diff --git a/pkg/mediator/controller.go b/pkg/mediator/controller.go index 2b1eeda2..ddc6e44c 100644 --- a/pkg/mediator/controller.go +++ b/pkg/mediator/controller.go @@ -51,10 +51,13 @@ func NewMediatorController( return nil, err } - solverClient, err := solver.NewSolverClient(http.ClientOptions{ - URL: solverUrl, - PrivateKey: options.Web3.PrivateKey, - }) + solverClient, err := solver.NewSolverClient( + http.ClientOptions{ + URL: solverUrl, + PrivateKey: options.Web3.PrivateKey, + Type: "Mediator", + PublicAddress: web3SDK.GetAddress().String(), + }) if err != nil { log.Error().Msgf("error NewSolverClient") return nil, err diff --git a/pkg/metricsDashboard/logger.go b/pkg/metricsDashboard/logger.go index 6dc6f791..6f41f079 100644 --- a/pkg/metricsDashboard/logger.go +++ b/pkg/metricsDashboard/logger.go @@ -11,11 +11,13 @@ import ( "github.com/lilypad-tech/lilypad/pkg/data" ) -var host = os.Getenv("API_HOST") -var endpoint = "metrics-dashboard/logs" -var url = host + endpoint +const jobsEndpoint = "jobs" +const nodeInfoEndpoint = "nodes" +const nodeConnectionEndpoint = "uptimes" -func TrackEvent(json string) { +var host = os.Getenv("API_HOST") + "metrics-dashboard/" + +func TrackEvent(url string, json string) { data := []byte(json) client := &http.Client{Timeout: time.Second * 1} @@ -31,6 +33,7 @@ func TrackEvent(json string) { } func TrackJobOfferUpdate(evOffer data.JobOfferContainer) { + var url = host + jobsEndpoint var module = evOffer.JobOffer.Module.Name if module == "" { module = evOffer.JobOffer.Module.Repo + ":" + evOffer.JobOffer.Module.Hash @@ -49,5 +52,34 @@ func TrackJobOfferUpdate(evOffer data.JobOfferContainer) { byts, _ := json.Marshal(data) payload := string(byts) - TrackEvent(payload) + TrackEvent(url, payload) +} + +func TrackNodeInfo(resourceOffer data.ResourceOffer) { + var url = host + nodeInfoEndpoint + + data := map[string]interface{}{ + "ID": resourceOffer.ResourceProvider, + "GPU": resourceOffer.Spec.GPU, + "CPU": resourceOffer.Spec.CPU, + "RAM": resourceOffer.Spec.RAM, + "Modules": resourceOffer.Modules, + } + byts, _ := json.Marshal(data) + payload := string(byts) + + TrackEvent(url, payload) +} + +func TrackNodeConnectionEvent(event string, ID string) { + var url = host + nodeConnectionEndpoint + data := map[string]interface{}{ + "ID": ID, + "Event": event, + "Time": time.Now().UnixMilli(), + } + byts, _ := json.Marshal(data) + payload := string(byts) + + TrackEvent(url, payload) } diff --git a/pkg/resourceprovider/controller.go b/pkg/resourceprovider/controller.go index a653f584..456e1a2c 100644 --- a/pkg/resourceprovider/controller.go +++ b/pkg/resourceprovider/controller.go @@ -49,10 +49,13 @@ func NewResourceProviderController( return nil, err } - solverClient, err := solver.NewSolverClient(http.ClientOptions{ - URL: solverUrl, - PrivateKey: options.Web3.PrivateKey, - }) + solverClient, err := solver.NewSolverClient( + http.ClientOptions{ + URL: solverUrl, + PrivateKey: options.Web3.PrivateKey, + Type: "ResourceProvider", + PublicAddress: web3SDK.GetAddress().String(), + }) if err != nil { return nil, err } diff --git a/pkg/solver/client.go b/pkg/solver/client.go index 9d2bcca4..3f75fae7 100644 --- a/pkg/solver/client.go +++ b/pkg/solver/client.go @@ -49,8 +49,9 @@ func (client *SolverClient) Start(ctx context.Context, cm *system.CleanupManager } } }() + websocketURL := fmt.Sprintf("%s%s%s%s%s", http.WEBSOCKET_SUB_PATH, "?&Type=", client.options.Type, "&ID=", client.options.PublicAddress) http.ConnectWebSocket( - http.WebsocketURL(client.options, http.WEBSOCKET_SUB_PATH), + http.WebsocketURL(client.options, websocketURL), websocketEventChannel, ctx, ) diff --git a/pkg/solver/controller.go b/pkg/solver/controller.go index fbde7e7a..7286d0dc 100644 --- a/pkg/solver/controller.go +++ b/pkg/solver/controller.go @@ -6,6 +6,7 @@ import ( "time" "github.com/lilypad-tech/lilypad/pkg/data" + "github.com/lilypad-tech/lilypad/pkg/metricsDashboard" "github.com/lilypad-tech/lilypad/pkg/solver/store" "github.com/lilypad-tech/lilypad/pkg/system" "github.com/lilypad-tech/lilypad/pkg/web3" @@ -20,6 +21,7 @@ type SolverEventType string const ( JobOfferAdded SolverEventType = "JobOfferAdded" ResourceOfferAdded SolverEventType = "ResourceOfferAdded" + ResourceOfferRemoved SolverEventType = "ResourceOfferRemoved" DealAdded SolverEventType = "DealAdded" JobOfferStateUpdated SolverEventType = "JobOfferStateUpdated" ResourceOfferStateUpdated SolverEventType = "ResourceOfferStateUpdated" @@ -321,10 +323,13 @@ func (controller *SolverController) addResourceOffer(resourceOffer data.Resource controller.log.Info("add resource offer", resourceOffer) + metricsDashboard.TrackNodeInfo(resourceOffer) + ret, err := controller.store.AddResourceOffer(data.GetResourceOfferContainer(resourceOffer)) if err != nil { return nil, err } + controller.writeEvent(SolverEvent{ EventType: ResourceOfferAdded, ResourceOffer: ret, @@ -332,6 +337,31 @@ func (controller *SolverController) addResourceOffer(resourceOffer data.Resource return ret, nil } +func (controller *SolverController) removeResourceOfferBYResourceProvider(ID string) error { + controller.log.Info("remove resource offer", ID) + resourceOffers, err := controller.store.GetResourceOffers(store.GetResourceOffersQuery{ + ResourceProvider: ID, + }) + if err != nil { + return err + } + + if len(resourceOffers) == 0 { + return nil + } + + err = controller.store.RemoveResourceOffer(resourceOffers[0].ID) + if err != nil { + return err + } + + controller.writeEvent(SolverEvent{ + EventType: ResourceOfferRemoved, + ResourceOffer: nil, + }) + return nil +} + func (controller *SolverController) addDeal(deal data.Deal) (*data.DealContainer, error) { id, err := data.GetDealID(deal) if err != nil { diff --git a/pkg/solver/server.go b/pkg/solver/server.go index 00ecbcc6..2c2ba41a 100644 --- a/pkg/solver/server.go +++ b/pkg/solver/server.go @@ -11,11 +11,12 @@ import ( "path/filepath" "time" + "github.com/gorilla/mux" "github.com/lilypad-tech/lilypad/pkg/data" "github.com/lilypad-tech/lilypad/pkg/http" + "github.com/lilypad-tech/lilypad/pkg/metricsDashboard" "github.com/lilypad-tech/lilypad/pkg/solver/store" "github.com/lilypad-tech/lilypad/pkg/system" - "github.com/gorilla/mux" "github.com/rs/zerolog/log" ) @@ -95,6 +96,8 @@ func (solverServer *solverServer) ListenAndServe(ctx context.Context, cm *system http.WEBSOCKET_SUB_PATH, websocketEventChannel, ctx, + solverServer.connectCB, + solverServer.disconnectCB, ) srv := &corehttp.Server{ @@ -131,6 +134,20 @@ func (solverServer *solverServer) ListenAndServe(ctx context.Context, cm *system return nil } +// WS connect events +func (solverServer *solverServer) connectCB(connParams http.WSConnectionParams) { + if connParams.Type == "ResourceProvider" { + metricsDashboard.TrackNodeConnectionEvent("Connect", connParams.ID) + } +} + +func (solverServer *solverServer) disconnectCB(connParams http.WSConnectionParams) { + if connParams.Type == "ResourceProvider" { + metricsDashboard.TrackNodeConnectionEvent("Disconnect", connParams.ID) + solverServer.controller.removeResourceOfferBYResourceProvider(connParams.ID) + } +} + /* * *