diff --git a/autotm/algorithms_for_tuning/genetic_algorithm/ga.py b/autotm/algorithms_for_tuning/genetic_algorithm/ga.py index 7c1b43a..a2a1084 100644 --- a/autotm/algorithms_for_tuning/genetic_algorithm/ga.py +++ b/autotm/algorithms_for_tuning/genetic_algorithm/ga.py @@ -7,21 +7,20 @@ import sys import time import uuid -from typing import Optional, Tuple, Callable +from typing import Optional, Callable import numpy as np -from autotm.abstract_params import AbstractParams -from autotm.algorithms_for_tuning.genetic_algorithm.statistics_collector import StatisticsCollector from autotm.algorithms_for_tuning.genetic_algorithm.selection import selection +from autotm.algorithms_for_tuning.genetic_algorithm.statistics_collector import StatisticsCollector from autotm.algorithms_for_tuning.genetic_algorithm.surrogate import set_surrogate_fitness, Surrogate, \ get_prediction_uncertanty -from autotm.algorithms_for_tuning.individuals import make_individual, IndividualDTO, Individual +from autotm.algorithms_for_tuning.individuals import IndividualDTO, Individual, IndividualBuilder from autotm.algorithms_for_tuning.nelder_mead_optimization.nelder_mead import ( NelderMeadOptimization, ) from autotm.fitness.tasks import estimate_fitness, log_best_solution -from autotm.params import create_individual, FixedListParams +from autotm.params import create_individual from autotm.utils import AVG_COHERENCE_SCORE from autotm.visualization.dynamic_tracker import MetricsCollector @@ -60,6 +59,7 @@ def __init__( data_path, num_individuals, num_iterations, + ibuilder: IndividualBuilder, mutation_type="mutation_one_param", crossover_type="blend_crossover", selection_type="fitness_prop", @@ -116,6 +116,7 @@ def __init__( self.data_path = data_path self.num_individuals = num_individuals self.num_iterations = num_iterations + self.ibuilder = ibuilder self.mutation_type = mutation_type self.crossover_type = crossover_type self.selection = selection(selection_type) @@ -159,7 +160,7 @@ def 
estimate_fitness(self, population): if len(not_evaluated) > evaluations_limit: not_evaluated = not_evaluated[:evaluations_limit] self.evaluations_counter += len(not_evaluated) - new_evaluated = estimate_fitness(not_evaluated) + new_evaluated = estimate_fitness(self.ibuilder, not_evaluated) if self.statistics_collector: for individual in new_evaluated: self.statistics_collector.log_individual(individual) @@ -181,7 +182,7 @@ def init_population(self): train_option=self.train_option, ) # TODO: improve heuristic on search space - list_of_individuals.append(make_individual(dto=dto)) + list_of_individuals.append(self.ibuilder.make_individual(dto=dto)) population_with_fitness = self.estimate_fitness(list_of_individuals) self.save_params(population_with_fitness) @@ -211,7 +212,7 @@ def _calculate_uncertain_res(self, generation, iteration_num: int, proc=0.3): for individual in generation[:recalculate_num]: # copy individual_json = individual.dto.model_dump_json() - individual = make_individual(dto=IndividualDTO.model_validate_json(individual_json)) + individual = self.ibuilder.make_individual(dto=IndividualDTO.model_validate_json(individual_json)) individual.dto.fitness_value = None calculated.append(individual) @@ -236,7 +237,7 @@ def _calculate_uncertain_res(self, generation, iteration_num: int, proc=0.3): tag=self.tag, train_option=self.train_option, ) - calculated.append(make_individual(dto=dto)) + calculated.append(self.ibuilder.make_individual(dto=dto)) return calculated def save_params(self, population): @@ -331,7 +332,7 @@ def run_crossover(self, pairs_generator, surrogate_iteration, iteration_num: int train_option=self.train_option, ) for child in children] - individuals = [make_individual(child) for child in children_dto] + individuals = [self.ibuilder.make_individual(child) for child in children_dto] new_generation += individuals crossover_changes["parent_1_params"].append(i.params) @@ -391,7 +392,7 @@ def apply_nelder_mead(self, starting_points_set, num_gen, 
num_iterations=2): train_option=self.train_option, ) - new_population.append(make_individual(dto=solution_dto)) + new_population.append(self.ibuilder.make_individual(dto=solution_dto)) return new_population def run(self, verbose=False, visualize_results=False) -> Individual: @@ -594,7 +595,7 @@ def run(self, verbose=False, visualize_results=False) -> Individual: f"RUN ID {run_id}." ) best_solution = population[0] - log_best_solution(best_solution, alg_args=" ".join(sys.argv), is_tmp=True) + log_best_solution(self.ibuilder, best_solution, alg_args=" ".join(sys.argv), is_tmp=True) if visualize_results: self.metric_collector.save_and_visualise_trace() @@ -605,7 +606,7 @@ def run(self, verbose=False, visualize_results=False) -> Individual: self.statistics_collector.log_iteration(self.evaluations_counter, population[0].fitness_value) logger.info(f"Y: {y}") best_individual = population[0] - ind = log_best_solution(best_individual, alg_args=" ".join(sys.argv)) + ind = log_best_solution(self.ibuilder, best_individual, alg_args=" ".join(sys.argv)) logger.info(f"Logged the best solution. 
Obtained fitness is {ind.fitness_value}") return ind @@ -647,7 +648,7 @@ def run_mutation(self, population): tag=self.tag, train_option=self.train_option, ) - population[i] = make_individual(dto=dto) + population[i] = self.ibuilder.make_individual(dto=dto) # multistage bag of regularizers approach diff --git a/autotm/algorithms_for_tuning/genetic_algorithm/genetic_algorithm.py b/autotm/algorithms_for_tuning/genetic_algorithm/genetic_algorithm.py index 8d94141..f1f5a5c 100755 --- a/autotm/algorithms_for_tuning/genetic_algorithm/genetic_algorithm.py +++ b/autotm/algorithms_for_tuning/genetic_algorithm/genetic_algorithm.py @@ -6,6 +6,7 @@ from autotm.algorithms_for_tuning.genetic_algorithm.statistics_collector import StatisticsCollector from autotm.algorithms_for_tuning.genetic_algorithm.ga import GA +from autotm.algorithms_for_tuning.individuals import IndividualBuilder from autotm.fitness.tm import fit_tm, TopicModel from autotm.utils import make_log_config_dict @@ -41,6 +42,7 @@ def get_best_individual( train_option: str = "offline", quiet_log: bool = False, statistics_collector: Optional[StatisticsCollector] = None, + ibuilder: Optional[IndividualBuilder] = None, **kwargs ): """ @@ -101,6 +103,7 @@ def get_best_individual( data_path=data_path, num_individuals=num_individuals, num_iterations=num_iterations, + ibuilder=ibuilder or IndividualBuilder(), mutation_type=mutation_type, crossover_type=crossover_type, selection_type=selection_type, @@ -155,13 +158,15 @@ def run_algorithm( use_nelder_mead_in_selector: bool = False, train_option: str = "offline", quiet_log: bool = False, + individual_type: str = "regular" ) -> TopicModel: best_individual = get_best_individual(dataset, data_path, exp_id, topic_count, num_individuals, num_iterations, num_fitness_evaluations, mutation_type, crossover_type, selection_type, elem_cross_prob, cross_alpha, best_proc, log_file, tag, surrogate_name, gpr_kernel, gpr_alpha, gpr_normalize_y, use_pipeline, use_nelder_mead_in_mutation, 
class IndividualBuilder:
    """Factory that wraps an ``IndividualDTO`` into the configured ``Individual`` class.

    The concrete class is selected once, at construction time, by ``ind_type``:

    * ``"regular"`` -> ``RegularFitnessIndividual``
    * ``"sparse"``  -> ``SparsityScalerBasedFitnessIndividual``
    """

    # Names accepted by the constructor; kept as a list for backward compatibility.
    SUPPORTED_IND_TYPES = ["regular", "sparse"]

    def __init__(self, ind_type: str = "regular"):
        """Remember which kind of individual this builder produces.

        :param ind_type: one of ``SUPPORTED_IND_TYPES``.
        :raises ValueError: if ``ind_type`` is not a supported name.
        """
        # Validate before storing so a failed construction never leaves a
        # builder holding an invalid type.
        if ind_type not in self.SUPPORTED_IND_TYPES:
            raise ValueError(f"Unsupported ind type: {ind_type}")

        self.ind_type = ind_type

    def make_individual(self, dto: "IndividualDTO") -> "Individual":
        """Wrap ``dto`` into the individual class selected by ``self.ind_type``.

        :param dto: the data object describing the individual.
        :return: a ``RegularFitnessIndividual`` or a
            ``SparsityScalerBasedFitnessIndividual`` built around ``dto``.
        :raises ValueError: if ``self.ind_type`` is not a supported name
            (e.g. the attribute was changed after construction).
        """
        if self.ind_type == "regular":
            return RegularFitnessIndividual(dto=dto)
        if self.ind_type == "sparse":
            return SparsityScalerBasedFitnessIndividual(dto=dto)
        # Explicit failure instead of silently falling through to the sparse
        # implementation for any value that is not "regular".
        raise ValueError(f"Unsupported ind type: {self.ind_type}")
fit_tm_of_individual from autotm.params_logging_utils import model_files, log_params_and_artifacts, log_stats from autotm.schemas import IndividualDTO -from autotm.utils import TqdmToLogger, AVG_COHERENCE_SCORE +from autotm.utils import TqdmToLogger logger = logging.getLogger("root") task_logger = get_task_logger(__name__) @@ -109,7 +108,8 @@ def calculate_fitness(self: Task, self.retry(max_retries=1, countdown=5) -def parallel_fitness(population: List[Individual], +def parallel_fitness(ibuilder: IndividualBuilder, + population: List[Individual], use_tqdm: bool = False, tqdm_check_period: int = 2, app: Optional[celery.Celery] = None) -> List[Individual]: @@ -170,17 +170,18 @@ def parallel_fitness(population: List[Individual], # restoring the order in the resulting population according to the initial population # results_by_id = {ind.id: ind for ind in (fitness_from_json(r) for r in results)} results_by_id = {ind.id: ind for ind in (IndividualDTO.parse_raw(r) for r in results)} - return [make_individual(results_by_id[ind.dto.id]) for ind in population] + return [ibuilder.make_individual(results_by_id[ind.dto.id]) for ind in population] -def log_best_solution(individual: Individual, +def log_best_solution(ibuilder: IndividualBuilder, + individual: Individual, wait_for_result_timeout: Optional[float] = None, alg_args: Optional[str] = None, is_tmp: bool = False, app: Optional[celery.Celery] = None) \ -> Individual: if is_tmp: - return make_individual(individual.dto) + return ibuilder.make_individual(individual.dto) # ind = fitness_to_json(individual) ind = individual.dto.json() logger.info(f"Sending a best individual to be logged: {ind}") @@ -207,28 +208,6 @@ def log_best_solution(individual: Individual, r = result.get(timeout=wait_for_result_timeout) r = IndividualDTO.parse_raw(r) - ind = make_individual(r) + ind = ibuilder.make_individual(r) return ind - - -class FitnessCalculatorWrapper: - def __init__(self, dataset, data_path, topic_count, train_option): - 
class FitnessEstimator(ABC):
    """Abstract interface for components that assign fitness to a population."""

    @abstractmethod
    def fit(self, iter_num: int) -> None:
        """Update the estimator's internal state for iteration ``iter_num``."""

    @abstractmethod
    def estimate(self, iter_num: int, population: List[Individual]) -> List[Individual]:
        """Return ``population`` with fitness estimated for iteration ``iter_num``."""
class SurrogateEnabledFitnessEstimator(FitnessEstimator): SUPPORTED_CALC_SCHEMES = ["type1", "type2"] - def __init__(self, surrogate: Surrogate, calc_scheme: str, speedup: bool = True): + def __init__(self, ibuilder: IndividualBuilder, surrogate: Surrogate, calc_scheme: str, speedup: bool = True): + self.ibuilder = ibuilder self.surrogate = surrogate self.calc_scheme = calc_scheme self.speedup = speedup + self.all_params: List[AbstractParams] = [] + self.all_fitness: List[float] = [] + if calc_scheme not in self.SUPPORTED_CALC_SCHEMES: raise ValueError(f"Unexpected surrogate scheme! {self.calc_scheme}") @@ -52,12 +45,12 @@ def __init__(self, surrogate: Surrogate, calc_scheme: str, speedup: bool = True) def surrogate_iteration(iter_num: int) -> bool: return (iter_num % 2 != 0) if iter_num > 0 else False - def fit(self, iter_num: int, estimated_params: EstimatedParams) -> None: + def fit(self, iter_num: int) -> None: surrogate_iteration = self.surrogate_iteration(iter_num) if (self.calc_scheme == "type1" and not surrogate_iteration) or (self.calc_scheme == "type2"): - self.surrogate.fit(np.array(estimated_params.all_params), np.array(estimated_params.all_fitness)) + self.surrogate.fit(np.array(self.all_params), np.array(self.all_fitness)) - def estimate(self, iter_num: int, population: List[IndividualDTO]) -> List[IndividualDTO]: + def estimate(self, iter_num: int, population: List[Individual]) -> List[Individual]: surrogate_iteration = self.surrogate_iteration(iter_num) if not self.speedup or not surrogate_iteration: @@ -67,7 +60,7 @@ def estimate(self, iter_num: int, population: List[IndividualDTO]) -> List[Indiv if self.calc_scheme == "type1" and surrogate_iteration: population = self.surrogate_calculation(population) elif self.calc_scheme == "type2": - population = self._calculate_uncertain_res(population) + population = self._calculate_uncertain_res(iter_num, population) self.save_params(population) return population @@ -77,7 +70,6 @@ def 
surrogate_calculation(self, population: List[Individual]): y_pred = self.surrogate.predict(X_val) if not self.speedup: - # TODO: ??? y_val = np.array([individ.fitness_value for individ in population]) def check_val(fval): @@ -107,7 +99,7 @@ def check_params(p): return population - def _calculate_uncertain_res(self, iter_num: int, population: List[Individual], proc=0.3): + def _calculate_uncertain_res(self, iter_num: int, population: List[Individual], proc:float = 0.3): if len(population) == 0: return [] @@ -125,7 +117,7 @@ def _calculate_uncertain_res(self, iter_num: int, population: List[Individual], for individual in population[:recalculate_num]: # copy individual_json = individual.dto.model_dump_json() - individual = make_individual(dto=IndividualDTO.model_validate_json(individual_json)) + individual = self.ibuilder.make_individual(dto=IndividualDTO.model_validate_json(individual_json)) individual.dto.fitness_value = None calculated.append(individual) @@ -150,7 +142,8 @@ def _calculate_uncertain_res(self, iter_num: int, population: List[Individual], tag=self.tag, train_option=self.train_option, ) - calculated.append(make_individual(dto=dto)) + # TODO: replace + calculated.append(self.ibuilder.make_individual(dto=dto)) return calculated def save_params(self, population): diff --git a/autotm/fitness/local_tasks.py b/autotm/fitness/local_tasks.py index d09a625..6bd477a 100644 --- a/autotm/fitness/local_tasks.py +++ b/autotm/fitness/local_tasks.py @@ -1,7 +1,7 @@ import logging from typing import List, Optional -from autotm.algorithms_for_tuning.individuals import make_individual, Individual +from autotm.algorithms_for_tuning.individuals import Individual, IndividualBuilder from autotm.fitness.tm import fit_tm_of_individual from autotm.params_logging_utils import log_params_and_artifacts, log_stats, model_files from autotm.schemas import IndividualDTO @@ -56,7 +56,7 @@ def calculate_fitness( ) -def estimate_fitness(population: List[Individual]) -> List[Individual]: 
+def estimate_fitness(ibuilder: IndividualBuilder, population: List[Individual]) -> List[Individual]: logger.info("Calculating fitness...") population_with_fitness = [] for individual in population: @@ -65,19 +65,20 @@ def estimate_fitness(population: List[Individual]) -> List[Individual]: population_with_fitness.append(individual) continue individ_with_fitness = calculate_fitness(individual.dto) - population_with_fitness.append(make_individual(individ_with_fitness)) + population_with_fitness.append(ibuilder.make_individual(individ_with_fitness)) logger.info("The fitness results have been obtained") return population_with_fitness def log_best_solution( + ibuilder: IndividualBuilder, individual: IndividualDTO, wait_for_result_timeout: Optional[float] = None, alg_args: Optional[str] = None, is_tmp: bool = False, ): logger.info("Sending a best individual to be logged") - res = make_individual(calculate_fitness(individual.dto, + res = ibuilder.make_individual(calculate_fitness(individual.dto, log_artifact_and_parameters=True, is_tmp=is_tmp))