From e10ec648fa1e9fc313ff6f2b1ba1257e62bc3f97 Mon Sep 17 00:00:00 2001 From: Flavio Cirillo Date: Wed, 18 Sep 2024 01:10:18 +0200 Subject: [PATCH] - Implement loop avoidance also for NGSI-LD notification - Minimize subscription set for tasks - Remove some debug generation by setting debugenabled into the config --- broker/ngsild.go | 9 +++++ broker/ngsiv1.go | 2 +- broker/thinBroker.go | 30 ++++++++++---- common/config/config.go | 9 +++-- discovery/fastDiscovery.go | 5 ++- master/master.go | 81 ++++++++++++++++++-------------------- master/taskMgr.go | 66 +++++++++++++++++++------------ test/restsrv/restserver.py | 48 ++++++++++++++++++++++ 8 files changed, 168 insertions(+), 82 deletions(-) create mode 100644 test/restsrv/restserver.py diff --git a/broker/ngsild.go b/broker/ngsild.go index 7f2b53a0..780aeec4 100644 --- a/broker/ngsild.go +++ b/broker/ngsild.go @@ -25,6 +25,9 @@ func (tb *ThinBroker) NGSILD_UpdateContext(w rest.ResponseWriter, r *rest.Reques // DEBUG.Println(updateCtxReq) + // check and add the "Fiware-Correlator" header into the update message + updateCtxReq.Correlator = r.Header.Get("Fiware-Correlator") + if numUpdates > 0 { tb.handleInternalUpdateContext(&updateCtxReq) } @@ -46,6 +49,9 @@ func (tb *ThinBroker) NGSILD_CreateEntity(w rest.ResponseWriter, r *rest.Request updateCtxReq := UpdateContextRequest{} numUpdates := updateCtxReq.ReadFromNGSILD(ngsildUpsert) + // check and add the "Fiware-Correlator" header into the update message + updateCtxReq.Correlator = r.Header.Get("Fiware-Correlator") + // DEBUG.Println(updateCtxReq) if numUpdates > 0 { @@ -153,6 +159,9 @@ func (tb *ThinBroker) NGSILD_NotifyContext(w rest.ResponseWriter, r *rest.Reques updateCtxReq := UpdateContextRequest{} numUpdates := updateCtxReq.ReadFromNGSILD(ngsildUpsert) + // check and add the "Fiware-Correlator" header into the update message + updateCtxReq.Correlator = r.Header.Get("Fiware-Correlator") + if numUpdates > 0 { tb.handleInternalUpdateContext(&updateCtxReq) } diff --git a/broker/ngsiv1.go b/broker/ngsiv1.go index 261db141..6663b0a8 100644 --- a/broker/ngsiv1.go +++ b/broker/ngsiv1.go @@ -15,7 +15,7 @@ func (tb *ThinBroker) NGSIV1_UpdateContext(w rest.ResponseWriter, r *rest.Reques updateCtxReq := UpdateContextRequest{} err := r.DecodeJsonPayload(&updateCtxReq) if err != nil { - DEBUG.Println("not able to decode the orion updates") + ERROR.Println("not able to decode the orion updates") rest.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/broker/thinBroker.go b/broker/thinBroker.go index ab43eba5..25df2de0 100644 --- a/broker/thinBroker.go +++ b/broker/thinBroker.go @@ -41,6 +41,8 @@ type ThinBroker struct { entityId2Subcriptions map[string][]string e2sub_lock sync.RWMutex + isDebugEnabled bool + //counter of heartbeat counter int64 } @@ -68,6 +70,8 @@ func (tb *ThinBroker) Start(cfg *Config) { tb.myProfile.BID = tb.myEntityId tb.myProfile.MyURL = cfg.GetExternalBrokerURL() + tb.isDebugEnabled = cfg.Logging.DebugEnabled + // register itself to the IoT discovery tb.registerMyself() } @@ -203,7 +207,9 @@ func (tb *ThinBroker) getEntity(eid string) *ContextElement { } func (tb *ThinBroker) deleteEntity(eid string) error { - DEBUG.Println(" TO REMOVE ENTITY ", eid) + if tb.isDebugEnabled { + DEBUG.Println(" TO REMOVE ENTITY ", eid) + } //remove it from the local entity map tb.entities_lock.Lock() @@ -444,17 +450,21 @@ func (tb *ThinBroker) notifySubscribers(ctxElem *ContextElement, correlator stri originator := subscription.Subscriber.Correlator if correlator != "" && originator != "" && correlator == originator { beTheSame = true - DEBUG.Println("session ID from producer ", correlator, ", subscriber ", originator) + if tb.isDebugEnabled { + DEBUG.Println("session ID from producer ", correlator, ", subscriber ", originator) + } } } tb.subscriptions_lock.RUnlock() - if beTheSame == true { - DEBUG.Println(" ======= producer and subscriber are the same ===========") + if beTheSame { + if tb.isDebugEnabled { + DEBUG.Println(" ======= producer and subscriber are the same ===========") + } continue } - if checkSelectedAttributes == true { + if checkSelectedAttributes { selectedAttributes := make([]string, 0) tb.subscriptions_lock.RLock() @@ -533,8 +543,10 @@ func (tb *ThinBroker) sendReliableNotifyToSubscriber(elements []ContextElement, DestinationBroker := subscription.Subscriber.DestinationType Tenant := subscription.Subscriber.Tenant - if subscription.Subscriber.RequireReliability == true && len(subscription.Subscriber.NotifyCache) > 0 { - DEBUG.Println("resend notify: ", len(subscription.Subscriber.NotifyCache)) + if subscription.Subscriber.RequireReliability && len(subscription.Subscriber.NotifyCache) > 0 { + if tb.isDebugEnabled { + DEBUG.Println("resend notify: ", len(subscription.Subscriber.NotifyCache)) + } for _, pCtxElem := range subscription.Subscriber.NotifyCache { elements = append(elements, *pCtxElem) } @@ -548,7 +560,9 @@ func (tb *ThinBroker) sendReliableNotifyToSubscriber(elements []ContextElement, err := postNotifyContext(elements, sid, subscriberURL, DestinationBroker, Tenant, tb.SecurityCfg) if err != nil { - DEBUG.Println("NOTIFY is not received by the subscriber, ", subscriberURL) + if tb.isDebugEnabled { + DEBUG.Println("NOTIFY is not received by the subscriber, ", subscriberURL) + } tb.subscriptions_lock.Lock() if subscription, exist := tb.subscriptions[sid]; exist { diff --git a/common/config/config.go b/common/config/config.go index 0e9fc4f1..45508468 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -46,10 +46,11 @@ type Config struct { Location PhysicalLocation `json:"physical_location"` SiteID string `json:"site_id"` Logging struct { - Info string `json:"info"` - Protocol string `json:"protocol"` - Errlog string `json:"error"` - Debug string `json:"debug"` + Info string `json:"info"` + Protocol string `json:"protocol"` + Errlog string `json:"error"` + Debug string `json:"debug"` + DebugEnabled bool `json:"debugEnabled"` } `json:"logging"` Discovery struct { HostIP string `json:"host_ip"` diff --git a/discovery/fastDiscovery.go b/discovery/fastDiscovery.go index 9bb8b8ea..56d29e43 100644 --- a/discovery/fastDiscovery.go +++ b/discovery/fastDiscovery.go @@ -50,6 +50,8 @@ type FastDiscovery struct { delayStoreOnFile int storeOnDisk bool + isDebugEnabled bool + // lock to control the update subscriptions in database subscriptionsDbLock sync.RWMutex storeSubscriptionsOnFileScheduled bool @@ -72,6 +74,7 @@ func (fd *FastDiscovery) Init(config *Config) { fd.storeSubscriptionsOnFileScheduled = false fd.storeBrokersOnFileScheduled = false fd.storeOnDisk = config.Discovery.StoreOnDisk + fd.isDebugEnabled = config.Logging.DebugEnabled //INFO.Println("config.Discovery.DelayStoreRegistrationsOnFile ", config.Discovery.DelayStoreStoreOnFile) fd.delayStoreOnFile = config.Discovery.DelayStoreOnFile @@ -281,7 +284,7 @@ func (fd *FastDiscovery) SubscribeContextAvailability(w rest.ResponseWriter, r * go fd.handleSubscribeCtxAvailability(&subscribeCtxAvailabilityReq) } -//receive updateContextAvailability for subscription +// receive updateContextAvailability for subscription func (fd *FastDiscovery) UpdateLDContextAvailability(w rest.ResponseWriter, r *rest.Request) { sid := r.PathParam("sid") subscribeCtxAvailabilityReq := SubscribeContextAvailabilityRequest{} diff --git a/master/master.go b/master/master.go index e88439cd..4badccd4 100644 --- a/master/master.go +++ b/master/master.go @@ -58,6 +58,8 @@ type Master struct { prevNumOfTask int counter_lock sync.RWMutex + isDebugEnabled bool + //type of subscribed entities subID2Type map[string]string } @@ -70,6 +72,8 @@ func (master *Master) Start(configuration *Config) { master.discoveryURL = configuration.GetDiscoveryURL() master.designerURL = configuration.GetDesignerURL() + master.isDebugEnabled = configuration.Logging.DebugEnabled + master.workers = make(map[string]*WorkerProfile) master.operatorList = make(map[string]Operator) @@ -154,7 +158,7 @@ func (master *Master) onTimer() { master.workerList_lock.Lock() for workerID, worker := range master.workers { duration := master.cfg.Worker.HeartbeatInterval * master.cfg.Worker.DetectionDuration - if worker.IsLive(duration) == false { + if worker.IsLive(duration) { delete(master.workers, workerID) INFO.Println("REMOVE worker " + workerID + " from the list") } @@ -265,43 +269,43 @@ func (master *Master) contextRegistration2EntityRegistration(entityId *EntityId, return &entityRegistration } -func (master *Master) contextRegistration2EntityRegistration_tbd(entityId *EntityId, ctxRegistration *ContextRegistration) *EntityRegistration { - entityRegistration := EntityRegistration{} +// func (master *Master) contextRegistration2EntityRegistration_tbd(entityId *EntityId, ctxRegistration *ContextRegistration) *EntityRegistration { +// entityRegistration := EntityRegistration{} - ctxObj := master.RetrieveContextEntity(entityId.ID) - if ctxObj == nil { - entityRegistration.ID = entityId.ID - entityRegistration.Type = entityId.Type +// ctxObj := master.RetrieveContextEntity(entityId.ID) +// if ctxObj == nil { +// entityRegistration.ID = entityId.ID +// entityRegistration.Type = entityId.Type - entityRegistration.AttributesList = make(map[string]ContextRegistrationAttribute) - entityRegistration.MetadataList = make(map[string]ContextMetadata) - } else { - entityRegistration.ID = ctxObj.Entity.ID - entityRegistration.Type = ctxObj.Entity.Type +// entityRegistration.AttributesList = make(map[string]ContextRegistrationAttribute) +// entityRegistration.MetadataList = make(map[string]ContextMetadata) +// } else { +// entityRegistration.ID = ctxObj.Entity.ID +// entityRegistration.Type = ctxObj.Entity.Type - entityRegistration.AttributesList = make(map[string]ContextRegistrationAttribute) - for attrName, attrValue := range ctxObj.Attributes { - attributeRegistration := ContextRegistrationAttribute{} - attributeRegistration.Name = attrName - attributeRegistration.Type = attrValue.Type - entityRegistration.AttributesList[attrName] = attributeRegistration - } +// entityRegistration.AttributesList = make(map[string]ContextRegistrationAttribute) +// for attrName, attrValue := range ctxObj.Attributes { +// attributeRegistration := ContextRegistrationAttribute{} +// attributeRegistration.Name = attrName +// attributeRegistration.Type = attrValue.Type +// entityRegistration.AttributesList[attrName] = attributeRegistration +// } - entityRegistration.MetadataList = make(map[string]ContextMetadata) - for metaname, ctxmeta := range ctxObj.Metadata { - cm := ContextMetadata{} - cm.Name = metaname - cm.Type = ctxmeta.Type - cm.Value = ctxmeta.Value +// entityRegistration.MetadataList = make(map[string]ContextMetadata) +// for metaname, ctxmeta := range ctxObj.Metadata { +// cm := ContextMetadata{} +// cm.Name = metaname +// cm.Type = ctxmeta.Type +// cm.Value = ctxmeta.Value - entityRegistration.MetadataList[metaname] = cm - } - } +// entityRegistration.MetadataList[metaname] = cm +// } +// } - entityRegistration.ProvidingApplication = ctxRegistration.ProvidingApplication +// entityRegistration.ProvidingApplication = ctxRegistration.ProvidingApplication - return &entityRegistration -} +// return &entityRegistration +// } func (master *Master) subscribeContextAvailability(availabilitySubscription *SubscribeContextAvailabilityRequest) string { availabilitySubscription.Reference = master.myURL + "/notifyContextAvailability" @@ -324,9 +328,7 @@ func (master *Master) unsubscribeContextAvailability(sid string) { } } -// // to deal with the communication between master and workers via rabbitmq -// func (master *Master) Process(msg *RecvMessage) error { switch msg.Type { case "WORKER_JOIN": @@ -467,9 +469,7 @@ func (master *Master) RemoveInputEntity(flowInfo FlowInfo) { master.communicator.Publish(&taskMsg) } -// // the shared functions for function manager and topology manager to call -// func (master *Master) RetrieveContextEntity(eid string) *ContextObject { query := QueryContextRequest{} @@ -507,9 +507,7 @@ func (master *Master) GetStatus(w rest.ResponseWriter, r *rest.Request) { w.WriteJson(profile) } -// // to select the worker that is closest to the given points -// func (master *Master) SelectWorker(locations []Point) string { master.workerList_lock.RLock() defer master.workerList_lock.RUnlock() @@ -527,7 +525,10 @@ func (master *Master) SelectWorker(locations []Point) string { closestTotalDistance := uint64(math.MaxUint64) for _, worker := range master.workers { // if this worker is already overloaded, check the next one - if worker.IsOverloaded() == true { + if worker.IsOverloaded() { + master.isDebugEnabled { + DEBUG.Println("Worker", worker.WID, " has reached its capacity of ", worker.Capacity, " with ", worker.Workload, " tasks running") + } continue } @@ -555,9 +556,7 @@ func (master *Master) SelectWorker(locations []Point) string { return closestWorkerID } -// // query the topology from Designer based on the given name -// func (master *Master) getTopologyByName(name string) *Topology { designerURL := fmt.Sprintf("%s/topology/%s", master.cfg.GetDesignerURL(), name) fmt.Println(designerURL) @@ -598,9 +597,7 @@ func (master *Master) getTopologyByName(name string) *Topology { return &topology } -// // to select the right docker image of an operator for the selected worker -// func (master *Master) DetermineDockerImage(operatorName string, wID string) string { master.workerList_lock.RLock() wProfile := master.workers[wID] diff --git a/master/taskMgr.go b/master/taskMgr.go index a84447f6..1754aab9 100644 --- a/master/taskMgr.go +++ b/master/taskMgr.go @@ -99,7 +99,7 @@ func (gf *GroupInfo) GetHash() string { sortedpairs := make([]*KVPair, 0) for k, v := range *gf { - DEBUG.Printf("group k: %s, v: %+v\r\n", k, v) + DEBUG.Println("group k: %s, v: %+v\r\n", k, v) kvpair := KVPair{} kvpair.Key = k @@ -149,9 +149,7 @@ func (flow *FogFlow) Init() { flow.DeploymentPlan = make(map[string]*ScheduledTaskInstance) } -// // to update the execution plan based on the changes of registered context availability -// func (flow *FogFlow) MetadataDrivenTaskOrchestration(subID string, entityAction string, registredEntity *EntityRegistration, workerSelection ProximityWorkerSelectionFn) []*DeploymentAction { if _, exist := flow.Subscriptions[subID]; exist == false { DEBUG.Println(subID, "subscription does not exist any more") @@ -196,10 +194,8 @@ func (flow *FogFlow) MetadataDrivenTaskOrchestration(subID string, entityAction return nil } -// // to check if we already received some context registration // for all required and subscribed context availability -// func (flow *FogFlow) checkInputAvailability() bool { for _, inputSubscription := range flow.Subscriptions { if len(inputSubscription.ReceivedEntityRegistrations) == 0 { @@ -210,9 +206,7 @@ func (flow *FogFlow) checkInputAvailability() bool { return true } -// // check the available of all required input stream for a specific task instance -// func (flow *FogFlow) checkInputsOfTaskInstance(taskCfg *TaskConfig) bool { INFO.Println(taskCfg) INFO.Println(flow.Intent.TaskObject) @@ -251,13 +245,22 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp for _, entity := range entitiesList { newInput := true for _, input := range task.Inputs { - if input.ID == entity.ID { - newInput = false - break + // If (input.ID != "") it means that (selector.Scoped || selector.GroupBy == "EntityID") + // see later in this file why that + if input.ID != "" { + if input.ID == entity.ID { + newInput = false + break + } + } else { + if input.Type == entity.Type { + newInput = false + break + } } } - if newInput == true { + if newInput { DEBUG.Printf("new input %+v to task %+v\r\n", entity, task) inputEntity := InputEntity{} @@ -287,7 +290,7 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp // check if the location in this input entity is changed locationChanged := false for i := 0; i < len(task.Inputs); i++ { - if task.Inputs[i].ID == entity.ID && task.Inputs[i].Location.IsEqual(&entity.Location) == false { + if task.Inputs[i].ID == entity.ID && task.Inputs[i].Location.IsEqual(&entity.Location) { locationChanged = true DEBUG.Println("[location changed] entity: ", entity.ID) // update the input entities with the new location @@ -298,7 +301,7 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp // if the location is changed, calculate the new optimal worker assignment newOptimalWorkerID := task.WorkerID - if locationChanged == true { + if locationChanged { locations := make([]Point, 0) for _, input := range task.Inputs { locations = append(locations, input.Location) @@ -314,11 +317,11 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp if newOptimalWorkerID != task.WorkerID { INFO.Println("[Task migration]:", task.TaskID, " migrated from ", task.WorkerID, " to ", newOptimalWorkerID) - if (task.WorkerID != ""){ + if task.WorkerID != "" { removeTaskAction := flow.removeExistingTask(task) deploymentActions = append(deploymentActions, removeTaskAction) } - + task.WorkerID = newOptimalWorkerID addTaskAction := flow.addNewTask(task) @@ -429,7 +432,7 @@ func (flow *FogFlow) removeExecutionPlan(entityID string, inputSubscription *Inp task.removeInput(entityID) //if any of the input streams is delete, the task will be terminated - if flow.checkInputsOfTaskInstance(task) == false { + if flow.checkInputsOfTaskInstance(task) { // remove this task DEBUG.Printf("removing an existing task %+v\r\n", task) @@ -505,7 +508,7 @@ func (flow *FogFlow) updateGroupedKeyValueTable(sub *InputSubscription, entityID if groupKey == "ALL" { key := name + "-" + groupKey _, exist := flow.UniqueKeys[key] - if exist == false { + if exist { flow.UniqueKeys[key] = make([]interface{}, 0) flow.UniqueKeys[key] = append(flow.UniqueKeys[key], "ALL") } @@ -534,7 +537,7 @@ func (flow *FogFlow) updateGroupedKeyValueTable(sub *InputSubscription, entityID } } - if inList == false { + if inList { flow.UniqueKeys[key] = append(flow.UniqueKeys[key], value) } } else { // create a new key @@ -631,10 +634,25 @@ func (flow *FogFlow) searchRelevantEntities(group *GroupInfo, updatedEntityID st } // filtering + entityloop: for _, entityRegistration := range inputSub.ReceivedEntityRegistrations { - if entityRegistration.IsMatched(restrictions) == true { + if entityRegistration.IsMatched(restrictions) { inputEntity := InputEntity{} - inputEntity.ID = entityRegistration.ID + + // The following if is to check if it is really necessary to subscribe for each matching entity (that might thousands) + // it is necessary it is grouped per entityID + // and it is necessary if it is scoped because only the entities within the scope + if selector.Scoped || selector.GroupBy == "EntityID" { + inputEntity.ID = entityRegistration.ID + } else { + // if we are here, it is because we want to subscribe per type and not care the entityId + // In this way the subscription will be minimal and faster + for _, entity := range entities { + if inputEntity.Type == entity.Type { + continue entityloop + } + } + } inputEntity.Type = entityRegistration.Type inputEntity.AttributeList = selector.SelectedAttributes @@ -692,9 +710,7 @@ func (tMgr *TaskMgr) Init() { tMgr.subID2FogFunc = make(map[string]string) } -// // deal with received task intents -// func (tMgr *TaskMgr) handleTaskIntentUpdate(intentCtxObj *ContextObject) { INFO.Println("handle taskintent update") INFO.Println(intentCtxObj) @@ -859,15 +875,13 @@ func (tMgr *TaskMgr) selector2Subscription(inputSelector *InputStreamConfig, geo return subscriptionId } -// // the main function to deal with data-driven and context aware task orchestration -// func (tMgr *TaskMgr) HandleContextAvailabilityUpdate(subID string, entityAction string, entityRegistration *EntityRegistration) { INFO.Println("[Registration update]: ", subID, entityAction, entityRegistration.ID) tMgr.subID2FogFunc_lock.RLock() funcName, fogFunctionExist := tMgr.subID2FogFunc[subID] - if fogFunctionExist == false { + if fogFunctionExist { INFO.Println("this subscripption is not issued by me") tMgr.subID2FogFunc_lock.RUnlock() return @@ -879,7 +893,7 @@ func (tMgr *TaskMgr) HandleContextAvailabilityUpdate(subID string, entityAction defer tMgr.fogFlows_lock.Unlock() fogflow, fogFlowExist := tMgr.fogFlows[funcName] - if fogFlowExist == false { + if fogFlowExist { INFO.Println("no flow established for this function: ", funcName) return } diff --git a/test/restsrv/restserver.py b/test/restsrv/restserver.py new file mode 100644 index 00000000..f0923d9e --- /dev/null +++ b/test/restsrv/restserver.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +""" +Very simple HTTP server in python for logging requests +Usage:: + ./server.py [] +""" +from http.server import BaseHTTPRequestHandler, HTTPServer +import logging + +class S(BaseHTTPRequestHandler): + def _set_response(self): + self.send_response(200) + self.send_header('Content-type', 'text/html') + self.end_headers() + + def do_GET(self): + logging.info("GET request,\nPath: %s\nHeaders:\n%s\n", str(self.path), str(self.headers)) + self._set_response() + self.wfile.write("GET request for {}".format(self.path).encode('utf-8')) + + def do_POST(self): + content_length = int(self.headers['Content-Length']) # <--- Gets the size of data + post_data = self.rfile.read(content_length) # <--- Gets the data itself + logging.info("POST request,\nPath: %s\nHeaders:\n%s\n\nBody:\n%s\n", + str(self.path), str(self.headers), post_data.decode('utf-8')) + + self._set_response() + self.wfile.write("POST request for {}".format(self.path).encode('utf-8')) + +def run(server_class=HTTPServer, handler_class=S, port=8080): + logging.basicConfig(level=logging.INFO) + server_address = ('', port) + httpd = server_class(server_address, handler_class) + logging.info('Starting httpd...\n') + try: + httpd.serve_forever() + except KeyboardInterrupt: + pass + httpd.server_close() + logging.info('Stopping httpd...\n') + +if __name__ == '__main__': + from sys import argv + + if len(argv) == 2: + run(port=int(argv[1])) + else: + run()