Skip to content

Commit

Permalink
dynamic logging admin command
Browse files Browse the repository at this point in the history
  • Loading branch information
SYangster committed Dec 20, 2024
1 parent 97076bc commit c9ab964
Show file tree
Hide file tree
Showing 21 changed files with 368 additions and 67 deletions.
2 changes: 2 additions & 0 deletions nvflare/apis/fl_constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ class AdminCommandNames(object):
SHELL_TAIL = "tail"
SHELL_GREP = "grep"
APP_COMMAND = "app_command"
CONFIGURE_JOB_LOG = "configure_job_log"


class ServerCommandNames(object):
Expand All @@ -263,6 +264,7 @@ class ServerCommandNames(object):
HANDLE_DEAD_JOB = "handle_dead_job"
SERVER_STATE = "server_state"
APP_COMMAND = "app_command"
CONFIGURE_JOB_LOG = "configure_job_log"


class ServerCommandKey(object):
Expand Down
109 changes: 64 additions & 45 deletions nvflare/fuel/utils/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from logging import Logger
from logging.handlers import RotatingFileHandler

from nvflare.apis.fl_constant import WorkspaceConstants
from nvflare.apis.workspace import Workspace


Expand Down Expand Up @@ -71,8 +72,7 @@ def __init__(
datefmt=None,
style="%",
level_colors=DEFAULT_LEVEL_COLORS,
logger_names=None,
logger_color=ANSIColor.CYAN,
logger_colors={},
):
"""ColorFormatter to format colors based on log levels. Optionally can color logger_names.
Expand All @@ -88,28 +88,24 @@ def __init__(
"ERROR": ANSIColor.RED,
"CRITICAL": ANSIColor.BOLD_RED,
}
logger_names (List[str]): list of logger names to apply logger_color.
logger_color (int): ANSI custom color for logger_names.
logger_colors (Dict[str, str]): dict of logger_name: ANSI colors. Defaults to {}.
"""
super().__init__(fmt=fmt, datefmt=datefmt, style=style)
self.logger_names = logger_names or []
self.logger_color = logger_color
self.level_colors = level_colors
self.logger_colors = logger_colors

def format(self, record):
super().format(record)

if record.levelno <= logging.INFO and any(
record.name.startswith(logger_name) for logger_name in self.logger_names
):
# Apply logger_color to logger_names
log_fmt = ansi_sgr(self.logger_color) + self.fmt + ansi_sgr(ANSIColor.RESET)
else:
# Apply level_colors based on record levelname
log_fmt = (
ansi_sgr(self.level_colors.get(record.levelname, ANSIColor.GREY)) + self.fmt + ansi_sgr(ANSIColor.RESET)
)
# Apply level_colors based on record levelname
log_color = ansi_sgr(self.level_colors.get(record.levelname, ANSIColor.GREY))

# Apply logger_color to logger_names if INFO or below
if record.levelno <= logging.INFO:
log_color = ansi_sgr(self.logger_colors.get(record.name), log_color)

log_fmt = log_color + self.fmt + ansi_sgr(ANSIColor.RESET)

formatter = logging.Formatter(log_fmt)
return formatter.format(record)
Expand Down Expand Up @@ -222,42 +218,65 @@ def get_script_logger():
)


def update_filenames(obj, dir_path: str = "", file_prefix: str = ""):
"""Update 'filename' keys in JSON objects with dir_path and file_prefix."""
if "filename" in obj and isinstance(obj["filename"], str):
filename = obj["filename"]
if file_prefix:
filename = os.path.join(os.path.dirname(filename), file_prefix + "_" + os.path.basename(filename))
obj["filename"] = os.path.join(dir_path, filename)
return obj
def configure_logging(workspace: Workspace, dir_path: str = "", file_prefix: str = ""):
log_config_file_path = workspace.get_log_config_file_path()
assert os.path.isfile(log_config_file_path), f"missing log config file {log_config_file_path}"

