diff --git a/.vscode/settings.json b/.vscode/settings.json index fba148f3ce7..bd803cd8d52 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,10 +3,15 @@ 120 ], "editor.formatOnSave": true, - "python.formatting.provider": "black", - "python.linting.enabled": true, - "python.linting.flake8Enabled": true, - "python.linting.flake8Args": [ + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter", + "editor.formatOnSave": true + }, + "black-formatter.args": [ + "--line-length", + "120" + ], + "flake8.args": [ "--config=setup.cfg" ], "python.testing.unittestEnabled": false, diff --git a/README.md b/README.md index 37ab95b84d5..85e32f22df3 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ Ludwig is a **low-code** framework for building **custom** AI models like **LLMs Key features: - 🛠 **Build custom models with ease:** a declarative YAML configuration file is all you need to train a state-of-the-art LLM on your data. Support for multi-task and multi-modality learning. Comprehensive config validation detects invalid parameter combinations and prevents runtime failures. -- ⚡ **Optimized for scale and efficiency:** automatic batch size selection, distributed training ([DDP](https://pytorch.org/tutorials/beginner/ddp_series_theory.html), [DeepSpeed](https://github.com/microsoft/DeepSpeed)), parameter efficient fine-tuning ([PEFT](https://github.com/huggingface/peft)), 4-bit quantization (QLoRA), and larger-than-memory datasets. +- ⚡ **Optimized for scale and efficiency:** automatic batch size selection, distributed training ([DDP](https://pytorch.org/tutorials/beginner/ddp_series_theory.html), [DeepSpeed](https://github.com/microsoft/DeepSpeed)), parameter efficient fine-tuning ([PEFT](https://github.com/huggingface/peft)), 4-bit quantization (QLoRA), paged and 8-bit optimizers, and larger-than-memory datasets. - 📐 **Expert level control:** retain full control of your models down to the activation functions. Support for hyperparameter optimization, explainability, and rich metric visualizations. - 🧱 **Modular and extensible:** experiment with different model architectures, tasks, features, and modalities with just a few parameter changes in the config. Think building blocks for deep learning. - 🚢 **Engineered for production:** prebuilt [Docker](https://hub.docker.com/u/ludwigai) containers, native support for running with [Ray](https://www.ray.io/) on [Kubernetes](https://github.com/ray-project/kuberay), export models to [Torchscript](https://pytorch.org/docs/stable/jit.html) and [Triton](https://developer.nvidia.com/triton-inference-server), upload to [HuggingFace](https://huggingface.co/models) with one command. @@ -52,8 +52,13 @@ pip install ludwig[full] Want to take a quick peak at some of the Ludwig 0.8 features? Check out this Colab Notebook 🚀 [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1lB4ALmEyvcMycE3Mlnsd7I3bc0zxvk39) -For a full tutorial, check out the official [getting started guide](https://ludwig-ai.github.io/ludwig-docs/latest/getting_started/), -or take a look at end-to-end [Examples](https://ludwig-ai.github.io/ludwig-docs/latest/examples). +Looking to fine-tune Llama-2 or Mistral? Check out these notebooks: + +1. Fine-Tune Llama-2-7b: [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1r4oSEwRJpYKBPM0M0RSh0pBEYK_gBKbe) +1. 
Fine-Tune Llama-2-13b: [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1zmSEzqZ7v4twBrXagj1TE_C--RNyVAyu) +1. Fine-Tune Mistral-7b: [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1i_8A1n__b7ljRWHzIsAdhO7u7r49vUm4) + +For a full tutorial, check out the official [getting started guide](https://ludwig-ai.github.io/ludwig-docs/latest/getting_started/), or take a look at end-to-end [Examples](https://ludwig-ai.github.io/ludwig-docs/latest/examples). ## Large Language Model Fine-Tuning diff --git a/ludwig/api.py b/ludwig/api.py index 8035a22fd51..c85fc67c3c5 100644 --- a/ludwig/api.py +++ b/ludwig/api.py @@ -444,6 +444,16 @@ def train( `(training_set, validation_set, test_set)`. `output_directory` filepath to where training results are stored. """ + # Only reset the metadata if the model has not been trained before + if self.training_set_metadata: + logger.warning( + "This model has been trained before. Its architecture has been defined by the original training set " + "(for example, the number of possible categorical outputs). The current training data will be mapped " + "to this architecture. If you want to change the architecture of the model, please concatenate your " + "new training data with the original and train a new model from scratch." + ) + training_set_metadata = self.training_set_metadata + if self._user_config.get(HYPEROPT): print_boxed("WARNING") logger.warning(HYPEROPT_WARNING) @@ -2131,6 +2141,31 @@ def kfold_cross_validate( return kfold_cv_stats, kfold_split_indices +def _get_compute_description(backend) -> Dict: + """Returns the compute description for the backend.""" + compute_description = {"num_nodes": backend.num_nodes} + + if torch.cuda.is_available(): + # Assumption: All nodes are of the same instance type. + # TODO: fix for Ray where workers may be of different skus + compute_description.update( + { + "gpus_per_node": torch.cuda.device_count(), + "arch_list": torch.cuda.get_arch_list(), + "gencode_flags": torch.cuda.get_gencode_flags(), + "devices": {}, + } + ) + for i in range(torch.cuda.device_count()): + compute_description["devices"][i] = { + "gpu_type": torch.cuda.get_device_name(i), + "device_capability": torch.cuda.get_device_capability(i), + "device_properties": str(torch.cuda.get_device_properties(i)), + } + + return compute_description + + @PublicAPI def get_experiment_description( config, @@ -2174,15 +2209,6 @@ def get_experiment_description( description["config"] = config description["torch_version"] = torch.__version__ - - gpu_info = {} - if torch.cuda.is_available(): - # Assumption: All nodes are of the same instance type. 
- # TODO: fix for Ray where workers may be of different skus - gpu_info = {"gpu_type": torch.cuda.get_device_name(0), "gpus_per_node": torch.cuda.device_count()} - - compute_description = {"num_nodes": backend.num_nodes, **gpu_info} - - description["compute"] = compute_description + description["compute"] = _get_compute_description(backend) return description diff --git a/ludwig/backend/base.py b/ludwig/backend/base.py index 77a14a0b104..112a8afe35e 100644 --- a/ludwig/backend/base.py +++ b/ludwig/backend/base.py @@ -20,7 +20,7 @@ from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager -from typing import Any, Callable, TYPE_CHECKING +from typing import Any, Callable, Generator, TYPE_CHECKING import numpy as np import pandas as pd @@ -89,7 +89,7 @@ def initialize_pytorch(self, *args, **kwargs): @contextmanager @abstractmethod - def create_trainer(self, **kwargs) -> BaseTrainer: + def create_trainer(self, config: BaseTrainerConfig, model: BaseModel, **kwargs) -> Generator: raise NotImplementedError() @abstractmethod @@ -146,7 +146,9 @@ def tune_batch_size(self, evaluator_cls: type[BatchSizeEvaluator], dataset_len: raise NotImplementedError() @abstractmethod - def batch_transform(self, df: DataFrame, batch_size: int, transform_fn: Callable, name: str = None) -> DataFrame: + def batch_transform( + self, df: DataFrame, batch_size: int, transform_fn: Callable, name: str | None = None + ) -> DataFrame: """Applies `transform_fn` to every `batch_size` length batch of `df` and returns the result.""" raise NotImplementedError() @@ -171,7 +173,9 @@ def read_binary_files(column: pd.Series, map_fn: Callable | None = None, file_si with ThreadPoolExecutor() as executor: # number of threads is inferred if isinstance(sample_fname, str): if map_fn is read_audio_from_path: # bypass torchaudio issue that no longer takes in file-like objects - result = executor.map(lambda path: map_fn(path) if path is not None else path, column.values) + result = executor.map( # type: ignore[misc] + lambda path: map_fn(path) if path is not None else path, column.values + ) else: result = executor.map( lambda path: get_bytes_obj_from_path(path) if path is not None else path, column.values @@ -186,7 +190,7 @@ def read_binary_files(column: pd.Series, map_fn: Callable | None = None, file_si return pd.Series(result, index=column.index, name=column.name) @staticmethod - def batch_transform(df: DataFrame, batch_size: int, transform_fn: Callable, name: str = None) -> DataFrame: + def batch_transform(df: DataFrame, batch_size: int, transform_fn: Callable, name: str | None = None) -> DataFrame: name = name or "Batch Transform" batches = to_batches(df, batch_size) transform = transform_fn() @@ -204,21 +208,11 @@ def initialize(): def initialize_pytorch(*args, **kwargs): initialize_pytorch(*args, **kwargs) - def create_trainer(self, config: BaseTrainerConfig, model: BaseModel, **kwargs) -> BaseTrainer: - from ludwig.trainers.registry import get_llm_trainers_registry, get_trainers_registry - - if model.type() == MODEL_LLM: - trainer_cls = get_from_registry(config.type, get_llm_trainers_registry()) - else: - trainer_cls = get_from_registry(model.type(), get_trainers_registry()) - - return trainer_cls(config=config, model=model, **kwargs) - @staticmethod def create_predictor(model: BaseModel, **kwargs): from ludwig.models.predictor import get_predictor_cls - return get_predictor_cls(model.type())(model, **kwargs) + return get_predictor_cls(model.type())(model, **kwargs) # 
type: ignore[call-arg] def sync_model(self, model): pass @@ -254,14 +248,16 @@ def is_coordinator() -> bool: class LocalBackend(LocalPreprocessingMixin, LocalTrainingMixin, Backend): BACKEND_TYPE = "local" + _shared_instance: LocalBackend + @classmethod - def shared_instance(cls): + def shared_instance(cls) -> LocalBackend: """Returns a shared singleton LocalBackend instance.""" if not hasattr(cls, "_shared_instance"): cls._shared_instance = cls() return cls._shared_instance - def __init__(self, **kwargs): + def __init__(self, **kwargs) -> None: super().__init__(dataset_manager=PandasDatasetManager(self), **kwargs) @property @@ -280,6 +276,22 @@ def max_concurrent_trials(self, hyperopt_config: HyperoptConfigDict) -> int | No # trial resources it wants, because there is no Ray Datasets process to compete with it for CPUs. return None + def create_trainer( + self, + config: BaseTrainerConfig, + model: BaseModel, + **kwargs, + ) -> BaseTrainer: # type: ignore[override] + from ludwig.trainers.registry import get_llm_trainers_registry, get_trainers_registry + + trainer_cls: type + if model.type() == MODEL_LLM: + trainer_cls = get_from_registry(config.type, get_llm_trainers_registry()) + else: + trainer_cls = get_from_registry(model.type(), get_trainers_registry()) + + return trainer_cls(config=config, model=model, **kwargs) + @DeveloperAPI class DataParallelBackend(LocalPreprocessingMixin, Backend, ABC): @@ -298,7 +310,12 @@ def initialize_pytorch(self, *args, **kwargs): *args, local_rank=self._distributed.local_rank(), local_size=self._distributed.local_size(), **kwargs ) - def create_trainer(self, **kwargs) -> BaseTrainer: + def create_trainer( + self, + config: BaseTrainerConfig, + model: BaseModel, + **kwargs, + ) -> BaseTrainer: # type: ignore[override] from ludwig.trainers.trainer import Trainer return Trainer(distributed=self._distributed, **kwargs) @@ -306,7 +323,7 @@ def create_trainer(self, **kwargs) -> BaseTrainer: def create_predictor(self, model: BaseModel, **kwargs): from ludwig.models.predictor import get_predictor_cls - return get_predictor_cls(model.type())(model, distributed=self._distributed, **kwargs) + return get_predictor_cls(model.type())(model, distributed=self._distributed, **kwargs) # type: ignore[call-arg] def sync_model(self, model): # Model weights are only saved on the coordinator, so broadcast diff --git a/ludwig/cli.py b/ludwig/cli.py index c89bc47f7cb..345a69d0e7d 100644 --- a/ludwig/cli.py +++ b/ludwig/cli.py @@ -56,7 +56,7 @@ def __init__(self): init_config Initialize a user config from a dataset and targets render_config Renders the fully populated config with all defaults set check_install Runs a quick training run on synthetic data to verify installation status - upload Push trained model artifacts to a registry (e.g., HuggingFace Hub) + upload Push trained model artifacts to a registry (e.g., Predibase, HuggingFace Hub) """, ) parser.add_argument("command", help="Subcommand to run") diff --git a/ludwig/config_validation/checks.py b/ludwig/config_validation/checks.py index d6ed2cc1b60..a9724219671 100644 --- a/ludwig/config_validation/checks.py +++ b/ludwig/config_validation/checks.py @@ -718,3 +718,11 @@ def check_prompt_requirements(config: "ModelConfig") -> None: # noqa: F821 "A template must contain at least one reference to a column or the sample keyword {__sample__} for " "a JSON-serialized representation of non-output feature columns." 
) + + +@register_config_check +def check_sample_ratio_and_size_compatible(config: "ModelConfig") -> None: + sample_ratio = config.preprocessing.sample_ratio + sample_size = config.preprocessing.sample_size + if sample_size is not None and sample_ratio < 1.0: + raise ConfigValidationError("sample_size cannot be used when sample_ratio < 1.0") diff --git a/ludwig/data/dataset/base.py b/ludwig/data/dataset/base.py index 729c0bc4569..ea548a0ea10 100644 --- a/ludwig/data/dataset/base.py +++ b/ludwig/data/dataset/base.py @@ -14,10 +14,13 @@ # limitations under the License. # ============================================================================== +from __future__ import annotations + import contextlib from abc import ABC, abstractmethod -from typing import Iterable, Optional +from typing import Iterable +from ludwig.data.batcher.base import Batcher from ludwig.distributed import DistributedStrategy from ludwig.features.base_feature import BaseFeature from ludwig.utils.defaults import default_random_seed @@ -26,7 +29,7 @@ class Dataset(ABC): @abstractmethod - def __len__(self): + def __len__(self) -> int: raise NotImplementedError() @contextlib.contextmanager @@ -38,36 +41,36 @@ def initialize_batcher( random_seed: int = default_random_seed, ignore_last: bool = False, distributed: DistributedStrategy = None, - ): + ) -> Batcher: raise NotImplementedError() @abstractmethod - def to_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame: + def to_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame: raise NotImplementedError() @abstractmethod - def to_scalar_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame: + def to_scalar_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame: raise NotImplementedError() @property - def in_memory_size_bytes(self): + def in_memory_size_bytes(self) -> int: raise NotImplementedError() class DatasetManager(ABC): @abstractmethod - def create(self, dataset, config, training_set_metadata): + def create(self, dataset, config, training_set_metadata) -> Dataset: raise NotImplementedError() @abstractmethod - def save(self, cache_path, dataset, config, training_set_metadata, tag): + def save(self, cache_path, dataset, config, training_set_metadata, tag) -> Dataset: raise NotImplementedError() @abstractmethod - def can_cache(self, skip_save_processed_input): + def can_cache(self, skip_save_processed_input) -> bool: raise NotImplementedError() @property @abstractmethod - def data_format(self): + def data_format(self) -> str: raise NotImplementedError() diff --git a/ludwig/data/dataset/pandas.py b/ludwig/data/dataset/pandas.py index 6cec6ec6bf8..279da884515 100644 --- a/ludwig/data/dataset/pandas.py +++ b/ludwig/data/dataset/pandas.py @@ -13,13 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# ============================================================================== + +from __future__ import annotations + import contextlib -from typing import Iterable, Optional +from typing import Iterable, TYPE_CHECKING import numpy as np from pandas import DataFrame from ludwig.constants import PREPROCESSING, TRAINING +from ludwig.data.batcher.base import Batcher from ludwig.data.batcher.random_access import RandomAccessBatcher from ludwig.data.dataset.base import Dataset, DatasetManager from ludwig.data.sampler import DistributedSampler @@ -31,6 +35,9 @@ from ludwig.utils.fs_utils import download_h5 from ludwig.utils.misc_utils import get_proc_features +if TYPE_CHECKING: + from ludwig.backend.base import Backend + class PandasDataset(Dataset): def __init__(self, dataset, features, data_hdf5_fp): @@ -42,13 +49,13 @@ def __init__(self, dataset, features, data_hdf5_fp): dataset = load_hdf5(dataset) self.dataset = to_numpy_dataset(dataset) - def to_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame: + def to_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame: """Convert the dataset to a Pandas DataFrame.""" if features: return from_numpy_dataset({feature.feature_name: self.dataset[feature.proc_column] for feature in features}) return from_numpy_dataset(self.dataset) - def to_scalar_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame: + def to_scalar_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame: return to_scalar_df(self.to_df(features)) def get(self, proc_column, idx=None): @@ -76,18 +83,18 @@ def get(self, proc_column, idx=None): indices = indices[:, np.argsort(indices[1])] return im_data[indices[2, :]] - def get_dataset(self): + def get_dataset(self) -> dict[str, np.ndarray]: return self.dataset def __len__(self): return self.size @property - def processed_data_fp(self) -> Optional[str]: + def processed_data_fp(self) -> str | None: return self.data_hdf5_fp @property - def in_memory_size_bytes(self): + def in_memory_size_bytes(self) -> int: df = self.to_df() return df.memory_usage(deep=True).sum() if df is not None else 0 @@ -100,7 +107,7 @@ def initialize_batcher( ignore_last: bool = False, distributed: DistributedStrategy = None, augmentation_pipeline=None, - ): + ) -> Batcher: sampler = DistributedSampler( len(self), shuffle=should_shuffle, random_seed=random_seed, distributed=distributed ) @@ -115,21 +122,21 @@ def initialize_batcher( class PandasDatasetManager(DatasetManager): - def __init__(self, backend): - self.backend = backend + def __init__(self, backend: Backend): + self.backend: Backend = backend - def create(self, dataset, config, training_set_metadata): + def create(self, dataset, config, training_set_metadata) -> Dataset: return PandasDataset(dataset, get_proc_features(config), training_set_metadata.get(DATA_TRAIN_HDF5_FP)) - def save(self, cache_path, dataset, config, training_set_metadata, tag): + def save(self, cache_path, dataset, config, training_set_metadata, tag) -> Dataset: save_hdf5(cache_path, dataset) if tag == TRAINING: training_set_metadata[DATA_TRAIN_HDF5_FP] = cache_path return dataset - def can_cache(self, skip_save_processed_input): + def can_cache(self, skip_save_processed_input) -> bool: return self.backend.is_coordinator() and not skip_save_processed_input @property - def data_format(self): + def data_format(self) -> str: return "hdf5" diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py index ba7ba4dfed9..708edea346d 100644 --- 
a/ludwig/data/preprocessing.py +++ b/ludwig/data/preprocessing.py @@ -1201,15 +1201,8 @@ def build_dataset( if mode == "training": sample_ratio = global_preprocessing_parameters["sample_ratio"] - if sample_ratio < 1.0: - if not df_engine.partitioned and len(dataset_df) * sample_ratio < 1: - raise ValueError( - f"sample_ratio {sample_ratio} is too small for dataset of length {len(dataset_df)}. " - f"Please increase sample_ratio or use a larger dataset." - ) - - logger.debug(f"sample {sample_ratio} of data") - dataset_df = dataset_df.sample(frac=sample_ratio, random_state=random_seed) + sample_size = global_preprocessing_parameters["sample_size"] + dataset_df = _get_sampled_dataset_df(dataset_df, df_engine, sample_ratio, sample_size, random_seed) # If persisting DataFrames in memory is enabled, we want to do this after # each batch of parallel ops in order to avoid redundant computation @@ -1396,6 +1389,29 @@ def embed_fixed_features( return results +def _get_sampled_dataset_df(dataset_df, df_engine, sample_ratio, sample_size, random_seed): + df_len = len(dataset_df) + if sample_ratio < 1.0: + if not df_engine.partitioned and df_len * sample_ratio < 1: + raise ValueError( + f"sample_ratio {sample_ratio} is too small for dataset of length {df_len}. " + f"Please increase sample_ratio or use a larger dataset." + ) + + logger.debug(f"sample {sample_ratio} of data") + dataset_df = dataset_df.sample(frac=sample_ratio, random_state=random_seed) + + if sample_size: + if sample_size < df_len: + # Cannot use 'n' parameter when using dask DataFrames -- only 'frac' is supported + sample_ratio = sample_size / df_len + dataset_df = dataset_df.sample(frac=sample_ratio, random_state=random_seed) + else: + logger.warning("sample_size is larger than dataset size, ignoring sample_size") + + return dataset_df + + def get_features_with_cacheable_fixed_embeddings( feature_configs: List[FeatureConfigDict], metadata: TrainingSetMetadataDict ) -> List[FeatureConfigDict]: diff --git a/ludwig/datasets/configs/consumer_complaints_generation.yaml b/ludwig/datasets/configs/consumer_complaints_generation.yaml new file mode 100644 index 00000000000..476773459ed --- /dev/null +++ b/ludwig/datasets/configs/consumer_complaints_generation.yaml @@ -0,0 +1,28 @@ +version: 1.0 +name: consumer_complaints_generation +download_urls: https://predibase-public-us-west-2.s3.us-west-2.amazonaws.com/datasets/consumer_complaints_gen_tutorial.csv +train_filenames: consumer_complaints_gen_tutorial.csv +description: | + The dataset contains different information of complaints that customers have made about a multiple products and + services in the financial sector, such us Credit Reports, Student Loans, Money Transfer, etc. The date of each + complaint ranges from November 2011 to May 2019. The dataset has been modified to be used for text generation. + We have added a structured JSON field that contains a company generated response to the raised complaint. The idea + is to fine-tune an LLM to generate this output JSON field. 
+columns: + - name: Date received + type: Date + - name: Generated Company Response + type: text + - name: Complaint ID + type: number + - name: Issue + type: text + - name: Product + type: text + - name: Structured JSON Output + type: text + - name: Consumer complaint narrative + type: text +output_features: + - name: Structured JSON Output + type: text diff --git a/ludwig/explain/captum.py b/ludwig/explain/captum.py index 6194defd88f..081568e18f7 100644 --- a/ludwig/explain/captum.py +++ b/ludwig/explain/captum.py @@ -279,9 +279,11 @@ def get_input_tensors( :return: A list of variables, one for each input feature. Shape of each variable is [batch size, embedding size]. """ - # Ignore sample_ratio from the model config, since we want to explain all the data. + # Ignore sample_ratio and sample_size from the model config, since we want to explain all the data. sample_ratio_bak = model.config_obj.preprocessing.sample_ratio + sample_size_bak = model.config_obj.preprocessing.sample_size model.config_obj.preprocessing.sample_ratio = 1.0 + model.config_obj.preprocessing.sample_size = None config = model.config_obj.to_dict() training_set_metadata = copy.deepcopy(model.training_set_metadata) @@ -302,8 +304,9 @@ def get_input_tensors( callbacks=model.callbacks, ) - # Restore sample_ratio + # Restore sample_ratio and sample_size model.config_obj.preprocessing.sample_ratio = sample_ratio_bak + model.config_obj.preprocessing.sample_size = sample_size_bak # Make sure the number of rows in the preprocessed dataset matches the number of rows in the input data assert ( diff --git a/ludwig/models/llm.py b/ludwig/models/llm.py index 314d91c1eda..a8201901ad2 100644 --- a/ludwig/models/llm.py +++ b/ludwig/models/llm.py @@ -21,6 +21,7 @@ from ludwig.utils.llm_utils import ( add_left_padding, generate_merged_ids, + get_context_len, pad_target_tensor_for_fine_tuning, realign_target_and_prediction_tensors_for_inference, remove_left_padding, @@ -126,13 +127,7 @@ def __init__( self.curr_device = next(self.model.parameters()).device logger.info("Done.") - # Determines the maximum length of the context (input + output tokens) - if hasattr(self.model_config, "max_sequence_length"): - self.context_len = self.model_config.max_sequence_length - elif hasattr(self.model_config, "max_position_embeddings"): - self.context_len = self.model_config.max_position_embeddings - else: - self.context_len = 2048 + self.context_len = get_context_len(self.model_config) # TODO(Arnav): This needs be more flexible to account for RoPE Scaling # When merging input IDs and target IDs for LLM fine-tuning, we want to make sure that the merged tensor is @@ -412,7 +407,7 @@ def generate( mask=None, ) -> Dict[str, torch.Tensor]: """Generates tokens using the model.""" - logger.info(f"For generating text, using: {self.generation}") + log_once(f"For generating text, using: {self.generation}") input_ids, _ = self._unpack_inputs(inputs) with torch.no_grad(): diff --git a/ludwig/schema/metadata/configs/preprocessing.yaml b/ludwig/schema/metadata/configs/preprocessing.yaml index 688f2084732..a29d2ece63a 100644 --- a/ludwig/schema/metadata/configs/preprocessing.yaml +++ b/ludwig/schema/metadata/configs/preprocessing.yaml @@ -44,6 +44,22 @@ sample_ratio: expected_impact: 2 suggested_values: Depends on data size ui_display_name: Sample Ratio +sample_size: + default_value_reasoning: + The default value is None because we do not want to shrink + the dataset by default, and we do not know the size of an arbitrary dataset. 
+ By setting the default to None, we fall back on the sample_ratio to determine + the size of the dataset. + description_implications: + Decreases the amount of data you are inputting into + the model. Could be useful if you have more data than you need and you are + concerned with computational costs. More useful than sample_ratio if you + know the exact number of samples you want to train on instead of knowing the proportion. + example_value: + - 1000 + expected_impact: 2 + suggested_values: Depends on data size + ui_display_name: Sample Size column: expected_impact: 3 ui_display_name: Split Column diff --git a/ludwig/schema/model_types/base.py b/ludwig/schema/model_types/base.py index b3ebc154ffb..410aa5c454e 100644 --- a/ludwig/schema/model_types/base.py +++ b/ludwig/schema/model_types/base.py @@ -30,7 +30,7 @@ sanitize_and_filter_combiner_entities_, set_derived_feature_columns_, set_hyperopt_defaults_, - set_llm_tokenizers, + set_llm_parameters, set_preprocessing_parameters, set_tagger_decoder_parameters, set_validation_parameters, @@ -69,8 +69,8 @@ def __post_init__(self): set_tagger_decoder_parameters(self) sanitize_and_filter_combiner_entities_(self) - # Set preprocessing parameters for text features for LLM model type - set_llm_tokenizers(self) + # Reconcile LLM parameters + set_llm_parameters(self) # Reconcile conflicting preprocessing parameters set_preprocessing_parameters(self) diff --git a/ludwig/schema/model_types/utils.py b/ludwig/schema/model_types/utils.py index a38a3d53409..b8550d06838 100644 --- a/ludwig/schema/model_types/utils.py +++ b/ludwig/schema/model_types/utils.py @@ -5,6 +5,7 @@ from typing import Any, Dict, List, Mapping, Set, TYPE_CHECKING from marshmallow import ValidationError +from transformers import AutoConfig from ludwig.api_annotations import DeveloperAPI from ludwig.constants import ( @@ -17,6 +18,7 @@ INPUT_FEATURES, LOSS, MODEL_ECD, + MODEL_LLM, OUTPUT_FEATURES, PARAMETERS, PREPROCESSING, @@ -28,9 +30,11 @@ from ludwig.features.feature_utils import compute_feature_hash from ludwig.schema.features.utils import output_config_registry from ludwig.schema.hyperopt.scheduler import BaseHyperbandSchedulerConfig +from ludwig.schema.llms.generation import LLMGenerationConfig from ludwig.schema.trainer import ECDTrainerConfig from ludwig.types import HyperoptConfigDict, ModelConfigDict from ludwig.utils.data_utils import get_sanitized_feature_name +from ludwig.utils.llm_utils import get_context_len if TYPE_CHECKING: from ludwig.schema.model_types.base import ModelConfig @@ -299,16 +303,24 @@ def set_tagger_decoder_parameters(config: "ModelConfig") -> None: output_feature.reduce_input = None -def set_llm_tokenizers(config: "ModelConfig") -> None: +def set_llm_parameters(config: "ModelConfig") -> None: + if config.model_type != MODEL_LLM: + return + + # Set preprocessing parameters for text features for LLM model type + _set_llm_tokenizers(config) + + # Set max_new_tokens in generation config to the max sequence length of the output features + _set_generation_max_new_tokens(config) + + +def _set_llm_tokenizers(config: "ModelConfig") -> None: """Sets the tokenizers for the LLM model to the pretrained model name or path. This ensures that they use the correct shared vocabulary from the tokenizer. This also ensures padding is correctly set to left padding to prevent the LLM from trying to continue to sequence based on the right padding tokens, which might exist based on sequence length. 
""" - if config.model_type != "llm": - return - pretrained_model_name_or_path = config.base_model if not isinstance(pretrained_model_name_or_path, str) or pretrained_model_name_or_path is None: raise ValueError("Must set `base_model` when using the LLM model.") @@ -337,6 +349,62 @@ def set_llm_tokenizers(config: "ModelConfig") -> None: output_feature.decoder.fallback_label = output_feature.preprocessing.fallback_label +def _get_maximum_possible_sequence_length(config: "ModelConfig", default_max_sequence_length: int) -> int: + """Returns the maximum possible sequence length for the LLM model based on the model config.""" + max_possible_sequence_length = default_max_sequence_length + if config.output_features[0].preprocessing.max_sequence_length is not None: + # Note: We don't need to check for max between feature.preprocessing.max_sequence_length and + # defaults.text.preprocessing.max_sequence_length because the latter is only applied to input features. + max_possible_sequence_length = max( + default_max_sequence_length, config.output_features[0].preprocessing.max_sequence_length + ) + elif config.preprocessing.global_max_sequence_length is not None: + # This is not perfect since it includes tokens from both input + output features, but this at least + # ensures that max possible of the sequence length is used. It is very likely that the model learns + # to generate sequences than this value. + max_possible_sequence_length = max( + max_possible_sequence_length, config.preprocessing.global_max_sequence_length + ) + elif max_possible_sequence_length == default_max_sequence_length: + # It's possible that both max_sequence_length and global_max_sequence_length are not set, in which case + # we should fall back to the window size of the pretrained model. By this point, because of schema validation + # checks, we know that the base_model exists so we can safely grab the base model's config. + # TODO (Arnav): Figure out how to factor in rope scaling factor into this calculation. + model_config = AutoConfig.from_pretrained(config.base_model) + max_possible_sequence_length = get_context_len(model_config) + # Artifically leave a buffer of half the total model window size to trade off + # runtime while likely covering a majority of the max sequence length. + max_possible_sequence_length = max_possible_sequence_length // 2 + return max_possible_sequence_length + + +def _set_generation_max_new_tokens(config: "ModelConfig") -> None: + """Sets the max_new_tokens parameter in the generation config to the max sequence length of the output + features. + + This ensures that the generation config is set to the correct value for the LLM model type. + """ + _DEFAULT_MAX_SEQUENCE_LENGTH = LLMGenerationConfig().max_new_tokens + if config.generation.max_new_tokens != _DEFAULT_MAX_SEQUENCE_LENGTH: + # Max new tokens is explicitly set by user, so don't override + return + + if config.output_features[0].type != TEXT: + # This is trickier to set for other output features, so don't override for now. + # TODO: Add better support for category output features + return + + max_possible_sequence_length = _get_maximum_possible_sequence_length(config, _DEFAULT_MAX_SEQUENCE_LENGTH) + + logger.info( + f"Setting generation max_new_tokens to {max_possible_sequence_length} to correspond with the max " + "sequence length assigned to the output feature or the global max sequence length. This will ensure that " + "the correct number of tokens are generated at inference time. 
To override this behavior, set " + "`generation.max_new_tokens` to a different value in your Ludwig config." + ) + config.generation.max_new_tokens = max_possible_sequence_length + + @DeveloperAPI def contains_grid_search_parameters(hyperopt_config: HyperoptConfigDict) -> bool: """Returns True if any hyperopt parameter in the config is using the grid_search space.""" diff --git a/ludwig/schema/preprocessing.py b/ludwig/schema/preprocessing.py index 075c63fe4a3..4963f3783ba 100644 --- a/ludwig/schema/preprocessing.py +++ b/ludwig/schema/preprocessing.py @@ -18,6 +18,14 @@ class PreprocessingConfig(schema_utils.BaseMarshmallowConfig): parameter_metadata=PREPROCESSING_METADATA["sample_ratio"], ) + sample_size: float = schema_utils.NonNegativeInteger( + default=None, + allow_none=True, + description="The maximum number of samples from the dataset to use. Cannot be set if sample_ratio is set to be " + "< 1.0. If sample_ratio is set to 1.0, this will override the number of samples to used.", + parameter_metadata=PREPROCESSING_METADATA["sample_size"], + ) + oversample_minority: float = schema_utils.NonNegativeFloat( default=None, allow_none=True, diff --git a/ludwig/trainers/trainer.py b/ludwig/trainers/trainer.py index 7f41c4376be..1c91e916e2a 100644 --- a/ludwig/trainers/trainer.py +++ b/ludwig/trainers/trainer.py @@ -487,6 +487,14 @@ def write_step_summary(cls, train_summary_writer, combined_loss, all_losses, ste / (1000**3), global_step=step, ) + + # Utilization. + # https://pytorch.org/docs/stable/generated/torch.cuda.utilization.html#torch.cuda.utilization + train_summary_writer.add_scalar( + f"cuda/device{i}/utilization", + torch.cuda.utilization(device=device), + global_step=step, + ) train_summary_writer.flush() def is_cpu_training(self): @@ -735,7 +743,8 @@ def run_evaluation( else: # There's no validation, so we save the model. if not self.skip_save_model: - logger.info("Saving model.\n") + if self.is_coordinator(): + logger.info("Saving model.\n") checkpoint_manager.save_best(progress_tracker.steps) self.callback(lambda c: c.on_save_best_checkpoint(self, progress_tracker, save_path)) @@ -828,7 +837,8 @@ def train( try: progress_tracker = self.resume_training_progress_tracker(training_progress_tracker_path) self.resume_weights_and_optimizer(training_checkpoints_path, checkpoint) - logger.info("Resuming training from previous run.") + if self.is_coordinator(): + logger.info("Resuming training from previous run.") except Exception: # This may happen if model training is interrupted after the progress tracker is initialized # but before any real training progress is made. @@ -841,7 +851,8 @@ def train( ), output_features=output_features, ) - logger.info("Failed to resume training from previous run. Creating fresh model training run.") + if self.is_coordinator(): + logger.info("Failed to resume training from previous run. Creating fresh model training run.") else: progress_tracker = get_new_progress_tracker( batch_size=self.batch_size, @@ -850,7 +861,8 @@ def train( best_increase_batch_size_eval_metric=get_initial_validation_value(self.increase_batch_size_eval_metric), output_features=output_features, ) - logger.info("Creating fresh model training run.") + if self.is_coordinator(): + logger.info("Creating fresh model training run.") # Distributed: broadcast initial variable states from rank 0 to all other processes. 
# This is necessary to ensure consistent initialization of all workers when diff --git a/ludwig/upload.py b/ludwig/upload.py index 4cc86e488ad..fa626f3a1e1 100644 --- a/ludwig/upload.py +++ b/ludwig/upload.py @@ -4,7 +4,7 @@ from typing import Optional from ludwig.utils.print_utils import get_logging_level_registry -from ludwig.utils.upload_utils import HuggingFaceHub +from ludwig.utils.upload_utils import HuggingFaceHub, Predibase logger = logging.getLogger(__name__) @@ -12,6 +12,7 @@ def get_upload_registry(): return { "hf_hub": HuggingFaceHub, + "predibase": Predibase, } @@ -23,6 +24,8 @@ def upload_cli( private: bool = False, commit_message: str = "Upload trained [Ludwig](https://ludwig.ai/latest/) model weights", commit_description: Optional[str] = None, + dataset_file: Optional[str] = None, + dataset_name: Optional[str] = None, **kwargs, ) -> None: """Create an empty repo on the HuggingFace Hub and upload trained model artifacts to that repo. @@ -30,7 +33,7 @@ def upload_cli( Args: service (`str`): Name of the hosted model service to push the trained artifacts to. - Currently, this only supports `hf_hub`. + Currently, this only supports `hf_hub` and `predibase`. repo_id (`str`): A namespace (user or an organization) and a repo name separated by a `/`. @@ -49,10 +52,15 @@ def upload_cli( `f"Upload {path_in_repo} with huggingface_hub"` commit_description (`str` *optional*): The description of the generated commit + dataset_file (`str`, *optional*): + The path to the dataset file. Required if `service` is set to + `"predibase"` for new model repos. + dataset_name (`str`, *optional*): + The name of the dataset. Used by the `service` + `"predibase"`. """ model_service = get_upload_registry().get(service, "hf_hub") hub = model_service() - hub.login() hub.upload( repo_id=repo_id, model_path=model_path, @@ -60,6 +68,8 @@ def upload_cli( private=private, commit_message=commit_message, commit_description=commit_description, + dataset_file=dataset_file, + dataset_name=dataset_name, ) @@ -77,7 +87,7 @@ def cli(sys_argv): "service", help="Name of the model repository service.", default="hf_hub", - choices=["hf_hub"], + choices=["hf_hub", "predibase"], ) parser.add_argument( @@ -115,6 +125,11 @@ def cli(sys_argv): choices=["critical", "error", "warning", "info", "debug", "notset"], ) + parser.add_argument("-df", "--dataset_file", help="The location of the dataset file", default=None) + parser.add_argument( + "-dn", "--dataset_name", help="(Optional) The name of the dataset in the Provider", default=None + ) + args = parser.parse_args(sys_argv) args.logging_level = get_logging_level_registry()[args.logging_level] diff --git a/ludwig/utils/llm_utils.py b/ludwig/utils/llm_utils.py index 12c0a1f509f..1dd9d1df85a 100644 --- a/ludwig/utils/llm_utils.py +++ b/ludwig/utils/llm_utils.py @@ -5,6 +5,7 @@ import torch.nn.functional as F from bitsandbytes.nn.modules import Embedding from transformers import ( + AutoConfig, AutoModelForCausalLM, CodeLlamaTokenizer, CodeLlamaTokenizerFast, @@ -22,6 +23,9 @@ logger = logging.getLogger(__name__) +FALLBACK_CONTEXT_LEN = 2048 + + def set_pad_token(tokenizer: PreTrainedTokenizer): """Sets the pad token for the tokenizer if it is not already set. @@ -57,6 +61,45 @@ def set_pad_token(tokenizer: PreTrainedTokenizer): tokenizer.pad_token_id = tokenizer.eos_token_id +def get_context_len(model_config: AutoConfig): + """Determines the maximum length of the context (input + output tokens) based on the provided model + configuration. 
+ + Args: + model_config (AutoConfig): The model configuration object containing information about the model's properties. + + Returns: + int: The maximum context length, which can be derived from the model configuration. If no relevant attribute + is found, the default value of 2048 is returned. + + This function examines the provided model configuration object to identify the attribute that specifies the maximum + context length. It checks for attributes in the following order of preference: + 1. 'max_sequence_length': If this attribute is present in the model configuration, its value is returned. + 2. 'max_position_embeddings': If 'max_sequence_length' is not found but 'max_position_embeddings' is present, its + value is returned. + 3. 'n_positions': If neither 'max_sequence_length' nor 'max_position_embeddings' are found, and 'n_positions' is + present, its value is returned. + 4. Default: If none of the relevant attributes are present, the function returns a default value of 2048. + + Note: + - The maximum context length is important for defining the size of input and output sequences in a model. + + Example Usage: + >>> config = AutoConfig.from_pretrained("bert-base-uncased") + >>> context_len = get_context_len(config) + >>> print(context_len) + 512 + """ + if hasattr(model_config, "max_sequence_length"): + return model_config.max_sequence_length + elif hasattr(model_config, "max_position_embeddings"): + return model_config.max_position_embeddings + elif hasattr(model_config, "n_positions"): + return model_config.n_positions + else: + return FALLBACK_CONTEXT_LEN + + def has_padding_token(input_tensor: torch.Tensor, tokenizer: PreTrainedTokenizer): """Checks if the input tensor contains any padding tokens. diff --git a/ludwig/utils/upload_utils.py b/ludwig/utils/upload_utils.py index 4dc193d6410..cc166170282 100644 --- a/ludwig/utils/upload_utils.py +++ b/ludwig/utils/upload_utils.py @@ -37,6 +37,8 @@ def upload( private: Optional[bool] = False, commit_message: Optional[str] = None, commit_description: Optional[str] = None, + dataset_file: Optional[str] = None, + dataset_name: Optional[str] = None, ) -> bool: """Abstract method to upload trained model artifacts to the target repository. @@ -68,9 +70,7 @@ def _validate_upload_parameters( trained model artifacts to the target repository. Args: - repo_id (str): The ID of the target repository. It must be a namespace (user or an organization) - and a repository name separated by a '/'. For example, if your HF username is 'johndoe' and you - want to create a repository called 'test', the repo_id should be 'johndoe/test'. + repo_id (str): The ID of the target repository. Each provider will verify their specific rules. model_path (str): The path to the directory containing the trained model artifacts. It should contain the model's weights, usually saved under 'model/model_weights'. repo_type (str, optional): The type of the repository. Not used in the base class, but subclasses @@ -85,18 +85,10 @@ def _validate_upload_parameters( implementations. Defaults to None. Raises: - AssertionError: If the repo_id does not have both a namespace and a repo name separated by a '/'. FileNotFoundError: If the model_path does not exist. Exception: If the trained model artifacts are not found at the expected location within model_path, or if the artifacts are not in the required format (i.e., 'pytorch_model.bin' or 'adapter_model.bin'). 
""" - # Validate repo_id has both a namespace and a repo name - assert "/" in repo_id, ( - "`repo_id` must be a namespace (user or an organization) and a repo name separated by a `/`." - " For example, if your HF username is `johndoe` and you want to create a repository called `test`, the" - " repo_id should be johndoe/test" - ) - # Make sure the model's save path is actually a valid path if not os.path.exists(model_path): raise FileNotFoundError(f"The path '{model_path}' does not exist.") @@ -110,21 +102,11 @@ def _validate_upload_parameters( "wrong during training where the model's weights were not saved." ) - # Make sure the model's saved artifacts either contain: - # 1. pytorch_model.bin -> regular model training, such as ECD or for LLMs - # 2. adapter_model.bin -> LLM fine-tuning using PEFT - files = set(os.listdir(trained_model_artifacts_path)) - if "pytorch_model.bin" not in files and "adapter_model.bin" not in files: - raise Exception( - f"Can't find model weights at {trained_model_artifacts_path}. Trained model weights should " - "either be saved as `pytorch_model.bin` for regular model training, or have `adapter_model.bin`" - "if using parameter efficient fine-tuning methods like LoRA." - ) - class HuggingFaceHub(BaseModelUpload): def __init__(self): self.api = None + self.login() def login(self): """Login to huggingface hub using the token stored in ~/.cache/huggingface/token and returns a HfApi client @@ -142,6 +124,68 @@ def login(self): self.api = hf_api + @staticmethod + def _validate_upload_parameters( + repo_id: str, + model_path: str, + repo_type: Optional[str] = None, + private: Optional[bool] = False, + commit_message: Optional[str] = None, + commit_description: Optional[str] = None, + ): + """Validate parameters before uploading trained model artifacts. + + This method checks if the input parameters meet the necessary requirements before uploading + trained model artifacts to the target repository. + + Args: + repo_id (str): The ID of the target repository. It must be a namespace (user or an organization) + and a repository name separated by a '/'. For example, if your HF username is 'johndoe' and you + want to create a repository called 'test', the repo_id should be 'johndoe/test'. + model_path (str): The path to the directory containing the trained model artifacts. It should contain + the model's weights, usually saved under 'model/model_weights'. + repo_type (str, optional): The type of the repository. Not used in the base class, but subclasses + may use it for specific repository implementations. Defaults to None. + private (bool, optional): Whether the repository should be private or not. Not used in the base class, + but subclasses may use it for specific repository implementations. Defaults to False. + commit_message (str, optional): A message to attach to the commit when uploading to version control + systems. Not used in the base class, but subclasses may use it for specific repository + implementations. Defaults to None. + commit_description (str, optional): A description of the commit when uploading to version control + systems. Not used in the base class, but subclasses may use it for specific repository + implementations. Defaults to None. + + Raises: + ValueError: If the repo_id does not have both a namespace and a repo name separated by a '/'. + """ + # Validate repo_id has both a namespace and a repo name + if "/" not in repo_id: + raise ValueError( + "`repo_id` must be a namespace (user or an organization) and a repo name separated by a `/`." 
+ " For example, if your HF username is `johndoe` and you want to create a repository called `test`, the" + " repo_id should be johndoe/test" + ) + BaseModelUpload._validate_upload_parameters( + repo_id, + model_path, + repo_type, + private, + commit_message, + commit_description, + ) + + trained_model_artifacts_path = os.path.join(model_path, "model", "model_weights") + # Make sure the model's saved artifacts either contain: + # 1. pytorch_model.bin -> regular model training, such as ECD or for LLMs + # 2. adapter_model.bin -> LLM fine-tuning using PEFT + files = set(os.listdir(trained_model_artifacts_path)) + if "pytorch_model.bin" not in files and "adapter_model.bin" not in files: + raise Exception( + f"Can't find model weights at {trained_model_artifacts_path}. Trained model weights should " + "either be saved as `pytorch_model.bin` for regular model training, or have `adapter_model.bin`" + "if using parameter efficient fine-tuning methods like LoRA." + ) + def upload( self, repo_id: str, @@ -150,6 +194,7 @@ def upload( private: Optional[bool] = False, commit_message: Optional[str] = None, commit_description: Optional[str] = None, + **kwargs, ) -> bool: """Create an empty repo on the HuggingFace Hub and upload trained model artifacts to that repo. @@ -205,3 +250,143 @@ def upload( return True return False + + +class Predibase(BaseModelUpload): + def __init__(self): + self.pc = None + self.login() + + def login(self): + """Login to Predibase using the token stored in the PREDIBASE_API_TOKEN environment variable and return a + PredibaseClient object that can be used to interact with Predibase.""" + from predibase import PredibaseClient + + token = os.environ.get("PREDIBASE_API_TOKEN") + if token is None: + raise ValueError( + "Unable to find PREDIBASE_API_TOKEN environment variable. Please log into Predibase, " + "generate a token and use `export PREDIBASE_API_TOKEN=` to use Predibase" + ) + + try: + pc = PredibaseClient() + + # TODO: Check if subscription has expired + + self.pc = pc + except Exception as e: + raise Exception(f"Failed to login to Predibase: {e}") + return False + + return True + + @staticmethod + def _validate_upload_parameters( + repo_id: str, + model_path: str, + repo_type: Optional[str] = None, + private: Optional[bool] = False, + commit_message: Optional[str] = None, + commit_description: Optional[str] = None, + ): + """Validate parameters before uploading trained model artifacts. + + This method checks if the input parameters meet the necessary requirements before uploading + trained model artifacts to the target repository. + + Args: + repo_id (str): The ID of the target repository. It must be a less than 256 characters. + model_path (str): The path to the directory containing the trained model artifacts. It should contain + the model's weights, usually saved under 'model/model_weights'. + repo_type (str, optional): The type of the repository. Not used in the base class, but subclasses + may use it for specific repository implementations. Defaults to None. + private (bool, optional): Whether the repository should be private or not. Not used in the base class, + but subclasses may use it for specific repository implementations. Defaults to False. + commit_message (str, optional): A message to attach to the commit when uploading to version control + systems. Not used in the base class, but subclasses may use it for specific repository + implementations. Defaults to None. 
+ commit_description (str, optional): A description of the commit when uploading to version control + systems. Not used in the base class, but subclasses may use it for specific repository + implementations. Defaults to None. + + Raises: + ValueError: If the repo_id is too long. + """ + if len(repo_id) > 255: + raise ValueError("`repo_id` must be 255 characters or less.") + + BaseModelUpload._validate_upload_parameters( + repo_id, + model_path, + repo_type, + private, + commit_message, + commit_description, + ) + + def upload( + self, + repo_id: str, + model_path: str, + commit_description: Optional[str] = None, + dataset_file: Optional[str] = None, + dataset_name: Optional[str] = None, + **kwargs, + ) -> bool: + """Create an empty repo in Predibase and upload trained model artifacts to that repo. + + Args: + model_path (`str`): + The path of the saved model. This is the top level directory where + the models weights as well as other associated training artifacts + are saved. + repo_name (`str`): + A repo name. + repo_description (`str` *optional*): + The description of the repo. + dataset_file (`str` *optional*): + The path to the dataset file. Required if `service` is set to + `"predibase"` for new model repos. + dataset_name (`str` *optional*): + The name of the dataset. Used by the `service` + `"predibase"`. Falls back to the filename. + """ + # Validate upload parameters are in the right format + Predibase._validate_upload_parameters( + repo_id, + model_path, + None, + False, + "", + commit_description, + ) + + # Upload the dataset to Predibase + try: + dataset = self.pc.upload_dataset(file_path=dataset_file, name=dataset_name) + except Exception as e: + raise RuntimeError("Failed to upload dataset to Predibase") from e + + # Create empty model repo using repo_name, but it is okay if it already exists. 
+ try: + repo = self.pc.create_model_repo( + name=repo_id, + description=commit_description, + exists_ok=True, + ) + except Exception as e: + raise RuntimeError("Failed to create repo in Predibase") from e + + # Upload the zip file to Predibase + try: + self.pc.upload_model( + repo=repo, + model_path=model_path, + dataset=dataset, + ) + except Exception as e: + raise RuntimeError("Failed to upload model to Predibase") from e + + logger.info(f"Model uploaded to Predibase with repository name `{repo_id}`") + return True diff --git a/requirements_extra.txt b/requirements_extra.txt index 4be0e04497a..26fe48eb998 100644 --- a/requirements_extra.txt +++ b/requirements_extra.txt @@ -3,3 +3,6 @@ horovod[pytorch]>=0.24.0,!=0.26.0 # alternative to Dask modin[ray] + +# Allows users to upload +predibase>=2023.10.2 diff --git a/tests/integration_tests/test_api.py b/tests/integration_tests/test_api.py index 95879c8326f..e7a9d5102ba 100644 --- a/tests/integration_tests/test_api.py +++ b/tests/integration_tests/test_api.py @@ -740,3 +740,28 @@ def test_saved_weights_in_checkpoint(tmpdir): input_feature_encoder = saved_input_feature["encoder"] assert "saved_weights_in_checkpoint" in input_feature_encoder assert input_feature_encoder["saved_weights_in_checkpoint"] + + +def test_constant_metadata(tmpdir): + input_features = [category_feature(encoder={"vocab_size": 5})] + output_features = [category_feature(name="class", decoder={"vocab_size": 5}, output_feature=True)] + + data_csv1 = generate_data(input_features, output_features, os.path.join(tmpdir, "dataset1.csv")) + val_csv1 = shutil.copyfile(data_csv1, os.path.join(tmpdir, "validation1.csv")) + test_csv1 = shutil.copyfile(data_csv1, os.path.join(tmpdir, "test1.csv")) + + config = { + "input_features": input_features, + "output_features": output_features, + } + model = LudwigModel(config) + model.train(training_set=data_csv1, validation_set=val_csv1, test_set=test_csv1, output_directory=tmpdir) + metadata1 = model.training_set_metadata + + data_csv2 = generate_data(input_features, output_features, os.path.join(tmpdir, "dataset2.csv"), num_examples=10) + val_csv2 = shutil.copyfile(data_csv2, os.path.join(tmpdir, "validation2.csv")) + test_csv2 = shutil.copyfile(data_csv2, os.path.join(tmpdir, "test2.csv")) + model.train(training_set=data_csv2, validation_set=val_csv2, test_set=test_csv2, output_directory=tmpdir) + metadata2 = model.training_set_metadata + + assert metadata1 == metadata2 diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py index 9f9812c6d85..671327e8e12 100644 --- a/tests/integration_tests/test_preprocessing.py +++ b/tests/integration_tests/test_preprocessing.py @@ -162,6 +162,108 @@ def test_sample_ratio_deterministic(backend, tmpdir, ray_cluster_2cpu): assert test_set_1.to_df().compute().equals(test_set_2.to_df().compute()) +@pytest.mark.parametrize( + "backend", + [ + pytest.param("local", id="local"), + pytest.param("ray", id="ray", marks=pytest.mark.distributed), + ], +) +def test_sample_size(backend, tmpdir, ray_cluster_2cpu): + num_examples = 100 + sample_size = 25 + + input_features = [sequence_feature(encoder={"reduce_output": "sum"}), audio_feature(folder=tmpdir)] + output_features = [category_feature(decoder={"vocab_size": 5}, reduce_input="sum")] + data_csv = generate_data( + input_features, output_features, os.path.join(tmpdir, "dataset.csv"), num_examples=num_examples + ) + config = { + INPUT_FEATURES: input_features, + OUTPUT_FEATURES: output_features, + TRAINER: { + 
EPOCHS: 2, + }, + PREPROCESSING: {"sample_size": sample_size}, + } + + model = LudwigModel(config, backend=backend) + train_set, val_set, test_set, training_set_metadata = model.preprocess( + data_csv, + skip_save_processed_input=True, + ) + + count = len(train_set) + len(val_set) + len(test_set) + assert sample_size == count + + # Check that sample size is disabled when doing preprocessing for prediction + dataset, _ = preprocess_for_prediction( + model.config_obj.to_dict(), + dataset=data_csv, + training_set_metadata=training_set_metadata, + split=FULL, + include_outputs=True, + backend=model.backend, + ) + assert "sample_size" in model.config_obj.preprocessing.to_dict() + assert len(dataset) == num_examples + + +@pytest.mark.parametrize( + "backend", + [ + pytest.param("local", id="local"), + pytest.param("ray", id="ray", marks=pytest.mark.distributed), + ], +) +def test_sample_size_deterministic(backend, tmpdir, ray_cluster_2cpu): + """Ensures that the sampled dataset is the same when using a random seed. + + model.preprocess returns a PandasPandasDataset object when using local backend, and returns a RayDataset object when + using the Ray backend. + """ + num_examples = 100 + sample_size = 30 + + input_features = [binary_feature()] + output_features = [category_feature()] + data_csv = generate_data( + input_features, output_features, os.path.join(tmpdir, "dataset.csv"), num_examples=num_examples + ) + + config = { + INPUT_FEATURES: input_features, + OUTPUT_FEATURES: output_features, + PREPROCESSING: {"sample_size": sample_size}, + } + + model1 = LudwigModel(config, backend=backend) + train_set_1, val_set_1, test_set_1, _ = model1.preprocess( + data_csv, + skip_save_processed_input=True, + ) + + model2 = LudwigModel(config, backend=backend) + train_set_2, val_set_2, test_set_2, _ = model2.preprocess( + data_csv, + skip_save_processed_input=True, + ) + + # Ensure sizes are the same + assert sample_size == len(train_set_1) + len(val_set_1) + len(test_set_1) + assert sample_size == len(train_set_2) + len(val_set_2) + len(test_set_2) + + # Ensure actual rows are the same + if backend == "local": + assert train_set_1.to_df().equals(train_set_2.to_df()) + assert val_set_1.to_df().equals(val_set_2.to_df()) + assert test_set_1.to_df().equals(test_set_2.to_df()) + else: + assert train_set_1.to_df().compute().equals(train_set_2.to_df().compute()) + assert val_set_1.to_df().compute().equals(val_set_2.to_df().compute()) + assert test_set_1.to_df().compute().equals(test_set_2.to_df().compute()) + + def test_strip_whitespace_category(csv_filename, tmpdir): data_csv_path = os.path.join(tmpdir, csv_filename) diff --git a/tests/ludwig/config_validation/test_checks.py b/tests/ludwig/config_validation/test_checks.py index 44630167fda..613a29a3fd3 100644 --- a/tests/ludwig/config_validation/test_checks.py +++ b/tests/ludwig/config_validation/test_checks.py @@ -489,3 +489,35 @@ def test_check_prompt_requirements(): config["prompt"] = {"task": "Some task", "template": "{__task__}"} ModelConfig.from_dict(config) + + +def test_check_sample_ratio_and_size_compatible(): + config = { + "input_features": [binary_feature()], + "output_features": [binary_feature()], + "model_type": "ecd", + } + ModelConfig.from_dict( + { + "input_features": [binary_feature()], + "output_features": [binary_feature()], + "model_type": "ecd", + } + ) + + config["preprocessing"] = {"sample_size": 10} + ModelConfig.from_dict(config) + + config["preprocessing"]["sample_ratio"] = 1 + ModelConfig.from_dict(config) + + 
+    config["preprocessing"]["sample_ratio"] = 0.1
+    with pytest.raises(ConfigValidationError):
+        ModelConfig.from_dict(config)
+
+    config["preprocessing"]["sample_size"] = 0
+    with pytest.raises(ConfigValidationError):
+        ModelConfig.from_dict(config)
+
+    del config["preprocessing"]["sample_size"]
+    ModelConfig.from_dict(config)
diff --git a/tests/ludwig/schema/test_model_config.py b/tests/ludwig/schema/test_model_config.py
index 054a274b378..21e2883b989 100644
--- a/tests/ludwig/schema/test_model_config.py
+++ b/tests/ludwig/schema/test_model_config.py
@@ -950,3 +950,62 @@ def test_llm_quantization_backend_compatibility():
     ModelConfig.from_dict(config)

     ray.shutdown()
+
+
+class TestMaxNewTokensOverride:
+    def test_max_new_tokens_override_no_changes_to_max_new_tokens(self):
+        """Tests that the default value for max_new_tokens is respected when explicitly set in the config."""
+        config = {
+            MODEL_TYPE: MODEL_LLM,
+            BASE_MODEL: "HuggingFaceH4/tiny-random-LlamaForCausalLM",
+            INPUT_FEATURES: [{NAME: "text_input", TYPE: "text"}],
+            # Default value for generation.max_new_tokens is 32
+            OUTPUT_FEATURES: [{NAME: "text_output", TYPE: "text"}],
+            "generation": {"max_new_tokens": 64},
+        }
+
+        config_obj = ModelConfig.from_dict(config)
+        assert config_obj.generation.max_new_tokens == 64
+
+    def test_max_new_tokens_override_large_max_sequence_length(self):
+        """Tests that the default value for max_new_tokens is overridden when max_sequence_length is set to a
+        larger value than the default max_new_tokens."""
+        config = {
+            MODEL_TYPE: MODEL_LLM,
+            BASE_MODEL: "HuggingFaceH4/tiny-random-LlamaForCausalLM",
+            INPUT_FEATURES: [{NAME: "text_input", TYPE: "text"}],
+            # Default value for generation.max_new_tokens is 32
+            OUTPUT_FEATURES: [{NAME: "text_output", TYPE: "text", "preprocessing": {"max_sequence_length": 100}}],
+        }
+
+        config_obj = ModelConfig.from_dict(config)
+        assert config_obj.generation.max_new_tokens == 100
+
+    def test_max_new_tokens_override_large_global_max_sequence_length(self):
+        """Tests that the default value for max_new_tokens is overridden when global_max_sequence_length is set to
+        a larger value than the default max_new_tokens."""
+        config = {
+            MODEL_TYPE: MODEL_LLM,
+            BASE_MODEL: "HuggingFaceH4/tiny-random-LlamaForCausalLM",
+            INPUT_FEATURES: [{NAME: "text_input", TYPE: "text"}],
+            # Default value for generation.max_new_tokens is 32
+            OUTPUT_FEATURES: [{NAME: "text_output", TYPE: "text"}],
+            PREPROCESSING: {"global_max_sequence_length": 100},
+        }
+
+        config_obj = ModelConfig.from_dict(config)
+        assert config_obj.generation.max_new_tokens == 100
+
+    def test_max_new_tokens_override_fallback_to_model_window_size(self):
+        """Tests that max_new_tokens falls back to half of the base model's context length when no explicit
+        sequence lengths are set in the config."""
+        config = {
+            MODEL_TYPE: MODEL_LLM,
+            BASE_MODEL: "HuggingFaceH4/tiny-random-LlamaForCausalLM",
+            INPUT_FEATURES: [{NAME: "text_input", TYPE: "text"}],
+            # Default value for generation.max_new_tokens is 32
+            OUTPUT_FEATURES: [{NAME: "text_output", TYPE: "text"}],
+        }
+
+        config_obj = ModelConfig.from_dict(config)
+        # Base model context length is 2048 tokens by default
+        # Since we fall back to setting max_new_tokens to the model context length / 2, we expect it to be 1024 tokens
+        assert config_obj.generation.max_new_tokens == 1024
diff --git a/tests/ludwig/utils/test_llm_utils.py b/tests/ludwig/utils/test_llm_utils.py
index e652297d285..d79264bf26a 100644
--- a/tests/ludwig/utils/test_llm_utils.py
+++ b/tests/ludwig/utils/test_llm_utils.py
@@ -1,13 +1,15 @@
 import pytest
 import torch
-from transformers import AutoTokenizer
+from transformers import AutoConfig, AutoTokenizer

 from ludwig.constants import LOGITS, PREDICTIONS, PROBABILITIES
 from ludwig.utils.llm_utils import (
     add_left_padding,
     create_attention_mask,
+    FALLBACK_CONTEXT_LEN,
     find_last_matching_index,
     generate_merged_ids,
+    get_context_len,
     has_padding_token,
     pad_target_tensor_for_fine_tuning,
     realign_target_and_prediction_tensors_for_inference,
@@ -57,6 +59,29 @@ def test_set_pad_token_already_exists():
     assert tokenizer.pad_token_id == 1
+
+class TestSetContextLen:
+    def test_max_sequence_length(self):
+        # Test when 'max_sequence_length' is present in the model configuration
+        config = AutoConfig.from_pretrained("huggyllama/llama-7b")
+        assert get_context_len(config) == config.max_sequence_length
+
+    def test_max_position_embeddings(self):
+        # Test when 'max_position_embeddings' is present in the model configuration
+        config = AutoConfig.from_pretrained("huggyllama/llama-7b")
+        del config.max_sequence_length
+        assert get_context_len(config) == config.max_position_embeddings
+
+    def test_n_positions(self):
+        # Test when 'n_positions' is present in the model configuration
+        config = AutoConfig.from_pretrained("hf-internal-testing/tiny-random-GPTJForCausalLM")
+        assert get_context_len(config) == config.n_positions
+
+    def test_default_value(self):
+        # Test that the fallback value is used when no context length attribute is present
+        config = AutoConfig.from_pretrained("hf-internal-testing/tiny-random-GPTJForCausalLM")
+        del config.n_positions
+        assert get_context_len(config) == FALLBACK_CONTEXT_LEN
+
+
 def test_has_padding_token_with_padding_tokens(tokenizer):
     input_sentence = "This is an example sentence."
     input_ids = tokenizer([input_sentence])