This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Error when using WandbLogger #205

Open

KwanWaiChung opened this issue Aug 30, 2022 · 1 comment

Comments

@KwanWaiChung

KwanWaiChung commented Aug 30, 2022

An error occurs when I try to use WandbLogger from pytorch_lightning in the standard example code below.

The error is:
(RayExecutor pid=121351) Exception in thread SockSrvRdThr:
(RayExecutor pid=121351) Traceback (most recent call last):
(RayExecutor pid=121351) File "python3.8/threading.py", line 932, in _bootstrap_inner
(RayExecutor pid=121351) self.run()
(RayExecutor pid=121351) File "python3.8/site-packages/wandb/sdk/service/server_sock.py", line 113, in run
(RayExecutor pid=121351) shandler(sreq)
(RayExecutor pid=121351) File "python3.8/site-packages/wandb/sdk/service/server_sock.py", line 152, in server_inform_attach
(RayExecutor pid=121351) dict(self._mux._streams[stream_id]._settings),
(RayExecutor pid=121351) KeyError: '1f8g49m6'
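
The stream id in the KeyError ('1f8g49m6') looks like a wandb run id, so the wandb service process inside the RayExecutor appears to be asked to attach to a run whose stream it never started.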

"""Example using Pytorch Lightning with Pytorch DDP on Ray Accelerator."""
import os
import tempfile

import pytorch_lightning as pl
import torch
from pytorch_lightning.loggers import WandbLogger
from torch.utils.data import random_split, DataLoader
from torchvision.datasets import MNIST
from torchvision import transforms

import ray
from ray import tune
from ray_lightning.tune import TuneReportCallback, get_tune_resources
from ray_lightning import RayStrategy
from ray_lightning.tests.utils import LightningMNISTClassifier

BASE_PATH = os.path.dirname(os.path.abspath(__file__))


class MNISTClassifier(LightningMNISTClassifier):
    def __init__(self, config, data_dir=None):
        super().__init__(config, data_dir)
        self.batch_size = config["batch_size"]

    def prepare_data(self):
        self.dataset = MNIST(
            self.data_dir,
            train=True,
            download=True,
            transform=transforms.ToTensor(),
        )

    def train_dataloader(self):
        dataset = self.dataset
        train_length = len(dataset)
        dataset_train, _ = random_split(
            dataset,
            [train_length - 5000, 5000],
            generator=torch.Generator().manual_seed(0),
        )
        loader = DataLoader(
            dataset_train,
            batch_size=self.batch_size,
            num_workers=1,
            drop_last=True,
            pin_memory=True,
        )
        return loader

    def val_dataloader(self):
        dataset = self.dataset
        train_length = len(dataset)
        _, dataset_val = random_split(
            dataset,
            [train_length - 5000, 5000],
            generator=torch.Generator().manual_seed(0),
        )
        loader = DataLoader(
            dataset_val,
            batch_size=self.batch_size,
            num_workers=1,
            drop_last=True,
            pin_memory=True,
        )
        return loader


def train_mnist(
    config,
    checkpoint_dir=None,
    data_dir=None,
    num_epochs=10,
    num_workers=1,
    use_gpu=False,
    callbacks=None,
    **trainer_kwargs
):
    model = MNISTClassifier(config, data_dir)

    callbacks = callbacks or []

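    # Note: RayStrategy runs the actual training on RayExecutor actors, so the
    # WandbLogger created below is constructed in this (trial) process and then
    # shipped to those actors, which appears to be where the attach in the
    # traceback above fails.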
    trainer = pl.Trainer(
        max_epochs=num_epochs,
        callbacks=callbacks,
        strategy=RayStrategy(num_workers=num_workers, use_gpu=use_gpu),
        enable_progress_bar=False,
        logger=WandbLogger(project="Try ray", name="try", config=config),
        **trainer_kwargs
    )
    trainer.fit(model)


def tune_mnist(
    data_dir,
    num_samples=10,
    num_epochs=10,
    num_workers=1,
    use_gpu=False,
    **trainer_kwargs
):
    config = {
        "layer_1": tune.choice([32, 64, 128]),
        "layer_2": tune.choice([64, 128, 256]),
        "lr": tune.loguniform(1e-4, 1e-1),
        "batch_size": tune.choice([32, 64, 128]),
    }

    # Add a Tune callback that reports the Lightning-logged validation
    # metrics to Tune under the names "loss" and "acc".
    metrics = {"loss": "ptl/val_loss", "acc": "ptl/val_accuracy"}
    callbacks = [TuneReportCallback(metrics, on="validation_end")]
    trainable = tune.with_parameters(
        train_mnist,
        data_dir=data_dir,
        num_epochs=num_epochs,
        num_workers=num_workers,
        use_gpu=use_gpu,
        callbacks=callbacks,
        **trainer_kwargs
    )
    analysis = tune.run(
        trainable,
        metric="loss",
        mode="min",
        config=config,
        num_samples=num_samples,
        resources_per_trial=get_tune_resources(
            num_workers=num_workers, use_gpu=use_gpu
        ),
        name="tune_mnist",
        local_dir=BASE_PATH,
    )

    print("Best hyperparameters found were: ", analysis.best_config)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--num-workers",
        type=int,
        help="Number of training workers to use.",
        default=1,
    )
    parser.add_argument(
        "--use-gpu", action="store_true", help="Use GPU for training."
    )
    parser.add_argument(
        "--tune",
        action="store_true",
        help="Use Ray Tune for hyperparameter tuning.",
    )
    parser.add_argument(
        "--num-samples",
        type=int,
        default=10,
        help="Number of samples to tune.",
    )
    parser.add_argument(
        "--num-epochs",
        type=int,
        default=10,
        help="Number of epochs to train for.",
    )
    parser.add_argument(
        "--smoke-test", action="store_true", help="Finish quickly for testing"
    )
    parser.add_argument(
        "--address",
        required=False,
        type=str,
        help="the address to use for Ray",
    )
    args, _ = parser.parse_known_args()

    num_epochs = 1 if args.smoke_test else args.num_epochs
    num_workers = 1 if args.smoke_test else args.num_workers
    use_gpu = False if args.smoke_test else args.use_gpu
    num_samples = 1 if args.smoke_test else args.num_samples

    if args.smoke_test:
        ray.init(num_cpus=2)
    else:
        ray.init(address=args.address)

    data_dir = os.path.join(tempfile.gettempdir(), "mnist_data_")

    if args.tune:
        tune_mnist(data_dir, num_samples, num_epochs, num_workers, use_gpu)
    else:
        config = {"layer_1": 32, "layer_2": 64, "lr": 1e-1, "batch_size": 32}
        train_mnist(
            config,
            data_dir=data_dir,
            num_epochs=num_epochs,
            num_workers=num_workers,
            use_gpu=use_gpu,
        )
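
One possible mitigation (untested here, and only a guess at the root cause) is to keep wandb from attaching to a run across processes by forcing its legacy in-process start method. WANDB_START_METHOD is a documented wandb environment variable, and WandbLogger forwards unrecognized keyword arguments on to wandb.init, but whether either form actually avoids this KeyError is an assumption:

import os

# Hypothetical mitigation, not verified against this issue: force wandb's
# legacy thread start method so each RayExecutor starts its own run instead
# of attaching to another process's service socket.
os.environ["WANDB_START_METHOD"] = "thread"

# Per-logger form of the same idea: WandbLogger passes extra keyword
# arguments through to wandb.init.
import wandb
from pytorch_lightning.loggers import WandbLogger

logger = WandbLogger(
    project="Try ray",
    name="try",
    settings=wandb.Settings(start_method="thread"),
)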
@constd

constd commented Jan 14, 2023

I've been experiencing the same issue, which makes Ray a non-starter for my situation. I would love to help resolve this, but as I'm very new to Ray, I don't know where to start. Would anybody be so kind as to point me in the right direction?