with open(log_config_file_path, "r") as f:
dict_config = json.load(f)

def read_log_config(file, dir_path: str = "", file_prefix: str = "") -> dict:
"""
Reads JSON logging configuration file and returns config dictionary.
Updates 'filename' keys with dir_path for dynamic locations.
apply_log_config(dict_config, dir_path, file_prefix)

Args:
file (str): Path to the configuration file.
dir_path (str): Update filename keys with dir_path.

Returns:
config (dict)
"""
try:
with open(file, "r") as f:
config = json.load(f, object_hook=lambda obj: update_filenames(obj, dir_path, file_prefix))
return config
except Exception as e:
raise ValueError(f"Unrecognized logging configuration format. Failed to parse JSON: {e}.")
def apply_log_config(dict_config, dir_path: str = "", file_prefix: str = ""):
stack = [dict_config]
while stack:
current_dict = stack.pop()
for key, value in current_dict.items():
if isinstance(value, dict):
stack.append(value)
elif key == "filename":
if file_prefix:
value = os.path.join(os.path.dirname(value), file_prefix + "_" + os.path.basename(value))
current_dict[key] = os.path.join(dir_path, value)

logging.config.dictConfig(dict_config)

def configure_logging(workspace: Workspace, dir_path: str = "", file_prefix: str = ""):
log_config_file_path = workspace.get_log_config_file_path()
assert os.path.isfile(log_config_file_path), f"missing log config file {log_config_file_path}"

dict_config = read_log_config(log_config_file_path, dir_path, file_prefix)
logging.config.dictConfig(dict_config)
def handle_log_config_command(config: str, workspace: Workspace, job_id: str = None):
if config is None:
config = workspace.get_log_config_file_path()

if os.path.isfile(config):

with open(config, "r") as f:
dict_config = json.load(f)

if job_id:
dir_path = workspace.get_run_dir(job_id)
else:
dir_path = workspace.get_root_dir()

with open(os.path.join(workspace.get_site_config_dir(), WorkspaceConstants.LOGGING_CONFIG), "w") as f:
f.write(json.dumps(dict_config))

apply_log_config(dict_config, dir_path)

elif isinstance(config, str):
if config.isdigit():
level = int(config)
if not (0 <= level <= 50):
raise ValueError(f"Invalid logging level: {level}")
else:
level = getattr(logging, config.upper(), None)
if level is None:
raise ValueError(f"Invalid logging level: {config}")

logging.getLogger().setLevel(level)
else:
raise ValueError(
f"Unsupported config type. Expect config to be filepath or string level but got {type(config)}"
)


def add_log_file_handler(log_file_name):
Expand Down
6 changes: 5 additions & 1 deletion nvflare/job_config/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,9 @@ def export_job(self, job_root: str):
self._set_all_apps()
self.job.generate_job_config(job_root)

