Replies: 2 comments
-
Pasting the code here for future reference. Code example:
import covalent as ct
import torch
import torch.nn.functional as F
from pathlib import Path
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from typing import Tuple
import time
# Device on which the model and the batches are placed. Prefer the GPU, but
# fall back to the CPU so the script still runs on machines without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# True: run the workflow through Covalent (electrons + lattice).
# False: run it as plain Python function calls.
use_covalent = True
class NeuralNetwork(nn.Module):
    """Convolutional classifier: two conv layers followed by two linear layers.

    The first convolution takes single-channel input. ``forward`` reshapes the
    convolutional features to 320 values per row and returns log-probabilities
    over 10 classes (``log_softmax`` along dimension 1).
    """

    def __init__(self):
        super().__init__()
        # Feature extractor.
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        # Classifier head.
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        """Return per-class log-probabilities for the batch ``x``."""
        # First stage: convolution -> 2x2 max-pool -> ReLU.
        stage1 = F.relu(F.max_pool2d(self.conv1(x), 2))
        # Second stage: convolution -> channel dropout -> 2x2 max-pool -> ReLU.
        regularised = self.conv2_drop(self.conv2(stage1))
        stage2 = F.relu(F.max_pool2d(regularised, 2))
        # 320 matches 20 channels x 4 x 4, which is what 28x28 inputs produce
        # (assumed input size -- confirm against the data being fed in).
        flat = stage2.view(-1, 320)
        hidden = F.relu(self.fc1(flat))
        # Functional dropout must be told explicitly whether we are training.
        hidden = F.dropout(hidden, training=self.training)
        logits = self.fc2(hidden)
        return F.log_softmax(logits, dim=1)
def data_loader(
    batch_size: int,
    train: bool,
    download: bool = True,
    shuffle: bool = True,
    data_dir: str = "~/data/mnist/",
) -> torch.utils.data.dataloader.DataLoader:
    """Build a ``DataLoader`` over the torchvision MNIST dataset.

    Args:
        batch_size: Number of samples per batch.
        train: Forwarded to ``datasets.MNIST`` to select the train or test split.
        download: Forwarded to ``datasets.MNIST``.
        shuffle: Whether the loader shuffles the samples.
        data_dir: Dataset root; ``~`` is expanded and the directory is created
            if it does not exist.

    Returns:
        A ``DataLoader`` yielding batches transformed with ``ToTensor()``.
    """
    root = Path(data_dir).expanduser()
    root.mkdir(parents=True, exist_ok=True)
    mnist = datasets.MNIST(
        root,
        train=train,
        download=download,
        transform=ToTensor(),
    )
    loader = DataLoader(mnist, batch_size=batch_size, shuffle=shuffle)
    return loader
def get_optimizer(
    model: nn.Module, learning_rate: float, momentum: float
) -> torch.optim.Optimizer:
    """Get Stochastic Gradient Descent optimizer.

    Args:
        model: Module whose parameters will be optimized.
        learning_rate: SGD learning rate.
        momentum: SGD momentum factor.

    Returns:
        A ``torch.optim.SGD`` instance over ``model.parameters()``.
    """
    # Keyword arguments keep the call independent of SGD's positional order.
    return torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
def train_over_one_epoch(
    dataloader: torch.utils.data.dataloader.DataLoader,
    model: nn.Module,
    optimizer: torch.optim.Optimizer,
    log_interval: int,
    epoch: int,
    loss_fn,
    train_losses: list,
    train_counter: list,
    device: str = "cpu",
) -> Tuple[nn.Module, torch.optim.Optimizer]:
    """Train neural network model over one epoch.

    Args:
        dataloader: Source of ``(inputs, targets)`` batches.
        model: Module to train; it is switched to training mode.
        optimizer: Optimizer stepped once per batch.
        log_interval: Log (and record) every ``log_interval``-th batch.
        epoch: 1-based epoch number, used to offset the sample counter.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a tensor.
        train_losses: List mutated in place; receives the logged loss values.
        train_counter: List mutated in place; receives, for every logged
            batch, the number of samples seen so far across epochs.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The same ``model`` and ``optimizer`` objects that were passed in.
    """
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % log_interval == 0:
            # Keep the float in its own name instead of rebinding the tensor.
            loss_value, current = loss.item(), batch * len(X)
            print(f"loss: {loss_value:>7f} [{current:>5d}/{size:>5d}]")
            train_losses.append(loss_value)
            # Use the loader's real batch size rather than a hard-coded 64;
            # batch_size is None when a custom batch sampler is used, in
            # which case the current batch length is the best estimate.
            samples_per_batch = dataloader.batch_size or len(X)
            train_counter.append(batch * samples_per_batch + (epoch - 1) * size)
    return model, optimizer
