From 97cad8be2ccdc99c7922b2afcde7facf36111b3b Mon Sep 17 00:00:00 2001 From: Xlin123 Date: Sat, 2 Dec 2023 20:16:47 -0500 Subject: [PATCH 1/6] feat: decrypt_events, get_decrypted_events, and a modified start_monitor --- at_client/atclient.py | 48 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/at_client/atclient.py b/at_client/atclient.py index d2c6b38..be3b619 100644 --- a/at_client/atclient.py +++ b/at_client/atclient.py @@ -25,6 +25,7 @@ class AtClient(ABC): def __init__(self, atsign:AtSign, root_address:Address=Address("root.atsign.org", 64), secondary_address:Address=None, queue:Queue=None, verbose:bool = False): self.atsign = atsign self.queue = queue + self.decrypted_events = Queue(queue.maxsize) self.monitor_connection = None self.keys = KeysUtil.load_keys(atsign) self.verbose = verbose @@ -342,8 +343,16 @@ def delete(self, key): def __del__(self): if self.secondary_connection: self.secondary_connection.disconnect() + + def start_monitor(self, regex="", decrypt_events=True): + threading.Thread(target=self._start_monitor, args=(regex,)).start() + while self.monitor_connection == None: + time.sleep(0.1) + if decrypt_events: + threading.Thread(target=self.decrypt_events).start() + - def start_monitor(self, regex=""): + def _start_monitor(self, regex=""): if self.queue != None: global should_be_running_lock what = "" @@ -366,7 +375,7 @@ def start_monitor(self, regex=""): else: raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") - def stop_monitor(self): + def _stop_monitor(self): if self.queue != None: global should_be_running_lock what = "" @@ -386,7 +395,8 @@ def stop_monitor(self): else: raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") - def handle_event(self, queue, at_event): + #I want to make this private. Not sure if we should abstract this, along with the above 2, out to a separate class + def _handle_event(self, queue, at_event): if self.queue != None: try: event_type = at_event.event_type @@ -406,13 +416,16 @@ def handle_event(self, queue, at_event): key = event_data["key"] encrypted_value = event_data["value"] ivNonce = event_data["metadata"]["ivNonce"] + self.secondary_connection.execute_command("notify:remove:" + event_data["id"]) try: encryption_key_shared_by_other = self.get_encryption_key_shared_by_other(SharedKey.from_string(key=key)) decrypted_value = EncryptionUtil.aes_decrypt_from_base64(encrypted_text=encrypted_value.encode(), self_encryption_key=encryption_key_shared_by_other, iv=base64.b64decode(ivNonce)) new_event_data = dict(event_data) new_event_data["decryptedValue"] = decrypted_value new_at_event = AtEvent(AtEventType.DECRYPTED_UPDATE_NOTIFICATION, new_event_data) - queue.put(new_at_event) + #For now,the decrypted event is going into both queues, since older code may be listening to the queue directly? + self.decrypted_events.put(new_at_event) + #self.queue.put(new_at_event) except Exception as e: print(str(time.time()) + ": caught exception " + str(e) + " while decrypting received data with key name [" + key + "]") except Empty: @@ -420,6 +433,33 @@ def handle_event(self, queue, at_event): else: raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") + + #Up to suggestions on this + #I don't want to leave this as a while True, but I'm not sure how else to tackle it... + def decrypt_events(self): + while True: + try: + at_event = self.queue.get(block=False) + event_type = at_event.event_type + if event_type == AtEventType.UPDATE_NOTIFICATION or event_type == AtEventType.UPDATE_NOTIFICATION_TEXT: + self._handle_event(self.queue, at_event) + timeout = 0 + except Empty: + pass + + + #Handling events using a generator. + def get_decrypted_events(self): + if self.decrypted_events != None: + while True: + try: + at_event = self.decrypted_events.get(block=False) + yield at_event + except Empty: + break + else: + raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") + def notify(self, at_key : AtKey, value, operation = OperationEnum.UPDATE, session_id = str(uuid.uuid4())): iv = at_key.metadata.iv_nonce shared_key = self.get_encryption_key_shared_by_me(at_key) From 5ce12367440f3f3c0dd69f7e7c8bd6b97f7c6a2c Mon Sep 17 00:00:00 2001 From: Xlin123 Date: Sat, 2 Dec 2023 20:29:57 -0500 Subject: [PATCH 2/6] fix: queue.maxsize when empty --- at_client/atclient.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/at_client/atclient.py b/at_client/atclient.py index be3b619..266a17b 100644 --- a/at_client/atclient.py +++ b/at_client/atclient.py @@ -25,7 +25,8 @@ class AtClient(ABC): def __init__(self, atsign:AtSign, root_address:Address=Address("root.atsign.org", 64), secondary_address:Address=None, queue:Queue=None, verbose:bool = False): self.atsign = atsign self.queue = queue - self.decrypted_events = Queue(queue.maxsize) + if queue != None: + self.decrypted_events = Queue(queue.maxsize) self.monitor_connection = None self.keys = KeysUtil.load_keys(atsign) self.verbose = verbose From de5fb45cbde5757cd2f3118175ba8edfd664fbed Mon Sep 17 00:00:00 2001 From: Xavier Lin Date: Wed, 20 Dec 2023 14:27:16 -0500 Subject: [PATCH 3/6] chore: check in changes --- at_client/atclient.py | 26 ++++++++------------------ test/notification_test.py | 0 2 files changed, 8 insertions(+), 18 deletions(-) create mode 100644 test/notification_test.py diff --git a/at_client/atclient.py b/at_client/atclient.py index 266a17b..f0bacfa 100644 --- a/at_client/atclient.py +++ b/at_client/atclient.py @@ -350,7 +350,7 @@ def start_monitor(self, regex="", decrypt_events=True): while self.monitor_connection == None: time.sleep(0.1) if decrypt_events: - threading.Thread(target=self.decrypt_events).start() + threading.Thread(target=self.decrypt_events, args=(self.queue,)).start() def _start_monitor(self, regex=""): @@ -424,7 +424,7 @@ def _handle_event(self, queue, at_event): new_event_data = dict(event_data) new_event_data["decryptedValue"] = decrypted_value new_at_event = AtEvent(AtEventType.DECRYPTED_UPDATE_NOTIFICATION, new_event_data) - #For now,the decrypted event is going into both queues, since older code may be listening to the queue directly? + #What should I do about legacy code that is expecting the decrypted event to be in the queue? self.decrypted_events.put(new_at_event) #self.queue.put(new_at_event) except Exception as e: @@ -437,29 +437,19 @@ def _handle_event(self, queue, at_event): #Up to suggestions on this #I don't want to leave this as a while True, but I'm not sure how else to tackle it... - def decrypt_events(self): + def decrypt_events(self, queue): while True: try: - at_event = self.queue.get(block=False) + at_event = queue.get(block=False) event_type = at_event.event_type if event_type == AtEventType.UPDATE_NOTIFICATION or event_type == AtEventType.UPDATE_NOTIFICATION_TEXT: - self._handle_event(self.queue, at_event) + self._handle_event(queue, at_event) timeout = 0 except Empty: pass - - - #Handling events using a generator. - def get_decrypted_events(self): - if self.decrypted_events != None: - while True: - try: - at_event = self.decrypted_events.get(block=False) - yield at_event - except Empty: - break - else: - raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") + + def get_decrypted_events(self, queue): + return list(queue.queue) def notify(self, at_key : AtKey, value, operation = OperationEnum.UPDATE, session_id = str(uuid.uuid4())): iv = at_key.metadata.iv_nonce diff --git a/test/notification_test.py b/test/notification_test.py new file mode 100644 index 0000000..e69de29 From 3ed649ff15a12ee7bde53ef03586c7017d1abbf9 Mon Sep 17 00:00:00 2001 From: Xlin123 Date: Thu, 21 Dec 2023 19:27:56 -0500 Subject: [PATCH 4/6] feat: AtNotificationService --- at_client/atclient.py | 55 +++---- .../notification/atnotificationservice.py | 153 ++++++++++++++++++ 2 files changed, 173 insertions(+), 35 deletions(-) create mode 100644 at_client/connections/notification/atnotificationservice.py diff --git a/at_client/atclient.py b/at_client/atclient.py index f0bacfa..b105fe3 100644 --- a/at_client/atclient.py +++ b/at_client/atclient.py @@ -1,10 +1,8 @@ -import base64 -import json +import base64, json, time, traceback, uuid from queue import Empty, Queue -import time -import traceback from at_client.connections.notification.atevents import AtEvent, AtEventType +from at_client.connections.notification.atnotificationservice import AtNotificationService from .common.atsign import AtSign @@ -25,8 +23,6 @@ class AtClient(ABC): def __init__(self, atsign:AtSign, root_address:Address=Address("root.atsign.org", 64), secondary_address:Address=None, queue:Queue=None, verbose:bool = False): self.atsign = atsign self.queue = queue - if queue != None: - self.decrypted_events = Queue(queue.maxsize) self.monitor_connection = None self.keys = KeysUtil.load_keys(atsign) self.verbose = verbose @@ -40,6 +36,9 @@ def __init__(self, atsign:AtSign, root_address:Address=Address("root.atsign.org" self.secondary_connection.connect() AuthUtil.authenticate_with_pkam(self.secondary_connection, self.atsign, self.keys) self.authenticated = True + if queue != None: + self.notification_service = AtNotificationService(self, verbose=verbose) + def get_at_keys(self, regex, fetch_metadata): scan_command = ScanVerbBuilder().set_regex(regex).set_show_hidden(True).build() @@ -344,16 +343,11 @@ def delete(self, key): def __del__(self): if self.secondary_connection: self.secondary_connection.disconnect() - - def start_monitor(self, regex="", decrypt_events=True): - threading.Thread(target=self._start_monitor, args=(regex,)).start() - while self.monitor_connection == None: - time.sleep(0.1) - if decrypt_events: - threading.Thread(target=self.decrypt_events, args=(self.queue,)).start() - - def _start_monitor(self, regex=""): + def start_monitor(self, regex=""): + ''' + DEPRECATED: use AtNotificationService instead + ''' if self.queue != None: global should_be_running_lock what = "" @@ -376,7 +370,10 @@ def _start_monitor(self, regex=""): else: raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") - def _stop_monitor(self): + def stop_monitor(self): + ''' + DEPRECATED: use AtNotificationService instead + ''' if self.queue != None: global should_be_running_lock what = "" @@ -396,8 +393,11 @@ def _stop_monitor(self): else: raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") - #I want to make this private. Not sure if we should abstract this, along with the above 2, out to a separate class - def _handle_event(self, queue, at_event): + + def handle_event(self, queue, at_event): + ''' + DEPRECATED: use AtNotificationService instead + ''' if self.queue != None: try: event_type = at_event.event_type @@ -434,23 +434,8 @@ def _handle_event(self, queue, at_event): else: raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") - - #Up to suggestions on this - #I don't want to leave this as a while True, but I'm not sure how else to tackle it... - def decrypt_events(self, queue): - while True: - try: - at_event = queue.get(block=False) - event_type = at_event.event_type - if event_type == AtEventType.UPDATE_NOTIFICATION or event_type == AtEventType.UPDATE_NOTIFICATION_TEXT: - self._handle_event(queue, at_event) - timeout = 0 - except Empty: - pass - - def get_decrypted_events(self, queue): - return list(queue.queue) - + #I want to leave this here instead of the AtNotificationService + # because it's logic is fairly related to the AtClient instead of the NotficationService (despite the name..) def notify(self, at_key : AtKey, value, operation = OperationEnum.UPDATE, session_id = str(uuid.uuid4())): iv = at_key.metadata.iv_nonce shared_key = self.get_encryption_key_shared_by_me(at_key) diff --git a/at_client/connections/notification/atnotificationservice.py b/at_client/connections/notification/atnotificationservice.py new file mode 100644 index 0000000..4a72ed5 --- /dev/null +++ b/at_client/connections/notification/atnotificationservice.py @@ -0,0 +1,153 @@ +import base64, threading, time, traceback +from queue import Empty, Queue +from at_client.common.keys import AtKey, SharedKey +from at_client.connections.atmonitorconnection import AtMonitorConnection +from at_client.connections.notification.atevents import AtEvent, AtEventType +from at_client.util.authutil import AuthUtil +from at_client.util.encryptionutil import EncryptionUtil +from at_client.util.keysutil import KeysUtil + +class AtNotificationService: + + + def __init__(self, at_client, verbose = False): + self._notification_list = [] + self.at_client = at_client + self.verbose = verbose + self.monitor_connection = at_client.monitor_connection + self.queue = at_client.queue + if at_client.queue != None: + self.decrypted_events = Queue(at_client.queue.maxsize) + else: + raise Exception("You must assign a Queue object to the queue parameter of AtClient class") + + def start_monitor(self, regex="", decrypt_events=True): + ''' + Starts the monitor connection and starts listening for events. + Will start a thread to decrypt events if decrypt_events is True. + ''' + threading.Thread(target=self._start_monitor, args=(regex,)).start() + while self.monitor_connection == None: + time.sleep(0.1) + if decrypt_events: + threading.Thread(target=self._decrypt_events, args=(self.queue,)).start() + + + def stop_monitor(self): + ''' + Stops the monitor connection. + ''' + if self.queue != None: + should_be_running_lock + what = "" + try: + if self.monitor_connection == None: + return + should_be_running_lock.acquire(blocking=1) + if not self.monitor_connection.running: + should_be_running_lock.release() + what = "call monitor_connection.stop_monitor()" + self.monitor_connection.stop_monitor() + else: + should_be_running_lock.release() + except Exception as e: + print("SEVERE: failed to " + what + " : " + str(e)) + traceback.print_exc() + else: + raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") + + + def get_decrypted_events(self): + ''' + Returns a generator that yields decrypted events. + + Example: + for event in at_client.get_decrypted_events(): + do_something_with_event(event) + ''' + if self.decrypted_events != None: + while True: + try: + at_event = self.decrypted_events.get(block=False) + yield at_event + except Empty: + break + else: + raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") + + def _start_monitor(self, regex=""): + if self.queue != None: + global should_be_running_lock + should_be_running_lock = threading.Lock() + what = "" + try: + if self.monitor_connection == None: + what = "construct an AtMonitorConnection" + self.monitor_connection = AtMonitorConnection(queue=self.queue, atsign=self.at_client.atsign, address=self.at_client.secondary_address, verbose=self.verbose) + self.monitor_connection.connect() + AuthUtil.authenticate_with_pkam(self.monitor_connection, self.at_client.atsign, self.at_client.keys) + should_be_running_lock.acquire(blocking=1) + if not self.monitor_connection.running: + should_be_running_lock.release() + what = "call monitor_connection.start_monitor()" + self.monitor_connection.start_monitor(regex) + else: + should_be_running_lock.release() + except Exception as e: + print("SEVERE: failed to " + what + " : " + str(e)) + traceback.print_exc() + else: + raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") + + + + + def _handle_event(self, queue, at_event): + if queue != None: + try: + event_type = at_event.event_type + event_data = at_event.event_data + + if event_type == AtEventType.SHARED_KEY_NOTIFICATION: + if event_data["value"] != None: + shared_shared_key_name = event_data["key"] + shared_shared_key_encrypted_value = event_data["value"] + try: + shared_key_decrypted_value = EncryptionUtil.rsa_decrypt_from_base64(shared_shared_key_encrypted_value, self.keys[KeysUtil.encryption_private_key_name]) + self.keys[shared_shared_key_name] = shared_key_decrypted_value + except Exception as e: + print(str(time.time()) + ": caught exception " + str(e) + " while decrypting received shared key " + shared_shared_key_name) + elif event_type == AtEventType.UPDATE_NOTIFICATION: + if event_data["value"] != None: + key = event_data["key"] + encrypted_value = event_data["value"] + ivNonce = event_data["metadata"]["ivNonce"] + self.at_client.secondary_connection.execute_command("notify:remove:" + event_data["id"]) + try: + encryption_key_shared_by_other = self.at_client.get_encryption_key_shared_by_other(SharedKey.from_string(key=key)) + decrypted_value = EncryptionUtil.aes_decrypt_from_base64(encrypted_text=encrypted_value.encode(), self_encryption_key=encryption_key_shared_by_other, iv=base64.b64decode(ivNonce)) + new_event_data = dict(event_data) + new_event_data["decryptedValue"] = decrypted_value + new_at_event = AtEvent(AtEventType.DECRYPTED_UPDATE_NOTIFICATION, new_event_data) + self.decrypted_events.put(new_at_event) + except Exception as e: + print(str(time.time()) + ": caught exception " + str(e) + " while decrypting received data with key name [" + key + "]") + except Empty: + pass + else: + raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") + + + def _decrypt_events(self, queue): + while True: + try: + at_event = queue.get(block=False) + event_type = at_event.event_type + if event_type == AtEventType.UPDATE_NOTIFICATION or event_type == AtEventType.UPDATE_NOTIFICATION_TEXT: + self._handle_event(queue, at_event) + + except Empty: + pass + + + \ No newline at end of file From d3e1a7aa65acc0c60cac3b0becb401ffb09e63d2 Mon Sep 17 00:00:00 2001 From: Xlin123 Date: Thu, 4 Jan 2024 15:58:33 -0500 Subject: [PATCH 5/6] chore: some touch ups --- .../connections/notification/atnotificationservice.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/at_client/connections/notification/atnotificationservice.py b/at_client/connections/notification/atnotificationservice.py index 4a72ed5..e01a828 100644 --- a/at_client/connections/notification/atnotificationservice.py +++ b/at_client/connections/notification/atnotificationservice.py @@ -122,7 +122,6 @@ def _handle_event(self, queue, at_event): key = event_data["key"] encrypted_value = event_data["value"] ivNonce = event_data["metadata"]["ivNonce"] - self.at_client.secondary_connection.execute_command("notify:remove:" + event_data["id"]) try: encryption_key_shared_by_other = self.at_client.get_encryption_key_shared_by_other(SharedKey.from_string(key=key)) decrypted_value = EncryptionUtil.aes_decrypt_from_base64(encrypted_text=encrypted_value.encode(), self_encryption_key=encryption_key_shared_by_other, iv=base64.b64decode(ivNonce)) @@ -130,6 +129,7 @@ def _handle_event(self, queue, at_event): new_event_data["decryptedValue"] = decrypted_value new_at_event = AtEvent(AtEventType.DECRYPTED_UPDATE_NOTIFICATION, new_event_data) self.decrypted_events.put(new_at_event) + self.at_client.secondary_connection.execute_command("notify:remove:" + event_data["id"]) except Exception as e: print(str(time.time()) + ": caught exception " + str(e) + " while decrypting received data with key name [" + key + "]") except Empty: @@ -137,7 +137,7 @@ def _handle_event(self, queue, at_event): else: raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") - + # This method is meant to be run in a separate thread def _decrypt_events(self, queue): while True: try: @@ -148,6 +148,9 @@ def _decrypt_events(self, queue): except Empty: pass + except Exception as e: + print("SEVERE: failed to decrypt event : " + str(e)) + break \ No newline at end of file From e051e2e0b620acc96baabb4bda74450d352b37ad Mon Sep 17 00:00:00 2001 From: Xlin123 Date: Thu, 4 Jan 2024 16:00:03 -0500 Subject: [PATCH 6/6] chore: remove lingering comments and moving stuff around --- at_client/atclient.py | 3 +- .../notification/atnotificationservice.py | 36 ++++++++----------- 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/at_client/atclient.py b/at_client/atclient.py index b105fe3..5808497 100644 --- a/at_client/atclient.py +++ b/at_client/atclient.py @@ -434,8 +434,7 @@ def handle_event(self, queue, at_event): else: raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") - #I want to leave this here instead of the AtNotificationService - # because it's logic is fairly related to the AtClient instead of the NotficationService (despite the name..) + def notify(self, at_key : AtKey, value, operation = OperationEnum.UPDATE, session_id = str(uuid.uuid4())): iv = at_key.metadata.iv_nonce shared_key = self.get_encryption_key_shared_by_me(at_key) diff --git a/at_client/connections/notification/atnotificationservice.py b/at_client/connections/notification/atnotificationservice.py index e01a828..1622f51 100644 --- a/at_client/connections/notification/atnotificationservice.py +++ b/at_client/connections/notification/atnotificationservice.py @@ -99,8 +99,20 @@ def _start_monitor(self, regex=""): else: raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") - - + # This method is meant to be run in a separate thread + def _decrypt_events(self, queue): + while True: + try: + at_event = queue.get(block=False) + event_type = at_event.event_type + if event_type == AtEventType.UPDATE_NOTIFICATION or event_type == AtEventType.UPDATE_NOTIFICATION_TEXT: + self._handle_event(queue, at_event) + + except Empty: + pass + except Exception as e: + print("SEVERE: failed to decrypt event : " + str(e)) + break def _handle_event(self, queue, at_event): if queue != None: @@ -135,22 +147,4 @@ def _handle_event(self, queue, at_event): except Empty: pass else: - raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") - - # This method is meant to be run in a separate thread - def _decrypt_events(self, queue): - while True: - try: - at_event = queue.get(block=False) - event_type = at_event.event_type - if event_type == AtEventType.UPDATE_NOTIFICATION or event_type == AtEventType.UPDATE_NOTIFICATION_TEXT: - self._handle_event(queue, at_event) - - except Empty: - pass - except Exception as e: - print("SEVERE: failed to decrypt event : " + str(e)) - break - - - \ No newline at end of file + raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") \ No newline at end of file