From 0a26a45bb8b9de2d48cbf5b07c5b5087c5d2c75b Mon Sep 17 00:00:00 2001 From: Chris Chase Date: Fri, 28 Jun 2024 16:52:40 -0400 Subject: [PATCH 1/4] PyTorch conversion WIP Adding Ray distributed training WIP more wip wip it good WIP working distributed training job Added onnx conversion and cleaned up Updating rest/grpc requests Remove unused notebook Add distributed training notebook Delete unused csv Remove unused HF_USER/TOKEN env vars from distributed training. Fixed Elyra pipeline Fixed gpu errors in training notebook --- .gitignore | 4 +- 1_experiment_train.ipynb | 646 ++++++++++++++++++----------- 3_rest_requests_multi_model.ipynb | 55 ++- 4_grpc_requests_multi_model.ipynb | 67 +-- 5_rest_requests_single_model.ipynb | 63 ++- 6 Train Save.pipeline | 16 +- ray-scripts/requirements.txt | 15 +- ray-scripts/train_cpu.py | 308 ++++++++++++++ utils/s3.py | 4 +- 9 files changed, 826 insertions(+), 352 deletions(-) create mode 100644 ray-scripts/train_cpu.py diff --git a/.gitignore b/.gitignore index 241c36e..32f3c1b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ -models -artifact +artifact/ +models/ node_modules/ # Byte-compiled / optimized / DLL files diff --git a/1_experiment_train.ipynb b/1_experiment_train.ipynb index 7b0feda..672edb8 100644 --- a/1_experiment_train.ipynb +++ b/1_experiment_train.ipynb @@ -2,82 +2,42 @@ "cells": [ { "cell_type": "markdown", - "metadata": { - "tags": [] - }, + "metadata": {}, "source": [ "# Experiment" ] }, - { - "cell_type": "markdown", - "metadata": { - "tags": [] - }, - "source": [ - "## Install Python dependencies" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2024-08-19T15:45:05.830869Z", - "start_time": "2024-08-19T15:45:04.819700Z" - }, - "is_executing": true, - "scrolled": true - }, - "outputs": [], - "source": [ - "!pip install onnx onnxruntime tf2onnx" - ] - }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Import the dependencies for the model training code:" + "### Set the target device" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "ExecuteTime": { - "end_time": "2024-08-19T15:45:08.983925Z", - "start_time": "2024-08-19T15:45:05.835311Z" - } + "tags": [] }, "outputs": [], "source": [ - "import numpy as np\n", - "import pandas as pd\n", - "import datetime\n", - "from keras.models import Sequential\n", - "from keras.layers import Dense, Dropout, BatchNormalization, Activation\n", - "from sklearn.model_selection import train_test_split\n", - "from sklearn.preprocessing import StandardScaler\n", - "from sklearn.utils import class_weight\n", - "import tf2onnx\n", - "import onnx\n", - "import pickle\n", - "from pathlib import Path" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The output might show TensorFlow messages, such as a \"Could not find TensorRT\" warning. 
You can ignore these messages.\n" + "import torch\n", + "\n", + "device = (\n", + " \"cuda\" if torch.cuda.is_available()\n", + " else \"mps\" if torch.backends.mps.is_available()\n", + " else \"cpu\"\n", + ")\n", + "\n", + "device" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Load the CSV data\n", + "## Working with Data\n", "\n", "The CSV data that you use to train the model contains the following fields:\n", "\n", @@ -95,15 +55,12 @@ "cell_type": "code", "execution_count": null, "metadata": { - "ExecuteTime": { - "end_time": "2024-08-19T15:45:09.394745Z", - "start_time": "2024-08-19T15:45:09.051361Z" - } + "tags": [] }, "outputs": [], "source": [ - "# Set the input (X) and output (Y) data. \n", - "# The only output data is whether it's fraudulent. All other fields are inputs to the model.\n", + "import torch\n", + "import pandas as pd \n", "\n", "feature_indexes = [\n", " 1, # distance_from_last_transaction\n", @@ -117,339 +74,551 @@ " 7 # fraud\n", "]\n", "\n", - "df = pd.read_csv('data/train.csv')\n", - "X_train = df.iloc[:, feature_indexes].values\n", - "y_train = df.iloc[:, label_indexes].values\n", - "\n", - "df = pd.read_csv('data/validate.csv')\n", - "X_val = df.iloc[:, feature_indexes].values\n", - "y_val = df.iloc[:, label_indexes].values\n", + "train_df = pd.read_csv('data/train.csv')\n", + "labels_df = train_df.iloc[:, label_indexes]\n", + "train_df = train_df.iloc[:, feature_indexes]\n", + "train_df_tensor = torch.tensor(train_df.values, dtype=torch.float).to(device)\n", + "labels_df_tensor = torch.tensor(labels_df.values, dtype=torch.float).to(device)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Scaling the data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import torch\n", "\n", - "df = pd.read_csv('data/test.csv')\n", - "X_test = df.iloc[:, feature_indexes].values\n", - "y_test = df.iloc[:, label_indexes].values\n", "\n", + "# like scikit learn standard scaler\n", + "class TorchStandardScaler:\n", + " def __init__(self):\n", + " self.mean = None\n", + " self.std = None\n", "\n", - "# Scale the data to remove mean and have unit variance. 
The data will be between -1 and 1, which makes it a lot easier for the model to learn than random (and potentially large) values.\n", - "# It is important to only fit the scaler to the training data, otherwise you are leaking information about the global distribution of variables (which is influenced by the test set) into the training set.\n", + " def fit(self, tensor):\n", + " self.mean = tensor.mean(dim=0, keepdim=False)\n", + " self.std = tensor.std(dim=0, keepdim=False)\n", "\n", - "scaler = StandardScaler()\n", + " def transform(self, tensor):\n", + " return (tensor - self.mean) / self.std\n", "\n", - "X_train = scaler.fit_transform(X_train)\n", - "X_val = scaler.transform(X_val)\n", - "X_test = scaler.transform(X_test)\n", + " def fit_transform(self, tensor):\n", + " self.fit(tensor)\n", + " return self.transform(tensor)\n", "\n", - "Path(\"artifact\").mkdir(parents=True, exist_ok=True)\n", - "with open(\"artifact/test_data.pkl\", \"wb\") as handle:\n", - " pickle.dump((X_test, y_test), handle)\n", - "with open(\"artifact/scaler.pkl\", \"wb\") as handle:\n", - " pickle.dump(scaler, handle)\n", "\n", - "# Since the dataset is unbalanced (it has many more non-fraud transactions than fraudulent ones), set a class weight to weight the few fraudulent transactions higher than the many non-fraud transactions.\n", - "class_weights = class_weight.compute_class_weight('balanced', classes=np.unique(y_train), y=y_train.ravel())\n", - "class_weights = {i : class_weights[i] for i in range(len(class_weights))}" + "train_df_tensor = torch.tensor(train_df.values, dtype=torch.float).to(device)\n", + "scaler = TorchStandardScaler()\n", + "scaler.fit(train_df_tensor)\n", + "scaler.mean, scaler.std" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Build the model\n", + "## Create PyTorch Datasets and DataLoaders" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import torch\n", + "from torch.utils.data import Dataset, DataLoader\n", "\n", - "The model is a simple, fully-connected, deep neural network, containing three hidden layers and one output layer." 
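For reference, a minimal usage sketch of the TorchStandardScaler defined above, run on illustrative values rather than the real dataset:

import torch

class TorchStandardScaler:  # same behaviour as the class in the notebook cell above
    def fit(self, tensor):
        self.mean = tensor.mean(dim=0, keepdim=False)
        self.std = tensor.std(dim=0, keepdim=False)
    def transform(self, tensor):
        return (tensor - self.mean) / self.std
    def fit_transform(self, tensor):
        self.fit(tensor)
        return self.transform(tensor)

sample = torch.tensor([[0.5, 1.2, 1.0, 0.0, 0.0],
                       [3.0, 0.8, 0.0, 1.0, 1.0],
                       [1.1, 1.0, 1.0, 0.0, 1.0]], dtype=torch.float)  # made-up rows
scaled = TorchStandardScaler().fit_transform(sample)
print(scaled.mean(dim=0))  # approximately 0 per feature column
print(scaled.std(dim=0))   # approximately 1 per feature column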
+ "\n", + "class CSVDataset(Dataset):\n", + " def __init__(self, csv_file, pyarrow_fs=None, transform=None, target_transform=None):\n", + " self.feature_indexes = feature_indexes\n", + " self.label_indexes = label_indexes\n", + " \n", + " if pyarrow_fs:\n", + " with pyarrow_fs.open_input_file(csv_file) as file:\n", + " training_table = pv.read_csv(file)\n", + " self.data = training_table.to_pandas()\n", + " else:\n", + " self.data = pd.read_csv(csv_file)\n", + "\n", + "\n", + " self.features = self.data.iloc[:, self.feature_indexes].values\n", + " self.labels = self.data.iloc[:, self.label_indexes].values\n", + " self.features = torch.tensor(self.features, dtype=torch.float).to(device)\n", + " self.labels = torch.tensor(self.labels, dtype=torch.float).to(device)\n", + "\n", + " self.transform = transform\n", + " self.target_transform = target_transform\n", + "\n", + " if self.transform:\n", + " self.features = self.transform(self.features)\n", + " if self.target_transform:\n", + " self.labels = self.target_transform(self.labels)\n", + "\n", + " def __len__(self):\n", + " return len(self.data)\n", + "\n", + " def __getitem__(self, idx):\n", + " if torch.is_tensor(idx):\n", + " idx = idx.tolist()\n", + " features = self.features[idx]\n", + " label = self.labels[idx]\n", + " return features, label\n", + "\n", + "\n", + "training_data = CSVDataset('data/train.csv')\n", + "validation_data = CSVDataset('data/validate.csv')" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "ExecuteTime": { - "end_time": "2024-08-19T15:45:09.489856Z", - "start_time": "2024-08-19T15:45:09.419813Z" - } + "tags": [] }, "outputs": [], "source": [ - "model = Sequential()\n", - "model.add(Dense(32, activation='relu', input_dim=len(feature_indexes)))\n", - "model.add(Dropout(0.2))\n", - "model.add(Dense(32))\n", - "model.add(BatchNormalization())\n", - "model.add(Activation('relu'))\n", - "model.add(Dropout(0.2))\n", - "model.add(Dense(32))\n", - "model.add(BatchNormalization())\n", - "model.add(Activation('relu'))\n", - "model.add(Dropout(0.2))\n", - "model.add(Dense(1, activation='sigmoid'))\n", - "\n", - "model.compile(\n", - " optimizer='adam',\n", - " loss='binary_crossentropy',\n", - " metrics=['accuracy']\n", - ")\n", + "from torch.utils.data import DataLoader\n", + "\n", + "batch_size = 64\n", "\n", - "model.summary()" + "training_dataloader = DataLoader(training_data, batch_size=batch_size)\n", + "validation_dataloader = DataLoader(validation_data, batch_size=batch_size)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Train the model\n", + "## Build the model\n", "\n", - "Training a model is often the most time-consuming part of the machine learning process. Large models can take multiple GPUs for days. Expect the training on CPU for this very simple model to take a minute or more." + "The model is a simple, fully-connected, deep neural network, containing three hidden layers and one output layer." 
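As a quick sanity check before building the model, one batch from the loaders above can be inspected. This is a small sketch that assumes the training_dataloader defined in the cell above; the expected shapes follow from batch_size = 64 and the five selected feature columns:

features, labels = next(iter(training_dataloader))  # training_dataloader from the cell above
print(features.shape)  # expected torch.Size([64, 5]) -> batch_size x 5 input features
print(labels.shape)    # expected torch.Size([64, 1]) -> one fraud label per row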
] }, { "cell_type": "code", "execution_count": null, "metadata": { - "ExecuteTime": { - "end_time": "2024-08-19T15:45:29.664796Z", - "start_time": "2024-08-19T15:45:09.496686Z" - } + "collapsed": false, + "jupyter": { + "outputs_hidden": false + }, + "tags": [] }, "outputs": [], "source": [ - "# Train the model and get performance\n", - "import os\n", - "import time\n", - "\n", - "start = time.time()\n", - "epochs = 2\n", - "history = model.fit(\n", - " X_train,\n", - " y_train,\n", - " epochs=epochs,\n", - " validation_data=(X_val, y_val),\n", - " verbose=True,\n", - " class_weight=class_weights\n", - ")\n", - "end = time.time()\n", - "print(f\"Training of model is complete. Took {end-start} seconds\")" + "from torch import nn\n", + "\n", + "\n", + "class NeuralNetwork(nn.Module):\n", + " def __init__(self, scaler):\n", + " super().__init__()\n", + " self.linear_relu_stack = nn.Sequential(\n", + " nn.Linear(5, 32),\n", + " nn.ReLU(),\n", + " nn.Linear(32, 32),\n", + " nn.ReLU(),\n", + " nn.Linear(32, 32),\n", + " nn.ReLU(),\n", + " nn.Linear(32, 1),\n", + " nn.Sigmoid(),\n", + " )\n", + " self.scaler = scaler\n", + "\n", + " def forward(self, x):\n", + " with torch.no_grad():\n", + " x_pre = self.scaler.transform(x)\n", + " probs = self.linear_relu_stack(x_pre)\n", + " return probs\n", + "\n", + "\n", + "model = NeuralNetwork(scaler).to(device)\n", + "model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Save the model file" + "## Train the model" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "ExecuteTime": { - "end_time": "2024-08-19T15:45:29.845680Z", - "start_time": "2024-08-19T15:45:29.674230Z" - } + "collapsed": false, + "jupyter": { + "outputs_hidden": false + }, + "tags": [] }, "outputs": [], "source": [ - "import tensorflow as tf\n", + "from sklearn.metrics import precision_score, recall_score\n", "\n", - "# Normally we use tf2.onnx.convert.from_keras.\n", - "# workaround for tf2onnx bug https://github.com/onnx/tensorflow-onnx/issues/2348\n", "\n", - "# Wrap the model in a `tf.function`\n", - "@tf.function(input_signature=[tf.TensorSpec([None, X_train.shape[1]], tf.float32, name='dense_input')])\n", - "def model_fn(x):\n", - " return model(x)\n", + "def train_loop(dataloader, model, loss_fn, optimizer):\n", + " size = len(dataloader.dataset)\n", + " model.train()\n", + " for batch, (X, y) in enumerate(dataloader):\n", + " pred = model(X)\n", + " loss = loss_fn(pred, y)\n", "\n", - "# Convert the Keras model to ONNX\n", - "model_proto, _ = tf2onnx.convert.from_function(\n", - " model_fn,\n", - " input_signature=[tf.TensorSpec([None, X_train.shape[1]], tf.float32, name='dense_input')]\n", - ")\n", + " loss.backward()\n", + " optimizer.step()\n", + " optimizer.zero_grad()\n", "\n", - "# Save the model as ONNX for easy use of ModelMesh\n", - "os.makedirs(\"models/fraud/1\", exist_ok=True)\n", - "onnx.save(model_proto, \"models/fraud/1/model.onnx\")" + " if batch % round(size / batch_size / 10) == 0:\n", + " loss = loss.item()\n", + " current = batch * batch_size + len(X)\n", + " print(f\"loss: {loss:>7f} [{current:>5d}/{size:>5d}]\")\n", + "\n", + "\n", + "def eval_loop(dataloader, model, loss_fn):\n", + " model.eval()\n", + " size = len(dataloader.dataset)\n", + " num_batches = len(dataloader)\n", + " eval_loss, correct = 0, 0\n", + "\n", + " all_preds = torch.tensor([])\n", + " all_labels = torch.tensor([])\n", + "\n", + " with torch.no_grad():\n", + " for X, y in dataloader:\n", + " pred = model(X)\n", + " eval_loss += loss_fn(pred, 
y).item()\n", + " correct += torch.eq(torch.round(pred), y).sum().item()\n", + "\n", + " pred_labels = torch.round(pred)\n", + " all_preds = torch.cat((all_preds, pred_labels.cpu()))\n", + " all_labels = torch.cat((all_labels, y.cpu()))\n", + "\n", + " precision = precision_score(all_labels, all_preds)\n", + " recall = recall_score(all_labels, all_preds)\n", + "\n", + " eval_loss /= num_batches\n", + " accuracy = correct / size * 100\n", + "\n", + " return {\n", + " \"accuracy\": accuracy,\n", + " \"loss\": eval_loss,\n", + " \"precision\": precision,\n", + " \"recall\": recall\n", + " }\n", + "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "The output might include TensorFlow messages related to GPUs. You can ignore these messages." + "Training a model is often the most time-consuming part of the machine learning process. Large models can take multiple GPUs for days. Expect the training on CPU for this very simple model to take a minute or more." ] }, { - "cell_type": "markdown", - "metadata": {}, + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], "source": [ - "## Confirm the model file was created successfully\n", + "%%time\n", + "\n", + "import torch.nn as nn\n", + "\n", + "loss_fn = nn.BCELoss().to(device)\n", + "\n", + "learning_rate = 1e-3\n", + "optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)\n", "\n", - "The output should include the model name, size, and date. " + "num_epochs = 2\n", + "for t in range(num_epochs):\n", + " print(f\"\\nEpoch {t+1}\\n-------------------------------\")\n", + " train_loop(training_dataloader, model, loss_fn, optimizer)\n", + " metrics = eval_loop(validation_dataloader, model, loss_fn)\n", + " print(f\"Eval Metrics: \\n Accuracy: {(metrics['accuracy']):>0.1f}%, Avg loss: {metrics['loss']:>8f}, \"\n", + " f\"Precision: {metrics['precision']:.4f}, Recall: {metrics['recall']:.4f} \\n\")\n", + "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "ExecuteTime": { - "end_time": "2024-08-19T15:45:30.012353Z", - "start_time": "2024-08-19T15:45:29.856416Z" - } + "tags": [] }, "outputs": [], "source": [ - "! 
ls -alRh ./models/" + "print(f\"Eval Metrics: \\n Accuracy: {(metrics['accuracy']):>0.1f}%, Avg loss: {metrics['loss']:>8f}, \"\n", + " f\"Precision: {metrics['precision']:.4f}, Recall: {metrics['recall']:.4f} \\n\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Test the model" + "### Test Model" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "ExecuteTime": { - "end_time": "2024-08-19T15:45:30.047040Z", - "start_time": "2024-08-19T15:45:30.029773Z" - } + "tags": [] }, "outputs": [], "source": [ - "from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n", - "import numpy as np\n", - "import pickle\n", - "import onnxruntime as rt" + "def run_inference(test_data):\n", + " model.eval()\n", + " with torch.inference_mode():\n", + " prediction = torch.round(model(test_data))\n", + "\n", + " if prediction.item() == 1:\n", + " return \"fraud\"\n", + " else:\n", + " return \"NOT fraud\"" ] }, { - "cell_type": "markdown", - "metadata": {}, + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], "source": [ - "Load the test data and scaler:" + "# valid transaction\n", + "valid_tx = torch.tensor([[0.0, 1.0, 1.0, 1.0, 0.0]]).to(device)\n", + "prediction = run_inference(valid_tx)\n", + "print(f\"The model thinks the valid transaction is {prediction}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "ExecuteTime": { - "end_time": "2024-08-19T15:45:30.062713Z", - "start_time": "2024-08-19T15:45:30.058023Z" - } + "tags": [] }, "outputs": [], "source": [ - "with open('artifact/scaler.pkl', 'rb') as handle:\n", - " scaler = pickle.load(handle)\n", - "with open('artifact/test_data.pkl', 'rb') as handle:\n", - " (X_test, y_test) = pickle.load(handle)" + "# fraudulent use case\n", + "fraud_tx = torch.tensor([[100, 1.2, 0.0, 0.0, 1.0]]).to(device)\n", + "prediction = run_inference(fraud_tx)\n", + "print(f\"The model thinks the valid transaction is {prediction}\")" ] }, { - "cell_type": "markdown", - "metadata": {}, + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], "source": [ - "Create an ONNX inference runtime session and predict values for all test inputs:" + "# test_df = pd.read_csv('data/test_sample.csv', )\n", + "test_df = pd.read_csv('data/test.csv', )\n", + "test_labels_df = test_df.iloc[:, label_indexes]\n", + "test_data_df = test_df.iloc[:, feature_indexes]\n", + "test_data_df_tensor = torch.tensor(test_data_df.values, dtype=torch.float).to(device)\n", + "test_labels_df_tensor = torch.tensor(test_labels_df.values, dtype=torch.float).to(device)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "ExecuteTime": { - "end_time": "2024-08-19T15:45:30.210272Z", - "start_time": "2024-08-19T15:45:30.073900Z" - } + "tags": [] }, "outputs": [], "source": [ - "sess = rt.InferenceSession(\"models/fraud/1/model.onnx\", providers=rt.get_available_providers())\n", - "input_name = sess.get_inputs()[0].name\n", - "output_name = sess.get_outputs()[0].name\n", - "y_pred_temp = sess.run([output_name], {input_name: X_test.astype(np.float32)}) \n", - "y_pred_temp = np.asarray(np.squeeze(y_pred_temp[0]))\n", - "threshold = 0.95\n", - "y_pred = np.where(y_pred_temp > threshold, 1, 0)" + "model.eval()\n", + "with torch.inference_mode():\n", + " y_pred = model(test_data_df_tensor)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from sklearn.metrics 
import confusion_matrix, ConfusionMatrixDisplay\n", + "from matplotlib import pyplot as plt\n", + "\n", + "correct = torch.eq(torch.round(y_pred), test_labels_df_tensor).sum().item()\n", + "acc = (correct / len(y_pred)) * 100\n", + "\n", + "y_pred_cpu = torch.Tensor.cpu(torch.Tensor.cpu(torch.round(y_pred)))\n", + "test_labels_df_tensor_cpu = torch.Tensor.cpu(torch.Tensor.cpu(test_labels_df_tensor))\n", + "\n", + "precision = precision_score(test_labels_df_tensor_cpu, y_pred_cpu)\n", + "recall = recall_score(test_labels_df_tensor_cpu, y_pred_cpu)\n", + "\n", + "print(f\"Eval Metrics: \\n Accuracy: {acc:>0.1f}%, \"\n", + " f\"Precision: {precision:.4f}, Recall: {recall:.4f} \\n\")\n", + "\n", + "c_matrix = confusion_matrix(test_labels_df_tensor_cpu,\n", + " y_pred_cpu)\n", + "ConfusionMatrixDisplay(c_matrix).plot()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Show the results:" + "## Export to ONNX\n", + "\n", + "If we want to use the model again without having the original neural network, we can save it as ONNX. In addition, this format is useful for model serving." ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "ExecuteTime": { - "end_time": "2024-08-19T15:45:30.644142Z", - "start_time": "2024-08-19T15:45:30.221686Z" - }, "tags": [] }, "outputs": [], "source": [ - "from sklearn.metrics import precision_score, recall_score, confusion_matrix, ConfusionMatrixDisplay\n", - "import numpy as np\n", - "\n", - "y_test_arr = y_test.squeeze()\n", - "correct = np.equal(y_pred, y_test_arr).sum().item()\n", - "acc = (correct / len(y_pred)) * 100\n", - "precision = precision_score(y_test_arr, np.round(y_pred))\n", - "recall = recall_score(y_test_arr, np.round(y_pred))\n", - "\n", - "print(f\"Eval Metrics: \\n Accuracy: {acc:>0.1f}%, \"\n", - " f\"Precision: {precision:.4f}, Recall: {recall:.4f} \\n\")\n", + "!pip install onnx onnxscript onnxruntime" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import os\n", "\n", - "c_matrix = confusion_matrix(y_test_arr, y_pred)\n", - "ConfusionMatrixDisplay(c_matrix).plot()" + "os.makedirs(\"models/fraud/1\", exist_ok=True)\n", + "dummy_input = torch.randn(1, 5, device=device)\n", + "onnx_model = torch.onnx.export(\n", + " model,\n", + " dummy_input,\n", + " \"models/fraud/1/model.onnx\",\n", + " input_names=[\"inputs\"],\n", + " output_names=[\"outputs\"],\n", + " dynamic_axes={\n", + " \"inputs\": {0: \"batch_size\"},\n", + " },\n", + " verbose=True)" ] }, { "cell_type": "markdown", + "metadata": { + "tags": [] + }, + "source": [ + "### Test the ONNX model" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n", + "import numpy as np\n", + "import pickle\n", + "from matplotlib import pyplot as plt\n", + "\n", + "import onnx\n", + "import onnxruntime as rt" + ] + }, + { + "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ - "## Example: Is Sally's transaction likely to be fraudulent?\n", + "onnx_test_data = test_data_df.values\n", + "onnx_test_data = np.float32(onnx_test_data)\n", "\n", - "Here is the order of the fields from Sally's transaction details:\n", - "* distance_from_last_transaction\n", - "* ratio_to_median_price\n", - "* used_chip \n", - "* used_pin_number\n", - "* online_order " + "onnx_test_labels = test_data_df.values\n", + "onnx_test_labels = 
np.float32(onnx_test_labels)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sess = rt.InferenceSession(\"models/fraud/1/model.onnx\", providers=rt.get_available_providers())\n", + "input_name = sess.get_inputs()[0].name\n", + "output_name = sess.get_outputs()[0].name\n", + "input_name, output_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "ExecuteTime": { - "end_time": "2024-08-19T15:45:30.679688Z", - "start_time": "2024-08-19T15:45:30.669086Z" - }, "tags": [] }, "outputs": [], "source": [ - "sally_transaction_details = [\n", - " [0.3111400080477545,\n", - " 1.9459399775518593,\n", - " 1.0,\n", - " 0.0,\n", - " 0.0]\n", - " ]\n", + "onnx_output = sess.run([output_name], {input_name: onnx_test_data})[0]\n", + "onnx_output" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n", + "from matplotlib import pyplot as plt\n", + "import numpy as np\n", "\n", - "prediction = sess.run([output_name], {input_name: scaler.transform(sally_transaction_details).astype(np.float32)})\n", + "correct = np.equal(np.round(onnx_output), test_labels_df).sum().item()\n", + "acc = (correct / len(onnx_output)) * 100\n", + "precision = precision_score(test_labels_df_tensor_cpu, np.round(onnx_output))\n", + "recall = recall_score(test_labels_df, np.round(onnx_output))\n", "\n", - "print(\"Is Sally's transaction predicted to be fraudulent? (true = YES, false = NO) \")\n", - "print(np.squeeze(prediction) > threshold)\n", + "print(f\"Eval Metrics: \\n Accuracy: {acc:>0.1f}%, \"\n", + " f\"Precision: {precision:.4f}, Recall: {recall:.4f} \\n\")\n", "\n", - "print(\"How likely was Sally's transaction to be fraudulent? 
\")\n", - "print(\"{:.5f}\".format(100 * np.squeeze(prediction)) + \"%\")" + "c_matrix = confusion_matrix(test_labels_df.values, np.round(onnx_output))\n", + "ConfusionMatrixDisplay(c_matrix).plot()\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "### Check our ONNX output matches our original PyTorch Model Output" + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "np.array_equal(np.round(y_pred.numpy()), np.round(onnx_output))" ] }, { @@ -476,12 +645,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.7" - }, - "vscode": { - "interpreter": { - "hash": "63462a1f26ab486248b2a0fd058a0d9f9a6566a80083a3e1eb8f35617f2381b2" - } + "version": "3.9.18" } }, "nbformat": 4, diff --git a/3_rest_requests_multi_model.ipynb b/3_rest_requests_multi_model.ipynb index b1a5fcb..ea4d8ee 100644 --- a/3_rest_requests_multi_model.ipynb +++ b/3_rest_requests_multi_model.ipynb @@ -22,7 +22,9 @@ "cell_type": "code", "execution_count": null, "id": "1d17b252-7827-4cae-adb0-f98c9d80bcd7", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "deployed_model_name = \"fraud\"\n", @@ -58,7 +60,7 @@ " json_data = {\n", " \"inputs\": [\n", " {\n", - " \"name\": \"dense_input\",\n", + " \"name\": \"inputs\",\n", " \"shape\": [1, 5],\n", " \"datatype\": \"FP32\",\n", " \"data\": data\n", @@ -71,28 +73,17 @@ " return response_dict['outputs'][0]['data']" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "8cdbe0b1", - "metadata": {}, - "outputs": [], - "source": [ - "#Load the scaler\n", - "import pickle\n", - "with open('artifact/scaler.pkl', 'rb') as handle:\n", - " scaler = pickle.load(handle)" - ] - }, { "cell_type": "code", "execution_count": null, "id": "f0a68b67-b109-4a2f-b097-092f4a4d25ce", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "data = [0.3111400080477545, 1.9459399775518593, 1.0, 0.0, 0.0]\n", - "prediction = rest_request(scaler.transform([data]).tolist()[0])\n", + "prediction = rest_request(data)\n", "prediction" ] }, @@ -100,12 +91,12 @@ "cell_type": "code", "execution_count": null, "id": "7e54617f-0c9e-4220-b66c-93885d847050", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ - "threshhold = 0.95\n", - "\n", - "if (prediction[0] > threshhold):\n", + "if round(prediction[0]):\n", " print('fraud')\n", "else:\n", " print('not fraud')" @@ -130,14 +121,15 @@ "cell_type": "code", "execution_count": null, "id": "0393a5a7", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "data = [0.0, 1.0, 1.0, 1.0, 0.0]\n", - "prediction = rest_request(scaler.transform([data]).tolist()[0])\n", - "threshhold = 0.95\n", + "prediction = rest_request(data)\n", "\n", - "if (prediction[0] > threshhold):\n", + "if round(prediction[0]):\n", " print('The model predicts that this is fraud')\n", "else:\n", " print('The model predicts that this is not fraud')" @@ -168,14 +160,21 @@ "outputs": [], "source": [ "data = [100, 1.2, 0.0, 0.0, 1.0]\n", - "prediction = rest_request(scaler.transform([data]).tolist()[0])\n", - "threshhold = 0.95\n", + "prediction = rest_request(data)\n", "\n", - "if (prediction[0] > threshhold):\n", + "if round(prediction[0]):\n", " print('The model predicts that this is fraud')\n", "else:\n", " print('The model predicts that this is not fraud')" ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fc383448-2069-4b9f-9f0b-6c67a657c0d9", 
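The updated rest_request() names its input tensor "inputs" to match the ONNX export and no longer pre-scales the data, since scaling now happens inside the model's forward pass. A sketch of the request body it builds, using the same illustrative transaction as the notebook; the body is posted to the inference URL constructed earlier in that notebook:

# Shape of the request body built by the updated rest_request() (values illustrative).
data = [0.3111400080477545, 1.9459399775518593, 1.0, 0.0, 0.0]
json_data = {
    "inputs": [
        {
            "name": "inputs",   # must match input_names=["inputs"] used in torch.onnx.export
            "shape": [1, 5],
            "datatype": "FP32",
            "data": data,
        }
    ]
}
# The notebook reads response_dict['outputs'][0]['data'] from the reply and then
# uses round(prediction[0]) to map the sigmoid output to fraud / not fraud.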
+ "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/4_grpc_requests_multi_model.ipynb b/4_grpc_requests_multi_model.ipynb index 4593e5a..0fb96c2 100644 --- a/4_grpc_requests_multi_model.ipynb +++ b/4_grpc_requests_multi_model.ipynb @@ -24,7 +24,9 @@ "cell_type": "code", "execution_count": null, "id": "db9df000-a171-4652-8160-272f81e49612", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "!pip install grpcio grpcio-tools" @@ -34,7 +36,9 @@ "cell_type": "code", "execution_count": null, "id": "1d17b252-7827-4cae-adb0-f98c9d80bcd7", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "grpc_host = 'modelmesh-serving'\n", @@ -56,7 +60,9 @@ "cell_type": "code", "execution_count": null, "id": "545aa5f4-356f-4e70-b7e6-cd352a68927a", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "import sys\n", @@ -92,7 +98,9 @@ "cell_type": "code", "execution_count": null, "id": "67c1d001-ff99-414a-95d4-5729d5849298", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "import numpy as np\n", @@ -101,7 +109,7 @@ " # request content building\n", " inputs = []\n", " inputs.append(grpc_predict_v2_pb2.ModelInferRequest().InferInputTensor())\n", - " inputs[0].name = \"dense_input\"\n", + " inputs[0].name = \"inputs\"\n", " inputs[0].datatype = \"FP32\"\n", " inputs[0].shape.extend([1, 5])\n", " inputs[0].contents.fp32_contents.extend(data)\n", @@ -124,28 +132,17 @@ "### Run the Request" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "4fc549f6", - "metadata": {}, - "outputs": [], - "source": [ - "#Load the scaler\n", - "import pickle\n", - "with open('artifact/scaler.pkl', 'rb') as handle:\n", - " scaler = pickle.load(handle)" - ] - }, { "cell_type": "code", "execution_count": null, "id": "12947866-e0f5-4c72-ba9a-04229b1af990", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "data = [0.3111400080477545, 1.9459399775518593, 1.0, 0.0, 0.0]\n", - "prediction = grpc_request(scaler.transform([data]).tolist()[0])\n", + "prediction = grpc_request(data)\n", "prediction" ] }, @@ -153,12 +150,12 @@ "cell_type": "code", "execution_count": null, "id": "946f9f1d-b24a-4aa6-b839-f0e8013ef84d", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ - "threshhold = 0.95\n", - "\n", - "if (prediction[0] > threshhold):\n", + "if np.round(prediction):\n", " print('fraud')\n", "else:\n", " print('not fraud')" @@ -183,14 +180,15 @@ "cell_type": "code", "execution_count": null, "id": "f0a68b67-b109-4a2f-b097-092f4a4d25ce", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "data = [0.0, 1.0, 1.0, 1.0, 0.0]\n", - "prediction = grpc_request(scaler.transform([data]).tolist()[0])\n", - "threshhold = 0.95\n", + "prediction = grpc_request(data)\n", "\n", - "if (prediction[0] > threshhold):\n", + "if np.round(prediction):\n", " print('The model predicts that this is fraud')\n", "else:\n", " print('The model predicts that this is not fraud')" @@ -221,14 +219,21 @@ "outputs": [], "source": [ "data = [100, 1.2, 0.0, 0.0, 1.0]\n", - "prediction = grpc_request(scaler.transform([data]).tolist()[0])\n", - "threshhold = 0.95\n", + "prediction = grpc_request(data)\n", "\n", - "if (prediction[0] > threshhold):\n", + "if np.round(prediction):\n", " print('The model predicts that this is fraud')\n", "else:\n", " print('The model predicts that this is not fraud')" ] + }, + { + "cell_type": "code", + 
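Both the REST and gRPC notebooks now decide fraud / not fraud by rounding the model's sigmoid output rather than comparing it against the earlier 0.95 threshold. A small illustration of the difference, using a made-up probability:

import numpy as np

p = 0.90                  # illustrative sigmoid output from the model
print(bool(np.round(p)))  # new check: True, rounds to 1 (fraud)
print(p > 0.95)           # previous check: False, below the 0.95 cut-off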
"execution_count": null, + "id": "53d14f30-353a-4fd7-ab56-a4d3250bb9da", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/5_rest_requests_single_model.ipynb b/5_rest_requests_single_model.ipynb index f60cba1..7e98501 100644 --- a/5_rest_requests_single_model.ipynb +++ b/5_rest_requests_single_model.ipynb @@ -1,11 +1,9 @@ { "cells": [ { - "cell_type": "code", - "execution_count": null, - "id": "55c8afde-9b18-4b6a-9ee5-33924bdb4f16", + "cell_type": "markdown", + "id": "73ac75c8-d70e-4c40-8c92-b6e22285243e", "metadata": {}, - "outputs": [], "source": [ "# REST Inference" ] @@ -29,7 +27,9 @@ "cell_type": "code", "execution_count": null, "id": "0de65d02-84a6-4cff-882e-551cdd42b486", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "deployed_model_name = \"fraud\"\n", @@ -65,7 +65,7 @@ " json_data = {\n", " \"inputs\": [\n", " {\n", - " \"name\": \"dense_input\",\n", + " \"name\": \"inputs\",\n", " \"shape\": [1, 5],\n", " \"datatype\": \"FP32\",\n", " \"data\": data\n", @@ -78,28 +78,17 @@ " return response_dict['outputs'][0]['data']" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "5f871f12", - "metadata": {}, - "outputs": [], - "source": [ - "#Load the scaler\n", - "import pickle\n", - "with open('artifact/scaler.pkl', 'rb') as handle:\n", - " scaler = pickle.load(handle)" - ] - }, { "cell_type": "code", "execution_count": null, "id": "45ad16ac-23da-48bd-9796-f8e4cacae981", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "data = [0.3111400080477545, 1.9459399775518593, 1.0, 0.0, 0.0]\n", - "prediction = rest_request(scaler.transform([data]).tolist()[0])\n", + "prediction = rest_request(data)\n", "prediction" ] }, @@ -107,12 +96,12 @@ "cell_type": "code", "execution_count": null, "id": "1d66e0f7-4d4e-4879-bdf1-36b712432fd9", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ - "threshhold = 0.95\n", - "\n", - "if (prediction[0] > threshhold):\n", + "if round(prediction[0]):\n", " print('fraud')\n", "else:\n", " print('not fraud')" @@ -137,15 +126,15 @@ "cell_type": "code", "execution_count": null, "id": "f0a68b67-b109-4a2f-b097-092f4a4d25ce", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "data = [0.0, 1.0, 1.0, 1.0, 0.0]\n", - "prediction = rest_request(scaler.transform([data]).tolist()[0])\n", - "prediction\n", - "threshhold = 0.95\n", + "prediction = rest_request(data)\n", "\n", - "if (prediction[0] > threshhold):\n", + "if round(prediction[0]):\n", " print('The model predicts that this is fraud')\n", "else:\n", " print('The model predicts that this is not fraud')" @@ -176,15 +165,21 @@ "outputs": [], "source": [ "data = [100, 1.2, 0.0, 0.0, 1.0]\n", - "prediction = rest_request(scaler.transform([data]).tolist()[0])\n", - "prediction\n", - "threshhold = 0.95\n", + "prediction = rest_request(data)\n", "\n", - "if (prediction[0] > threshhold):\n", + "if round(prediction[0]):\n", " print('The model predicts that this is fraud')\n", "else:\n", " print('The model predicts that this is not fraud')" ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4739baa2-f13e-4aae-85a2-33ed1d65e79b", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/6 Train Save.pipeline b/6 Train Save.pipeline index 63e5e52..88d54fe 100644 --- a/6 Train Save.pipeline +++ b/6 Train Save.pipeline @@ -15,7 +15,9 @@ "app_data": { "component_parameters": { "dependencies": [ - "data/*.csv" + "data/train.csv", + 
"data/validate.csv", + "data/test.csv" ], "include_subdirectories": true, "outputs": [ @@ -33,7 +35,7 @@ "label": "", "ui_data": { "label": "1_experiment_train.ipynb", - "image": "/notebook/fraud-detection/tensorflow/static/elyra/notebook.svg", + "image": "/notebook/chase-llm-demo/pytorch/static/elyra/notebook.svg", "x_pos": 88, "y_pos": 275, "description": "Run notebook file" @@ -115,7 +117,7 @@ "label": "", "ui_data": { "label": "2_save_model.ipynb", - "image": "/notebook/fraud-detection/tensorflow/static/elyra/notebook.svg", + "image": "/notebook/chase-llm-demo/pytorch/static/elyra/notebook.svg", "x_pos": 342, "y_pos": 277, "description": "Run notebook file" @@ -168,14 +170,14 @@ "name": "6 Train Save", "runtime": "Data Science Pipelines", "pipeline_defaults": { - "kubernetes_shared_mem_size": {}, + "kubernetes_pod_annotations": [], "kubernetes_pod_labels": [], - "kubernetes_tolerations": [], "mounted_volumes": [], - "kubernetes_pod_annotations": [], + "kubernetes_tolerations": [], + "kubernetes_shared_mem_size": {}, "env_vars": [], "kubernetes_secrets": [], - "runtime_image": "quay.io/modh/runtime-images@sha256:1186ac6c9026d1091f707fe8cedfcc1ea12d1ec46edd9e8d56bb4b12ba048630" + "runtime_image": "quay.io/modh/runtime-images@sha256:ef9cc4f0dc1c0dff82c8bcdee295a5c8b8c19d297844fc209316ace315c79982" }, "pipeline_parameters": [] } diff --git a/ray-scripts/requirements.txt b/ray-scripts/requirements.txt index 4a708f2..8403b17 100755 --- a/ray-scripts/requirements.txt +++ b/ray-scripts/requirements.txt @@ -1,7 +1,8 @@ -boto3~=1.35.12 -botocore~=1.35.12 -scikit-learn~=1.5.1 -tensorflow~= 2.15.1 -keras~=2.15.0 -onnx~=1.16.2 -tf2onnx~=1.16.1 +boto3 +botocore +torch +torchvision +scikit-learn +onnx +onnxscript +onnxruntime diff --git a/ray-scripts/train_cpu.py b/ray-scripts/train_cpu.py new file mode 100644 index 0000000..725fe87 --- /dev/null +++ b/ray-scripts/train_cpu.py @@ -0,0 +1,308 @@ +import os +import torch +import torch.nn as nn +import ray +import pandas as pd +import tempfile +import boto3 +import botocore + +from sklearn.metrics import precision_score, recall_score + +import pyarrow +import pyarrow.fs +import pyarrow.csv + +from torch.utils.data import Dataset, DataLoader +from ray.train.torch import TorchTrainer +from ray.train import RunConfig, ScalingConfig + +feature_indexes = [ + 1, # distance_from_last_transaction + 2, # ratio_to_median_purchase_price + 4, # used_chip + 5, # used_pin_number + 6, # online_order +] + +label_indexes = [ + 7 # fraud +] + +first_layer_num = len(feature_indexes) + +device = "cpu" +use_gpu = False +num_epochs = 2 +batch_size = 64 +learning_rate = 1e-3 +bucket_name = os.environ.get("AWS_S3_BUCKET") +state_dict_filename = "model.pth" +onnx_model_filename = "model.onnx" + + +class TorchStandardScaler: + def __init__(self): + self.mean = None + self.std = None + + def fit(self, tensor): + self.mean = tensor.mean(dim=0, keepdim=False) + self.std = tensor.std(dim=0, keepdim=False) + + def transform(self, tensor): + return (tensor - self.mean) / self.std + + def fit_transform(self, tensor): + self.fit(tensor) + return self.transform(tensor) + + +class CSVDataset(Dataset): + def __init__(self, csv_file, pyarrow_fs=None, transform=None, target_transform=None): + self.feature_indexes = feature_indexes + self.label_indexes = label_indexes + + if pyarrow_fs: + with pyarrow_fs.open_input_file(csv_file) as file: + training_table = pyarrow.csv.read_csv(file) + self.data = training_table.to_pandas() + else: + self.data = pd.read_csv(csv_file) + + self.features = 
self.data.iloc[:, self.feature_indexes].values + self.labels = self.data.iloc[:, self.label_indexes].values + self.features = torch.tensor(self.features, dtype=torch.float).to(device) + self.labels = torch.tensor(self.labels, dtype=torch.float).to(device) + + self.transform = transform + self.target_transform = target_transform + + # small dataset will fit and + if self.transform: + self.features = self.transform(self.features) + if self.target_transform: + self.labels = self.target_transform(self.labels) + + def __len__(self): + return len(self.data) + + def __getitem__(self, idx): + if torch.is_tensor(idx): + idx = idx.tolist() + features = self.features[idx] + label = self.labels[idx] + return features, label + + +class NeuralNetwork(nn.Module): + def __init__(self, scaler): + super().__init__() + self.linear_relu_stack = nn.Sequential( + nn.Linear(first_layer_num, 32), + nn.ReLU(), + nn.Linear(32, 32), + nn.ReLU(), + nn.Linear(32, 32), + nn.ReLU(), + nn.Linear(32, 1), + nn.Sigmoid(), + ) + self.scaler = scaler + + def forward(self, x): + with torch.no_grad(): + x_pre = self.scaler.transform(x) + probs = self.linear_relu_stack(x_pre) + return probs + + +def get_scaler(pyarrow_fs): + train_csv_path = bucket_name + "/" + os.environ.get("TRAIN_DATA") + with pyarrow_fs.open_input_file(train_csv_path) as file: + training_table = pyarrow.csv.read_csv(file) + + train_df = training_table.to_pandas() + train_df = train_df.iloc[:, feature_indexes] + train_df_tensor = torch.tensor(train_df.values, dtype=torch.float).to(device) + scaler = TorchStandardScaler() + scaler.fit(train_df_tensor) + + return scaler + + +def get_datasets(pyarrow_fs): + train_csv_path = bucket_name + "/" + os.environ.get("TRAIN_DATA") + validate_csv_path = bucket_name + "/" + os.environ.get("VALIDATE_DATA") + + training_data = CSVDataset(train_csv_path, pyarrow_fs) + validation_data = CSVDataset(validate_csv_path, pyarrow_fs) + + return training_data, validation_data + + +def get_fs(): + aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID") + aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY") + endpoint_url = os.environ.get("AWS_S3_ENDPOINT") + region_name = os.environ.get("AWS_DEFAULT_REGION") + + return pyarrow.fs.S3FileSystem( + access_key=aws_access_key_id, + secret_key=aws_secret_access_key, + region=region_name, + endpoint_override=endpoint_url) + +def get_s3_resource(): + aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID') + aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY') + endpoint_url = os.environ.get('AWS_S3_ENDPOINT') + region_name = os.environ.get('AWS_DEFAULT_REGION') + + session = boto3.session.Session(aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key) + + s3_resource = session.resource( + 's3', + config=botocore.client.Config(signature_version='s3v4'), + endpoint_url=endpoint_url, + region_name=region_name) + + return s3_resource + + +def train_loop(dataloader, model, loss_fn, optimizer): + size = len(dataloader.dataset) + model.train() + for batch, (X, y) in enumerate(dataloader): + pred = model(X) + loss = loss_fn(pred, y) + + loss.backward() + optimizer.step() + optimizer.zero_grad() + + if batch % round(size / batch_size / 10) == 0: + loss = loss.item() + current = batch * batch_size + len(X) + print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]") + + +def eval_loop(dataloader, model, loss_fn): + model.eval() + size = len(dataloader.dataset) + num_batches = len(dataloader) + eval_loss, correct = 0, 0 + + all_preds = torch.tensor([]) + 
all_labels = torch.tensor([]) + + with torch.no_grad(): + for X, y in dataloader: + pred = model(X) + eval_loss += loss_fn(pred, y).item() + correct += torch.eq(torch.round(pred), y).sum().item() + + pred_labels = torch.round(pred) + all_preds = torch.cat((all_preds, pred_labels.cpu())) + all_labels = torch.cat((all_labels, y.cpu())) + + precision = precision_score(all_labels, all_preds) + recall = recall_score(all_labels, all_preds) + + eval_loss /= num_batches + accuracy = correct / size * 100 + + return { + "accuracy": accuracy, + "loss": eval_loss, + "precision": precision, + "recall": recall + } + + +def train_func_distributed(): + pyarrow_fs = get_fs() + + training_data, validation_data = get_datasets(pyarrow_fs) + training_dataloader = DataLoader(training_data, batch_size=batch_size) + training_dataloader = ray.train.torch.prepare_data_loader(training_dataloader) + validation_dataloader = DataLoader(validation_data, batch_size=batch_size) + validation_dataloader = ray.train.torch.prepare_data_loader(validation_dataloader) + + model = NeuralNetwork(scaler).to(device) + model = ray.train.torch.prepare_model(model) + + loss_fn = nn.BCELoss().to(device) + optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate) + + for epoch in range(num_epochs): + if ray.train.get_context().get_world_size() > 1: + training_dataloader.sampler.set_epoch(epoch) + + train_loop(training_dataloader, model, loss_fn, optimizer) + metrics = eval_loop(validation_dataloader, model, loss_fn) + metrics["epoch"] = epoch + + with tempfile.TemporaryDirectory() as temp_checkpoint_dir: + torch.save( + model.module.state_dict(), + os.path.join(temp_checkpoint_dir, state_dict_filename) + ) + ray.train.report( + metrics, + checkpoint=ray.train.Checkpoint.from_directory(temp_checkpoint_dir), + ) + if ray.train.get_context().get_world_rank() == 0: + print(metrics) + + +def save_onnx_model(checkpoint_path): + s3_resource = get_s3_resource() + bucket = s3_resource.Bucket(bucket_name) + + cp_s3_key = checkpoint_path.removeprefix(f"{bucket_name}/") + "/" + state_dict_filename + state_dict_local = f"/tmp/{state_dict_filename}" + print(f"Downloading model state_dict from {cp_s3_key} to {state_dict_local}") + bucket.download_file(cp_s3_key, state_dict_local) + + pytorch_model = NeuralNetwork(scaler) + pytorch_model.load_state_dict(torch.load(state_dict_local)) + + onnx_model_local = f"/tmp/{onnx_model_filename}" + dummy_input = torch.randn(1, 5, device=device) + torch.onnx.export( + pytorch_model, + dummy_input, + onnx_model_local, + input_names=["inputs"], + output_names=["outputs"], + dynamic_axes={ + "inputs": {0: "batch_size"}, + }, + verbose=True) + + upload_path = os.environ.get("MODEL_OUTPUT") + onnx_s3_key = os.path.join(upload_path, onnx_model_filename) + print(f"Uploading model from {onnx_model_local} to {onnx_s3_key}") + bucket.upload_file(onnx_model_local, onnx_s3_key) + + +pyarrow_fs = get_fs() +scaler = get_scaler(pyarrow_fs) + +trainer = TorchTrainer( + train_func_distributed, + run_config=RunConfig( + storage_filesystem=pyarrow_fs, + storage_path=f"{bucket_name}/ray/", + name="fraud-training", + ), + scaling_config=ScalingConfig( + num_workers=3, # num_workers = number of worker nodes with the ray head node included + use_gpu=use_gpu, + ), +) + +results = trainer.fit() +save_onnx_model(results.checkpoint.path) diff --git a/utils/s3.py b/utils/s3.py index 29bba59..bd59f01 100755 --- a/utils/s3.py +++ b/utils/s3.py @@ -9,8 +9,8 @@ bucket_name = os.environ.get('AWS_S3_BUCKET') if not 
all([aws_access_key_id, aws_secret_access_key, endpoint_url, region_name, bucket_name]): - raise ValueError("One or more connection variables are empty. " - "Please check your connection to an S3 bucket.") + raise ValueError("One or data connection variables are empty. " + "Please check your data connection to an S3 bucket.") session = boto3.session.Session(aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key) From ac3348c3cc65320ab51e1348cfa78f3b2a516886 Mon Sep 17 00:00:00 2001 From: RHRolun Date: Tue, 11 Feb 2025 15:38:56 +0100 Subject: [PATCH 2/4] remove tf training --- 8_distributed_training.ipynb | 2 +- ray-scripts/train_tf_cpu.py | 228 ----------------------------------- 2 files changed, 1 insertion(+), 229 deletions(-) delete mode 100644 ray-scripts/train_tf_cpu.py diff --git a/8_distributed_training.ipynb b/8_distributed_training.ipynb index a526b76..0c02b7c 100644 --- a/8_distributed_training.ipynb +++ b/8_distributed_training.ipynb @@ -309,7 +309,7 @@ "import os\n", "\n", "# script = \"test_data_loader.py\"\n", - "script = \"train_tf_cpu.py\"\n", + "script = \"train_cpu.py\"\n", "runtime_env = {\n", " \"working_dir\": \"./ray-scripts\",\n", " \"excludes\": [],\n", diff --git a/ray-scripts/train_tf_cpu.py b/ray-scripts/train_tf_cpu.py deleted file mode 100644 index 58255ce..0000000 --- a/ray-scripts/train_tf_cpu.py +++ /dev/null @@ -1,228 +0,0 @@ -import os -import pickle -import boto3 -import botocore - -import pyarrow -import pyarrow.fs -import pyarrow.csv - -import sklearn -import numpy as np - -import tensorflow as tf -import onnx -import tf2onnx -from keras.models import Sequential -from keras.layers import Dense, Dropout, BatchNormalization, Activation - -import ray -from ray import train -from ray.train import RunConfig, ScalingConfig -from ray.train.tensorflow import TensorflowTrainer -from ray.train.tensorflow.keras import ReportCheckpointCallback -from ray.data.preprocessors import Concatenator, StandardScaler - -use_gpu = os.environ.get("USE_GPU", "False").lower() == "true" -num_workers = int(os.environ.get("NUM_WORKERS", "1")) -num_epochs = int(os.environ.get("NUM_EPOCHS", "2")) -batch_size = int(os.environ.get("BATCH_SIZE", "64")) -learning_rate = 1e-3 -output_column_name = "features" - -feature_columns = [ - "distance_from_last_transaction", - "ratio_to_median_purchase_price", - "used_chip", - "used_pin_number", - "online_order", -] - -label_columns = [ - "fraud", -] - -aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID") -aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY") -endpoint_url = os.environ.get("AWS_S3_ENDPOINT") -region_name = os.environ.get("AWS_DEFAULT_REGION") -bucket_name = os.environ.get("AWS_S3_BUCKET") -train_data = os.environ.get("TRAIN_DATA", "data/train.csv") - -keras_model_filename = "model.keras" -model_output_prefix = os.environ.get("MODEL_OUTPUT", "models/fraud/1/") -model_output_filename = os.environ.get("MODEL_OUTPUT_FILENAME", "model.onnx") -scaler_output = model_output_prefix + "scaler.pkl" -model_output = model_output_prefix + model_output_filename - - -def get_pyarrow_fs(): - return pyarrow.fs.S3FileSystem( - access_key=aws_access_key_id, - secret_key=aws_secret_access_key, - region=region_name, - endpoint_override=endpoint_url) - - -def get_s3_resource(): - session = boto3.session.Session( - aws_access_key_id=aws_access_key_id, - aws_secret_access_key=aws_secret_access_key) - - s3_resource = session.resource( - 's3', - config=botocore.client.Config(signature_version='s3v4'), - 
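The new ray-scripts/train_cpu.py is written to be launched on a Ray cluster (the TorchTrainer at the bottom of the script requests three workers), and 8_distributed_training.ipynb points its runtime_env at the ./ray-scripts directory. A minimal sketch of how such a script might be submitted with Ray's job submission API; the dashboard address, bucket name, and paths are placeholders, not values from this repository:

from ray.job_submission import JobSubmissionClient

# Placeholder address; the real value comes from the Ray cluster created for the workshop.
client = JobSubmissionClient("http://ray-cluster-head:8265")

job_id = client.submit_job(
    entrypoint="python train_cpu.py",
    runtime_env={
        "working_dir": "./ray-scripts",
        "env_vars": {
            "AWS_S3_BUCKET": "my-bucket",        # placeholder bucket
            "TRAIN_DATA": "data/train.csv",
            "VALIDATE_DATA": "data/validate.csv",
            "MODEL_OUTPUT": "models/fraud/1",
            # plus the AWS access key / secret / endpoint / region variables the script reads
        },
    },
)
print(job_id)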
endpoint_url=endpoint_url, - region_name=region_name) - - return s3_resource - -def get_class_weights(pyarrow_fs): - with pyarrow_fs.open_input_file(f"{bucket_name}/{train_data}") as file: - training_table = pyarrow.csv.read_csv(file) - - y_train = training_table.to_pandas() - y_train = y_train.loc[:, label_columns] - # Since the dataset is unbalanced (it has many more non-fraud transactions than fraudulent ones), set a class weight to weight the few fraudulent transactions higher than the many non-fraud transactions. - class_weights = sklearn.utils.class_weight.compute_class_weight( - 'balanced', - classes=np.unique(y_train), - y=y_train.values.ravel()) - class_weights = {i : class_weights[i] for i in range(len(class_weights))} - - return class_weights - - -def build_model() -> tf.keras.Model: - model = Sequential() - model.add(Dense(32, activation='relu', input_dim=len(feature_columns))) - model.add(Dropout(0.2)) - model.add(Dense(32)) - model.add(BatchNormalization()) - model.add(Activation('relu')) - model.add(Dropout(0.2)) - model.add(Dense(32)) - model.add(BatchNormalization()) - model.add(Activation('relu')) - model.add(Dropout(0.2)) - model.add(Dense(1, activation='sigmoid')) - return model - - -def train_func(config: dict): - batch_size = config.get("batch_size", 64) - epochs = config.get("epochs", 3) - cw = config.get("class_weight", 3) - - strategy = tf.distribute.MultiWorkerMirroredStrategy() - with strategy.scope(): - multi_worker_model = build_model() - multi_worker_model.compile( - optimizer="adam", - loss="binary_crossentropy", - metrics=["accuracy"], - ) - - dataset = train.get_dataset_shard("train") - results = [] - - for epoch in range(epochs): - print(f"Epoch: {epoch}") - tf_dataset = dataset.to_tf( - feature_columns=output_column_name, - label_columns=label_columns[0], - batch_size=batch_size - ) - history = multi_worker_model.fit( - tf_dataset, - class_weight=cw, - callbacks=[ReportCheckpointCallback()] - ) - results.append(history.history) - - return results - - -def create_sklearn_standard_scaler(scaler): - sk_scaler = sklearn.preprocessing.StandardScaler() - mean = [] - std = [] - - for column in feature_columns: - mean.append(scaler.stats_[f"mean({column})"]) - std.append(scaler.stats_[f"std({column})"]) - - sk_scaler.mean_ = np.array(mean) - sk_scaler.scale_ = np.array(std) - sk_scaler.var_ = sk_scaler.scale_ ** 2 - - return sk_scaler - - -def save_scalar(scaler): - s3_resource = get_s3_resource() - bucket = s3_resource.Bucket(bucket_name) - sklearn_scaler = create_sklearn_standard_scaler(scaler) - - sk_scaler_filename = "/tmp/scaler.pkl" - with open(sk_scaler_filename, "wb") as f: - pickle.dump(sklearn_scaler, f) - - print(f"Uploading scaler from {sk_scaler_filename} to {scaler_output}") - bucket.upload_file(sk_scaler_filename, scaler_output) - - -def save_onnx_model(checkpoint_path): - s3_resource = get_s3_resource() - bucket = s3_resource.Bucket(bucket_name) - - cp_s3_key = checkpoint_path.removeprefix(f"{bucket_name}/") + "/" + keras_model_filename - keras_model_local = f"/tmp/{keras_model_filename}" - - print(f"Downloading model state_dict from {cp_s3_key} to {keras_model_local}") - bucket.download_file(cp_s3_key, keras_model_local) - keras_model = tf.keras.models.load_model(keras_model_local) - onnx_model_local = f"/tmp/model.onnx" - onnx_model, _ = tf2onnx.convert.from_keras(keras_model) - onnx.save(onnx_model, onnx_model_local) - - print(f"Uploading model from {onnx_model_local} to {model_output}") - bucket.upload_file(onnx_model_local, model_output) - - 
-pyarrow_fs = get_pyarrow_fs() -class_weights = get_class_weights(pyarrow_fs) - -config = {"lr": learning_rate, "batch_size": batch_size, "epochs": num_epochs, "class_weight":class_weights} - -train_dataset = ray.data.read_csv( - filesystem=pyarrow_fs, - paths=f"s3://{bucket_name}/{train_data}") -scaler = StandardScaler(columns=feature_columns) -concatenator = Concatenator(include=feature_columns, output_column_name=output_column_name) -train_dataset = scaler.fit_transform(train_dataset) -train_dataset = concatenator.fit_transform(train_dataset) - -print(scaler.stats_) - -scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu) - -trainer = TensorflowTrainer( - train_loop_per_worker=train_func, - train_loop_config=config, - run_config=RunConfig( - storage_filesystem=pyarrow_fs, - storage_path=f"{bucket_name}/ray/", - name="fraud-training", - ), - scaling_config=scaling_config, - datasets={"train": train_dataset}, - metadata={"preprocessor_pkl": scaler.serialize()}, -) -result = trainer.fit() -metadata = result.checkpoint.get_metadata() -print(metadata) -print(StandardScaler.deserialize(metadata["preprocessor_pkl"])) - -save_scalar(scaler) -save_onnx_model(result.checkpoint.path) From ca3edc251f805ed97b5239a9a6c8d48da7a3ddf2 Mon Sep 17 00:00:00 2001 From: RHRolun Date: Tue, 11 Feb 2025 15:46:08 +0100 Subject: [PATCH 3/4] Started pipeline and remove a bunch of duplicate imports --- 1_experiment_train.ipynb | 20 +-- pipeline/7_get_data_train_upload.py | 206 +++++++++++++++++----------- 2 files changed, 132 insertions(+), 94 deletions(-) diff --git a/1_experiment_train.ipynb b/1_experiment_train.ipynb index 672edb8..b47051a 100644 --- a/1_experiment_train.ipynb +++ b/1_experiment_train.ipynb @@ -59,7 +59,6 @@ }, "outputs": [], "source": [ - "import torch\n", "import pandas as pd \n", "\n", "feature_indexes = [\n", @@ -96,9 +95,6 @@ }, "outputs": [], "source": [ - "import torch\n", - "\n", - "\n", "# like scikit learn standard scaler\n", "class TorchStandardScaler:\n", " def __init__(self):\n", @@ -138,8 +134,6 @@ }, "outputs": [], "source": [ - "import pandas as pd\n", - "import torch\n", "from torch.utils.data import Dataset, DataLoader\n", "\n", "\n", @@ -192,8 +186,6 @@ }, "outputs": [], "source": [ - "from torch.utils.data import DataLoader\n", - "\n", "batch_size = 64\n", "\n", "training_dataloader = DataLoader(training_data, batch_size=batch_size)\n", @@ -340,8 +332,6 @@ "source": [ "%%time\n", "\n", - "import torch.nn as nn\n", - "\n", "loss_fn = nn.BCELoss().to(device)\n", "\n", "learning_rate = 1e-3\n", @@ -539,10 +529,8 @@ "metadata": {}, "outputs": [], "source": [ - "from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n", "import numpy as np\n", "import pickle\n", - "from matplotlib import pyplot as plt\n", "\n", "import onnx\n", "import onnxruntime as rt" @@ -591,10 +579,6 @@ "metadata": {}, "outputs": [], "source": [ - "from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n", - "from matplotlib import pyplot as plt\n", - "import numpy as np\n", - "\n", "correct = np.equal(np.round(onnx_output), test_labels_df).sum().item()\n", "acc = (correct / len(onnx_output)) * 100\n", "precision = precision_score(test_labels_df_tensor_cpu, np.round(onnx_output))\n", @@ -610,7 +594,9 @@ { "cell_type": "markdown", "metadata": {}, - "source": "### Check our ONNX output matches our original PyTorch Model Output" + "source": [ + "### Check our ONNX output matches our original PyTorch Model Output" + ] }, { "cell_type": "code", diff --git 
a/pipeline/7_get_data_train_upload.py b/pipeline/7_get_data_train_upload.py index d3d7455..b067843 100644 --- a/pipeline/7_get_data_train_upload.py +++ b/pipeline/7_get_data_train_upload.py @@ -7,7 +7,7 @@ from kfp import kubernetes -@dsl.component(base_image="quay.io/modh/runtime-images:runtime-cuda-tensorflow-ubi9-python-3.9-2024a-20240523") +@dsl.component(base_image="quay.io/modh/runtime-images:runtime-pytorch-ubi9-python-3.11-2024b-20241108") def get_data(train_data_output_path: OutputPath(), validate_data_output_path: OutputPath()): import urllib.request print("starting download...") @@ -22,33 +22,18 @@ def get_data(train_data_output_path: OutputPath(), validate_data_output_path: Ou @dsl.component( - base_image="quay.io/modh/runtime-images:runtime-cuda-tensorflow-ubi9-python-3.9-2024a-20240523", - packages_to_install=["onnx", "onnxruntime", "tf2onnx"], + base_image="quay.io/modh/runtime-images:runtime-pytorch-ubi9-python-3.11-2024b-20241108", + packages_to_install=["onnx", "onnxruntime", "onnxscript"], ) def train_model(train_data_input_path: InputPath(), validate_data_input_path: InputPath(), model_output_path: OutputPath()): - import numpy as np - import pandas as pd - from keras.models import Sequential - from keras.layers import Dense, Dropout, BatchNormalization, Activation - from sklearn.model_selection import train_test_split - from sklearn.preprocessing import StandardScaler - from sklearn.utils import class_weight - import tf2onnx - import onnx - import pickle - from pathlib import Path - - # Load the CSV data which we will use to train the model. - # It contains the following fields: - # distancefromhome - The distance from home where the transaction happened. - # distancefromlast_transaction - The distance from last transaction happened. - # ratiotomedianpurchaseprice - Ratio of purchased price compared to median purchase price. - # repeat_retailer - If it's from a retailer that already has been purchased from before. - # used_chip - If the (credit card) chip was used. - # usedpinnumber - If the PIN number was used. - # online_order - If it was an online order. - # fraud - If the transaction is fraudulent. + import torch + import pandas as pd + device = ( + "cuda" if torch.cuda.is_available() + else "mps" if torch.backends.mps.is_available() + else "cpu" + ) feature_indexes = [ 1, # distance_from_last_transaction @@ -62,61 +47,128 @@ def train_model(train_data_input_path: InputPath(), validate_data_input_path: In 7 # fraud ] - X_train = pd.read_csv(train_data_input_path) - y_train = X_train.iloc[:, label_indexes] - X_train = X_train.iloc[:, feature_indexes] - - X_val = pd.read_csv(validate_data_input_path) - y_val = X_val.iloc[:, label_indexes] - X_val = X_val.iloc[:, feature_indexes] - - # Scale the data to remove mean and have unit variance. The data will be between -1 and 1, which makes it a lot easier for the model to learn than random (and potentially large) values. - # It is important to only fit the scaler to the training data, otherwise you are leaking information about the global distribution of variables (which is influenced by the test set) into the training set. 
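The comment above about fitting the scaler only on the training split applies equally to the PyTorch rewrite. A minimal sketch of the intended usage, using the TorchStandardScaler introduced in this patch with stand-in tensors:

    import torch

    # Compact copy of the TorchStandardScaler defined in this patch.
    class TorchStandardScaler:
        def __init__(self):
            self.mean = None
            self.std = None

        def fit(self, tensor):
            self.mean = tensor.mean(dim=0, keepdim=False)
            self.std = tensor.std(dim=0, keepdim=False)

        def transform(self, tensor):
            return (tensor - self.mean) / self.std

    train_features = torch.rand(1000, 5)   # stand-in for the training feature tensor
    val_features = torch.rand(200, 5)      # stand-in for the validation feature tensor

    scaler = TorchStandardScaler()
    scaler.fit(train_features)             # statistics come from the training split only
    train_scaled = scaler.transform(train_features)
    val_scaled = scaler.transform(val_features)  # reuse the same mean/std; never refit on validation data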
- - scaler = StandardScaler() - - X_train = scaler.fit_transform(X_train.values) - - Path("artifact").mkdir(parents=True, exist_ok=True) - with open("artifact/scaler.pkl", "wb") as handle: - pickle.dump(scaler, handle) - - # Since the dataset is unbalanced (it has many more non-fraud transactions than fraudulent ones), set a class weight to weight the few fraudulent transactions higher than the many non-fraud transactions. - class_weights = class_weight.compute_class_weight('balanced', classes=np.unique(y_train), y=y_train.values.ravel()) - class_weights = {i: class_weights[i] for i in range(len(class_weights))} - - # Build the model, the model we build here is a simple fully connected deep neural network, containing 3 hidden layers and one output layer. - - model = Sequential() - model.add(Dense(32, activation='relu', input_dim=len(feature_indexes))) - model.add(Dropout(0.2)) - model.add(Dense(32)) - model.add(BatchNormalization()) - model.add(Activation('relu')) - model.add(Dropout(0.2)) - model.add(Dense(32)) - model.add(BatchNormalization()) - model.add(Activation('relu')) - model.add(Dropout(0.2)) - model.add(Dense(1, activation='sigmoid')) - model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy']) - model.summary() - - # Train the model and get performance - - epochs = 2 - history = model.fit(X_train, y_train, epochs=epochs, - validation_data=(scaler.transform(X_val.values), y_val), - verbose=True, class_weight=class_weights) - - # Save the model as ONNX for easy use of ModelMesh - model_proto, _ = tf2onnx.convert.from_keras(model) - print(model_output_path) - onnx.save(model_proto, model_output_path) + train_df = pd.read_csv(train_data_input_path) + labels_df = train_df.iloc[:, label_indexes] + train_df = train_df.iloc[:, feature_indexes] + train_df_tensor = torch.tensor(train_df.values, dtype=torch.float).to(device) + labels_df_tensor = torch.tensor(labels_df.values, dtype=torch.float).to(device) + + # like scikit learn standard scaler + class TorchStandardScaler: + def __init__(self): + self.mean = None + self.std = None + + def fit(self, tensor): + self.mean = tensor.mean(dim=0, keepdim=False) + self.std = tensor.std(dim=0, keepdim=False) + + def transform(self, tensor): + return (tensor - self.mean) / self.std + + def fit_transform(self, tensor): + self.fit(tensor) + return self.transform(tensor) + + + train_df_tensor = torch.tensor(train_df.values, dtype=torch.float).to(device) + scaler = TorchStandardScaler() + scaler.fit(train_df_tensor) + scaler.mean, scaler.std + + +# def train_model(train_data_input_path: InputPath(), validate_data_input_path: InputPath(), model_output_path: OutputPath()): +# import numpy as np +# import pandas as pd +# from keras.models import Sequential +# from keras.layers import Dense, Dropout, BatchNormalization, Activation +# from sklearn.model_selection import train_test_split +# from sklearn.preprocessing import StandardScaler +# from sklearn.utils import class_weight +# import tf2onnx +# import onnx +# import pickle +# from pathlib import Path + +# # Load the CSV data which we will use to train the model. +# # It contains the following fields: +# # distancefromhome - The distance from home where the transaction happened. +# # distancefromlast_transaction - The distance from last transaction happened. +# # ratiotomedianpurchaseprice - Ratio of purchased price compared to median purchase price. +# # repeat_retailer - If it's from a retailer that already has been purchased from before. 
+# # used_chip - If the (credit card) chip was used. +# # usedpinnumber - If the PIN number was used. +# # online_order - If it was an online order. +# # fraud - If the transaction is fraudulent. + + +# feature_indexes = [ +# 1, # distance_from_last_transaction +# 2, # ratio_to_median_purchase_price +# 4, # used_chip +# 5, # used_pin_number +# 6, # online_order +# ] + +# label_indexes = [ +# 7 # fraud +# ] + +# X_train = pd.read_csv(train_data_input_path) +# y_train = X_train.iloc[:, label_indexes] +# X_train = X_train.iloc[:, feature_indexes] + +# X_val = pd.read_csv(validate_data_input_path) +# y_val = X_val.iloc[:, label_indexes] +# X_val = X_val.iloc[:, feature_indexes] + +# # Scale the data to remove mean and have unit variance. The data will be between -1 and 1, which makes it a lot easier for the model to learn than random (and potentially large) values. +# # It is important to only fit the scaler to the training data, otherwise you are leaking information about the global distribution of variables (which is influenced by the test set) into the training set. + +# scaler = StandardScaler() + +# X_train = scaler.fit_transform(X_train.values) + +# Path("artifact").mkdir(parents=True, exist_ok=True) +# with open("artifact/scaler.pkl", "wb") as handle: +# pickle.dump(scaler, handle) + +# # Since the dataset is unbalanced (it has many more non-fraud transactions than fraudulent ones), set a class weight to weight the few fraudulent transactions higher than the many non-fraud transactions. +# class_weights = class_weight.compute_class_weight('balanced', classes=np.unique(y_train), y=y_train.values.ravel()) +# class_weights = {i: class_weights[i] for i in range(len(class_weights))} + +# # Build the model, the model we build here is a simple fully connected deep neural network, containing 3 hidden layers and one output layer. 
+ +# model = Sequential() +# model.add(Dense(32, activation='relu', input_dim=len(feature_indexes))) +# model.add(Dropout(0.2)) +# model.add(Dense(32)) +# model.add(BatchNormalization()) +# model.add(Activation('relu')) +# model.add(Dropout(0.2)) +# model.add(Dense(32)) +# model.add(BatchNormalization()) +# model.add(Activation('relu')) +# model.add(Dropout(0.2)) +# model.add(Dense(1, activation='sigmoid')) +# model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy']) +# model.summary() + +# # Train the model and get performance + +# epochs = 2 +# history = model.fit(X_train, y_train, epochs=epochs, +# validation_data=(scaler.transform(X_val.values), y_val), +# verbose=True, class_weight=class_weights) + +# # Save the model as ONNX for easy use of ModelMesh +# model_proto, _ = tf2onnx.convert.from_keras(model) +# print(model_output_path) +# onnx.save(model_proto, model_output_path) @dsl.component( - base_image="quay.io/modh/runtime-images:runtime-cuda-tensorflow-ubi9-python-3.9-2024a-20240523", + base_image="quay.io/modh/runtime-images:runtime-pytorch-ubi9-python-3.11-2024b-20241108", packages_to_install=["boto3", "botocore"] ) def upload_model(input_model_path: InputPath()): From c8049e2591369a68bc9de4c589f7ca4243d4e5fc Mon Sep 17 00:00:00 2001 From: RHRolun Date: Tue, 11 Feb 2025 15:52:02 +0100 Subject: [PATCH 4/4] Fully updated pipeline --- 1_experiment_train.ipynb | 7 - pipeline/7_get_data_train_upload.py | 236 +++++++++++++++++----------- 2 files changed, 146 insertions(+), 97 deletions(-) diff --git a/1_experiment_train.ipynb b/1_experiment_train.ipynb index b47051a..091ab22 100644 --- a/1_experiment_train.ipynb +++ b/1_experiment_train.ipynb @@ -606,13 +606,6 @@ "source": [ "np.array_equal(np.round(y_pred.numpy()), np.round(onnx_output))" ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/pipeline/7_get_data_train_upload.py b/pipeline/7_get_data_train_upload.py index b067843..949160d 100644 --- a/pipeline/7_get_data_train_upload.py +++ b/pipeline/7_get_data_train_upload.py @@ -28,6 +28,10 @@ def get_data(train_data_output_path: OutputPath(), validate_data_output_path: Ou def train_model(train_data_input_path: InputPath(), validate_data_input_path: InputPath(), model_output_path: OutputPath()): import torch import pandas as pd + from torch.utils.data import Dataset, DataLoader + from torch import nn + from sklearn.metrics import precision_score, recall_score + import os device = ( "cuda" if torch.cuda.is_available() @@ -76,96 +80,148 @@ def fit_transform(self, tensor): scaler.fit(train_df_tensor) scaler.mean, scaler.std - -# def train_model(train_data_input_path: InputPath(), validate_data_input_path: InputPath(), model_output_path: OutputPath()): -# import numpy as np -# import pandas as pd -# from keras.models import Sequential -# from keras.layers import Dense, Dropout, BatchNormalization, Activation -# from sklearn.model_selection import train_test_split -# from sklearn.preprocessing import StandardScaler -# from sklearn.utils import class_weight -# import tf2onnx -# import onnx -# import pickle -# from pathlib import Path - -# # Load the CSV data which we will use to train the model. -# # It contains the following fields: -# # distancefromhome - The distance from home where the transaction happened. -# # distancefromlast_transaction - The distance from last transaction happened. 
-# # ratiotomedianpurchaseprice - Ratio of purchased price compared to median purchase price. -# # repeat_retailer - If it's from a retailer that already has been purchased from before. -# # used_chip - If the (credit card) chip was used. -# # usedpinnumber - If the PIN number was used. -# # online_order - If it was an online order. -# # fraud - If the transaction is fraudulent. - - -# feature_indexes = [ -# 1, # distance_from_last_transaction -# 2, # ratio_to_median_purchase_price -# 4, # used_chip -# 5, # used_pin_number -# 6, # online_order -# ] - -# label_indexes = [ -# 7 # fraud -# ] - -# X_train = pd.read_csv(train_data_input_path) -# y_train = X_train.iloc[:, label_indexes] -# X_train = X_train.iloc[:, feature_indexes] - -# X_val = pd.read_csv(validate_data_input_path) -# y_val = X_val.iloc[:, label_indexes] -# X_val = X_val.iloc[:, feature_indexes] - -# # Scale the data to remove mean and have unit variance. The data will be between -1 and 1, which makes it a lot easier for the model to learn than random (and potentially large) values. -# # It is important to only fit the scaler to the training data, otherwise you are leaking information about the global distribution of variables (which is influenced by the test set) into the training set. - -# scaler = StandardScaler() - -# X_train = scaler.fit_transform(X_train.values) - -# Path("artifact").mkdir(parents=True, exist_ok=True) -# with open("artifact/scaler.pkl", "wb") as handle: -# pickle.dump(scaler, handle) - -# # Since the dataset is unbalanced (it has many more non-fraud transactions than fraudulent ones), set a class weight to weight the few fraudulent transactions higher than the many non-fraud transactions. -# class_weights = class_weight.compute_class_weight('balanced', classes=np.unique(y_train), y=y_train.values.ravel()) -# class_weights = {i: class_weights[i] for i in range(len(class_weights))} - -# # Build the model, the model we build here is a simple fully connected deep neural network, containing 3 hidden layers and one output layer. 
- -# model = Sequential() -# model.add(Dense(32, activation='relu', input_dim=len(feature_indexes))) -# model.add(Dropout(0.2)) -# model.add(Dense(32)) -# model.add(BatchNormalization()) -# model.add(Activation('relu')) -# model.add(Dropout(0.2)) -# model.add(Dense(32)) -# model.add(BatchNormalization()) -# model.add(Activation('relu')) -# model.add(Dropout(0.2)) -# model.add(Dense(1, activation='sigmoid')) -# model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy']) -# model.summary() - -# # Train the model and get performance - -# epochs = 2 -# history = model.fit(X_train, y_train, epochs=epochs, -# validation_data=(scaler.transform(X_val.values), y_val), -# verbose=True, class_weight=class_weights) - -# # Save the model as ONNX for easy use of ModelMesh -# model_proto, _ = tf2onnx.convert.from_keras(model) -# print(model_output_path) -# onnx.save(model_proto, model_output_path) - + class CSVDataset(Dataset): + def __init__(self, csv_file, pyarrow_fs=None, transform=None, target_transform=None): + self.feature_indexes = feature_indexes + self.label_indexes = label_indexes + + if pyarrow_fs: + with pyarrow_fs.open_input_file(csv_file) as file: + training_table = pv.read_csv(file) + self.data = training_table.to_pandas() + else: + self.data = pd.read_csv(csv_file) + + + self.features = self.data.iloc[:, self.feature_indexes].values + self.labels = self.data.iloc[:, self.label_indexes].values + self.features = torch.tensor(self.features, dtype=torch.float).to(device) + self.labels = torch.tensor(self.labels, dtype=torch.float).to(device) + + self.transform = transform + self.target_transform = target_transform + + if self.transform: + self.features = self.transform(self.features) + if self.target_transform: + self.labels = self.target_transform(self.labels) + + def __len__(self): + return len(self.data) + + def __getitem__(self, idx): + if torch.is_tensor(idx): + idx = idx.tolist() + features = self.features[idx] + label = self.labels[idx] + return features, label + + + training_data = CSVDataset('data/train.csv') + validation_data = CSVDataset(train_data_input_path) + + batch_size = 64 + + training_dataloader = DataLoader(training_data, batch_size=batch_size) + validation_dataloader = DataLoader(validation_data, batch_size=batch_size) + + class NeuralNetwork(nn.Module): + def __init__(self, scaler): + super().__init__() + self.linear_relu_stack = nn.Sequential( + nn.Linear(5, 32), + nn.ReLU(), + nn.Linear(32, 32), + nn.ReLU(), + nn.Linear(32, 32), + nn.ReLU(), + nn.Linear(32, 1), + nn.Sigmoid(), + ) + self.scaler = scaler + + def forward(self, x): + with torch.no_grad(): + x_pre = self.scaler.transform(x) + probs = self.linear_relu_stack(x_pre) + return probs + + + model = NeuralNetwork(scaler).to(device) + + def train_loop(dataloader, model, loss_fn, optimizer): + size = len(dataloader.dataset) + model.train() + for batch, (X, y) in enumerate(dataloader): + pred = model(X) + loss = loss_fn(pred, y) + + loss.backward() + optimizer.step() + optimizer.zero_grad() + + if batch % round(size / batch_size / 10) == 0: + loss = loss.item() + current = batch * batch_size + len(X) + print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]") + + + def eval_loop(dataloader, model, loss_fn): + model.eval() + size = len(dataloader.dataset) + num_batches = len(dataloader) + eval_loss, correct = 0, 0 + + all_preds = torch.tensor([]) + all_labels = torch.tensor([]) + + with torch.no_grad(): + for X, y in dataloader: + pred = model(X) + eval_loss += loss_fn(pred, y).item() + correct += 
torch.eq(torch.round(pred), y).sum().item() + + pred_labels = torch.round(pred) + all_preds = torch.cat((all_preds, pred_labels.cpu())) + all_labels = torch.cat((all_labels, y.cpu())) + + precision = precision_score(all_labels, all_preds) + recall = recall_score(all_labels, all_preds) + + eval_loss /= num_batches + accuracy = correct / size * 100 + + return { + "accuracy": accuracy, + "loss": eval_loss, + "precision": precision, + "recall": recall + } + + loss_fn = nn.BCELoss().to(device) + + learning_rate = 1e-3 + optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate) + + num_epochs = 2 + for t in range(num_epochs): + print(f"\nEpoch {t+1}\n-------------------------------") + train_loop(training_dataloader, model, loss_fn, optimizer) + metrics = eval_loop(validation_dataloader, model, loss_fn) + print(f"Eval Metrics: \n Accuracy: {(metrics['accuracy']):>0.1f}%, Avg loss: {metrics['loss']:>8f}, " + f"Precision: {metrics['precision']:.4f}, Recall: {metrics['recall']:.4f} \n") + + dummy_input = torch.randn(1, 5, device=device) + onnx_model = torch.onnx.export( + model, + dummy_input, + model_output_path, + input_names=["inputs"], + output_names=["outputs"], + dynamic_axes={ + "inputs": {0: "batch_size"}, + }, + verbose=True) @dsl.component( base_image="quay.io/modh/runtime-images:runtime-pytorch-ubi9-python-3.11-2024b-20241108",
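To mirror the notebook's "Check our ONNX output matches our original PyTorch Model Output" step inside the pipeline component, here is a hedged sketch of a parity check that could follow the torch.onnx.export call above. The input/output names come from that export call; the 100-row slice of the validation features is arbitrary.

    import numpy as np
    import onnxruntime as rt

    model.eval()
    sample = validation_data.features[:100]   # raw, unscaled features; scaling happens inside forward()
    with torch.no_grad():
        torch_out = model(sample).cpu().numpy()

    sess = rt.InferenceSession(model_output_path, providers=["CPUExecutionProvider"])
    onnx_out = sess.run(["outputs"], {"inputs": sample.cpu().numpy()})[0]

    # The exported graph should reproduce the PyTorch probabilities to within float tolerance.
    print("max abs difference:", np.abs(torch_out - onnx_out).max())
    assert np.allclose(torch_out, onnx_out, atol=1e-5)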