
Commit

good img
YUYING07 committed Sep 25, 2024
1 parent 34e29e0 commit 6388fc4
Showing 2 changed files with 133 additions and 99 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@ libmoon.egg-info/

archive/
libmoon/problem/mtl/mtl_data/FGNET
libmoon/problem/mtl/mtl_data/quickdraw
libmoon/Output
.git/
.vscode/
231 changes: 132 additions & 99 deletions libmoon/gan_tester/PrefImageGan.py
@@ -11,8 +11,19 @@
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


def load_imgs():
directory = '/mnt/d/pycharm/libmoon/libmoon/problem/mtl/mtl_data/FGNET/male/'
class ImageDataset(torch.utils.data.Dataset):
def __init__(self, data):
self.data = data

def __len__(self):
return len(self.data)

def __getitem__(self, idx):
return self.data[idx][0], self.data[idx][1]


def load_FGNET_imgs():
directory = '/home/a/yingyingyu/libmoon/libmoon/problem/mtl/mtl_data/FGNET/male/'
Full_imgs = []
males_f = os.listdir(directory)
males_res = [[f[:3], f.split('.')[0][-2:]] for f in males_f if f.endswith('.JPG')]
@@ -45,6 +56,18 @@ def load_imgs():
return Full_imgs


def load_quickdraw_imgs():
directory = '/home/a/yingyingyu/libmoon/libmoon/problem/mtl/mtl_data/quickdraw/'
apple_f = np.load(directory + 'full_numpy_bitmap_apple.npy')
baseball_f = np.load(directory + 'full_numpy_bitmap_baseball.npy')
Full_imgs = []
for i in range(100000):
img1 = apple_f[i].reshape(28, 28)
img2 = baseball_f[i].reshape(28, 28)
Full_imgs.append([img1, img2])
return Full_imgs
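
For orientation (not part of the commit), a minimal sketch of how the new ImageDataset and load_quickdraw_imgs could be wired to a DataLoader, mirroring what load_dataset() does further down; the batch size of 256 is simply the script's new default:

import torch

# Hypothetical wiring: each dataset item is a pair (apple image, baseball image),
# one 28x28 array per objective, as produced by load_quickdraw_imgs().
imgs = load_quickdraw_imgs()
pairs = [[torch.Tensor(a), torch.Tensor(b)] for a, b in imgs]
dataset = ImageDataset(pairs)
loader = torch.utils.data.DataLoader(dataset, batch_size=256, shuffle=True)
batch_1, batch_2 = next(iter(loader))   # each of shape (256, 28, 28)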


def plot_figure(folder_name, generated_samples, sample1, sample2, pref):
plt.scatter(generated_samples[:, 0], generated_samples[:, 1], label='Generated', s=50)
plt.scatter(sample1[:, 0], sample1[:, 1], label='Sample 1', s=25, alpha=0.5)
@@ -69,45 +92,54 @@ class Generator(nn.Module):
Pref-conditioned GAN
'''

def __init__(self, input_dim, output_dim, bs):
def __init__(self, latent_dim):
super(Generator, self).__init__()
# 2 is the preference dimension.
self.f1 = nn.Linear(input_dim + 2, (input_dim + 2) * bs)
self.FN1 = nn.Sequential(
nn.Linear(input_dim + 2, 256),
nn.ReLU(),
nn.Linear(256, 512),
nn.ReLU(),
nn.Linear(512, output_dim))

def layer_block(input_size, output_size):
layers = [nn.Linear(input_size, output_size)]
layers.append(nn.LeakyReLU(0.2, inplace=True))
return layers

# self.FN1 = nn.Sequential(
# nn.Linear(latent_dim + 2, 256),
# nn.ReLU(),
# nn.Linear(256, 512),
# nn.ReLU(),
# nn.Linear(512, output_dim))
self.model = nn.Sequential(
*layer_block(latent_dim + 2, 128),
*layer_block(128, 256),
*layer_block(256, 512),
*layer_block(512, 1024),
nn.Linear(1024, int(np.prod((1, 28, 28)))),
nn.Tanh())

def forward(self, z, pref):
batch_size = len(z)
pref_batch = pref.repeat(batch_size, 1)
input_arg = torch.cat([z, pref_batch], dim=1)
input_arg = input_arg.view(batch_size, -1, input_arg.shape[1])
res1 = self.f1(input_arg)
res1 = res1.view(batch_size, -1, input_arg.shape[2])
return self.FN1(res1)
# res1 = self.f1(input_arg)
img = self.model(input_arg)
img = img.view(img.size(0), *(1, 28, 28))
return img
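
As a quick shape check (an illustrative sketch assuming the new constructor and forward path, not the removed f1/FN1 branch): the conditioned generator takes a (batch, latent_dim) noise tensor plus a 2-d preference vector and returns (batch, 1, 28, 28) images squashed to [-1, 1] by the final Tanh:

import torch

gen = Generator(latent_dim=128)
z = torch.randn(64, 128)            # noise batch
pref = torch.tensor([0.3, 0.7])     # 2-d preference; forward() repeats it across the batch
imgs = gen(z, pref)
print(imgs.shape)                   # torch.Size([64, 1, 28, 28])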


class Discriminator(nn.Module):
def __init__(self, input_dim):
def __init__(self):
super(Discriminator, self).__init__()
self.model = nn.Sequential(
nn.Linear(input_dim, 512),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(512, 512 // 2),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(512 // 2, 512 // 4),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(512 // 4, 1),
nn.Linear(int(np.prod((1, 28, 28))), 512),
nn.LeakyReLU(0.2, inplace=True),
nn.Linear(512, 256),
nn.LeakyReLU(0.2, inplace=True),
nn.Linear(256, 1),
nn.Sigmoid())

def forward(self, x):
return self.model(x)
img_flat = x.view(x.size(0), -1)
verdict = self.model(img_flat)
return verdict
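
Likewise for the rewritten discriminator (illustrative only): it flattens any (batch, 1, 28, 28) input to 784 features and returns one sigmoid score per sample; the trainer below builds one such network per objective:

import torch

disc = Discriminator()
fake = torch.randn(64, 1, 28, 28)   # anything that flattens to 784 features per sample
scores = disc(fake)
print(scores.shape)                 # torch.Size([64, 1]), each value in (0, 1)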


def sample_multiple_gaussian(batch_size, dim, mean=0, std=1):
@@ -117,15 +149,15 @@ def sample_multiple_gaussian(batch_size, dim, mean=0, std=1):


def sample_real_imgs(test_size):
imgs = load_imgs()
imgs = load_quickdraw_imgs()
idx = np.random.choice(len(imgs), test_size)
real_samples_1 = torch.Tensor(np.array([f[0] for f in imgs]))
real_samples_2 = torch.Tensor(np.array([f[1] for f in imgs]))
return real_samples_1[idx], real_samples_2[idx]


class PrefGANTrainer:
def __init__(self, lr, num_epochs, batch_size, n_obj, input_dim, output_dim):
def __init__(self, lr, num_epochs, batch_size, n_obj, latent_dim):
'''
:param lr, float: learning rate.
:param num_epochs, int : number of epochs.
@@ -134,83 +166,85 @@ def __init__(self, lr, num_epochs, batch_size, n_obj, input_dim, output_dim):
:param pref, np.array : preference vector.
'''
self.lr = lr
self.bs = int(32 * 32 / output_dim)
self.num_epochs = num_epochs
self.batch_size = batch_size
self.n_obj = n_obj
self.input_dim = input_dim
self.output_dim = output_dim
self.latent_dim = latent_dim

self.generator = Generator(args.input_dim, args.output_dim, self.bs).to(device)
self.discriminator_arr = [Discriminator(args.output_dim).to(device) for _ in range(n_obj)]
self.d_optimizer_arr = [optim.Adam(discriminator.parameters(), lr=args.lr) for discriminator in
self.generator = Generator(self.latent_dim).to(device)
self.discriminator_arr = [Discriminator().to(device) for _ in range(n_obj)]
self.d_optimizer_arr = [optim.Adam(discriminator.parameters(), lr=self.lr) for discriminator in
self.discriminator_arr]
self.g_optimizer = optim.Adam(self.generator.parameters(), lr=args.lr)
self.g_optimizer = optim.Adam(self.generator.parameters(), lr=self.lr)
self.criterion = nn.BCELoss()

def load_dataset(self):
# real_samples_1, real_samples_2 = sample_multiple_gaussian(self.batch_size, self.output_dim)
imgs = load_imgs()
real_samples_1, real_samples_2 = (torch.Tensor(np.array([f[0] for f in imgs])),
torch.Tensor(np.array([f[1] for f in imgs])))
real_samples_arr = [real_samples_1, real_samples_2]
return real_samples_arr

def D_train(self, x, discriminator, opt):
z = torch.randn(self.batch_size, self.input_dim).to(device) # Random noise
imgs = load_quickdraw_imgs()
real_samples_1, real_samples_2 = (torch.Tensor(np.array([f[0] for f in imgs])).view(-1, 28, 28),
torch.Tensor(np.array([f[1] for f in imgs])).view(-1, 28, 28))
# real_samples_1, real_samples_2 = (torch.Tensor(np.array([f[0] for f in imgs])).view(-1, 28, 28),
# torch.Tensor(np.array([f[0] for f in imgs])).view(-1, 28, 28))
real_samples_arr = [[real_samples_1[i], real_samples_2[i]] for i in range(len(real_samples_1))]
dataset = ImageDataset(real_samples_arr)
data_loader = torch.utils.data.DataLoader(dataset, batch_size=self.batch_size, shuffle=True)
return data_loader

def D_train(self, x_real, discriminator, opt):
z = torch.randn(self.batch_size, self.latent_dim).to(device) # Random noise
pref = Dirichlet(torch.Tensor([0.5, 0.5])).sample().to(device)
discriminator.zero_grad()
x_real = x.view(self.batch_size, -1, self.output_dim)
d_real = discriminator(x_real)
y_real = torch.ones_like(d_real)
real_loss = self.criterion(d_real, y_real)
x_fake = self.generator(z, pref)
d_fake = discriminator(x_fake)
y_fake = torch.zeros_like(d_fake)
fake_loss = self.criterion(d_fake, y_fake)
D_loss = real_loss + fake_loss
D_loss = (real_loss + fake_loss) / 2
D_loss.backward()
opt.step()
return D_loss

def G_train(self, discriminator):
z = torch.randn(self.batch_size, self.input_dim).to(device) # Random noise
z = torch.randn(self.batch_size, self.latent_dim).to(device) # Random noise
pref = Dirichlet(torch.Tensor([0.5, 0.5])).sample().to(device)
self.generator.zero_grad()
fake_samples = self.generator(z, pref)
d_fake = discriminator(fake_samples)
y = torch.ones_like(d_fake)
g_loss = self.criterion(d_fake, y)
g_loss.backward()
self.g_optimizer.step()
# g_loss.backward()
# self.g_optimizer.step()
return g_loss

def train(self):
real_samples_arr = self.load_dataset()
dataloader = self.load_dataset()
for epoch in range(self.num_epochs):
d_loss_arr = []
for idx, discriminator in enumerate(self.discriminator_arr):
real_samples = real_samples_arr[idx].to(device)
d_loss = self.D_train(real_samples, discriminator, self.d_optimizer_arr[idx])
d_loss_arr.append(d_loss.item())
z = torch.randn(self.batch_size, self.input_dim).to(device)
g_loss_arr = []
for idx, discriminator in enumerate(self.discriminator_arr):
g_loss = self.G_train(discriminator)
g_loss_arr.append(g_loss)
pref = Dirichlet(torch.Tensor([0.5, 0.5])).sample().to(device)
g_loss_arr = torch.stack(g_loss_arr)
# self.g_optimizer.zero_grad()
# scalar_loss = torch.dot(g_loss_arr, pref)
# scalar_loss.backward()
# self.g_optimizer.step()
if (epoch + 1) % 100 == 0:
print(
f'Epoch [{epoch + 1}/{self.num_epochs}], d_loss: {d_loss.item():.4f}, g_loss: {g_loss.item():.4f}')
for i, images in enumerate(dataloader):
d_loss_arr = []
for idx, discriminator in enumerate(self.discriminator_arr):
real_samples = images[idx].to(device)
d_loss = self.D_train(real_samples, discriminator, self.d_optimizer_arr[idx])
d_loss_arr.append(d_loss.cpu().item())
g_loss_arr = []
g_loss_arr_ = []
for idx, discriminator in enumerate(self.discriminator_arr):
g_loss = self.G_train(discriminator)
g_loss_arr.append(g_loss.cpu().item())
g_loss_arr_.append(g_loss)
pref = Dirichlet(torch.Tensor([0.5, 0.5])).sample().to(device)
g_loss_arr_ = torch.stack(g_loss_arr_)
self.g_optimizer.zero_grad()
scalar_loss = torch.dot(g_loss_arr_, pref)
torch.autograd.set_detect_anomaly(True)
scalar_loss.backward()
self.g_optimizer.step()
print(f"[Epoch {epoch:=4d}/{self.num_epochs}] [Batch {i:=4d}/{len(dataloader)}] ---> "
f"[D Loss: {np.mean(np.array(d_loss_arr)):.6f}] [G Loss: {np.mean(np.array(g_loss_arr)):.6f}]")

def generate_samples(self, test_size):
with torch.no_grad():
z = torch.randn(test_size, self.input_dim)
z = torch.randn(test_size, self.latent_dim)
generated_samples = self.generator(z)
real_samples_1, real_samples_2 = sample_multiple_gaussian(self.batch_size, self.output_dim)
return generated_samples.numpy(), real_samples_1, real_samples_2
@@ -227,7 +261,7 @@ def evaluate(self, test_size, eval_num=11):
For each preference, we need to generate samples.
'''
for idx, pref in enumerate(pref_arr):
z = torch.randn(test_size, self.input_dim).to(device)
z = torch.randn(test_size, self.latent_dim).to(device)
generated_samples = self.generator(z, pref).cpu().numpy()
real_samples_1, real_samples_2 = sample_multiple_gaussian(self.batch_size, self.output_dim)
fig = plt.figure()
@@ -253,32 +287,32 @@ def evaluate_img(self, test_size, eval_num=11):
pref_0 = np.linspace(0, 1, eval_num)
pref_1 = 1 - pref_0
pref_arr = torch.Tensor(np.stack([pref_0, pref_1], axis=1)).to(device)
folder_name = '/home/a/yingyingyu/libmoon/libmoon/Output/divergence/prefgan'
os.makedirs(folder_name, exist_ok=True)
real_samples_1, real_samples_2 = sample_real_imgs(test_size)
# save generated samples
for i in range(test_size):
img = real_samples_1[i]
fig = plt.figure()
plt.imshow(img, cmap='gray')
plt.axis('off')
fig_name = os.path.join(folder_name, 'res_real1_{:.2f}.pdf'.format(i))
plt.savefig(fig_name, bbox_inches='tight')
print('Save fig to {}'.format(fig_name))
for i in range(test_size):
img = real_samples_2[i]
fig = plt.figure()
plt.imshow(img, cmap='gray')
plt.axis('off')
fig_name = os.path.join(folder_name, 'res_real2_{:.2f}.pdf'.format(i))
plt.savefig(fig_name, bbox_inches='tight')
print('Save fig to {}'.format(fig_name))
with torch.no_grad():
for idx, pref in enumerate(pref_arr):
z = torch.randn(test_size, self.input_dim).to(device)
z = torch.randn(test_size, self.latent_dim).to(device)
generated_samples = self.generator(z, pref).cpu().numpy()
real_samples_1, real_samples_2 = sample_real_imgs(test_size)
folder_name = '/mnt/d/pycharm/libmoon/libmoon/Output/divergence/prefgan'
os.makedirs(folder_name, exist_ok=True)
# save generated samples
for i in range(test_size):
img = real_samples_1[i]
fig = plt.figure()
plt.imshow(img, cmap='gray')
plt.axis('off')
fig_name = os.path.join(folder_name, 'res_real1_{:.2f}_{:.2f}.pdf'.format(i, pref[0]))
plt.savefig(fig_name, bbox_inches='tight')
print('Save fig to {}'.format(fig_name))
for i in range(test_size):
img = real_samples_2[i]
fig = plt.figure()
plt.imshow(img, cmap='gray')
plt.axis('off')
fig_name = os.path.join(folder_name, 'res_real2_{:.2f}_{:.2f}.pdf'.format(i, pref[0]))
plt.savefig(fig_name, bbox_inches='tight')
print('Save fig to {}'.format(fig_name))
for i in range(test_size):
img = generated_samples[i].reshape(32, 32)
img = generated_samples[i].reshape(28, 28)
fig = plt.figure()
plt.imshow(img, cmap='gray')
plt.axis('off')
@@ -289,18 +323,17 @@

if __name__ == '__main__':
parser = argparse.ArgumentParser(description='example script')
parser.add_argument('--input-dim', type=int, default=10) # What does it mean?
parser.add_argument('--output-dim', type=int, default=128)
parser.add_argument('--latent-dim', type=int, default=128)
parser.add_argument('--n-obj', type=int, default=2)
parser.add_argument('--batch-size', type=int, default=64)
parser.add_argument('--batch-size', type=int, default=256)
parser.add_argument('--test-size', type=int, default=10)
parser.add_argument('--num-epochs', type=int, default=5000)
parser.add_argument('--lr', type=float, default=5e-6)
parser.add_argument('--num-epochs', type=int, default=10)
parser.add_argument('--lr', type=float, default=0.00001)
parser.add_argument('--pref0', type=float, default=0.2)
# Hyperparameters
args = parser.parse_args()
trainer = PrefGANTrainer(lr=args.lr, num_epochs=args.num_epochs, n_obj=args.n_obj,
batch_size=args.batch_size, input_dim=args.input_dim, output_dim=args.output_dim)
batch_size=args.batch_size, latent_dim=args.latent_dim)
trainer.train()
# samples = trainer.evaluate(test_size=args.test_size, eval_num=11)
samples = trainer.evaluate_img(test_size=args.test_size, eval_num=11)
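
Given the argparse defaults above, a run of this version presumably amounts to python libmoon/gan_tester/PrefImageGan.py with latent-dim 128, batch-size 256, 10 epochs and lr 1e-5; the evaluation figures land under libmoon/Output/divergence/prefgan, consistent with the libmoon/Output entry already in .gitignore.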
