diff --git a/smdebug/_version.py b/smdebug/_version.py
index 65d99eceb..94650f2d5 100644
--- a/smdebug/_version.py
+++ b/smdebug/_version.py
@@ -1 +1 @@
-__version__ = "1.0.27"
+__version__ = "1.0.28"
diff --git a/smdebug/xgboost/hook.py b/smdebug/xgboost/hook.py
index c5c016bd4..f5272f567 100644
--- a/smdebug/xgboost/hook.py
+++ b/smdebug/xgboost/hook.py
@@ -5,9 +5,8 @@
 
 # Third Party
 import numpy as np
 import xgboost as xgb
 from xgboost import DMatrix
-from xgboost.core import CallbackEnv
 
 # First Party
 from smdebug.core.collection import DEFAULT_XGBOOST_COLLECTIONS, CollectionKeys
@@ -15,14 +15,23 @@
 from smdebug.core.json_config import create_hook_from_json_config
 from smdebug.core.save_config import SaveConfig
 from smdebug.core.utils import FRAMEWORK, error_handling_agent, make_numpy_array
+from smdebug.exceptions import SMDebugError
 from smdebug.profiler.profiler_config_parser import get_profiler_config_parser
 from smdebug.xgboost.singleton_utils import set_hook
-from smdebug.exceptions import SMDebugError
 
 # Local
 from .collection import CollectionManager
 from .constants import INCREASING_METRICS_REGEX
-from .utils import get_content_type, get_dmatrix, parse_tree_model, validate_data_file_path
+from .utils import (get_content_type,
+                    get_dmatrix,
+                    parse_tree_model,
+                    validate_data_file_path,
+                    is_xgb_1_3_and_later,
+                    is_xgb_1_7_and_later)
+
+# CallbackEnv was removed in XGBoost 1.3; import it only for older versions.
+if not is_xgb_1_3_and_later():
+    from xgboost.core import CallbackEnv
 
 DEFAULT_INCLUDE_COLLECTIONS = [CollectionKeys.METRICS]
 DEFAULT_SAVE_CONFIG_INTERVAL = 10
@@ -33,9 +42,8 @@
 profiler_config_parser = get_profiler_config_parser(FRAMEWORK.XGBOOST)
 
-
-class Hook(CallbackHook):
-    """Hook that represents a callback function in XGBoost."""
+class XGBoostBaseHook(CallbackHook):
+    """Base hook that represents a callback function in XGBoost."""
 
     @error_handling_agent.catch_smdebug_errors()
     def __init__(
@@ -117,14 +125,14 @@ def __init__(
         self._full_shap_values = None
         set_hook(self)
 
-    @error_handling_agent.catch_smdebug_errors()
-    def __call__(self, env: CallbackEnv) -> None:
-        self._callback(env)
-
     def _get_num_workers(self):
+        if is_xgb_1_7_and_later():
+            return xgb.collective.get_world_size()
         return xgb.rabit.get_world_size()
 
     def _get_worker_name(self):
+        if is_xgb_1_7_and_later():
+            return "worker_{}".format(xgb.collective.get_rank())
         return "worker_{}".format(xgb.rabit.get_rank())
 
     def has_default_hook_configuration(self):
@@ -165,70 +173,11 @@ def _get_default_collections(self):
     def _prepare_collections(self):
         super()._prepare_collections()
 
-    def _is_last_step(self, env: CallbackEnv) -> bool:
-        # env.iteration: current boosting round.
-        # env.end_iteration: round # when training will end. this is always num_round + 1.  # noqa: E501
-        return env.iteration + 1 == env.end_iteration
-
     def _increment_step(self, iteration):
         self.step = self.mode_steps[self.mode] = iteration
         self._collections_to_save_for_step = None
 
-    def _callback(self, env: CallbackEnv) -> None:
-        if not self.prepared_collections:
-            # at this point we need all collections to be ready
-            # this may not be the case at creation of hook
-            # as user's code after hook might add collections
-            self._prepare_collections()
-            self.prepared_collections = True
-
-        self._increment_step(env.iteration)
-
-        if self.last_saved_step is not None and not self.exported_collections:
-            self.export_collections()
-            self.exported_collections = True
-
-        if not self._get_collections_to_save_for_step():
-            self.logger.debug("Skipping iteration {}".format(self.step))
-            return
-
-        self._initialize_writers()
-
-        if self._is_collection_being_saved_for_step(CollectionKeys.HYPERPARAMETERS):
-            self.write_hyperparameters(env)
-
-        if self._is_collection_being_saved_for_step(CollectionKeys.METRICS):
-            self.write_metrics(env)
-
-        if self._is_collection_being_saved_for_step(CollectionKeys.LOSSES):
-            self.write_losses(env)
-
-        if self._is_collection_being_saved_for_step(CollectionKeys.PREDICTIONS):
-            self.write_predictions(env)
-
-        if self._is_collection_being_saved_for_step(CollectionKeys.LABELS):
-            self.write_labels(env)
-
-        if self._is_collection_being_saved_for_step(CollectionKeys.FEATURE_IMPORTANCE):
-            self.write_feature_importances(env)
-
-        if self._is_collection_being_saved_for_step(CollectionKeys.TREES):
-            self.write_tree_model(env)
-
-        if self._is_collection_being_saved_for_step(CollectionKeys.FULL_SHAP):
-            self._maybe_compute_shap_values(env)
-            self.write_full_shap(env)
-
-        if self._is_collection_being_saved_for_step(CollectionKeys.AVERAGE_SHAP):
-            self._maybe_compute_shap_values(env)
-            self.write_average_shap(env)
-
-        self._clear_shap_values()
-        self.last_saved_step = self.step
-
-        self._close_writers()
-
-    def write_hyperparameters(self, env: CallbackEnv):
+    def write_hyperparameters(self):
         if not self.hyperparameters:
             self.logger.warning(
                 "To log hyperparameters, 'hyperparameter' parameter must be provided."
@@ -237,13 +186,13 @@ def write_hyperparameters(self, env: CallbackEnv):
         for param_name, param_value in self.hyperparameters.items():
             self._save_for_tensor("hyperparameters/{}".format(param_name), param_value)
 
-    def write_losses(self, env: CallbackEnv):
+    def write_losses(self, evals_log):
         # Get loss metric at the current boosting round
         # loss metrics will have already been saved by write_metrics if the metrics collection is included.
         if self._is_collection_being_saved_for_step(CollectionKeys.METRICS):
             return
         else:
-            evaluation_metrics = env.evaluation_result_list
+            evaluation_metrics = self._get_evaluation_metrics(evals_log)
             p = re.compile(
                 INCREASING_METRICS_REGEX
             )  # regex for matching metrics that increase in value
@@ -252,81 +201,84 @@
                     continue
                 self._save_for_tensor(metric_name, metric_data)
 
-    def write_metrics(self, env: CallbackEnv):
+    def write_metrics(self, evals_log):
         # Get metrics measured at current boosting round
-        for metric_name, metric_data in env.evaluation_result_list:
+        evaluation_metrics = self._get_evaluation_metrics(evals_log)
+        for metric_name, metric_data in evaluation_metrics:
             self._save_for_tensor(metric_name, metric_data)
 
-    def write_predictions(self, env: CallbackEnv):
+    def write_predictions(self, model):
         # Write predictions y_hat from validation data
         if not self.validation_data:
-            self.logger.warning("To log predictions, 'validation_data' parameter must be provided.")
+            self.logger.warning(
+                "To log predictions, 'validation_data' parameter must be provided."
+            )
             return
-        self._save_for_tensor("predictions", env.model.predict(self.validation_data))
+        self._save_for_tensor("predictions", model.predict(self.validation_data))
 
-    def write_labels(self, env: CallbackEnv):
+    def write_labels(self):
         # Write labels y from validation data
         if not self.validation_data:
             self.logger.warning("To log labels, 'validation_data' parameter must be provided.")
             return
         self._save_for_tensor("labels", self.validation_data.get_label())
 
-    def write_feature_importances(self, env: CallbackEnv):
+    def write_feature_importances(self, model):
         # Get normalized feature importance of each feature
         def _write_normalized_feature_importance(importance_type):
-            feature_importances = env.model.get_score(importance_type=importance_type)
+            feature_importances = model.get_score(importance_type=importance_type)
             total = sum(feature_importances.values())
             for feature_name, score in feature_importances.items():
                 self._save_for_tensor(
                     f"feature_importance/{importance_type}/{feature_name}", score / total
                 )
 
-        if getattr(env.model, "booster", None) is not None and env.model.booster not in {
+        if getattr(model, "booster", None) is not None and model.booster not in {
             "gbtree",
             "dart",
         }:
             self.logger.warning(
-                "Feature importance is not defined for Booster type %s", env.model.booster
+                "Feature importance is not defined for Booster type %s", model.booster
             )
             return
 
         importance_types = ["weight", "gain", "cover", "total_gain", "total_cover"]
         for importance_type in importance_types:
             _write_normalized_feature_importance(importance_type)
 
-    def write_full_shap(self, env: CallbackEnv):
+    def write_full_shap(self, model):
         if not self.train_data:
             self.logger.warning("To log SHAP values, 'train_data' parameter must be provided.")
             return
         # feature names will be in the format, 'f0', 'f1', 'f2', ..., numbered
         # according to the order of features in the data set.
-        feature_names = env.model.feature_names + ["bias"]
+        feature_names = self._get_feature_names(model) + ["bias"]
         for feature_id, feature_name in enumerate(feature_names):
             self._save_for_tensor(f"full_shap/{feature_name}", self._full_shap_values)
 
-    def write_average_shap(self, env: CallbackEnv):
+    def write_average_shap(self, model):
         if not self.train_data:
             self.logger.warning("To log SHAP values, 'train_data' parameter must be provided.")
             return
 
         dim = len(self._full_shap_values.shape)
         average_shap = np.mean(self._full_shap_values, axis=tuple(range(dim - 1)))
-        feature_names = env.model.feature_names + ["bias"]
+        feature_names = self._get_feature_names(model) + ["bias"]
         for feature_id, feature_name in enumerate(feature_names):
             self._save_for_tensor(f"average_shap/{feature_name}", average_shap[feature_id])
 
-    def _maybe_compute_shap_values(self, env: CallbackEnv):
+    def _maybe_compute_shap_values(self, model):
         if self.train_data is not None and self._full_shap_values is None:
-            self._full_shap_values = env.model.predict(self.train_data, pred_contribs=True)
+            self._full_shap_values = model.predict(self.train_data, pred_contribs=True)
 
     def _clear_shap_values(self):
         self._full_shap_values = None
 
-    def write_tree_model(self, env: CallbackEnv):
-        if hasattr(env.model, "booster") and env.model.booster not in {"gbtree", "dart"}:
+    def write_tree_model(self, model, epoch):
+        if hasattr(model, "booster") and model.booster not in {"gbtree", "dart"}:
             self.logger.warning(
-                "Tree model dump is not supported for Booster type %s", env.model.booster
+                "Tree model dump is not supported for Booster type %s", model.booster
             )
             return
-        tree = parse_tree_model(env.model, env.iteration)
+        tree = parse_tree_model(model, epoch)
         for column_name, column_values in tree.items():
             tensor_name = "trees/{}".format(column_name)
             self._save_for_tensor(tensor_name, np.array(column_values))
@@ -360,3 +312,219 @@ def _validate_data(data: Union[None, Tuple[str, str], DMatrix] = None) -> None:
             return get_dmatrix(file_path, content_type)
         except Exception:
             raise SMDebugError(error_msg)
+
+    @staticmethod
+    def _get_evaluation_metrics(evals_log) -> List[Tuple[str, float]]:
+        if not is_xgb_1_3_and_later():
+            return evals_log
+        evaluation_metrics = []
+        for eval_type, results in evals_log.items():
+            for key, metric_data in results.items():
+                evaluation_metrics.append((f"{eval_type}-{key}", metric_data[-1]))
+        return evaluation_metrics
+
+    @staticmethod
+    def _get_feature_names(model) -> List[str]:
+        # Reimplement XGBoost's automatic feature naming, which was removed in 1.4.
+        if model.feature_names is None:
+            feature_names = ["f{0}".format(i) for i in range(model.num_features())]
+        else:
+            feature_names = model.feature_names
+
+        return feature_names
+
+if not is_xgb_1_3_and_later():
+    class Hook(XGBoostBaseHook):
+        """Hook that represents a callback function in XGBoost."""
+
+        @error_handling_agent.catch_smdebug_errors()
+        def __init__(
+            self,
+            out_dir: Optional[str] = None,
+            export_tensorboard: bool = False,
+            tensorboard_dir: Optional[str] = None,
+            dry_run: bool = False,
+            reduction_config=None,
+            save_config: Optional[SaveConfig] = None,
+            include_regex: Optional[List[str]] = None,
+            include_collections: Optional[List[str]] = None,
+            save_all: bool = False,
+            include_workers: str = "one",
+            hyperparameters: Optional[Dict[str, Any]] = None,
+            train_data: Union[None, Tuple[str, str], DMatrix] = None,
+            validation_data: Union[None, Tuple[str, str], DMatrix] = None,
+        ) -> None:
+            super().__init__(
+                out_dir=out_dir,
+                export_tensorboard=export_tensorboard,
+                tensorboard_dir=tensorboard_dir,
+                dry_run=dry_run,
+                reduction_config=reduction_config,
+                save_config=save_config,
+                include_regex=include_regex,
+                include_collections=include_collections,
+                save_all=save_all,
+                include_workers=include_workers,
+                hyperparameters=hyperparameters,
+                train_data=train_data,
+                validation_data=validation_data,
+            )
+
+        @error_handling_agent.catch_smdebug_errors()
+        def __call__(self, env: CallbackEnv) -> None:
+            self._callback(env)
+
+        def _is_last_step(self, env: CallbackEnv) -> bool:
+            # env.iteration: current boosting round.
+            # env.end_iteration: round # when training will end. this is always num_round + 1.  # noqa: E501
+            return env.iteration + 1 == env.end_iteration
+
+        def _callback(self, env: CallbackEnv) -> None:
+            if not self.prepared_collections:
+                # at this point we need all collections to be ready
+                # this may not be the case at creation of hook
+                # as user's code after hook might add collections
+                self._prepare_collections()
+                self.prepared_collections = True
+
+            self._increment_step(env.iteration)
+
+            if self.last_saved_step is not None and not self.exported_collections:
+                self.export_collections()
+                self.exported_collections = True
+
+            if not self._get_collections_to_save_for_step():
+                self.logger.debug("Skipping iteration {}".format(self.step))
+                return
+
+            self._initialize_writers()
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.HYPERPARAMETERS):
+                self.write_hyperparameters()
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.METRICS):
+                self.write_metrics(env.evaluation_result_list)
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.LOSSES):
+                self.write_losses(env.evaluation_result_list)
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.PREDICTIONS):
+                self.write_predictions(env.model)
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.LABELS):
+                self.write_labels()
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.FEATURE_IMPORTANCE):
+                self.write_feature_importances(env.model)
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.TREES):
+                self.write_tree_model(env.model, env.iteration)
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.FULL_SHAP):
+                self._maybe_compute_shap_values(env.model)
+                self.write_full_shap(env.model)
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.AVERAGE_SHAP):
+                self._maybe_compute_shap_values(env.model)
+                self.write_average_shap(env.model)
+
+            self._clear_shap_values()
+            self.last_saved_step = self.step
+            self._close_writers()
+
+else:
+
+    class Hook(XGBoostBaseHook, xgb.callback.TrainingCallback):
+        """Hook that represents a callback function in XGBoost."""
+
+        @error_handling_agent.catch_smdebug_errors()
+        def __init__(
+            self,
+            out_dir: Optional[str] = None,
+            export_tensorboard: bool = False,
+            tensorboard_dir: Optional[str] = None,
+            dry_run: bool = False,
+            reduction_config=None,
+            save_config: Optional[SaveConfig] = None,
+            include_regex: Optional[List[str]] = None,
+            include_collections: Optional[List[str]] = None,
+            save_all: bool = False,
+            include_workers: str = "one",
+            hyperparameters: Optional[Dict[str, Any]] = None,
+            train_data: Union[None, Tuple[str, str], DMatrix] = None,
+            validation_data: Union[None, Tuple[str, str], DMatrix] = None,
+        ) -> None:
+            super().__init__(
+                out_dir=out_dir,
+                export_tensorboard=export_tensorboard,
+                tensorboard_dir=tensorboard_dir,
+                dry_run=dry_run,
+                reduction_config=reduction_config,
+                save_config=save_config,
+                include_regex=include_regex,
+                include_collections=include_collections,
+                save_all=save_all,
+                include_workers=include_workers,
+                hyperparameters=hyperparameters,
+                train_data=train_data,
+                validation_data=validation_data,
+            )
+
+        @error_handling_agent.catch_smdebug_errors()
+        def after_iteration(self, model, epoch, evals_log) -> bool:
+            """Run after each iteration. Return True when training should stop."""
+            if not self.prepared_collections:
+                # at this point we need all collections to be ready
+                # this may not be the case at creation of hook
+                # as user's code after hook might add collections
+                self._prepare_collections()
+                self.prepared_collections = True
+
+            self._increment_step(epoch)
+
+            if self.last_saved_step is not None and not self.exported_collections:
+                self.export_collections()
+                self.exported_collections = True
+
+            if not self._get_collections_to_save_for_step():
+                self.logger.debug("Skipping iteration {}".format(self.step))
+                return False
+
+            self._initialize_writers()
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.HYPERPARAMETERS):
+                self.write_hyperparameters()
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.METRICS):
+                self.write_metrics(evals_log)
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.LOSSES):
+                self.write_losses(evals_log)
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.PREDICTIONS):
+                self.write_predictions(model)
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.LABELS):
+                self.write_labels()
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.FEATURE_IMPORTANCE):
+                self.write_feature_importances(model)
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.TREES):
+                self.write_tree_model(model, epoch)
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.FULL_SHAP):
+                self._maybe_compute_shap_values(model)
+                self.write_full_shap(model)
+
+            if self._is_collection_being_saved_for_step(CollectionKeys.AVERAGE_SHAP):
+                self._maybe_compute_shap_values(model)
+                self.write_average_shap(model)
+
+            self._clear_shap_values()
+            self.last_saved_step = self.step
+
+            self._close_writers()
+
+            return False
diff --git a/smdebug/xgboost/utils.py b/smdebug/xgboost/utils.py
index ad0bf6d4f..741f16ca4 100644
--- a/smdebug/xgboost/utils.py
+++ b/smdebug/xgboost/utils.py
@@ -6,6 +6,7 @@
 import logging
 import os
 import re
+import pkg_resources
 
 # Third Party
 import numpy as np
@@ -28,6 +29,24 @@
 )
 
 
+def is_xgb_1_3_and_later():
+    return pkg_resources.parse_version(
+        pkg_resources.get_distribution("xgboost").version
+    ) >= pkg_resources.parse_version("1.3")
pkg_resources.parse_version("1.3"): + return True + else: + return False + + +def is_xgb_1_5_and_later(): + if pkg_resources.parse_version( + pkg_resources.get_distribution("xgboost").version + ) >= pkg_resources.parse_version("1.5"): + return True + else: + return False + + +def is_xgb_1_7_and_later(): + if pkg_resources.parse_version( + pkg_resources.get_distribution("xgboost").version + ) >= pkg_resources.parse_version("1.7"): + return True + else: + return False + + def _get_invalid_content_type_error_msg(invalid_content_type): return INVALID_CONTENT_TYPE_ERROR.format(invalid_content_type=invalid_content_type) @@ -348,6 +376,7 @@ def parse_tree_model(booster, iteration, fmap=""): missings = [] gains = [] covers = [] + categories = [] trees = booster.get_dump(with_stats=True) for i, tree in enumerate(trees): @@ -376,22 +405,36 @@ def parse_tree_model(booster, iteration, fmap=""): missings.append(float("NAN")) gains.append(float(stats[1])) covers.append(float(stats[3])) + categories.append(float("NAN")) # Not a Leaf Node else: # parse string - fid = arr[1].split("]") - parse = fid[0].split("<") - stats = re.split("=|,", fid[1]) + fid = arr[1].split(']') + # these conditions brings backward compatible + if fid[0].find("<") != -1: + # numerical + parse = fid[0].split('<') + splits.append(float(parse[1])) + categories.append(float("NAN")) + elif fid[0].find(":{") != -1: + # categorical + parse = fid[0].split(":") + cats = parse[1][1:-1] # strip the {} + cats_split = cats.split(",") + splits.append(float("NAN")) + categories.append(cats_split if cats_split else float("NAN")) + else: + raise SMDebugError("Failed to parse tree model text dump.") + stats = re.split('=|,', fid[1]) # append to lists tree_ids.append(i) - node_ids.append(int(re.findall(r"\b\d+\b", arr[0])[0])) + node_ids.append(int(re.findall(r'\b\d+\b', arr[0])[0])) fids.append(parse[0]) - splits.append(float(parse[1])) str_i = str(i) - y_directs.append(str_i + "-" + stats[1]) - n_directs.append(str_i + "-" + stats[3]) - missings.append(str_i + "-" + stats[5]) + y_directs.append(str_i + '-' + stats[1]) + n_directs.append(str_i + '-' + stats[3]) + missings.append(str_i + '-' + stats[5]) gains.append(float(stats[7])) covers.append(float(stats[9])) @@ -409,6 +452,9 @@ def parse_tree_model(booster, iteration, fmap=""): "Gain": np.array(gains, dtype=np.dtype("float")), "Cover": np.array(covers, dtype=np.dtype("float")), } + if is_xgb_1_5_and_later(): + key_to_array["Category"] = np.array(categories, dtype=np.dtype("U")) + # XGBoost's trees_to_dataframe() method uses # df.sort_values(['Tree', 'Node']).reset_index(drop=True) to sort the # node ids. The following achieves the same result without using pandas. 
diff --git a/tests/xgboost/test_utils.py b/tests/xgboost/test_utils.py
index bca5be3fa..568a1e4e1 100644
--- a/tests/xgboost/test_utils.py
+++ b/tests/xgboost/test_utils.py
@@ -4,8 +4,7 @@
 import xgboost as xgb
 
 # First Party
-from smdebug.xgboost.utils import parse_tree_model
-
+from smdebug.xgboost.utils import is_xgb_1_5_and_later, parse_tree_model
 
 @pytest.mark.slow
 def test_parse_tree_model():
@@ -22,6 +21,8 @@
     bst = xgb.train(params, dtrain, evals=[(dtrain, "train")], num_boost_round=num_boost_round)
 
     columns = ["Tree", "Node", "ID", "Feature", "Split", "Yes", "No", "Missing", "Gain", "Cover"]
+    if is_xgb_1_5_and_later():
+        columns.append("Category")
 
     try:
         from pandas import DataFrame  # noqa
diff --git a/tests/xgboost/test_xgboost_error_handling_agent.py b/tests/xgboost/test_xgboost_error_handling_agent.py
index e296c0212..4209483ea 100644
--- a/tests/xgboost/test_xgboost_error_handling_agent.py
+++ b/tests/xgboost/test_xgboost_error_handling_agent.py
@@ -38,6 +38,13 @@ def __call__(self, *args, **kwargs):
             """
             raise RuntimeError(self.xgboost_callback_error_message)
 
+        @error_handling_agent.catch_smdebug_errors()
+        def after_iteration(self, *args, **kwargs):
+            """
+            Override the XGBoost Hook's after_iteration callback to fail immediately.
+            """
+            raise RuntimeError(self.xgboost_callback_error_message)
+
     return HookWithXGBoostCallbackError
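For context, a usage sketch (not part of the patch): the hook is registered identically on both sides of the version split, through the callbacks argument of xgb.train. Before XGBoost 1.3 the trainer invokes the hook as a plain callable with a CallbackEnv; from 1.3 on it drives the TrainingCallback.after_iteration method shown above. The output directory and training parameters below are placeholder values.

```python
import numpy as np
import xgboost as xgb

from smdebug.xgboost import Hook

# Toy data; passing train_data/validation_data enables the SHAP,
# prediction, and label collections in addition to metrics.
dtrain = xgb.DMatrix(np.random.rand(100, 4), label=np.random.randint(2, size=100))
hook = Hook(
    out_dir="/tmp/smdebug_xgboost_run",  # placeholder output location
    train_data=dtrain,
    validation_data=dtrain,
)
bst = xgb.train(
    {"objective": "binary:logistic", "eval_metric": "logloss"},
    dtrain,
    num_boost_round=10,
    evals=[(dtrain, "train")],
    callbacks=[hook],  # same registration for the legacy and TrainingCallback APIs
)
```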