-
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
rewrite to remove mqtt and send notification to ntfy, no longer runs …
…in loop and takes commandline arguments
- Loading branch information
1 parent
efbae73
commit f8c39f1
Showing
23 changed files
with
114 additions
and
218 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
File renamed without changes.
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
import logging | ||
|
||
from .api import Api | ||
from datetime import date | ||
import requests | ||
|
||
class Bindicator: | ||
|
||
api: Api | ||
|
||
def __init__(self, uprn, topic) -> None: | ||
self.api = Api(uprn) | ||
self.topic = topic | ||
|
||
def run(self): | ||
collections = self.api.get_data() | ||
|
||
if collections: | ||
logging.debug( | ||
f'{len(collections)} found publishing first collection date information') | ||
if date.today() == collections[0].date: | ||
message = f'Bin collection is today for {collections[0].wheelie.bin_type}' | ||
logging.info(message ) | ||
requests.post(f"https://ntfy.sh/{self.topic}",data=message.encode(encoding='utf-8')) | ||
else: | ||
logging.info( | ||
f'Next bin collection is {collections[0].date}, {collections[0].wheelie.bin_type}') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import logging | ||
import argparse | ||
|
||
from app.bindicator import Bindicator | ||
|
||
if __name__ == "__main__": | ||
|
||
logging.basicConfig( | ||
level="INFO", | ||
format="%(asctime)s | %(levelname)-8s | %(message)s", | ||
datefmt="%Y-%m-%d %H:%M:%S", | ||
) | ||
|
||
logging.info('Creating Harlow Bindicator') | ||
parser = argparse.ArgumentParser(description="Environment Checker") | ||
parser.add_argument( | ||
"--uprn", type=str, required=True, help="Property Reference Number" | ||
) | ||
parser.add_argument( | ||
"--topic", type=str, required=True, help="Ntfy Topic" | ||
) | ||
args = parser.parse_args() | ||
bindicator = Bindicator(args.uprn, args.topic) | ||
bindicator.run() | ||
logging.info('Harlow Bindicator run completed') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,55 +1,54 @@ | ||
from unittest.mock import patch | ||
|
||
import pytest | ||
from datetime import date, timedelta | ||
|
||
from harlow_bindicator.app.bindicator import Bindicator | ||
from harlow_bindicator.app.configuration import Configuration | ||
from harlow_bindicator.app.data import Collection, CollectionDate | ||
|
||
from src.app.bindicator import Bindicator | ||
from src.app.data import Collection, CollectionDate | ||
|
||
def test_bindicator_init(): | ||
with pytest.raises(Exception): | ||
with pytest.raises(TypeError): | ||
Bindicator() | ||
|
||
|
||
@patch("harlow_bindicator.app.bindicator.MQTT") | ||
def test_bindicator_init_config(mock_mqtt): | ||
config = Configuration() | ||
bindicator = Bindicator(config) | ||
|
||
assert bindicator | ||
assert bindicator.configuration | ||
assert bindicator.mqtt_client | ||
assert bindicator.api | ||
mock_mqtt.called_once_with(config) | ||
|
||
@patch("src.app.bindicator.Api") | ||
def test_bindicator_run_no_collections(mock_api): | ||
bindicator = Bindicator("uprn", "topic") | ||
mock_api().get_data.return_value = None | ||
bindicator.run() | ||
mock_api().get_data.assert_called_once() | ||
|
||
@patch("harlow_bindicator.app.bindicator.MQTT") | ||
@patch("harlow_bindicator.app.bindicator.Api") | ||
def test_bindicator_run_no_collections(mock_api, mock_mqtt): | ||
config = Configuration() | ||
bindicator = Bindicator(config) | ||
|
||
mock_api().get_data.return_value = None | ||
@patch("src.app.bindicator.Api") | ||
@patch("src.app.bindicator.requests") | ||
def test_bindicator_run_collections(mock_requests,mock_api): | ||
bindicator = Bindicator("uprn", "topic") | ||
today = date.today() | ||
collection = Collection("recycling", today.strftime("%d/%m/%Y")) | ||
collection_date = CollectionDate(collection) | ||
mock_api().get_data.return_value = [collection_date] | ||
|
||
bindicator.run() | ||
|
||
mock_api().get_data.called_once() | ||
assert not mock_mqtt().publish.called | ||
mock_api().get_data.assert_called_once() | ||
mock_requests.post.assert_called_once_with( | ||
f"https://ntfy.sh/topic", | ||
data=f"Bin collection is today for {collection_date.wheelie.bin_type}".encode(encoding='utf-8') | ||
) | ||
|
||
|
||
@patch("harlow_bindicator.app.bindicator.MQTT") | ||
@patch("harlow_bindicator.app.bindicator.Api") | ||
def test_bindicator_run_collections(mock_api, mock_mqtt): | ||
config = Configuration() | ||
bindicator = Bindicator(config) | ||
@patch("src.app.bindicator.Api") | ||
@patch("src.app.bindicator.requests") | ||
def test_no_collection_today(mock_requests,mock_api): | ||
bindicator = Bindicator("uprn", "topic") | ||
|
||
collection = Collection("recycling", "15/1/2023") | ||
future_date = (date.today() + timedelta(days=1)).strftime("%d/%m/%Y") | ||
collection = Collection("recycling", future_date) | ||
collection_date = CollectionDate(collection) | ||
mock_api().get_data.return_value = [collection_date] | ||
expected = '{"date": "15/01/2023", "bin_day": false, "bin_type": "recycling"}' | ||
|
||
bindicator.run() | ||
|
||
mock_api().get_data.called_once() | ||
mock_mqtt().publish.called_once_with(expected) | ||
mock_api().get_data.assert_called_once() | ||
mock_requests.post.assert_not_called() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.