Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing quick startup following tutorial #399

Merged
merged 17 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
08cd031
Included some golang libraries
flaviocirillo Jul 29, 2022
a8fddeb
Merge branch 'development' of https://github.com/smartfog/fogflow int…
flaviocirillo Jul 29, 2022
24a3fe6
Merge branch 'development' of https://github.com/smartfog/fogflow int…
flaviocirillo Nov 7, 2022
0d59964
Fixed bug on wrong parsing of NGSI QueryContextResponse with attribut…
flaviocirillo Nov 16, 2022
ac0a277
Merge branch 'development' of https://github.com/smartfog/fogflow int…
flaviocirillo Nov 16, 2022
ff52a48
Fixed bug on marshalling golang struct
flaviocirillo Nov 18, 2022
a816ace
Merge branch 'development' of https://github.com/smartfog/fogflow int…
flaviocirillo Nov 18, 2022
c3edfe4
Extended the bug fix for array of attribute values also to ngsiv1 not…
flaviocirillo Nov 29, 2022
9f583a3
Merge branch 'development' of https://github.com/smartfog/fogflow int…
flaviocirillo Nov 29, 2022
a2a1683
- Implement multiarch for broker and worker
flaviocirillo Feb 17, 2023
c688b56
New Features: persistent storage for Discovery
flaviocirillo Mar 13, 2023
c65578f
Automatic generation of DBFiles for Discovery permanent storage if no…
flaviocirillo Mar 22, 2023
adbf8a2
Implemented Suggested Store with and without Digital Twin
flaviocirillo Mar 27, 2023
15b6d05
All functionalities working of customer journey demo. To change images
flaviocirillo Mar 28, 2023
bf9f5af
Customer Journey demo fully working
flaviocirillo Mar 29, 2023
f776aa1
Reverted populating FogFlow for the tutorial
Jun 14, 2024
ef9581d
Cleaned up interface
Jun 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions broker/Dockerfile_multiarch
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM alpine
ARG TARGETOS
ARG TARGETARCH
ADD /${TARGETOS}/${TARGETARCH}/broker /
CMD ["/broker"]
7 changes: 7 additions & 0 deletions broker/build
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ case "${command}" in
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -a -installsuffix cgo -o broker_arm64
time docker build -f Dockerfile4Armv8 -t "fogflow/broker:arm64" .
;;
"multiarch")
go get
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -installsuffix cgo -o linux/amd64/broker
CGO_ENABLED=0 GOOS=linux GOARCH=arm go build -a -installsuffix cgo -o linux/arm/broker
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -a -installsuffix cgo -o linux/arm64/broker
time docker buildx build --platform linux/arm,linux/arm64,linux/amd64 --push -f ./Dockerfile_multiarch -t "fogflow/broker" .
;;
*)
echo "Command not Found."
echo "usage: ./build [multistage|development|arm|arm64]"
Expand Down
1 change: 1 addition & 0 deletions broker/build_k8s
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
docker buildx build --platform linux/arm,linux/arm64,linux/amd64 --push -f ./Dockerfile_multiarch -t "fogflow/broker:k8s" .
4 changes: 4 additions & 0 deletions broker/ngsild.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ func (tb *ThinBroker) NGSILD_UpdateContext(w rest.ResponseWriter, r *rest.Reques
updateCtxReq := UpdateContextRequest{}
numUpdates := updateCtxReq.ReadFromNGSILD(ngsildUpsert)

// DEBUG.Println(updateCtxReq)

if numUpdates > 0 {
tb.handleInternalUpdateContext(&updateCtxReq)
}
Expand All @@ -44,6 +46,8 @@ func (tb *ThinBroker) NGSILD_CreateEntity(w rest.ResponseWriter, r *rest.Request
updateCtxReq := UpdateContextRequest{}
numUpdates := updateCtxReq.ReadFromNGSILD(ngsildUpsert)

// DEBUG.Println(updateCtxReq)

if numUpdates > 0 {
tb.handleInternalUpdateContext(&updateCtxReq)
}
Expand Down
9 changes: 7 additions & 2 deletions broker/ngsiv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"io/ioutil"
"net/http"
"strings"

"github.com/ant0ine/go-json-rest/rest"
"github.com/google/uuid"
Expand Down Expand Up @@ -119,6 +120,8 @@ func (tb *ThinBroker) NGSIV1_SubscribeContext(w rest.ResponseWriter, r *rest.Req
subReq := SubscribeContextRequest{}
subReq.Attributes = make([]string, 0)

DEBUG.Println("Subscription request from: ", r.RemoteAddr)

err := r.DecodeJsonPayload(&subReq)
if err != nil {
rest.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -132,20 +135,22 @@ func (tb *ThinBroker) NGSIV1_SubscribeContext(w rest.ResponseWriter, r *rest.Req
}
subID := u1.String()

subReq.Reference = strings.TrimSpace(subReq.Reference)

// send out the response
subResp := SubscribeContextResponse{}
subResp.SubscribeResponse.SubscriptionId = subID
subResp.SubscribeError.SubscriptionId = subID
w.WriteJson(&subResp)

INFO.Println(r.Header)
// INFO.Println(r.Header)

// check the request header
subReq.Subscriber.DestinationType = r.Header.Get("Destination")
subReq.Subscriber.Tenant = r.Header.Get("Ngsild-Tenant")
subReq.Subscriber.Correlator = r.Header.Get("Fiware-Correlator")

DEBUG.Println(subReq.Subscriber)
// DEBUG.Println(subReq.Subscriber)

if r.Header.Get("User-Agent") == "lightweight-iot-broker" {
subReq.Subscriber.IsInternal = true
Expand Down
28 changes: 19 additions & 9 deletions broker/thinBroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ func (tb *ThinBroker) fetchEntities(ids []EntityId, providerURL string) []Contex
// handle context updates from external applications/devices

func (tb *ThinBroker) handleInternalUpdateContext(updateCtxReq *UpdateContextRequest) {

// DEBUG.Println("updateCtxReq", updateCtxReq)

switch strings.ToUpper(updateCtxReq.UpdateAction) {
case "UPDATE":
for _, ctxElem := range updateCtxReq.ContextElements {
Expand Down Expand Up @@ -388,6 +391,9 @@ func (tb *ThinBroker) queryOwnerOfEntity(eid string) string {
}

func (tb *ThinBroker) UpdateContext2LocalSite(ctxElem *ContextElement, correlator string, params ...rest.ResponseWriter) {

// DEBUG.Println("elements", ctxElem)

// register the entity if there is any changes on attribute list, domain metadata
tb.entities_lock.Lock()
eid := ctxElem.Entity.ID
Expand All @@ -402,13 +408,13 @@ func (tb *ThinBroker) UpdateContext2LocalSite(ctxElem *ContextElement, correlato
tb.updateContextElement(ctxElem)

// propogate this update to its subscribers
go tb.notifySubscribers(ctxElem, correlator, true)
tb.notifySubscribers(ctxElem, correlator, true)
}

func (tb *ThinBroker) UpdateContext2RemoteSite(ctxElem *ContextElement, updateAction string, brokerURL string) {
switch updateAction {
case "UPDATE":
INFO.Println(brokerURL)
// INFO.Println(brokerURL)
client := NGSI10Client{IoTBrokerURL: brokerURL, SecurityCfg: tb.SecurityCfg}
client.UpdateContext(ctxElem)

Expand All @@ -421,7 +427,7 @@ func (tb *ThinBroker) UpdateContext2RemoteSite(ctxElem *ContextElement, updateAc
func (tb *ThinBroker) notifySubscribers(ctxElem *ContextElement, correlator string, checkSelectedAttributes bool) {
eid := ctxElem.Entity.ID

//DEBUG.Println(ctxElem)
// DEBUG.Println("elements", ctxElem)

tb.e2sub_lock.RLock()
defer tb.e2sub_lock.RUnlock()
Expand Down Expand Up @@ -468,6 +474,7 @@ func (tb *ThinBroker) notifySubscribers(ctxElem *ContextElement, correlator stri
elements = append(elements, *ctxElem)
}

// DEBUG.Println("elements", elements)
go tb.sendReliableNotify(elements, sid)
}
}
Expand Down Expand Up @@ -535,11 +542,13 @@ func (tb *ThinBroker) sendReliableNotifyToSubscriber(elements []ContextElement,
}
tb.subscriptions_lock.Unlock()

INFO.Println("NOTIFY: ", len(elements), ", ", sid, ", ", subscriberURL, ", ", DestinationBroker)
//INFO.Println("NOTIFY: ", len(elements), ", ", sid, ", ", subscriberURL, ", ", DestinationBroker)
// DEBUG.Println(elements)

err := postNotifyContext(elements, sid, subscriberURL, DestinationBroker, Tenant, tb.SecurityCfg)

if err != nil {
INFO.Println("NOTIFY is not received by the subscriber, ", subscriberURL)
DEBUG.Println("NOTIFY is not received by the subscriber, ", subscriberURL)

tb.subscriptions_lock.Lock()
if subscription, exist := tb.subscriptions[sid]; exist {
Expand All @@ -561,6 +570,7 @@ func (tb *ThinBroker) sendReliableNotifyToSubscriber(elements []ContextElement,
*/

func (tb *ThinBroker) sendReliableNotify(elements []ContextElement, sid string) {
// DEBUG.Println(elements)
tb.subscriptions_lock.Lock()
_, ok := tb.subscriptions[sid]
if ok == true {
Expand Down Expand Up @@ -683,9 +693,9 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil
tb.e2sub_lock.Unlock()
}

INFO.Println(registration.ProvidingApplication, ", ", tb.MyURL)
INFO.Println("TO ngsi10 subscription, ", mainSubID)
INFO.Printf("entity list: %+v\r\n", registration.EntityIdList)
// INFO.Println(registration.ProvidingApplication, ", ", tb.MyURL)
// INFO.Println("TO ngsi10 subscription, ", mainSubID)
// INFO.Printf("entity list: %+v\r\n", registration.EntityIdList)

if registration.ProvidingApplication == tb.MyURL {
//for matched entities provided by myself
Expand All @@ -702,7 +712,7 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil
if action == "CREATE" || action == "UPDATE" {
sid, err := subscribeContextProvider(&newSubscription, registration.ProvidingApplication, tb.SecurityCfg)
if err == nil {
INFO.Println("issue a new subscription ", sid)
// INFO.Println("issue a new subscription ", sid)

tb.subscriptions_lock.Lock()
tb.subscriptions[sid] = &newSubscription
Expand Down
13 changes: 9 additions & 4 deletions broker/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (
)

func postNotifyContext(ctxElems []ContextElement, subscriptionId string, URL string, DestinationBrokerType string, tenant string, httpsCfg *HTTPS) error {
INFO.Println("destination protocol: ", DestinationBrokerType)
//INFO.Println("destination protocol: ", DestinationBrokerType)
// INFO.Println("ctxElems: ", ctxElems)

switch DestinationBrokerType {
case "NGSI-LD":
Expand All @@ -28,7 +29,7 @@ func postNotifyContext(ctxElems []ContextElement, subscriptionId string, URL str

// for ngsiv1 consumer
func postNGSIV1NotifyContext(ctxElems []ContextElement, subscriptionId string, URL string, httpsCfg *HTTPS) error {
INFO.Println("NGSIv1 NOTIFY: ", URL)
// INFO.Println("NGSIv1 NOTIFY: ", URL)

payload := toNGSIv1Payload(ctxElems)

Expand Down Expand Up @@ -95,7 +96,7 @@ func postNGSIV2NotifyContext(ctxElems []ContextElement, subscriptionId string, U
return err
}

INFO.Println(string(body))
// INFO.Println(string(body))

req, err := http.NewRequest("POST", URL, bytes.NewBuffer(body))
req.Header.Add("Content-Type", "application/json")
Expand Down Expand Up @@ -164,7 +165,8 @@ func toNGSIv2Payload(ctxElems []ContextElement) []map[string]interface{} {

// for NGSI-LD consumer
func postNGSILDUpsert(ctxElems []ContextElement, subscriptionId string, URL string, tenant string) error {
INFO.Println("NGSI-LD NOTIFY: ", URL)
//INFO.Println("NGSI-LD NOTIFY: ", URL)
// DEBUG.Println(ctxElems)

payload := toNGSILDPayload(ctxElems)

Expand Down Expand Up @@ -233,6 +235,9 @@ func toNGSILDPayload(ctxElems []ContextElement) []map[string]interface{} {
default:
propertyValue["type"] = "Property"
propertyValue["value"] = attr.Value
for _, metadata := range attr.Metadata {
propertyValue[metadata.Name] = metadata.Value
}
}

element[attr.Name] = propertyValue
Expand Down
34 changes: 19 additions & 15 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ type Config struct {
Debug string `json:"debug"`
} `json:"logging"`
Discovery struct {
HostIP string `json:"host_ip"`
HTTPPort int `json:"http_port"`
HTTPSPort int `json:"https_port"`
HostIP string `json:"host_ip"`
HTTPPort int `json:"http_port"`
HTTPSPort int `json:"https_port"`
StoreOnDisk bool `json:"storeOnDisk"`
DelayStoreOnFile int `json:"delayStoreOnFile"`
} `json:"discovery"`
Broker struct {
HostIP string `json:"host_ip"`
Expand All @@ -63,25 +65,27 @@ type Config struct {
HeartbeatInterval int `json:"heartbeat_interval"`
} `json:"broker"`
Master struct {
HostIP string `json:"host_ip"`
AgentPort int `json:"ngsi_agent_port"`
RESTAPIPort int `json:"rest_api_port"`
HostIP string `json:"host_ip"`
AgentPort int `json:"ngsi_agent_port"`
RESTAPIPort int `json:"rest_api_port"`
InfiniteReconnectionTries bool `json:"infinite_reconnection_tries"`
} `json:"master"`
Designer struct {
HostIP string `json:"host_ip"`
WebSrvPort int `json:"webSrvPort"`
HTTPSPort int `json:"https_webSrvPort"`
} `json:"designer"`
Worker struct {
ContainerManagement string `json:"container_management"`
AppNameSpace string `json:"app_namespace"`
EdgeControllerPort int `json:"edge_controller_port"`
Registry RegistryConfiguration `json:"registry,omitempty"`
ContainerAutoRemove bool `json:"container_autoremove"`
StartActualTask bool `json:"start_actual_task"`
Capacity int `json:"capacity"`
HeartbeatInterval int `json:"heartbeat_interval"`
DetectionDuration int `json:"detection_duration"`
ContainerManagement string `json:"container_management"`
AppNameSpace string `json:"app_namespace"`
EdgeControllerPort int `json:"edge_controller_port"`
Registry RegistryConfiguration `json:"registry,omitempty"`
ContainerAutoRemove bool `json:"container_autoremove"`
StartActualTask bool `json:"start_actual_task"`
Capacity int `json:"capacity"`
HeartbeatInterval int `json:"heartbeat_interval"`
DetectionDuration int `json:"detection_duration"`
InfiniteReconnectionTries bool `json:"infinite_reconnection_tries"`
} `json:"worker"`
RabbitMQ struct {
HostIP string `json:"host_ip"`
Expand Down
20 changes: 20 additions & 0 deletions common/ngsi/ngsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,26 @@ func (ce *ContextElement) ReadFromNGSILD(ngsildEntity map[string]interface{}) bo

newCtxAttribute.Value = attrValue

// dateObserved, dateObservedExist := attribute["dateObserved"]
// if dateObservedExist {
// newCtxMedata := ContextMetadata{}
// newCtxMedata.Name = "dateObserved"
// newCtxMedata.Type = "dateObserved"
// newCtxMedata.Value = dateObserved
// newCtxAttribute.Metadata = append(newCtxAttribute.Metadata, newCtxMedata)
// }

for key, element := range attribute {
if strings.ToLower(key) != "type" && strings.ToLower(key) != "value" {
newCtxMedata := ContextMetadata{}
newCtxMedata.Name = key
newCtxMedata.Type = key
newCtxMedata.Value = element
newCtxAttribute.Metadata = append(newCtxAttribute.Metadata, newCtxMedata)
}
//fmt.Println("Key:", key, "=>", "Element:", element)
}

ce.Attributes = append(ce.Attributes, newCtxAttribute)
} else if strings.ToLower(attrType) == "relationship" {
refObject := attribute["object"]
Expand Down
2 changes: 2 additions & 0 deletions common/ngsi/ngsiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,8 @@ func (nc *NGSI9Client) DiscoverContextAvailability(discoverCtxAvailabilityReq *D
return nil, err
}

// DEBUG.Printf("Discovery request to %s: %s", nc.IoTDiscoveryURL+"/discoverContextAvailability", string(body))

req, err := http.NewRequest("POST", nc.IoTDiscoveryURL+"/discoverContextAvailability", bytes.NewBuffer(body))
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions designer/build_k8s
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
docker image tag fogflow/designer:latest fogflow/designer:k8s
3 changes: 2 additions & 1 deletion designer/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
},
"designer": {
"webSrvPort": 8080,
"agentPort": 1030
"agentPort": 1030,
"notRecreateSubscriptions": false
},
"rabbitmq": {
"port": 5672,
Expand Down
Loading
Loading