diff --git a/Makefile b/Makefile index 1b738ce2..3a4709ca 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # Image URL to use all building/pushing image targets -IMG ?= docker.io/substratusai/controller-manager:v0.1.0-alpha +IMG ?= docker.io/substratusai/controller-manager:v0.3.0-alpha # ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary. ENVTEST_K8S_VERSION = 1.26.1 diff --git a/README.md b/README.md index db9bd1c7..f0599884 100644 --- a/README.md +++ b/README.md @@ -69,12 +69,16 @@ The Model API is capable of building base Models from Git repositories, or finet apiVersion: substratus.ai/v1 kind: Model metadata: - name: my-model + name: fb-opt-125m-squad spec: source: modelName: facebook-opt-125m training: - datasetName: favorite-colors + datasetName: squad + params: + epochs: 30 + batchSize: 3 + dataLimit: 120 # TODO: This should be copied from the source Model. size: parameterBits: 32 @@ -92,9 +96,9 @@ The ModelServer API runs a web server that serves the Model for inference (FUTUR apiVersion: substratus.ai/v1 kind: ModelServer metadata: - name: my-model-server + name: fb-opt-125m-squad spec: - modelName: my-model + modelName: fb-opt-125m-squad ``` ### Dataset API @@ -106,11 +110,12 @@ The Dataset API snapshots and locally caches remote datasets to facilitate effic apiVersion: substratus.ai/v1 kind: Dataset metadata: - name: favorite-colors + name: squad spec: + filename: all.jsonl source: - url: https://raw.githubusercontent.com/substratusai/model-facebook-opt-125m/main/hack/sample-data.jsonl - filename: fav-colors.jsonl + git: + url: https://github.com/substratusai/dataset-squad ``` ### Notebook API @@ -124,8 +129,9 @@ Notebooks can be opened using the `kubectl open notebook` command (which is a su apiVersion: substratus.ai/v1 kind: Notebook metadata: - name: nick-fb-opt-125m + name: facebook-opt-125m spec: + suspend: true modelName: facebook-opt-125m ``` diff --git a/api/v1/dataset_types.go b/api/v1/dataset_types.go index a84b5dfa..e4c2fc67 100644 --- a/api/v1/dataset_types.go +++ b/api/v1/dataset_types.go @@ -6,13 +6,12 @@ import ( // DatasetSpec defines the desired state of Dataset type DatasetSpec struct { - Source DatasetSource `json:"source,omitempty"` + Filename string `json:"filename"` + Source DatasetSource `json:"source,omitempty"` } type DatasetSource struct { - // URL supports http and https schemes. - URL string `json:"url"` - Filename string `json:"filename"` + Git *GitSource `json:"git,omitempty"` } // DatasetStatus defines the observed state of Dataset @@ -22,6 +21,7 @@ type DatasetStatus struct { URL string `json:"url,omitempty"` } +//+kubebuilder:resource:categories=ai //+kubebuilder:object:root=true //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].status" diff --git a/api/v1/model_types.go b/api/v1/model_types.go index ed0ae695..3748c12d 100644 --- a/api/v1/model_types.go +++ b/api/v1/model_types.go @@ -34,7 +34,23 @@ type ModelSize struct { } type Training struct { - DatasetName string `json:"datasetName"` + DatasetName string `json:"datasetName"` + Params TrainingParams `json:"params"` +} + +type TrainingParams struct { + //+kubebuilder:default:=3 + // Epochs is the total number of iterations that should be run through the training data. + // Increasing this number will increase training time. 
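Note on the `TrainingParams` fields introduced above: the controller passes them to the training container as the `TRAIN_EPOCHS`, `TRAIN_BATCH_SIZE`, and `TRAIN_DATA_LIMIT` environment variables (documented in `docs/container-contracts.md` later in this diff). A minimal sketch of how a `train.sh` entrypoint might consume them, using a Go-based trainer purely for illustration (real model images may use other languages); the fallbacks mirror the kubebuilder defaults:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// envInt64 reads an integer hyperparameter from the environment,
// falling back to def when the variable is unset or malformed.
func envInt64(key string, def int64) int64 {
	if v := os.Getenv(key); v != "" {
		if n, err := strconv.ParseInt(v, 10, 64); err == nil {
			return n
		}
	}
	return def
}

func main() {
	dataPath := os.Getenv("TRAIN_DATA_PATH") // e.g. /data/all.jsonl
	epochs := envInt64("TRAIN_EPOCHS", 3)
	batchSize := envInt64("TRAIN_BATCH_SIZE", 1)
	dataLimit := envInt64("TRAIN_DATA_LIMIT", 1_000_000_000_000)

	fmt.Printf("training on %s: epochs=%d batchSize=%d dataLimit=%d\n",
		dataPath, epochs, batchSize, dataLimit)
	// ... load up to dataLimit records from dataPath and run the training loop.
}
```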
+ Epochs int64 `json:"epochs,omitempty"` + //+kubebuilder:default:=1000000000000 + // DataLimit is the maximum number of training records to use. In the case of JSONL, this would be the total number of lines + // to train with. Increasing this number will increase training time. + DataLimit int64 `json:"dataLimit,omitempty"` + //+kubebuilder:default:=1 + // BatchSize is the number of training records to use per (forward and backward) pass through the model. + // Increasing this number will increase the memory requirements of the training process. + BatchSize int64 `json:"batchSize,omitempty"` } type ModelSource struct { @@ -76,6 +92,7 @@ type ModelStatus struct { Servers []string `json:"servers,omitempty"` } +//+kubebuilder:resource:categories=ai //+kubebuilder:object:root=true //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].status" diff --git a/api/v1/modelserver_types.go b/api/v1/modelserver_types.go index 950bb8b5..ac4b09e4 100644 --- a/api/v1/modelserver_types.go +++ b/api/v1/modelserver_types.go @@ -14,6 +14,7 @@ type ModelServerStatus struct { Conditions []metav1.Condition `json:"conditions,omitempty"` } +//+kubebuilder:resource:categories=ai //+kubebuilder:object:root=true //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].status" diff --git a/api/v1/notebook_types.go b/api/v1/notebook_types.go index 42a83e0b..3504671d 100644 --- a/api/v1/notebook_types.go +++ b/api/v1/notebook_types.go @@ -17,6 +17,7 @@ type NotebookStatus struct { Conditions []metav1.Condition `json:"conditions,omitempty"` } +//+kubebuilder:resource:categories=ai //+kubebuilder:object:root=true //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].status" diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index d314ae24..8ec90650 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -19,7 +19,7 @@ func (in *Dataset) DeepCopyInto(out *Dataset) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) } @@ -76,6 +76,11 @@ func (in *DatasetList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DatasetSource) DeepCopyInto(out *DatasetSource) { *out = *in + if in.Git != nil { + in, out := &in.Git, &out.Git + *out = new(GitSource) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatasetSource. @@ -91,7 +96,7 @@ func (in *DatasetSource) DeepCopy() *DatasetSource { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DatasetSpec) DeepCopyInto(out *DatasetSpec) { *out = *in - out.Source = in.Source + in.Source.DeepCopyInto(&out.Source) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatasetSpec. @@ -500,6 +505,7 @@ func (in *NotebookStatus) DeepCopy() *NotebookStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
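Note on the generated deepcopy changes in this file: `DatasetSource` now holds a pointer field (`Git *GitSource`), so plain struct assignment would alias the nested value instead of copying it, which is why `DeepCopyInto` switched from `out.Spec = in.Spec` to `in.Spec.DeepCopyInto(&out.Spec)`. A standalone sketch of the aliasing problem, with the types simplified for illustration:

```go
package main

import "fmt"

type GitSource struct{ URL string }
type DatasetSource struct{ Git *GitSource }

func main() {
	a := DatasetSource{Git: &GitSource{URL: "https://github.com/substratusai/dataset-squad"}}

	shallow := a // struct assignment copies the pointer, not the GitSource it points to
	shallow.Git.URL = "https://example.com/mutated"
	fmt.Println(a.Git.URL) // prints the mutated URL: the original was modified too

	deep := a
	deep.Git = new(GitSource)
	*deep.Git = *a.Git // what the generated DeepCopyInto now does for the Git field
	deep.Git.URL = "https://example.com/isolated"
	fmt.Println(a.Git.URL) // unaffected by the deep copy's mutation
}
```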
func (in *Training) DeepCopyInto(out *Training) { *out = *in + out.Params = in.Params } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Training. @@ -511,3 +517,18 @@ func (in *Training) DeepCopy() *Training { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrainingParams) DeepCopyInto(out *TrainingParams) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrainingParams. +func (in *TrainingParams) DeepCopy() *TrainingParams { + if in == nil { + return nil + } + out := new(TrainingParams) + in.DeepCopyInto(out) + return out +} diff --git a/config/crd/bases/substratus.ai_datasets.yaml b/config/crd/bases/substratus.ai_datasets.yaml index b36effb3..d2d41a37 100644 --- a/config/crd/bases/substratus.ai_datasets.yaml +++ b/config/crd/bases/substratus.ai_datasets.yaml @@ -9,6 +9,8 @@ metadata: spec: group: substratus.ai names: + categories: + - ai kind: Dataset listKind: DatasetList plural: datasets @@ -42,17 +44,25 @@ spec: spec: description: DatasetSpec defines the desired state of Dataset properties: + filename: + type: string source: properties: - filename: - type: string - url: - description: URL supports http and https schemes. - type: string - required: - - filename - - url + git: + properties: + branch: + type: string + path: + description: Path within the git repository referenced by + url. + type: string + url: + description: 'URL to the git repository. Example: github.com/my-account/my-repo' + type: string + type: object type: object + required: + - filename type: object status: description: DatasetStatus defines the observed state of Dataset diff --git a/config/crd/bases/substratus.ai_models.yaml b/config/crd/bases/substratus.ai_models.yaml index 59086219..9aa931ee 100644 --- a/config/crd/bases/substratus.ai_models.yaml +++ b/config/crd/bases/substratus.ai_models.yaml @@ -9,6 +9,8 @@ metadata: spec: group: substratus.ai names: + categories: + - ai kind: Model listKind: ModelList plural: models @@ -87,8 +89,35 @@ spec: properties: datasetName: type: string + params: + properties: + batchSize: + default: 1 + description: BatchSize is the number of training records to + use per (forward and backward) pass through the model. Increasing + this number will increase the memory requirements of the + training process. + format: int64 + type: integer + dataLimit: + default: 1000000000000 + description: DataLimit is the maximum number of training records + to use. In the case of JSONL, this would be the total number + of lines to train with. Increasing this number will increase + training time. + format: int64 + type: integer + epochs: + default: 3 + description: Epochs is the total number of iterations that + should be run through the training data. Increasing this + number will increase training time. 
+ format: int64 + type: integer + type: object required: - datasetName + - params type: object required: - compute diff --git a/config/crd/bases/substratus.ai_modelservers.yaml b/config/crd/bases/substratus.ai_modelservers.yaml index e0e355f8..3a1f28fc 100644 --- a/config/crd/bases/substratus.ai_modelservers.yaml +++ b/config/crd/bases/substratus.ai_modelservers.yaml @@ -9,6 +9,8 @@ metadata: spec: group: substratus.ai names: + categories: + - ai kind: ModelServer listKind: ModelServerList plural: modelservers diff --git a/config/crd/bases/substratus.ai_notebooks.yaml b/config/crd/bases/substratus.ai_notebooks.yaml index 5e10d941..754597dd 100644 --- a/config/crd/bases/substratus.ai_notebooks.yaml +++ b/config/crd/bases/substratus.ai_notebooks.yaml @@ -9,6 +9,8 @@ metadata: spec: group: substratus.ai names: + categories: + - ai kind: Notebook listKind: NotebookList plural: notebooks diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index f4bcb948..a296c382 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -5,4 +5,4 @@ kind: Kustomization images: - name: controller newName: docker.io/substratusai/controller-manager - newTag: v0.1.0-alpha + newTag: v0.3.0-alpha diff --git a/docs/arch.md b/docs/arch.md index 009739ba..71ba0f5f 100644 --- a/docs/arch.md +++ b/docs/arch.md @@ -18,14 +18,30 @@ Training is triggered by creating a `kind: Model` with `.spec.training` and `.sp The Dataset API is used to describe data that can be referenced for training Models. -* Training models typically requires a large dataset. Pulling this dataset from a remote source every time you train a model is slow and unreliable. The Dataset API pulls a dataset once and stores it on fast Persistent Disks, mounted directly to training Jobs. -* The Dataset controller pulls in a remote dataset once, and stores it, guaranteeing every model that references that dataset uses the same exact data (reproducable training results). -* The Dataset API allows users to query datasets on the platform (`kubectl get datasets`). +* Datasets pull in remote data sources using containerized data loaders. +* Users can specify their own ETL logic by referencing a repository from a Dataset. +* Users can leverage pre-built data loader integrations with various sources including the Huggingface Hub, materializing the output of SQL queries, scraping and downloading an entire confluence site, etc. +* Training typically requires a large dataset. Pulling this dataset from a remote source every time you train a model is slow and unreliable. The Dataset API pulls a dataset once and stores it in a bucket, which is mounted in training Jobs. +* The Dataset controller pulls in a remote dataset once, and stores it, guaranteeing every model that references that dataset uses the same exact data (facilitating reproducable training results). +* The Dataset API allows users to query ready to use datasets (`kubectl get datasets`). * The Dataset API allows Kubernetes RBAC to be applied as a mechanism for controlling access to data. -* Similar to the Model API, the Dataset API contains metadata about datasets (size of dataset --> which can be used to inform training job resource requirements). -* Dataset API provides a central place to define the auth credentials for remote dataset sources. -* Dataset API could provide integrations with many data sources including the Huggingface Hub, materializing the output of SQL queries, scraping and downloading an entire confluence site, etc. 
+ +### Possible Future Dataset Features + +* Scheduled recurring loads +* Continuous loading +* Present metadata about datasets (size of dataset --> which can be used to inform training job resource requirements). +* Dataset API could provide a central place to define the auth credentials for remote dataset sources. * If Models have consistent or at least declarative training data format expectations, then the Dataset API allows for a programtic way to orchestrate coupling those models to a large number of datasets and producing a matrix of trained models. +## Notebooks + +Notebooks can be used to quickly spin up a development environment backed by high performance compute. + +* Integration with Model and Dataset APIs allow for quick iteration. +* Local directory syncing streamlines the developer experience. + + + diff --git a/docs/container-contracts.md b/docs/container-contracts.md new file mode 100644 index 00000000..832bd4e2 --- /dev/null +++ b/docs/container-contracts.md @@ -0,0 +1,91 @@ +# Container Contracts + +Substratus orchestrates machine learning operations without requiring any language-specific library dependencies. As long as a container satisfies the "contract", that container can be orchestrated with Substratus. These contracts are designed to impose as few requirements as possible. + +## Model Contract + +The repo should contain a Dockerfile. + +As a part of building the Dockerfile: + +- Model artifacts (i.e. `model.pt`, `tokenizer.pt`) should be saved into `/model/saved/`. +- Workspace directory should be `/model/` (i.e. `WORKDIR /model`). + +### Scripts + +Must be located in `$PATH`: + +- `serve.sh` + * Loads model from disk (`/model/saved/`). + * Run a webserver on port `8080`. + * Endpoints: + * GET `/docs` + * Serves API Documentation. + * POST `/generate` + * Accepts a request body of `{"prompt":"", "max_new_tokens": 100}`. + * Responds with a body of `{"generation": ""}`. +- `train.sh` + * Writes logs to STDOUT/STDERR and optionally to `/model/logs/`. + * If notebooks are run, it saves the `.ipynb` files with output into `/model/logs/`. + * The `DATASET_PATH` environment vairable will be provided. + * Can load an existing model from `/model/saved/`. + * Saves new trained model to `/model/trained/` (which will be copied into the new container's `/model/saved/` directory). +- `notebook.sh` + * Should start a Jupyter Lab/Notebook environment. + * Should serve on port `8888`. + +### Directory Structure + +``` +/model/ # Working directory + src/ # Model source code + saved/ # Model artifacts from build jobs (to be loaded for inference) + trained/ # Model artifacts from training job + logs/ # Output of building/training jobs for debugging purposes +``` + +### Environment Variables + +The following parameters are communicated through environment variables when `train.sh` is called and should be taken into account. + +| Environment Variable | Source | +| -------------------- | ------------------------------------------ | +| `TRAIN_DATA_PATH` | Dataset (`/data/` + `.spec.filename`) | +| `TRAIN_DATA_LIMIT` | Model (`.spec.training.params.dataLimit`) | +| `TRAIN_BATCH_SIZE` | Model (`.spec.training.params.batchSize`) | +| `TRAIN_EPOCHS` | Model (`.spec.training.params.epochs`) | + +## Dataset Contract + +The repo should contain a Dockerfile. + +- Workspace directory should be `/dataset/` (i.e. `WORKDIR /dataset`). + +### Scripts + +Must be located in `$PATH`: + +- `load.sh` + * Saves data to disk in the file path specified by the `LOAD_DATA_PATH` environment variable. 
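For context on the `load.sh` contract above: a loader only has to write its output to the path given in `LOAD_DATA_PATH`. A minimal Go sketch of a conforming entrypoint (hypothetical; real loaders may be shell or Python, and the records below are placeholders):

```go
package main

import (
	"encoding/json"
	"log"
	"os"
	"path/filepath"
)

func main() {
	// LOAD_DATA_PATH is provided by the Dataset controller,
	// e.g. /data/all.jsonl (/data/ + .spec.filename).
	out := os.Getenv("LOAD_DATA_PATH")
	if out == "" {
		log.Fatal("LOAD_DATA_PATH must be set")
	}
	if err := os.MkdirAll(filepath.Dir(out), 0o755); err != nil {
		log.Fatal(err)
	}

	f, err := os.Create(out)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Placeholder records standing in for whatever the loader fetches or transforms.
	records := []map[string]string{
		{"prompt": "example question", "completion": "example answer"},
	}
	enc := json.NewEncoder(f) // one JSON object per line, i.e. JSONL
	for _, r := range records {
		if err := enc.Encode(r); err != nil {
			log.Fatal(err)
		}
	}
	log.Printf("wrote %d records to %s", len(records), out)
}
```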
+ * Writes logs to STDOUT/STDERR and optionally to `/dataset/logs/`. + * If notebooks are run, it saves the `.ipynb` files with output into `/dataset/logs/`. +- `notebook.sh` + * Should start a Jupyter Lab/Notebook environment. + * Should serve on port `8888`. + +### Directory Structure + +``` +/dataset/ # Working directory + src/ # Data loading source code + logs/ # Output of data loading jobs for debugging purposes +``` + +### Environment Variables + +The following parameters are communicated through environment variables when `load.sh` is called and should be taken into account. + +| Environment Variable | Source | +| -------------------- | ------------------------------------------ | +| `LOAD_DATA_PATH` | Dataset (`/data/` + `.spec.filename`) | + diff --git a/docs/model-contract.md b/docs/model-contract.md deleted file mode 100644 index 8b1b43ce..00000000 --- a/docs/model-contract.md +++ /dev/null @@ -1,47 +0,0 @@ -# Model Contract - -## Repo - -Model directory should contain a Dockerfile. - -## Container Image - -As a part of the container build process: - -- Model artifacts (i.e. `model.pt`, `tokenizer.pt`) should be saved into `/model/saved/`. -- Workspace directory should be `/model/` (i.e. `WORKDIR /model`). - -## Scripts - -Must be located in `$PATH`: - -- `serve.sh` - * Loads model from disk (`/model/saved/`). - * Run a webserver on port `8080`. - * Endpoints: - * GET `/docs` - * Serves API Documentation. - * POST `/generate` - * Accepts a request body of `{"prompt":"", "max_new_tokens": 100}`. - * Responds with a body of `{"generation": ""}`. -- `train.sh` - * Writes logs to STDOUT/STDERR and optionally to `/model/logs/`. - * If notebooks are run, it saves the `.ipynb` files with output into `/model/logs/`. - * The `DATASET_PATH` environment vairable will be provided. - * Can load an existing model from `/model/saved/`. - * Saves new trained model to `/model/trained/` (which will be copied into the new container's `/model/saved/` directory). -- `notebook.sh` - * Should start a Jupyter Lab/Notebook environment. - * Should serve on port `8888`. - -## Directory Structure - -``` -/model/ - src/ # Model source code - saved/ # Model artifacts from build jobs (to be loaded for inference) - trained/ # Model artifacts from training job - logs/ # Output of building/training jobs -``` - - diff --git a/examples/facebook-opt-125m/dataset.yaml b/examples/facebook-opt-125m/dataset.yaml index 2615036e..b1b68064 100644 --- a/examples/facebook-opt-125m/dataset.yaml +++ b/examples/facebook-opt-125m/dataset.yaml @@ -1,8 +1,9 @@ apiVersion: substratus.ai/v1 kind: Dataset metadata: - name: favorite-colors + name: squad spec: + filename: all.jsonl source: - url: https://raw.githubusercontent.com/substratusai/model-facebook-opt-125m/main/hack/sample-data.jsonl - filename: fav-colors.jsonl + git: + url: https://github.com/substratusai/dataset-squad diff --git a/examples/facebook-opt-125m/finetuned-model.yaml b/examples/facebook-opt-125m/finetuned-model.yaml index 3efd3dd9..b8a9f849 100644 --- a/examples/facebook-opt-125m/finetuned-model.yaml +++ b/examples/facebook-opt-125m/finetuned-model.yaml @@ -1,12 +1,16 @@ apiVersion: substratus.ai/v1 kind: Model metadata: - name: my-model + name: fb-opt-125m-squad spec: source: modelName: facebook-opt-125m training: - datasetName: favorite-colors + datasetName: squad + params: + epochs: 3 + batchSize: 100 + dataLimit: 1000 # TODO: This should be copied from the source Model. 
size: parameterBits: 32 diff --git a/examples/facebook-opt-125m/finetuned-notebook.yaml b/examples/facebook-opt-125m/finetuned-notebook.yaml index 4bd226b0..bb66bbe3 100644 --- a/examples/facebook-opt-125m/finetuned-notebook.yaml +++ b/examples/facebook-opt-125m/finetuned-notebook.yaml @@ -1,6 +1,7 @@ apiVersion: substratus.ai/v1 kind: Notebook metadata: - name: my-model-notebook + name: fb-opt-125m-squad spec: - modelName: my-model + suspend: true + modelName: fb-opt-125m-squad diff --git a/examples/facebook-opt-125m/finetuned-server.yaml b/examples/facebook-opt-125m/finetuned-server.yaml index 7061e0c6..6d270c3e 100644 --- a/examples/facebook-opt-125m/finetuned-server.yaml +++ b/examples/facebook-opt-125m/finetuned-server.yaml @@ -1,6 +1,6 @@ apiVersion: substratus.ai/v1 kind: ModelServer metadata: - name: my-model-server + name: fb-opt-125m-squad spec: - modelName: my-model + modelName: fb-opt-125m-squad diff --git a/examples/facebook-opt-125m/model.yaml b/examples/facebook-opt-125m/model.yaml index aeb659d9..7a075853 100644 --- a/examples/facebook-opt-125m/model.yaml +++ b/examples/facebook-opt-125m/model.yaml @@ -6,7 +6,7 @@ spec: source: git: url: https://github.com/substratusai/model-facebook-opt-125m - branch: main + branch: params size: parameterBits: 32 parameterCount: 125000000 diff --git a/examples/facebook-opt-125m/notebook.yaml b/examples/facebook-opt-125m/notebook.yaml index 954737e2..4d8f9818 100644 --- a/examples/facebook-opt-125m/notebook.yaml +++ b/examples/facebook-opt-125m/notebook.yaml @@ -1,6 +1,7 @@ apiVersion: substratus.ai/v1 kind: Notebook metadata: - name: nick-fb-opt-125m + name: facebook-opt-125m spec: + suspend: true modelName: facebook-opt-125m diff --git a/install/kubernetes/system.yaml b/install/kubernetes/system.yaml index b3339745..4a1c9001 100644 --- a/install/kubernetes/system.yaml +++ b/install/kubernetes/system.yaml @@ -8,6 +8,8 @@ metadata: spec: group: substratus.ai names: + categories: + - ai kind: Dataset listKind: DatasetList plural: datasets @@ -41,17 +43,25 @@ spec: spec: description: DatasetSpec defines the desired state of Dataset properties: + filename: + type: string source: properties: - filename: - type: string - url: - description: URL supports http and https schemes. - type: string - required: - - filename - - url + git: + properties: + branch: + type: string + path: + description: Path within the git repository referenced by + url. + type: string + url: + description: 'URL to the git repository. Example: github.com/my-account/my-repo' + type: string + type: object type: object + required: + - filename type: object status: description: DatasetStatus defines the observed state of Dataset @@ -143,6 +153,8 @@ metadata: spec: group: substratus.ai names: + categories: + - ai kind: Model listKind: ModelList plural: models @@ -221,8 +233,35 @@ spec: properties: datasetName: type: string + params: + properties: + batchSize: + default: 1 + description: BatchSize is the number of training records to + use per (forward and backward) pass through the model. Increasing + this number will increase the memory requirements of the + training process. + format: int64 + type: integer + dataLimit: + default: 1000000000000 + description: DataLimit is the maximum number of training records + to use. In the case of JSONL, this would be the total number + of lines to train with. Increasing this number will increase + training time. 
+ format: int64 + type: integer + epochs: + default: 3 + description: Epochs is the total number of iterations that + should be run through the training data. Increasing this + number will increase training time. + format: int64 + type: integer + type: object required: - datasetName + - params type: object required: - compute @@ -323,6 +362,8 @@ metadata: spec: group: substratus.ai names: + categories: + - ai kind: ModelServer listKind: ModelServerList plural: modelservers @@ -447,6 +488,8 @@ metadata: spec: group: substratus.ai names: + categories: + - ai kind: Notebook listKind: NotebookList plural: notebooks @@ -991,7 +1034,7 @@ spec: envFrom: - configMapRef: name: system - image: docker.io/substratusai/controller-manager:v0.1.0-alpha + image: docker.io/substratusai/controller-manager:v0.3.0-alpha livenessProbe: httpGet: path: /healthz diff --git a/install/terraform/gcp/data-loader-builder.tf b/install/terraform/gcp/data-loader-builder.tf new file mode 100644 index 00000000..c3368ca7 --- /dev/null +++ b/install/terraform/gcp/data-loader-builder.tf @@ -0,0 +1,26 @@ +resource "google_service_account" "data_loader_builder" { + project = var.project_id + account_id = "${local.name}-data-loader-builder" +} + +resource "google_service_account_iam_member" "data_loader_builder_workload_identity" { + service_account_id = google_service_account.data_loader_builder.id + role = "roles/iam.workloadIdentityUser" + member = "serviceAccount:${var.project_id}.svc.id.goog[default/data-loader-builder]" + + # Workload identity pool does not exist until the first cluster exists. + depends_on = [google_container_cluster.main] +} + +resource "google_project_iam_member" "data_loader_builder_gar_repo_admin" { + project = var.project_id + role = "roles/artifactregistry.repoAdmin" + member = "serviceAccount:${google_service_account.data_loader_builder.email}" +} + +resource "google_storage_bucket_iam_member" "data_loader_builder_training_storage_admin" { + bucket = google_storage_bucket.training.name + role = "roles/storage.admin" + member = "serviceAccount:${google_service_account.data_loader_builder.email}" +} + diff --git a/install/terraform/gcp/data_puller.tf b/install/terraform/gcp/data-loader.tf similarity index 51% rename from install/terraform/gcp/data_puller.tf rename to install/terraform/gcp/data-loader.tf index 54349568..8401b34b 100644 --- a/install/terraform/gcp/data_puller.tf +++ b/install/terraform/gcp/data-loader.tf @@ -1,20 +1,20 @@ -resource "google_service_account" "data_puller" { +resource "google_service_account" "data_loader" { project = var.project_id - account_id = "${local.name}-data-puller" + account_id = "${local.name}-data-loader" } -resource "google_service_account_iam_member" "data_puller_workload_identity" { - service_account_id = google_service_account.data_puller.id +resource "google_service_account_iam_member" "data_loader_workload_identity" { + service_account_id = google_service_account.data_loader.id role = "roles/iam.workloadIdentityUser" - member = "serviceAccount:${var.project_id}.svc.id.goog[default/data-puller]" + member = "serviceAccount:${var.project_id}.svc.id.goog[default/data-loader]" # Workload identity pool does not exist until the first cluster exists. 
depends_on = [google_container_cluster.main] } -resource "google_storage_bucket_iam_member" "data_puller_datasets_storage_admin" { +resource "google_storage_bucket_iam_member" "data_loader_datasets_storage_admin" { bucket = google_storage_bucket.datasets.name role = "roles/storage.admin" - member = "serviceAccount:${google_service_account.data_puller.email}" + member = "serviceAccount:${google_service_account.data_loader.email}" } diff --git a/internal/controller/dataset_controller.go b/internal/controller/dataset_controller.go index a9d9ecfd..021d81d7 100644 --- a/internal/controller/dataset_controller.go +++ b/internal/controller/dataset_controller.go @@ -6,8 +6,8 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -19,8 +19,8 @@ import ( ) const ( - ReasonPulling = "Pulling" - ReasonPulled = "Pulled" + ReasonLoading = "Loading" + ReasonLoaded = "Loaded" ) // DatasetReconciler reconciles a Dataset object. @@ -48,48 +48,103 @@ func (r *DatasetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, client.IgnoreNotFound(err) } + // TODO(nstogner): Consider checking if the dataset is already loaded to the bucket instead of just + // checking the status. if ready := meta.FindStatusCondition(dataset.Status.Conditions, ConditionReady); ready != nil && ready.Status == metav1.ConditionTrue { return ctrl.Result{}, nil } - sa := &corev1.ServiceAccount{ + // Service account used for building and pushing the loader image. + bldrSA := &corev1.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ - Name: dataPullerServiceAccountName, + Name: dataLoaderBuilderServiceAccountName, Namespace: dataset.Namespace, }, } - if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, sa, func() error { - return r.authNServiceAccount(sa) + if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, bldrSA, func() error { + return r.authNServiceAccount(bldrSA) }); err != nil { return ctrl.Result{}, fmt.Errorf("failed to create or update service account: %w", err) } - job, err := r.pullerJob(ctx, &dataset) + // Service account used for loading the data. + loaderSA := &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: dataLoaderServiceAccountName, + Namespace: dataset.Namespace, + }, + } + if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, loaderSA, func() error { + return r.authNServiceAccount(loaderSA) + }); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to create or update service account: %w", err) + } + + // The Job that will build the data-loader container image. + buildJob, err := r.buildJob(ctx, &dataset) if err != nil { - lg.Error(err, "unable to create builder Job") + lg.Error(err, "unable to construct image-builder Job") // No use in retrying... 
return ctrl.Result{}, nil } - if err := r.Create(ctx, job); client.IgnoreAlreadyExists(err) != nil { + + if err := r.Get(ctx, client.ObjectKeyFromObject(buildJob), buildJob); err != nil { + if apierrors.IsNotFound(err) { + if err := r.Create(ctx, buildJob); client.IgnoreAlreadyExists(err) != nil { + return ctrl.Result{}, fmt.Errorf("creating image-builder Job: %w", err) + } + } else { + return ctrl.Result{}, fmt.Errorf("getting image-builder Job: %w", err) + } + } + + if buildJob.Status.Succeeded < 1 { + lg.Info("The image-builder Job has not succeeded yet") + + meta.SetStatusCondition(&dataset.Status.Conditions, metav1.Condition{ + Type: ConditionReady, + Status: metav1.ConditionFalse, + Reason: ReasonBuilding, + ObservedGeneration: dataset.Generation, + Message: fmt.Sprintf("Waiting for image-builder Job to complete: %v", buildJob.Name), + }) + if err := r.Status().Update(ctx, &dataset); err != nil { + return ctrl.Result{}, fmt.Errorf("updating status with Ready=%v, Reason=%v: %w", metav1.ConditionFalse, ReasonBuilding, err) + } + + // Allow Job watch to requeue. + return ctrl.Result{}, nil + } + + // Job that will run the data-loader image that was built by the previous Job. + loadJob, err := r.loadJob(ctx, &dataset) + if err != nil { + lg.Error(err, "unable to construct data-loader Job") + // No use in retrying... + return ctrl.Result{}, nil + } + + if err := r.Create(ctx, loadJob); client.IgnoreAlreadyExists(err) != nil { return ctrl.Result{}, fmt.Errorf("creating Job: %w", err) } meta.SetStatusCondition(&dataset.Status.Conditions, metav1.Condition{ Type: ConditionReady, Status: metav1.ConditionFalse, - Reason: ReasonPulling, + Reason: ReasonLoading, ObservedGeneration: dataset.Generation, - Message: "Waiting for dataset data puller Job to complete.", + Message: fmt.Sprintf("Waiting for data-loader Job to complete: %v", loadJob.Name), }) if err := r.Status().Update(ctx, &dataset); err != nil { - return ctrl.Result{}, err + return ctrl.Result{}, fmt.Errorf("updating status with Ready=%v, Reason=%v: %w", metav1.ConditionFalse, ReasonLoading, err) } - if err := r.Get(ctx, client.ObjectKeyFromObject(job), job); err != nil { + if err := r.Get(ctx, client.ObjectKeyFromObject(loadJob), loadJob); err != nil { return ctrl.Result{}, fmt.Errorf("geting Job: %w", err) } - if job.Status.Succeeded < 1 { + if loadJob.Status.Succeeded < 1 { lg.Info("Job has not succeeded yet") + // Allow Job watch to requeue. 
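The "Allow Job watch to requeue" comments above rely on the usual controller-runtime pattern: the reconciler owns the Jobs it creates, so Job status changes re-trigger reconciliation of the parent Dataset and no `RequeueAfter` is needed. A sketch of that wiring, on the assumption that the existing `SetupWithManager` already does the equivalent:

```go
// Assumes the imports already present in this file:
// batchv1 "k8s.io/api/batch/v1" and ctrl "sigs.k8s.io/controller-runtime".
func (r *DatasetReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&apiv1.Dataset{}). // reconcile when a Dataset changes
		Owns(&batchv1.Job{}).  // and when a Job owned by a Dataset changes (e.g. it succeeds)
		Complete(r)
}
```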
return ctrl.Result{}, nil } @@ -97,12 +152,12 @@ func (r *DatasetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct meta.SetStatusCondition(&dataset.Status.Conditions, metav1.Condition{ Type: ConditionReady, Status: metav1.ConditionTrue, - Reason: ReasonPulled, + Reason: ReasonLoaded, ObservedGeneration: dataset.Generation, - Message: "Dataset is ready.", }) + if err := r.Status().Update(ctx, &dataset); err != nil { - return ctrl.Result{}, err + return ctrl.Result{}, fmt.Errorf("updating status with Ready=%v, Reason=%v: %w", metav1.ConditionTrue, ReasonLoaded, err) } return ctrl.Result{}, nil @@ -116,7 +171,10 @@ func (r *DatasetReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -const dataPullerServiceAccountName = "data-puller" +const ( + dataLoaderServiceAccountName = "data-loader" + dataLoaderBuilderServiceAccountName = "data-loader-builder" +) func (r *DatasetReconciler) authNServiceAccount(sa *corev1.ServiceAccount) error { if sa.Annotations == nil { @@ -124,24 +182,156 @@ func (r *DatasetReconciler) authNServiceAccount(sa *corev1.ServiceAccount) error } switch typ := r.CloudContext.CloudType; typ { case CloudTypeGCP: - sa.Annotations["iam.gke.io/gcp-service-account"] = fmt.Sprintf("substratus-data-puller@%s.iam.gserviceaccount.com", r.CloudContext.GCP.ProjectID) + sa.Annotations["iam.gke.io/gcp-service-account"] = fmt.Sprintf("substratus-%s@%s.iam.gserviceaccount.com", sa.GetName(), r.CloudContext.GCP.ProjectID) default: return fmt.Errorf("unsupported cloud type: %q", r.CloudContext.CloudType) } return nil } -func (r *DatasetReconciler) pullerJob(ctx context.Context, dataset *apiv1.Dataset) (*batchv1.Job, error) { +func (r *DatasetReconciler) buildJob(ctx context.Context, dataset *apiv1.Dataset) (*batchv1.Job, error) { + + var job *batchv1.Job + + annotations := map[string]string{} + + buildArgs := []string{ + "--dockerfile=Dockerfile", + "--destination=" + r.loaderImage(dataset), + // Cache will default to the image registry. + "--cache=true", + // Disable compressed caching to decrease memory usage. + // (See https://github.com/GoogleContainerTools/kaniko/blob/main/README.md#flag---compressed-caching) + "--compressed-caching=false", + "--log-format=text", + } + + var initContainers []corev1.Container + var volumeMounts []corev1.VolumeMount + var volumes []corev1.Volume + + const dockerfileWithTini = ` +# Add Tini +ENV TINI_VERSION v0.19.0 +ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini +RUN chmod +x /tini +ENTRYPOINT ["/tini", "--"] +` + cloneArgs := []string{ + "clone", + dataset.Spec.Source.Git.URL, + } + if dataset.Spec.Source.Git.Branch != "" { + cloneArgs = append(cloneArgs, "--branch", dataset.Spec.Source.Git.Branch) + } + cloneArgs = append(cloneArgs, "/workspace") + + if dataset.Spec.Source.Git.Path != "" { + buildArgs = append(buildArgs, "--context-sub-path="+dataset.Spec.Source.Git.Path) + } + + // Add an init container that will clone the Git repo and + // another that will append tini to the Dockerfile. 
+ initContainers = append(initContainers, + corev1.Container{ + Name: "git-clone", + Image: "alpine/git", + Args: cloneArgs, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "workspace", + MountPath: "/workspace", + }, + }, + }, + corev1.Container{ + Name: "dockerfile-tini-appender", + Image: "busybox", + Args: []string{ + "sh", + "-c", + "echo '" + dockerfileWithTini + "' >> /workspace/Dockerfile", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "workspace", + MountPath: "/workspace", + }, + }, + }, + ) + + volumeMounts = []corev1.VolumeMount{ + { + Name: "workspace", + MountPath: "/workspace", + }, + } + volumes = []corev1.Volume{ + { + Name: "workspace", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + } + + const builderContainerName = "loader-builder" + annotations["kubectl.kubernetes.io/default-container"] = builderContainerName + job = &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: dataset.Name + "-data-loader-builder", + // Cross-Namespace owners not allowed, must be same as dataset: + Namespace: dataset.Namespace, + }, + Spec: batchv1.JobSpec{ + BackoffLimit: int32Ptr(1), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: annotations, + }, + Spec: corev1.PodSpec{ + InitContainers: initContainers, + SecurityContext: &corev1.PodSecurityContext{ + RunAsUser: int64Ptr(0), + RunAsGroup: int64Ptr(0), + FSGroup: int64Ptr(3003), + }, + ServiceAccountName: dataLoaderBuilderServiceAccountName, + Containers: []corev1.Container{{ + Name: builderContainerName, + Image: "gcr.io/kaniko-project/executor:latest", + Args: buildArgs, + VolumeMounts: volumeMounts, + }}, + RestartPolicy: "Never", + Volumes: volumes, + }, + }, + }, + } + + if err := controllerutil.SetControllerReference(dataset, job, r.Scheme); err != nil { + return nil, fmt.Errorf("setting owner reference: %w", err) + } + + return job, nil +} + +func (r *DatasetReconciler) loadJob(ctx context.Context, dataset *apiv1.Dataset) (*batchv1.Job, error) { + const loaderContainerName = "loader" job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ - Name: dataset.Name + "-data-puller", - // Cross-Namespace owners not allowed, must be same as model: + Name: dataset.Name + "-data-loader", + // Cross-Namespace owners not allowed, must be same as dataset: Namespace: dataset.Namespace, }, Spec: batchv1.JobSpec{ Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{}, + Annotations: map[string]string{ + "kubectl.kubernetes.io/default-container": loaderContainerName, + }, }, Spec: corev1.PodSpec{ SecurityContext: &corev1.PodSecurityContext{ @@ -149,31 +339,32 @@ func (r *DatasetReconciler) pullerJob(ctx context.Context, dataset *apiv1.Datase RunAsGroup: int64Ptr(2002), FSGroup: int64Ptr(3003), }, - ServiceAccountName: dataPullerServiceAccountName, - Containers: []corev1.Container{{ - Name: "puller", - // TODO: Support gcs:// and s3:// ... and others? - // Consider using: - // - Source-specific containers (i.e. gsutil, aws cli) - // - A universal data puller cli (i.e. rclone). 
- Image: "curlimages/curl", - Args: []string{"-o", "/data/" + dataset.Spec.Source.Filename, dataset.Spec.Source.URL}, - VolumeMounts: []corev1.VolumeMount{{ - Name: "data", - MountPath: "/data", - SubPath: string(dataset.UID), - }}, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), + ServiceAccountName: dataLoaderServiceAccountName, + Containers: []corev1.Container{ + { + Name: loaderContainerName, + Image: r.loaderImage(dataset), + Args: []string{"load.sh"}, + Env: []corev1.EnvVar{ + { + Name: "LOAD_DATA_PATH", + Value: "/data/" + dataset.Spec.Filename, + }, }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), + VolumeMounts: []corev1.VolumeMount{ + { + Name: "data", + MountPath: "/data", + SubPath: string(dataset.UID) + "/data", + }, + { + Name: "data", + MountPath: "/dataset/logs", + SubPath: string(dataset.UID) + "/logs", + }, }, }, - }}, + }, Volumes: []corev1.Volume{}, RestartPolicy: "Never", }, @@ -198,7 +389,7 @@ func (r *DatasetReconciler) pullerJob(ctx context.Context, dataset *apiv1.Datase }, }) dataset.Status.URL = "gcs://" + r.CloudContext.GCP.ProjectID + "-substratus-datasets" + - "/" + string(dataset.UID) + "/" + dataset.Spec.Source.Filename + "/" + string(dataset.UID) + "/data/" + dataset.Spec.Filename } if err := controllerutil.SetControllerReference(dataset, job, r.Scheme); err != nil { @@ -207,3 +398,13 @@ func (r *DatasetReconciler) pullerJob(ctx context.Context, dataset *apiv1.Datase return job, nil } + +func (r *DatasetReconciler) loaderImage(d *apiv1.Dataset) string { + switch typ := r.CloudContext.CloudType; typ { + case CloudTypeGCP: + // Assuming this is Google Artifact Registry named "substratus". + return fmt.Sprintf("%s-docker.pkg.dev/%s/substratus/dataset-%s-%s", r.CloudContext.GCP.Region(), r.CloudContext.GCP.ProjectID, d.Namespace, d.Name) + default: + panic("unsupported cloud type: " + typ) + } +} diff --git a/internal/controller/dataset_controller_test.go b/internal/controller/dataset_controller_test.go index 7f1f95f2..30b8e2c8 100644 --- a/internal/controller/dataset_controller_test.go +++ b/internal/controller/dataset_controller_test.go @@ -22,27 +22,31 @@ func TestDataset(t *testing.T) { Namespace: "default", }, Spec: apiv1.DatasetSpec{ + Filename: "does-not-exist.jsonl", Source: apiv1.DatasetSource{ - URL: "https://test.internal/does/not/exist.jsonl", - Filename: "does-not-exist.jsonl", + Git: &apiv1.GitSource{ + URL: "https://github.com/substratusai/dataset-some-dataset", + }, }, }, } require.NoError(t, k8sClient.Create(ctx, dataset), "create a dataset") - // Test that a data puller ServiceAccount gets created by the controller. + // Test that a data loader ServiceAccount gets created by the controller. 
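Note on `loaderImage` above: for a Dataset named `squad` in the `default` namespace, with a hypothetical project `my-project` and region `us-central1`, the builder Job pushes `us-central1-docker.pkg.dev/my-project/substratus/dataset-default-squad`, and the load Job then runs that same image. The updated test below covers the builder Job; exercising the loader Job is left as a TODO.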
var sa corev1.ServiceAccount require.EventuallyWithT(t, func(t *assert.CollectT) { - err := k8sClient.Get(ctx, types.NamespacedName{Namespace: dataset.Namespace, Name: "data-puller"}, &sa) - assert.NoError(t, err, "getting the data puller serviceaccount") - }, timeout, interval, "waiting for the data puller serviceaccount to be created") - require.Equal(t, "substratus-data-puller@test-project-id.iam.gserviceaccount.com", sa.Annotations["iam.gke.io/gcp-service-account"]) + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: dataset.Namespace, Name: "data-loader"}, &sa) + assert.NoError(t, err, "getting the data loader serviceaccount") + }, timeout, interval, "waiting for the data loader serviceaccount to be created") + require.Equal(t, "substratus-data-loader@test-project-id.iam.gserviceaccount.com", sa.Annotations["iam.gke.io/gcp-service-account"]) - // Test that a data puller Job gets created by the controller. + // Test that a data loader builder Job gets created by the controller. var job batchv1.Job require.EventuallyWithT(t, func(t *assert.CollectT) { - err := k8sClient.Get(ctx, types.NamespacedName{Namespace: dataset.Namespace, Name: dataset.Name + "-data-puller"}, &job) - assert.NoError(t, err, "getting the data puller job") - }, timeout, interval, "waiting for the data puller job to be created") - require.Equal(t, "puller", job.Spec.Template.Spec.Containers[0].Name) + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: dataset.Namespace, Name: dataset.Name + "-data-loader-builder"}, &job) + assert.NoError(t, err, "getting the data loader builder job") + }, timeout, interval, "waiting for the data loader builder job to be created") + require.Equal(t, "loader-builder", job.Spec.Template.Spec.Containers[0].Name) + + // TODO: Test loader Job after builder Job. } diff --git a/internal/controller/model_controller.go b/internal/controller/model_controller.go index 18b04e67..72e1a877 100644 --- a/internal/controller/model_controller.go +++ b/internal/controller/model_controller.go @@ -30,6 +30,7 @@ const ( ReasonBuilding = "Building" ReasonBuiltAndPushed = "BuiltAndPushed" + ReasonSourceModelNotFound = "SourceModelNotFound" ReasonSourceModelNotReady = "SourceModelNotReady" TrainingDatasetNotReady = "TrainingDatasetNotReady" @@ -70,6 +71,22 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl var sourceModel apiv1.Model if model.Spec.Source.ModelName != "" { if err := r.Client.Get(ctx, types.NamespacedName{Namespace: model.Namespace, Name: model.Spec.Source.ModelName}, &sourceModel); err != nil { + if apierrors.IsNotFound(err) { + // Update this Model's status. + meta.SetStatusCondition(&model.Status.Conditions, metav1.Condition{ + Type: ConditionReady, + Status: metav1.ConditionFalse, + Reason: ReasonSourceModelNotFound, + ObservedGeneration: model.Generation, + }) + if err := r.Status().Update(ctx, &model); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update model status: %w", err) + } + + // TODO: Implement watch on source Model. 
+ return ctrl.Result{RequeueAfter: 3 * time.Second}, nil + } + return ctrl.Result{}, fmt.Errorf("getting source model: %w", err) } if ready := meta.FindStatusCondition(sourceModel.Status.Conditions, ConditionReady); ready == nil || ready.Status != metav1.ConditionTrue || sourceModel.Status.ContainerImage == "" { @@ -132,7 +149,7 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return ctrl.Result{}, fmt.Errorf("creating Job: %w", err) } } else { - return ctrl.Result{}, fmt.Errorf("geting Job: %w", err) + return ctrl.Result{}, fmt.Errorf("getting Job: %w", err) } } if trainingJob.Status.Succeeded < 1 { @@ -226,7 +243,7 @@ func (r *ModelReconciler) modelImage(m *apiv1.Model) string { switch typ := r.CloudContext.CloudType; typ { case CloudTypeGCP: // Assuming this is Google Artifact Registry named "substratus". - return fmt.Sprintf("%s-docker.pkg.dev/%s/substratus/%s-%s", r.CloudContext.GCP.Region(), r.CloudContext.GCP.ProjectID, m.Namespace, m.Name) + return fmt.Sprintf("%s-docker.pkg.dev/%s/substratus/model-%s-%s", r.CloudContext.GCP.Region(), r.CloudContext.GCP.ProjectID, m.Namespace, m.Name) default: panic("unsupported cloud type: " + typ) } @@ -246,6 +263,7 @@ func (r *ModelReconciler) buildJob(ctx context.Context, model *apiv1.Model, sour // Disable compressed caching to decrease memory usage. // (See https://github.com/GoogleContainerTools/kaniko/blob/main/README.md#flag---compressed-caching) "--compressed-caching=false", + "--log-format=text", } var initContainers []corev1.Container @@ -386,6 +404,7 @@ ENTRYPOINT ["/tini", "--"] } } + annotations["kubectl.kubernetes.io/default-container"] = RuntimeBuilder job = &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: model.Name + "-model-builder", @@ -497,6 +516,7 @@ func (r *ModelReconciler) trainingJob(ctx context.Context, model *apiv1.Model, s }) } + annotations["kubectl.kubernetes.io/default-container"] = RuntimeTrainer job = &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: model.Name + "-model-trainer", @@ -525,8 +545,20 @@ func (r *ModelReconciler) trainingJob(ctx context.Context, model *apiv1.Model, s Args: []string{"train.sh"}, Env: []corev1.EnvVar{ { - Name: "DATA_PATH", - Value: "/data/" + dataset.Spec.Source.Filename, + Name: "TRAIN_DATA_PATH", + Value: "/data/" + dataset.Spec.Filename, + }, + { + Name: "TRAIN_DATA_LIMIT", + Value: fmt.Sprintf("%v", model.Spec.Training.Params.DataLimit), + }, + { + Name: "TRAIN_BATCH_SIZE", + Value: fmt.Sprintf("%v", model.Spec.Training.Params.BatchSize), + }, + { + Name: "TRAIN_EPOCHS", + Value: fmt.Sprintf("%v", model.Spec.Training.Params.Epochs), }, }, VolumeMounts: []corev1.VolumeMount{ diff --git a/internal/controller/model_controller_test.go b/internal/controller/model_controller_test.go index 9ab3b3e5..ebf6af1e 100644 --- a/internal/controller/model_controller_test.go +++ b/internal/controller/model_controller_test.go @@ -27,7 +27,7 @@ func TestModelFromGit(t *testing.T) { Spec: apiv1.ModelSpec{ Source: apiv1.ModelSource{ Git: &apiv1.GitSource{ - URL: "test.com/test/test.git", + URL: "https://test.com/test/test.git", }, }, Compute: apiv1.ModelCompute{ @@ -65,7 +65,7 @@ func TestModelFromModel(t *testing.T) { Spec: apiv1.ModelSpec{ Source: apiv1.ModelSource{ Git: &apiv1.GitSource{ - URL: "test.com/test/test.git", + URL: "https://test.com/test/test", }, }, Compute: apiv1.ModelCompute{ @@ -89,9 +89,11 @@ func TestModelFromModel(t *testing.T) { Namespace: "default", }, Spec: apiv1.DatasetSpec{ + Filename: "does-not-exist.jsonl", Source: apiv1.DatasetSource{ 
- URL: "https://test.internal/does/not/exist.jsonl", - Filename: "does-not-exist.jsonl", + Git: &apiv1.GitSource{ + URL: "https://github.com/substratusai/dataset-test-test", + }, }, }, } diff --git a/internal/controller/modelserver_controller.go b/internal/controller/modelserver_controller.go index a2ce7282..7d7c44e4 100644 --- a/internal/controller/modelserver_controller.go +++ b/internal/controller/modelserver_controller.go @@ -6,6 +6,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -53,7 +54,22 @@ func (r *ModelServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) var model apiv1.Model if err := r.Get(ctx, client.ObjectKey{Name: server.Spec.ModelName, Namespace: server.Namespace}, &model); err != nil { - return ctrl.Result{}, fmt.Errorf("model not found: %w", err) + if apierrors.IsNotFound(err) { + // Update this Model's status. + meta.SetStatusCondition(&server.Status.Conditions, metav1.Condition{ + Type: ConditionReady, + Status: metav1.ConditionFalse, + Reason: ReasonSourceModelNotFound, + ObservedGeneration: server.Generation, + }) + if err := r.Status().Update(ctx, &server); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update server status: %w", err) + } + + return ctrl.Result{}, nil + } + + return ctrl.Result{}, fmt.Errorf("getting model: %w", err) } var isRegistered bool diff --git a/internal/controller/notebook_controller.go b/internal/controller/notebook_controller.go index 29454b47..0dc77f5f 100644 --- a/internal/controller/notebook_controller.go +++ b/internal/controller/notebook_controller.go @@ -3,6 +3,7 @@ package controller import ( "context" "fmt" + "time" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -45,9 +46,46 @@ func (r *NotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, client.IgnoreNotFound(err) } + if notebook.Spec.Suspend { + meta.SetStatusCondition(¬ebook.Status.Conditions, metav1.Condition{ + Type: ConditionReady, + Status: metav1.ConditionFalse, + Reason: ReasonSuspended, + ObservedGeneration: notebook.Generation, + }) + if err := r.Status().Update(ctx, ¬ebook); err != nil { + return ctrl.Result{}, fmt.Errorf("updating notebook status: %w", err) + } + + var pod corev1.Pod + pod.SetName(nbPodName(¬ebook)) + pod.SetNamespace(notebook.Namespace) + if err := r.Delete(ctx, &pod); err != nil { + if !apierrors.IsNotFound(err) { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil + } + var model apiv1.Model if err := r.Get(ctx, client.ObjectKey{Name: notebook.Spec.ModelName, Namespace: notebook.Namespace}, &model); err != nil { - return ctrl.Result{}, fmt.Errorf("model not found: %w", err) + if apierrors.IsNotFound(err) { + // Update this Model's status. + meta.SetStatusCondition(¬ebook.Status.Conditions, metav1.Condition{ + Type: ConditionReady, + Status: metav1.ConditionFalse, + Reason: ReasonSourceModelNotFound, + ObservedGeneration: notebook.Generation, + }) + if err := r.Status().Update(ctx, ¬ebook); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update notebook status: %w", err) + } + + // TODO: Implement watch on source Model. 
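The new `suspend` handling above (Ready condition set with reason `Suspended`, notebook Pod deleted) can be driven by any client able to update the spec. A sketch using a controller-runtime client; the `apiv1` module path and the object names are assumptions:

```go
package main

import (
	"context"
	"log"

	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	apiv1 "github.com/substratusai/substratus/api/v1" // assumed module path
)

func main() {
	scheme := runtime.NewScheme()
	if err := apiv1.AddToScheme(scheme); err != nil {
		log.Fatal(err)
	}

	c, err := client.New(ctrl.GetConfigOrDie(), client.Options{Scheme: scheme})
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	nb := &apiv1.Notebook{}
	if err := c.Get(ctx, client.ObjectKey{Namespace: "default", Name: "facebook-opt-125m"}, nb); err != nil {
		log.Fatal(err)
	}

	nb.Spec.Suspend = true // the controller deletes the notebook Pod and marks it Suspended
	if err := c.Update(ctx, nb); err != nil {
		log.Fatal(err)
	}
}
```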
+ return ctrl.Result{RequeueAfter: 3 * time.Second}, nil + } + return ctrl.Result{}, fmt.Errorf("getting model: %w", err) } if ready := meta.FindStatusCondition(model.Status.Conditions, ConditionReady); ready == nil || ready.Status != metav1.ConditionTrue { @@ -142,6 +180,10 @@ func (r *NotebookReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } +func nbPodName(nb *apiv1.Notebook) string { + return nb.Name + "-notebook" +} + func (r *NotebookReconciler) notebookPod(nb *apiv1.Notebook, model *apiv1.Model) (*corev1.Pod, error) { pod := &corev1.Pod{ TypeMeta: metav1.TypeMeta{ @@ -149,7 +191,7 @@ func (r *NotebookReconciler) notebookPod(nb *apiv1.Notebook, model *apiv1.Model) Kind: "Pod", }, ObjectMeta: metav1.ObjectMeta{ - Name: nb.Name + "-notebook", + Name: nbPodName(nb), Namespace: nb.Namespace, }, Spec: corev1.PodSpec{
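Closing example: once a ModelServer is serving (per the contract above, `serve.sh` listens on port `8080` and exposes `POST /generate`), it can be exercised with a small client such as the sketch below; the localhost address assumes a `kubectl port-forward` to the serving Pod:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

type generateRequest struct {
	Prompt       string `json:"prompt"`
	MaxNewTokens int    `json:"max_new_tokens"`
}

type generateResponse struct {
	Generation string `json:"generation"`
}

func main() {
	body, err := json.Marshal(generateRequest{
		Prompt:       "Who wrote the SQuAD paper?",
		MaxNewTokens: 100,
	})
	if err != nil {
		log.Fatal(err)
	}

	// e.g. kubectl port-forward <modelserver pod> 8080:8080
	resp, err := http.Post("http://localhost:8080/generate", "application/json", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var out generateResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		log.Fatal(err)
	}
	fmt.Println(out.Generation)
}
```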