diff --git a/EGG-INFO/PKG-INFO b/EGG-INFO/PKG-INFO
new file mode 100644
index 00000000..084ddf04
--- /dev/null
+++ b/EGG-INFO/PKG-INFO
@@ -0,0 +1,144 @@
+Metadata-Version: 2.1
+Name: ianvs
+Version: 0.1.0
+Summary: The ianvs package is designed to help algorithm developers test their algorithms more effectively.
+Home-page: https://github.com/kubeedge/ianvs
+Author: MooreZheng
+Maintainer: MooreZheng
+Maintainer-email: 
+License: Apache License 2.0
+Classifier: Programming Language :: Python :: 3
+Classifier: License :: OSI Approved :: Apache Software License
+Classifier: Operating System :: POSIX :: Linux
+Requires-Python: >=3.6
+Description-Content-Type: text/markdown
+License-File: LICENSE
+
+# Ianvs
+
+[![CI](https://github.com/kubeedge/ianvs/workflows/CI/badge.svg?branch=main)](https://github.com/sedna/ianvs/actions)
+[![LICENSE SCAN](https://app.fossa.com/api/projects/custom%2B32178%2Fgithub.com%2Fkubeedge%2Fianvs.svg?type=shield)](https://app.fossa.com/projects/custom%2B32178%2Fgithub.com%2Fkubeedge%2Fianvs?ref=badge_shield)
+[![LICENSE](https://img.shields.io/github/license/kubeedge-sedna/ianvs.svg)](/LICENSE)
+
+Ianvs is a distributed synergy AI benchmarking project incubated in KubeEdge SIG AI. Ianvs aims to test the performance of distributed synergy AI solutions following recognized standards, in order to facilitate more efficient and effective development. More specifically, Ianvs prepares not only test cases with datasets and corresponding algorithms, but also benchmarking tools including simulation and hyper-parameter searching. Ianvs also reveals best practices for developers and end users with presentation tools including leaderboards and test reports.
+
+## Scope
+
+The distributed synergy AI benchmarking project Ianvs aims to test the performance of distributed synergy AI solutions following recognized standards, in order to facilitate more efficient and effective development.
+
+The scope of Ianvs includes
+
+- Providing end-to-end benchmark toolkits across devices, edge nodes, and cloud nodes based on typical distributed-synergy AI paradigms and applications.
+  - Tools to manage test environments. For example, it would be necessary to support CRUD (Create, Read, Update, and Delete) actions on test environments. Elements of such test environments include algorithm-wise and system-wise configuration.
+  - Tools to control test cases. Typical examples include paradigm templates, simulation tools, and hyper-parameter-based assistant tools.
+  - Tools to manage benchmark presentation, e.g., leaderboard and test report generation.
+- Cooperation with other organizations or communities, e.g., in KubeEdge SIG AI, to establish comprehensive benchmarks and develop related applications, which can include but are not limited to
+  - Dataset collection, re-organization, and publication
+  - Formalized specifications, e.g., standards
+  - Holding competitions or coding events, e.g., open source promotion plan
+  - Maintaining solution leaderboards or certifications for commercial usage
+
+## Architecture
+
+The architecture and related concepts are shown in the figure below. Ianvs is designed to run **within a single node**.
+Critical components include
+
+- Test Environment Manager: CRUD of test environments, serving global usage
+- Test Case Controller: controls the runtime behavior of test cases, such as instance generation and destruction
+  - Generation Assistant: assists users in generating test cases based on certain rules or constraints, e.g., the range of parameters
+  - Simulation Controller: controls the simulation process of edge-cloud synergy AI, including the creation and removal of simulation containers
+- Story Manager: manages and presents test-case outputs, e.g., leaderboards
+
+![](docs/guides/images/ianvs_arch.png)
+
+More details on Ianvs components:
+
+1. Test-Environment Manager supports CRUD operations on test environments, which basically include
+   - Algorithm-wise configuration
+     - Public datasets
+     - Pre-processing algorithms
+     - Feature engineering algorithms
+     - Post-processing algorithms like metric computation
+   - System-wise configuration
+     - Overall architecture
+     - System constraints or budgets
+       - End-to-end cross-node
+       - Per node
+2. Test-case Controller, which includes but is not limited to the following components
+   - Templates of common distributed-synergy-AI paradigms, which help developers prepare their test cases with little effort. Such paradigms include edge-cloud synergy joint inference, incremental learning, federated learning, and lifelong learning.
+   - Simulation tools, which build simulated test environments for test cases
+   - Other tools to assist test-case generation, e.g., preparing test cases based on a given range of hyper-parameters
+3. Story Manager, which includes but is not limited to the following components
+   - Leaderboard generation
+   - Test report generation
+
+## Guides
+
+### Documents
+
+Documentation is located on [readthedocs.io](https://ianvs.readthedocs.io/). The documents include the quick start, guides, dataset descriptions, algorithms, user interfaces, stories, and roadmap.
+
+### Installation
+
+Follow the [Ianvs installation document](docs/guides/how-to-install-ianvs.md) to install Ianvs; a minimal invocation sketch is given at the end of this README.
+
+### Examples
+
+- Scenario PCB-AoI: [Industrial Defect Detection on the PCB-AoI Dataset](docs/proposals/scenarios/industrial-defect-detection/pcb-aoi.md).
+
+  - Example PCB-AoI-1: [Testing single task learning in industrial defect detection](docs/proposals/test-reports/testing-single-task-learning-in-industrial-defect-detection-with-pcb-aoi.md).
+
+  - Example PCB-AoI-2: [Testing incremental learning in industrial defect detection](docs/proposals/test-reports/testing-incremental-learning-in-industrial-defect-detection-with-pcb-aoi.md).
+
+- Scenario Cityscapes-Synthia: [Curb Detection on the Cityscapes-Synthia Dataset](docs/proposals/algorithms/lifelong-learning/Additional-documentation/curb_detetion_datasets.md)
+
+  - Example Cityscapes-Synthia-1: [Lifelong learning in semantic segmentation](examples/cityscapes-synthia/lifelong_learning_bench/semantic-segmentation/README.md)
+
+  - Example Cityscapes-Synthia-2: [Lifelong learning in curb detection](examples/cityscapes-synthia/lifelong_learning_bench/curb-detection/README.md)
+
+  - Example Cityscapes-Synthia-3: [Scene-based unknown task recognition in curb detection](examples/cityscapes-synthia/scene-based-unknown-task-recognition/curb-detection/README.md)
+
+  - Example Cityscapes-Synthia-4: [Integrating GAN and Self-taught Learning into Ianvs Lifelong Learning](examples/cityscapes/lifelong_learning_bench/unseen_task_processing-GANwithSelfTaughtLearning/README.md)
+
+- Scenario Cloud-Robotics: [Semantic Segmentation on the Cloud-Robotics Dataset](docs/proposals/scenarios/Cloud-Robotics/Cloud-Robotics_zh.md)
+
+  - Example Cloud-Robotics-1: [Lifelong learning in semantic segmentation](examples/robot/lifelong_learning_bench/semantic-segmentation/README.md)
+
+  - Example Cloud-Robotics-2: [Class-incremental learning in semantic segmentation](examples/robot-cityscapes-synthia/lifelong_learning_bench/semantic-segmentation/README.md)
+
+  - Example Cloud-Robotics-3: [Lifelong learning in SAM annotation](examples/robot/lifelong_learning_bench/sam_annotation/tutorial.md)
+
+## Roadmap
+
+- [2022 H2 Roadmap](docs/roadmap.md)
+
+## Meeting
+
+Routine Community Meeting for KubeEdge SIG AI runs weekly:
+
+- Europe Time: **Thursdays at 16:30-17:30 Beijing Time**.
+  ([Convert to your timezone.](https://www.thetimezoneconverter.com/?t=16%3A30&tz=GMT%2B8&))
+
+Resources:
+
+- [Meeting notes and agenda](https://docs.google.com/document/d/12n3kGUWTkAH4q2Wv5iCVGPTA_KRWav_eakbFrF9iAww/edit)
+- [Meeting recordings](https://www.youtube.com/playlist?list=PLQtlO1kVWGXkRGkjSrLGEPJODoPb8s5FM)
+- [Meeting link](https://zoom.us/j/4167237304)
+- [Meeting Calendar](https://calendar.google.com/calendar/u/0/r?cid=Y19nODluOXAwOG05MzFiYWM3NmZsajgwZzEwOEBncm91cC5jYWxlbmRhci5nb29nbGUuY29t) | [Subscribe](https://calendar.google.com/calendar/u/0/r?cid=OHJqazhvNTE2dmZ0ZTIxcWlidmxhZTNsajRAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ)
+
+## Contact
+
+If you have questions, feel free to reach out to us in the following ways:
+
+- [Slack channel](https://kubeedge.io/docs/community/slack/)
+
+## Contributing
+
+If you're interested in being a contributor and want to get involved in developing the Ianvs code, please see [CONTRIBUTING](CONTRIBUTING.md) for details on submitting patches and the contribution workflow.
+
+## License
+
+Ianvs is under the Apache 2.0 license. See the [LICENSE](LICENSE) file for details.
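+
+## Quick-start sketch
+
+As a minimal usage sketch: after installation, the `ianvs` console script provided by this package (see its `console_scripts` entry point, `core.cmd.benchmarking:main`) runs a benchmarking job from a configuration file, for example `ianvs -f <path-to-benchmarkingjob.yaml>`. The `-f` flag and the example path are assumptions based on the project's quick-start documentation; consult the installation and example documents above for the authoritative steps.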
diff --git a/EGG-INFO/SOURCES.txt b/EGG-INFO/SOURCES.txt new file mode 100644 index 00000000..2f38114c --- /dev/null +++ b/EGG-INFO/SOURCES.txt @@ -0,0 +1,61 @@ +LICENSE +README.md +setup.py +core/__init__.py +core/__version__.py +core/cmd/__init__.py +core/cmd/benchmarking.py +core/cmd/obj/__init__.py +core/cmd/obj/benchmarkingjob.py +core/common/__init__.py +core/common/constant.py +core/common/log.py +core/common/utils.py +core/storymanager/__init__.py +core/storymanager/rank/__init__.py +core/storymanager/rank/rank.py +core/storymanager/visualization/__init__.py +core/storymanager/visualization/visualization.py +core/testcasecontroller/__init__.py +core/testcasecontroller/testcasecontroller.py +core/testcasecontroller/algorithm/__init__.py +core/testcasecontroller/algorithm/algorithm.py +core/testcasecontroller/algorithm/module/__init__.py +core/testcasecontroller/algorithm/module/module.py +core/testcasecontroller/algorithm/paradigm/__init__.py +core/testcasecontroller/algorithm/paradigm/base.py +core/testcasecontroller/algorithm/paradigm/sedna_federated_learning.py +core/testcasecontroller/algorithm/paradigm/federated_learning/__init__.py +core/testcasecontroller/algorithm/paradigm/federated_learning/federated_class_incremental_learning.py +core/testcasecontroller/algorithm/paradigm/federated_learning/federated_learning.py +core/testcasecontroller/algorithm/paradigm/incremental_learning/__init__.py +core/testcasecontroller/algorithm/paradigm/incremental_learning/incremental_learning.py +core/testcasecontroller/algorithm/paradigm/lifelong_learning/__init__.py +core/testcasecontroller/algorithm/paradigm/lifelong_learning/lifelong_learning.py +core/testcasecontroller/algorithm/paradigm/multiedge_inference/__init__.py +core/testcasecontroller/algorithm/paradigm/multiedge_inference/multiedge_inference.py +core/testcasecontroller/algorithm/paradigm/singletask_learning/__init__.py +core/testcasecontroller/algorithm/paradigm/singletask_learning/singletask_learning.py +core/testcasecontroller/algorithm/paradigm/singletask_learning/singletask_learning_active_boost.py +core/testcasecontroller/algorithm/paradigm/singletask_learning/singletask_learning_tta.py +core/testcasecontroller/generation_assistant/__init__.py +core/testcasecontroller/generation_assistant/generation_assistant.py +core/testcasecontroller/metrics/__init__.py +core/testcasecontroller/metrics/metrics.py +core/testcasecontroller/simulation/__init__.py +core/testcasecontroller/simulation/simulation.py +core/testcasecontroller/simulation_system_admin/__init__.py +core/testcasecontroller/simulation_system_admin/simulation_system_admin.py +core/testcasecontroller/testcase/__init__.py +core/testcasecontroller/testcase/testcase.py +core/testenvmanager/__init__.py +core/testenvmanager/dataset/__init__.py +core/testenvmanager/dataset/dataset.py +core/testenvmanager/dataset/utils.py +core/testenvmanager/testenv/__init__.py +core/testenvmanager/testenv/testenv.py +ianvs.egg-info/PKG-INFO +ianvs.egg-info/SOURCES.txt +ianvs.egg-info/dependency_links.txt +ianvs.egg-info/entry_points.txt +ianvs.egg-info/top_level.txt \ No newline at end of file diff --git a/EGG-INFO/dependency_links.txt b/EGG-INFO/dependency_links.txt new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/EGG-INFO/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/EGG-INFO/entry_points.txt b/EGG-INFO/entry_points.txt new file mode 100644 index 00000000..da713a22 --- /dev/null +++ b/EGG-INFO/entry_points.txt @@ -0,0 +1,2 @@ +[console_scripts] +ianvs = 
core.cmd.benchmarking:main diff --git a/EGG-INFO/top_level.txt b/EGG-INFO/top_level.txt new file mode 100644 index 00000000..f5bd37c1 --- /dev/null +++ b/EGG-INFO/top_level.txt @@ -0,0 +1 @@ +core diff --git a/EGG-INFO/zip-safe b/EGG-INFO/zip-safe new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/EGG-INFO/zip-safe @@ -0,0 +1 @@ + diff --git a/core/testcasecontroller/algorithm/algorithm.py b/core/testcasecontroller/algorithm/algorithm.py index 59b57e01..73a232cb 100644 --- a/core/testcasecontroller/algorithm/algorithm.py +++ b/core/testcasecontroller/algorithm/algorithm.py @@ -73,6 +73,7 @@ def __init__(self, name, config): 'non_iid_ratio': 0.6, "label_data_ratio": 1.0 } + self.initial_model_url: str = "" self.modules: list = [] self.modules_list = None @@ -97,10 +98,10 @@ def paradigm(self, workspace: str, **kwargs): """ config = kwargs + # pylint: disable=C0103 for k, v in self.__dict__.items(): config.update({k: v}) - if self.paradigm_type == ParadigmType.SINGLE_TASK_LEARNING.value: return SingleTaskLearning(workspace, **config) diff --git a/core/testcasecontroller/algorithm/module/module.py b/core/testcasecontroller/algorithm/module/module.py index b12a8c01..6e58a313 100644 --- a/core/testcasecontroller/algorithm/module/module.py +++ b/core/testcasecontroller/algorithm/module/module.py @@ -86,6 +86,7 @@ def get_module_instance(self, module_type): function """ + print(f'hyperparameters_list: {self.hyperparameters_list}') class_factory_type = ClassType.GENERAL if module_type in [ModuleType.HARD_EXAMPLE_MINING.value]: class_factory_type = ClassType.HEM diff --git a/core/testcasecontroller/algorithm/paradigm/base.py b/core/testcasecontroller/algorithm/paradigm/base.py index dfe22741..8efccf2d 100644 --- a/core/testcasecontroller/algorithm/paradigm/base.py +++ b/core/testcasecontroller/algorithm/paradigm/base.py @@ -18,9 +18,8 @@ from sedna.core.incremental_learning import IncrementalLearning from sedna.core.lifelong_learning import LifelongLearning -from sedna.core.federated_learning import FederatedLearning from core.common.constant import ModuleType, ParadigmType - +from .sedna_federated_learning import FederatedLearning class ParadigmBase: """ @@ -126,14 +125,9 @@ def build_paradigm_job(self, paradigm_type): if paradigm_type == ParadigmType.MULTIEDGE_INFERENCE.value: return self.modules_funcs.get(ModuleType.BASEMODEL.value)() - if paradigm_type == ParadigmType.FEDERATED_LEARNING.value: - agg_name, agg = self.module_instances.get(ModuleType.AGGREGATION.value) + if paradigm_type == ParadigmType.FEDERATED_LEARNING.value or paradigm_type == ParadigmType.FEDERATED_CLASS_INCREMENTAL_LEARNING.value: return FederatedLearning( - estimator=self.module_instances.get(ModuleType.BASEMODEL.value), - aggregation= agg_name - + estimator=self.module_instances.get(ModuleType.BASEMODEL.value) ) - # return self.module_instances.get(ModuleType.BASEMODEL.value) - if paradigm_type == ParadigmType.FEDERATED_CLASS_INCREMENTAL_LEARNING.value: - return self.module_instances.get(ModuleType.BASEMODEL.value) + return None diff --git a/core/testcasecontroller/algorithm/paradigm/federated_learning/__init__.py b/core/testcasecontroller/algorithm/paradigm/federated_learning/__init__.py index a96c30b1..55ebbea2 100644 --- a/core/testcasecontroller/algorithm/paradigm/federated_learning/__init__.py +++ b/core/testcasecontroller/algorithm/paradigm/federated_learning/__init__.py @@ -13,5 +13,5 @@ # limitations under the License. 
# pylint: disable=missing-module-docstring -from .federeated_learning import FederatedLearning +from .federated_learning import FederatedLearning from .federated_class_incremental_learning import FederatedClassIncrementalLearning diff --git a/core/testcasecontroller/algorithm/paradigm/federated_learning/federated_class_incremental_learning.py b/core/testcasecontroller/algorithm/paradigm/federated_learning/federated_class_incremental_learning.py index 9c4bec7f..4e3b078e 100644 --- a/core/testcasecontroller/algorithm/paradigm/federated_learning/federated_class_incremental_learning.py +++ b/core/testcasecontroller/algorithm/paradigm/federated_learning/federated_class_incremental_learning.py @@ -13,12 +13,14 @@ # limitations under the License. """Federated Class-Incremental Learning Paradigm""" - +import numpy as np from core.common.constant import ParadigmType -from .federeated_learning import FederatedLearning +from .federated_learning import FederatedLearning from sedna.algorithms.aggregation import AggClient from core.common.log import LOGGER from threading import Thread, RLock + + class FederatedClassIncrementalLearning(FederatedLearning): """ FederatedClassIncrementalLearning @@ -47,13 +49,16 @@ class FederatedClassIncrementalLearning(FederatedLearning): def __init__(self, workspace, **kwargs): super(FederatedClassIncrementalLearning, self).__init__(workspace, **kwargs) - self.rounds = kwargs.get("incremental_rounds", 1) - self.task_size = kwargs.get("task_size", 10) + self.incremental_rounds = kwargs.get("incremental_rounds", 1) + # self.task_size = kwargs.get("task_size", 10) self.system_metric_info = {} self.lock = RLock() self.aggregate_clients=[] self.train_infos=[] - + + def get_task_size(self, train_datasets): + return np.unique([train_datasets[i][1] for i in range(len(train_datasets))]).shape[0] + def task_definition(self, dataset_files, task_id): """ Define the task for the class incremental learning paradigm @@ -61,12 +66,14 @@ def task_definition(self, dataset_files, task_id): # 1. Partition Dataset train_dataset_files, _ = dataset_files[task_id] train_datasets = self.train_data_partition(train_dataset_files) + task_size = self.get_task_size(train_datasets) + LOGGER.info(f"task_size: {task_size}") # 2. According to setting, to split the label and unlabel data for each task - need_split_label_unlabel_data = 1.0 - self.fl_data_setting.get("split_label_unlabel_data") > 1e-6 + need_split_label_unlabel_data = 1.0 - self.fl_data_setting.get("label_data_ratio") > 1e-6 if need_split_label_unlabel_data: train_datasets = self.split_label_unlabel_data(train_datasets) # 3. Return the dataset for each task [{label_data, unlabel_data}, ...] 
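        # task_size is the number of distinct labels observed across this task's partitioned
        # training data (computed by get_task_size above); run() later forwards it to every
        # client's train() call via kwargs, e.g.
        #   self._train(train_datasets, task_id=task_id, round=r, task_size=task_size)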
- return train_datasets + return train_datasets, task_size def split_label_unlabel_data(self, train_datasets): label_ratio = self.fl.data_setting.get("label_data_ratio") @@ -83,25 +90,25 @@ def split_label_unlabel_data(self, train_datasets): return new_train_datasets def init_client(self): - import copy - tempalte = self.build_paradigm_job(ParadigmType.FEDERATED_CLASS_INCREMENTAL_LEARNING.value) - self.clients = [copy.deepcopy(tempalte) for _ in range(self.task_size)] - # print(self.clients[0] == self.clients[1]) + self.clients = [self.build_paradigm_job(ParadigmType.FEDERATED_CLASS_INCREMENTAL_LEARNING.value)for _ in range(self.clients_number)] + def run(self): self.init_client() - dataset_files = self._split_dataset(self.task_size) - for r in range(self.rounds): - task_id = r // self.task_size - LOGGER.info(f"Round {r} task id: {task_id}") - train_datasets = self.task_definition(dataset_files, task_id) - self._train(train_datasets, task_id=task_id, round=r, task_size=self.task_size) - global_weights = self.aggregator.aggregate(self.aggregate_clients) - if hasattr(self.aggregator, "helper_function"): - self.helper_function(self.train_infos) - self.send_weights_to_clients(global_weights) - self.aggregate_clients.clear() - self.train_infos.clear() + # split_time = self.rounds // self.task_size # split the dataset into several tasks + # print(f'split_time: {split_time}') + dataset_files = self._split_dataset(self.incremental_rounds) + for task_id in range(self.incremental_rounds): + train_datasets, task_size = self.task_definition(dataset_files, task_id) + for r in range(self.rounds): + LOGGER.info(f"Round {r} task id: {task_id}") + self._train(train_datasets, task_id=task_id, round=r, task_size=task_size) + global_weights = self.aggregator.aggregate(self.aggregate_clients) + if hasattr(self.aggregator, "helper_function"): + self.helper_function(self.train_infos) + self.send_weights_to_clients(global_weights) + self.aggregate_clients.clear() + self.train_infos.clear() test_res = self.predict(self.dataset.test_url) return test_res, self.system_metric_info @@ -109,7 +116,7 @@ def run(self): def train_data_partition(self, train_dataset_file): return super().train_data_partition(train_dataset_file) - def client_train(self, client_idx, train_datasets, validation_datasets, post_process, **kwargs): + def client_train(self, client_idx, train_datasets, validation_datasets, **kwargs): train_info = self.clients[client_idx].train(train_datasets[client_idx], None, **kwargs) train_info['client_id'] = client_idx aggClient = AggClient() @@ -123,20 +130,18 @@ def client_train(self, client_idx, train_datasets, validation_datasets, post_pro def _train(self, train_datasets, **kwargs): client_threads = [] + print(f'len(self.clients): {len(self.clients)}') for idx in range(len(self.clients)): - client_thread = Thread(target=self.client_train, args=(idx, train_datasets, None, None), kwargs=kwargs) + client_thread = Thread(target=self.client_train, args=(idx, train_datasets, None), kwargs=kwargs) client_thread.start() client_threads.append(client_thread) for t in client_threads: t.join() LOGGER.info('finish training') - def send_weights_to_clients(self, global_weights): - for client in self.clients: - client.set_weights(global_weights) - LOGGER.info('finish send weights to clients') + super().send_weights_to_clients(global_weights) def helper_function(self,train_infos): for i in range(len(self.clients)): diff --git a/core/testcasecontroller/algorithm/paradigm/federated_learning/federeated_learning.py 
b/core/testcasecontroller/algorithm/paradigm/federated_learning/federated_learning.py similarity index 66% rename from core/testcasecontroller/algorithm/paradigm/federated_learning/federeated_learning.py rename to core/testcasecontroller/algorithm/paradigm/federated_learning/federated_learning.py index d4a80c9a..49ef2a97 100644 --- a/core/testcasecontroller/algorithm/paradigm/federated_learning/federeated_learning.py +++ b/core/testcasecontroller/algorithm/paradigm/federated_learning/federated_learning.py @@ -14,16 +14,13 @@ """Federated Learning Paradigm""" -import threading -import multiprocessing as mp -import asyncio - -from sedna.service.server import AggregationServer +from sedna.algorithms.aggregation import AggClient from core.common.log import LOGGER from core.common.constant import ParadigmType, ModuleType from core.common.utils import get_file_format from core.testcasecontroller.algorithm.paradigm.base import ParadigmBase from core.testenvmanager.dataset.utils import read_data_from_file_to_npy, partition_data +from threading import Thread, RLock class FederatedLearning(ParadigmBase): """ @@ -51,8 +48,6 @@ class FederatedLearning(ParadigmBase): network eval config, etc. """ - LOCAL_HOST = '127.0.0.1' - def __init__(self, workspace, **kwargs): ParadigmBase.__init__(self, workspace, **kwargs) @@ -61,28 +56,16 @@ def __init__(self, workspace, **kwargs): self.kwargs = kwargs self.fl_data_setting = kwargs.get("fl_data_setting") + # print(self.fl_data_setting) self.backend = kwargs.get("backend") self.global_model = None # global model to perform global evaluation self.rounds = kwargs.get("round", 1) - LOGGER.info(self.rounds) self.clients = [] - self.clients_number = kwargs.get("client_number", 10) - self.aggregation, self.aggregator = self.module_instances.get(ModuleType.AGGREGATION.value) + self.lock = RLock() - def run_server(self): - aggregation_algorithm = self.aggregation - exit_round = self.rounds - participants_count = self.clients_number - LOGGER.info("running server!!!!") - server = AggregationServer( - aggregation=aggregation_algorithm, - exit_round=exit_round, - ws_size=1000 * 1024 * 1024, - participants_count=participants_count, - host=self.LOCAL_HOST - - ) - server.start() + self.aggregate_clients=[] + self.clients_number = kwargs.get("client_number", 1) + self.aggregation, self.aggregator = self.module_instances.get(ModuleType.AGGREGATION.value) def init_client(self): self.clients = [self.build_paradigm_job(ParadigmType.FEDERATED_LEARNING.value) for i in @@ -99,18 +82,18 @@ def run(self): information needed to compute system metrics. 
""" # init client wait for connection - - server_thead = threading.Thread(target=self.run_server) - server_thead.start() - LOGGER.info(f"server is start and server is alive: {server_thead.is_alive()}") # self.init_client() - rounds = self.rounds + self.init_client() dataset_files = self._split_dataset(1) # only one split ——all the data - train_dataset_file, eval_dataset_file = dataset_files[0] + train_dataset_file, _ = dataset_files[0] train_datasets = self.train_data_partition(train_dataset_file) - self._train(train_datasets, rounds=rounds) - LOGGER.info(f'finish trianing for fedavg') - server_thead.join() + # split_time = self.rounds // self.task_size # split the dataset into several tasks + # print(f'split_time: {split_time}') + for r in range(self.rounds): + self.train(train_datasets, round=r) + global_weights = self.aggregator.aggregate(self.aggregate_clients) + self.send_weights_to_clients(global_weights) + self.aggregate_clients.clear() test_res = self.predict(self.dataset.test_url) return test_res, self.system_metric_info @@ -144,54 +127,39 @@ def train_data_partition(self, train_dataset_file): # - provide a default method to read data from file to npy # - can support customized method to read data from file to npy train_datasets = read_data_from_file_to_npy(train_datasets) - # TODO Partition data to iid or non-iid + # Partition data to iid or non-iid train_datasets = partition_data(train_datasets, self.clients_number, self.fl_data_setting.get("data_partition"), self.fl_data_setting.get("non_iid_ratio")) return train_datasets - def client_train(self, train_datasets, validation_datasets, post_process, **kwargs): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - client = self.build_paradigm_job(ParadigmType.FEDERATED_LEARNING.value) - client.train(train_datasets, validation_datasets, post_process, **kwargs) - loop.close() - self.clients.append(client) - - def _train(self, train_datasets, **kwargs): + + def client_train(self, client_idx, train_datasets, validation_datasets, **kwargs): + train_info = self.clients[client_idx].train(train_datasets[client_idx], None, **kwargs) + train_info['client_id'] = client_idx + aggClient = AggClient() + aggClient.num_samples = train_info['num_samples'] + aggClient.weights = self.clients[client_idx].get_weights() + self.lock.acquire() + self.aggregate_clients.append(aggClient) + self.lock.release() - # mp.set_start_method('spawn') - clients_threads = [] - for i in range(self.clients_number): - LOGGER.info(i , self.clients_number) - # self.clients[i].train(train_datasets[i], None, None, **kwargs) - t = threading.Thread(target=self.client_train, args=(train_datasets[i], None, None), kwargs=kwargs) - # t = Process(target=self.clients[i].train, args=(train_datasets[i], None, None), kwargs=kwargs) - - clients_threads.append(t) - t.start() - for t in clients_threads: - LOGGER.info(f"client process is alive: {t.is_alive()}") + def train(self, train_datasets, **kwargs): + client_threads = [] + print(f'len(self.clients): {len(self.clients)}') + for idx in range(len(self.clients)): + client_thread = Thread(target=self.client_train, args=(idx, train_datasets, None), kwargs=kwargs) + client_thread.start() + client_threads.append(client_thread) + for t in client_threads: t.join() - LOGGER.info(f"finish training {t}") - return - - def local_eval(self, train_dataset_file, round): - """ - Evaluate the model on the local dataset - """ - train_dataset = None - if isinstance(train_dataset_file, str): - train_dataset = 
self.dataset.load_data(train_dataset_file, "train") - if isinstance(train_dataset_file, list): - train_dataset = [] - for file in train_dataset_file: - train_dataset.append(self.dataset.load_data(file, "train")) - assert train_dataset is not None, "train_dataset is None" - train_dataset = read_data_from_file_to_npy(train_dataset) + LOGGER.info('finish training') + + def send_weights_to_clients(self, global_weights): for client in self.clients: - client.evaluate(train_dataset, round=round) - LOGGER.info('finish local eval') + client.set_weights(global_weights) + LOGGER.info('finish send weights to clients') + def get_global_model(self): self.global_model = self.clients[0] diff --git a/core/testcasecontroller/algorithm/paradigm/sedna_federated_learning.py b/core/testcasecontroller/algorithm/paradigm/sedna_federated_learning.py new file mode 100644 index 00000000..baaffea4 --- /dev/null +++ b/core/testcasecontroller/algorithm/paradigm/sedna_federated_learning.py @@ -0,0 +1,18 @@ +from sedna.core.base import JobBase + +class FederatedLearning(JobBase): + + def __init__(self, estimator): + self.estimator = estimator + + def train(self, train_data, vald_data, **kwargs): + return self.estimator.train(train_data, vald_data, **kwargs) + + def get_weights(self): + return self.estimator.get_weights() + + def set_weights(self, weights): + self.estimator.set_weights(weights) + + def helper_function(self, helper_info): + return self.estimator.helper_function(helper_info) \ No newline at end of file diff --git a/examples/federated-learning/fedavg/algorithm/aggregation.py b/examples/federated-learning/fedavg/algorithm/aggregation.py index 63d30868..e282c708 100644 --- a/examples/federated-learning/fedavg/algorithm/aggregation.py +++ b/examples/federated-learning/fedavg/algorithm/aggregation.py @@ -14,57 +14,7 @@ class FedAvg(BaseAggregation, abc.ABC): def __init__(self): super(FedAvg, self).__init__() - self.global_model = self.build(num_classes=100) - """ - Federated averaging algorithm - """ - - @staticmethod - def build(num_classes: int): - model = Sequential() - model.add(Conv2D(64, kernel_size=(3, 3), - activation="relu", strides=(2, 2), - input_shape=(32, 32, 3))) - model.add(MaxPooling2D(pool_size=(2, 2))) - model.add(Conv2D(32, kernel_size=(3, 3), activation="relu")) - model.add(MaxPooling2D(pool_size=(2, 2))) - model.add(Flatten()) - model.add(Dropout(0.25)) - model.add(Dense(64, activation="relu")) - model.add(Dense(32, activation="relu")) - model.add(Dropout(0.5)) - model.add(Dense(num_classes, activation="softmax")) - - model.compile(loss="categorical_crossentropy", - optimizer="sgd", - metrics=["accuracy"]) - return model - - def inference(self, test_data): - """ - Predict the test data with global model - - Parameters - ---------- - global_model : Model - Global model - test_data : Array-like - Test data - - Returns - ------- - predict : Array-like - Prediction result - """ - result = {} - for data in test_data.x: - x = np.load(data) - logits = self.global_model(x, training=False) - pred = tf.cast(tf.argmax(logits, axis=1), tf.int32) - result[data] = pred.numpy() - print("finish predict") - return result def aggregate(self, clients): """ @@ -96,6 +46,5 @@ def aggregate(self, clients): / self.total_size) updates.append(row.tolist()) self.weights = deepcopy(updates) - self.global_model.set_weights(self.weights) print("finish aggregation....") return updates diff --git a/examples/federated-learning/fedavg/algorithm/algorithm.yaml b/examples/federated-learning/fedavg/algorithm/algorithm.yaml 
index e6d92958..a5b8ff26 100644 --- a/examples/federated-learning/fedavg/algorithm/algorithm.yaml +++ b/examples/federated-learning/fedavg/algorithm/algorithm.yaml @@ -12,7 +12,7 @@ algorithm: # currently the options of value are as follows: # 1> "default": the dataset is evenly divided based train_ratio; splitting_method: "default" - + label_data_ratio: 1.0 data_partition: "iid" # the url address of initial network for network pre-training; string url; # the url address of initial network; string type; optional; diff --git a/examples/federated-learning/fedavg/algorithm/basemodel.py b/examples/federated-learning/fedavg/algorithm/basemodel.py index 1b59216d..638bb46a 100644 --- a/examples/federated-learning/fedavg/algorithm/basemodel.py +++ b/examples/federated-learning/fedavg/algorithm/basemodel.py @@ -5,11 +5,11 @@ import numpy as np import tensorflow as tf from keras import Sequential -from keras.layers import Conv2D, MaxPooling2D, Flatten, Dropout, Dense +from keras.src.layers import Conv2D, MaxPooling2D, Flatten, Dropout, Dense from sedna.common.class_factory import ClassType, ClassFactory __all__ = ["BaseModel"] -os.environ['BACKEND_TYPE'] = 'TENSORFLOW' +os.environ['BACKEND_TYPE'] = 'KERAS' @ClassFactory.register(ClassType.GENERAL, alias='fedavg') diff --git a/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/algorithm.yaml b/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/algorithm.yaml index 8bd64610..2cfbd464 100644 --- a/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/algorithm.yaml +++ b/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/algorithm.yaml @@ -12,7 +12,7 @@ algorithm: # currently the options of value are as follows: # 1> "default": the dataset is evenly divided based train_ratio; splitting_method: "default" - + label_data_ratio: 1.0 data_partition: "iid" # the url address of initial network for network pre-training; string url; # the url address of initial network; string type; optional; diff --git a/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/basemodel.py b/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/basemodel.py index d04cc935..74896252 100644 --- a/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/basemodel.py +++ b/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/basemodel.py @@ -1,52 +1,36 @@ import os import zipfile - +import logging import keras import numpy as np import tensorflow as tf from keras import Sequential from keras.src.layers import Conv2D, MaxPooling2D, Flatten, Dropout, Dense from sedna.common.class_factory import ClassType, ClassFactory -from resnet import resnet18 +from resnet import resnet10 from network import NetWork, incremental_learning __all__ = ["BaseModel"] os.environ['BACKEND_TYPE'] = 'KERAS' +logging.getLogger().setLevel(logging.INFO) @ClassFactory.register(ClassType.GENERAL, alias='fcil') class BaseModel: def __init__(self, **kwargs): + self.kwargs = kwargs + print(f"kwargs: {kwargs}") self.batch_size = kwargs.get('batch_size', 1) print(f"batch_size: {self.batch_size}") self.epochs = kwargs.get('epochs', 1) self.lr = kwargs.get('lr', 0.001) - self.optimizer = tf.keras.optimizers.SGD(learning_rate=self.lr) + self.optimizer = keras.optimizers.SGD(learning_rate=self.lr) self.old_task_id = -1 - self.fe = resnet18(10) - # self.model.build(input_shape=(None, 32, 32, 3)) - 
self.model = NetWork(10, self.fe) + self.fe = resnet10(10) + logging.info(type(self.fe)) + self.model = NetWork(100, self.fe) self._init_model() - @staticmethod - def build(num_classes: int): - model = Sequential() - model.add(Conv2D(64, kernel_size=(3, 3), - activation="relu", strides=(2, 2), - input_shape=(32, 32, 3))) - model.add(MaxPooling2D(pool_size=(2, 2))) - model.add(Conv2D(32, kernel_size=(3, 3), activation="relu")) - model.add(MaxPooling2D(pool_size=(2, 2))) - model.add(Flatten()) - model.add(Dropout(0.25)) - model.add(Dense(64, activation="relu")) - model.add(Dense(32, activation="relu")) - model.add(Dropout(0.5)) - model.add(Dense(num_classes, activation="softmax")) - - model.compile(loss="categorical_crossentropy", - optimizer="sgd", - metrics=["accuracy"]) - return model + def _init_model(self): self.model.compile(optimizer='sgd', loss='sparse_categorical_crossentropy', metrics=['accuracy']) @@ -56,31 +40,31 @@ def _init_model(self): self.model.fit(x, y, epochs=1) def load(self, model_url=None): - print(f"load model from {model_url}") + logging.info(f"load model from {model_url}") extra_model_path = os.path.basename(model_url) + "/model" with zipfile.ZipFile(model_url, 'r') as zip_ref: zip_ref.extractall(extra_model_path) self.model = tf.saved_model.load(extra_model_path) def _initialize(self): - print(f"initialize finished") + logging.info(f"initialize finished") def get_weights(self): - print(f"get_weights") + logging.info(f"get_weights") weights = [layer.tolist() for layer in self.model.get_weights()] - print(len(weights)) + logging.info(len(weights)) return weights def set_weights(self, weights): weights = [np.array(layer) for layer in weights] self.model.set_weights(weights) - print("----------finish set weights-------------") + logging.info("----------finish set weights-------------") def save(self, model_path=""): - print("save model") + logging.info("save model") def model_info(self, model_path, result, relpath): - print("model info") + logging.info("model info") return {} @@ -89,51 +73,46 @@ def train(self, train_data, valid_data, **kwargs): round = kwargs.get("round", -1) task_id = kwargs.get("task_id", -1) task_size = kwargs.get("task_size", 10) - print(task_id, task_size) - if task_id > self.old_task_id and task_id > 0: - self.model = incremental_learning(self.model, task_size*(task_id+1)) - self.optimizer = tf.keras.optimizers.SGD(learning_rate=self.lr) - self.old_task_id = task_id - print(f"train data: {train_data[0].shape} {train_data[1].shape}") + self.model.compile(optimizer=self.optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy']) + logging.info(f"train data: {train_data[0].shape} {train_data[1].shape}") train_db = self.data_process(train_data) - print(train_db) + logging.info(train_db) for epoch in range(self.epochs): total_loss = 0 total_num = 0 - print(f"Epoch {epoch + 1} / {self.epochs}") - print("-" * 50) + logging.info(f"Epoch {epoch + 1} / {self.epochs}") + logging.info("-" * 50) for x, y in train_db: + # self.model.fit(x, y, batch_size=self.batch_size) with tf.GradientTape() as tape: logits = self.model(x, training=True) - y = tf.one_hot(y, depth=(task_id + 1) * task_size) - y = tf.squeeze(y, axis=1) - loss = tf.reduce_mean(keras.losses.categorical_crossentropy(y, logits, from_logits=True)) + loss = tf.reduce_mean(keras.losses.sparse_categorical_crossentropy(y, logits, from_logits=True)) grads = tape.gradient(loss, self.model.trainable_variables) self.optimizer.apply(grads, self.model.trainable_variables) # 
self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables)) total_loss += loss total_num += 1 - print(f"train round {round}: Epoch {epoch + 1} avg loss: {total_loss / total_num}") - print(f"finish round {round} train") - + logging.info(f"train round {round}: Epoch {epoch + 1} avg loss: {total_loss / total_num}") + logging.info(f"finish round {round} train") + self.eval(train_data, round) return {"num_samples": train_data[0].shape[0]} - def inference(self, data, **kwargs): + def predict(self, data, **kwargs): result = {} for data in data.x: x = np.load(data) logits = self.model(x, training=False) pred = tf.cast(tf.argmax(logits, axis=1), tf.int32) result[data] = pred.numpy() - print("finish predict") + logging.info("finish predict") return result def eval(self, data, round, **kwargs): total_num = 0 total_correct = 0 data = self.data_process(data) - print(f"in evalute data: {data}") + # print(f"in evalute data: {data}") for i, (x, y) in enumerate(data): logits = self.model(x, training=False) # prob = tf.nn.softmax(logits, axis=1) @@ -145,10 +124,10 @@ def eval(self, data, round, **kwargs): correct = tf.reduce_sum(correct) total_num += x.shape[0] total_correct += int(correct) - print(f"total_correct: {total_correct}, total_num: {total_num}") + logging.info(f"total_correct: {total_correct}, total_num: {total_num}") acc = total_correct / total_num del total_correct - print(f"finsih round {round}evaluate, acc: {acc}") + logging.info(f"finsih round {round}evaluate, acc: {acc}") return acc def data_process(self, data, **kwargs): diff --git a/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/network.py b/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/network.py index 52f7995c..87c6f779 100644 --- a/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/network.py +++ b/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/network.py @@ -2,28 +2,39 @@ import tensorflow as tf import numpy as np from keras.src.layers import Dense -from keras.src.models.cloning import clone_model from resnet import resnet10 class NetWork(keras.Model): def __init__(self, num_classes, feature_extractor): super(NetWork, self).__init__() + self.num_classes = num_classes self.feature = feature_extractor self.fc = Dense(num_classes, activation='softmax') def call(self, inputs): + # print(type(self.feature)) x = self.feature(inputs) x = self.fc(x) return x - def feature_extractor(self, inputs): - return self.feature(inputs) + return self.feature.predict(inputs) def predict(self, fea_input): return self.fc(fea_input) + def get_config(self): + return { + 'num_classes': self.num_classes, + 'feature_extractor': self.feature, + } + + @classmethod + def from_config(cls, config): + return cls(**config) + + def incremental_learning(old_model:NetWork, num_class): @@ -37,7 +48,6 @@ def incremental_learning(old_model:NetWork, num_class): if hasattr(new_model.feature, layer.name): new_model.feature.__setattr__(layer.name, layer) if num_class > old_model.fc.units: - original_use_bias = hasattr(old_model.fc, 'bias') print("original_use_bias", original_use_bias) init_class = old_model.fc.units @@ -47,7 +57,12 @@ def incremental_learning(old_model:NetWork, num_class): if original_use_bias: new_model.fc.bias.assign(tf.pad(old_model.fc.bias, [[0, num_class - init_class]])) - - new_model.build((None, 32, 32, 3)) return new_model + +def copy_model(model: NetWork): + cfg = model.get_config() + + copy_model = 
model.from_config(cfg) + return copy_model + diff --git a/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/resnet.py b/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/resnet.py index ceccc093..fbcd1f79 100644 --- a/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/resnet.py +++ b/examples/federated-learning/federated-class-incremental-learning/fedavg/algorithm/resnet.py @@ -45,6 +45,8 @@ def call(self, inputs, training=None): class ResNet(keras.Model): def __init__(self, layer_dims, num_classes=10): # [2, 2, 2, 2] super(ResNet, self).__init__() + self.layer_dims = layer_dims + self.num_classes = num_classes self.stem = keras.models.Sequential([keras.layers.Conv2D(64, (3, 3), strides=(1, 1)), keras.layers.BatchNormalization(), @@ -60,7 +62,6 @@ def __init__(self, layer_dims, num_classes=10): # [2, 2, 2, 2] # output: [b, 512, h, w], self.avgpool = keras.layers.GlobalAveragePooling2D() # self.fc = keras.layers.Dense(num_classes) - def call(self, inputs, training=None): x = self.stem(inputs,training=training) @@ -82,6 +83,17 @@ def build_resblock(self, filter_num, blocks, stride=1): for _ in range(1, blocks): res_blocks.add(BasicBlock(filter_num, stride=1)) return res_blocks + + def get_config(self): + return { + 'layer_dims': self.layer_dims, + 'num_classes': self.num_classes + } + + @classmethod + def from_config(cls, config): + return cls(**config) + def resnet10(num_classes:int): return ResNet([1, 1, 1, 1], num_classes) diff --git a/examples/federated-learning/federated-class-incremental-learning/fedavg/test.py b/examples/federated-learning/federated-class-incremental-learning/fedavg/test.py new file mode 100644 index 00000000..5b3cf860 --- /dev/null +++ b/examples/federated-learning/federated-class-incremental-learning/fedavg/test.py @@ -0,0 +1,14 @@ +from algorithm.resnet import resnet10 +from algorithm.network import NetWork, incremental_learning +import copy +import numpy as np +fe = resnet10(10) +model = NetWork(10, fe) +new_model = copy.deepcopy(model) + +x = np.random.rand(1, 32, 32, 3) +y = np.random.randint(0, 10, 1) +model.compile(optimizer='sgd', loss='sparse_categorical_crossentropy', metrics=['accuracy']) +model.fit(x, y, epochs=1) +new_model.compile(optimizer='sgd', loss='sparse_categorical_crossentropy', metrics=['accuracy']) +new_model.fit(x, y, epochs=1) \ No newline at end of file diff --git a/examples/federated-learning/federated-class-incremental-learning/fedavg/testenv/testenv.yaml b/examples/federated-learning/federated-class-incremental-learning/fedavg/testenv/testenv.yaml index c295a236..2baae28b 100644 --- a/examples/federated-learning/federated-class-incremental-learning/fedavg/testenv/testenv.yaml +++ b/examples/federated-learning/federated-class-incremental-learning/fedavg/testenv/testenv.yaml @@ -32,6 +32,5 @@ testenv: url: "/home/wyd/ianvs/project/ianvs/examples/federated-learning/federated-class-incremental-learning/fedavg/testenv/acc.py" # incremental rounds setting of incremental learning; int type; default value is 2; - task_size: 10 - incremental_rounds: 100 - round: 10 \ No newline at end of file + incremental_rounds: 2 + round: 2 \ No newline at end of file diff --git a/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/GLFC.py b/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/GLFC.py index 3a42c3d2..72e0c531 100644 --- 
a/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/GLFC.py +++ b/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/GLFC.py @@ -4,6 +4,7 @@ import keras import logging from network import NetWork, incremental_learning, copy_model +from model import resnet10 def get_one_hot(target, num_classes): # print(f'in get one hot, target shape is {target.shape}') @@ -15,13 +16,15 @@ def get_one_hot(target, num_classes): return y class GLFC_Client: - def __init__(self, feature_extractor, num_classes, batch_size, task_size, memory_size, epochs, learning_rate, encode_model): + def __init__(self, num_classes, batch_size, task_size, memory_size, epochs, learning_rate, encode_model): self.epochs = epochs self.learning_rate = learning_rate - self.model = NetWork(num_classes, feature_extractor) + + # self.model = NetWork(num_classes, feature_extractor) self.encode_model = encode_model self.num_classes = num_classes + logging.info(f'num_classes is {num_classes}') self.batch_size = batch_size self.task_size = task_size @@ -38,6 +41,41 @@ def __init__(self, feature_extractor, num_classes, batch_size, task_size, memory self.current_class = None self.last_class = None self.train_loader = None + self.build_feature_extractor() + self.classifier = None + # self._initialize_classifier() + # assert self.classifier is not None + + def build_feature_extractor(self): + self.feature_extractor = resnet10() + self.feature_extractor.build(input_shape=(None, 32, 32, 3)) + self.feature_extractor.call(keras.Input(shape=(32, 32, 3))) + + def _initialize_classifier(self): + if self.classifier != None: + new_classifier = tf.keras.Sequential([ + # tf.keras.Input(shape=(None, self.feature_extractor.layers[-2].output_shape[-1])), + tf.keras.layers.Dense(self.num_classes, kernel_initializer='lecun_normal') + ]) + new_classifier.build(input_shape=(None, self.feature_extractor.layers[-2].output_shape[-1])) + new_weights = new_classifier.get_weights() + old_weights = self.classifier.get_weights() + # 复制旧参数 + # weight + new_weights[0][0:old_weights[0].shape[0], 0:old_weights[0].shape[1]] = old_weights[0] + # bias + new_weights[1][0:old_weights[1].shape[0]] = old_weights[1] + new_classifier.set_weights(new_weights) + self.classifier = new_classifier + else: + logging.info(f'input shape is {self.feature_extractor.layers[-2].output_shape[-1]}') + self.classifier = tf.keras.Sequential([ + # tf.keras.Input(shape=(None, self.feature_extractor.layers[-2].output_shape[-1])), + tf.keras.layers.Dense(self.num_classes, kernel_initializer='lecun_normal') + ]) + self.classifier.build(input_shape=(None, self.feature_extractor.layers[-2].output_shape[-1])) + + logging.info(f"finish ! 
initialize classifier {self.classifier.summary()}") def before_train(self, task_id, train_data, class_learned, old_model): logging.info(f"------before train task_id: {task_id}------") @@ -49,12 +87,11 @@ def before_train(self, task_id, train_data, class_learned, old_model): if self.current_class is not None: self.last_class = self.current_class logging.info(f'self.last_class is , {self.last_class}, {self.num_classes}') - self.model = incremental_learning(self.model, self.num_classes) + self._initialize_classifier() self.current_class = np.unique(train_data[1]).tolist() self.update_new_set(need_update) if len(old_model) != 0: self.old_model = old_model[1] - # incremental_learning(self.model, self.num_classes * (task_id+ 1)) else: if len(old_model) != 0: self.old_model = old_model[0] @@ -90,28 +127,38 @@ def _get_train_loader(self, mix): return tf.data.Dataset.from_tensor_slices((self.train_set[0], self.train_set[1])).shuffle(buffer_size=10000000).batch(self.batch_size) def train(self, round): + # self._initialize_classifier() opt = keras.optimizers.SGD(learning_rate=self.learning_rate, weight_decay=0.00001) # print(self.train_loader is None) - # self.model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy']) + feature_extractor_params = self.feature_extractor.trainable_variables + classifier_params = self.classifier.trainable_variables + all_params = [] + all_params.extend(feature_extractor_params) + all_params.extend(classifier_params) + for epoch in range(self.epochs): for step, (x, y) in enumerate(self.train_loader): # opt = keras.optimizers.SGD(learning_rate=self.learning_rate, weight_decay=0.00001) - # self.model.fit(x, y, epochs=3) with tf.GradientTape() as tape: - # logits = self.model(x, training=True) - # y = get_one_hot(y, self.num_classes) - # loss = tf.reduce_mean(keras.losses.categorical_crossentropy(y, logits, from_logits=True)) - loss = self._compute_loss(x, y) - logging.info(f'------round{round} epoch{epoch} step{step} loss: {loss} and loss dim is {loss.shape}------') - grads = tape.gradient(loss, self.model.trainable_variables) - # print(f'grads shape is {len(grads)} and type is {type(grads)}') - opt.apply_gradients(zip(grads, self.model.trainable_variables)) + logits = self.model_call(x, training=True) + # # y = get_one_hot(y, self.num_classes) + loss = tf.reduce_mean(keras.losses.sparse_categorical_crossentropy(y, logits, from_logits=True)) + # loss = self._compute_loss(x, y) + # logging.info(f'------round{round} epoch{epoch} step{step} loss: {loss} and loss dim is {loss.shape}------') + grads = tape.gradient(loss, all_params) + # # print(f'grads shape is {len(grads)} and type is {type(grads)}') + opt.apply_gradients(zip(grads, all_params)) logging.info(f'------finish round{round} traning------') + + def model_call(self, x, training=False): + input = self.feature_extractor(inputs=x,training=training) + # logging.info(input.shape) + return self.classifier(inputs=input, training=training) def _compute_loss(self, imgs, labels): logging.info(f'self.old_model is available: {self.old_model is not None}') - y_pred = self.model(imgs, training=True) + y_pred = self.model_call(imgs, training=True) target = get_one_hot(labels, self.num_classes) logits = y_pred # prob = tf.nn.softmax(logits, axis=1) @@ -137,7 +184,7 @@ def _compute_loss(self, imgs, labels): # print(f'in _compute_loss, loss is {loss} and shape is {loss.shape}') distill_target = tf.Variable(get_one_hot(labels, self.num_classes)) # print(f"distill_target shape: {distill_target.shape} type: 
{type(distill_target)}") - old_target = tf.sigmoid(self.old_model(imgs)) + old_target = tf.sigmoid(self.old_model[1](self.old_model[0]((imgs)))) old_task_size = old_target.shape[1] # print(f'old_target shape: {old_target.shape} and old_task_size: {old_task_size}') distill_target[:, :old_task_size].assign(old_target) @@ -236,7 +283,7 @@ def _construct_exemplar_set(self, images,label, m): def compute_class_mean(self, images): images_data = tf.data.Dataset.from_tensor_slices(images).batch(self.batch_size) - fe_output = self.model.feature_extractor(inputs=images_data) + fe_output = self.feature_extractor.predict(images_data) fe_output = tf.nn.l2_normalize( fe_output).numpy() # print(f"fe_output shape is {fe_output.shape}") class_mean = tf.reduce_mean(fe_output, axis=0) @@ -264,14 +311,21 @@ def proto_grad(self): # print("in proto_grad, label shape is ", label.shape) label = tf.constant([label]) target = get_one_hot(label, self.num_classes) - proto_model = copy_model(self.model) + logging.info(f'proto_grad target shape is {target.shape} and num_classes is {self.num_classes}') + proto_fe = resnet10() + proto_fe.build(input_shape=(None, 32, 32, 3)) + proto_fe.call(keras.Input(shape=(32, 32, 3))) + proto_fe.set_weights(self.feature_extractor.get_weights()) + proto_clf = copy.deepcopy(self.classifier) + proto_param = proto_fe.trainable_variables + proto_param.extend(proto_clf.trainable_variables) opt = keras.optimizers.SGD(learning_rate=self.learning_rate, weight_decay=0.00001) for _ in range(iters): with tf.GradientTape() as tape: - output = proto_model(data) + output = proto_clf(proto_fe(data)) loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(target, output)) - grads = tape.gradient(loss, proto_model.trainable_variables) - opt.apply_gradients(zip(grads, proto_model.trainable_variables)) + grads = tape.gradient(loss, proto_param) + opt.apply_gradients(zip(grads,proto_param)) with tf.GradientTape() as tape: outputs = self.encode_model(data) loss_cls = cri_loss(label, outputs) @@ -287,7 +341,7 @@ def evaluate(self): total_num = 0 total_correct = 0 for x, y in self.train_loader: - logits = self.model(x, training=False) + logits = self.model_call(x, training=False) # prob = tf.nn.softmax(logits, axis=1) pred = tf.argmax(logits, axis=1) pred = tf.cast(pred, dtype=tf.int32) diff --git a/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/aggregation.py b/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/aggregation.py index c7ba2e96..3ec7b9d9 100644 --- a/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/aggregation.py +++ b/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/aggregation.py @@ -17,13 +17,9 @@ class FedAvg(BaseAggregation, abc.ABC): def __init__(self): super(FedAvg, self).__init__() - self.fe = resnet10(10) - self.encode_model = lenet5(32, 100) self.proxy_server = ProxyServer( learning_rate=0.01, - num_class=10, - feature_extractor=self.fe, - encode_model=self.encode_model, + num_classes=10, test_data=None ) self.task_id = -1 @@ -62,7 +58,7 @@ def aggregate(self, clients): self.weights = [np.array(layer) for layer in updates] print("finish aggregation....") - return updates + return self.weights def helper_function(self,train_infos, **kwargs): proto_grad = [] @@ -79,7 +75,8 @@ def helper_function(self,train_infos, **kwargs): self.proxy_server.dataload(proto_grad) if task_id > self.task_id: self.task_id = task_id - self.proxy_server.model = 
incremental_learning(self.proxy_server.model, self.num_classes * (task_id + 1)) - self.proxy_server.model.set_weights(self.weights) + print(f'incremental num classes is {self.num_classes * (task_id + 1)}') + self.proxy_server.increment_class( self.num_classes * (task_id + 1) ) + self.proxy_server.set_weights(self.weights) print(f'finish set weight for proxy server') return {'best_old_model': self.proxy_server.model_back()} \ No newline at end of file diff --git a/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/algorithm.yaml b/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/algorithm.yaml index d022edf4..ae91bf77 100644 --- a/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/algorithm.yaml +++ b/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/algorithm.yaml @@ -12,7 +12,7 @@ algorithm: # currently the options of value are as follows: # 1> "default": the dataset is evenly divided based train_ratio; splitting_method: "default" - + label_data_ratio: 1.0 data_partition: "iid" # the url address of initial network for network pre-training; string url; # the url address of initial network; string type; optional; diff --git a/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/basemodel.py b/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/basemodel.py index ce1f7de7..67df71ef 100644 --- a/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/basemodel.py +++ b/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/basemodel.py @@ -1,5 +1,6 @@ import os import numpy as np +import keras import tensorflow as tf from sedna.common.class_factory import ClassType, ClassFactory from model import resnet10, lenet5 @@ -14,28 +15,36 @@ @ClassFactory.register(ClassType.GENERAL, alias='glfc') class BaseModel: def __init__(self, **kwargs) -> None: + self.kwargs = kwargs self.learning_rate = kwargs.get('learning_rate', 0.001) self.epochs = kwargs.get('epochs', 1) self.batch_size = kwargs.get('batch_size', 32) self.task_size = kwargs.get('task_size', 10) self.memory_size = kwargs.get('memory_size', 2000) self.encode_model = lenet5(32, 100) - self.fe = resnet10(self.task_size) - self.model = NetWork(self.task_size, self.fe) + # self.fe = self.build_feature_extractor() self.num_classes = 10 # the number of class for the first task - self.GLFC_Client = GLFC_Client( self.fe , self.num_classes, self.batch_size, self.task_size, self.memory_size, self.epochs, self.learning_rate, self.encode_model) + self.GLFC_Client = GLFC_Client( self.num_classes, self.batch_size, self.task_size, self.memory_size, self.epochs, self.learning_rate, self.encode_model) self.best_old_model = [] self.class_learned = 0 - + self.fe_weights_length = len(self.GLFC_Client.feature_extractor.get_weights()) + def get_weights(self): print("get weights") - weights = [layer.tolist() for layer in self.GLFC_Client.model.get_weights()] + weights = [] + fe_weights = self.GLFC_Client.feature_extractor.get_weights() + clf_weights = self.GLFC_Client.classifier.get_weights() + weights.extend(fe_weights) + weights.extend(clf_weights) return weights def set_weights(self, weights): print("set weights") - weights = [np.array(layer) for layer in weights] - self.GLFC_Client.model.set_weights(weights) + fe_weights = weights[:self.fe_weights_length] + + clf_weights = weights[self.fe_weights_length:] + 
+        self.GLFC_Client.classifier.set_weights(clf_weights)
 
     def train(self, train_data,val_data, **kwargs):
         task_id = kwargs.get('task_id', 0)
@@ -52,13 +61,13 @@ def train(self, train_data,val_data, **kwargs):
 
     def helper_function(self, helper_info, **kwargs):
         self.best_old_model = helper_info['best_old_model']
-        pass
+
 
-    def inference(self, data, **kwargs):
+    def predict(self, data, **kwargs):
         result = {}
         for data in data.x:
             x = np.load(data)
-            logits = self.GLFC_Client.model(x)
+            logits = self.GLFC_Client.model_call(x,training=False)
             pred = tf.cast(tf.argmax(logits, axis=1), tf.int32)
             result[data] = pred.numpy()
         print("finish predict")
diff --git a/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/model.py b/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/model.py
index d70d1c1e..abac6414 100644
--- a/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/model.py
+++ b/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/model.py
@@ -43,10 +43,9 @@ def call(self, inputs, training=None):
 
 # Residual network
 class ResNet(keras.Model):
-    def __init__(self, layer_dims, num_classes=10): # [2, 2, 2, 2]
+    def __init__(self, layer_dims): # [2, 2, 2, 2]
         super(ResNet, self).__init__()
         self.layer_dims = layer_dims
-        self.num_classes = num_classes
 
         self.stem = keras.models.Sequential([keras.layers.Conv2D(64, (3, 3), strides=(1, 1)),
                                              keras.layers.BatchNormalization(),
@@ -61,7 +60,6 @@ def __init__(self, layer_dims, num_classes=10): # [2, 2, 2, 2]
 
         # output: [b, 512, h, w],
         self.avgpool = keras.layers.GlobalAveragePooling2D()
-        # self.fc = keras.layers.Dense(num_classes)
 
     def call(self, inputs, training=None):
         x = self.stem(inputs,training=training)
@@ -69,11 +67,7 @@ def call(self, inputs, training=None):
         x = self.layer2(x,training=training)
         x = self.layer3(x,training=training)
         x = self.layer4(x,training=training)
-
-        # [b, c]
         x = self.avgpool(x)
-        # [b, 100]
-        # x = self.fc(x)
         return x
 
     def build_resblock(self, filter_num, blocks, stride=1):
@@ -87,7 +81,6 @@ def build_resblock(self, filter_num, blocks, stride=1):
     def get_config(self):
         return {
             'layer_dims': self.layer_dims,
-            'num_classes': self.num_classes
         }
 
     @classmethod
@@ -140,11 +133,11 @@ def lenet5(input_shape, num_classes:int):
     return LeNet(input_shape, 3, num_classes)
 
-def resnet10(num_classes:int):
-    return ResNet([1, 1, 1, 1], num_classes)
+def resnet10():
+    return ResNet([1, 1, 1, 1])
 
 def resnet18(num_classes:int):
-    return ResNet([2, 2, 2, 2], num_classes)
+    return ResNet([2, 2, 2, 2])
 
 def resnet34(num_classes:int):
-    return ResNet([3, 4, 6, 3], num_classes)
\ No newline at end of file
+    return ResNet([3, 4, 6, 3])
\ No newline at end of file
diff --git a/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/proxy_server.py b/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/proxy_server.py
index 3b18138a..dd7fb303 100644
--- a/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/proxy_server.py
+++ b/examples/federated-learning/federated-class-incremental-learning/glfc/algorithm/proxy_server.py
@@ -4,6 +4,7 @@
 import tensorflow as tf
 import logging
 from network import *
+from model import resnet10, resnet18, resnet34, lenet5
 
 logging.getLogger().setLevel(logging.INFO)
 
 class ProxyData:
@@ -12,32 +13,76 @@ def __init__(self):
         self.test_label = []
 
 class ProxyServer:
-    def __init__(self, learning_rate, num_class, feature_extractor, encode_model, **kwargs):
+    def __init__(self, learning_rate, num_classes, **kwargs):
         self.learning_rate = learning_rate
-        self.feature_extractor = feature_extractor
-        self.encode_model = encode_model
-        self.model = NetWork(num_class, feature_extractor)
-        self.model.compile(optimizer='sgd', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
-        self.model.fit(np.random.rand(1, 32, 32, 3), np.random.randint(0, num_class, 1), epochs=1)
+        self.encode_model = lenet5(32, 100)
         self.monitor_dataset = ProxyData()
         self.new_set =[]
         self.new_set_label = []
-        self.num_classes= 0
+        self.num_classes= num_classes
         self.proto_grad = None
-        self.best_model_1 = None
+
+        self.best_model_1 =None
         self.best_model_2 = None
         self.best_perf = 0
         self.num_image = 20
         self.Iteration = 250
+        self.build_model()
+        self.fe_weights_length = len(self.feature_extractor.get_weights())
+        self.classifier = None
+
+    def build_model(self):
+        self.feature_extractor = resnet10()
+        self.feature_extractor.build(input_shape=(None, 32, 32, 3))
+        self.feature_extractor.call(keras.Input(shape=(32, 32, 3)))
+
+    def set_weights(self, weights):
+        print(f'set weights {self.num_classes}')
+        fe_weights = weights[:self.fe_weights_length]
+        clf_weights = weights[self.fe_weights_length:]
+        self.feature_extractor.set_weights(fe_weights)
+        self.classifier.set_weights(clf_weights)
+
+    def increment_class(self, num_classes):
+        print(f'increment class {num_classes}')
+        self.num_classes = num_classes
+        self._initialize_classifier()
+
+    def _initialize_classifier(self):
+        if self.classifier is not None:
+            new_classifier = tf.keras.Sequential([
+                # tf.keras.Input(shape=(None, self.feature_extractor.layers[-2].output_shape[-1])),
+                tf.keras.layers.Dense(self.num_classes, kernel_initializer='lecun_normal')
+            ])
+            new_classifier.build(input_shape=(None, self.feature_extractor.layers[-2].output_shape[-1]))
+            new_weights = new_classifier.get_weights()
+            old_weights = self.classifier.get_weights()
+            # copy the old parameters
+            # weight
+            new_weights[0][0:old_weights[0].shape[0], 0:old_weights[0].shape[1]] = old_weights[0]
+            # bias
+            new_weights[1][0:old_weights[1].shape[0]] = old_weights[1]
+            new_classifier.set_weights(new_weights)
+            self.classifier = new_classifier
+        else:
+            logging.info(f'input shape is {self.feature_extractor.layers[-2].output_shape[-1]}')
+            self.classifier = tf.keras.Sequential([
+                # tf.keras.Input(shape=(None, self.feature_extractor.layers[-2].output_shape[-1])),
+                tf.keras.layers.Dense(self.num_classes, kernel_initializer='lecun_normal')
+            ])
+            self.classifier.build(input_shape=(None, self.feature_extractor.layers[-2].output_shape[-1]))
+
+        logging.info(f"finished initializing classifier {self.classifier}")
     def model_back(self):
         return [self.best_model_1, self.best_model_2]
 
     def dataload(self, proto_grad):
+        self._initialize_classifier()
         self.proto_grad = proto_grad
         if len(proto_grad ) != 0 :
             self.reconstruction()
@@ -49,13 +94,13 @@ def dataload(self, proto_grad):
             logging.info(f'in proxy server, current performance is {cur_perf}')
             if cur_perf > self.best_perf:
                 self.best_perf = cur_perf
-                self.best_model_2 = copy_model(self.model)
+                self.best_model_2 = (self.feature_extractor, self.classifier)
 
     def monitor(self):
         correct, total = 0, 0
         for (x, y) in zip(self.monitor_dataset.test_data, self.monitor_dataset.test_label):
-            y_pred = self.model(x)
+            y_pred = self.classifier(self.feature_extractor((x)))
 
             predicts = tf.argmax(y_pred, axis=-1)
             predicts = tf.cast(predicts, tf.int32)
diff --git a/examples/federated-learning/federated-class-incremental-learning/glfc/test._train.py b/examples/federated-learning/federated-class-incremental-learning/glfc/test._train.py
index c43d64b0..4a3a9b74 100644
--- a/examples/federated-learning/federated-class-incremental-learning/glfc/test._train.py
+++ b/examples/federated-learning/federated-class-incremental-learning/glfc/test._train.py
@@ -4,10 +4,10 @@
 from algorithm.model import resnet10, lenet5
 from sedna.datasources import TxtDataParse
 from core.testenvmanager.dataset.utils import read_data_from_file_to_npy
+import copy
 train_file = '/home/wyd/ianvs/project/data/cifar100/cifar100_train.txt'
 train_data = TxtDataParse(data_type='train')
 train_data.parse(train_file)
-print(train_data.x, train_data.y)
 train_data = read_data_from_file_to_npy(train_data)
 train_loader = tf.data.Dataset.from_tensor_slices(train_data).shuffle(500000).batch(32)
 x_train, y_train = train_data
@@ -22,7 +22,6 @@
 #     if range != 0 and range % 10 == 0:
 #         task_id += 1
 #         model = incremental_learning(model, 10*(task_id+1))
-
     for x, y in task_loader:
         model.fit(x, y, epochs=1)
     # print(y)
@@ -44,22 +43,3 @@
 #         grads = tape.gradient(loss, model.trainable_variables)
 #         optimizer.apply(grads, model.trainable_variables)
 #         print(f'loss: {loss}, accuracy: {correct / x.shape[0]}')
-
-total_num = 0
-total_correct = 0
-for x, y in train_loader:
-    logits = model(x, training=True)
-    label =y
-    # print(y.shape[0])
-    y = tf.one_hot(y, depth=(task_id + 1) * 10)
-    y = tf.squeeze(y, axis=1)
-    pred = tf.argmax(logits, axis=1)
-    pred = tf.cast(pred, dtype=tf.int32)
-    pred = tf.reshape(pred, label.shape)
-    # print(pred.shape, label.shape)
-    correct = tf.cast(tf.equal(pred, label), dtype=tf.int32)
-    # print(correct.shape)
-    correct = tf.reduce_sum(correct)
-    total_correct += correct
-    total_num += x.shape[0]
-print('train accuracy:', total_correct / total_num)
\ No newline at end of file
diff --git a/examples/federated-learning/federated-class-incremental-learning/glfc/test.py b/examples/federated-learning/federated-class-incremental-learning/glfc/test.py
index cbf80dcd..781c7b39 100644
--- a/examples/federated-learning/federated-class-incremental-learning/glfc/test.py
+++ b/examples/federated-learning/federated-class-incremental-learning/glfc/test.py
@@ -1,14 +1,214 @@
-import random
+from keras import Sequential,Input
+from keras.src.layers import Conv2D, MaxPooling2D, Flatten, Dropout, Dense
+from typing import List
+import tensorflow as tf
 import numpy as np
+import keras
+from keras import layers, Sequential
 
-y = np.random.randint(0,10 ,size=(500,1))
-y = np.sort(y, axis=0)
-print(y)
-class_num = len(np.unique(y))
-current_class = random.sample([x for x in range(class_num)], 6)
-print(current_class)
-
-indices = np.where((y==current_class))
-print(indices)
-print(y[indices[0]].shape)
-# print(y[497][5])
\ No newline at end of file
+class Conv2D(keras.layers.Layer):
+    def __init__(self, is_combined:bool, alpha:float, filter_num, kernel_size, strides = (1, 1), padding: str = "valid"):
+        super(Conv2D, self).__init__()
+        self.is_combined = is_combined
+        self.alpha = tf.Variable(alpha)
+        self.conv_local = layers.Conv2D(filter_num, kernel_size, strides, padding, kernel_initializer='he_normal')
+        self.conv_global = layers.Conv2D(filter_num, kernel_size, strides, padding, kernel_initializer='he_normal')
+
+    def call(self, inputs):
+        return self.alpha * self.conv_global(inputs) + (1 - self.alpha) * self.conv_local(inputs)
+
+    def get_alpha(self):
+        return self.alpha
+
+    def set_alpha(self, alpha):
+        self.alpha.assign(alpha)
+
+    def get_global_weights(self):
+        return self.conv_global.get_weights()
+
+    def set_global_weights(self, global_weights):
+        self.conv_global.set_weights(global_weights)
+
+    def get_global_variables(self):
+        return self.conv_global.trainable_variables
+
+    def merge_to_local(self):
+        new_weight = []
+        for w_local, w_global in zip(self.conv_local.get_weights(), self.conv_global.get_weights()):
+            new_weight.append(self.alpha * w_global + (1 - self.alpha) * w_local)
+        self.conv_local.set_weights(new_weight)
+        self.alpha.assign(0.0)
+
+    def switch_to_global(self):
+        self.conv_global.set_weights(self.conv_local.get_weights())
+
+# Convolution block
+# Input--conv2D--BN--ReLU--conv2D--BN--ReLU--Output
+#          \                                /
+#           ------------------------------
+class BasicBlock(keras.Model):
+    def __init__(self, is_combined:bool, filter_num, stride=1):
+        super(BasicBlock, self).__init__()
+
+        self.filter_num = filter_num
+        self.stride = stride
+
+        self.conv1 = Conv2D(is_combined, 0.0, filter_num, (3, 3), strides=stride, padding='same')
+        self.bn1 = layers.BatchNormalization()
+        self.relu = layers.Activation('relu')
+
+        self.conv2 = Conv2D(is_combined, 0.0, filter_num, (3, 3), strides=1, padding='same')
+        self.bn2 = layers.BatchNormalization()
+
+        if stride != 1:
+            self.downsample = Sequential()
+            self.downsample.add(Conv2D(is_combined, 0.0, filter_num, (1, 1), strides=stride))
+        else:
+            self.downsample = lambda x:x
+
+    def call(self, inputs, training=None):
+        # [b, h, w, c]
+        out = self.conv1(inputs)
+        out = self.bn1(out,training=training)
+        out = self.relu(out)
+
+        out = self.conv2(out)
+        out = self.bn2(out,training=training)
+
+        identity = self.downsample(inputs)
+
+        output = layers.add([out, identity])
+        output = tf.nn.relu(output)
+
+        return output
+
+# Residual network
+class ResNet(keras.Model):
+    def __init__(self, is_combined:bool, layer_dims): # [2, 2, 2, 2]
+        super(ResNet, self).__init__()
+
+        self.is_combined = is_combined
+        self.stem = Sequential([Conv2D(is_combined, 0.0, 64, (3, 3), strides=(1, 1)),
+                                layers.BatchNormalization(),
+                                layers.Activation('relu'),
+                                layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')
+                                ])
+
+        self.layer1 = self.build_resblock(64, layer_dims[0])
+        self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
+        self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
+        self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)
+
+        # output: [b, 512, h, w],
+        self.avgpool = layers.GlobalAveragePooling2D()
+
+    def call(self, inputs, training=None):
+        x = self.stem(inputs,training=training)
+
+        x = self.layer1(x,training=training)
+        x = self.layer2(x,training=training)
+        x = self.layer3(x,training=training)
+        x = self.layer4(x,training=training)
+
+        # [b, c]
+        x = self.avgpool(x)
+        return x
+
+    def build_resblock(self, filter_num, blocks, stride=1):
+        res_blocks = []
+        # may down sample
+        res_blocks.append(BasicBlock(self.is_combined, filter_num, stride))
+        for _ in range(1, blocks):
+            res_blocks.append(BasicBlock(self.is_combined, filter_num, stride=1))
+        return Sequential(res_blocks)
+
+    def get_alpha(self):
+        convs = self._get_all_conv_layers()
+        ret = []
+        for conv in convs:
+            ret.append(conv.get_alpha())
+        return ret
+
+    def set_alpha(self, alpha = 0.0):
+        convs = self._get_all_conv_layers()
+        for conv in convs:
+            conv.set_alpha(alpha)
+
+    def merge_to_local_model(self):
+        convs = self._get_all_conv_layers()
+        for conv in convs:
+            conv.merge_to_local()
+
+    def switch_to_global(self):
+        convs = self._get_all_conv_layers()
+        for conv in convs:
+            conv.switch_to_global()
+
+    def initialize_alpha(self):
+        convs = self._get_all_conv_layers()
+        for conv in convs:
+            conv.set_alpha(np.random.random())
+
+    def set_global_model(self, global_model):
+        local_convs = self._get_all_conv_layers()
+        global_convs = global_model._get_all_conv_layers()
+        for local_conv, global_conv in zip(local_convs, global_convs):
+            local_conv.set_global_weights(global_conv.get_global_weights())
+
+
+    def get_global_variables(self):
+        convs = self._get_all_conv_layers()
+        ret = []
+        for conv in convs:
+            ret.extend(conv.get_global_variables())
+        return ret
+
+    def _get_all_conv_layers(self) -> List[Conv2D]:
+        def get_all_conv_layers_(model):
+            convs = []
+            for i in model.layers:
+                if isinstance(i, Conv2D):
+                    convs.append(i)
+                elif isinstance(i, keras.Model):
+                    convs.extend(get_all_conv_layers_(i))
+            return convs
+        return get_all_conv_layers_(self)
+
+
+def resnet10(is_combined = False) -> ResNet:
+    return ResNet(is_combined, [1, 1, 1, 1])
+
+def resnet18(is_combined = False) -> ResNet:
+    return ResNet(is_combined, [2, 2, 2, 2])
+
+def resnet34(is_combined = False) -> ResNet:
+    return ResNet(is_combined, [3, 4, 6, 3])
+
+feature_extractor = resnet10(10)
+feature_extractor.build(input_shape=(None, 32, 32, 3))
+feature_extractor.call(Input(shape=(32, 32, 3)))
+print(feature_extractor.summary())
+classifier = Sequential([
+
+    Dense( 10, kernel_initializer='lecun_normal')
+    ])
+x = np.random.random((32, 512))
+classifier(x)
+# all_weight = []
+# fe_weight = feature_extractor.get_weights()
+# clf_weight = classifier.get_weights()
+
+# all_weight.extend(fe_weight)
+# all_weight.extend(clf_weight)
+# print(len(fe_weight))
+# print('-'*50)
+# print(len(clf_weight))
+
+# print(len(all_weight))
+# proto_fe = resnet10()
+# proto_fe.build(input_shape=(None, 32, 32, 3))
+# proto_fe.call(keras.Input(shape=(32, 32, 3)))
+# print(proto_fe.summary())
+# proto_fe.set_weights(fe_weight)
+# for w in fe_weight:
+#     print(w.shape)
\ No newline at end of file