Skip to content

Commit

Permalink
Merge pull request #405 from flaviocirillo/fogflow-fcfork
Browse files Browse the repository at this point in the history
Bypass some operations when a fogfunction is looking only for a type …
  • Loading branch information
smartfog authored Sep 26, 2024
2 parents e72afcd + a2e009e commit 9be1751
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 70 deletions.
12 changes: 8 additions & 4 deletions broker/ngsiv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,16 @@ func (tb *ThinBroker) NGSIV1_SubscribeContext(w rest.ResponseWriter, r *rest.Req
tb.subscriptions[subID] = &subReq
tb.subscriptions_lock.Unlock()
// take actions
if subReq.Subscriber.IsInternal == true {
if subReq.Subscriber.IsInternal {
INFO.Println("internal subscription coming from another broker")

for _, entity := range subReq.Entities {
tb.e2sub_lock.Lock()
tb.entityId2Subcriptions[entity.ID] = append(tb.entityId2Subcriptions[entity.ID], subID)
if tb.subscriptions[subID].IsSimpleByType() {
tb.entityId2Subcriptions["*"] = append(tb.entityId2Subcriptions["*"], subID)
} else {
tb.entityId2Subcriptions[entity.ID] = append(tb.entityId2Subcriptions[entity.ID], subID)
}
tb.e2sub_lock.Unlock()
}
tb.notifyOneSubscriberWithCurrentStatus(subReq.Entities, subID)
Expand Down Expand Up @@ -250,13 +254,13 @@ func (tb *ThinBroker) NGSIV1_NotifyContextAvailability(w rest.ResponseWriter, r
//map it to the main subscription
tb.subLinks_lock.Lock()
mainSubID, exist := tb.availabilitySub2MainSub[subID]
if exist == false {
if !exist {
DEBUG.Println("put it into the tempCache and handle it later")
tb.tmpNGSI9NotifyCache[subID] = &notifyContextAvailabilityReq
}
tb.subLinks_lock.Unlock()

if exist == true {
if exist {
tb.handleNGSI9Notify(mainSubID, &notifyContextAvailabilityReq)
}
}
91 changes: 63 additions & 28 deletions broker/thinBroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ func (tb *ThinBroker) OnTimer() { // for every 2 second
hasCachedNotification := false
tb.subscriptions_lock.Lock()
if subscription, exist := tb.subscriptions[sid]; exist {
if subscription.Subscriber.RequireReliability == true && len(subscription.Subscriber.NotifyCache) > 0 {
if subscription.Subscriber.RequireReliability && len(subscription.Subscriber.NotifyCache) > 0 {
hasCachedNotification = true
}
}
tb.subscriptions_lock.Unlock()

if hasCachedNotification == true {
if hasCachedNotification {
elements := make([]ContextElement, 0)
tb.sendReliableNotify(elements, sid)
}
Expand Down Expand Up @@ -663,7 +663,17 @@ func (tb *ThinBroker) UnsubscribeContextAvailability(sid string) error {
return err
}

func stringsContains(slice []string, e string) bool {
for _, sliceElement := range slice {
if sliceElement == e {
return true
}
}
return false
}

func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabilityReq *NotifyContextAvailabilityRequest) {

var action string
notifyContextAvailabilityReq.ErrorCode.Code = 301
switch notifyContextAvailabilityReq.ErrorCode.Code {
Expand All @@ -674,15 +684,30 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil
case 410:
action = "DELETE"
}
INFO.Println(action, " subID ", mainSubID)

if tb.isDebugEnabled {
DEBUG.Println(action, " subID ", mainSubID, " subscription isSimpleByType ", tb.subscriptions[mainSubID].IsSimpleByType())
DEBUG.Println(tb.entityId2Subcriptions)
}

for _, registrationResp := range notifyContextAvailabilityReq.ContextRegistrationResponseList {
registration := registrationResp.ContextRegistration
for _, eid := range registration.EntityIdList {
INFO.Println("===> ", eid, " , ", mainSubID)

if tb.isDebugEnabled {
DEBUG.Println("===> ", eid, " , ", mainSubID)
}

tb.e2sub_lock.Lock()

if action == "CREATE" {
tb.entityId2Subcriptions[eid.ID] = append(tb.entityId2Subcriptions[eid.ID], mainSubID)
if tb.subscriptions[mainSubID].IsSimpleByType() {
if !stringsContains(tb.entityId2Subcriptions["*"], mainSubID) {
tb.entityId2Subcriptions["*"] = append(tb.entityId2Subcriptions["*"], mainSubID)
}
} else {
tb.entityId2Subcriptions[eid.ID] = append(tb.entityId2Subcriptions[eid.ID], mainSubID)
}
} else if action == "DELETE" {
subList := tb.entityId2Subcriptions[eid.ID]
for i, id := range subList {
Expand All @@ -692,16 +717,16 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil
}
}
} else if action == "UPDATE" {
existFlag := false
for _, subID := range tb.entityId2Subcriptions[eid.ID] {
if subID == mainSubID {
existFlag = true
break
if tb.subscriptions[mainSubID].IsSimpleByType() {
if !stringsContains(tb.entityId2Subcriptions["*"], mainSubID) {
tb.entityId2Subcriptions["*"] = append(tb.entityId2Subcriptions["*"], mainSubID)
}
} else {
if !stringsContains(tb.entityId2Subcriptions[eid.ID], mainSubID) {
tb.entityId2Subcriptions[eid.ID] = append(tb.entityId2Subcriptions[eid.ID], mainSubID)
}
}
if existFlag == false {
tb.entityId2Subcriptions[eid.ID] = append(tb.entityId2Subcriptions[eid.ID], mainSubID)
}

}

tb.e2sub_lock.Unlock()
Expand All @@ -717,24 +742,34 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil
tb.notifyOneSubscriberWithCurrentStatus(registration.EntityIdList, mainSubID)
}
} else {
//for matched entities provided by other IoT Brokers
newSubscription := SubscribeContextRequest{}
newSubscription.Entities = registration.EntityIdList
newSubscription.Reference = tb.MyURL
newSubscription.Subscriber.BrokerURL = registration.ProvidingApplication

if action == "CREATE" || action == "UPDATE" {
sid, err := subscribeContextProvider(&newSubscription, registration.ProvidingApplication, tb.SecurityCfg)
if err == nil {
// INFO.Println("issue a new subscription ", sid)
// this check is to subscribe to the data only for complex subscription
if !tb.subscriptions[mainSubID].IsSimpleByType() || stringsContains(tb.entityId2Subcriptions["*"], mainSubID) {

//for matched entities provided by other IoT Brokers
newSubscription := SubscribeContextRequest{}
if tb.subscriptions[mainSubID].IsSimpleByType() {
entity := tb.subscriptions[mainSubID].Entities[0]
newSubscription.Entities = append(newSubscription.Entities, entity)
} else {
newSubscription.Entities = registration.EntityIdList
}
newSubscription.Reference = tb.MyURL
newSubscription.Subscriber.BrokerURL = registration.ProvidingApplication

if action == "CREATE" || action == "UPDATE" {
sid, err := subscribeContextProvider(&newSubscription, registration.ProvidingApplication, tb.SecurityCfg)
if err == nil {
// INFO.Println("issue a new subscription ", sid)

tb.subscriptions_lock.Lock()
tb.subscriptions[sid] = &newSubscription
tb.subscriptions_lock.Unlock()
tb.subscriptions_lock.Lock()
tb.subscriptions[sid] = &newSubscription
tb.subscriptions_lock.Unlock()

tb.subLinks_lock.Lock()
tb.main2Other[mainSubID] = append(tb.main2Other[mainSubID], sid)
tb.subLinks_lock.Unlock()
tb.subLinks_lock.Lock()
tb.main2Other[mainSubID] = append(tb.main2Other[mainSubID], sid)
tb.subLinks_lock.Unlock()
}
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
Expand Down Expand Up @@ -227,7 +226,7 @@ func (c *Config) GetMessageBus() string {
var logTargets map[string]io.Writer = map[string]io.Writer{
"stdout": os.Stdout,
"stderr": os.Stderr,
"discard": ioutil.Discard,
"discard": io.Discard,
}

func (c *Config) SetLogTargets() {
Expand All @@ -238,7 +237,7 @@ func (c *Config) SetLogTargets() {
INFO = log.New(target, "INFO: ", log.Ldate|log.Ltime)
target, ok = logTargets[c.Logging.Protocol]
if !ok {
target = ioutil.Discard
target = io.Discard
}
PROTOCOL = log.New(target, "PROTOCOL: ", log.Ldate|log.Ltime)
target, ok = logTargets[c.Logging.Errlog]
Expand All @@ -248,7 +247,7 @@ func (c *Config) SetLogTargets() {
ERROR = log.New(target, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile)
target, ok = logTargets[c.Logging.Debug]
if !ok {
target = ioutil.Discard
target = io.Discard
}
DEBUG = log.New(target, "DEBUG: ", log.Ldate|log.Ltime|log.Lshortfile)
}
Expand Down
9 changes: 9 additions & 0 deletions common/datamodel/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ type InputStreamConfig struct {
Scoped bool `json:"scoped"`
}

// This is to state that we do not need to save all registrations if the subscription is simply by type
func (inputSelect *InputStreamConfig) IsSimpleByType() bool {
if !inputSelect.Scoped && inputSelect.GroupBy != "EntityID" {
return true
} else {
return false
}
}

type OutputStreamConfig struct {
EntityType string `json:"entity_type"`
}
Expand Down
20 changes: 15 additions & 5 deletions common/ngsi/ngsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,9 +835,7 @@ func (registredEntity *EntityRegistration) GetLocation() Point {
return Point{0.0, 0.0}
}

//
// used by master to group the received input
//
func (registredEntity *EntityRegistration) IsMatched(restrictions map[string]interface{}) bool {
matched := true

Expand Down Expand Up @@ -938,6 +936,20 @@ type SubscribeContextRequest struct {
Subscriber Subscriber
}

func (subscribeContextRequest *SubscribeContextRequest) IsSimpleByType() bool {
var flag = true

if len(subscribeContextRequest.Restriction.Scopes) == 0 {
if len(subscribeContextRequest.Entities) == 1 {
if subscribeContextRequest.Entities[0].ID == "" {
flag = true
}
}
}

return flag
}

type SubscriptionRequest struct {
Attributes []string `json:"attributes,omitempty"`
Subscriber Subscriber
Expand Down Expand Up @@ -1375,7 +1387,7 @@ type ConfigCommand struct {
CorrelatorID string `json:"correlatorID"`
}

//To handle RegisterContextRequest coming from IoT Agent
// To handle RegisterContextRequest coming from IoT Agent
type RegisterContextRequest1 struct {
ContextRegistrations []ContextRegistration1 `json:"contextRegistrations,omitempty"`
Duration string `json:"duration,omitempty"`
Expand Down Expand Up @@ -1441,9 +1453,7 @@ type LDNotifyContextRequest struct {
NotifyAt string `json:"notifiedAt,omitempty"`
}

//
// the part to deal with NGSI v1 update supported by Orion Context Broker
//
func (element *ContextElement) SetEntityID() {
if element.ID != "" {
element.Entity.ID = element.ID
Expand Down
Loading

0 comments on commit 9be1751

Please sign in to comment.