diff --git a/src/llm_vm/completion/optimize.py b/src/llm_vm/completion/optimize.py
index af9625f2..9600e697 100644
--- a/src/llm_vm/completion/optimize.py
+++ b/src/llm_vm/completion/optimize.py
@@ -15,6 +15,80 @@
 # we need to package-ify so this works
 import llm_vm.completion.data_synthesis as data_synthesis
 import inspect
+from typing import Optional, Union
+
+from google.auth import default
+from google.cloud import aiplatform
+import pandas as pd
+import vertexai
+from vertexai.language_models import TextGenerationModel
+from vertexai.preview.language_models import TuningEvaluationSpec
+
+
+credentials, _ = default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
+
+
+def tuning(
+    project_id: str,
+    location: str,
+    model_display_name: str,
+    training_data: Union[pd.DataFrame, str],
+    train_steps: int = 10,
+    evaluation_dataset: Optional[str] = None,
+    tensorboard_instance_name: Optional[str] = None,
+) -> TextGenerationModel:
+    """Tune a new model based on prompt-response data.
+
+    "training_data" can be either the GCS URI of a file in JSONL format
+    (for example: training_data=f'gs://{bucket}/{filename}.jsonl') or a pandas
+    DataFrame. Each training example should be a JSONL record with two keys,
+    for example:
+        {
+            "input_text": <input prompt text>,
+            "output_text": <expected output text>
+        },
+    or the pandas DataFrame should contain two columns:
+        ['input_text', 'output_text']
+    with one row per training example.
+
+    Args:
+        project_id: GCP Project ID, used to initialize vertexai.
+        location: GCP Region, used to initialize vertexai.
+        model_display_name: Customized tuned LLM model name.
+        training_data: GCS URI of a JSONL file or pandas DataFrame of training data.
+        train_steps: Number of training steps to use when tuning the model.
+        evaluation_dataset: GCS URI of a JSONL file of evaluation data.
+        tensorboard_instance_name: The full name of the existing Vertex AI TensorBoard instance:
+            projects/PROJECT_ID/locations/LOCATION_ID/tensorboards/TENSORBOARD_INSTANCE_ID
+            Note that this instance must be in the same region as your tuning job.
+ """ + vertexai.init(project=project_id, location=location, credentials=credentials) + eval_spec = TuningEvaluationSpec(evaluation_data=evaluation_dataset) + eval_spec.tensorboard = aiplatform.Tensorboard( + tensorboard_name=tensorboard_instance_name + ) + model = TextGenerationModel.from_pretrained("text-bison@001") + + model.tune_model( + training_data=training_data, + # Optional: + model_display_name=model_display_name, + train_steps=train_steps, + tuning_job_location="europe-west4", + tuned_model_location=location, + tuning_evaluation_spec=eval_spec, + ) + + print(model._job.status) + + return model + + +if __name__ == "__main__": + tuning() job_id = None # we want to be able to cancel a fine_tune if you kill the program diff --git a/src/llm_vm/onsite_llm.py b/src/llm_vm/onsite_llm.py index f78db919..aabc09a6 100644 --- a/src/llm_vm/onsite_llm.py +++ b/src/llm_vm/onsite_llm.py @@ -28,6 +28,140 @@ from sentence_transformers import SentenceTransformer +def train_func(config): + """Your training function that will be launched on each worker.""" + + # Unpack training configs + lr = config["lr"] + seed = config["seed"] + num_epochs = config["num_epochs"] + train_batch_size = config["train_batch_size"] + eval_batch_size = config["eval_batch_size"] + train_ds_size = config["train_dataset_size"] + + set_seed(seed) + + # Initialize accelerator + accelerator = Accelerator() + + # Load datasets and metrics + metric = evaluate.load("glue", "mrpc") + + # Prepare Ray Data loaders + # ==================================================== + train_ds = ray.train.get_dataset_shard("train") + eval_ds = ray.train.get_dataset_shard("validation") + + tokenizer = AutoTokenizer.from_pretrained("bert-base-cased") + + def collate_fn(batch): + outputs = tokenizer( + list(batch["sentence1"]), + list(batch["sentence2"]), + truncation=True, + padding="longest", + return_tensors="pt", + ) + outputs["labels"] = torch.LongTensor(batch["label"]) + outputs = {k: v.to(accelerator.device) for k, v in outputs.items()} + return outputs + + train_dataloader = train_ds.iter_torch_batches( + batch_size=train_batch_size, collate_fn=collate_fn + ) + eval_dataloader = eval_ds.iter_torch_batches( + batch_size=eval_batch_size, collate_fn=collate_fn + ) + # ==================================================== + + # Instantiate the model, optimizer, lr_scheduler + model = AutoModelForSequenceClassification.from_pretrained( + "bert-base-cased", return_dict=True + ) + + optimizer = AdamW(params=model.parameters(), lr=lr) + + steps_per_epoch = train_ds_size // (accelerator.num_processes * train_batch_size) + lr_scheduler = get_linear_schedule_with_warmup( + optimizer=optimizer, + num_warmup_steps=100, + num_training_steps=(steps_per_epoch * num_epochs), + ) + + # Prepare everything with accelerator + model, optimizer, lr_scheduler = accelerator.prepare(model, optimizer, lr_scheduler) + + for epoch in range(num_epochs): + # Training + model.train() + for batch in train_dataloader: + outputs = model(**batch) + loss = outputs.loss + accelerator.backward(loss) + optimizer.step() + lr_scheduler.step() + optimizer.zero_grad() + + # Evaluation + model.eval() + for batch in eval_dataloader: + with torch.no_grad(): + outputs = model(**batch) + predictions = outputs.logits.argmax(dim=-1) + + predictions, references = accelerator.gather_for_metrics( + (predictions, batch["labels"]) + ) + metric.add_batch( + predictions=predictions, + references=references, + ) + + eval_metric = metric.compute() + accelerator.print(f"epoch {epoch}:", eval_metric) + 
+        # Report Checkpoint and metrics to Ray Train
+        # ==========================================
+        with TemporaryDirectory() as tmpdir:
+            if accelerator.is_main_process:
+                unwrapped_model = accelerator.unwrap_model(model)
+                accelerator.save(unwrapped_model, f"{tmpdir}/ckpt_{epoch}.bin")
+                checkpoint = Checkpoint.from_directory(tmpdir)
+            else:
+                checkpoint = None
+            ray.train.report(metrics=eval_metric, checkpoint=checkpoint)
+
+
+if __name__ == "__main__":
+    config = {
+        "lr": 2e-5,
+        "num_epochs": 3,
+        "seed": 42,
+        "train_batch_size": 16,
+        "eval_batch_size": 32,
+    }
+
+    # Prepare Ray Datasets
+    hf_datasets = load_dataset("glue", "mrpc")
+    ray_datasets = {
+        "train": ray.data.from_huggingface(hf_datasets["train"]),
+        "validation": ray.data.from_huggingface(hf_datasets["validation"]),
+    }
+    config["train_dataset_size"] = ray_datasets["train"].count()
+
+    trainer = TorchTrainer(
+        train_func,
+        train_loop_config=config,
+        datasets=ray_datasets,
+        dataset_config=DataConfig(datasets_to_split=["train", "validation"]),
+        scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
+    )
+
+    result = trainer.fit()
+
+# __accelerate_torch_basic_example_end__
+
+
 __private_key_value_models_map = {} # [] {
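
The optimize.py hunk adds an `if __name__ == "__main__":` stub that calls `tuning()` with no arguments, which would raise a TypeError since the first four parameters are required. Below is a minimal sketch of how a caller might invoke the helper; it is illustrative only, not part of this diff. Every identifier (project, region, bucket, TensorBoard resource name, model display name) is a placeholder, and the evaluation arguments are supplied because the helper as written always builds a TuningEvaluationSpec and a TensorBoard reference, and hard-codes tuning_job_location="europe-west4".

# Hypothetical usage of the new tuning() helper; all values are placeholders.
import pandas as pd

from llm_vm.completion.optimize import tuning  # assumes the helper lands in this module

# Training data as a DataFrame with the two columns the docstring describes.
train_df = pd.DataFrame(
    {
        "input_text": ["Summarize: Ray Train scales PyTorch training."],
        "output_text": ["Ray Train distributes PyTorch training across workers."],
    }
)

tuned_model = tuning(
    project_id="my-gcp-project",                 # placeholder GCP project
    location="us-central1",                      # placeholder region
    model_display_name="llm-vm-tuned-bison",     # placeholder display name
    training_data=train_df,                      # or a GCS URI like "gs://<bucket>/<file>.jsonl"
    train_steps=10,
    evaluation_dataset="gs://my-bucket/eval.jsonl",  # placeholder; eval spec is built unconditionally
    tensorboard_instance_name=(
        "projects/my-gcp-project/locations/us-central1/tensorboards/1234567890"  # placeholder
    ),
)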
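
The onsite_llm.py hunk references several names that are never imported anywhere in this diff (set_seed, Accelerator, evaluate, ray, torch, AutoTokenizer, AdamW, TemporaryDirectory, Checkpoint, load_dataset, TorchTrainer, DataConfig, ScalingConfig). The sketch below lists the imports the new code appears to assume, based on the public Ray Train + Hugging Face Accelerate example it mirrors; the rest of onsite_llm.py may already provide some of them, so treat this as an assumption rather than part of the change.

# Imports assumed by train_func and the TorchTrainer driver block (not included in the diff).
from tempfile import TemporaryDirectory

import evaluate
import ray
import ray.data
import ray.train
import torch
from accelerate import Accelerator
from datasets import load_dataset
from ray.train import Checkpoint, DataConfig, ScalingConfig
from ray.train.torch import TorchTrainer
from torch.optim import AdamW
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    get_linear_schedule_with_warmup,
    set_seed,
)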