
Commit

Merge branch 'ds_refactor' of github.com:microsoft/RD-Agent into ds_refactor
XianBW committed Dec 25, 2024
2 parents 75b40d8 + 9a4ba5f commit 84f0fb8
Showing 17 changed files with 151 additions and 107 deletions.
4 changes: 2 additions & 2 deletions rdagent/app/data_science/loop.py
@@ -1,6 +1,6 @@
+from pathlib import Path
 from typing import Any
 
-from pathlib import Path
 import fire
 
 from rdagent.app.data_science.conf import DS_RD_SETTING
@@ -100,7 +100,7 @@ def feedback(self, prev_out: dict[str, Any]):
         self.trace.hist.append((prev_out["direct_exp_gen"].hypothesis, prev_out["running"], feedback))
 
 
-def main(path=None, step_n=None, competition=None):
+def main(path=None, step_n=None, competition="bms-molecular-translation"):
     """
     Auto R&D Evolving loop for models in a kaggle{} scenario.
     You can continue running session by
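Note: `fire.Fire(main)` at the bottom of loop.py exposes `main`'s keyword arguments as command-line flags, so with this change a bare invocation now defaults to the bms-molecular-translation competition instead of proceeding with `competition=None`. A minimal sketch of the mapping, assuming the module is run directly (the invocations in the comments are illustrative):

import fire

def main(path=None, step_n=None, competition="bms-molecular-translation"):
    # python loop.py                    -> competition defaults to "bms-molecular-translation"
    # python loop.py --competition foo  -> competition == "foo"
    # python loop.py --step_n 3         -> step_n == 3
    print(path, step_n, competition)

if __name__ == "__main__":
    fire.Fire(main)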
5 changes: 3 additions & 2 deletions rdagent/components/coder/CoSTEER/evolving_agent.py
@@ -4,6 +4,7 @@
 from rdagent.core.evolving_framework import EvolvableSubjects
 from rdagent.core.exception import CoderError
 
+
 class FilterFailedRAGEvoAgent(RAGEvoAgent):
     def filter_evolvable_subjects_by_feedback(
         self, evo: EvolvableSubjects, feedback: CoSTEERSingleFeedbackDeprecated
@@ -15,8 +16,8 @@ def filter_evolvable_subjects_by_feedback
         for index in range(len(evo.sub_workspace_list)):
             if evo.sub_workspace_list[index] is not None and feedback[index] and not feedback[index].final_decision:
                 evo.sub_workspace_list[index].clear()
-
+        if all(not f.final_decision for f in feedback if f):
+            raise CoderError("All feedbacks of sub tasks are negative.")
 
         return evo
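The new guard aborts the evolution step only when every usable feedback is negative; the trailing `if f` skips `None` entries. A self-contained sketch of the predicate (the `Feedback` stub below is illustrative, not the repo's feedback class):

from dataclasses import dataclass

@dataclass
class Feedback:
    final_decision: bool  # illustrative stand-in for CoSTEER feedback items

def all_negative(feedback: list) -> bool:
    # `if f` filters out None entries, mirroring the guard above
    return all(not f.final_decision for f in feedback if f)

assert all_negative([Feedback(False), None, Feedback(False)])
assert not all_negative([Feedback(False), Feedback(True)])
assert all_negative([None, None])  # vacuously true: no usable feedback also raises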
1 change: 0 additions & 1 deletion rdagent/components/coder/data_science/ensemble/exp.py
@@ -7,5 +7,4 @@
 from rdagent.components.coder.CoSTEER.task import CoSTEERTask
 from rdagent.core.utils import cache_with_pickle
 
-
 EnsembleTask = CoSTEERTask
3 changes: 2 additions & 1 deletion rdagent/components/coder/data_science/feature/eval.py
@@ -52,7 +52,8 @@ def evaluate(
             test_code = f.read()
         implementation.inject_files(**{fname: test_code})
         stdout = implementation.execute(env=de, entry=f"python {fname}")
-
+        if stdout is None:
+            stdout = "The execution exceeded the time limit, and no stdout information has been generated yet."
         system_prompt = T(".prompts:feature_eval.system").r(
             test_code=test_code, code=implementation.file_dict["feat01.py"]
         )
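This `stdout is None` fallback recurs in the model, data-loader, and workflow evaluators below: when the sandboxed run times out and produces no output, a fixed placeholder is passed to the LLM prompt instead of `None`. A reduced sketch of the pattern (the `execute` callable is a stand-in for the workspace's Docker execution):

TIMEOUT_NOTE = "The execution exceeded the time limit, and no stdout information has been generated yet."

def capture_stdout(execute, entry: str) -> str:
    # execute() is assumed to return captured stdout, or None on timeout
    stdout = execute(entry)
    if stdout is None:
        stdout = TIMEOUT_NOTE
    return stdout

print(capture_stdout(lambda entry: None, "python feature_test.py"))  # prints the placeholder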
1 change: 0 additions & 1 deletion rdagent/components/coder/data_science/feature/exp.py
@@ -7,5 +7,4 @@
 from rdagent.components.coder.CoSTEER.task import CoSTEERTask
 from rdagent.core.utils import cache_with_pickle
 
-
 FeatureTask = CoSTEERTask
2 changes: 2 additions & 0 deletions rdagent/components/coder/data_science/model/eval.py
@@ -70,6 +70,8 @@ def evaluate(
             test_code = f.read()
         implementation.inject_files(**{fname: test_code})
         stdout = implementation.execute(env=de, entry=f"python {fname}")
+        if stdout is None:
+            stdout = "The execution exceeded the time limit, and no stdout information has been generated yet."
         system_prompt = T(".prompts:model_eval.system").r(
             test_code=test_code, scenario="No scenario information yet.", spec=implementation.file_dict["spec/model.md"]
         )
3 changes: 2 additions & 1 deletion rdagent/components/coder/data_science/raw_data_loader/eval.py
@@ -57,7 +57,8 @@ def evaluate(
             test_code = f.read()
         implementation.inject_files(**{fname: test_code})
         stdout = implementation.execute(env=de, entry=f"python {fname}")
-
+        if stdout is None:
+            stdout = "The execution exceeded the time limit, and no stdout information has been generated yet."
         system_prompt = T(".prompts:data_loader_eval.system").r(
             test_code=test_code, code=implementation.file_dict["load_data.py"]
         )
1 change: 0 additions & 1 deletion rdagent/components/coder/data_science/raw_data_loader/exp.py
@@ -11,5 +11,4 @@
 from rdagent.utils.agent.tpl import T
 from rdagent.utils.env import DockerEnv, DSDockerConf
 
-
 DataLoaderTask = CoSTEERTask
2 changes: 2 additions & 0 deletions rdagent/components/coder/data_science/workflow/eval.py
@@ -55,6 +55,8 @@ def evaluate(
         de = DockerEnv(conf=ds_docker_conf)
         fname = "main.py"
         stdout = implementation.execute(env=de, entry=f"python {fname}")
+        if stdout is None:
+            stdout = "The execution exceeded the time limit, and no stdout information has been generated yet."
         system_prompt = T(".prompts:workflow_eval.system").r(
             scenario="No scenario information yet.", spec=implementation.file_dict["spec/workflow.md"]
         )
3 changes: 1 addition & 2 deletions rdagent/components/coder/data_science/workflow/exp.py
@@ -7,5 +7,4 @@
 from rdagent.components.coder.CoSTEER.task import CoSTEERTask
 from rdagent.core.utils import cache_with_pickle
 
-
-WorkflowTask = CoSTEERTask
+WorkflowTask = CoSTEERTask
1 change: 1 addition & 0 deletions rdagent/core/experiment.py
@@ -50,6 +50,7 @@ def get_task_information(self) -> str:
     def __repr__(self) -> str:
         return f"<{self.__class__.__name__} {self.name}>"
 
+
 ASpecificTask = TypeVar("ASpecificTask", bound=Task)
 
 
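For readers unfamiliar with the pattern, a bound `TypeVar` like `ASpecificTask` lets each scenario parameterize its experiment over a concrete `Task` subclass while keeping type-checker guarantees. A minimal sketch in the style of rdagent's experiment classes (the `Experiment` below is illustrative, not the repo's definition):

from typing import Generic, TypeVar

class Task:
    ...

ASpecificTask = TypeVar("ASpecificTask", bound=Task)

class Experiment(Generic[ASpecificTask]):
    # sub_tasks is typed to whichever Task subclass the scenario instantiates
    def __init__(self, sub_tasks: list[ASpecificTask]) -> None:
        self.sub_tasks = sub_tasks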
2 changes: 1 addition & 1 deletion rdagent/log/logger.py
@@ -1,7 +1,7 @@
 import json
 import os
-import sys
 import pickle
+import sys
 from contextlib import contextmanager
 from datetime import datetime, timezone
 from functools import partial
209 changes: 121 additions & 88 deletions rdagent/scenarios/data_science/debug/data.py
@@ -1,133 +1,166 @@
+import os
 from pathlib import Path
 
+import platform
 import pandas as pd
+import fire
+import shutil
 
 from rdagent.app.kaggle.conf import KAGGLE_IMPLEMENT_SETTING
 
 
 class DataHandler:
     """Base DataHandler interface."""
 
     def load(self, path) -> pd.DataFrame:
-        ...
+        raise NotImplementedError
 
     def dump(self, df: pd.DataFrame, path):
-        ...
+        raise NotImplementedError
 
 
-class CSVDataHandler(DataHandler):
+class GenericDataHandler(DataHandler):
+    """
+    A generic data handler that automatically detects file type based on suffix
+    and uses the correct pandas method for load/dump.
+    """
 
     def load(self, path) -> pd.DataFrame:
-        return pd.read_csv(path)
+        path = Path(path)
+        suffix = path.suffix.lower()
+
+        if suffix == ".csv":
+            return pd.read_csv(path)
+        elif suffix == ".pkl":
+            return pd.read_pickle(path)
+        elif suffix == ".parquet":
+            return pd.read_parquet(path)
+        elif suffix in [".h5", ".hdf", ".hdf5"]:
+            # Note: for HDF, you need a 'key' in read_hdf. If you expect a single key,
+            # you might do: pd.read_hdf(path, key='df') or something similar.
+            # Adjust as needed based on your HDF structure.
+            return pd.read_hdf(path, key='data')
+        else:
+            raise ValueError(f"Unsupported file type: {suffix}")
 
     def dump(self, df: pd.DataFrame, path):
-        df.to_csv(path, index=False)
+        path = Path(path)
+        suffix = path.suffix.lower()
+
+        if suffix == ".csv":
+            df.to_csv(path, index=False)
+        elif suffix == ".pkl":
+            df.to_pickle(path)
+        elif suffix == ".parquet":
+            df.to_parquet(path, index=True)
+        elif suffix in [".h5", ".hdf", ".hdf5"]:
+            # Similarly, you need a key for HDF.
+            df.to_hdf(path, key="data", mode="w")
+        else:
+            raise ValueError(f"Unsupported file type: {suffix}")
 
 
 class DataReducer:
     """Base DataReducer interface."""
 
-    def reduce(self, df) -> pd.DataFrame:
-        ...
+    def reduce(self, df: pd.DataFrame) -> pd.DataFrame:
+        raise NotImplementedError
 
 
 class RandDataReducer(DataReducer):
+    """
+    Example random sampler: ensures at least `min_num` rows
+    or at least `min_frac` fraction of the data (whichever is larger).
+    """
 
     def __init__(self, min_frac=0.05, min_num=100):
         self.min_frac = min_frac
         self.min_num = min_num
 
-    def reduce(self, df) -> pd.DataFrame:
-        # Calculate the fraction to sample
+    def reduce(self, df: pd.DataFrame) -> pd.DataFrame:
         frac = max(self.min_frac, self.min_num / len(df))
-        # Sample the data
+        if frac >= 1:
+            return df
         return df.sample(frac=frac, random_state=1)
 
 
-def create_debug_data(
-    competition,
-    original_file_name,
-    dh_cls: type[DataHandler],
-    dr_cls: type[DataReducer],
-    dr_cls_kwargs={},
-    dataset_path=KAGGLE_IMPLEMENT_SETTING.local_data_path,
-):
-    # Define the path to the original data file
-    data_path = Path(dataset_path) / competition / original_file_name
-
-    # Automatically generate full and sampled file names based on the original file name
-    original_suffix = Path(original_file_name).suffix
-    full_file_name = original_file_name.replace(original_suffix, f'.full{original_suffix}')
-    sampled_file_name = original_file_name.replace(original_suffix, f'.sampled{original_suffix}')
-
-    # Define the path to the .full data file
-    full_data_path = data_path.with_name(full_file_name)
-
-    # Check if the .full file exists
-    if not full_data_path.exists():
-        # Initialize handlers
-        data_handler = dh_cls()
-        data_reducer = dr_cls(**dr_cls_kwargs)
-
-        # Load the data file
-        df = data_handler.load(data_path)
-
-        # Reduce the data
-        df_sampled = data_reducer.reduce(df)
-
-        # Save the sampled data to a new data file
-        sampled_data_path = data_path.with_name(sampled_file_name)
-        data_handler.dump(df_sampled, sampled_data_path)
-
-        # Rename the original file with .full
-        data_path.rename(full_data_path)
-
-        # Move the sampled data to replace the original one
-        sampled_data_path.rename(data_path)
-
-
-class PickleDataHandler(DataHandler):
-
-    def load(self, path) -> pd.DataFrame:
-        return pd.read_pickle(path)
-
-    def dump(self, df: pd.DataFrame, path):
-        df.to_pickle(path)
-
-
 class ColumnReducer(DataReducer):
     """
     Example column reducer: keep only the first 5 columns.
     """
 
-    def reduce(self, df) -> pd.DataFrame:
+    def reduce(self, df: pd.DataFrame) -> pd.DataFrame:
         return df.iloc[:, :5]
 
 
-def new_york_city_taxi_fare_prediction_creator():
-    create_debug_data(competition="new-york-city-taxi-fare-prediction",
-                      original_file_name="train.csv",
-                      dh_cls=CSVDataHandler,
-                      dr_cls=RandDataReducer,
-                      dr_cls_kwargs=dict(min_frac=0.05, min_num=100))
+class RowReducer(DataReducer):
+    """
+    Example row reducer: keep only the first 10% rows.
+    """
+
+    def reduce(self, df: pd.DataFrame) -> pd.DataFrame:
+        ten_percent = int(max(len(df) * 0.1, 100))
+        return df.iloc[:ten_percent]
 
-def amc_debug_data_creator():
-    create_debug_data(
-        competition="amc",
-        original_file_name="train_feature_with_label.pkl",
-        dh_cls=PickleDataHandler,
-        dr_cls=ColumnReducer,
-    )
-
-    create_debug_data(
-        competition="amc",
-        original_file_name="test_feature.pkl",
-        dh_cls=PickleDataHandler,
-        dr_cls=ColumnReducer,
-    )
+def create_debug_data(
+    competition: str,
+    dr_cls: type[DataReducer] = RandDataReducer,
+    dr_cls_kwargs=None,
+    dataset_path=None,
+    sample_path=None,
+):
+    """
+    Reads the original data file, creates a reduced sample,
+    and renames/moves files for easier debugging.
+    Automatically detects file type (csv, pkl, parquet, hdf, etc.).
+    """
+    if dr_cls_kwargs is None:
+        dr_cls_kwargs = {}
+
+    if dataset_path is None:
+        dataset_path = KAGGLE_IMPLEMENT_SETTING.local_data_path
+
+    if sample_path is None:
+        sample_path = Path(dataset_path) / "sample"
+
+    data_folder = Path(dataset_path) / competition
+    sample_folder = Path(sample_path) / competition
+
+    # Traverse the folder and exclude specific file types
+    included_extensions = {".csv", ".pkl", ".parquet", ".h5", ".hdf", ".hdf5"}
+    files_to_process = [
+        file for file in data_folder.rglob("*")
+        if file.is_file()
+    ]
+
+    for file_path in files_to_process:
+        sampled_file_path = sample_folder / file_path.relative_to(data_folder)
+        if sampled_file_path.exists():
+            continue
+
+        sampled_file_path.parent.mkdir(parents=True, exist_ok=True)
+        if file_path.suffix not in included_extensions:
+            if platform.system() == "Linux":
+                os.symlink(file_path, sampled_file_path)
+            if platform.system() == "Windows":
+                os.link(file_path, sampled_file_path)
+            continue
+
+        # Initialize the generic data handler
+        data_handler = GenericDataHandler()
+
+        # Initialize the data reducer (e.g., RandDataReducer or ColumnReducer)
+        data_reducer = dr_cls(**dr_cls_kwargs)
+
+        # Load the original data
+        df = data_handler.load(file_path)
+
+        # Create a sampled subset
+        df_sampled = data_reducer.reduce(df)
+
+        # Dump the sampled data
+        data_handler.dump(df_sampled, sampled_file_path)

 # competition to data handler & Reducer mapping
 # find a place to store reduced data.
 # - <local_data_path>, <local_data_path>.debug
 
-import fire
 if __name__ == "__main__":
-    # fire.Fire(create_debug_data)
-    fire.Fire(amc_debug_data_creator)
+    fire.Fire(create_debug_data)
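With this rewrite the per-competition creator functions are gone: `create_debug_data` walks the whole competition folder, samples every file whose suffix `GenericDataHandler` understands, and links everything else into a parallel `sample/` tree. A hedged usage sketch (the dataset path is illustrative):

from rdagent.scenarios.data_science.debug.data import RandDataReducer, create_debug_data

create_debug_data(
    competition="new-york-city-taxi-fare-prediction",
    dr_cls=RandDataReducer,  # the default; shown for clarity
    dr_cls_kwargs=dict(min_frac=0.05, min_num=100),
    dataset_path="/data/kaggle",  # illustrative; defaults to KAGGLE_IMPLEMENT_SETTING.local_data_path
)
# Sampled files land under /data/kaggle/sample/new-york-city-taxi-fare-prediction/,
# mirroring the original layout; unrecognized suffixes are symlinked (Linux) or hard-linked (Windows).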
2 changes: 1 addition & 1 deletion rdagent/scenarios/data_science/scen/__init__.py
@@ -1,4 +1,4 @@
-from .scen import DataScienceScen
 from .kaggle import KaggleScen
+from .scen import DataScienceScen
 
 __all__ = ["DataScienceScen", "KaggleScen"]
2 changes: 1 addition & 1 deletion rdagent/scenarios/data_science/scen/kaggle.py
@@ -19,6 +19,7 @@ class KaggleScen(DataScienceScen):
     But we found that too much scenario unrelated code in kaggle scenario and hard to reuse.
     So we start from a simple one....
     """
+
     def _get_description(self):
         return crawl_descriptions(self.competition, DS_RD_SETTING.local_data_path)
 
@@ -32,4 +33,3 @@ def rich_style_description(self) -> str:
name="Kaggle",
competition=f"[{self.competition}](https://www.kaggle.com/competitions/{self.competition})",
)
