diff --git a/src/tools/extract_best_config.py b/src/tools/extract_best_config.py
index 81904e3d5..0d13721c5 100644
--- a/src/tools/extract_best_config.py
+++ b/src/tools/extract_best_config.py
@@ -8,21 +8,21 @@
 import dictdiffer
 from tqdm import tqdm
 import traceback
-
-sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
-from pure_funcs import config_pretty_str
 from copy import deepcopy

+# Append parent directories to the path
 sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
-from procedures import utc_ms, make_get_filepath, dump_config, format_config
 from pure_funcs import (
+    config_pretty_str,
     flatten_dict,
     ts_to_date_utc,
     backtested_multiconfig2live_multiconfig,
     sort_dict_keys,
-    config_pretty_str,
 )
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+from procedures import utc_ms, make_get_filepath, dump_config, format_config
+

 def data_generator(all_results_filename, verbose=False):
     """
@@ -31,15 +31,13 @@ def data_generator(all_results_filename, verbose=False):

     Args:
         all_results_filename (str): Path to the all_results.txt file.
-        verbose (bool): If True, disable all printing and progress tracking.
+        verbose (bool): If True, enables printing and progress tracking.

     Yields:
         dict: The full data dictionary at each step.
     """
     prev_data = None
-    # Get the total file size in bytes
     file_size = os.path.getsize(all_results_filename)
-    # Disable progress bar and printing if verbose is True
     with open(all_results_filename, "r") as f:
         with tqdm(
             total=file_size,
@@ -55,11 +53,11 @@
                 try:
                     data = json.loads(line)
                     if "diff" not in data:
-                        # This is the first entry; full data is provided
+                        # First entry; full data provided.
                         prev_data = data
                         yield deepcopy(prev_data)
                     else:
-                        # Apply the diff to the previous data to get the current data
+                        # Apply the diff to the previous data.
                         diff = data["diff"]
                         for i in range(len(diff)):
                             if len(diff[i]) == 2:
@@ -76,7 +74,6 @@
             pbar.close()


-# Function definitions remain unchanged
 def calc_dist(p0, p1):
     return ((p0[0] - p1[0]) ** 2 + (p0[1] - p1[1]) ** 2) ** 0.5

@@ -110,14 +107,18 @@ def calc_pareto_front_d(objectives: dict, higher_is_better: [bool]):
     for kcandidate in sorted_keys:
         is_dominated = False
         for kmember in pareto_front:
-            if dominates_d(objectives[kmember], objectives[kcandidate], higher_is_better):
+            if dominates_d(
+                objectives[kmember], objectives[kcandidate], higher_is_better
+            ):
                 is_dominated = True
                 break
         if not is_dominated:
             pareto_front = [
                 kmember
                 for kmember in pareto_front
-                if not dominates_d(objectives[kcandidate], objectives[kmember], higher_is_better)
+                if not dominates_d(
+                    objectives[kcandidate], objectives[kmember], higher_is_better
+                )
             ]
             pareto_front.append(kcandidate)
     return pareto_front
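# Illustrative sketch (not part of the patch): how the Pareto-front logic in
# the hunk above behaves on a tiny minimize-both example. `dominates_d` is not
# shown in this diff, so a minimal stand-in with the assumed semantics is
# defined here: x dominates y if it is at least as good on every objective and
# strictly better on at least one.

def _dominates(x, y, higher_is_better):
    better_or_equal = all(
        (xi >= yi) if hib else (xi <= yi)
        for xi, yi, hib in zip(x, y, higher_is_better)
    )
    strictly_better = any(
        (xi > yi) if hib else (xi < yi)
        for xi, yi, hib in zip(x, y, higher_is_better)
    )
    return better_or_equal and strictly_better

# Both objectives minimized: candidate 3 is dominated by candidate 1
# (worse on both axes); candidates 0, 1 and 2 trade off against each other.
objectives = {0: [1.0, 4.0], 1: [2.0, 2.0], 2: [3.0, 1.0], 3: [3.0, 3.0]}
front = [
    k for k in objectives
    if not any(
        _dominates(objectives[j], objectives[k], [False, False])
        for j in objectives
        if j != k
    )
]
assert front == [0, 1, 2]  # the same members calc_pareto_front_d keeps incrementally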
@@ -130,95 +131,158 @@
         return lambda *args, **kwargs: None


-def process_single(file_location, verbose=False):
+def process_batch(batch, analysis_prefix, verbose):
+    """
+    Process a batch of records to select its best candidate.
+
+    This function:
+    1. Flattens each record in the batch.
+    2. Converts the flattened records into a DataFrame.
+    3. Filters for candidate rows (using keys 'w_0' and 'w_1' with the appropriate prefix).
+    4. Computes the Pareto front and selects the candidate closest to the ideal point.
+
+    Args:
+        batch (list): List of records (dicts).
+        analysis_prefix (str): Prefix for analysis keys (e.g., "analysis_" or "analyses_combined_").
+        verbose (bool): Controls printing.
+
+    Returns:
+        dict: The best candidate from the batch.
+    """
+    print_ = gprint(verbose)
+    # Flatten all records in the batch.
+    flat_records = [flatten_dict(x) for x in batch]
+    df = pd.DataFrame(flat_records)
+    keys = [analysis_prefix + "w_0", analysis_prefix + "w_1"]
+    higher_is_better = [False, False]
+
+    if keys[0] in df.columns and keys[1] in df.columns:
+        candidates = df[(df[keys[0]] <= 0.0) & (df[keys[1]] <= 0.0)][keys]
+        if candidates.empty:
+            candidates = df[keys]
+    else:
+        candidates = df
+
+    if len(candidates) == 1:
+        best_index = candidates.index[0]
+        pareto_indices = [best_index]
+    else:
+        obj_dict = {i: candidates.loc[i, keys].tolist() for i in candidates.index}
+        pareto_indices = calc_pareto_front_d(obj_dict, higher_is_better)
+        if len(pareto_indices) == 1:
+            best_index = pareto_indices[0]
+        else:
+            norm_candidates = (candidates - candidates.min()) / (
+                candidates.max() - candidates.min()
+            )
+            norm_pareto = norm_candidates.loc[pareto_indices]
+            distances = [calc_dist(row, [0.0, 0.0]) for row in norm_pareto.values]
+            best_index = norm_pareto.index[distances.index(min(distances))]
+    print_("Best candidate in batch:")
+    print_(candidates.loc[best_index])
+    print_("Pareto front:")
+    # Index with pareto_indices, which is defined on every path above;
+    # norm_pareto only exists when the front has more than one member.
+    res_to_print = df[
+        [col for col in df.columns if analysis_prefix[:-1] in col]
+    ].loc[pareto_indices]
+    res_to_print.columns = [
+        col.replace(analysis_prefix, "") for col in res_to_print.columns
+    ]
+    print_(res_to_print)
+    return batch[best_index]
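# Usage sketch for process_batch (record layout assumed): each record carries
# an "analysis" dict with objectives w_0 and w_1, lower being better, and
# flatten_dict is assumed to join nested keys with "_" ("analysis_w_0", ...).
batch = [
    {"analysis": {"w_0": -0.10, "w_1": -0.20}, "config": {"lev": 1}},
    {"analysis": {"w_0": -0.30, "w_1": -0.05}, "config": {"lev": 2}},
    {"analysis": {"w_0": -0.25, "w_1": -0.25}, "config": {"lev": 3}},
]
best = process_batch(batch, "analysis_", verbose=False)
# The first record is dominated by the third; of the two front members, the
# third sits closest to the ideal corner after min-max normalization, so
# best is batch[2].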
+ if "analyses_combined" in first: analysis_prefix = "analyses_combined_" analysis_key = "analyses_combined" - elif "analysis" in xs[0]: + elif "analysis" in first: analysis_prefix = "analysis_" analysis_key = "analysis" else: raise Exception("Neither 'analyses_combined' nor 'analysis' found in data") - keys, higher_is_better = ["w_0", "w_1"], [False, False] - keys = [analysis_prefix + key for key in keys] - print_("n backtests", len(res)) - - # Adjust the filtering condition based on the prefix - res_keys_w_0 = res[analysis_prefix + "w_0"] - res_keys_w_1 = res[analysis_prefix + "w_1"] - candidates = res[(res_keys_w_0 <= 0.0) & (res_keys_w_1 <= 0.0)][keys] - if len(candidates) == 0: - candidates = res[keys] - print_("n candidates", len(candidates)) - if len(candidates) == 1: - best = candidates.iloc[0].name - pareto = candidates - else: - pareto = candidates.loc[ - calc_pareto_front_d( - {i: x for i, x in zip(candidates.index, candidates.values)}, higher_is_better - ) - ] - cands_norm = (candidates - candidates.min()) / (candidates.max() - candidates.min()) - pareto_norm = (pareto - candidates.min()) / (candidates.max() - candidates.min()) - dists = [calc_dist(p, [float(x) for x in higher_is_better]) for p in pareto_norm.values] - pareto_w_dists = pareto_norm.join( - pd.Series(dists, name="dist_to_ideal", index=pareto_norm.index) - ) - closest_to_ideal = pareto_w_dists.sort_values("dist_to_ideal") - best = closest_to_ideal.dist_to_ideal.idxmin() - print_("best") - print_(candidates.loc[best]) - print_("pareto front:") - res_to_print = res[[x for x in res.columns if analysis_prefix[:-1] in x]].loc[ - closest_to_ideal.index - ] - res_to_print.columns = [x.replace(analysis_prefix, "") for x in res_to_print.columns] - print_(res_to_print) - - # Processing the best result for configuration - best_d = xs[best] - # Adjust for 'analysis' or 'analyses_combined' - best_d[analysis_key]["n_iters"] = len(xs) - if "config" in best_d: - best_d.update(deepcopy(best_d["config"])) - del best_d["config"] - fjson = config_pretty_str(best_d) + total_records = 1 # first record already read + batch_candidates = [] + current_batch = [first] + + # Read remaining records in batches. + for record in gen: + if record: + current_batch.append(record) + total_records += 1 + if len(current_batch) >= batch_size: + candidate = process_batch(current_batch, analysis_prefix, verbose) + batch_candidates.append(candidate) + current_batch = [] + if current_batch: + candidate = process_batch(current_batch, analysis_prefix, verbose) + batch_candidates.append(candidate) + + print_( + f"Processed {total_records} records in batches; found {len(batch_candidates)} batch winners." + ) + + # Final reduction: process the list of batch winners to get the overall best candidate. + final_candidate = process_batch(batch_candidates, analysis_prefix, verbose) + + # Update configuration information. 
@@ -226,29 +290,36 @@ def main(args):
     if os.path.isdir(args.file_location):
         for fname in sorted(os.listdir(args.file_location), reverse=True):
             fpath = os.path.join(args.file_location, fname)
             try:
-                process_single(fpath)
+                process_single(fpath, args.verbose, args.batch_size)
                 print(f"successfully processed {fpath}")
             except Exception as e:
-                print(f"error with {fpath} {e}")
+                print(f"error with {fpath}: {e}")
                 traceback.print_exc()
     else:
         try:
-            result = process_single(args.file_location, args.verbose)
+            result = process_single(args.file_location, args.verbose, args.batch_size)
             print(f"successfully processed {args.file_location}")
         except Exception as e:
-            print(f"error with {args.file_location} {e}")
+            print(f"error with {args.file_location}: {e}")
             traceback.print_exc()


 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="Process results.")
-    parser.add_argument("file_location", type=str, help="Location of the results file or directory")
+    parser = argparse.ArgumentParser(description="Process results in batches.")
+    parser.add_argument(
+        "file_location", type=str, help="Location of the results file or directory"
+    )
     parser.add_argument(
         "-v",
         "--verbose",
         action="store_true",
-        help="Disable printing and progress tracking",
+        help="Enable verbose printing and progress tracking",
     )
+    parser.add_argument(
+        "--batch-size",
+        type=int,
+        default=10000,
+        help="Number of records to process per batch (default: 10000)",
+    )
     args = parser.parse_args()
     main(args)
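# Usage sketch for the patched tool (paths are hypothetical):
#   python3 src/tools/extract_best_config.py results/2024-01-01_all_results.txt -v
#   python3 src/tools/extract_best_config.py results/ --batch-size 5000
# The selected config is dumped under a sibling "_analysis/" directory, and the
# batch winners are written alongside it to the matching "_pareto.txt" file.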