Skip to content

Commit

Permalink
Resolves #4 Add Bluetooth/BLE functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamMiltonBarker committed Aug 2, 2021
1 parent bb0be3f commit ba85dfa
Showing 1 changed file with 237 additions and 70 deletions.
307 changes: 237 additions & 70 deletions agent.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
""" HIAS Bluetooth IoT Agent Class
HIAS Bluetooth IoT Agents are bridges between HIAS devices that support
Expand Down Expand Up @@ -36,73 +36,130 @@
import json
import os
import psutil
import requests
import signal
import sys
import time
import threading

import os.path
sys.path.append(
os.path.abspath(os.path.join(__file__, "..", "..", "..", "..")))

from abc import ABC, abstractmethod
from datetime import datetime
from bluepy import btle
from datetime import datetime, timedelta
from flask import Flask, request, Response
from threading import Thread

from components.agents.AbstractAgent import AbstractAgent

from modules.AbstractAgent import AbstractAgent

class Agent(AbstractAgent):
""" Class representing a HIAS Bluetooth IoT Agent.
""" HIAS Bluetooth IoT Agent Class
This object represents a HIAS Bluetooth IoT Agent which
process all data coming from IoT Agents and store immutable
hashes in the HIASBCH Blockchain providing the functionality
to perform data integrity checks.
HIAS Bluetooth IoT Agents are bridges between HIAS
devices that support Bluetooth/Bluetooth Low Energy
protocol and the HIASCDI Context Broker.
"""

def __init__(self, protocol):
super().__init__(protocol)
def get_ble_devices(self):
""" Returns a list of HIAS BLE devices """

bles = self.hiascdi.get_ble_devices()

for ble in bles:
self.bles.append((
ble["bluetoothAddress"]["value"],
ble["bluetoothServiceUUID"]["value"],
ble["bluetoothCharacteristicUUID"]["value"],
ble["networkLocation"]["value"],
ble["networkZone"]["value"],
ble["id"]))

def check_ble_devices(self):
""" Checks for disconnected HIAS BLE devices """

for device in self.ble_tracker:
if self.ble_tracker[device]["last_seen"] != "" and self.ble_tracker[device]["last_seen"] < datetime.now() - timedelta(minutes=5):

self.mqtt.device_status_publish(self.ble_tracker[device]["location"],
self.ble_tracker[device]["zone"],
self.ble_tracker[device]["device"],
"OFFLINE")

self.ble_tracker[device]["last_seen"] = ""

self.helpers.logger.info(
"BLE device " + self.ble_tracker[device]["address"] + " disconnected from iotJumpWay")

threading.Timer(100.0, self.check_ble_devices).start()

def statusCallback(self, topic, payload):
pass
def ble_connection(self, addr, service, characteristic):
""" Connects to a HIAS BLE device """

def lifeCallback(self, topic, payload):
pass
while True:
try:
self.helpers.logger.info(
"Attempting BLE connection to "+addr)

def sensorsCallback(self, topic, payload):
pass
peripheral = btle.Peripheral(addr)
peripheral.setMTU(512)

def life(self):
""" Sends entity statistics to HIAS """
delegate = BtAgentDelegate()
peripheral.withDelegate(delegate)

cpu = psutil.cpu_percent()
mem = psutil.virtual_memory()[2]
hdd = psutil.disk_usage('/fserver').percent
tmp = psutil.sensors_temperatures()['coretemp'][0].current
r = requests.get('http://ipinfo.io/json?token=' +
self.helpers.credentials["iotJumpWay"]["ipinfo"])
data = r.json()
location = data["loc"].split(',')
serv = peripheral.getServiceByUUID(service)
charac = serv.getCharacteristics(characteristic)[0]

self.mqtt.publish("Life", {
"CPU": str(cpu),
"Memory": str(mem),
"Diskspace": str(hdd),
"Temperature": str(tmp),
"Latitude": str(location[0]),
"Longitude": str(location[1])
})
peripheral.writeCharacteristic(charac.valHandle + 1, b"\x01\x00")