def test(
    dataloader: torch.utils.data.dataloader.DataLoader,
    model: NeuralNetwork,
    loss_fn: callable,
    device: str = "cpu",
) -> Tuple[float, float]:
    """Test the model performance.

    Args:
        dataloader: Source of ``(inputs, targets)`` evaluation batches.
        model: Model to evaluate; it is switched to evaluation mode.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a tensor.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(accuracy, test_loss)``: accuracy as a percentage of the dataset
        size, and the loss averaged over the number of batches.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            # Count predictions whose highest-scoring class equals the target.
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    accuracy = 100 * correct
    print(f"Test Error: \n Accuracy: {(accuracy):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return accuracy, test_loss
def train_model(
    train_dataloader: torch.utils.data.dataloader.DataLoader,
    train_losses: list,
    train_counter: list,
    log_interval: int,
    model: NeuralNetwork,
    learning_rate: float,
    momentum: float,
    loss_fn: callable,
    epochs: int,
    results_dir: str = "~/data/mnist/results/",
) -> Tuple[NeuralNetwork, torch.optim.Optimizer]:
    """Train neural network model.

    Builds an SGD optimizer, runs ``train_over_one_epoch`` for ``epochs``
    epochs on the module-level ``device``, and checkpoints the model and
    optimizer state dicts into ``results_dir``.

    Args:
        train_dataloader: Source of training batches.
        train_losses: List passed through to ``train_over_one_epoch``.
        train_counter: List passed through to ``train_over_one_epoch``.
        log_interval: Logging interval passed to ``train_over_one_epoch``.
        model: Model to train.
        learning_rate: SGD learning rate.
        momentum: SGD momentum factor.
        loss_fn: Callable ``loss_fn(prediction, target)``.
        epochs: Number of epochs to run.
        results_dir: Directory for ``model.pth`` and ``optimizer.pth``;
            ``~`` is expanded and the directory is created if missing.

    Returns:
        ``(model, optimizer)`` after the last epoch.
    """
    results_dir = Path(results_dir).expanduser()
    results_dir.mkdir(parents=True, exist_ok=True)
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
    for epoch in range(1, epochs + 1):
        print(f"Epoch {epoch}\n-------------------------------")
        model, optimizer = train_over_one_epoch(
            dataloader=train_dataloader,
            model=model,
            optimizer=optimizer,
            train_losses=train_losses,
            train_counter=train_counter,
            log_interval=log_interval,
            epoch=epoch,
            loss_fn=loss_fn,
            device=device,
        )
        # Save model and optimizer. Join with "/" on the Path: formatting the
        # Path into an f-string drops the trailing slash and would write
        # ".../resultsmodel.pth" next to the results directory instead of
        # inside it.
        # NOTE(review): checkpointing once per epoch is assumed; the pasted
        # source lost its indentation -- confirm against the original file.
        torch.save(model.state_dict(), results_dir / "model.pth")
        torch.save(optimizer.state_dict(), results_dir / "optimizer.pth")
    return model, optimizer
def workflow(
    model: NeuralNetwork,
    epochs: int = 2,
    batch_size_train: int = 64,
    batch_size_test: int = 1000,
    learning_rate: float = 0.01,
    momentum: float = 0.5,
    log_interval: int = 200,
    loss_fn: callable = F.nll_loss,
):
    """MNIST classifier training workflow.

    Loads the train and test splits, trains ``model`` with ``train_model``
    and evaluates the result with ``test`` on the module-level ``device``.

    Args:
        model: Model to train.
        epochs: Number of training epochs.
        batch_size_train: Batch size of the training loader.
        batch_size_test: Batch size of the test loader.
        learning_rate: SGD learning rate.
        momentum: SGD momentum factor.
        log_interval: Logging interval, in batches, during training.
        loss_fn: Loss used for both training and evaluation.

    Returns:
        ``(accuracy_list, test_loss_list)``, each holding the single value
        produced by the evaluation run.
    """
    accuracy_list, test_loss_list = [], []
    train_dataloader = data_loader(batch_size=batch_size_train, train=True)
    test_dataloader = data_loader(batch_size=batch_size_test, train=False)
    train_losses, train_counter = [], []
    # Keep the direct tuple-unpacking form of these calls.
    # NOTE(review): when the functions are wrapped as Covalent electrons the
    # unpacking presumably happens on electron objects -- confirm before
    # restructuring these assignments.
    trained_model, optimizer = train_model(
        train_dataloader=train_dataloader,
        train_losses=train_losses,
        train_counter=train_counter,
        log_interval=log_interval,
        model=model,
        learning_rate=learning_rate,
        momentum=momentum,
        loss_fn=loss_fn,
        epochs=epochs,
    )
    accuracy, test_loss = test(dataloader=test_dataloader, model=trained_model, loss_fn=loss_fn, device=device)
    # Record the metrics so the returned lists are not always empty.
    accuracy_list.append(accuracy)
    test_loss_list.append(test_loss)
    return accuracy_list, test_loss_list
if not use_covalent:
    # Plain Python run: call the workflow directly and time it locally.
    start = time.time()
    workflow(model=NeuralNetwork().to(device))
    end = time.time()
    print(f"Regular workflow takes {end - start} seconds.")
else:
    # Convert tasks to electrons. The names are rebound before the lattice
    # is created so that the workflow body resolves to the wrapped versions.
    data_loader = ct.electron(data_loader)
    get_optimizer = ct.electron(get_optimizer)
    train_over_one_epoch = ct.electron(train_over_one_epoch)
    train_model = ct.electron(train_model)
    test = ct.electron(test)

    # Convert workflow to lattice
    workflow = ct.lattice(workflow, executor="local")

    # Dispatch and block until the result is available. The model is built
    # inline so this script keeps no extra reference to it.
    dispatch_id = ct.dispatch(workflow)(model=NeuralNetwork().to(device))
    print(f"Dispatch id: {dispatch_id}")
    result = ct.get_result(dispatch_id=dispatch_id, wait=True)
    print(f"Covalent workflow takes {result.end_time - result.start_time} seconds.")
    print(f"Covalent workflow execution status: {result.status}")

    # Captured stdout of two specific graph nodes.
    # NOTE(review): ids 6 and 19 are hard-coded; presumably the training and
    # test nodes -- verify against the dispatched graph.
    for node_id in (6, 19):
        print(result.get_node_result(node_id)["stdout"])
|
Beta Was this translation helpful? Give feedback.
-
Hi @Viyom, welcome to Covalent. I ran a quick test on my end (because I don't have the data to run your example), and on my end a simple high-compute function does seem to release memory and CPU once it's done. Here is an example (video attached). Could you maybe try it out with the Dask executor, which is the default one, instead of the `local` executor? Screen.Recording.2023-07-31.at.3.04.46.PM.mov |
Beta Was this translation helpful? Give feedback.
-
System Configuration:
The memory deallocation doesn't take place automatically. Even after the experiment is complete, the CPU/GPU logs show reserved memory for the experiment. This happens because the processes do not terminate by themselves after the experiment is completed.
The attached file trains and tests MNIST classifier. We can modify the variables on top to make it run on cpu/gpu and choose if we want to use covalent or not.
Execute the attached file: MNISTClassification.zip
With `htop` and `nvidia-smi`, we can see that the processes remain in a sleeping state even after the experiment has completed. Ideally, the processes should be terminated after their execution. Please let me know if I am missing something here.
Thank you
Beta Was this translation helpful? Give feedback.
All reactions