Skip to content

Commit

Permalink
Merge pull request #399 from flaviocirillo/development
Browse files Browse the repository at this point in the history
Fixing quick startup following the tutorial
  • Loading branch information
smartfog authored Jul 3, 2024
2 parents d4c7a88 + ef9581d commit 88db58d
Show file tree
Hide file tree
Showing 47 changed files with 1,649 additions and 148 deletions.
5 changes: 5 additions & 0 deletions broker/Dockerfile_multiarch
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM alpine
ARG TARGETOS
ARG TARGETARCH
ADD /${TARGETOS}/${TARGETARCH}/broker /
CMD ["/broker"]
7 changes: 7 additions & 0 deletions broker/build
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ case "${command}" in
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -a -installsuffix cgo -o broker_arm64
time docker build -f Dockerfile4Armv8 -t "fogflow/broker:arm64" .
;;
"multiarch")
go get
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -installsuffix cgo -o linux/amd64/broker
CGO_ENABLED=0 GOOS=linux GOARCH=arm go build -a -installsuffix cgo -o linux/arm/broker
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -a -installsuffix cgo -o linux/arm64/broker
time docker buildx build --platform linux/arm,linux/arm64,linux/amd64 --push -f ./Dockerfile_multiarch -t "fogflow/broker" .
;;
*)
echo "Command not Found."
echo "usage: ./build [multistage|development|arm|arm64]"
Expand Down
1 change: 1 addition & 0 deletions broker/build_k8s
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
docker buildx build --platform linux/arm,linux/arm64,linux/amd64 --push -f ./Dockerfile_multiarch -t "fogflow/broker:k8s" .
4 changes: 4 additions & 0 deletions broker/ngsild.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ func (tb *ThinBroker) NGSILD_UpdateContext(w rest.ResponseWriter, r *rest.Reques
updateCtxReq := UpdateContextRequest{}
numUpdates := updateCtxReq.ReadFromNGSILD(ngsildUpsert)

// DEBUG.Println(updateCtxReq)

if numUpdates > 0 {
tb.handleInternalUpdateContext(&updateCtxReq)
}
Expand All @@ -44,6 +46,8 @@ func (tb *ThinBroker) NGSILD_CreateEntity(w rest.ResponseWriter, r *rest.Request
updateCtxReq := UpdateContextRequest{}
numUpdates := updateCtxReq.ReadFromNGSILD(ngsildUpsert)

// DEBUG.Println(updateCtxReq)

if numUpdates > 0 {
tb.handleInternalUpdateContext(&updateCtxReq)
}
Expand Down
9 changes: 7 additions & 2 deletions broker/ngsiv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"io/ioutil"
"net/http"
"strings"

"github.com/ant0ine/go-json-rest/rest"
"github.com/google/uuid"
Expand Down Expand Up @@ -119,6 +120,8 @@ func (tb *ThinBroker) NGSIV1_SubscribeContext(w rest.ResponseWriter, r *rest.Req
subReq := SubscribeContextRequest{}
subReq.Attributes = make([]string, 0)

DEBUG.Println("Subscription request from: ", r.RemoteAddr)

err := r.DecodeJsonPayload(&subReq)
if err != nil {
rest.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -132,20 +135,22 @@ func (tb *ThinBroker) NGSIV1_SubscribeContext(w rest.ResponseWriter, r *rest.Req
}
subID := u1.String()

subReq.Reference = strings.TrimSpace(subReq.Reference)

// send out the response
subResp := SubscribeContextResponse{}
subResp.SubscribeResponse.SubscriptionId = subID
subResp.SubscribeError.SubscriptionId = subID
w.WriteJson(&subResp)

INFO.Println(r.Header)
// INFO.Println(r.Header)

// check the request header
subReq.Subscriber.DestinationType = r.Header.Get("Destination")
subReq.Subscriber.Tenant = r.Header.Get("Ngsild-Tenant")
subReq.Subscriber.Correlator = r.Header.Get("Fiware-Correlator")

DEBUG.Println(subReq.Subscriber)
// DEBUG.Println(subReq.Subscriber)

if r.Header.Get("User-Agent") == "lightweight-iot-broker" {
subReq.Subscriber.IsInternal = true
Expand Down
28 changes: 19 additions & 9 deletions broker/thinBroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ func (tb *ThinBroker) fetchEntities(ids []EntityId, providerURL string) []Contex
// handle context updates from external applications/devices

func (tb *ThinBroker) handleInternalUpdateContext(updateCtxReq *UpdateContextRequest) {

// DEBUG.Println("updateCtxReq", updateCtxReq)

switch strings.ToUpper(updateCtxReq.UpdateAction) {
case "UPDATE":
for _, ctxElem := range updateCtxReq.ContextElements {
Expand Down Expand Up @@ -388,6 +391,9 @@ func (tb *ThinBroker) queryOwnerOfEntity(eid string) string {
}

func (tb *ThinBroker) UpdateContext2LocalSite(ctxElem *ContextElement, correlator string, params ...rest.ResponseWriter) {

// DEBUG.Println("elements", ctxElem)

// register the entity if there is any changes on attribute list, domain metadata
tb.entities_lock.Lock()
eid := ctxElem.Entity.ID
Expand All @@ -402,13 +408,13 @@ func (tb *ThinBroker) UpdateContext2LocalSite(ctxElem *ContextElement, correlato
tb.updateContextElement(ctxElem)

// propogate this update to its subscribers
go tb.notifySubscribers(ctxElem, correlator, true)
tb.notifySubscribers(ctxElem, correlator, true)
}

func (tb *ThinBroker) UpdateContext2RemoteSite(ctxElem *ContextElement, updateAction string, brokerURL string) {
switch updateAction {
case "UPDATE":
INFO.Println(brokerURL)
// INFO.Println(brokerURL)
client := NGSI10Client{IoTBrokerURL: brokerURL, SecurityCfg: tb.SecurityCfg}
client.UpdateContext(ctxElem)

Expand All @@ -421,7 +427,7 @@ func (tb *ThinBroker) UpdateContext2RemoteSite(ctxElem *ContextElement, updateAc
func (tb *ThinBroker) notifySubscribers(ctxElem *ContextElement, correlator string, checkSelectedAttributes bool) {
eid := ctxElem.Entity.ID

//DEBUG.Println(ctxElem)
// DEBUG.Println("elements", ctxElem)

tb.e2sub_lock.RLock()
defer tb.e2sub_lock.RUnlock()
Expand Down Expand Up @@ -468,6 +474,7 @@ func (tb *ThinBroker) notifySubscribers(ctxElem *ContextElement, correlator stri
elements = append(elements, *ctxElem)
}

// DEBUG.Println("elements", elements)
go tb.sendReliableNotify(elements, sid)
}
}
Expand Down Expand Up @@ -535,11 +542,13 @@ func (tb *ThinBroker) sendReliableNotifyToSubscriber(elements []ContextElement,
}
tb.subscriptions_lock.Unlock()

INFO.Println("NOTIFY: ", len(elements), ", ", sid, ", ", subscriberURL, ", ", DestinationBroker)
//INFO.Println("NOTIFY: ", len(elements), ", ", sid, ", ", subscriberURL, ", ", DestinationBroker)
// DEBUG.Println(elements)

err := postNotifyContext(elements, sid, subscriberURL, DestinationBroker, Tenant, tb.SecurityCfg)

if err != nil {
INFO.Println("NOTIFY is not received by the subscriber, ", subscriberURL)
DEBUG.Println("NOTIFY is not received by the subscriber, ", subscriberURL)

tb.subscriptions_lock.Lock()
if subscription, exist := tb.subscriptions[sid]; exist {
Expand All @@ -561,6 +570,7 @@ func (tb *ThinBroker) sendReliableNotifyToSubscriber(elements []ContextElement,
*/

func (tb *ThinBroker) sendReliableNotify(elements []ContextElement, sid string) {
// DEBUG.Println(elements)
tb.subscriptions_lock.Lock()
_, ok := tb.subscriptions[sid]
if ok == true {
Expand Down Expand Up @@ -683,9 +693,9 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil
tb.e2sub_lock.Unlock()
}

INFO.Println(registration.ProvidingApplication, ", ", tb.MyURL)
INFO.Println("TO ngsi10 subscription, ", mainSubID)
INFO.Printf("entity list: %+v\r\n", registration.EntityIdList)
// INFO.Println(registration.ProvidingApplication, ", ", tb.MyURL)
// INFO.Println("TO ngsi10 subscription, ", mainSubID)
// INFO.Printf("entity list: %+v\r\n", registration.EntityIdList)

if registration.ProvidingApplication == tb.MyURL {
//for matched entities provided by myself
Expand All @@ -702,7 +712,7 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil
if action == "CREATE" || action == "UPDATE" {
sid, err := subscribeContextProvider(&newSubscription, registration.ProvidingApplication, tb.SecurityCfg)
if err == nil {
INFO.Println("issue a new subscription ", sid)
// INFO.Println("issue a new subscription ", sid)

tb.subscriptions_lock.Lock()
tb.subscriptions[sid] = &newSubscription
Expand Down
13 changes: 9 additions & 4 deletions broker/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (
)

func postNotifyContext(ctxElems []ContextElement, subscriptionId string, URL string, DestinationBrokerType string, tenant string, httpsCfg *HTTPS) error {
INFO.Println("destination protocol: ", DestinationBrokerType)
//INFO.Println("destination protocol: ", DestinationBrokerType)
// INFO.Println("ctxElems: ", ctxElems)

switch DestinationBrokerType {
case "NGSI-LD":
Expand All @@ -28,7 +29,7 @@ func postNotifyContext(ctxElems []ContextElement, subscriptionId string, URL str

// for ngsiv1 consumer
func postNGSIV1NotifyContext(ctxElems []ContextElement, subscriptionId string, URL string, httpsCfg *HTTPS) error {
INFO.Println("NGSIv1 NOTIFY: ", URL)
// INFO.Println("NGSIv1 NOTIFY: ", URL)

payload := toNGSIv1Payload(ctxElems)

Expand Down Expand Up @@ -95,7 +96,7 @@ func postNGSIV2NotifyContext(ctxElems []ContextElement, subscriptionId string, U
return err
}

INFO.Println(string(body))
// INFO.Println(string(body))

req, err := http.NewRequest("POST", URL, bytes.NewBuffer(body))
req.Header.Add("Content-Type", "application/json")
Expand Down Expand Up @@ -164,7 +165,8 @@ func toNGSIv2Payload(ctxElems []ContextElement) []map[string]interface{} {

// for NGSI-LD consumer
func postNGSILDUpsert(ctxElems []ContextElement, subscriptionId string, URL string, tenant string) error {
INFO.Println("NGSI-LD NOTIFY: ", URL)
//INFO.Println("NGSI-LD NOTIFY: ", URL)
// DEBUG.Println(ctxElems)

payload := toNGSILDPayload(ctxElems)

Expand Down Expand Up @@ -233,6 +235,9 @@ func toNGSILDPayload(ctxElems []ContextElement) []map[string]interface{} {
default:
propertyValue["type"] = "Property"
propertyValue["value"] = attr.Value
for _, metadata := range attr.Metadata {
propertyValue[metadata.Name] = metadata.Value
}
}

element[attr.Name] = propertyValue
Expand Down
34 changes: 19 additions & 15 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ type Config struct {
Debug string `json:"debug"`
} `json:"logging"`
Discovery struct {
HostIP string `json:"host_ip"`
HTTPPort int `json:"http_port"`
HTTPSPort int `json:"https_port"`
HostIP string `json:"host_ip"`
HTTPPort int `json:"http_port"`
HTTPSPort int `json:"https_port"`
StoreOnDisk bool `json:"storeOnDisk"`
DelayStoreOnFile int `json:"delayStoreOnFile"`
} `json:"discovery"`
Broker struct {
HostIP string `json:"host_ip"`
Expand All @@ -63,25 +65,27 @@ type Config struct {
HeartbeatInterval int `json:"heartbeat_interval"`
} `json:"broker"`
Master struct {
HostIP string `json:"host_ip"`
AgentPort int `json:"ngsi_agent_port"`
RESTAPIPort int `json:"rest_api_port"`
HostIP string `json:"host_ip"`
AgentPort int `json:"ngsi_agent_port"`
RESTAPIPort int `json:"rest_api_port"`
InfiniteReconnectionTries bool `json:"infinite_reconnection_tries"`
} `json:"master"`
Designer struct {
HostIP string `json:"host_ip"`
WebSrvPort int `json:"webSrvPort"`
HTTPSPort int `json:"https_webSrvPort"`
} `json:"designer"`
Worker struct {
ContainerManagement string `json:"container_management"`
AppNameSpace string `json:"app_namespace"`
EdgeControllerPort int `json:"edge_controller_port"`
Registry RegistryConfiguration `json:"registry,omitempty"`
ContainerAutoRemove bool `json:"container_autoremove"`
StartActualTask bool `json:"start_actual_task"`
Capacity int `json:"capacity"`
HeartbeatInterval int `json:"heartbeat_interval"`
DetectionDuration int `json:"detection_duration"`
ContainerManagement string `json:"container_management"`
AppNameSpace string `json:"app_namespace"`
EdgeControllerPort int `json:"edge_controller_port"`
Registry RegistryConfiguration `json:"registry,omitempty"`
ContainerAutoRemove bool `json:"container_autoremove"`
StartActualTask bool `json:"start_actual_task"`
Capacity int `json:"capacity"`
HeartbeatInterval int `json:"heartbeat_interval"`
DetectionDuration int `json:"detection_duration"`
InfiniteReconnectionTries bool `json:"infinite_reconnection_tries"`
} `json:"worker"`
RabbitMQ struct {
HostIP string `json:"host_ip"`
Expand Down
20 changes: 20 additions & 0 deletions common/ngsi/ngsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,26 @@ func (ce *ContextElement) ReadFromNGSILD(ngsildEntity map[string]interface{}) bo

newCtxAttribute.Value = attrValue

// dateObserved, dateObservedExist := attribute["dateObserved"]
// if dateObservedExist {
// newCtxMedata := ContextMetadata{}
// newCtxMedata.Name = "dateObserved"
// newCtxMedata.Type = "dateObserved"
// newCtxMedata.Value = dateObserved
// newCtxAttribute.Metadata = append(newCtxAttribute.Metadata, newCtxMedata)
// }

for key, element := range attribute {
if strings.ToLower(key) != "type" && strings.ToLower(key) != "value" {
newCtxMedata := ContextMetadata{}
newCtxMedata.Name = key
newCtxMedata.Type = key
newCtxMedata.Value = element
newCtxAttribute.Metadata = append(newCtxAttribute.Metadata, newCtxMedata)
}
//fmt.Println("Key:", key, "=>", "Element:", element)
}

ce.Attributes = append(ce.Attributes, newCtxAttribute)
} else if strings.ToLower(attrType) == "relationship" {
refObject := attribute["object"]
Expand Down
2 changes: 2 additions & 0 deletions common/ngsi/ngsiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,8 @@ func (nc *NGSI9Client) DiscoverContextAvailability(discoverCtxAvailabilityReq *D
return nil, err
}

// DEBUG.Printf("Discovery request to %s: %s", nc.IoTDiscoveryURL+"/discoverContextAvailability", string(body))

req, err := http.NewRequest("POST", nc.IoTDiscoveryURL+"/discoverContextAvailability", bytes.NewBuffer(body))
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions designer/build_k8s
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
docker image tag fogflow/designer:latest fogflow/designer:k8s
3 changes: 2 additions & 1 deletion designer/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
},
"designer": {
"webSrvPort": 8080,
"agentPort": 1030
"agentPort": 1030,
"notRecreateSubscriptions": false
},
"rabbitmq": {
"port": 5672,
Expand Down
Loading

0 comments on commit 88db58d

Please sign in to comment.