def simulator_run(self, workspace: str, n_clients: int = None, threads: int = None, gpu: str = None):
def simulator_run(
self, workspace: str, n_clients: int = None, threads: int = None, gpu: str = None, log_config: str = None
):
"""Run the job with the simulator with the `workspace` using `clients` and `threads`.
For end users.
Expand All @@ -531,6 +533,7 @@ def simulator_run(self, workspace: str, n_clients: int = None, threads: int = No
n_clients: number of clients.
threads: number of threads.
gpu: gpu assignments for simulating clients, comma separated
log_config: log config json file
Returns:
Expand All @@ -556,6 +559,7 @@ def simulator_run(self, workspace: str, n_clients: int = None, threads: int = No
n_clients=n_clients,
threads=threads,
gpu=gpu,
log_config=log_config,
)

def as_id(self, obj: Any) -> str:
Expand Down
4 changes: 3 additions & 1 deletion nvflare/job_config/fed_job_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def generate_job_config(self, job_root):

self._generate_meta(job_dir)

def simulator_run(self, workspace, clients=None, n_clients=None, threads=None, gpu=None):
def simulator_run(self, workspace, clients=None, n_clients=None, threads=None, gpu=None, log_config=None):
with TemporaryDirectory() as job_root:
self.generate_job_config(job_root)

Expand All @@ -157,6 +157,8 @@ def simulator_run(self, workspace, clients=None, n_clients=None, threads=None, g
if gpu:
gpu = self._trim_whitespace(gpu)
command += " -gpu " + str(gpu)
if log_config:
command += " -l" + str(log_config)

new_env = os.environ.copy()
process = subprocess.Popen(shlex.split(command, True), preexec_fn=os.setsid, env=new_env)
Expand Down
5 changes: 4 additions & 1 deletion nvflare/lighter/impl/master_template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ default_authz: |
"permissions": {
"project_admin": "any",
"org_admin": {
"configure_site_log": "o:site",
"configure_job_log": "n:submitter",
"submit_job": "none",
"clone_job": "none",
"manage_job": "o:submitter",
Expand All @@ -281,7 +283,8 @@ default_authz: |
"byoc": "none"
},
"lead": {
"submit_job": "any",
"configure_job_log": "n:submitter",
"submit_job": "o:site",
"clone_job": "n:submitter",
"manage_job": "n:submitter",
"download_job": "n:submitter",
Expand Down
2 changes: 2 additions & 0 deletions nvflare/private/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class TrainingTopic(object):
START_JOB = "train.start_job"
GET_SCOPES = "train.get_scopes"
NOTIFY_JOB_STATUS = "train.notify_job_status"
CONFIGURE_JOB_LOG = "train.configure_job_log"


class RequestHeader(object):
Expand Down Expand Up @@ -96,6 +97,7 @@ class SysCommandTopic(object):
SHELL = "sys.shell"
REPORT_RESOURCES = "resource_manager.report_resources"
REPORT_ENV = "sys.report_env"
CONFIGURE_SITE_LOG = "sys.configure_site_log"


class ControlCommandTopic(object):
Expand Down
2 changes: 2 additions & 0 deletions nvflare/private/fed/app/simulator/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def define_simulator_parser(simulator_parser):
simulator_parser.add_argument("-c", "--clients", type=str, help="client names list")
simulator_parser.add_argument("-t", "--threads", type=int, help="number of parallel running clients")
simulator_parser.add_argument("-gpu", "--gpu", type=str, help="list of GPU Device Ids, comma separated")
simulator_parser.add_argument("-l", "--log_config", type=str, help="log config file")
simulator_parser.add_argument("-m", "--max_clients", type=int, default=100, help="max number of clients")
simulator_parser.add_argument(
"--end_run_for_all",
Expand All @@ -46,6 +47,7 @@ def run_simulator(simulator_args):
n_clients=simulator_args.n_clients,
threads=simulator_args.threads,
gpu=simulator_args.gpu,
log_config=simulator_args.log_config,
max_clients=simulator_args.max_clients,
end_run_for_all=simulator_args.end_run_for_all,
)
Expand Down
51 changes: 38 additions & 13 deletions nvflare/private/fed/app/simulator/simulator_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.
import copy
import json
import logging.config
import os
import shlex
import shutil
Expand Down Expand Up @@ -53,7 +52,7 @@
from nvflare.fuel.utils.argument_utils import parse_vars
from nvflare.fuel.utils.config_service import ConfigService
from nvflare.fuel.utils.gpu_utils import get_host_gpu_ids
from nvflare.fuel.utils.log_utils import read_log_config
from nvflare.fuel.utils.log_utils import apply_log_config
from nvflare.fuel.utils.network_utils import get_open_ports
from nvflare.fuel.utils.zip_utils import split_path, unzip_all_from_bytes, zip_directory_to_bytes
from nvflare.private.defs import AppFolderConstants
Expand Down Expand Up @@ -89,6 +88,7 @@ def __init__(
n_clients=None,
threads=None,
gpu=None,
log_config=None,
max_clients=100,
end_run_for_all=False,
):
Expand All @@ -100,6 +100,7 @@ def __init__(
self.n_clients = n_clients
self.threads = threads
self.gpu = gpu
self.log_config = log_config
self.max_clients = max_clients
self.end_run_for_all = end_run_for_all

Expand Down Expand Up @@ -127,7 +128,15 @@ def __init__(
self.workspace = os.path.join(running_dir, self.workspace)

def _generate_args(
self, job_folder: str, workspace: str, clients=None, n_clients=None, threads=None, gpu=None, max_clients=100
self,
job_folder: str,
workspace: str,
clients=None,
n_clients=None,
threads=None,
gpu=None,
log_config=None,
max_clients=100,
):
args = Namespace(
job_folder=job_folder,
Expand All @@ -136,14 +145,22 @@ def _generate_args(
n_clients=n_clients,
threads=threads,
gpu=gpu,
log_config=log_config,
max_clients=max_clients,
)
args.set = []
return args

def setup(self):
self.args = self._generate_args(
self.job_folder, self.workspace, self.clients, self.n_clients, self.threads, self.gpu, self.max_clients
self.job_folder,
self.workspace,
self.clients,
self.n_clients,
self.threads,
self.gpu,
self.log_config,
self.max_clients,
)

if self.args.clients:
Expand All @@ -153,12 +170,17 @@ def setup(self):
for i in range(self.args.n_clients):
self.client_names.append("site-" + str(i + 1))

log_config_file_path = os.path.join(self.args.workspace, "local", WorkspaceConstants.LOGGING_CONFIG)
if not os.path.isfile(log_config_file_path):
log_config_file_path = os.path.join(os.path.dirname(__file__), WorkspaceConstants.LOGGING_CONFIG)
dict_config = read_log_config(log_config_file_path, os.path.join(self.args.workspace, SiteType.SERVER))
if self.args.log_config:
log_config_file_path = self.args.log_config
else:
log_config_file_path = os.path.join(self.args.workspace, "local", WorkspaceConstants.LOGGING_CONFIG)
if not os.path.isfile(log_config_file_path):
log_config_file_path = os.path.join(os.path.dirname(__file__), WorkspaceConstants.LOGGING_CONFIG)

with open(log_config_file_path, "r") as f:
dict_config = json.load(f)

self.args.log_config = None
# self.args.log_config = None
self.args.config_folder = "config"
self.args.job_id = SimulatorConstants.JOB_NAME
self.args.client_config = os.path.join(self.args.config_folder, JobConstants.CLIENT_JOB_CONFIG)
Expand All @@ -180,7 +202,7 @@ def setup(self):

os.makedirs(os.path.join(self.simulator_root, SiteType.SERVER))

logging.config.dictConfig(dict_config)
apply_log_config(dict_config, os.path.join(self.simulator_root, SiteType.SERVER))

try:
data_bytes, job_name, meta = self.validate_job_data()
Expand Down Expand Up @@ -668,9 +690,12 @@ def _pick_next_client(self):
def do_one_task(self, client, num_of_threads, gpu, lock, timeout=60.0, task_name=RunnerTask.TASK_EXEC):
open_port = get_open_ports(1)[0]
client_workspace = os.path.join(self.args.workspace, client.client_name)
logging_config = os.path.join(
self.args.workspace, client.client_name, "local", WorkspaceConstants.LOGGING_CONFIG
)
if self.args.log_config:
logging_config = self.args.log_config
else:
logging_config = os.path.join(
self.args.workspace, client.client_name, "local", WorkspaceConstants.LOGGING_CONFIG
)
decomposer_module = ConfigService.get_str_var(
name=ConfigVarName.DECOMPOSER_MODULE, conf=SystemConfigs.RESOURCES_CONF
)
Expand Down
Loading

0 comments on commit c9ab964

Please sign in to comment.