diff --git a/docs/Logging.md b/docs/Logging.md
new file mode 100644
index 0000000..ca5d325
--- /dev/null
+++ b/docs/Logging.md
@@ -0,0 +1,35 @@
+# Logging Documentation
+
+This document provides a detailed overview of what is being logged in the Sonar setup.
+
+## Table of Contents
+
+1. [Overview](#overview)
+2. [Logging Types](#logging-types)
+3. [Log Sources](#log-sources)
+4. [Log Details](#log-details)
+
+## Overview
+
+This documentation aims to provide transparency on the logging mechanisms implemented in the Sonar project. It includes information on the types of data being logged, their sources, formats, and purposes.
+
+## Logging Types
+
+- **DEBUG:** Detailed information, typically of interest only when diagnosing problems.
+- **INFO:** Confirmation that things are working as expected.
+- **TensorBoard logging:** Logging specific metrics, images, and other data to TensorBoard for visualization and analysis (see the sketch after this list).
+  - Console Logging: Logs a message to the console.
+  - Scalar Logging: Logs scalar values to TensorBoard for tracking metrics (loss, accuracy) over time.
+  - Image Logging: Logs images to both a file and TensorBoard for visual analysis.
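+
+For illustration, a minimal sketch of how these logging types are driven through the `LogUtils` helper. The `log_round_metrics` function below is hypothetical and only for illustration; it assumes `src/` is on the import path and that `log_utils` has already been constructed with a valid experiment config:
+
+```python
+import torch
+from utils.log_utils import LogUtils
+
+def log_round_metrics(log_utils: LogUtils, epoch: int, avg_loss: float, avg_acc: float, images: torch.Tensor) -> None:
+    # Console logging: human-readable progress messages.
+    log_utils.log_console("Round {} done".format(epoch))
+    # Scalar logging: track metrics such as loss and accuracy over time in TensorBoard.
+    log_utils.log_tb("train_loss/client1", avg_loss, epoch)
+    log_utils.log_tb("train_accuracy/client1", avg_acc, epoch)
+    # Image logging: write a batch of images to disk and to TensorBoard.
+    log_utils.log_image(images, "sample_batch", epoch)
+```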
+
+## Log Sources
+
+| Component/Module    | Data Logged                              | Log Level   | Format     | Storage Location                                                   | Frequency/Trigger |
+|---------------------|------------------------------------------|-------------|------------|--------------------------------------------------------------------|-------------------|
+| Model Training (FL) | Aggregated model metrics, client updates | INFO, DEBUG | Plain text | `./expt_dump/<experiment_name>/logs/client_<node_id>/summary.txt` | On every FL round |
+
+## Log Details
+
+### Federated Learning
+Logs aggregated model metrics (loss and accuracy) and updates from clients to track the overall progress and performance of the federated learning process. The logs also include training loss and accuracy from individual clients, as well as communication events between the server and clients, to monitor interactions and data exchange.
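+
+Concretely, the calls in `fl.py` that produce these logs look like the following (client side during local training, server side after each aggregation round):
+
+```python
+# Client side: per-client training metrics go to the client's summary file and TensorBoard.
+self.client_log_utils.log_summary("Client {} finished training with loss {:.4f}, accuracy {:.4f}, time taken {:.2f} seconds".format(self.node_id, avg_loss, avg_accuracy, time_taken))
+self.client_log_utils.log_tb(f"train_loss/client{self.node_id}", avg_loss, round)
+self.client_log_utils.log_tb(f"train_accuracy/client{self.node_id}", avg_accuracy, round)
+
+# Server side: aggregated test metrics after federated averaging.
+self.log_utils.log_tb("test_acc/clients", acc, round)
+self.log_utils.log_tb("test_loss/clients", loss, round)
+self.log_utils.log_console("Round: {} test_acc:{:.4f}, test_loss:{:.4f}, time taken {:.2f} seconds".format(round, acc, loss, time_taken))
+```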
diff --git a/resources/images/Client_metrics.png b/resources/images/Client_metrics.png
new file mode 100644
index 0000000..81ee6d2
Binary files /dev/null and b/resources/images/Client_metrics.png differ
diff --git a/resources/images/Server_metrics.png b/resources/images/Server_metrics.png
new file mode 100644
index 0000000..206c5b7
Binary files /dev/null and b/resources/images/Server_metrics.png differ
diff --git a/src/README.md b/src/README.md
index 29f6ad0..0e66197 100644
--- a/src/README.md
+++ b/src/README.md
@@ -17,12 +17,17 @@ We have intentionally kept configuration files as a python file which is typical
 One of the awesome things about this project is that whenever you run an experiment, all the source code, logs, and model weights are saved in a separate folder. This is done to ensure that you can reproduce the results by looking at the code that was responsible for the results. The naming of the folder is based on the keys inside the config file. That also means you can not run the same experiment again without renaming/deleting the previous experimental run. The code automatically asks you to press `r` to remove and create a new folder. Be careful you are not overwriting someone else's results.
 
 ### Logging
-We log the results in the console and also in a log file that captures the same information. We also log a few metrics for the tensorboard.
+We log metrics such as loss and accuracy, along with other client-server synchronization details, to a log file and to TensorBoard.
+For details, see [Logging](../docs/Logging.md).
 The tensorboard logs can be viewed by running tensorboard as follows:
 `tensorboard --logdir=expt_dump/ --host 0.0.0.0`
 Assuming `expt_dump` is the folder where the experiment logs are stored.
-After a successful run with 50 epochs, the Tensorboard experiment log should look something like below:
+After a successful run with 30 epochs, the TensorBoard experiment log displays the following Client Metrics and Server Metrics on the local host:
 
-
+Client Metrics
+
+
+Server Metrics
+
diff --git a/src/algos/base_class.py b/src/algos/base_class.py
index 3bf0ced..f61fcdc 100644
--- a/src/algos/base_class.py
+++ b/src/algos/base_class.py
@@ -29,7 +29,7 @@
     get_dset_communities,
 )
 import torchvision.transforms as T
-
+import os
 
 class BaseNode(ABC):
     def __init__(self, config) -> None:
@@ -37,6 +37,13 @@ def __init__(self, config) -> None:
         self.node_id = self.comm_utils.rank
 
         if self.node_id == 0:
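+            # Node 0 acts as the server; give it a dedicated `server` subdirectory under
+            # the experiment's log path so its logs stay separate from the per-client
+            # `client_<id>` directories created in fl.py.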
+            self.log_dir = config['log_path']
+            config['log_path'] = f'{self.log_dir}/server'
+            try:
+                os.mkdir(config['log_path'])
+            except FileExistsError:
+                pass
+            config['load_existing'] = False
             self.log_utils = LogUtils(config)
             self.log_utils.log_console("Config: {}".format(config))
             self.plot_utils = PlotUtils(config)
@@ -68,8 +75,10 @@ def setup_cuda(self, config):
         if torch.cuda.is_available():
             self.device = torch.device("cuda:{}".format(gpu_id))
+            print("Using GPU: cuda:{}".format(gpu_id))
         else:
             self.device = torch.device("cpu")
+            print("Using CPU")
 
     def set_model_parameters(self, config):
         # Model related parameters
diff --git a/src/algos/fl.py b/src/algos/fl.py
index 11059cb..781c417 100644
--- a/src/algos/fl.py
+++ b/src/algos/fl.py
@@ -2,9 +2,10 @@
 from typing import Any, Dict, List
 from torch import Tensor
 import torch.nn as nn
-
+from utils.log_utils import LogUtils
 from algos.base_class import BaseClient, BaseServer
-
+import os
+import time
 
 class CommProtocol(object):
     """
@@ -21,16 +22,44 @@ def __init__(self, config) -> None:
         super().__init__(config)
         self.config = config
         self.tag = CommProtocol
+        self.folder_deletion_signal = config["folder_deletion_signal_path"]
+
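+        # The signal file is written by log_utils.check_and_create_path once the user has
+        # resolved any pre-existing experiment folder ('r' to replace it, 'new' for a fresh
+        # run); wait for it before creating this client's log directory. The server removes
+        # the file again once all client paths have been set up.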
self.client_log_utils.log_tb(f"train_accuracy/client{self.node_id}", avg_accuracy, round) def local_test(self, **kwargs): """ @@ -54,24 +83,23 @@ def run_protocol(self): start_epochs = self.config.get("start_epochs", 0) total_epochs = self.config["epochs"] for round in range(start_epochs, total_epochs): - # self.log_utils.logging.info("Client waiting for semaphore from {}".format(self.server_node)) - # print("Client waiting for semaphore from {}".format(self.server_node)) + self.client_log_utils.log_summary("Client {} waiting for semaphore from {}".format(self.node_id, self.server_node)) self.comm_utils.wait_for_signal(src=self.server_node, tag=self.tag.START) - # self.log_utils.logging.info("Client received semaphore from {}".format(self.server_node)) - self.local_train() + self.client_log_utils.log_summary("Client {} received semaphore from {}".format(self.node_id, self.server_node)) + self.local_train(round) self.local_test() repr = self.get_representation() - # self.log_utils.logging.info("Client {} sending done signal to {}".format(self.node_id, self.server_node)) + self.client_log_utils.log_summary("Client {} sending done signal to {}".format(self.node_id, self.server_node)) self.comm_utils.send_signal( dest=self.server_node, data=repr, tag=self.tag.DONE ) - # self.log_utils.logging.info("Client {} waiting to get new model from {}".format(self.node_id, self.server_node)) + self.client_log_utils.log_summary("Client {} waiting to get new model from {}".format(self.node_id, self.server_node)) repr = self.comm_utils.wait_for_signal( src=self.server_node, tag=self.tag.UPDATES ) - # self.log_utils.logging.info("Client {} received new model from {}".format(self.node_id, self.server_node)) + self.client_log_utils.log_summary("Client {} received new model from {}".format(self.node_id, self.server_node)) self.set_representation(repr) - # self.log_utils.logging.info("Round {} done".format(round)) + self.client_log_utils.log_summary("Round {} done for Client {}".format(round, self.node_id)) class FedAvgServer(BaseServer): @@ -84,6 +112,7 @@ def __init__(self, config) -> None: self.model_save_path = "{}/saved_models/node_{}.pt".format( self.config["results_path"], self.node_id ) + self.folder_deletion_signal = config["folder_deletion_signal_path"] def fed_avg(self, model_wts: List[OrderedDict[str, Tensor]]): # All models are sampled currently at every round @@ -122,15 +151,18 @@ def test(self) -> float: """ Test the model on the server """ - test_loss, acc = self.model_utils.test( + start_time = time.time() + test_loss, test_acc = self.model_utils.test( self.model, self._test_loader, self.loss_fn, self.device ) + end_time = time.time() + time_taken = end_time - start_time # TODO save the model if the accuracy is better than the best accuracy # so far - if acc > self.best_acc: - self.best_acc = acc + if test_acc > self.best_acc: + self.best_acc = test_acc self.model_utils.save_model(self.model, self.model_save_path) - return acc + return test_loss, test_acc, time_taken def single_round(self): """ @@ -148,6 +180,9 @@ def single_round(self): self.log_utils.log_console("Server received all clients done signal") avg_wts = self.aggregate(reprs) self.set_representation(avg_wts) + #Remove the signal file after confirming that all client paths have been created + if os.path.exists(self.folder_deletion_signal): + os.remove(self.folder_deletion_signal) def run_protocol(self): self.log_utils.log_console("Starting iid clients federated averaging") @@ -155,9 +190,13 @@ def run_protocol(self): total_epochs = 
self.config["epochs"] for round in range(start_epochs, total_epochs): self.log_utils.log_console("Starting round {}".format(round)) + self.log_utils.log_summary("Starting round {}".format(round)) self.single_round() self.log_utils.log_console("Server testing the model") - acc = self.test() + loss, acc, time_taken = self.test() self.log_utils.log_tb(f"test_acc/clients", acc, round) - self.log_utils.log_console("round: {} test_acc:{:.4f}".format(round, acc)) - self.log_utils.log_console("Round {} done".format(round)) + self.log_utils.log_tb(f"test_loss/clients", loss, round) + self.log_utils.log_console("Round: {} test_acc:{:.4f}, test_loss:{:.4f}, time taken {:.2f} seconds".format(round, acc, loss, time_taken)) + self.log_utils.log_summary("Round: {} test_acc:{:.4f}, test_loss:{:.4f}, time taken {:.2f} seconds".format(round, acc, loss, time_taken)) + self.log_utils.log_console("Round {} complete".format(round)) + self.log_utils.log_summary("Round {} complete".format(round,)) diff --git a/src/configs/sys_config.py b/src/configs/sys_config.py index 3238bc2..0d4ffa4 100644 --- a/src/configs/sys_config.py +++ b/src/configs/sys_config.py @@ -17,6 +17,7 @@ # we need to make this a dictionary with user_id as key and number of samples as value "train_label_distribution": "iid", "test_label_distribution": "iid", + "folder_deletion_signal_path":"./expt_dump/folder_deletion.signal" } current_config = system_config \ No newline at end of file diff --git a/src/utils/log_utils.py b/src/utils/log_utils.py index 3957f9d..fe94728 100644 --- a/src/utils/log_utils.py +++ b/src/utils/log_utils.py @@ -36,7 +36,7 @@ def deprocess(img): return img.type(torch.uint8) -def check_and_create_path(path): +def check_and_create_path(path, folder_deletion_path): """ Checks if the specified path exists and prompts the user for action if it does. Creates the directory if it does not exist. @@ -48,17 +48,28 @@ def check_and_create_path(path): print(f"Experiment in {path} already present") done = False while not done: - inp = input("Press e to exit, r to replace it: ") + # Color the input prompt + color_code = "\033[94m" # Blue text + reset_code = "\033[0m" # Reset to default color + # Highlighted prompt in blue + inp = input(f"{color_code}Press e to exit, r to replace it: {reset_code}") + if inp == "e": sys.exit() elif inp == "r": done = True shutil.rmtree(path) os.makedirs(path) + with open(folder_deletion_path, "w") as signal_file: + #Folder deletion complete signal. + signal_file.write("r") else: print("Input not understood") else: os.makedirs(path) + with open(folder_deletion_path, "w") as signal_file: + #new folder creation complete signal. + signal_file.write("new") def copy_source_code(config: dict) -> None: @@ -69,11 +80,12 @@ def copy_source_code(config: dict) -> None: config (dict): Configuration dictionary with the results path. 
""" path = config["results_path"] + folder_deletion_path = config["folder_deletion_signal_path"] print("exp path:", path) if config["load_existing"]: print("Continue with loading checkpoint") return - check_and_create_path(path) + check_and_create_path(path, folder_deletion_path) denylist = [ "./__pycache__/", "./.ipynb_checkpoints/", @@ -99,7 +111,7 @@ def copy_source_code(config: dict) -> None: if folder not in denylist: copytree(folder, path + folder[1:]) os.mkdir(config["saved_models"]) - os.mkdir(config["log_path"]) + os.makedirs(config["log_path"], exist_ok=True) print("source code copied to exp_dump") @@ -140,7 +152,7 @@ def init_tb(self, load_existing): """ tb_path = f"{self.log_dir}/tensorboard" if not load_existing: - os.makedirs(tb_path) + os.makedirs(tb_path, exist_ok=True) self.writer = SummaryWriter(tb_path) def init_npy(self): @@ -151,6 +163,16 @@ def init_npy(self): if not os.path.exists(npy_path) or not os.path.isdir(npy_path): os.makedirs(npy_path) + def log_summary(self, text): + """ + Add summary text to the summary file for logging. + """ + if self.summary_file: + self.summary_file.write(text + "\n") + self.summary_file.flush() + else: + raise ValueError("Summary file is not initialized. Call init_summary() first.") + def log_image(self, imgs: torch.Tensor, key, iteration): """ Log image to file and TensorBoard.