diff --git a/autotm/algorithms_for_tuning/bayesian_optimization/bayes_opt.py b/autotm/algorithms_for_tuning/bayesian_optimization/bayes_opt.py index 87f6172..7ab1609 100644 --- a/autotm/algorithms_for_tuning/bayesian_optimization/bayes_opt.py +++ b/autotm/algorithms_for_tuning/bayesian_optimization/bayes_opt.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import copy import logging +import logging.config import os import random import sys @@ -8,13 +9,15 @@ from multiprocessing.pool import AsyncResult from typing import List, Optional, Union -import click import yaml from hyperopt import STATUS_OK, fmin, hp, tpe from tqdm import tqdm from yaml import Loader -from autotm.algorithms_for_tuning.individuals import IndividualDTO +from autotm.algorithms_for_tuning.individuals import IndividualDTO, IndividualBuilder +from autotm.fitness.estimator import FitnessEstimator, ComputableFitnessEstimator +from autotm.fitness.tm import fit_tm, TopicModel +from autotm.params import FixedListParams from autotm.utils import TqdmToLogger, make_log_config_dict ALG_ID = "bo" @@ -80,10 +83,19 @@ def log_best_solution( class BigartmFitness: - def __init__(self, dataset: str, exp_id: Optional[int] = None): + def __init__(self, + data_path: str, + topic_count: int, + ibuilder: IndividualBuilder, + fitness_estimator: FitnessEstimator, + dataset: str, + exp_id: Optional[int] = None): + self.data_path = data_path + self.topic_count = topic_count + self.ibuilder = ibuilder + self.fitness_estimator = fitness_estimator self.dataset = dataset self.exp_id = exp_id - # self.best_solution: Optional[IndividualDTO] = None def parse_kwargs(self, **kwargs): params = [] @@ -102,54 +114,56 @@ def parse_kwargs(self, **kwargs): params.append(kwargs.get("decor_2", 1)) return params - def make_individ(self, **kwargs): + def make_ind_dto(self, **kwargs): # TODO: adapt this function to work with baesyian optimization params = [float(i) for i in self.parse_kwargs(**kwargs)] params = params[:-1] + [0.0, 0.0, 0.0] + [params[-1]] return IndividualDTO( id=str(uuid.uuid4()), + data_path=self.data_path, dataset=self.dataset, - params=params, + topic_count=self.topic_count, + params=FixedListParams(params=params), exp_id=self.exp_id, alg_id=ALG_ID, ) def __call__(self, kwargs): - population = [self.make_individ(**kwargs)] - - population = estimate_fitness(population) + population = [self.ibuilder.make_individual(self.make_ind_dto(**kwargs))] + population = self.fitness_estimator.estimate(-1, population) individ = population[0] - - # if self.best_solution is None or individ.fitness_value > self.best_solution.fitness_value: - # self.best_solution = copy.deepcopy(individ) - return {"loss": -1 * individ.fitness_value, "status": STATUS_OK} -@click.command(context_settings=dict(allow_extra_args=True)) -@click.option("--dataset", required=True, type=str, help="dataset name in the config") -@click.option( - "--log-file", - type=str, - default="/var/log/tm-alg-bo.log", - help="a log file to write logs of the algorithm execution to", -) -@click.option("--exp-id", required=True, type=int, help="mlflow experiment id") -def run_algorithm(dataset, log_file, exp_id): +def run_algorithm(dataset, + data_path, + topic_count, + log_file, + exp_id, + num_evaluations, + individual_type: str = "regular", + train_option: str = "offline") -> TopicModel: run_uid = uuid.uuid4() if not config["testMode"] else None logging_config = make_log_config_dict(filename=log_file, uid=run_uid) logging.config.dictConfig(logging_config) - fitness = BigartmFitness(dataset, exp_id) + 
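+    # hyperopt's fmin evaluates one candidate per call, so a plain in-process
+    # ComputableFitnessEstimator is sufficient here.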
ibuilder = IndividualBuilder(individual_type) + fitness_estimator = ComputableFitnessEstimator(ibuilder, num_evaluations) + + fitness = BigartmFitness(data_path, topic_count, ibuilder, fitness_estimator, dataset, exp_id) best_params = fmin( - fitness, SPACE, algo=tpe.suggest, max_evals=NUM_FITNESS_EVALUATIONS + fitness, SPACE, algo=tpe.suggest, max_evals=num_evaluations ) - best_solution = fitness.make_individ(**best_params) - best_solution = log_best_solution( - best_solution, wait_for_result_timeout=-1, alg_args=" ".join(sys.argv) + best_solution_dto = fitness.make_ind_dto(**best_params) + best_solution_dto = log_best_solution( + best_solution_dto, wait_for_result_timeout=-1, alg_args=" ".join(sys.argv) ) - print(best_solution.fitness_value * -1) + best_topic_model = fit_tm( + preproc_data_path=data_path, + topic_count=topic_count, + params=best_solution_dto.params, + train_option=train_option + ) -if __name__ == "__main__": - run_algorithm() + return best_topic_model diff --git a/autotm/algorithms_for_tuning/genetic_algorithm/ga.py b/autotm/algorithms_for_tuning/genetic_algorithm/ga.py index 7c1b43a..a7bb852 100644 --- a/autotm/algorithms_for_tuning/genetic_algorithm/ga.py +++ b/autotm/algorithms_for_tuning/genetic_algorithm/ga.py @@ -1,32 +1,27 @@ import copy import gc import logging -import math import operator import random import sys import time import uuid -from typing import Optional, Tuple, Callable +from typing import Optional, Callable, List import numpy as np -from autotm.abstract_params import AbstractParams -from autotm.algorithms_for_tuning.genetic_algorithm.statistics_collector import StatisticsCollector from autotm.algorithms_for_tuning.genetic_algorithm.selection import selection -from autotm.algorithms_for_tuning.genetic_algorithm.surrogate import set_surrogate_fitness, Surrogate, \ - get_prediction_uncertanty -from autotm.algorithms_for_tuning.individuals import make_individual, IndividualDTO, Individual +from autotm.algorithms_for_tuning.genetic_algorithm.statistics_collector import StatisticsCollector +from autotm.algorithms_for_tuning.individuals import IndividualDTO, Individual, IndividualBuilder from autotm.algorithms_for_tuning.nelder_mead_optimization.nelder_mead import ( NelderMeadOptimization, ) -from autotm.fitness.tasks import estimate_fitness, log_best_solution -from autotm.params import create_individual, FixedListParams +from autotm.fitness.estimator import FitnessEstimator +from autotm.params import create_individual from autotm.utils import AVG_COHERENCE_SCORE from autotm.visualization.dynamic_tracker import MetricsCollector ALG_ID = "ga" -SPEEDUP = True logger = logging.getLogger("GA_algo") @@ -60,17 +55,16 @@ def __init__( data_path, num_individuals, num_iterations, + ibuilder: IndividualBuilder, + fitness_estimator: FitnessEstimator, mutation_type="mutation_one_param", crossover_type="blend_crossover", selection_type="fitness_prop", elem_cross_prob=0.2, - num_fitness_evaluations: Optional[int] = 500, early_stopping_iterations: Optional[int] = 500, best_proc=0.3, alpha=None, exp_id: Optional[int] = None, - surrogate_name=None, - calc_scheme="type2", topic_count: Optional[int] = None, fitness_obj_type="single_objective", tag: Optional[str] = None, @@ -116,24 +110,17 @@ def __init__( self.data_path = data_path self.num_individuals = num_individuals self.num_iterations = num_iterations + self.ibuilder = ibuilder + self.fitness_estimator = fitness_estimator self.mutation_type = mutation_type self.crossover_type = crossover_type self.selection = 
selection(selection_type) self.elem_cross_prob = elem_cross_prob self.alpha = alpha - self.evaluations_counter = 0 - self.num_fitness_evaluations = num_fitness_evaluations self.early_stopping_iterations = early_stopping_iterations self.fitness_obj_type = fitness_obj_type self.best_proc = best_proc - self.all_params = [] - self.all_fitness = [] - if surrogate_name: - self.surrogate = Surrogate(surrogate_name, **kwargs) - else: - self.surrogate = None self.exp_id = exp_id - self.calc_scheme = calc_scheme self.topic_count = topic_count self.tag = tag self.use_pipeline = use_pipeline @@ -151,20 +138,6 @@ def __init__( {} ) # generation, parent_1_params, parent_2_params, ... - def estimate_fitness(self, population): - evaluated = [individual for individual in population if individual.dto.fitness_value is not None] - not_evaluated = [individual for individual in population if individual.dto.fitness_value is None] - evaluations_limit = max(0, self.num_fitness_evaluations - self.evaluations_counter) \ - if self.num_fitness_evaluations else len(not_evaluated) - if len(not_evaluated) > evaluations_limit: - not_evaluated = not_evaluated[:evaluations_limit] - self.evaluations_counter += len(not_evaluated) - new_evaluated = estimate_fitness(not_evaluated) - if self.statistics_collector: - for individual in new_evaluated: - self.statistics_collector.log_individual(individual) - return evaluated + new_evaluated - def init_population(self): list_of_individuals = [] for i in range(self.num_individuals): @@ -181,122 +154,24 @@ def init_population(self): train_option=self.train_option, ) # TODO: improve heuristic on search space - list_of_individuals.append(make_individual(dto=dto)) - population_with_fitness = self.estimate_fitness(list_of_individuals) + list_of_individuals.append(self.ibuilder.make_individual(dto=dto)) + + population_with_fitness = self.run_fitness(list_of_individuals, -1) + + # self.save_params(population_with_fitness) + # if self.surrogate is not None and self.calc_scheme == "type2": + # self.surrogate.fit(np.array(self.all_params), np.array(self.all_fitness)) + # logger.info("Surrogate is initialized!") + + self.fitness_estimator.fit(iter_num=-1) - self.save_params(population_with_fitness) - if self.surrogate is not None and self.calc_scheme == "type2": - self.surrogate.fit(np.array(self.all_params), np.array(self.all_fitness)) - logger.info("Surrogate is initialized!") return population_with_fitness @staticmethod def _sort_population(population): population.sort(key=operator.attrgetter("fitness_value"), reverse=True) - def _calculate_uncertain_res(self, generation, iteration_num: int, proc=0.3): - if len(generation) == 0: - return [] - X = np.array([individ.dto.params.to_vector() for individ in generation]) - certanty = get_prediction_uncertanty( - self.surrogate.surrogate, X, self.surrogate.name - ) - recalculate_num = int(np.floor(len(certanty) * proc)) - logger.info(f"Certanty values: {certanty}") - - certanty, X = ( - list(t) for t in zip(*sorted(zip(certanty, X.tolist()), reverse=True)) - ) # check - calculated = [] - for individual in generation[:recalculate_num]: - # copy - individual_json = individual.dto.model_dump_json() - individual = make_individual(dto=IndividualDTO.model_validate_json(individual_json)) - individual.dto.fitness_value = None - calculated.append(individual) - - calculated = self.estimate_fitness(calculated) - - self.all_params += [individ.dto.params.to_vector() for individ in calculated] - self.all_fitness += [ - 
individ.dto.fitness_value["avg_coherence_score"] for individ in calculated - ] - - pred_y = self.surrogate.predict(X[recalculate_num:]) - for ix, individual in enumerate(generation[recalculate_num:]): - dto = IndividualDTO( - id=str(uuid.uuid4()), - data_path=self.data_path, - params=individual.dto.params, - dataset=self.dataset, - fitness_value=set_surrogate_fitness(pred_y[ix]), - exp_id=self.exp_id, - alg_id=ALG_ID, - topic_count=self.topic_count, - tag=self.tag, - train_option=self.train_option, - ) - calculated.append(make_individual(dto=dto)) - return calculated - - def save_params(self, population): - params_and_f = [ - (copy.deepcopy(individ.params.to_vector()), individ.fitness_value) - for individ in population - if individ.fitness_value not in self.all_fitness - ] - - def check_val(fval): - return not (fval is None or math.isnan(fval) or math.isinf(fval)) - - def check_params(p): - return all(check_val(el) for el in p) - - clean_params_and_f = [] - for p, f in params_and_f: - if not check_params(p) or not check_val(f): - logger.warning(f"Bad params or fitness found. Fitness: {f}. Params: {p}.") - else: - clean_params_and_f.append((p, f)) - - pops = [p for p, _ in clean_params_and_f] - fs = [f for _, f in clean_params_and_f] - - self.all_params += pops - self.all_fitness += fs - - def surrogate_calculation(self, population): - X_val = np.array([copy.deepcopy(individ.params.to_vector()) for individ in population]) - y_pred = self.surrogate.predict(X_val) - if not SPEEDUP: - y_val = np.array([individ.fitness_value for individ in population]) - - def check_val(fval): - return not (fval is None or math.isnan(fval) or math.isinf(fval)) - - def check_params(p): - return all(check_val(el) for el in p) - - clean_params_and_f = [] - for i in range(len(y_val)): - if not check_params(X_val[i]) or not check_val(y_val[i]): - logger.warning( - f"Bad params or fitness found. Fitness: {y_val[i]}. Params: {X_val[i]}." 
- ) - else: - clean_params_and_f.append((X_val[i], y_val[i])) - - X_val = clean_params_and_f[0] - y_val = clean_params_and_f[1] - r_2, mse, rmse = self.surrogate.score(X_val, y_val) - logger.info(f"Real values: {list(y_val)}") - logger.info(f"Predicted values: {list(y_pred)}") - logger.info(f"R^2: {r_2}, MSE: {mse}, RMSE: {rmse}") - for ix, individ in enumerate(population): - individ.dto.fitness_value = set_surrogate_fitness(y_pred[ix]) - return population - - def run_crossover(self, pairs_generator, surrogate_iteration, iteration_num: int): + def run_crossover(self, pairs_generator, iteration_num: int): new_generation = [] crossover_changes = { @@ -331,7 +206,7 @@ def run_crossover(self, pairs_generator, surrogate_iteration, iteration_num: int train_option=self.train_option, ) for child in children] - individuals = [make_individual(child) for child in children_dto] + individuals = [self.ibuilder.make_individual(child) for child in children_dto] new_generation += individuals crossover_changes["parent_1_params"].append(i.params) @@ -341,7 +216,7 @@ def run_crossover(self, pairs_generator, surrogate_iteration, iteration_num: int crossover_changes["child_id"].append(len(new_generation) - 1) if len(new_generation) > 0: - new_generation = self.run_fitness(new_generation, surrogate_iteration, iteration_num) + new_generation = self.run_fitness(new_generation, iteration_num) logger.info(f"size of the new generation is {len(new_generation)}") for i in range(len(crossover_changes["parent_1_params"])): @@ -391,11 +266,13 @@ def apply_nelder_mead(self, starting_points_set, num_gen, num_iterations=2): train_option=self.train_option, ) - new_population.append(make_individual(dto=solution_dto)) + new_population.append(self.ibuilder.make_individual(dto=solution_dto)) return new_population def run(self, verbose=False, visualize_results=False) -> Individual: - self.evaluations_counter = 0 + assert self.fitness_estimator.evaluations_counter == 0, \ + "Fitness estimator has non-zero evaluations count and cannot be reused" + ftime = str(int(time.time())) # os.makedirs(LOG_FILE_PATH, exist_ok=True) @@ -405,7 +282,7 @@ def run(self, verbose=False, visualize_results=False) -> Individual: logger.info( f"ALGORITHM PARAMS number of individuals {self.num_individuals}; " f"number of fitness evals " - f"{self.num_fitness_evaluations if self.num_fitness_evaluations else 'unlimited'}; " + f"{self.fitness_estimator.num_fitness_evaluations if self.fitness_estimator.num_fitness_evaluations else 'unlimited'}; " f"number of early stopping iterations " f"{self.early_stopping_iterations if self.early_stopping_iterations else 'unlimited'}; " f"crossover prob {self.elem_cross_prob}" @@ -416,7 +293,6 @@ def run(self, verbose=False, visualize_results=False) -> Individual: x, y = [], [] high_fitness = 0 - surrogate_iteration = False best_val_so_far = -10 early_stopping_counter = 0 @@ -426,12 +302,12 @@ def run(self, verbose=False, visualize_results=False) -> Individual: logger.info(f"ENTERING GENERATION {ii}") - if self.surrogate is not None: - surrogate_iteration = ii % 2 != 0 - self._sort_population(population) if self.statistics_collector is not None: - self.statistics_collector.log_iteration(self.evaluations_counter, population[0].fitness_value) + self.statistics_collector.log_iteration( + self.fitness_estimator.evaluations_counter, + population[0].fitness_value + ) pairs_generator = self.selection( population=population, best_proc=self.best_proc, @@ -442,7 +318,7 @@ def run(self, verbose=False, visualize_results=False) -> 
Individual: # Crossover new_generation = self.run_crossover( - pairs_generator, surrogate_iteration, iteration_num=ii + pairs_generator, iteration_num=ii ) self._sort_population(new_generation) @@ -484,7 +360,7 @@ def run(self, verbose=False, visualize_results=False) -> Individual: for p in population: p.dto.iteration_id = ii - population = self.run_fitness(population, surrogate_iteration, ii) + population = self.run_fitness(population, ii) # TODO (pipeline) Mutations collection is disabled # before_mutation = [] # individual @@ -522,7 +398,7 @@ def run(self, verbose=False, visualize_results=False) -> Individual: ) population[i] = elem - if self.num_fitness_evaluations and self.evaluations_counter >= self.num_fitness_evaluations: + if self.fitness_estimator.num_fitness_evaluations and self.fitness_estimator.evaluations_counter >= self.fitness_estimator.num_fitness_evaluations: self.metric_collector.save_fitness( generation=ii, params=[i.params for i in population], @@ -544,15 +420,17 @@ def run(self, verbose=False, visualize_results=False) -> Individual: if (current_fitness > high_fitness) or (ii == 0): high_fitness = current_fitness - if self.surrogate: - if self.calc_scheme == "type1" and not surrogate_iteration: - self.surrogate.fit( - np.array(self.all_params), np.array(self.all_fitness) - ) - elif self.calc_scheme == "type2": - self.surrogate.fit( - np.array(self.all_params), np.array(self.all_fitness) - ) + # if self.surrogate: + # if self.calc_scheme == "type1" and not surrogate_iteration: + # self.surrogate.fit( + # np.array(self.all_params), np.array(self.all_fitness) + # ) + # elif self.calc_scheme == "type2": + # self.surrogate.fit( + # np.array(self.all_params), np.array(self.all_fitness) + # ) + + self.fitness_estimator.fit(ii) if self.early_stopping_iterations: if population[0].fitness_value > best_val_so_far: @@ -594,7 +472,7 @@ def run(self, verbose=False, visualize_results=False) -> Individual: f"RUN ID {run_id}." ) best_solution = population[0] - log_best_solution(best_solution, alg_args=" ".join(sys.argv), is_tmp=True) + self.fitness_estimator.log_best_solution(best_solution, alg_args=" ".join(sys.argv), is_tmp=True) if visualize_results: self.metric_collector.save_and_visualise_trace() @@ -602,29 +480,19 @@ def run(self, verbose=False, visualize_results=False) -> Individual: self.metric_collector.save_trace() if self.statistics_collector is not None: - self.statistics_collector.log_iteration(self.evaluations_counter, population[0].fitness_value) + self.statistics_collector.log_iteration( + self.fitness_estimator.evaluations_counter, + population[0].fitness_value + ) logger.info(f"Y: {y}") best_individual = population[0] - ind = log_best_solution(best_individual, alg_args=" ".join(sys.argv)) + ind = self.fitness_estimator.log_best_solution(best_individual, alg_args=" ".join(sys.argv)) logger.info(f"Logged the best solution. Obtained fitness is {ind.fitness_value}") return ind - def run_fitness(self, population, surrogate_iteration, ii): - fitness_calc_time_start = time.time() - if not SPEEDUP or not self.surrogate or not surrogate_iteration: - population = self.estimate_fitness(population) - self.save_params(population) - if self.surrogate: - if self.calc_scheme == "type1" and surrogate_iteration: - population = self.surrogate_calculation(population) - elif self.calc_scheme == "type2": - population = self._calculate_uncertain_res(population, iteration_num=ii) - self.save_params(population) - else: - raise ValueError(f"Unexpected surrogate scheme! 
{self.calc_scheme}") - logger.info(f"TIME OF THE FITNESS FUNCTION: {time.time() - fitness_calc_time_start}") - return population + def run_fitness(self, population: List[Individual], ii: int): + return self.fitness_estimator.estimate(ii,population ) def run_mutation(self, population): for i in range(1, len(population)): @@ -647,7 +515,7 @@ def run_mutation(self, population): tag=self.tag, train_option=self.train_option, ) - population[i] = make_individual(dto=dto) + population[i] = self.ibuilder.make_individual(dto=dto) # multistage bag of regularizers approach diff --git a/autotm/algorithms_for_tuning/genetic_algorithm/genetic_algorithm.py b/autotm/algorithms_for_tuning/genetic_algorithm/genetic_algorithm.py index 8d94141..9aac782 100755 --- a/autotm/algorithms_for_tuning/genetic_algorithm/genetic_algorithm.py +++ b/autotm/algorithms_for_tuning/genetic_algorithm/genetic_algorithm.py @@ -1,17 +1,23 @@ #!/usr/bin/env python3 import logging +import logging.config import sys import uuid from typing import Union, Optional from autotm.algorithms_for_tuning.genetic_algorithm.statistics_collector import StatisticsCollector from autotm.algorithms_for_tuning.genetic_algorithm.ga import GA +from autotm.algorithms_for_tuning.genetic_algorithm.surrogate import Surrogate +from autotm.algorithms_for_tuning.individuals import IndividualBuilder +from autotm.fitness.estimator import ComputableFitnessEstimator, SurrogateEnabledComputableFitnessEstimator, \ + DistributedSurrogateEnabledComputableFitnessEstimator from autotm.fitness.tm import fit_tm, TopicModel from autotm.utils import make_log_config_dict logger = logging.getLogger(__name__) NUM_FITNESS_EVALUATIONS = 150 +SPEEDUP = True def get_best_individual( @@ -41,6 +47,8 @@ def get_best_individual( train_option: str = "offline", quiet_log: bool = False, statistics_collector: Optional[StatisticsCollector] = None, + individual_type: str = "regular", + fitness_estimator_type: str = "local", # distributed **kwargs ): """ @@ -96,11 +104,40 @@ def get_best_individual( if cross_alpha is not None: cross_alpha = float(cross_alpha) + ibuilder = IndividualBuilder(individual_type) + + if fitness_estimator_type == "local" and surrogate_name: + fitness_estimator = SurrogateEnabledComputableFitnessEstimator( + ibuilder, + Surrogate(surrogate_name), + "type1", + SPEEDUP, + num_fitness_evaluations, + statistics_collector + ) + elif fitness_estimator_type == "local": + fitness_estimator = ComputableFitnessEstimator(ibuilder, num_fitness_evaluations, statistics_collector) + elif fitness_estimator_type == "distributed" and surrogate_name: + fitness_estimator = DistributedSurrogateEnabledComputableFitnessEstimator( + ibuilder, + Surrogate(surrogate_name), + "type1", + SPEEDUP, + num_fitness_evaluations, + statistics_collector + ) + elif fitness_estimator_type == "distributed": + fitness_estimator = ComputableFitnessEstimator(ibuilder, num_fitness_evaluations, statistics_collector) + else: + raise ValueError("Incorrect settings") + g = GA( dataset=dataset, data_path=data_path, num_individuals=num_individuals, num_iterations=num_iterations, + ibuilder=ibuilder, + fitness_estimator=fitness_estimator, mutation_type=mutation_type, crossover_type=crossover_type, selection_type=selection_type, @@ -149,19 +186,23 @@ def run_algorithm( gpr_kernel: str = None, gpr_alpha: float = None, gpr_normalize_y: float = None, - use_pipeline: bool = False, + use_pipeline: bool = True, use_nelder_mead_in_mutation: bool = False, use_nelder_mead_in_crossover: bool = False, 
use_nelder_mead_in_selector: bool = False, train_option: str = "offline", quiet_log: bool = False, + individual_type: str = "regular", + fitness_estimator_type: str = "local" ) -> TopicModel: best_individual = get_best_individual(dataset, data_path, exp_id, topic_count, num_individuals, num_iterations, num_fitness_evaluations, mutation_type, crossover_type, selection_type, elem_cross_prob, cross_alpha, best_proc, log_file, tag, surrogate_name, gpr_kernel, gpr_alpha, gpr_normalize_y, use_pipeline, use_nelder_mead_in_mutation, use_nelder_mead_in_crossover, - use_nelder_mead_in_selector, train_option, quiet_log) + use_nelder_mead_in_selector, train_option, quiet_log, + individual_type=individual_type, + fitness_estimator_type=fitness_estimator_type) best_topic_model = fit_tm( preproc_data_path=data_path, diff --git a/autotm/algorithms_for_tuning/individuals.py b/autotm/algorithms_for_tuning/individuals.py index dba8887..1d115b1 100644 --- a/autotm/algorithms_for_tuning/individuals.py +++ b/autotm/algorithms_for_tuning/individuals.py @@ -142,7 +142,17 @@ def fitness_value(self) -> float: return alpha * self.dto.fitness_value[AVG_COHERENCE_SCORE] -def make_individual(dto: IndividualDTO) -> Individual: - # TODO: choose fitness by ENV var - return RegularFitnessIndividual(dto=dto) - # return SparsityScalerBasedFitnessIndividual(dto=dto) +class IndividualBuilder: + SUPPORTED_IND_TYPES = ["regular", "sparse"] + + def __init__(self, ind_type: str = "regular"): + self.ind_type = ind_type + + if self.ind_type not in self.SUPPORTED_IND_TYPES: + raise ValueError(f"Unsupported ind type: {self.ind_type}") + + def make_individual(self, dto: IndividualDTO) -> Individual: + if self.ind_type == "regular": + return RegularFitnessIndividual(dto=dto) + else: + return SparsityScalerBasedFitnessIndividual(dto=dto) diff --git a/autotm/base.py b/autotm/base.py index e42245d..5aa4120 100644 --- a/autotm/base.py +++ b/autotm/base.py @@ -167,7 +167,9 @@ def fit(self, dataset: Union[pd.DataFrame, pd.Series], processed_dataset_path: O else: # TODO: refactor this function best_topic_model = bayes_opt.run_algorithm( - dataset=processed_dataset_path, + dataset=self.exp_dataset_name or "__noname__", + data_path=processed_dataset_path, + topic_count=self.topic_count, log_file=self.log_file_path, exp_id=self.exp_id or "0", **self.alg_params diff --git a/autotm/fitness/cluster_tasks.py b/autotm/fitness/cluster_tasks.py index 7cbced9..a2a5708 100644 --- a/autotm/fitness/cluster_tasks.py +++ b/autotm/fitness/cluster_tasks.py @@ -1,9 +1,8 @@ import logging import os import time -import uuid from multiprocessing.process import current_process -from typing import List, Optional, Union, cast +from typing import List, Optional, cast import celery from billiard.exceptions import SoftTimeLimitExceeded @@ -13,11 +12,11 @@ from celery.utils.log import get_task_logger from tqdm import tqdm -from autotm.algorithms_for_tuning.individuals import Individual, make_individual +from autotm.algorithms_for_tuning.individuals import Individual, IndividualBuilder from autotm.fitness.tm import fit_tm_of_individual from autotm.params_logging_utils import model_files, log_params_and_artifacts, log_stats from autotm.schemas import IndividualDTO -from autotm.utils import TqdmToLogger, AVG_COHERENCE_SCORE +from autotm.utils import TqdmToLogger logger = logging.getLogger("root") task_logger = get_task_logger(__name__) @@ -109,7 +108,8 @@ def calculate_fitness(self: Task, self.retry(max_retries=1, countdown=5) -def parallel_fitness(population: 
List[Individual],
+def parallel_fitness(ibuilder: IndividualBuilder,
+                     population: List[Individual],
                      use_tqdm: bool = False,
                      tqdm_check_period: int = 2,
                      app: Optional[celery.Celery] = None) -> List[Individual]:
@@ -170,17 +170,18 @@ def parallel_fitness(population: List[Individual],
     # restoring the order in the resulting population according to the initial population
     # results_by_id = {ind.id: ind for ind in (fitness_from_json(r) for r in results)}
     results_by_id = {ind.id: ind for ind in (IndividualDTO.parse_raw(r) for r in results)}
-    return [make_individual(results_by_id[ind.dto.id]) for ind in population]
+    return [ibuilder.make_individual(results_by_id[ind.dto.id]) for ind in population]
 
 
-def log_best_solution(individual: Individual,
+def log_best_solution(ibuilder: IndividualBuilder,
+                      individual: Individual,
                       wait_for_result_timeout: Optional[float] = None,
                       alg_args: Optional[str] = None,
                       is_tmp: bool = False,
                       app: Optional[celery.Celery] = None) \
        -> Individual:
     if is_tmp:
-        return make_individual(individual.dto)
+        return ibuilder.make_individual(individual.dto)
     # ind = fitness_to_json(individual)
     ind = individual.dto.json()
     logger.info(f"Sending a best individual to be logged: {ind}")
@@ -207,28 +208,6 @@ def log_best_solution(individual: Individual,
     r = result.get(timeout=wait_for_result_timeout)
     r = IndividualDTO.parse_raw(r)
 
-    ind = make_individual(r)
+    ind = ibuilder.make_individual(r)
 
     return ind
-
-
-class FitnessCalculatorWrapper:
-    def __init__(self, dataset, data_path, topic_count, train_option):
-        self.dataset = dataset
-        self.data_path = data_path
-        self.topic_count = topic_count
-        self.train_option = train_option
-
-    def run(self, params):
-        params = list(params)
-        params = params[:-1] + [0, 0, 0] + [params[-1]]
-
-        solution_dto = IndividualDTO(id=str(uuid.uuid4()),
-                                     dataset=self.dataset,
-                                     params=params,
-                                     alg_id="ga",
-                                     topic_count=self.topic_count, train_option=self.train_option)
-
-        dto = parallel_fitness([solution_dto])[0]
-        result = dto.fitness_value[AVG_COHERENCE_SCORE]
-        return -result
diff --git a/autotm/fitness/estimator.py b/autotm/fitness/estimator.py
new file mode 100644
index 0000000..870e316
--- /dev/null
+++ b/autotm/fitness/estimator.py
@@ -0,0 +1,304 @@
+import copy
+import logging
+import math
+import time
+import uuid
+from abc import ABC, abstractmethod
+from typing import List, Optional
+
+import numpy as np
+
+from autotm.abstract_params import AbstractParams
+from autotm.algorithms_for_tuning.genetic_algorithm.statistics_collector import StatisticsCollector
+from autotm.algorithms_for_tuning.genetic_algorithm.surrogate import Surrogate, set_surrogate_fitness, \
+    get_prediction_uncertanty
+from autotm.algorithms_for_tuning.individuals import Individual, IndividualBuilder
+from autotm.fitness import local_tasks, cluster_tasks
+from autotm.schemas import IndividualDTO
+
+logger = logging.getLogger(__name__)
+
+
+class FitnessEstimator(ABC):
+    def __init__(self, num_fitness_evaluations: Optional[int] = None, statistics_collector: Optional[StatisticsCollector] = None):
+        self._num_fitness_evaluations = num_fitness_evaluations
+        self._evaluations_counter = 0
+        self._statistics_collector = statistics_collector
+        super().__init__()
+
+    @property
+    def num_fitness_evaluations(self) -> Optional[int]:
+        return self._num_fitness_evaluations
+
+    @property
+    def evaluations_counter(self) -> int:
+        return self._evaluations_counter
+
+    @abstractmethod
+    def fit(self, iter_num: int) -> None:
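+        """Refit any auxiliary model (e.g. a surrogate) after iteration ``iter_num``;
+        purely computable estimators implement this as a no-op."""
+        ...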
+
+    @abstractmethod
+    def log_best_solution(self,
+                          individual: Individual,
+                          wait_for_result_timeout: Optional[float] = None,
+                          alg_args: Optional[str] = None,
+                          is_tmp: bool = False) -> Individual:
+        ...
+
+    def estimate(self, iter_num: int, population: List[Individual]) -> List[Individual]:
+        evaluated = [individual for individual in population if individual.dto.fitness_value is not None]
+        not_evaluated = [individual for individual in population if individual.dto.fitness_value is None]
+        evaluations_limit = max(0, self._num_fitness_evaluations - self._evaluations_counter) \
+            if self._num_fitness_evaluations else len(not_evaluated)
+        if len(not_evaluated) > evaluations_limit:
+            not_evaluated = not_evaluated[:evaluations_limit]
+        self._evaluations_counter += len(not_evaluated)
+        new_evaluated = self._estimate(iter_num, not_evaluated)
+        if self._statistics_collector:
+            for individual in new_evaluated:
+                self._statistics_collector.log_individual(individual)
+        return evaluated + new_evaluated
+
+    @abstractmethod
+    def _estimate(self, iter_num: int, population: List[Individual]) -> List[Individual]:
+        ...
+
+
+class SurrogateEnabledFitnessEstimatorMixin(FitnessEstimator, ABC):
+    SUPPORTED_CALC_SCHEMES = ["type1", "type2"]
+
+    ibuilder: IndividualBuilder
+    surrogate: Surrogate
+    calc_scheme: str
+    speedup: bool
+    all_params: List[AbstractParams]
+    all_fitness: List[float]
+
+    @staticmethod
+    def surrogate_iteration(iter_num: int) -> bool:
+        return (iter_num % 2 != 0) if iter_num > 0 else False
+
+    def fit(self, iter_num: int) -> None:
+        surrogate_iteration = self.surrogate_iteration(iter_num)
+        if (self.calc_scheme == "type1" and not surrogate_iteration) or (self.calc_scheme == "type2"):
+            self.surrogate.fit(np.array(self.all_params), np.array(self.all_fitness))
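+
+    # With speedup=True, real fitness computation is skipped entirely on
+    # surrogate iterations; with speedup=False, real values are still computed
+    # so the surrogate's predictions can be scored against them.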
+ ) + else: + clean_params_and_f.append((x_val[i], y_val[i])) + + x_val = clean_params_and_f[0] + y_val = clean_params_and_f[1] + r_2, mse, rmse = self.surrogate.score(x_val, y_val) + logger.info(f"Real values: {list(y_val)}") + logger.info(f"Predicted values: {list(y_pred)}") + logger.info(f"R^2: {r_2}, MSE: {mse}, RMSE: {rmse}") + + for ix, individ in enumerate(population): + individ.dto.fitness_value = set_surrogate_fitness(y_pred[ix]) + + return population + + def _calculate_uncertain_res(self, iter_num: int, population: List[Individual], proc:float = 0.3): + if len(population) == 0: + return [] + + x = np.array([individ.dto.params.to_vector() for individ in population]) + certanty = get_prediction_uncertanty( + self.surrogate.surrogate, x, self.surrogate.name + ) + recalculate_num = int(np.floor(len(certanty) * proc)) + logger.info(f"Certanty values: {certanty}") + + certanty, x = ( + list(t) for t in zip(*sorted(zip(certanty, x.tolist()), reverse=True)) + ) # check + calculated = [] + for individual in population[:recalculate_num]: + # copy + individual_json = individual.dto.model_dump_json() + individual = self.ibuilder.make_individual(dto=IndividualDTO.model_validate_json(individual_json)) + individual.dto.fitness_value = None + calculated.append(individual) + + calculated = super().estimate(iter_num, calculated) + + self.all_params += [individ.dto.params.to_vector() for individ in calculated] + self.all_fitness += [ + individ.dto.fitness_value["avg_coherence_score"] for individ in calculated + ] + + pred_y = self.surrogate.predict(x[recalculate_num:]) + for ix, individual in enumerate(population[recalculate_num:]): + dto = individual.dto + dto = IndividualDTO( + id=str(uuid.uuid4()), + data_path=dto.data_path, + params=dto.params, + dataset=dto.dataset, + fitness_value=set_surrogate_fitness(pred_y[ix]), + exp_id=dto.exp_id, + alg_id=dto.alg_id, + topic_count=dto.topic_count, + tag=dto.tag, + train_option=dto.train_option, + ) + calculated.append(self.ibuilder.make_individual(dto=dto)) + return calculated + + def save_params(self, population): + params_and_f = [ + (copy.deepcopy(individ.params.to_vector()), individ.fitness_value) + for individ in population + if individ.fitness_value not in self.all_fitness + ] + + def check_val(fval): + return not (fval is None or math.isnan(fval) or math.isinf(fval)) + + def check_params(pp): + return all(check_val(el) for el in pp) + + clean_params_and_f = [] + for p, f in params_and_f: + if not check_params(p) or not check_val(f): + logger.warning(f"Bad params or fitness found. Fitness: {f}. 
Params: {p}.") + else: + clean_params_and_f.append((p, f)) + + pops = [p for p, _ in clean_params_and_f] + fs = [f for _, f in clean_params_and_f] + + self.all_params += pops + self.all_fitness += fs + + +class ComputableFitnessEstimator(FitnessEstimator): + def __init__(self, + ibuilder: IndividualBuilder, + num_fitness_evaluations: Optional[int] = None, + statistics_collector: Optional[StatisticsCollector] = None): + self.ibuilder = ibuilder + super().__init__(num_fitness_evaluations, statistics_collector) + + def fit(self, iter_num: int) -> None: + pass + + def log_best_solution(self, + individual: Individual, + wait_for_result_timeout: Optional[float] = None, + alg_args: Optional[str] = None, + is_tmp: bool = False) -> Individual: + return local_tasks.log_best_solution(self.ibuilder, individual, wait_for_result_timeout, alg_args, is_tmp) + + def _estimate(self, iter_num: int, population: List[Individual]) -> List[Individual]: + return local_tasks.estimate_fitness(self.ibuilder, population) + + +class DistributedComputableFitnessEstimator(FitnessEstimator): + def __init__(self, + ibuilder: IndividualBuilder, + num_fitness_evaluations: Optional[int] = None, + statistics_collector: Optional[StatisticsCollector] = None): + self.app = cluster_tasks.make_celery_app() + self.ibuilder = ibuilder + super().__init__(num_fitness_evaluations, statistics_collector) + + def fit(self, iter_num: int) -> None: + pass + + def log_best_solution(self, + individual: Individual, + wait_for_result_timeout: Optional[float] = None, + alg_args: Optional[str] = None, + is_tmp: bool = False) -> Individual: + return cluster_tasks.log_best_solution(self.ibuilder, individual, + wait_for_result_timeout, alg_args, is_tmp, app=self.app) + + def _estimate(self, iter_num: int, population: List[Individual]) -> List[Individual]: + return cluster_tasks.parallel_fitness(self.ibuilder, population, app=self.app) + + +class SurrogateEnabledComputableFitnessEstimator(ComputableFitnessEstimator, SurrogateEnabledFitnessEstimatorMixin): + def __init__(self, + ibuilder: IndividualBuilder, + surrogate: Surrogate, + calc_scheme: str, + speedup: bool = True, + num_fitness_evaluations: Optional[int] = None, + statistics_collector: Optional[StatisticsCollector] = None): + self.ibuilder = ibuilder + self.surrogate = surrogate + self.calc_scheme = calc_scheme + self.speedup = speedup + + self.all_params: List[AbstractParams] = [] + self.all_fitness: List[float] = [] + + if calc_scheme not in self.SUPPORTED_CALC_SCHEMES: + raise ValueError(f"Unexpected surrogate scheme! {self.calc_scheme}") + super().__init__(ibuilder, num_fitness_evaluations, statistics_collector) + + +class DistributedSurrogateEnabledComputableFitnessEstimator( + DistributedComputableFitnessEstimator, + SurrogateEnabledFitnessEstimatorMixin +): + def __init__(self, + ibuilder: IndividualBuilder, + surrogate: Surrogate, + calc_scheme: str, + speedup: bool = True, + num_fitness_evaluations: Optional[int] = None, + statistics_collector: Optional[StatisticsCollector] = None): + self.ibuilder = ibuilder + self.surrogate = surrogate + self.calc_scheme = calc_scheme + self.speedup = speedup + + self.all_params: List[AbstractParams] = [] + self.all_fitness: List[float] = [] + + if calc_scheme not in self.SUPPORTED_CALC_SCHEMES: + raise ValueError(f"Unexpected surrogate scheme! 
{self.calc_scheme}") + super().__init__(ibuilder, num_fitness_evaluations, statistics_collector) diff --git a/autotm/fitness/local_tasks.py b/autotm/fitness/local_tasks.py index d09a625..cc1e0ac 100644 --- a/autotm/fitness/local_tasks.py +++ b/autotm/fitness/local_tasks.py @@ -1,7 +1,7 @@ import logging from typing import List, Optional -from autotm.algorithms_for_tuning.individuals import make_individual, Individual +from autotm.algorithms_for_tuning.individuals import Individual, IndividualBuilder from autotm.fitness.tm import fit_tm_of_individual from autotm.params_logging_utils import log_params_and_artifacts, log_stats, model_files from autotm.schemas import IndividualDTO @@ -56,7 +56,7 @@ def calculate_fitness( ) -def estimate_fitness(population: List[Individual]) -> List[Individual]: +def estimate_fitness(ibuilder: IndividualBuilder, population: List[Individual]) -> List[Individual]: logger.info("Calculating fitness...") population_with_fitness = [] for individual in population: @@ -65,19 +65,20 @@ def estimate_fitness(population: List[Individual]) -> List[Individual]: population_with_fitness.append(individual) continue individ_with_fitness = calculate_fitness(individual.dto) - population_with_fitness.append(make_individual(individ_with_fitness)) + population_with_fitness.append(ibuilder.make_individual(individ_with_fitness)) logger.info("The fitness results have been obtained") return population_with_fitness def log_best_solution( - individual: IndividualDTO, + ibuilder: IndividualBuilder, + individual: Individual, wait_for_result_timeout: Optional[float] = None, alg_args: Optional[str] = None, is_tmp: bool = False, ): logger.info("Sending a best individual to be logged") - res = make_individual(calculate_fitness(individual.dto, + res = ibuilder.make_individual(calculate_fitness(individual.dto, log_artifact_and_parameters=True, is_tmp=is_tmp)) diff --git a/autotm/fitness/tasks.py b/autotm/fitness/tasks.py deleted file mode 100644 index 420fdf4..0000000 --- a/autotm/fitness/tasks.py +++ /dev/null @@ -1,15 +0,0 @@ -import functools - -from . import AUTOTM_EXEC_MODE, SUPPORTED_EXEC_MODES - -if AUTOTM_EXEC_MODE == 'local': - from .local_tasks import estimate_fitness, log_best_solution -elif AUTOTM_EXEC_MODE == 'cluster': - from .cluster_tasks import make_celery_app - from .cluster_tasks import parallel_fitness, log_best_solution - - app = make_celery_app() - estimate_fitness = functools.partial(parallel_fitness, app=app) - log_best_solution = functools.partial(log_best_solution, app=app) -else: - raise ValueError(f"Unknown exec mode: {AUTOTM_EXEC_MODE}. 
Only the following are supported: {SUPPORTED_EXEC_MODES}") diff --git a/examples/autotm_fit_predict.py b/examples/autotm_fit_predict.py index 0300de9..625b808 100644 --- a/examples/autotm_fit_predict.py +++ b/examples/autotm_fit_predict.py @@ -1,6 +1,7 @@ import logging import os import uuid +from typing import Dict, Any import pandas as pd from sklearn.model_selection import train_test_split @@ -11,10 +12,35 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger() +CONFIGURATIONS = { + "base": { + "alg_name": "ga", + "num_iterations": 2, + "num_individuals": 2, + "use_pipeline": True + }, + "static_chromosome": { + "alg_name": "ga", + "num_iterations": 2, + "num_individuals": 2, + "use_pipeline": False + }, + "surrogate": { + "alg_name": "ga", + "num_iterations": 2, + "num_individuals": 2, + "use_pipeline": True, + "surrogate_name": "random-forest-regressor" + }, + "bayes": { + "alg_name": "bayes", + "num_evaluations": 5, + } +} -def main(): + +def run(alg_name: str, alg_params: Dict[str, Any]): path_to_dataset = "data/sample_corpora/sample_dataset_lenta.csv" - alg_name = "ga" df = pd.read_csv(path_to_dataset) train_df, test_df = train_test_split(df, test_size=0.1) @@ -29,14 +55,7 @@ def main(): "min_tokens_count": 3 }, alg_name=alg_name, - alg_params={ - "num_iterations": 2, - "num_individuals": 2, - "use_nelder_mead_in_mutation": False, - "use_nelder_mead_in_crossover": False, - "use_nelder_mead_in_selector": False, - "train_option": "offline" - }, + alg_params=alg_params, working_dir_path=working_dir_path, exp_dataset_name="lenta_ru" ) @@ -54,5 +73,18 @@ def main(): logger.info(f"Calculated train mixtures: {mixtures.shape}\n\n{mixtures.head(10).to_string()}") +def main(conf_name: str = "base"): + if conf_name not in CONFIGURATIONS: + raise ValueError( + f"Unknown configuration {conf_name}. 
Available configurations: {sorted(CONFIGURATIONS.keys())}"
+        )
+
+    # copy the config so repeated runs don't mutate the module-level CONFIGURATIONS
+    conf = dict(CONFIGURATIONS[conf_name])
+    alg_name = conf.pop('alg_name')
+
+    run(alg_name=alg_name, alg_params=conf)
+
+
 if __name__ == "__main__":
     main()
diff --git a/examples/experiments/experiment.py b/examples/experiments/experiment.py
index d278915..a7e108d 100644
--- a/examples/experiments/experiment.py
+++ b/examples/experiments/experiment.py
@@ -17,18 +17,24 @@
 warnings.filterwarnings("ignore", category=DeprecationWarning)
 warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20)
 
-SAVE_PATH = "/Users/Maksim.Zuev/PycharmProjects/AutoTMResources/datasets"
+# SAVE_PATH = "/Users/Maksim.Zuev/PycharmProjects/AutoTMResources/datasets"
+SAVE_PATH = "/home/nikolay/wspace/AutoTMDatasets"
 
 datasets = [
     "hotel-reviews_sample",
-    "lenta_ru_sample"
-    "amazon_food_sample",
-    "20newsgroups_sample",
-    "banners_sample",
+    # "lenta_ru_sample"
+    # "amazon_food_sample",
+    # "20newsgroups_sample",
+    # "banners_sample",
 ]
 
-num_iterations = 500
-num_fitness_evaluations = 150
-num_individuals = 11
+# num_iterations = 500
+# num_fitness_evaluations = 150
+# num_individuals = 11
+
+num_iterations = 2
+num_fitness_evaluations = 10
+num_individuals = 4
+
 topic_count = 10
 use_nelder_mead_in_mutation = False
 use_nelder_mead_in_crossover = False
@@ -157,7 +163,7 @@ def main():
     surrogate = None  # "random-forest-regressor"
     for dataset_name in datasets:
         for use_pipeline in [False, True]:
-            for _ in range(10):
+            for _ in range(1):
                 start_time = time.time()
                 collector = suppress_stdout(lambda: run_single_experiment(os.path.curdir, dataset_name,
                                                                           use_pipeline, surrogate))
diff --git a/tests/integration/test_fit_predict.py b/tests/integration/test_fit_predict.py
index e7a9c05..e98d54b 100644
--- a/tests/integration/test_fit_predict.py
+++ b/tests/integration/test_fit_predict.py
@@ -42,6 +42,7 @@ def test_fit_predict(pytestconfig):
         alg_params={
             "num_iterations": 2,
             "num_individuals": 4,
+            "use_pipeline": False,
             "use_nelder_mead_in_mutation": False,
             "use_nelder_mead_in_crossover": False,
             "use_nelder_mead_in_selector": False,