self.helpers.logger.info("Agent life statistics published.")
threading.Timer(300.0, self.life).start()
self.helpers.logger.info(
"BLE connection to " + addr + " established")

def respond(self, responseCode, response):
if addr in self.ble_tracker:
self.ble_tracker[addr]["last_seen"] = datetime.now()
self.helpers.logger.info(
addr + " connection timestamp updated")


self.notification_loop(peripheral)
except Exception as e:
self.helpers.logger.info(
"BLE connection to " + addr + " failed")
time.sleep(1.0)
continue

def notification_loop(self, peripheral):
""" Notification loop """

try:
if peripheral.waitForNotifications(2.0):
self.helpers.logger.info(
"Awaiting notifications...")
except Exception as e:
pass
finally:
self.helpers.logger.info(
"Disconnecting from HIAS BLE device")
try:
peripheral.disconnect()
time.sleep(4)
except Exception as e:
self.helpers.logger.info(
"Failed to disconnect from HIAS BLE device")
pass

def parse_data(self, data):
""" Parses the data dictionary """

entity_type = data["EntityType"]
entity = data["Entity"]
data_type = data["Type"]
data_value = data["Value"]
data_message = data["Message"]

return entity_type, entity, data_type, data_value, data_message

def respond(self, response_code, response):
""" Returns the request repsonse """

