From 62e2441be9b580007c02fb2492b2672baf35d100 Mon Sep 17 00:00:00 2001
From: Chester Chen <512707+chesterxgchen@users.noreply.github.com>
Date: Fri, 3 Jan 2025 08:40:10 -0800
Subject: [PATCH] Fed Statistics: Adding Percentiles support (#3124)

* 1. Add percentile support using t-digest
  2. Add examples for df_stats
  3. Refactor some of the codebase
  4. Remaining work:
     1. add DP noise
     2. make writing filters easier for end-users
     3. add job API for the stats job
     4. make it even easier to work on stats
     5. unit tests

* add unit tests
  add job api in example
  format style

* add tdigest license file

* remove debugging print

* fix test

* format style changes

---
 3rdParty/tdigest.LICENSE.txt | 23 +++
 CONTRIBUTING.md | 4 +
 .../federated-statistics/df_stats.ipynb | 32 +++--
 .../df_stats/demo/visualization.ipynb | 2 +-
 .../df_stats/job_api/df_statistics.py | 75 ++++++++++
 .../df_stats/job_api/df_stats_job.py | 72 ++++++++++
 .../app/config/config_fed_client.json | 18 +--
 .../app/config/config_fed_server.json | 6 +-
 .../jobs/df_stats/app/custom/df_statistics.py | 73 +---------
 .../df_stats/requirements.txt | 2 +-
 .../image_stats/job_api/image_statistics.py | 133 ++++++++++++++++++
 .../image_stats/job_api/image_stats_job.py | 64 +++++++++
 .../image_stats/requirements.txt | 1 -
 .../cifar10/stats/image_stats_job.py | 2 +-
 .../higgs/stats/code/df_stats_job.py | 2 +-
 .../app_common/abstract/statistics_spec.py | 13 ++
 nvflare/app_common/app_constant.py | 15 +-
 .../statistics/statistics_task_handler.py | 16 ++-
 .../app_common/statistics/numeric_stats.py | 115 ++++++++++++---
 .../statistics/statistics_config_utils.py | 11 ++
 .../workflows/statistics_controller.py | 37 +++--
 nvflare/app_opt/statistics/df/__init__.py | 13 ++
 .../statistics/df/df_core_statistics.py | 116 +++++++++++++++
 nvflare/job_config/stats_job.py | 7 +-
 runtest.sh | 2 +-
 setup.cfg | 1 +
 .../statistics/numeric_stats_test.py | 4 +-
 .../unit_test/app_opt/statistics/__init__.py | 13 ++
 .../app_opt/statistics/percentiles_test.py | 90 ++++++++++++
 29 files changed, 819 insertions(+), 143 deletions(-)
 create mode 100644 3rdParty/tdigest.LICENSE.txt
 create mode 100644 examples/advanced/federated-statistics/df_stats/job_api/df_statistics.py
 create mode 100644 examples/advanced/federated-statistics/df_stats/job_api/df_stats_job.py
 create mode 100644 examples/advanced/federated-statistics/image_stats/job_api/image_statistics.py
 create mode 100644 examples/advanced/federated-statistics/image_stats/job_api/image_stats_job.py
 create mode 100644 nvflare/app_opt/statistics/df/__init__.py
 create mode 100644 nvflare/app_opt/statistics/df/df_core_statistics.py
 create mode 100644 tests/unit_test/app_opt/statistics/__init__.py
 create mode 100644 tests/unit_test/app_opt/statistics/percentiles_test.py
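For readers new to the approach: percentiles, unlike counts and sums, cannot be federated by averaging per-site results, which is why this PR ships mergeable t-digest centroids instead. A minimal illustrative sketch (not part of the patch; plain numpy, no privacy machinery):

```python
# Why per-site percentiles cannot simply be averaged (illustrative only).
import numpy as np

site_1 = np.array([1, 2, 3, 4, 100])  # skewed local data
site_2 = np.array([5, 6, 7, 8, 9])

# Averaging local medians gives the wrong global answer:
avg_of_medians = np.mean([np.median(site_1), np.median(site_2)])  # (3 + 7) / 2 = 5.0

# The true global median requires the pooled data, which federated sites
# cannot share -- hence the mergeable t-digest sketch used in this PR:
true_median = np.median(np.concatenate([site_1, site_2]))  # 5.5

print(avg_of_medians, true_median)
```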
diff --git a/3rdParty/tdigest.LICENSE.txt b/3rdParty/tdigest.LICENSE.txt
new file mode 100644
index 0000000000..c4da63fac0
--- /dev/null
+++ b/3rdParty/tdigest.LICENSE.txt
@@ -0,0 +1,23 @@
+https://github.com/CamDavidsonPilon/tdigest/blob/master/LICENSE.txt
+
+The MIT License (MIT)
+
+Copyright (c) 2015 Cameron Davidson-Pilon
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 32843e03e6..07e05e2264 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -44,6 +44,10 @@ To collaborate efficiently, please read through this section and follow them.
 * [Building documentation](#building-the-documentation)
 * [Signing your work](#signing-your-work)
 
+> Note:
+> some package dependencies require python-dev for local development, e.g.
+> python3.12-dev.
+
 #### Checking the coding style
 
 We check code style using flake8 and isort.
 A bash script (`runtest.sh`) is provided to run all tests locally.
diff --git a/examples/advanced/federated-statistics/df_stats.ipynb b/examples/advanced/federated-statistics/df_stats.ipynb
index 93d3e86140..944544e4e5 100644
--- a/examples/advanced/federated-statistics/df_stats.ipynb
+++ b/examples/advanced/federated-statistics/df_stats.ipynb
@@ -144,15 +144,27 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "0d5041aa-c2e0-4af6-a2c8-bae76e4512d0",
+   "id": "6361a85e-4187-433c-976c-0dc4021908ac",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "! nvflare simulator df_stats/jobs/df_stats -w /tmp/nvflare/df/workdir -n 2 -t 2"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "4fdbfb95-90c9-4d45-b727-dab6f5a8bc41",
    "metadata": {
     "tags": []
    },
-   "outputs": [],
    "source": [
+    "Or run the equivalent Python code:\n",
+    "```\n",
     "from nvflare.private.fed.app.simulator.simulator_runner import SimulatorRunner\n",
-    "runner = SimulatorRunner(job_folder=\"df_stats/jobs/df_stats\", workspace=\"/tmp/nvflare/df_stats/workdir\", n_clients = 2, threads=2)\n",
-    "runner.run()"
+    "runner = SimulatorRunner(job_folder=\"df_stats/jobs/df_stats\", workspace=\"/tmp/nvflare/df/workdir\", n_clients = 2, threads=2)\n",
+    "runner.run()\n",
+    "\n",
+    "```"
    ]
   },
   {
@@ -167,7 +179,7 @@
    "From a **terminal** one can also run the following equivalent CLI\n",
    "\n",
    "```\n",
-    "nvflare simulator df_stats/jobs/df_stats -w /tmp/nvflare/df_stats -n 2 -t 2\n",
+    "nvflare simulator df_stats/jobs/df_stats -w /tmp/nvflare/df/workdir -n 2 -t 2\n",
    "\n",
    "```\n",
    "\n",
@@ -184,9 +196,9 @@
   "metadata": {},
   "source": [
    "\n",
-    "The results are stored in workspace \"/tmp/nvflare/df_stats/workdir/\"\n",
+    "The results are stored in workspace \"/tmp/nvflare/df/workdir/\"\n",
    "```\n",
-    "/tmp/nvflare/df_stats/workdir/server/simulate_job/statistics/adults_stats.json\n",
+    "/tmp/nvflare/df/workdir/server/simulate_job/statistics/adults_stats.json\n",
    "```"
   ]
  },
@@ -199,7 +211,7 @@
   },
   "outputs": [],
   "source": [
-    "cat /tmp/nvflare/df_stats/workdir/server/simulate_job/statistics/adults_stats.json"
+    "cat /tmp/nvflare/df/workdir/server/simulate_job/statistics/adults_stats.json"
   ]
  },
 {
@@ -222,7 +234,7 @@
   },
   "outputs": [],
   "source": [
-    "! cp /tmp/nvflare/df_stats/workdir/server/simulate_job/statistics/adults_stats.json df_stats/demo/."
+    "! cp /tmp/nvflare/df/workdir/server/simulate_job/statistics/adults_stats.json df_stats/demo/."
   ]
  },
 {
@@ -271,7 +283,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.8.19"
+   "version": "3.10.2"
   }
  },
  "nbformat": 4,
diff --git a/examples/advanced/federated-statistics/df_stats/demo/visualization.ipynb b/examples/advanced/federated-statistics/df_stats/demo/visualization.ipynb
index 99eb5c4c91..283f5279b2 100644
--- a/examples/advanced/federated-statistics/df_stats/demo/visualization.ipynb
+++ b/examples/advanced/federated-statistics/df_stats/demo/visualization.ipynb
@@ -285,7 +285,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.8.17"
+   "version": "3.10.2"
  }
 },
 "nbformat": 4,
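Once the simulator run in the notebook completes, the generated JSON can be inspected directly. A hedged sketch (not part of the patch): the feature -> statistic -> site/global -> dataset nesting follows `_combine_all_statistics()` in the controller change further down, but the exact `"Global"` key name is an assumption here.

```python
# Illustrative only: peek at the percentiles in the generated statistics file.
import json

path = "/tmp/nvflare/df/workdir/server/simulate_job/statistics/adults_stats.json"
with open(path) as f:
    stats = json.load(f)

# e.g. global percentiles of "Age" on the train split (key names assumed)
print(stats["Age"]["percentile"]["Global"]["train"])
```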
diff --git a/examples/advanced/federated-statistics/df_stats/job_api/df_statistics.py b/examples/advanced/federated-statistics/df_stats/job_api/df_statistics.py
new file mode 100644
index 0000000000..5078c2f0a9
--- /dev/null
+++ b/examples/advanced/federated-statistics/df_stats/job_api/df_statistics.py
@@ -0,0 +1,75 @@
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Dict, Optional
+
+import pandas as pd
+
+from nvflare.apis.fl_context import FLContext
+from nvflare.app_opt.statistics.df.df_core_statistics import DFStatisticsCore
+
+
+class DFStatistics(DFStatisticsCore):
+    # data_path is the CSV file name under <data_root_dir>/<site-name>/;
+    # the "data.csv" default is illustrative -- pass the actual file name if different.
+    def __init__(self, data_root_dir: str = "/tmp/nvflare/df_stats/data", data_path: str = "data.csv"):
+        super().__init__()
+        self.data_root_dir = data_root_dir
+        self.data_path = data_path
+        self.data: Optional[Dict[str, pd.DataFrame]] = None
+        self.data_features = [
+            "Age",
+            "Workclass",
+            "fnlwgt",
+            "Education",
+            "Education-Num",
+            "Marital Status",
+            "Occupation",
+            "Relationship",
+            "Race",
+            "Sex",
+            "Capital Gain",
+            "Capital Loss",
+            "Hours per week",
+            "Country",
+            "Target",
+        ]
+
+        # the original dataset has no header;
+        # we use the adult.train dataset for site-1 and the adult.test dataset for site-2.
+        # the adult.test dataset has an incorrectly formatted row on the 1st line, which we skip.
+        self.skip_rows = {
+            "site-1": [],
+            "site-2": [0],
+        }
+
+    def load_data(self, fl_ctx: FLContext) -> Dict[str, pd.DataFrame]:
+        client_name = fl_ctx.get_identity_name()
+        self.log_info(fl_ctx, f"load data for client {client_name}")
+        try:
+            skip_rows = self.skip_rows[client_name]
+            data_path = f"{self.data_root_dir}/{client_name}/{self.data_path}"
+            # example of loading data from CSV
+            df: pd.DataFrame = pd.read_csv(
+                data_path, names=self.data_features, sep=r"\s*,\s*", skiprows=skip_rows, engine="python", na_values="?"
+            )
+            train = df.sample(frac=0.8, random_state=200)  # random state is a seed value
+            test = df.drop(train.index).sample(frac=1.0)
+
+            self.log_info(fl_ctx, f"load data done for client {client_name}")
+            return {"train": train, "test": test}
+
+        except Exception as e:
+            raise Exception(f"Load data for client {client_name} failed! {e}")
+
+    def initialize(self, fl_ctx: FLContext):
+        self.data = self.load_data(fl_ctx)
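To see what the new `percentiles()` call returns locally, here is a hedged sketch (not part of the patch): it bypasses `load_data()`/`FLContext` by assigning `self.data` directly, and the `"percentiles"`/`"centroids"` key names come from the `StatisticsConstants` additions later in this PR.

```python
# Illustrative local check of DFStatistics.percentiles() (not part of the patch).
import pandas as pd

from df_statistics import DFStatistics

stats = DFStatistics(data_root_dir="/tmp/nvflare/df_stats/data")
# bypass load_data()/FLContext for this sketch
stats.data = {"train": pd.DataFrame({"Age": [18, 25, 31, 40, 52, 60, 75]})}

result = stats.percentiles("train", "Age", [25, 50, 75])
print(result["percentiles"])    # e.g. {25: ..., 50: 40.0, 75: ...}
print(result["centroids"][:3])  # [{"m": mean, "c": count}, ...] -- shipped for the global merge
```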
diff --git a/examples/advanced/federated-statistics/df_stats/job_api/df_stats_job.py b/examples/advanced/federated-statistics/df_stats/job_api/df_stats_job.py
new file mode 100644
index 0000000000..696d57170c
--- /dev/null
+++ b/examples/advanced/federated-statistics/df_stats/job_api/df_stats_job.py
@@ -0,0 +1,72 @@
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+
+from df_statistics import DFStatistics
+
+from nvflare.job_config.stats_job import StatsJob
+
+
+def define_parser():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-n", "--n_clients", type=int, default=3)
+    parser.add_argument("-d", "--data_root_dir", type=str, nargs="?", default="/tmp/nvflare/dataset/output")
+    parser.add_argument("-o", "--stats_output_path", type=str, nargs="?", default="statistics/stats.json")
+    parser.add_argument("-j", "--job_dir", type=str, nargs="?", default="/tmp/nvflare/jobs/stats_df")
+    parser.add_argument("-w", "--work_dir", type=str, nargs="?", default="/tmp/nvflare/jobs/stats_df/work_dir")
+    parser.add_argument("-co", "--export_config", action="store_true", help="config only mode, export config")
+
+    return parser.parse_args()
+
+
+def main():
+    args = define_parser()
+
+    n_clients = args.n_clients
+    data_root_dir = args.data_root_dir
+    output_path = args.stats_output_path
+    job_dir = args.job_dir
+    work_dir = args.work_dir
+    export_config = args.export_config
+
+    statistic_configs = {
+        "count": {},
+        "mean": {},
+        "sum": {},
+        "stddev": {},
+        "histogram": {"*": {"bins": 20}, "Age": {"bins": 20, "range": [0, 10]}},
+        "percentile": {"*": [25, 50, 75], "Age": [50, 95]},
+    }
+    # define local stats generator
+    df_stats_generator = DFStatistics(data_root_dir=data_root_dir)
+
+    job = StatsJob(
+        job_name="stats_df",
+        statistic_configs=statistic_configs,
+        stats_generator=df_stats_generator,
+        output_path=output_path,
+    )
+
+    sites = [f"site-{i + 1}" for i in range(n_clients)]
+    job.setup_clients(sites)
+
+    if export_config:
+        job.export_job(job_dir)
+    else:
+        job.simulator_run(work_dir)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/advanced/federated-statistics/df_stats/jobs/df_stats/app/config/config_fed_client.json b/examples/advanced/federated-statistics/df_stats/jobs/df_stats/app/config/config_fed_client.json
index 858431b138..3d3ade8735 100644
--- a/examples/advanced/federated-statistics/df_stats/jobs/df_stats/app/config/config_fed_client.json
+++ b/examples/advanced/federated-statistics/df_stats/jobs/df_stats/app/config/config_fed_client.json
@@ -14,23 +14,7 @@
       }
     }
   ],
-  "task_result_filters": [
-    {
-      "tasks": ["fed_stats"],
-      "filters":[
-        {
-          "path": "nvflare.app_common.filters.statistics_privacy_filter.StatisticsPrivacyFilter",
-          "args": {
-            "result_cleanser_ids": [
-              "min_count_cleanser",
-              "min_max_noise_cleanser",
-              "hist_bins_cleanser"
-            ]
-          }
-        }
-      ]
-    }
-  ],
+
   "task_data_filters": [],
   "components": [
     {
diff --git a/examples/advanced/federated-statistics/df_stats/jobs/df_stats/app/config/config_fed_server.json b/examples/advanced/federated-statistics/df_stats/jobs/df_stats/app/config/config_fed_server.json
index 344ed08e4d..58e4b861fb 100644
--- a/examples/advanced/federated-statistics/df_stats/jobs/df_stats/app/config/config_fed_server.json
+++ b/examples/advanced/federated-statistics/df_stats/jobs/df_stats/app/config/config_fed_server.json
@@ -18,10 +18,14 @@
             "bins": 10,
             "range": [0,120]
           }
+        },
+        "percentile": {
+          "*": [25, 50, 75]
         }
       },
       "writer_id": "stats_writer",
-      "enable_pre_run_task": false
+      "enable_pre_run_task": false,
+      "precision" : 2
     }
   }
 ],
diff --git a/examples/advanced/federated-statistics/df_stats/jobs/df_stats/app/custom/df_statistics.py b/examples/advanced/federated-statistics/df_stats/jobs/df_stats/app/custom/df_statistics.py
index 4a87b6c4b9..4277f269e7 100644
--- a/examples/advanced/federated-statistics/df_stats/jobs/df_stats/app/custom/df_statistics.py
+++ 
b/examples/advanced/federated-statistics/df_stats/jobs/df_stats/app/custom/df_statistics.py @@ -12,18 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, List, Optional +from typing import Dict, Optional -import numpy as np import pandas as pd -from pandas.core.series import Series from nvflare.apis.fl_context import FLContext -from nvflare.app_common.abstract.statistics_spec import BinRange, Feature, Histogram, HistogramType, Statistics -from nvflare.app_common.statistics.numpy_utils import dtype_to_data_type, get_std_histogram_buckets +from nvflare.app_opt.statistics.df.df_core_statistics import DFStatisticsCore -class DFStatistics(Statistics): +class DFStatistics(DFStatisticsCore): def __init__(self, data_path): super().__init__() self.data_root_dir = "/tmp/nvflare/df_stats/data" @@ -76,67 +73,3 @@ def load_data(self, fl_ctx: FLContext) -> Dict[str, pd.DataFrame]: def initialize(self, fl_ctx: FLContext): self.data = self.load_data(fl_ctx) - if self.data is None: - raise ValueError("data is not loaded. make sure the data is loaded") - - def features(self) -> Dict[str, List[Feature]]: - results: Dict[str, List[Feature]] = {} - for ds_name in self.data: - df = self.data[ds_name] - results[ds_name] = [] - for feature_name in df: - data_type = dtype_to_data_type(df[feature_name].dtype) - results[ds_name].append(Feature(feature_name, data_type)) - - return results - - def count(self, dataset_name: str, feature_name: str) -> int: - df: pd.DataFrame = self.data[dataset_name] - return df[feature_name].count() - - def sum(self, dataset_name: str, feature_name: str) -> float: - df: pd.DataFrame = self.data[dataset_name] - return df[feature_name].sum().item() - - def mean(self, dataset_name: str, feature_name: str) -> float: - - count: int = self.count(dataset_name, feature_name) - sum_value: float = self.sum(dataset_name, feature_name) - return sum_value / count - - def stddev(self, dataset_name: str, feature_name: str) -> float: - df = self.data[dataset_name] - return df[feature_name].std().item() - - def variance_with_mean( - self, dataset_name: str, feature_name: str, global_mean: float, global_count: float - ) -> float: - df = self.data[dataset_name] - tmp = (df[feature_name] - global_mean) * (df[feature_name] - global_mean) - variance = tmp.sum() / (global_count - 1) - return variance.item() - - def histogram( - self, dataset_name: str, feature_name: str, num_of_bins: int, global_min_value: float, global_max_value: float - ) -> Histogram: - - num_of_bins: int = num_of_bins - - df = self.data[dataset_name] - feature: Series = df[feature_name] - flattened = feature.ravel() - flattened = flattened[flattened != np.array(None)] - buckets = get_std_histogram_buckets(flattened, num_of_bins, BinRange(global_min_value, global_max_value)) - return Histogram(HistogramType.STANDARD, buckets) - - def max_value(self, dataset_name: str, feature_name: str) -> float: - """this is needed for histogram calculation, not used for reporting""" - - df = self.data[dataset_name] - return df[feature_name].max() - - def min_value(self, dataset_name: str, feature_name: str) -> float: - """this is needed for histogram calculation, not used for reporting""" - - df = self.data[dataset_name] - return df[feature_name].min() diff --git a/examples/advanced/federated-statistics/df_stats/requirements.txt b/examples/advanced/federated-statistics/df_stats/requirements.txt index aef6212b4c..c766bd7827 100644 --- 
a/examples/advanced/federated-statistics/df_stats/requirements.txt
+++ b/examples/advanced/federated-statistics/df_stats/requirements.txt
@@ -1,5 +1,5 @@
-nvflare~=2.5.0rc
 numpy
 pandas
 matplotlib
 jupyterlab
+tdigest
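`tdigest` becomes a new dependency here; the PR relies on three of its calls: `update()`, `percentile()`, and `centroids_to_list()`. A short illustrative sketch of how they fit together (not part of the patch):

```python
# Sketch of the tdigest calls this PR uses (illustrative only).
from tdigest import TDigest

digest = TDigest()
for v in range(1, 101):
    digest.update(v)  # add one observation at a time

print(digest.percentile(50))  # approximate median, ~50

# Centroids are (mean, count) pairs -- small enough to ship to the server,
# and sufficient to rebuild/merge digests without sharing raw values.
centroids = digest.centroids_to_list()  # [{"m": ..., "c": ...}, ...]

merged = TDigest()
for c in centroids:
    merged.update(c["m"], c["c"])  # server-side merge, as in aggregate_centroids()
print(merged.percentile(50))
```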
diff --git a/examples/advanced/federated-statistics/image_stats/job_api/image_statistics.py b/examples/advanced/federated-statistics/image_stats/job_api/image_statistics.py
new file mode 100644
index 0000000000..3bfe2ea61c
--- /dev/null
+++ b/examples/advanced/federated-statistics/image_stats/job_api/image_statistics.py
@@ -0,0 +1,133 @@
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import glob
+import os
+from typing import Dict, List, Optional
+
+import numpy as np
+from monai.data import ITKReader, load_decathlon_datalist
+from monai.transforms import LoadImage
+
+from nvflare.apis.fl_context import FLContext
+from nvflare.app_common.abstract.statistics_spec import Bin, DataType, Feature, Histogram, HistogramType, Statistics
+from nvflare.security.logging import secure_log_traceback
+
+
+class ImageStatistics(Statistics):
+    def __init__(self, data_root: str = "/tmp/nvflare/image_stats/data", data_list_key: str = "data"):
+        """Local image statistics generator.
+
+        Args:
+            data_root: directory with local image data.
+            data_list_key: data list key to use.
+        Returns:
+            a Shareable with the computed local statistics
+        """
+        super().__init__()
+        self.data_list_key = data_list_key
+        self.data_root = data_root
+        self.data_list = None
+        self.client_name = None
+
+        self.loader = None
+        self.failure_images = 0
+        self.fl_ctx = None
+
+    def initialize(self, fl_ctx: FLContext):
+        self.fl_ctx = fl_ctx
+        self.client_name = fl_ctx.get_identity_name()
+        self.loader = LoadImage(image_only=True)
+        self.loader.register(ITKReader())
+        self._load_data_list(self.client_name, fl_ctx)
+
+        if self.data_list is None:
+            raise ValueError("Data is not loaded; make sure the data is loaded.")
+
+    def _load_data_list(self, client_name, fl_ctx: FLContext) -> bool:
+        dataset_json = glob.glob(os.path.join(self.data_root, client_name + "*.json"))
+        if len(dataset_json) != 1:
+            self.log_error(
+                fl_ctx, f"No unique matching dataset list found in {self.data_root} for client {client_name}"
+            )
+            return False
+        dataset_json = dataset_json[0]
+        self.log_info(fl_ctx, f"Reading data from {dataset_json}")
+
+        data_list = load_decathlon_datalist(
+            data_list_file_path=dataset_json, data_list_key=self.data_list_key, base_dir=self.data_root
+        )
+        self.data_list = {"train": data_list}
+
+        self.log_info(fl_ctx, f"Client {client_name} has {len(data_list)} images")
+        return True
+
+    def pre_run(
+        self,
+        statistics: List[str],
+        num_of_bins: Optional[Dict[str, Optional[int]]],
+        bin_ranges: Optional[Dict[str, Optional[List[float]]]],
+    ):
+        return {}
+
+    def features(self) -> Dict[str, List[Feature]]:
+        return {"train": [Feature("intensity", DataType.FLOAT)]}
+
+    def count(self, dataset_name: str, feature_name: str) -> int:
+        image_paths = self.data_list[dataset_name]
+        return len(image_paths)
+
+    def failure_count(self, dataset_name: str, feature_name: str) -> int:
+        return self.failure_images
+
+    def histogram(
+        self, dataset_name: str, feature_name: str, num_of_bins: int, global_min_value: float, global_max_value: float
+    ) -> Histogram:
+        histogram_bins: List[Bin] = []
+        histogram = np.zeros((num_of_bins,), dtype=np.int64)
+        bin_edges = []
+        for i, entry in enumerate(self.data_list[dataset_name]):
+            file = entry.get("image")
+            try:
+                img = self.loader(file)
+                curr_histogram, bin_edges = np.histogram(
+                    img, bins=num_of_bins, range=(global_min_value, global_max_value)
+                )
+                histogram += curr_histogram
+                bin_edges = bin_edges.tolist()
+
+                if i % 100 == 0:
+                    self.logger.info(
+                        f"{self.client_name}, adding {i + 1} of {len(self.data_list[dataset_name])}: {file}"
+                    )
+            except Exception as e:
+                self.failure_images += 1
+                self.logger.critical(f"Failed to load file {file} with exception: {e}. Skipping this image...")
+
+        if num_of_bins + 1 != len(bin_edges):
+            secure_log_traceback()
+            raise ValueError(
+                f"bin_edges size {len(bin_edges)} does not match number of bins + 1: {num_of_bins + 1}"
+            )
+
+        for j in range(num_of_bins):
+            low_value = bin_edges[j]
+            high_value = bin_edges[j + 1]
+            bin_sample_count = histogram[j]
+            histogram_bins.append(Bin(low_value=low_value, high_value=high_value, sample_count=bin_sample_count))
+
+        return Histogram(HistogramType.STANDARD, histogram_bins)
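The `histogram()` method above accumulates per-image counts into one array; this only works because every `np.histogram` call uses the same fixed `(global_min_value, global_max_value)` range, so all bin edges line up. A small self-contained illustration (not part of the patch):

```python
# Why a fixed global range makes per-image histograms addable (illustrative).
import numpy as np

rng = np.random.default_rng(0)
images = [rng.uniform(0, 255, size=(8, 8)) for _ in range(3)]

total = np.zeros(20, dtype=np.int64)
for img in images:
    counts, _edges = np.histogram(img, bins=20, range=(0.0, 255.0))
    total += counts  # identical bin edges => counts are directly addable

assert total.sum() == sum(img.size for img in images)
```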
diff --git a/examples/advanced/federated-statistics/image_stats/job_api/image_stats_job.py b/examples/advanced/federated-statistics/image_stats/job_api/image_stats_job.py
new file mode 100644
index 0000000000..d7c5cf753c
--- /dev/null
+++ b/examples/advanced/federated-statistics/image_stats/job_api/image_stats_job.py
@@ -0,0 +1,64 @@
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+
+from image_statistics import ImageStatistics
+
+from nvflare.job_config.stats_job import StatsJob
+
+
+def define_parser():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-n", "--n_clients", type=int, default=3)
+    parser.add_argument("-d", "--data_root_dir", type=str, nargs="?", default="/tmp/nvflare/dataset/output")
+    parser.add_argument("-o", "--stats_output_path", type=str, nargs="?", default="statistics/stats.json")
+    parser.add_argument("-j", "--job_dir", type=str, nargs="?", default="/tmp/nvflare/jobs/stats_image")
+    parser.add_argument("-w", "--work_dir", type=str, nargs="?", default="/tmp/nvflare/jobs/stats_image/work_dir")
+    parser.add_argument("-co", "--export_config", action="store_true", help="config only mode, export config")
+
+    return parser.parse_args()
+
+
+def main():
+    args = define_parser()
+
+    n_clients = args.n_clients
+    data_root_dir = args.data_root_dir
+    output_path = args.stats_output_path
+    job_dir = args.job_dir
+    work_dir = args.work_dir
+    export_config = args.export_config
+
+    statistic_configs = {"count": {}, "histogram": {"*": {"bins": 20, "range": [0, 256]}}}
+    # define local stats generator
+    stats_generator = ImageStatistics(data_root_dir)
+
+    job = StatsJob(
+        job_name="stats_image",
+        statistic_configs=statistic_configs,
+        stats_generator=stats_generator,
+        output_path=output_path,
+    )
+
+    sites = [f"site-{i + 1}" for i in range(n_clients)]
+    job.setup_clients(sites)
+
+    if export_config:
+        job.export_job(job_dir)
+    else:
+        job.simulator_run(work_dir, gpu="0")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/advanced/federated-statistics/image_stats/requirements.txt b/examples/advanced/federated-statistics/image_stats/requirements.txt
index 1a77cf1524..227880ff29 100644
--- a/examples/advanced/federated-statistics/image_stats/requirements.txt
+++ b/examples/advanced/federated-statistics/image_stats/requirements.txt
@@ -1,4 +1,3 @@
-nvflare~=2.5.0rc
 numpy
 monai[itk]
 pandas
diff --git a/examples/hello-world/step-by-step/cifar10/stats/image_stats_job.py b/examples/hello-world/step-by-step/cifar10/stats/image_stats_job.py
index 35601e839e..6c50493833 100644
--- a/examples/hello-world/step-by-step/cifar10/stats/image_stats_job.py
+++ b/examples/hello-world/step-by-step/cifar10/stats/image_stats_job.py
@@ -52,7 +52,7 @@ def main():
     )
 
     sites = [f"site-{i + 1}" for i in range(n_clients)]
-    job.setup_client(sites)
+    job.setup_clients(sites)
 
     if export_config:
         job.export_job(job_dir)
diff --git a/examples/hello-world/step-by-step/higgs/stats/code/df_stats_job.py b/examples/hello-world/step-by-step/higgs/stats/code/df_stats_job.py
index 151728a54c..39dbed5a61 100644
--- a/examples/hello-world/step-by-step/higgs/stats/code/df_stats_job.py
+++ b/examples/hello-world/step-by-step/higgs/stats/code/df_stats_job.py
@@ -59,7 +59,7 @@ def main():
     )
 
     sites = [f"site-{i + 1}" for i in range(n_clients)]
-    job.setup_client(sites)
+    job.setup_clients(sites)
 
     if export_config:
         job.export_job(job_dir)
diff --git a/nvflare/app_common/abstract/statistics_spec.py b/nvflare/app_common/abstract/statistics_spec.py
index 3fedc0ead1..b34dd93f68 100644
--- a/nvflare/app_common/abstract/statistics_spec.py
+++ b/nvflare/app_common/abstract/statistics_spec.py
@@ -318,6 +318,19 @@ def failure_count(self, dataset_name: str, feature_name: str) -> int:
         """
         return 0
 
+    def percentiles(self, dataset_name: str, feature_name: str, percentiles: List) -> Dict:
+        """Return the requested percentiles for the given dataset and feature.
+
+        Args:
+            dataset_name: the dataset name.
+            feature_name: the feature name.
+            percentiles: List[int], e.g. [25, 50, 75], corresponding to p25, p50, p75.
+
+        Returns:
+            dict containing the local percentile values, plus the digest
+            centroids used for global aggregation.
+        """
+        raise NotImplementedError
+
     def finalize(self, fl_ctx: FLContext):
         """Called to finalize the Statistic calculator (close/release resources gracefully).
 
diff --git a/nvflare/app_common/app_constant.py b/nvflare/app_common/app_constant.py
index 98b8bc21d6..a7c34b8444 100644
--- a/nvflare/app_common/app_constant.py
+++ b/nvflare/app_common/app_constant.py
@@ -163,6 +163,7 @@ class StatisticsConstants(AppConstants):
     STATS_VAR = "var"
     STATS_STDDEV = "stddev"
     STATS_HISTOGRAM = "histogram"
+    STATS_PERCENTILE = "percentile"
     STATS_MAX = "max"
     STATS_MIN = "min"
     STATS_FEATURES = "stats_features"
@@ -173,6 +174,9 @@ class StatisticsConstants(AppConstants):
     STATS_BIN_RANGE = "range"
     STATS_TARGET_STATISTICS = "statistics"
 
+    STATS_PERCENTILES_KEY = "percentiles"
+    STATS_CENTROIDS_KEY = "centroids"
+
     FED_STATS_PRE_RUN = "fed_stats_pre_run"
     FED_STATS_TASK = "fed_stats"
     STATISTICS_TASK_KEY = "fed_stats_task_key"
@@ -184,7 +188,16 @@ class StatisticsConstants(AppConstants):
     NAME = "Name"
 
     ordered_statistics = {
-        STATS_1st_STATISTICS: [STATS_COUNT, STATS_FAILURE_COUNT, STATS_SUM, STATS_MEAN, STATS_MIN, STATS_MAX],
+        # each statistic requires either one or two rounds of calculation
+        STATS_1st_STATISTICS: [
+            STATS_COUNT,
+            STATS_FAILURE_COUNT,
+            STATS_SUM,
+            STATS_MEAN,
+            STATS_MIN,
+            STATS_MAX,
+            STATS_PERCENTILE,
+        ],
         STATS_2nd_STATISTICS: [STATS_HISTOGRAM, STATS_VAR, STATS_STDDEV],
     }
 
diff --git a/nvflare/app_common/executors/statistics/statistics_task_handler.py b/nvflare/app_common/executors/statistics/statistics_task_handler.py
index 1e282faac0..e3b1bb45a7 100644
--- a/nvflare/app_common/executors/statistics/statistics_task_handler.py
+++ b/nvflare/app_common/executors/statistics/statistics_task_handler.py
@@ -23,7 +23,7 @@
 from nvflare.app_common.app_constant import StatisticsConstants as StC
 from nvflare.app_common.statistics.numeric_stats import filter_numeric_features
 from nvflare.app_common.statistics.statisitcs_objects_decomposer import fobs_registration
-from nvflare.app_common.statistics.statistics_config_utils import get_feature_bin_range
+from nvflare.app_common.statistics.statistics_config_utils import get_feature_bin_range, get_target_percents
 from nvflare.fuel.utils import fobs
 from nvflare.security.logging import secure_format_exception
 
@@ -96,6 +96,7 @@ def statistic_functions(self) -> dict:
             StC.STATS_HISTOGRAM: self.get_histogram,
             StC.STATS_MAX: self.get_max_value,
             StC.STATS_MIN: self.get_min_value,
+            StC.STATS_PERCENTILE: self.get_percentiles_and_centroids,
         }
 
     def _populate_result_statistics(self, statistics_result, ds_features, tm: StatisticConfig, shareable, fl_ctx, fn):
@@ -318,6 +319,19 @@ def get_bin_range(
 
         return bin_range
 
+    def get_percentiles_and_centroids(
+        self,
+        dataset_name: str,
+        feature_name: str,
+        statistic_configs: StatisticConfig,
+        inputs: Shareable,
+        fl_ctx: FLContext,
+    ) -> dict:
+        percentile_config = statistic_configs.config
+        target_percents = get_target_percents(percentile_config, feature_name)
+        result = self.stats_generator.percentiles(dataset_name, feature_name, target_percents)
+        return result
+
     def _get_global_value_from_input(self, statistic_key: str, dataset_name: str, feature_name: str, inputs):
         global_value = None
         if dataset_name in inputs[statistic_key]:
diff --git a/nvflare/app_common/statistics/numeric_stats.py 
b/nvflare/app_common/statistics/numeric_stats.py index d9e7988775..046f0ce0b4 100644 --- a/nvflare/app_common/statistics/numeric_stats.py +++ b/nvflare/app_common/statistics/numeric_stats.py @@ -15,8 +15,11 @@ from math import sqrt from typing import Dict, List, TypeVar +from tdigest import TDigest + from nvflare.app_common.abstract.statistics_spec import Bin, BinRange, DataType, Feature, Histogram, HistogramType from nvflare.app_common.app_constant import StatisticsConstants as StC +from nvflare.app_common.statistics.statistics_config_utils import get_target_percents T = TypeVar("T") @@ -37,7 +40,9 @@ def get_global_feature_data_types( return global_feature_data_types -def get_global_stats(global_metrics: dict, client_metrics: dict, metric_task: str) -> dict: +def get_global_stats( + global_metrics: dict, client_metrics: dict, metric_task: str, statistic_configs: Dict[str, dict], precision: int = 4 +) -> dict: # we need to calculate the metrics in specified order ordered_target_metrics = StC.ordered_statistics[metric_task] ordered_metrics = [metric for metric in ordered_target_metrics if metric in client_metrics] @@ -49,21 +54,27 @@ def get_global_stats(global_metrics: dict, client_metrics: dict, metric_task: st stats = client_metrics[metric] if metric == StC.STATS_COUNT or metric == StC.STATS_FAILURE_COUNT or metric == StC.STATS_SUM: for client_name in stats: - global_metrics[metric] = accumulate_metrics(stats[client_name], global_metrics[metric]) + global_metrics[metric] = accumulate_metrics(stats[client_name], global_metrics[metric], precision) elif metric == StC.STATS_MEAN: - global_metrics[metric] = get_means(global_metrics[StC.STATS_SUM], global_metrics[StC.STATS_COUNT]) + global_metrics[metric] = get_means( + global_metrics[StC.STATS_SUM], global_metrics[StC.STATS_COUNT], precision + ) elif metric == StC.STATS_MAX: for client_name in stats: - global_metrics[metric] = get_min_or_max_values(stats[client_name], global_metrics[metric], max) + global_metrics[metric] = get_min_or_max_values( + stats[client_name], global_metrics[metric], max, precision + ) elif metric == StC.STATS_MIN: for client_name in stats: - global_metrics[metric] = get_min_or_max_values(stats[client_name], global_metrics[metric], min) + global_metrics[metric] = get_min_or_max_values( + stats[client_name], global_metrics[metric], min, precision + ) elif metric == StC.STATS_HISTOGRAM: for client_name in stats: global_metrics[metric] = accumulate_hists(stats[client_name], global_metrics[metric]) elif metric == StC.STATS_VAR: for client_name in stats: - global_metrics[metric] = accumulate_metrics(stats[client_name], global_metrics[metric]) + global_metrics[metric] = accumulate_metrics(stats[client_name], global_metrics[metric], precision) elif metric == StC.STATS_STDDEV: ds_vars = global_metrics[StC.STATS_VAR] ds_stddev = {} @@ -71,14 +82,22 @@ def get_global_stats(global_metrics: dict, client_metrics: dict, metric_task: st ds_stddev[ds_name] = {} feature_vars = ds_vars[ds_name] for feature in feature_vars: - ds_stddev[ds_name][feature] = sqrt(feature_vars[feature]) + ds_stddev[ds_name][feature] = round(sqrt(feature_vars[feature]), precision) global_metrics[StC.STATS_STDDEV] = ds_stddev + elif metric == StC.STATS_PERCENTILE: + global_digest = {} + for client_name in stats: + + global_digest = aggregate_centroids(stats[client_name], global_digest) + + percent_config = statistic_configs.get(StC.STATS_PERCENTILE) + global_metrics[metric] = compute_percentiles(global_digest, percent_config, precision) return global_metrics 
-def accumulate_metrics(metrics: dict, global_metrics: dict) -> dict: +def accumulate_metrics(metrics: dict, global_metrics: dict, precision: int) -> dict: for ds_name in metrics: if ds_name not in global_metrics: global_metrics[ds_name] = {} @@ -87,14 +106,16 @@ def accumulate_metrics(metrics: dict, global_metrics: dict) -> dict: for feature_name in feature_metrics: if feature_metrics[feature_name] is not None: if feature_name not in global_metrics[ds_name]: - global_metrics[ds_name][feature_name] = feature_metrics[feature_name] + global_metrics[ds_name][feature_name] = round(feature_metrics[feature_name], precision) else: - global_metrics[ds_name][feature_name] += feature_metrics[feature_name] + global_metrics[ds_name][feature_name] = round( + global_metrics[ds_name][feature_name] + feature_metrics[feature_name], precision + ) return global_metrics -def get_min_or_max_values(metrics: dict, global_metrics: dict, fn2) -> dict: +def get_min_or_max_values(metrics: dict, global_metrics: dict, fn2, precision: int = 4) -> dict: """Use 2 argument function to calculate fn2(global, client), for example, min or max. .. note:: @@ -105,6 +126,7 @@ def get_min_or_max_values(metrics: dict, global_metrics: dict, fn2) -> dict: metrics: client's metric global_metrics: global metrics fn2: two-argument function such as min or max + precision: decimal number precision Returns: Dict[dataset, Dict[feature, int]] @@ -116,19 +138,21 @@ def get_min_or_max_values(metrics: dict, global_metrics: dict, fn2) -> dict: feature_metrics = metrics[ds_name] for feature_name in feature_metrics: if feature_name not in global_metrics[ds_name]: - global_metrics[ds_name][feature_name] = feature_metrics[feature_name] + global_metrics[ds_name][feature_name] = round(feature_metrics[feature_name], precision) else: - global_metrics[ds_name][feature_name] = fn2( - global_metrics[ds_name][feature_name], feature_metrics[feature_name] + global_metrics[ds_name][feature_name] = round( + fn2(global_metrics[ds_name][feature_name], feature_metrics[feature_name]), precision ) results = {} for ds_name in global_metrics: for feature_name in global_metrics[ds_name]: if feature_name not in results: - results[feature_name] = global_metrics[ds_name][feature_name] + results[feature_name] = round(global_metrics[ds_name][feature_name], precision) else: - results[feature_name] = fn2(results[feature_name], global_metrics[ds_name][feature_name]) + results[feature_name] = round( + fn2(results[feature_name], global_metrics[ds_name][feature_name]), precision + ) for ds_name in global_metrics: for feature_name in global_metrics[ds_name]: @@ -146,7 +170,7 @@ def bins_to_dict(bins: List[Bin]) -> Dict[BinRange, float]: def accumulate_hists( - metrics: Dict[str, Dict[str, Histogram]], global_hists: Dict[str, Dict[str, Histogram]] + metrics: Dict[str, Dict[str, Histogram]], global_hists: Dict[str, Dict[str, Histogram]], precision: int = 4 ) -> Dict[str, Dict[str, Histogram]]: for ds_name in metrics: feature_hists = metrics[ds_name] @@ -158,14 +182,18 @@ def accumulate_hists( if feature not in global_hists[ds_name]: g_bins = [] for bucket in hist.bins: - g_bins.append(Bin(bucket.low_value, bucket.high_value, bucket.sample_count)) + g_bins.append( + Bin( + round(bucket.low_value, precision), round(bucket.high_value, precision), bucket.sample_count + ) + ) g_hist = Histogram(HistogramType.STANDARD, g_bins) global_hists[ds_name][feature] = g_hist else: g_hist = global_hists[ds_name][feature] g_buckets = bins_to_dict(g_hist.bins) for bucket in hist.bins: - bin_range = 
BinRange(bucket.low_value, bucket.high_value) + bin_range = BinRange(round(bucket.low_value, precision), round(bucket.high_value, precision)) if bin_range in g_buckets: g_buckets[bin_range] += bucket.sample_count else: @@ -174,22 +202,24 @@ def accumulate_hists( # update ordered bins updated_bins = [] for gb in g_hist.bins: - bin_range = BinRange(gb.low_value, gb.high_value) - updated_bins.append(Bin(gb.low_value, gb.high_value, g_buckets[bin_range])) + bin_range = BinRange(round(gb.low_value, precision), round(gb.high_value, precision)) + updated_bins.append( + Bin(round(gb.low_value, precision), round(gb.high_value, precision), g_buckets[bin_range]) + ) global_hists[ds_name][feature] = Histogram(g_hist.hist_type, updated_bins) return global_hists -def get_means(sums: dict, counts: dict) -> dict: +def get_means(sums: dict, counts: dict, precision: int = 4) -> dict: means = {} for ds_name in sums: means[ds_name] = {} feature_sums = sums[ds_name] feature_counts = counts[ds_name] for feature in feature_sums: - means[ds_name][feature] = feature_sums[feature] / feature_counts[feature] + means[ds_name][feature] = round(feature_sums[feature] / feature_counts[feature], precision) return means @@ -201,3 +231,42 @@ def filter_numeric_features(ds_features: Dict[str, List[Feature]]) -> Dict[str, numeric_ds_features[ds_name] = n_features return numeric_ds_features + + +def aggregate_centroids(metrics: Dict[str, Dict[str, Dict]], g_digest: dict) -> dict: + for ds_name in metrics: + if ds_name not in g_digest: + g_digest[ds_name] = {} + + feature_metrics = metrics[ds_name] + for feature_name in feature_metrics: + if feature_metrics[feature_name] is not None: + centroids: List = feature_metrics[feature_name].get(StC.STATS_CENTROIDS_KEY) + if feature_name not in g_digest[ds_name]: + g_digest[ds_name][feature_name] = TDigest() + + for centroid in centroids: + mean = centroid.get("m") + count = centroid.get("c") + g_digest[ds_name][feature_name].update(mean, count) + + return g_digest + + +def compute_percentiles(g_digest: Dict[str, Dict[str, TDigest]], quantile_config: Dict, precision: int = 4) -> dict: + g_ds_metrics = {} + for ds_name in g_digest: + if ds_name not in g_ds_metrics: + g_ds_metrics[ds_name] = {} + + feature_metrics = g_digest[ds_name] + for feature_name in feature_metrics: + digest = feature_metrics[feature_name] + percentiles = get_target_percents(quantile_config, feature_name) + percentile_values = {} + for percentile in percentiles: + percentile_values[percentile] = round(digest.percentile(percentile), precision) + + g_ds_metrics[ds_name][feature_name] = percentile_values + + return g_ds_metrics diff --git a/nvflare/app_common/statistics/statistics_config_utils.py b/nvflare/app_common/statistics/statistics_config_utils.py index 2c2efb76c0..b128df2394 100644 --- a/nvflare/app_common/statistics/statistics_config_utils.py +++ b/nvflare/app_common/statistics/statistics_config_utils.py @@ -28,3 +28,14 @@ def get_feature_bin_range(feature_name: str, hist_config: dict) -> Optional[List bin_range = default_config[StC.STATS_BIN_RANGE] return bin_range + + +def get_target_percents(percentile_config: dict, feature_name: str): + if feature_name in percentile_config: + percents = percentile_config.get(feature_name) + elif "*" in percentile_config: + percents = percentile_config.get("*") + else: + raise ValueError(f"feature: {feature_name} target percents are not defined.") + + return percents diff --git a/nvflare/app_common/workflows/statistics_controller.py 
b/nvflare/app_common/workflows/statistics_controller.py index 53fdfbf552..a835c95900 100644 --- a/nvflare/app_common/workflows/statistics_controller.py +++ b/nvflare/app_common/workflows/statistics_controller.py @@ -69,6 +69,10 @@ def __init__( "histogram": { "*": {"bins": 20}, "Age": {"bins": 10, "range": [0, 120]} + }, + percentile: { + "*": [25, 50, 75, 90], + "Age": [50, 75, 95] } }, @@ -207,6 +211,7 @@ def _get_all_statistic_configs(self) -> List[StatisticConfig]: StC.STATS_MEAN: StatisticConfig(StC.STATS_MEAN, {}), StC.STATS_VAR: StatisticConfig(StC.STATS_VAR, {}), StC.STATS_STDDEV: StatisticConfig(StC.STATS_STDDEV, {}), + StC.STATS_PERCENTILE: StatisticConfig(StC.STATS_PERCENTILE, {}), } if StC.STATS_HISTOGRAM in self.statistic_configs: @@ -264,7 +269,9 @@ def statistics_task_flow(self, abort_signal: Signal, fl_ctx: FLContext, statisti abort_signal=abort_signal, ) - self.global_statistics = get_global_stats(self.global_statistics, self.client_statistics, statistic_task) + self.global_statistics = get_global_stats( + self.global_statistics, self.client_statistics, statistic_task, self.statistic_configs, self.precision + ) self.log_info(fl_ctx, f"task {self.task_name} statistics_flow for {statistic_task} flow end.") @@ -402,12 +409,19 @@ def _combine_all_statistics(self): hist: Histogram = self.client_statistics[statistic][client][ds][feature_name] buckets = StatisticsController._apply_histogram_precision(hist.bins, self.precision) result[feature_name][statistic][client][ds] = buckets + elif statistic == StC.STATS_PERCENTILE: + percentiles = self.client_statistics[statistic][client][ds][feature_name][ + StC.STATS_PERCENTILES_KEY + ] + formatted_percentiles = {} + for p in percentiles: + formatted_percentiles[p] = round(percentiles.get(p), self.precision) + result[feature_name][statistic][client][ds] = formatted_percentiles else: result[feature_name][statistic][client][ds] = round( self.client_statistics[statistic][client][ds][feature_name], self.precision ) - precision = self.precision for statistic in filtered_global_statistics: for ds in self.global_statistics[statistic]: for feature_name in self.global_statistics[statistic][ds]: @@ -419,11 +433,13 @@ def _combine_all_statistics(self): if statistic == StC.STATS_HISTOGRAM: hist: Histogram = self.global_statistics[statistic][ds][feature_name] - buckets = StatisticsController._apply_histogram_precision(hist.bins, self.precision) - result[feature_name][statistic][StC.GLOBAL][ds] = buckets + result[feature_name][statistic][StC.GLOBAL][ds] = hist.bins + elif statistic == StC.STATS_PERCENTILE: + percentiles = self.global_statistics[statistic][ds][feature_name] + result[feature_name][statistic][StC.GLOBAL][ds] = percentiles else: result[feature_name][statistic][StC.GLOBAL].update( - {ds: round(self.global_statistics[statistic][ds][feature_name], precision)} + {ds: self.global_statistics[statistic][ds][feature_name]} ) return result @@ -444,9 +460,10 @@ def _apply_histogram_precision(bins: List[Bin], precision) -> List[Bin]: @staticmethod def _get_target_statistics(statistic_configs: dict, ordered_statistics: list) -> List[StatisticConfig]: # make sure the execution order of the statistics calculation + targets = [] if statistic_configs: - for statistic in statistic_configs: + for metric in statistic_configs: # if target statistic has histogram, we are not in 2nd statistic task # we only need to estimate the global min/max if we have histogram statistic, # If the user provided the global min/max for a specified feature, then we do nothing @@ 
-457,16 +474,16 @@ def _get_target_statistics(statistic_configs: dict, ordered_statistics: list) -> # in all cases, we will still send the STATS_MIN/MAX tasks, but client executor may or may not # delegate to stats generator to calculate the local min/max depends on if the global bin ranges # are specified. to do this, we send over the histogram configuration when calculate the local min/max - if statistic == StC.STATS_HISTOGRAM and statistic not in ordered_statistics: + if metric == StC.STATS_HISTOGRAM and metric not in ordered_statistics: targets.append(StatisticConfig(StC.STATS_MIN, statistic_configs[StC.STATS_HISTOGRAM])) targets.append(StatisticConfig(StC.STATS_MAX, statistic_configs[StC.STATS_HISTOGRAM])) - if statistic == StC.STATS_STDDEV and statistic in ordered_statistics: + if metric == StC.STATS_STDDEV and metric in ordered_statistics: targets.append(StatisticConfig(StC.STATS_VAR, {})) for rm in ordered_statistics: - if rm == statistic: - targets.append(StatisticConfig(statistic, statistic_configs[statistic])) + if rm == metric: + targets.append(StatisticConfig(metric, statistic_configs[metric])) return targets def _prepare_inputs(self, statistic_task: str) -> Shareable: diff --git a/nvflare/app_opt/statistics/df/__init__.py b/nvflare/app_opt/statistics/df/__init__.py new file mode 100644 index 0000000000..d9155f923f --- /dev/null +++ b/nvflare/app_opt/statistics/df/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/nvflare/app_opt/statistics/df/df_core_statistics.py b/nvflare/app_opt/statistics/df/df_core_statistics.py new file mode 100644 index 0000000000..5f509f0fe8 --- /dev/null +++ b/nvflare/app_opt/statistics/df/df_core_statistics.py @@ -0,0 +1,116 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from abc import ABC
+from typing import Dict, List, Optional
+
+import numpy as np
+import pandas as pd
+from pandas.core.series import Series
+from tdigest import TDigest
+
+from nvflare.app_common.abstract.statistics_spec import BinRange, Feature, Histogram, HistogramType, Statistics
+from nvflare.app_common.app_constant import StatisticsConstants
+from nvflare.app_common.statistics.numpy_utils import dtype_to_data_type, get_std_histogram_buckets
+
+
+class DFStatisticsCore(Statistics, ABC):
+
+    def __init__(self):
+        # assumption: the data can be loaded and cached in memory
+        self.data: Optional[Dict[str, pd.DataFrame]] = None
+        super(DFStatisticsCore, self).__init__()
+
+    def features(self) -> Dict[str, List[Feature]]:
+        results: Dict[str, List[Feature]] = {}
+        for ds_name in self.data:
+            df = self.data[ds_name]
+            results[ds_name] = []
+            for feature_name in df:
+                data_type = dtype_to_data_type(df[feature_name].dtype)
+                results[ds_name].append(Feature(feature_name, data_type))
+
+        return results
+
+    def count(self, dataset_name: str, feature_name: str) -> int:
+        df: pd.DataFrame = self.data[dataset_name]
+        return df[feature_name].count()
+
+    def sum(self, dataset_name: str, feature_name: str) -> float:
+        df: pd.DataFrame = self.data[dataset_name]
+        return df[feature_name].sum().item()
+
+    def mean(self, dataset_name: str, feature_name: str) -> float:
+        count: int = self.count(dataset_name, feature_name)
+        sum_value: float = self.sum(dataset_name, feature_name)
+        return sum_value / count
+
+    def stddev(self, dataset_name: str, feature_name: str) -> float:
+        df = self.data[dataset_name]
+        return df[feature_name].std().item()
+
+    def variance_with_mean(
+        self, dataset_name: str, feature_name: str, global_mean: float, global_count: float
+    ) -> float:
+        df = self.data[dataset_name]
+        tmp = (df[feature_name] - global_mean) * (df[feature_name] - global_mean)
+        variance = tmp.sum() / (global_count - 1)
+        return variance.item()
+
+    def histogram(
+        self, dataset_name: str, feature_name: str, num_of_bins: int, global_min_value: float, global_max_value: float
+    ) -> Histogram:
+        df = self.data[dataset_name]
+        feature: Series = df[feature_name]
+        flattened = feature.ravel()
+        flattened = flattened[flattened != np.array(None)]
+        buckets = get_std_histogram_buckets(flattened, num_of_bins, BinRange(global_min_value, global_max_value))
+        return Histogram(HistogramType.STANDARD, buckets)
+
+    def max_value(self, dataset_name: str, feature_name: str) -> float:
+        """This is needed for histogram calculation; not used for reporting."""
+        df = self.data[dataset_name]
+        return df[feature_name].max()
+
+    def min_value(self, dataset_name: str, feature_name: str) -> float:
+        """This is needed for histogram calculation; not used for reporting."""
+        df = self.data[dataset_name]
+        return df[feature_name].min()
+
+    def percentiles(self, dataset_name: str, feature_name: str, percents: List) -> Dict:
+        digest = self._prepare_t_digest(dataset_name, feature_name)
+        results = {}
+        p_results = {}
+        for p in percents:
+            p_results[p] = round(digest.percentile(p), 4)
+        results[StatisticsConstants.STATS_PERCENTILES_KEY] = p_results
+
+        # extract centroids (mean, count) from the digest; the server merges
+        # them to build the global digest
+        centroids = digest.centroids_to_list()
+        results[StatisticsConstants.STATS_CENTROIDS_KEY] = centroids
+        return results
+
+    def _prepare_t_digest(self, dataset_name: str, feature_name: str) -> TDigest:
+        df = self.data[dataset_name]
+        data = df[feature_name]
+        digest = TDigest()
+        for value in data:
+            digest.update(value)
+        return digest
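Putting the new core pieces together, a hedged end-to-end sketch (not part of the patch): two hypothetical in-memory "sites" compute local digests via `DFStatisticsCore.percentiles()`, and the server side merges the shipped centroids with `aggregate_centroids()`/`compute_percentiles()` from `numeric_stats.py` above.

```python
# Illustrative round trip over the APIs added in this PR (not part of the patch).
import pandas as pd

from nvflare.app_common.statistics.numeric_stats import aggregate_centroids, compute_percentiles
from nvflare.app_opt.statistics.df.df_core_statistics import DFStatisticsCore


class InMemoryStats(DFStatisticsCore):
    """Minimal hypothetical subclass holding one in-memory dataframe."""

    def __init__(self, values):
        super().__init__()
        self.data = {"train": pd.DataFrame({"x": values})}

    def initialize(self, fl_ctx=None):
        pass


global_digest = {}
for site_values in ([1, 2, 3, 4, 5], [6, 7, 8, 9, 10]):  # two "sites"
    local = InMemoryStats(site_values).percentiles("train", "x", [50])
    aggregate_centroids({"train": {"x": local}}, global_digest)

print(compute_percentiles(global_digest, {"x": [50]}, precision=2))
# expected roughly {'train': {'x': {50: 5.5}}} -- t-digest is approximate
```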
diff --git a/nvflare/job_config/stats_job.py b/nvflare/job_config/stats_job.py
index 63cdf19070..1a1320c123 100644
--- a/nvflare/job_config/stats_job.py
+++ b/nvflare/job_config/stats_job.py
@@ -14,6 +14,7 @@
 from typing import List
 
 from nvflare import FedJob, FilterType
+from nvflare.apis.job_def import SERVER_SITE_NAME
 from nvflare.app_common.abstract.statistics_spec import Statistics
 from nvflare.app_common.executors.statistics.statistics_executor import StatisticsExecutor
 from nvflare.app_common.filters.statistics_privacy_filter import StatisticsPrivacyFilter
@@ -50,15 +51,15 @@ def __init__(
 
         self.setup_server()
 
-    def setup_server(self):
+    def setup_server(self, server_name: str = SERVER_SITE_NAME):
         # define stats controller
         ctr = self.get_stats_controller()
-        self.to(ctr, "server")
+        self.to(ctr, server_name)
 
         # define stat writer to output Json file
         stats_writer = self.get_stats_output_writer()
-        self.to(stats_writer, "server", id=self.writer_id)
+        self.to(stats_writer, server_name, id=self.writer_id)
 
-    def setup_client(self, sites: List[str]):
+    def setup_clients(self, sites: List[str]):
         # Client side job config
         # Add client site
         for site_id in sites:
diff --git a/runtest.sh b/runtest.sh
index bc4d76a343..78dee66bdc 100755
--- a/runtest.sh
+++ b/runtest.sh
@@ -92,7 +92,7 @@ function check_license() {
     folders_to_check_license="nvflare examples tests integration research"
     echo "checking license header in folder: $folders_to_check_license"
     (grep -r --include "*.py" --exclude-dir "*protos*" --exclude "modeling_roberta.py" -L \
-    "\(# Copyright (c) \(2021\|2022\|2023\|2024\), NVIDIA CORPORATION. All rights reserved.\)\|\(This file is released into the public domain.\)" \
+    "\(# Copyright (c) \(2021\|2022\|2023\|2024\|2025\), NVIDIA CORPORATION. All rights reserved.\)\|\(This file is released into the public domain.\)" \
     ${folders_to_check_license} || true) > no_license.lst
     if [ -s no_license.lst ]; then
         # The file is not-empty.
diff --git a/setup.cfg b/setup.cfg
index f855afc365..664e334262 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -36,6 +36,7 @@ install_requires =
     docker>=6.0
     websockets>=10.4
     pyhocon
+    tdigest
 
 [options.extras_require]
 HE =
diff --git a/tests/unit_test/app_common/statistics/numeric_stats_test.py b/tests/unit_test/app_common/statistics/numeric_stats_test.py
index ff75786946..6e654ae828 100644
--- a/tests/unit_test/app_common/statistics/numeric_stats_test.py
+++ b/tests/unit_test/app_common/statistics/numeric_stats_test.py
@@ -53,7 +53,9 @@ def test_accumulate_metrics(self, client_stats, expected_global_stats):
 
         global_stats = {}
         for client_name in client_stats:
-            global_stats = accumulate_metrics(metrics=client_stats[client_name], global_metrics=global_stats)
+            global_stats = accumulate_metrics(
+                metrics=client_stats[client_name], global_metrics=global_stats, precision=4
+            )
 
         assert global_stats.keys() == expected_global_stats.keys()
         assert global_stats == expected_global_stats
diff --git a/tests/unit_test/app_opt/statistics/__init__.py b/tests/unit_test/app_opt/statistics/__init__.py
new file mode 100644
index 0000000000..d9155f923f
--- /dev/null
+++ b/tests/unit_test/app_opt/statistics/__init__.py
@@ -0,0 +1,13 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/unit_test/app_opt/statistics/percentiles_test.py b/tests/unit_test/app_opt/statistics/percentiles_test.py
new file mode 100644
index 0000000000..3f1eee5e22
--- /dev/null
+++ b/tests/unit_test/app_opt/statistics/percentiles_test.py
@@ -0,0 +1,90 @@
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List
+
+import numpy as np
+import pandas as pd
+
+from nvflare.apis.fl_context import FLContext
+from nvflare.app_common.app_constant import StatisticsConstants
+from nvflare.app_common.statistics.numeric_stats import aggregate_centroids, compute_percentiles
+from nvflare.app_opt.statistics.df.df_core_statistics import DFStatisticsCore
+
+
+class MockDFStats(DFStatisticsCore):
+    def __init__(self, given_median: int):
+        super().__init__()
+        self.median = given_median
+        self.data = {"train": None}
+
+    def initialize(self, fl_ctx: FLContext):
+        self.load_data()
+
+    def load_data(self):
+        # symmetric data around the given median: 0..median..2*median
+        data = np.concatenate(
+            (np.arange(0, self.median), [self.median], np.arange(self.median + 1, self.median * 2 + 1))
+        )
+
+        # Shuffle the data to make it unordered
+        np.random.shuffle(data)
+
+        # Create the DataFrame
+        df = pd.DataFrame(data, columns=["Feature"])
+        self.data = {"train": df}
+
+
+class MockDFStats2(DFStatisticsCore):
+    def __init__(self, data_array: List[int]):
+        super().__init__()
+        self.raw_data = data_array
+        self.data = {"train": None}
+
+    def initialize(self, fl_ctx: FLContext):
+        self.load_data()
+
+    def load_data(self):
+        # Create the DataFrame
+        df = pd.DataFrame(self.raw_data, columns=["Feature"])
+        self.data = {"train": df}
+
+
+class TestPercentiles:
+
+    def test_percentile_metrics(self):
+        stats_generator = MockDFStats(given_median=100)
+        stats_generator.load_data()
+        percentiles = stats_generator.percentiles("train", "Feature", percents=[50])
+        result = percentiles.get(StatisticsConstants.STATS_PERCENTILES_KEY)
+        assert result is not None
+        assert result.get(50) == stats_generator.median
+
+    def test_percentile_metrics_aggregation(self):
+        stats_generators = [
+            MockDFStats2(data_array=[0, 1, 2, 3, 4, 5]),
+            MockDFStats(given_median=10),
+            MockDFStats2(data_array=[100, 110, 120, 130, 140, 150]),
+        ]
+        global_digest = {}
+        for g in stats_generators:  # each site/client
+            g.load_data()
+            local_percentiles = g.percentiles("train", "Feature", percents=[50])
+            local_metrics = {"train": {"Feature": local_percentiles}}
+            aggregate_centroids(local_metrics, global_digest)
+
+        result = compute_percentiles(global_digest, {"Feature": [50]}, 2)
+
+        
expected_median = 10 + assert result["train"]["Feature"].get(50) == expected_median
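One caveat worth keeping in mind when extending these tests: t-digest percentiles are approximate, so asserting exact values only works on small or symmetric datasets like the ones above. A hedged sketch (not part of the patch) comparing the estimate against numpy on random data:

```python
# Illustrative accuracy check: t-digest vs. numpy percentiles.
import numpy as np
from tdigest import TDigest

rng = np.random.default_rng(42)
data = rng.normal(loc=0.0, scale=1.0, size=2000)

digest = TDigest()
for v in data:
    digest.update(v)

for p in (25, 50, 75, 95):
    approx = digest.percentile(p)
    exact = np.percentile(data, p)
    print(p, round(approx, 3), round(exact, 3))  # close, but not bit-identical
```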