
Commit

update the forget rate metrics computation in the federated_class_incremental_learning paradigm; complete the example of fci_ssl

Signed-off-by: Marchons <[email protected]>
Marchons committed Sep 8, 2024
1 parent 885f77d commit 1b9b881
Showing 46 changed files with 1,551 additions and 419 deletions.
2 changes: 2 additions & 0 deletions core/common/constant.py
@@ -77,6 +77,8 @@ class SystemMetricType(Enum):
BWT = "BWT"
TASK_AVG_ACC = "task_avg_acc"
MATRIX = "MATRIX"
FORGET_RATE = "FORGET_RATE"



class TestObjectType(Enum):
1 change: 0 additions & 1 deletion core/testcasecontroller/algorithm/module/module.py
@@ -128,7 +128,6 @@ def get_module_instance(self, module_type):
# pylint: disable=E1134
func = ClassFactory.get_cls(
type_name=class_factory_type, t_cls_name=self.name)(**self.hyperparameters)

return func
except Exception as err:
            raise RuntimeError(f"module(type={module_type}) loads class(name={self.name}) "
@@ -15,11 +15,12 @@
"""Federated Class-Incremental Learning Paradigm"""
import numpy as np
from core.common.constant import ParadigmType
from core.common.utils import get_file_format
from .federated_learning import FederatedLearning
from sedna.algorithms.aggregation import AggClient
from core.common.log import LOGGER
from threading import Thread, RLock

from core.testcasecontroller.metrics.metrics import get_metric_func

class FederatedClassIncrementalLearning(FederatedLearning):
"""
@@ -55,6 +56,9 @@ def __init__(self, workspace, **kwargs):
self.lock = RLock()
        self.aggregate_clients = []
        self.train_infos = []
        self.forget_rate_metrics = []
        self.accuracy_per_round = []
        # use .get on both levels so a missing 'model_eval' section does not raise KeyError
        self.metrics_dict = kwargs.get('model_eval', {}).get('model_metric', {})

    def get_task_size(self, train_datasets):
        """Return the number of distinct class labels across the task's client partitions."""
        return np.unique([train_datasets[i][1] for i in range(len(train_datasets))]).shape[0]
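
get_task_size counts the distinct class labels that appear across the task's client partitions. A minimal sketch of the intent (hypothetical shapes; np.concatenate is used here so that ragged partitions also work):

    import numpy as np

    # two client partitions whose labels are drawn from classes {0, 1, 2}
    train_datasets = [(np.zeros((4, 8)), np.array([0, 1, 1, 2])),
                      (np.zeros((3, 8)), np.array([2, 0, 0]))]
    labels = np.concatenate([part[1] for part in train_datasets])
    print(np.unique(labels).shape[0])  # -> 3 classes in this task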
@@ -63,24 +67,29 @@ def task_definition(self, dataset_files, task_id):
"""
Define the task for the class incremental learning paradigm
"""
LOGGER.info(f'len(dataset_files): {len(dataset_files)}')
# 1. Partition Dataset
train_dataset_files, _ = dataset_files[task_id]
LOGGER.info(f'train_dataset_files: {train_dataset_files}')
train_datasets = self.train_data_partition(train_dataset_files)
LOGGER.info(f"train_datasets: {len(train_datasets)}")
task_size = self.get_task_size(train_datasets)
LOGGER.info(f"task_size: {task_size}")
        # 2. Split each task's data into labeled and unlabeled subsets according to the setting
-        need_split_label_unlabel_data = 1.0 - self.fl_data_setting.get("label_data_ratio") > 1e-6
-        if need_split_label_unlabel_data:
-            train_datasets = self.split_label_unlabel_data(train_datasets)
+        # need_split_label_unlabel_data = 1.0 - self.fl_data_setting.get("label_data_ratio") > 1e-6
+        # if need_split_label_unlabel_data:
+        train_datasets = self.split_label_unlabel_data(train_datasets)
# 3. Return the dataset for each task [{label_data, unlabel_data}, ...]
return train_datasets, task_size

def split_label_unlabel_data(self, train_datasets):
-        label_ratio = self.fl.data_setting.get("label_data_ratio")
+        label_ratio = self.fl_data_setting.get("label_data_ratio")
new_train_datasets = []
for i in range(len(train_datasets)):
train_dataset_dict = {}
LOGGER.info(f"train_datasets[i][0]: {train_datasets[i][0].shape}, {len(train_datasets[i])}")
label_data_number = int(label_ratio * len(train_datasets[i][0]))
LOGGER.info(f"label_data_number: {label_data_number}")
        # split the dataset into labeled and unlabeled subsets
train_dataset_dict['label_x'] = train_datasets[i][0][:label_data_number]
train_dataset_dict['label_y'] = train_datasets[i][1][:label_data_number]
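
A concrete, hypothetical illustration of the ratio-driven slicing above (the unlabeled remainder is presumably assigned in the lines past the visible hunk):

    # hypothetical numbers, not from the benchmark config
    label_data_ratio = 0.1
    n_samples = 1000
    label_data_number = int(label_data_ratio * n_samples)  # -> 100
    # label_x, label_y     = x[:100], y[:100]
    # unlabel_x, unlabel_y = x[100:], y[100:]   (presumed continuation)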
@@ -98,8 +107,12 @@ def run(self):
# split_time = self.rounds // self.task_size # split the dataset into several tasks
# print(f'split_time: {split_time}')
dataset_files = self._split_dataset(self.incremental_rounds)
test_dataset_files = self._split_test_dataset(self.incremental_rounds)
LOGGER.info(f'get the dataset_files: {dataset_files}')

for task_id in range(self.incremental_rounds):
train_datasets, task_size = self.task_definition(dataset_files, task_id)
testdatasets = test_dataset_files[:task_id+1]
for r in range(self.rounds):
LOGGER.info(f"Round {r} task id: {task_id}")
self._train(train_datasets, task_id=task_id, round=r, task_size=task_size)
@@ -109,10 +122,35 @@
self.send_weights_to_clients(global_weights)
self.aggregate_clients.clear()
self.train_infos.clear()
# test_res = self.predict(self.dataset.test_url)
# self.system_metric_info = self.evaluation(testdatasets, task_id)
test_res = self.predict(self.dataset.test_url)
return test_res, self.system_metric_info



def _split_test_dataset(self, split_time):

test_dataset = self.dataset.load_data(self.dataset.test_url, "eval")
all_data = len(test_dataset.x)
step = all_data // split_time
test_datasets_files = []
index = 1
while index <= split_time:
new_dataset = {}
if index == split_time:
new_dataset['x'] = test_dataset.x[step * (index - 1):]
new_dataset['y'] = test_dataset.y[step * (index - 1):]
# new_dataset = (test_dataset.x[step * (index - 1):], test_dataset.y[step * (index - 1):])
else:
new_dataset['x'] = test_dataset.x[step * (index - 1):step * index]
new_dataset['y'] = test_dataset.y[step * (index - 1):step * index]
# new_dataset = (test_dataset.x[step * (index - 1):step * index], test_dataset.y[step * (index - 1):step * index])
test_datasets_files.append(new_dataset)
index += 1
return test_datasets_files
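
For instance (hypothetical sizes), 100 test samples split across split_time = 4 incremental rounds gives chunks of 25 each, with the last chunk absorbing any remainder, mirroring the while-loop above:

    all_data, split_time = 100, 4
    step = all_data // split_time
    bounds = [(step * (i - 1), step * i if i < split_time else all_data)
              for i in range(1, split_time + 1)]
    print(bounds)  # -> [(0, 25), (25, 50), (50, 75), (75, 100)]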


def train_data_partition(self, train_dataset_file):
return super().train_data_partition(train_dataset_file)

@@ -130,7 +168,7 @@ def client_train(self, client_idx, train_datasets, validation_datasets, **kwargs

def _train(self, train_datasets, **kwargs):
client_threads = []
-        print(f'len(self.clients): {len(self.clients)}')
+        print(f'len(self.clients): {len(self.clients)} len train_datasets: {len(train_datasets)}')
for idx in range(len(self.clients)):
client_thread = Thread(target=self.client_train, args=(idx, train_datasets, None), kwargs=kwargs)
client_thread.start()
@@ -148,5 +186,51 @@ def helper_function(self,train_infos):
helper_info = self.aggregator.helper_function(train_infos[i])
self.clients[i].helper_function(helper_info)
LOGGER.info('finish helper function')

    def evaluation(self, testdataset_files, incremental_round):
        """Evaluate old-class accuracy, the forget rate, and new-class accuracy after a round."""
        _, accuracy_func = get_metric_func(self.metrics_dict)
        LOGGER.info('*' * 20 + 'start evaluation' + '*' * 20)
if isinstance(testdataset_files, str):
testdataset_files = [testdataset_files]
job = self.get_global_model()
        # calculate the accuracy on old (previously seen) classes
        old_class_acc_list = []  # for the current round: [class_0: acc_0, class_1: acc_1, ...]
for index in range(len(testdataset_files)):
acc_list = []
for data_index in range(len(testdataset_files[index]['x'])):
data = testdataset_files[index]['x'][data_index]
res = job.inference([data])
LOGGER.info(f"label is {testdataset_files[index]['y'][data_index]}, res is {res}")
                acc = accuracy_func([testdataset_files[index]['y'][data_index]], res)
acc_list.append(acc)
old_class_acc_list.extend(acc_list)
# old_classes.extend(current_classes)
current_forget_rate = 0.0
max_acc_sum = 0
self.accuracy_per_round.append(old_class_acc_list)
        # calculate the forget rate
for i in range(len(old_class_acc_list)):
max_acc_diff = 0
for j in range(incremental_round):
acc_per_round = self.accuracy_per_round[j]
LOGGER.info(f'acc_per_round: {acc_per_round} and len is {len(acc_per_round)}')
if i < len(acc_per_round):
LOGGER.info(f'acc_per_round: {acc_per_round[i]} and diff is {acc_per_round[i] - old_class_acc_list[i]}')
max_acc_diff = max(max_acc_diff, acc_per_round[i] - old_class_acc_list[i])
max_acc_sum += max_acc_diff
LOGGER.info(f'max_acc_diff: {max_acc_diff}')
current_forget_rate = max_acc_sum / len(old_class_acc_list) if incremental_round > 0 else 0.0


LOGGER.info(f'for current round: {incremental_round} forget rate: {current_forget_rate}')
self.forget_rate_metrics.append(current_forget_rate)
        # calculate the accuracy on the newly introduced classes
new_classes_acc = 0
for index in range(len(testdataset_files[-1]['x'])):
data = testdataset_files[-1]['x'][index]
LOGGER.info(data)
res = job.inference([data])
acc = accuracy_func([testdataset_files[-1]['y'][index]], res)
new_classes_acc += acc
new_classes_acc = new_classes_acc / float(len(testdataset_files[-1]['x']))
LOGGER.info(f'for current round : {incremental_round} new_classes_acc: {new_classes_acc} {self.accuracy_per_round}')
LOGGER.info('*'*20 +'finish evaluation' + '*'*20)
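
The nested loop above implements the usual class-incremental forgetting measure, clamped at zero because max_acc_diff starts from 0. Writing a_{j,i} for the accuracy recorded on test item i after incremental round j, and T for the current round, the quantity computed is:

    F_T = \frac{1}{N} \sum_{i=1}^{N} \max\Big( 0,\; \max_{0 \le j < T} \big( a_{j,i} - a_{T,i} \big) \Big)

that is, for each previously seen test item, the largest drop from any earlier round's accuracy to the current one, averaged over all N old items.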
@@ -176,7 +176,7 @@ def predict(self, test_dataset_file):
test_dataset.append(self.dataset.load_data(file, "eval"))
assert test_dataset is not None, "test_dataset is None"
job = self.get_global_model()
-        test_res = job.inference(test_dataset)
+        test_res = job.inference(test_dataset.x)
LOGGER.info(f" after predict {len(test_res)}")
return test_res

7 changes: 7 additions & 0 deletions core/testcasecontroller/metrics/metrics.py
@@ -143,6 +143,13 @@ def task_avg_acc_func(system_metric_info: dict):
info = system_metric_info.get(SystemMetricType.TASK_AVG_ACC.value)
return info["accuracy"]

def task_forget_rate(system_metric_info: dict):
"""
compute task forget rate
"""
    info = system_metric_info.get(SystemMetricType.FORGET_RATE.value)
return info["forget_rate"][-1]
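
Nothing in this diff populates the new key yet, so the payload shape is an assumption; if the paradigm stores one value per incremental round under FORGET_RATE, the metric simply reads the latest one:

    # hypothetical payload shape consumed by task_forget_rate
    system_metric_info = {"FORGET_RATE": {"forget_rate": [0.00, 0.12, 0.18]}}
    info = system_metric_info.get("FORGET_RATE")
    print(info["forget_rate"][-1])  # -> 0.18, the forget rate after the final task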


def get_metric_func(metric_dict: dict):
"""
5 changes: 3 additions & 2 deletions core/testenvmanager/dataset/utils.py
@@ -33,10 +33,10 @@ def read_data_from_file_to_npy( files: BaseDataSource):
"""
x_train = []
y_train = []
-    # print(files.x, files.y)
+    print(files.x, files.y)
for i, file in enumerate(files.x):
x = np.load(file)
-        # print(x.shape)
+        print(x.shape)
# print(type(files.y[i]))
y = np.full((x.shape[0], 1), (files.y[i]).astype(np.int32))
x_train.append(x)
@@ -69,6 +69,7 @@ def partition_data(datasets, client_number, data_partition ='iid', non_iid_ratio
if data_partition == 'iid':
x_data = datasets[0]
y_data = datasets[1]
print(f'x_data shape: {x_data.shape}, y_data shape: {y_data.shape}')
indices = np.arange(len(x_data))
np.random.shuffle(indices)
data = []
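
The IID branch continues past the visible hunk; a minimal sketch of the usual pattern (shuffle, then one roughly equal chunk per client), under assumed shapes and not the file's exact code:

    import numpy as np

    x_data, y_data = np.zeros((100, 8)), np.arange(100)
    client_number = 5
    indices = np.arange(len(x_data))
    np.random.shuffle(indices)
    data = [(x_data[idx], y_data[idx]) for idx in np.array_split(indices, client_number)]
    print(len(data), data[0][0].shape)  # -> 5 (20, 8)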
