Batching for results extraction #487

Open · wants to merge 1 commit into base: master
src/tools/extract_best_config.py: 247 changes (159 additions & 88 deletions)
@@ -8,21 +8,21 @@
import dictdiffer
from tqdm import tqdm
import traceback

# Append the parent directory to the path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from copy import deepcopy

from procedures import utc_ms, make_get_filepath, dump_config, format_config
from pure_funcs import (
config_pretty_str,
flatten_dict,
ts_to_date_utc,
backtested_multiconfig2live_multiconfig,
sort_dict_keys,
)


def data_generator(all_results_filename, verbose=False):
"""
@@ -31,15 +31,13 @@ def data_generator(all_results_filename, verbose=False):

Args:
all_results_filename (str): Path to the all_results.txt file.
verbose (bool): If True, enables printing and progress tracking.

Yields:
dict: The full data dictionary at each step.
"""
prev_data = None
# Get the total file size in bytes
file_size = os.path.getsize(all_results_filename)
# Disable the progress bar and printing unless verbose is True
with open(all_results_filename, "r") as f:
with tqdm(
total=file_size,
@@ -55,11 +53,11 @@
try:
data = json.loads(line)
if "diff" not in data:
# First entry; full data provided.
prev_data = data
yield deepcopy(prev_data)
else:
# Apply the diff to the previous data.
diff = data["diff"]
for i in range(len(diff)):
if len(diff[i]) == 2:
@@ -76,7 +74,6 @@
pbar.close()


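
# Hedged sketch (illustration only, not code from this PR): one way an all_results.txt
# file of the shape data_generator() expects could be written. The first line stores the
# full result dict; each later line stores only the dictdiffer diff against the previous
# result, which is what the generator above re-applies. append_result() is a hypothetical
# helper; json and dictdiffer are already imported at the top of this module.
def append_result(path, result, prev_result=None):
    with open(path, "a") as f:
        if prev_result is None:
            # First entry: dump the full data.
            f.write(json.dumps(result) + "\n")
        else:
            # Later entries: dump only the diff against the previous result.
            diff = list(dictdiffer.diff(prev_result, result))
            f.write(json.dumps({"diff": diff}) + "\n")
    return result
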
def calc_dist(p0, p1):
return ((p0[0] - p1[0]) ** 2 + (p0[1] - p1[1]) ** 2) ** 0.5

@@ -110,14 +107,18 @@ def calc_pareto_front_d(objectives: dict, higher_is_better: [bool]):
for kcandidate in sorted_keys:
is_dominated = False
for kmember in pareto_front:
if dominates_d(
objectives[kmember], objectives[kcandidate], higher_is_better
):
is_dominated = True
break
if not is_dominated:
pareto_front = [
kmember
for kmember in pareto_front
if not dominates_d(
objectives[kcandidate], objectives[kmember], higher_is_better
)
]
pareto_front.append(kcandidate)
return pareto_front
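
# Hedged usage sketch for calc_pareto_front_d() above (values are made up). With both
# objectives minimized (higher_is_better=[False, False]), "c" ([3, 3]) is dominated by
# "b" ([2, 2]), so the returned front should contain exactly "a" and "b".
def _pareto_front_example():
    objectives = {"a": [1.0, 3.0], "b": [2.0, 2.0], "c": [3.0, 3.0]}
    return calc_pareto_front_d(objectives, higher_is_better=[False, False])
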
@@ -130,125 +131,195 @@ def gprint(verbose):
return lambda *args, **kwargs: None


def process_batch(batch, analysis_prefix, verbose):
"""
Process a batch of records to select its best candidate.

This function:
1. Flattens each record in the batch.
2. Converts the flattened records into a DataFrame.
3. Filters for candidate rows (using keys 'w_0' and 'w_1' with the appropriate prefix).
4. Computes the Pareto front and selects the candidate closest to the ideal point.

Args:
batch (list): List of records (dicts).
analysis_prefix (str): Prefix for analysis keys (e.g., "analysis_" or "analyses_combined_").
verbose (bool): Controls printing.

Returns:
dict: The best candidate from the batch.
"""
print_ = gprint(verbose)
# Flatten all records in the batch.
flat_records = [flatten_dict(x) for x in batch]
df = pd.DataFrame(flat_records)
keys = [analysis_prefix + "w_0", analysis_prefix + "w_1"]
higher_is_better = [False, False]

if keys[0] in df.columns and keys[1] in df.columns:
candidates = df[(df[keys[0]] <= 0.0) & (df[keys[1]] <= 0.0)][keys]
if candidates.empty:
candidates = df[keys]
else:
candidates = df

if len(candidates) == 1:
best_index = candidates.index[0]
else:
obj_dict = {i: candidates.loc[i, keys].tolist() for i in candidates.index}
pareto_indices = calc_pareto_front_d(obj_dict, higher_is_better)
if len(pareto_indices) == 1:
best_index = pareto_indices[0]
else:
norm_candidates = (candidates - candidates.min()) / (
candidates.max() - candidates.min()
)
norm_pareto = norm_candidates.loc[pareto_indices]
distances = [calc_dist(row, [0.0, 0.0]) for row in norm_pareto.values]
best_index = norm_pareto.index[distances.index(min(distances))]
print_("Best candidate in batch:")
print_(candidates.loc[best_index])
print_("Pareto front:")
res_to_print = df[
[col for col in df.columns if analysis_prefix[:-1] in col]
].loc[norm_pareto.index]
res_to_print.columns = [
col.replace(analysis_prefix, "") for col in res_to_print.columns
]
print_(res_to_print)
return batch[best_index]

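
# Hedged sketch (made-up numbers) of the tie-break process_batch() uses once the Pareto
# front has more than one member: min-max normalize both objectives so they share a
# scale, then pick the member closest (Euclidean distance, via calc_dist above) to the
# ideal corner [0.0, 0.0], i.e. the best observed value on both axes.
def _closest_to_ideal_example():
    candidates = pd.DataFrame({"w_0": [-1.0, -3.0, -2.0], "w_1": [-4.0, -1.0, -3.0]})
    norm = (candidates - candidates.min()) / (candidates.max() - candidates.min())
    dists = [calc_dist(row, [0.0, 0.0]) for row in norm.values]
    # The balanced candidate at index 2 wins over the two single-objective extremes.
    return candidates.index[dists.index(min(dists))]
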

def process_single(file_location, verbose=False, batch_size=1000):
"""
Process a single results file in batches.

Instead of loading all records into memory, records are read sequentially
and processed in batches. For each batch, the best candidate is selected.
Finally, the list of batch winners is reduced to determine the overall best candidate.

Args:
file_location (str): Path to the results file.
verbose (bool): Enable verbose output.
batch_size (int): Number of records to process per batch.

Returns:
dict: The final best candidate.
"""
print_ = gprint(verbose)

# First, try to load the file as a full JSON (shortcut if possible)
try:
result = json.load(open(file_location))
print_(config_pretty_str(sort_dict_keys(result)))
return result
except:
pass

# Use the generator to read the file sequentially.
gen = data_generator(file_location, verbose=verbose)
first = None
for record in gen:
if record:
first = record
break
if not first:
print_(f"No valid data found in {file_location}")
return None
print_("Processing...")
res = pd.DataFrame([flatten_dict(x) for x in xs])

# Determine the prefix based on the data
if "analyses_combined" in xs[0]:
# Determine analysis prefix based on the first record.
if "analyses_combined" in first:
analysis_prefix = "analyses_combined_"
analysis_key = "analyses_combined"
elif "analysis" in xs[0]:
elif "analysis" in first:
analysis_prefix = "analysis_"
analysis_key = "analysis"
else:
raise Exception("Neither 'analyses_combined' nor 'analysis' found in data")

total_records = 1  # first record already read
batch_candidates = []
current_batch = [first]

# Read remaining records in batches.
for record in gen:
if record:
current_batch.append(record)
total_records += 1
if len(current_batch) >= batch_size:
candidate = process_batch(current_batch, analysis_prefix, verbose)
batch_candidates.append(candidate)
current_batch = []
if current_batch:
candidate = process_batch(current_batch, analysis_prefix, verbose)
batch_candidates.append(candidate)

print_(
f"Processed {total_records} records in batches; found {len(batch_candidates)} batch winners."
)

# Final reduction: process the list of batch winners to get the overall best candidate.
final_candidate = process_batch(batch_candidates, analysis_prefix, verbose)

# Update configuration information.
final_candidate[analysis_key]["n_iters"] = total_records
if "config" in final_candidate:
final_candidate.update(deepcopy(final_candidate["config"]))
del final_candidate["config"]

fjson = config_pretty_str(final_candidate)
print_(fjson)
print_(file_location)

full_path = file_location.replace("_all_results.txt", "") + ".json"
base_path = os.path.split(full_path)[0]
full_path = make_get_filepath(
full_path.replace(base_path, base_path + "_analysis/")
)

# Dump the batch winners (for reference) and the final configuration.
with open(full_path.replace(".json", "_pareto.txt"), "w") as f:
for candidate in batch_candidates:
f.write(json.dumps(candidate) + "\n")
dump_config(format_config(final_candidate), full_path)
return final_candidate

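
# Hedged sketch of the batch-and-reduce shape process_single() follows: stream records,
# keep one winner per batch, then run the same selection once more over the winners, so
# memory stays bounded by the batch size instead of the total number of records.
# pick_best() stands in for process_batch() and is an assumption for illustration.
def _batched_best(records, pick_best, batch_size=1000):
    winners, batch = [], []
    for record in records:
        if not record:
            continue
        batch.append(record)
        if len(batch) >= batch_size:
            winners.append(pick_best(batch))
            batch = []
    if batch:
        winners.append(pick_best(batch))
    return pick_best(winners) if winners else None
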

def main(args):
if os.path.isdir(args.file_location):
for fname in sorted(os.listdir(args.file_location), reverse=True):
fpath = os.path.join(args.file_location, fname)
try:
process_single(fpath, args.verbose, args.batch_size)
print(f"successfully processed {fpath}")
except Exception as e:
print(f"error with {fpath} {e}")
print(f"error with {fpath}: {e}")
traceback.print_exc()
else:
try:
result = process_single(args.file_location, args.verbose, args.batch_size)
print(f"successfully processed {args.file_location}")
except Exception as e:
print(f"error with {args.file_location} {e}")
print(f"error with {args.file_location}: {e}")
traceback.print_exc()


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Process results.")
parser.add_argument("file_location", type=str, help="Location of the results file or directory")
parser = argparse.ArgumentParser(description="Process results in batches.")
parser.add_argument(
"file_location", type=str, help="Location of the results file or directory"
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="Disable printing and progress tracking",
help="Enable verbose printing and progress tracking",
)
parser.add_argument(
"--batch-size",
type=int,
default=10000,
help="Number of records to process per batch (default: 1000)",
)
args = parser.parse_args()

main(args)
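
# Hedged usage sketch (paths are placeholders, not taken from this PR):
#   python src/tools/extract_best_config.py path/to/run_all_results.txt -v --batch-size 5000
#   python src/tools/extract_best_config.py path/to/results_dir/ --batch-size 5000
# A single file yields the best config dumped under an "*_analysis/" directory next to
# the input, alongside a "*_pareto.txt" file listing the batch winners; a directory is
# processed file by file in reverse-sorted name order.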