Skip to content

Commit

Permalink
add logging for client metrics, create logging.md (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikitaparate authored Aug 1, 2024
1 parent 73eb5cf commit 2b71fbe
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 30 deletions.
35 changes: 35 additions & 0 deletions docs/Logging.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Logging Documentation

This document provides a detailed overview of what is being logged in the Sonar setup.

## Table of Contents

1. [Overview](#overview)
2. [Logging Types](#logging-types)
3. [Log Sources](#log-sources)
4. [Log Details](#log-details)

## Overview

This documentation aims to provide transparency on the logging mechanisms implemented in the Sonar project. It includes information on the types of data being logged, their sources, formats, and purposes.

## Logging Types

- **DEBUG:** Detailed information, typically of interest only when diagnosing problems.
- **INFO:** Confirmation that things are working as expected.
- **Tensorboard logging**: Logging specific metrics, images, and other data to TensorBoard for visualization and analysis.
- Console Logging: Logs a message to the console.
- Scalar Logging: Logs scalar values to TensorBoard for tracking metrics(loss, accuracy) over time.
- Image Logging: Logs images to both a file and TensorBoard for visual analysis.

## Log Sources

| Component/Module | Data Logged | Log Level | Format | Storage Location | Frequency/Trigger |
|--------------------|--------------------------------------------------|---------------|-------------|-------------------------------------|----------------------------------------|
| Model Training (FL) | Aggregated model metrics, client updates | INFO, DEBUG | Plain text | `./expt_dump/<experiment_name>/logs/client_<client_index>/summary.txt` | On every FL round

## Log Details

### Federated Learning
Logs aggregated model metrics (loss and accuracy) and updates from clients to track the overall progress and performance of the federated learning process. Additionally, logs include training loss and accuracy from individual clients. Also logs communication events between the server and clients to monitor interactions and data exchange.

Binary file added resources/images/Client_metrics.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added resources/images/Server_metrics.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
11 changes: 8 additions & 3 deletions src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ We have intentionally kept configuration files as a python file which is typical
One of the awesome things about this project is that whenever you run an experiment, all the source code, logs, and model weights are saved in a separate folder. This is done to ensure that you can reproduce the results by looking at the code that was responsible for the results. The naming of the folder is based on the keys inside the config file. That also means you can not run the same experiment again without renaming/deleting the previous experimental run. The code automatically asks you to press `r` to remove and create a new folder. Be careful you are not overwriting someone else's results.

### Logging
We log the results in the console and also in a log file that captures the same information. We also log a few metrics for the tensorboard.
We log metrics such as loss, accuracy along with other client-server synchronization details in log file and in tensorboard.
For detail check [Logging](../docs/Logging.md)

The tensorboard logs can be viewed by running tensorboard as follows:<br>
`tensorboard --logdir=expt_dump/ --host 0.0.0.0`<br>
Assuming `expt_dump` is the folder where the experiment logs are stored.

After a successful run with 50 epochs, the Tensorboard experiment log should look something like below:
After a successful run with 30 epochs, the Tensorboard experiment log displays the following Client Metrics and Server Metrics on the local host:

<img src="../resources/images/TensorboardSample.png" width=50% height=50%>
Client Metrics
<img src="../resources/images/Client_metrics.png" width=50% height=50%>

Server Metrics
<img src="../resources/images/Server_metrics.png" width=50% height=50%>
11 changes: 10 additions & 1 deletion src/algos/base_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,21 @@
get_dset_communities,
)
import torchvision.transforms as T

import os

class BaseNode(ABC):
def __init__(self, config) -> None:
self.comm_utils = CommUtils()
self.node_id = self.comm_utils.rank

if self.node_id == 0:
self.log_dir = config['log_path']
config['log_path'] = f'{self.log_dir}/server'
try:
os.mkdir(config['log_path'])
except FileExistsError:
pass
config['load_existing'] = False
self.log_utils = LogUtils(config)
self.log_utils.log_console("Config: {}".format(config))
self.plot_utils = PlotUtils(config)
Expand Down Expand Up @@ -68,8 +75,10 @@ def setup_cuda(self, config):

if torch.cuda.is_available():
self.device = torch.device("cuda:{}".format(gpu_id))
print("Using GPU: cuda:{}".format(gpu_id))
else:
self.device = torch.device("cpu")
print("Using CPU")

def set_model_parameters(self, config):
# Model related parameters
Expand Down
81 changes: 60 additions & 21 deletions src/algos/fl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from typing import Any, Dict, List
from torch import Tensor
import torch.nn as nn

from utils.log_utils import LogUtils
from algos.base_class import BaseClient, BaseServer

import os
import time

class CommProtocol(object):
"""
Expand All @@ -21,16 +22,44 @@ def __init__(self, config) -> None:
super().__init__(config)
self.config = config
self.tag = CommProtocol
self.folder_deletion_signal = config["folder_deletion_signal_path"]

while not os.path.exists(self.folder_deletion_signal):
print("Existing experiment already present, waiting user input, enter 'r' or 'e'...")
time.sleep(5)

# Once the signal file exists, read its contents
with open(self.folder_deletion_signal, "r") as signal_file:
mode = signal_file.read().strip()

if mode == 'r' or mode == 'new':
try:
config['log_path'] = f"{config['log_path']}/client_{self.node_id}"
os.makedirs(config['log_path'], exist_ok=True)
except FileExistsError:
pass

def local_train(self):
config['load_existing'] = False
self.client_log_utils = LogUtils(config)

def local_train(self, round):
"""
Train the model locally
"""
avg_loss = self.model_utils.train(
start_time = time.time()
avg_loss, avg_accuracy = self.model_utils.train(
self.model, self.optim, self.dloader, self.loss_fn, self.device
)
print("Client {} finished training with loss {}".format(self.node_id, avg_loss))
# self.log_utils.logger.log_tb(f"train_loss/client{client_num}", avg_loss, epoch)
end_time = time.time()
time_taken = end_time - start_time

self.client_log_utils.log_console(
"Client {} finished training with loss {:.4f}, accuracy {:.4f}, time taken {:.2f} seconds".format(self.node_id, avg_loss, avg_accuracy, time_taken)
)
self.client_log_utils.log_summary("Client {} finished training with loss {:.4f}, accuracy {:.4f}, time taken {:.2f} seconds".format(self.node_id, avg_loss, avg_accuracy, time_taken))

self.client_log_utils.log_tb(f"train_loss/client{self.node_id}", avg_loss, round)
self.client_log_utils.log_tb(f"train_accuracy/client{self.node_id}", avg_accuracy, round)

def local_test(self, **kwargs):
"""
Expand All @@ -54,24 +83,23 @@ def run_protocol(self):
start_epochs = self.config.get("start_epochs", 0)
total_epochs = self.config["epochs"]
for round in range(start_epochs, total_epochs):
# self.log_utils.logging.info("Client waiting for semaphore from {}".format(self.server_node))
# print("Client waiting for semaphore from {}".format(self.server_node))
self.client_log_utils.log_summary("Client {} waiting for semaphore from {}".format(self.node_id, self.server_node))
self.comm_utils.wait_for_signal(src=self.server_node, tag=self.tag.START)
# self.log_utils.logging.info("Client received semaphore from {}".format(self.server_node))
self.local_train()
self.client_log_utils.log_summary("Client {} received semaphore from {}".format(self.node_id, self.server_node))
self.local_train(round)
self.local_test()
repr = self.get_representation()
# self.log_utils.logging.info("Client {} sending done signal to {}".format(self.node_id, self.server_node))
self.client_log_utils.log_summary("Client {} sending done signal to {}".format(self.node_id, self.server_node))
self.comm_utils.send_signal(
dest=self.server_node, data=repr, tag=self.tag.DONE
)
# self.log_utils.logging.info("Client {} waiting to get new model from {}".format(self.node_id, self.server_node))
self.client_log_utils.log_summary("Client {} waiting to get new model from {}".format(self.node_id, self.server_node))
repr = self.comm_utils.wait_for_signal(
src=self.server_node, tag=self.tag.UPDATES
)
# self.log_utils.logging.info("Client {} received new model from {}".format(self.node_id, self.server_node))
self.client_log_utils.log_summary("Client {} received new model from {}".format(self.node_id, self.server_node))
self.set_representation(repr)
# self.log_utils.logging.info("Round {} done".format(round))
self.client_log_utils.log_summary("Round {} done for Client {}".format(round, self.node_id))


class FedAvgServer(BaseServer):
Expand All @@ -84,6 +112,7 @@ def __init__(self, config) -> None:
self.model_save_path = "{}/saved_models/node_{}.pt".format(
self.config["results_path"], self.node_id
)
self.folder_deletion_signal = config["folder_deletion_signal_path"]

def fed_avg(self, model_wts: List[OrderedDict[str, Tensor]]):
# All models are sampled currently at every round
Expand Down Expand Up @@ -122,15 +151,18 @@ def test(self) -> float:
"""
Test the model on the server
"""
test_loss, acc = self.model_utils.test(
start_time = time.time()
test_loss, test_acc = self.model_utils.test(
self.model, self._test_loader, self.loss_fn, self.device
)
end_time = time.time()
time_taken = end_time - start_time
# TODO save the model if the accuracy is better than the best accuracy
# so far
if acc > self.best_acc:
self.best_acc = acc
if test_acc > self.best_acc:
self.best_acc = test_acc
self.model_utils.save_model(self.model, self.model_save_path)
return acc
return test_loss, test_acc, time_taken

def single_round(self):
"""
Expand All @@ -148,16 +180,23 @@ def single_round(self):
self.log_utils.log_console("Server received all clients done signal")
avg_wts = self.aggregate(reprs)
self.set_representation(avg_wts)
#Remove the signal file after confirming that all client paths have been created
if os.path.exists(self.folder_deletion_signal):
os.remove(self.folder_deletion_signal)

def run_protocol(self):
self.log_utils.log_console("Starting iid clients federated averaging")
start_epochs = self.config.get("start_epochs", 0)
total_epochs = self.config["epochs"]
for round in range(start_epochs, total_epochs):
self.log_utils.log_console("Starting round {}".format(round))
self.log_utils.log_summary("Starting round {}".format(round))
self.single_round()
self.log_utils.log_console("Server testing the model")
acc = self.test()
loss, acc, time_taken = self.test()
self.log_utils.log_tb(f"test_acc/clients", acc, round)
self.log_utils.log_console("round: {} test_acc:{:.4f}".format(round, acc))
self.log_utils.log_console("Round {} done".format(round))
self.log_utils.log_tb(f"test_loss/clients", loss, round)
self.log_utils.log_console("Round: {} test_acc:{:.4f}, test_loss:{:.4f}, time taken {:.2f} seconds".format(round, acc, loss, time_taken))
self.log_utils.log_summary("Round: {} test_acc:{:.4f}, test_loss:{:.4f}, time taken {:.2f} seconds".format(round, acc, loss, time_taken))
self.log_utils.log_console("Round {} complete".format(round))
self.log_utils.log_summary("Round {} complete".format(round,))
1 change: 1 addition & 0 deletions src/configs/sys_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# we need to make this a dictionary with user_id as key and number of samples as value
"train_label_distribution": "iid",
"test_label_distribution": "iid",
"folder_deletion_signal_path":"./expt_dump/folder_deletion.signal"
}

current_config = system_config
32 changes: 27 additions & 5 deletions src/utils/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def deprocess(img):
return img.type(torch.uint8)


def check_and_create_path(path):
def check_and_create_path(path, folder_deletion_path):
"""
Checks if the specified path exists and prompts the user for action if it does.
Creates the directory if it does not exist.
Expand All @@ -48,17 +48,28 @@ def check_and_create_path(path):
print(f"Experiment in {path} already present")
done = False
while not done:
inp = input("Press e to exit, r to replace it: ")
# Color the input prompt
color_code = "\033[94m" # Blue text
reset_code = "\033[0m" # Reset to default color
# Highlighted prompt in blue
inp = input(f"{color_code}Press e to exit, r to replace it: {reset_code}")

if inp == "e":
sys.exit()
elif inp == "r":
done = True
shutil.rmtree(path)
os.makedirs(path)
with open(folder_deletion_path, "w") as signal_file:
#Folder deletion complete signal.
signal_file.write("r")
else:
print("Input not understood")
else:
os.makedirs(path)
with open(folder_deletion_path, "w") as signal_file:
#new folder creation complete signal.
signal_file.write("new")


def copy_source_code(config: dict) -> None:
Expand All @@ -69,11 +80,12 @@ def copy_source_code(config: dict) -> None:
config (dict): Configuration dictionary with the results path.
"""
path = config["results_path"]
folder_deletion_path = config["folder_deletion_signal_path"]
print("exp path:", path)
if config["load_existing"]:
print("Continue with loading checkpoint")
return
check_and_create_path(path)
check_and_create_path(path, folder_deletion_path)
denylist = [
"./__pycache__/",
"./.ipynb_checkpoints/",
Expand All @@ -99,7 +111,7 @@ def copy_source_code(config: dict) -> None:
if folder not in denylist:
copytree(folder, path + folder[1:])
os.mkdir(config["saved_models"])
os.mkdir(config["log_path"])
os.makedirs(config["log_path"], exist_ok=True)
print("source code copied to exp_dump")


Expand Down Expand Up @@ -140,7 +152,7 @@ def init_tb(self, load_existing):
"""
tb_path = f"{self.log_dir}/tensorboard"
if not load_existing:
os.makedirs(tb_path)
os.makedirs(tb_path, exist_ok=True)
self.writer = SummaryWriter(tb_path)

def init_npy(self):
Expand All @@ -151,6 +163,16 @@ def init_npy(self):
if not os.path.exists(npy_path) or not os.path.isdir(npy_path):
os.makedirs(npy_path)

def log_summary(self, text):
"""
Add summary text to the summary file for logging.
"""
if self.summary_file:
self.summary_file.write(text + "\n")
self.summary_file.flush()
else:
raise ValueError("Summary file is not initialized. Call init_summary() first.")

def log_image(self, imgs: torch.Tensor, key, iteration):
"""
Log image to file and TensorBoard.
Expand Down

0 comments on commit 2b71fbe

Please sign in to comment.