return Response(response=json.dumps(response, indent=4), status=responseCode,
return Response(response=json.dumps(response, indent=4),
status=response_code,
mimetype="application/json")

def signal_handler(self, signal, frame):
Expand All @@ -111,7 +168,106 @@ def signal_handler(self, signal, frame):
sys.exit(1)

app = Flask(__name__)
Agent = Agent("hiasbch")
agent = Agent()

class BtAgentDelegate(btle.DefaultDelegate):

def __init__(self):
btle.DefaultDelegate.__init__(self)

def handleNotification(self, cHandle, data):

data = json.loads(data.decode("utf-8"))

if "EntityType" not in data:
return
if "Entity" not in data:
return

(entity_type, entity, data_type,
data_value, data_message) = agent.parse_data(data)

agent.helpers.logger.info(
"Received " + entity_type + " data via BLE: " + str(data))

attrs = agent.get_attributes(entity_type, entity)
bch = attrs["blockchain"]

if not agent.hiasbch.iotjumpway_check(bch):
return

_id = attrs["id"]

location = attrs["location"]
zone = attrs["zone"] if "zone" in attrs else "NA"
sensor = data["Sensor"] if "Sensor" in data else "NA"
actuator = data["Actuator"] if "Actuator" in data else "NA"

if sensor != "NA":
agent.helpers.logger.info("Processing sensors")

sensors = agent.hiascdi.get_sensors(
entity, entity_type)
sensorData = sensors["sensors"]

i = 0
for sensor in sensorData["value"]:
for prop in sensor["properties"]["value"]:
if data["Type"].lower() in prop:
sensorData["value"][i]["properties"]["value"][data["Type"].lower()] = {
"value": data["Value"],
"timestamp": {
"value": datetime.now().isoformat()
}
}
i = i + 1

updateResponse = agent.hiascdi.update_entity(
entity, entity_type, {
"networkStatus": {"value": "ONLINE"},
"networkStatus.metadata": {"timestamp": {
"value": datetime.now().isoformat()
}},
"dateModified": {"value": datetime.now().isoformat()},
"sensors": sensorData
})

if updateResponse:
_id = agent.hiashdi.insert_data("Sensors", {
"Use": entity_type,
"Location": location,
"Zone": zone,
"Device": entity if entity_type == "Device" else "NA",
"HIASCDI": entity if entity_type == "HIASCDI" else "NA",
"Agent": entity if entity_type == "Agent" else "NA",
"Application": entity if entity_type == "Application" else "NA",
"Device": entity if entity_type == "Device" else "NA",
"Staff": entity if entity_type == "Staff" else "NA",
"Sensor": data["Sensor"],
"Type": data["Type"],
"Value": data["Value"],
"Message": data["Message"],
"Time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})

if _id != False:
agent.helpers.logger.info(
entity_type + " " + entity + " sensors update OK")

agent.mqtt.publish("Integrity", {
"_id": str(_id),
"Sensor": data["Sensor"],
"Type": data["Type"],
"Value": data["Value"],
"Message": data["Message"]
})

else:
agent.helpers.logger.error(
entity_type + " " + entity + " sensors update KO")
else:
agent.helpers.logger.error(
entity_type + " " + entity + " sensors update KO")

@app.route('/About', methods=['GET'])
def about():
Expand All @@ -120,10 +276,10 @@ def about():
Responds to GET requests sent to the North Port About API endpoint.
"""

return Agent.respond(200, {
"Identifier": Agent.credentials["iotJumpWay"]["entity"],
"Host": Agent.credentials["server"]["ip"],
"NorthPort": Agent.credentials["server"]["port"],
return agent.respond(200, {
"Identifier": agent.credentials["iotJumpWay"]["entity"],
"Host": agent.credentials["server"]["ip"],
"NorthPort": agent.credentials["server"]["port"],
"CPU": psutil.cpu_percent(),
"Memory": psutil.virtual_memory()[2],
"Diskspace": psutil.disk_usage('/').percent,
Expand All @@ -132,29 +288,40 @@ def about():

def main():

signal.signal(signal.SIGINT, Agent.signal_handler)
signal.signal(signal.SIGTERM, Agent.signal_handler)

Agent.mongodbConn()
Agent.hiascdiConn()
Agent.hiasbchConn()
Agent.mqttConn({
"host": Agent.credentials["iotJumpWay"]["host"],
"port": Agent.credentials["iotJumpWay"]["port"],
"location": Agent.credentials["iotJumpWay"]["location"],
"zone": Agent.credentials["iotJumpWay"]["zone"],
"entity": Agent.credentials["iotJumpWay"]["entity"],
"name": Agent.credentials["iotJumpWay"]["name"],
"un": Agent.credentials["iotJumpWay"]["un"],
"up": Agent.credentials["iotJumpWay"]["up"]
signal.signal(signal.SIGINT, agent.signal_handler)
signal.signal(signal.SIGTERM, agent.signal_handler)

agent.hiascdi_connection()
agent.get_ble_devices()
agent.hiashdi_connection()
agent.hiasbch_connection()

agent.mqtt_connection({
"host": agent.credentials["iotJumpWay"]["host"],
"port": agent.credentials["iotJumpWay"]["port"],
"location": agent.credentials["iotJumpWay"]["location"],
"zone": agent.credentials["iotJumpWay"]["zone"],
"entity": agent.credentials["iotJumpWay"]["entity"],
"name": agent.credentials["iotJumpWay"]["name"],
"un": agent.credentials["iotJumpWay"]["un"],
"up": agent.credentials["iotJumpWay"]["up"]
})

Agent.mqtt.integrityCallback = Agent.integrityCallback

Thread(target=Agent.life, args=(), daemon=True).start()

app.run(host=Agent.helpers.credentials["server"]["ip"],
port=Agent.helpers.credentials["server"]["port"])
for ble in agent.bles:
agent.ble_tracker[ble[0]] = {
"location": ble[3],
"zone": ble[4],
"device": ble[5],
"address": ble[0],
"last_seen": ""
}
Thread(target=agent.ble_connection, args=(ble[0], ble[1], ble[2]), daemon=True).start()

Thread(target=agent.life, args=(), daemon=True).start()
Thread(target=agent.check_ble_devices, args=(), daemon=True).start()

app.run(host=agent.helpers.credentials["server"]["ip"],
port=agent.helpers.credentials["server"]["port"])

if __name__ == "__main__":
main()

0 comments on commit ba85dfa

Please sign in to comment.