From 97cec4d27537710df4e9dccb3e1ff40462182991 Mon Sep 17 00:00:00 2001 From: Francois Chollet Date: Wed, 28 Jun 2023 22:08:22 -0700 Subject: [PATCH 01/13] Fix examples code style --- examples/demo_custom_torch_workflow.py | 3 +- examples/demo_jax_distributed.py | 7 ++- .../collaborative_filtering_movielens.py | 10 ++- .../tensorflow/vision/image_captioning.py | 63 ++++++++++++++----- ..._classification_with_vision_transformer.py | 15 +++-- .../keras_io/tensorflow/vision/involution.py | 58 ++++++++++++++--- examples/keras_io/tensorflow/vision/mixup.py | 17 +++-- .../vision/mlp_image_classification.py | 35 ++++++++--- .../vision/perceiver_image_classification.py | 22 +++++-- .../timeseries_classification_transformer.py | 4 +- .../vision/attention_mil_classification.py | 30 ++++++--- 11 files changed, 201 insertions(+), 63 deletions(-) diff --git a/examples/demo_custom_torch_workflow.py b/examples/demo_custom_torch_workflow.py index aaa25f25e..e7d971a32 100644 --- a/examples/demo_custom_torch_workflow.py +++ b/examples/demo_custom_torch_workflow.py @@ -98,6 +98,7 @@ def train(model, train_loader, num_epochs, optimizer, loss_fn): ######## Using a Keras model or layer in a torch Module ######## ################################################################ + class MyModel(nn.Module): def __init__(self): super().__init__() @@ -126,4 +127,4 @@ def forward(self, x): # Instantiate the torch loss function loss_fn = nn.CrossEntropyLoss() -train(torch_module, train_loader, num_epochs, optimizer, loss_fn) \ No newline at end of file +train(torch_module, train_loader, num_epochs, optimizer, loss_fn) diff --git a/examples/demo_jax_distributed.py b/examples/demo_jax_distributed.py index 328490f0a..9aef7060a 100644 --- a/examples/demo_jax_distributed.py +++ b/examples/demo_jax_distributed.py @@ -157,7 +157,12 @@ def make_model(): # data will be split along the batch axis data_mesh = Mesh(devices, axis_names=("batch",)) # naming axes of the mesh # naming axes of the sharded partition -data_sharding = NamedSharding(data_mesh,P("batch",),) +data_sharding = NamedSharding( + data_mesh, + P( + "batch", + ), +) # all variables will be replicated on all devices var_mesh = Mesh(devices, axis_names=("_")) diff --git a/examples/keras_io/structured_data/collaborative_filtering_movielens.py b/examples/keras_io/structured_data/collaborative_filtering_movielens.py index 9b828930d..41e89ebb7 100644 --- a/examples/keras_io/structured_data/collaborative_filtering_movielens.py +++ b/examples/keras_io/structured_data/collaborative_filtering_movielens.py @@ -40,6 +40,7 @@ import keras_core as keras from keras_core import layers from keras_core import ops + """ ## First, load the data and apply preprocessing """ @@ -97,7 +98,11 @@ df = df.sample(frac=1, random_state=42) x = df[["user", "movie"]].values # Normalize the targets between 0 and 1. Makes it easy to train. -y = df["rating"].apply(lambda x: (x - min_rating) / (max_rating - min_rating)).values +y = ( + df["rating"] + .apply(lambda x: (x - min_rating) / (max_rating - min_rating)) + .values +) # Assuming training on 90% of the data and validating on 10%. train_indices = int(0.9 * df.shape[0]) x_train, x_val, y_train, y_val = ( @@ -204,7 +209,8 @@ def call(self, inputs): ratings = model.predict(user_movie_array).flatten() top_ratings_indices = ratings.argsort()[-10:][::-1] recommended_movie_ids = [ - movie_encoded2movie.get(movies_not_watched[x][0]) for x in top_ratings_indices + movie_encoded2movie.get(movies_not_watched[x][0]) + for x in top_ratings_indices ] print("Showing recommendations for user: {}".format(user_id)) diff --git a/examples/keras_io/tensorflow/vision/image_captioning.py b/examples/keras_io/tensorflow/vision/image_captioning.py index aba580c06..ad5136cb8 100644 --- a/examples/keras_io/tensorflow/vision/image_captioning.py +++ b/examples/keras_io/tensorflow/vision/image_captioning.py @@ -178,7 +178,9 @@ def train_val_split(caption_data, train_size=0.8, shuffle=True): def custom_standardization(input_string): lowercase = tf.strings.lower(input_string) - return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "") + return tf.strings.regex_replace( + lowercase, "[%s]" % re.escape(strip_chars), "" + ) strip_chars = "!\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~" @@ -263,7 +265,9 @@ def get_cnn_model(): # We freeze our feature extractor base_model.trainable = False base_model_out = base_model.output - base_model_out = layers.Reshape((-1, base_model_out.shape[-1]))(base_model_out) + base_model_out = layers.Reshape((-1, base_model_out.shape[-1]))( + base_model_out + ) cnn_model = keras.models.Model(base_model.input, base_model_out) return cnn_model @@ -342,7 +346,9 @@ def __init__(self, embed_dim, ff_dim, num_heads, **kwargs): self.layernorm_3 = layers.LayerNormalization() self.embedding = PositionalEmbedding( - embed_dim=EMBED_DIM, sequence_length=SEQ_LENGTH, vocab_size=VOCAB_SIZE + embed_dim=EMBED_DIM, + sequence_length=SEQ_LENGTH, + vocab_size=VOCAB_SIZE, ) self.out = layers.Dense(VOCAB_SIZE, activation="softmax") @@ -394,7 +400,10 @@ def get_causal_attention_mask(self, inputs): mask = tf.cast(i >= j, dtype="int32") mask = tf.reshape(mask, (1, input_shape[1], input_shape[1])) mult = tf.concat( - [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], + [ + tf.expand_dims(batch_size, -1), + tf.constant([1, 1], dtype=tf.int32), + ], axis=0, ) return tf.tile(mask, mult) @@ -431,7 +440,9 @@ def calculate_accuracy(self, y_true, y_pred, mask): mask = tf.cast(mask, dtype=tf.float32) return tf.reduce_sum(accuracy) / tf.reduce_sum(mask) - def _compute_caption_loss_and_acc(self, img_embed, batch_seq, training=True): + def _compute_caption_loss_and_acc( + self, img_embed, batch_seq, training=True + ): encoder_out = self.encoder(img_embed, training=training) batch_seq_inp = batch_seq[:, :-1] batch_seq_true = batch_seq[:, 1:] @@ -469,7 +480,8 @@ def train_step(self, batch_data): # 4. Get the list of all the trainable weights train_vars = ( - self.encoder.trainable_variables + self.decoder.trainable_variables + self.encoder.trainable_variables + + self.decoder.trainable_variables ) # 5. Get the gradients @@ -484,7 +496,10 @@ def train_step(self, batch_data): self.acc_tracker.update_state(batch_acc) # 8. Return the loss and accuracy values - return {"loss": self.loss_tracker.result(), "acc": self.acc_tracker.result()} + return { + "loss": self.loss_tracker.result(), + "acc": self.acc_tracker.result(), + } def test_step(self, batch_data): batch_img, batch_seq = batch_data @@ -513,7 +528,10 @@ def test_step(self, batch_data): self.acc_tracker.update_state(batch_acc) # 5. Return the loss and accuracy values - return {"loss": self.loss_tracker.result(), "acc": self.acc_tracker.result()} + return { + "loss": self.loss_tracker.result(), + "acc": self.acc_tracker.result(), + } @property def metrics(self): @@ -523,8 +541,12 @@ def metrics(self): cnn_model = get_cnn_model() -encoder = TransformerEncoderBlock(embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1) -decoder = TransformerDecoderBlock(embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2) +encoder = TransformerEncoderBlock( + embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1 +) +decoder = TransformerDecoderBlock( + embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2 +) caption_model = ImageCaptioningModel( cnn_model=cnn_model, encoder=encoder, @@ -539,15 +561,20 @@ def metrics(self): # Define the loss function cross_entropy = keras.losses.SparseCategoricalCrossentropy( - from_logits=False, reduction=None, + from_logits=False, + reduction=None, ) # EarlyStopping criteria -early_stopping = keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True) +early_stopping = keras.callbacks.EarlyStopping( + patience=3, restore_best_weights=True +) # Learning Rate Scheduler for the optimizer -class LRSchedule(keras.optimizers.schedules.learning_rate_schedule.LearningRateSchedule): +class LRSchedule( + keras.optimizers.schedules.learning_rate_schedule.LearningRateSchedule +): def __init__(self, post_warmup_learning_rate, warmup_steps): super().__init__() self.post_warmup_learning_rate = post_warmup_learning_rate @@ -568,10 +595,14 @@ def __call__(self, step): # Create a learning rate schedule num_train_steps = len(train_dataset) * EPOCHS num_warmup_steps = num_train_steps // 15 -lr_schedule = LRSchedule(post_warmup_learning_rate=1e-4, warmup_steps=num_warmup_steps) +lr_schedule = LRSchedule( + post_warmup_learning_rate=1e-4, warmup_steps=num_warmup_steps +) # Compile the model -caption_model.compile(optimizer=keras.optimizers.Adam(lr_schedule), loss=cross_entropy) +caption_model.compile( + optimizer=keras.optimizers.Adam(lr_schedule), loss=cross_entropy +) # Fit the model caption_model.fit( @@ -639,4 +670,4 @@ def generate_caption(): this example easily runnable, we have trained it with a few constraints, like a minimal number of attention heads. To improve the predictions, you can try changing these training settings and find a good model for your use case. -""" \ No newline at end of file +""" diff --git a/examples/keras_io/tensorflow/vision/image_classification_with_vision_transformer.py b/examples/keras_io/tensorflow/vision/image_classification_with_vision_transformer.py index 027ddc476..47614b4ff 100644 --- a/examples/keras_io/tensorflow/vision/image_classification_with_vision_transformer.py +++ b/examples/keras_io/tensorflow/vision/image_classification_with_vision_transformer.py @@ -58,7 +58,10 @@ projection_dim, ] # Size of the transformer layers transformer_layers = 8 -mlp_head_units = [2048, 1024] # Size of the dense layers of the final classifier +mlp_head_units = [ + 2048, + 1024, +] # Size of the dense layers of the final classifier """ @@ -218,7 +221,9 @@ def create_vit_classifier(): representation = layers.Flatten()(representation) representation = layers.Dropout(0.5)(representation) # Add MLP. - features = mlp(representation, hidden_units=mlp_head_units, dropout_rate=0.5) + features = mlp( + representation, hidden_units=mlp_head_units, dropout_rate=0.5 + ) # Classify outputs. logits = layers.Dense(num_classes)(features) # Create the Keras model. @@ -241,7 +246,9 @@ def run_experiment(model): loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=[ keras.metrics.SparseCategoricalAccuracy(name="accuracy"), - keras.metrics.SparseTopKCategoricalAccuracy(5, name="top-5-accuracy"), + keras.metrics.SparseTopKCategoricalAccuracy( + 5, name="top-5-accuracy" + ), ], ) @@ -288,4 +295,4 @@ def run_experiment(model): but also by parameters such as the learning rate schedule, optimizer, weight decay, etc. In practice, it's recommended to fine-tune a ViT model that was pre-trained using a large, high-resolution dataset. -""" \ No newline at end of file +""" diff --git a/examples/keras_io/tensorflow/vision/involution.py b/examples/keras_io/tensorflow/vision/involution.py index 6e4002085..a44b6c7ae 100644 --- a/examples/keras_io/tensorflow/vision/involution.py +++ b/examples/keras_io/tensorflow/vision/involution.py @@ -123,7 +123,9 @@ def build(self, input_shape): keras.layers.BatchNormalization(), keras.layers.ReLU(), keras.layers.Conv2D( - filters=self.kernel_size * self.kernel_size * self.group_number, + filters=self.kernel_size + * self.kernel_size + * self.group_number, kernel_size=1, ), ] @@ -198,22 +200,39 @@ def call(self, x): # Compute involution with stride 1. output_tensor, _ = Involution( - channel=3, group_number=1, kernel_size=5, stride=1, reduction_ratio=1, name="inv_1" + channel=3, + group_number=1, + kernel_size=5, + stride=1, + reduction_ratio=1, + name="inv_1", )(input_tensor) print(f"with stride 1 ouput shape: {output_tensor.shape}") # Compute involution with stride 2. output_tensor, _ = Involution( - channel=3, group_number=1, kernel_size=5, stride=2, reduction_ratio=1, name="inv_2" + channel=3, + group_number=1, + kernel_size=5, + stride=2, + reduction_ratio=1, + name="inv_2", )(input_tensor) print(f"with stride 2 ouput shape: {output_tensor.shape}") # Compute involution with stride 1, channel 16 and reduction ratio 2. output_tensor, _ = Involution( - channel=16, group_number=1, kernel_size=5, stride=1, reduction_ratio=2, name="inv_3" + channel=16, + group_number=1, + kernel_size=5, + stride=1, + reduction_ratio=2, + name="inv_3", )(input_tensor) print( - "with channel 16 and reduction ratio 2 ouput shape: {}".format(output_tensor.shape) + "with channel 16 and reduction ratio 2 ouput shape: {}".format( + output_tensor.shape + ) ) """ @@ -250,7 +269,9 @@ def call(self, x): .shuffle(256) .batch(256) ) -test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(256) +test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch( + 256 +) """ ## Visualise the data @@ -287,7 +308,9 @@ def call(self, x): print("building the convolution model...") conv_model = keras.Sequential( [ - keras.layers.Conv2D(32, (3, 3), input_shape=(32, 32, 3), padding="same"), + keras.layers.Conv2D( + 32, (3, 3), input_shape=(32, 32, 3), padding="same" + ), keras.layers.ReLU(name="relu1"), keras.layers.MaxPooling2D((2, 2)), keras.layers.Conv2D(64, (3, 3), padding="same"), @@ -323,17 +346,32 @@ def call(self, x): inputs = keras.Input(shape=(32, 32, 3)) x, _ = Involution( - channel=3, group_number=1, kernel_size=3, stride=1, reduction_ratio=2, name="inv_1" + channel=3, + group_number=1, + kernel_size=3, + stride=1, + reduction_ratio=2, + name="inv_1", )(inputs) x = keras.layers.ReLU()(x) x = keras.layers.MaxPooling2D((2, 2))(x) x, _ = Involution( - channel=3, group_number=1, kernel_size=3, stride=1, reduction_ratio=2, name="inv_2" + channel=3, + group_number=1, + kernel_size=3, + stride=1, + reduction_ratio=2, + name="inv_2", )(x) x = keras.layers.ReLU()(x) x = keras.layers.MaxPooling2D((2, 2))(x) x, _ = Involution( - channel=3, group_number=1, kernel_size=3, stride=1, reduction_ratio=2, name="inv_3" + channel=3, + group_number=1, + kernel_size=3, + stride=1, + reduction_ratio=2, + name="inv_3", )(x) x = keras.layers.ReLU()(x) x = keras.layers.Flatten()(x) diff --git a/examples/keras_io/tensorflow/vision/mixup.py b/examples/keras_io/tensorflow/vision/mixup.py index 2666d3f84..34cf72727 100644 --- a/examples/keras_io/tensorflow/vision/mixup.py +++ b/examples/keras_io/tensorflow/vision/mixup.py @@ -140,7 +140,8 @@ def mix_up(ds_one, ds_two, alpha=0.2): # First create the new dataset using our `mix_up` utility train_ds_mu = train_ds.map( - lambda ds_one, ds_two: mix_up(ds_one, ds_two, alpha=0.2), num_parallel_calls=AUTO + lambda ds_one, ds_two: mix_up(ds_one, ds_two, alpha=0.2), + num_parallel_calls=AUTO, ) # Let's preview 9 samples from the dataset @@ -160,7 +161,9 @@ def mix_up(ds_one, ds_two, alpha=0.2): def get_training_model(): model = keras.Sequential( [ - layers.Conv2D(16, (5, 5), activation="relu", input_shape=(28, 28, 1)), + layers.Conv2D( + 16, (5, 5), activation="relu", input_shape=(28, 28, 1) + ), layers.MaxPooling2D(pool_size=(2, 2)), layers.Conv2D(32, (5, 5), activation="relu"), layers.MaxPooling2D(pool_size=(2, 2)), @@ -187,7 +190,9 @@ def get_training_model(): model = get_training_model() model.load_weights("initial_weights.weights.h5") -model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"]) +model.compile( + loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"] +) model.fit(train_ds_mu, validation_data=val_ds, epochs=EPOCHS) _, test_acc = model.evaluate(test_ds) print("Test accuracy: {:.2f}%".format(test_acc * 100)) @@ -198,7 +203,9 @@ def get_training_model(): model = get_training_model() model.load_weights("initial_weights.weights.h5") -model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"]) +model.compile( + loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"] +) # Notice that we are NOT using the mixed up dataset here model.fit(train_ds_one, validation_data=val_ds, epochs=EPOCHS) _, test_acc = model.evaluate(test_ds) @@ -226,4 +233,4 @@ def get_training_model(): adversarial examples and stabilized GAN (Generative Adversarial Networks) training. * There are a number of data augmentation techniques that extend mixup such as [CutMix](https://arxiv.org/abs/1905.04899) and [AugMix](https://arxiv.org/abs/1912.02781). -""" \ No newline at end of file +""" diff --git a/examples/keras_io/tensorflow/vision/mlp_image_classification.py b/examples/keras_io/tensorflow/vision/mlp_image_classification.py index 10689ca96..b2a7c2d3d 100644 --- a/examples/keras_io/tensorflow/vision/mlp_image_classification.py +++ b/examples/keras_io/tensorflow/vision/mlp_image_classification.py @@ -32,7 +32,7 @@ import keras_core as keras from keras_core import layers from keras_core.layers import Lambda - + """ ## Prepare the data """ @@ -183,7 +183,9 @@ def call(self, images): padding="VALID", ) patch_dims = patches.shape[-1] - patches = tf.reshape(patches, [batch_size, self.num_patches, patch_dims]) + patches = tf.reshape( + patches, [batch_size, self.num_patches, patch_dims] + ) return patches @@ -207,7 +209,9 @@ def call(self, images): class MLPMixerLayer(layers.Layer): - def __init__(self, num_patches, hidden_units, dropout_rate, *args, **kwargs): + def __init__( + self, num_patches, hidden_units, dropout_rate, *args, **kwargs + ): super().__init__(*args, **kwargs) self.mlp1 = keras.Sequential( @@ -257,7 +261,10 @@ def call(self, inputs): """ mlpmixer_blocks = keras.Sequential( - [MLPMixerLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)] + [ + MLPMixerLayer(num_patches, embedding_dim, dropout_rate) + for _ in range(num_blocks) + ] ) learning_rate = 0.005 mlpmixer_classifier = build_classifier(mlpmixer_blocks) @@ -292,7 +299,9 @@ def call(self, inputs): class FNetLayer(layers.Layer): - def __init__(self, num_patches, embedding_dim, dropout_rate, *args, **kwargs): + def __init__( + self, num_patches, embedding_dim, dropout_rate, *args, **kwargs + ): super().__init__(*args, **kwargs) self.ffn = keras.Sequential( @@ -332,7 +341,10 @@ def call(self, inputs): """ fnet_blocks = keras.Sequential( - [FNetLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)] + [ + FNetLayer(num_patches, embedding_dim, dropout_rate) + for _ in range(num_blocks) + ] ) learning_rate = 0.001 fnet_classifier = build_classifier(fnet_blocks, positional_encoding=True) @@ -363,7 +375,9 @@ def call(self, inputs): class gMLPLayer(layers.Layer): - def __init__(self, num_patches, embedding_dim, dropout_rate, *args, **kwargs): + def __init__( + self, num_patches, embedding_dim, dropout_rate, *args, **kwargs + ): super().__init__(*args, **kwargs) self.channel_projection1 = keras.Sequential( @@ -416,7 +430,10 @@ def call(self, inputs): """ gmlp_blocks = keras.Sequential( - [gMLPLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)] + [ + gMLPLayer(num_patches, embedding_dim, dropout_rate) + for _ in range(num_blocks) + ] ) learning_rate = 0.003 gmlp_classifier = build_classifier(gmlp_blocks) @@ -429,4 +446,4 @@ def call(self, inputs): You may also try to increase the size of the input images and use different patch sizes. Note that, the paper used advanced regularization strategies, such as MixUp and CutMix, as well as AutoAugment. -""" \ No newline at end of file +""" diff --git a/examples/keras_io/tensorflow/vision/perceiver_image_classification.py b/examples/keras_io/tensorflow/vision/perceiver_image_classification.py index a6e223436..cb37aa5f5 100644 --- a/examples/keras_io/tensorflow/vision/perceiver_image_classification.py +++ b/examples/keras_io/tensorflow/vision/perceiver_image_classification.py @@ -71,14 +71,18 @@ patch_size = 2 # Size of the patches to be extract from the input images. num_patches = (image_size // patch_size) ** 2 # Size of the data array. latent_dim = 256 # Size of the latent array. -projection_dim = 256 # Embedding size of each element in the data and latent arrays. +projection_dim = ( + 256 # Embedding size of each element in the data and latent arrays. +) num_heads = 8 # Number of Transformer heads. ffn_units = [ projection_dim, projection_dim, ] # Size of the Transformer Feedforward network. num_transformer_blocks = 4 -num_iterations = 2 # Repetitions of the cross-attention and Transformer modules. +num_iterations = ( + 2 # Repetitions of the cross-attention and Transformer modules. +) classifier_units = [ projection_dim, num_classes, @@ -205,13 +209,19 @@ def create_cross_attention_module( ): inputs = { # Recieve the latent array as an input of shape [1, latent_dim, projection_dim]. - "latent_array": layers.Input(shape=(latent_dim, projection_dim), name="latent_array"), + "latent_array": layers.Input( + shape=(latent_dim, projection_dim), name="latent_array" + ), # Recieve the data_array (encoded image) as an input of shape [batch_size, data_dim, projection_dim]. - "data_array": layers.Input(shape=(data_dim, projection_dim), name="data_array"), + "data_array": layers.Input( + shape=(data_dim, projection_dim), name="data_array" + ), } # Apply layer norm to the inputs - latent_array = layers.LayerNormalization(epsilon=1e-6)(inputs["latent_array"]) + latent_array = layers.LayerNormalization(epsilon=1e-6)( + inputs["latent_array"] + ) data_array = layers.LayerNormalization(epsilon=1e-6)(inputs["data_array"]) # Create query tensor: [1, latent_dim, projection_dim]. @@ -475,4 +485,4 @@ def run_experiment(model): The Perceiver benefits from inceasing the model size. However, larger models needs bigger accelerators to fit in and train efficiently. This is why in the Perceiver paper they used 32 TPU cores to run the experiments. -""" \ No newline at end of file +""" diff --git a/examples/keras_io/timeseries/timeseries_classification_transformer.py b/examples/keras_io/timeseries/timeseries_classification_transformer.py index 66b7bc738..f9826447d 100644 --- a/examples/keras_io/timeseries/timeseries_classification_transformer.py +++ b/examples/keras_io/timeseries/timeseries_classification_transformer.py @@ -148,7 +148,9 @@ def build_model( ) model.summary() -callbacks = [keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)] +callbacks = [ + keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True) +] model.fit( x_train, diff --git a/examples/keras_io/vision/attention_mil_classification.py b/examples/keras_io/vision/attention_mil_classification.py index 0483cd964..50198bad8 100644 --- a/examples/keras_io/vision/attention_mil_classification.py +++ b/examples/keras_io/vision/attention_mil_classification.py @@ -96,7 +96,9 @@ class label. In medical imaging (e.g. computational pathology, etc.) an *entire """ -def create_bags(input_data, input_labels, positive_class, bag_count, instance_count): +def create_bags( + input_data, input_labels, positive_class, bag_count, instance_count +): # Set up bags. bags = [] bag_labels = [] @@ -109,7 +111,9 @@ def create_bags(input_data, input_labels, positive_class, bag_count, instance_co for _ in range(bag_count): # Pick a fixed size random subset of samples. - index = np.random.choice(input_data.shape[0], instance_count, replace=False) + index = np.random.choice( + input_data.shape[0], instance_count, replace=False + ) instances_data = input_data[index] instances_labels = input_labels[index] @@ -241,7 +245,9 @@ def build(self, input_shape): def call(self, inputs): # Assigning variables from the number of inputs. - instances = [self.compute_attention_scores(instance) for instance in inputs] + instances = [ + self.compute_attention_scores(instance) for instance in inputs + ] # Stack instances into a single tensor. instances = ops.stack(instances) @@ -257,7 +263,9 @@ def compute_attention_scores(self, instance): original_instance = instance # tanh(v*h_k^T) - instance = ops.tanh(ops.tensordot(instance, self.v_weight_params, axes=1)) + instance = ops.tanh( + ops.tensordot(instance, self.v_weight_params, axes=1) + ) # for learning non-linear relations efficiently. if self.use_gated: @@ -292,7 +300,7 @@ def plot(data, labels, bag_class, predictions=None, attention_weights=None): attention_weights: Attention weights for each instance within the input data. If you don't specify anything, the values won't be displayed. """ - return ## TODO + return ## TODO labels = np.array(labels).reshape(-1) if bag_class == "positive": @@ -499,12 +507,16 @@ def predict(data, labels, trained_models): models_predictions.append(predictions) # Create intermediate model to get MIL attention layer weights. - intermediate_model = keras.Model(model.input, model.get_layer("alpha").output) + intermediate_model = keras.Model( + model.input, model.get_layer("alpha").output + ) # Predict MIL attention layer weights. intermediate_predictions = intermediate_model.predict(data) - attention_weights = np.squeeze(np.swapaxes(intermediate_predictions, 1, 0)) + attention_weights = np.squeeze( + np.swapaxes(intermediate_predictions, 1, 0) + ) models_attention_weights.append(attention_weights) loss, accuracy = model.evaluate(data, labels, verbose=0) @@ -523,7 +535,9 @@ def predict(data, labels, trained_models): # Evaluate and predict classes and attention scores on validation data. -class_predictions, attention_params = predict(val_data, val_labels, trained_models) +class_predictions, attention_params = predict( + val_data, val_labels, trained_models +) # Plot some results from our validation data. plot( From f9edaeb268005e4f3eed59b5eacec78ae53fef80 Mon Sep 17 00:00:00 2001 From: Francois Chollet Date: Thu, 29 Jun 2023 12:45:31 -0700 Subject: [PATCH 02/13] Fix issues related to meta device + seed generator --- .../demo_custom_layer_backend_agnostic.py | 4 +++- keras_core/backend/torch/core.py | 23 +++++++++++-------- keras_core/backend/torch/random.py | 4 ++++ keras_core/random/seed_generator.py | 5 ++-- 4 files changed, 23 insertions(+), 13 deletions(-) diff --git a/examples/demo_custom_layer_backend_agnostic.py b/examples/demo_custom_layer_backend_agnostic.py index 5609781a6..13e811668 100644 --- a/examples/demo_custom_layer_backend_agnostic.py +++ b/examples/demo_custom_layer_backend_agnostic.py @@ -2,7 +2,6 @@ import keras_core from keras_core import Model -from keras_core import backend from keras_core import initializers from keras_core import layers from keras_core import losses @@ -11,6 +10,9 @@ from keras_core import optimizers +keras_core.config.disable_traceback_filtering() + + class MyDense(layers.Layer): def __init__(self, units, name=None): super().__init__(name=name) diff --git a/keras_core/backend/torch/core.py b/keras_core/backend/torch/core.py index ff915416f..3ea8051d9 100644 --- a/keras_core/backend/torch/core.py +++ b/keras_core/backend/torch/core.py @@ -39,10 +39,14 @@ def device_scope(device): global_state.set_global_attribute("torch_device", previous_device) +def get_default_device(): + return "cuda" if torch.cuda.is_available() else "cpu" + + def get_device(): device = global_state.get_global_attribute("torch_device", None) if device is None: - device = "cuda" if torch.cuda.is_available() else "cpu" + get_default_device() return device @@ -216,14 +220,15 @@ def symbolic_call(fn, args, kwargs, fill_value): ) return fn(*args, **kwargs) except: - # If the `"meta"` device placement fails, fall back to tracing - # eagerly with tensors on the default device. This will be - # more robust, but more expensive. - args, kwargs = nest.map_structure( - lambda x: convert_keras_tensor_to_torch(x, fill_value), - (args, kwargs), - ) - return fn(*args, **kwargs) + with device_scope(get_default_device()): + # If the `"meta"` device placement fails, fall back to tracing + # eagerly with tensors on the default device. This will be + # more robust, but more expensive. + args, kwargs = nest.map_structure( + lambda x: convert_keras_tensor_to_torch(x, fill_value), + (args, kwargs), + ) + return fn(*args, **kwargs) with StatelessScope(): outputs = symbolic_call(fn, args, kwargs, fill_value=83) diff --git a/keras_core/backend/torch/random.py b/keras_core/backend/torch/random.py index 6d6dc5c8a..815d42d92 100644 --- a/keras_core/backend/torch/random.py +++ b/keras_core/backend/torch/random.py @@ -12,6 +12,10 @@ def torch_seed_generator(seed): seed_val, _ = draw_seed(seed) + device = get_device() + if device == "meta": + # Generator is not support by the meta device. + return None generator = torch.Generator(device=get_device()) generator.manual_seed(int(seed_val)) return generator diff --git a/keras_core/random/seed_generator.py b/keras_core/random/seed_generator.py index 7a00c10d7..478238398 100644 --- a/keras_core/random/seed_generator.py +++ b/keras_core/random/seed_generator.py @@ -44,9 +44,8 @@ def draw_seed(seed): seed_state = seed.state # Use * 1 to create a copy new_seed_value = seed_state.value * 1 - seed.state.assign(seed_state + np.array([0, 1], dtype="uint32")) - if backend.backend() == "torch": - return backend.convert_to_numpy(new_seed_value) + increment = backend.convert_to_tensor(np.array([0, 1]), dtype="uint32") + seed.state.assign(seed_state + increment) return new_seed_value elif isinstance(seed, int): return convert_to_tensor([seed, 0], dtype="uint32") From 30ab1e9c63039a7d8b85b8053dd6d900af888821 Mon Sep 17 00:00:00 2001 From: Francois Chollet Date: Thu, 29 Jun 2023 12:57:32 -0700 Subject: [PATCH 03/13] Minor fixes --- examples/demo_custom_layer_backend_agnostic.py | 3 --- keras_core/backend/torch/random.py | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/examples/demo_custom_layer_backend_agnostic.py b/examples/demo_custom_layer_backend_agnostic.py index 13e811668..784c70c6e 100644 --- a/examples/demo_custom_layer_backend_agnostic.py +++ b/examples/demo_custom_layer_backend_agnostic.py @@ -10,9 +10,6 @@ from keras_core import optimizers -keras_core.config.disable_traceback_filtering() - - class MyDense(layers.Layer): def __init__(self, units, name=None): super().__init__(name=name) diff --git a/keras_core/backend/torch/random.py b/keras_core/backend/torch/random.py index 815d42d92..9193a02db 100644 --- a/keras_core/backend/torch/random.py +++ b/keras_core/backend/torch/random.py @@ -14,7 +14,7 @@ def torch_seed_generator(seed): seed_val, _ = draw_seed(seed) device = get_device() if device == "meta": - # Generator is not support by the meta device. + # Generator is not supported by the meta device. return None generator = torch.Generator(device=get_device()) generator.manual_seed(int(seed_val)) From f8e8c2bdb3508af43ce6e424e660225b12666b74 Mon Sep 17 00:00:00 2001 From: Francois Chollet Date: Thu, 29 Jun 2023 13:44:01 -0700 Subject: [PATCH 04/13] Fix compute_output_spec bug in torch --- .../common/compute_output_spec_test.py | 56 +++++++++++++++++++ keras_core/backend/torch/core.py | 8 +-- 2 files changed, 60 insertions(+), 4 deletions(-) create mode 100644 keras_core/backend/common/compute_output_spec_test.py diff --git a/keras_core/backend/common/compute_output_spec_test.py b/keras_core/backend/common/compute_output_spec_test.py new file mode 100644 index 000000000..d1314be39 --- /dev/null +++ b/keras_core/backend/common/compute_output_spec_test.py @@ -0,0 +1,56 @@ +import pytest + +from keras_core import backend +from keras_core import testing + + +def example_fn(x): + x = (x + 2) * backend.numpy.ones(x.shape) + x = backend.numpy.stack([x, x], axis=-1) + return x + + +class ComputeOutputSpecTest(testing.TestCase): + def test_basics(self): + out = backend.compute_output_spec( + example_fn, backend.KerasTensor((2, 3)) + ) + self.assertTrue(isinstance(out, backend.KerasTensor)) + self.assertEqual(out.shape, (2, 3, 2)) + + out = backend.compute_output_spec( + example_fn, backend.KerasTensor((None, 3)) + ) + self.assertTrue(isinstance(out, backend.KerasTensor)) + self.assertEqual(out.shape, (None, 3, 2)) + + out = backend.compute_output_spec( + example_fn, backend.KerasTensor((2, None)) + ) + self.assertTrue(isinstance(out, backend.KerasTensor)) + self.assertEqual(out.shape, (2, None, 2)) + + @pytest.mark.skipif( + backend.backend() != "torch", reason="Only applicable for torch" + ) + def test_torch_meta_device_incompatible_ops(self): + class Container: + def __init__(self): + self.canary = False + + def example_meta_fn(self, x): + y = backend.numpy.ones(x.shape) + if str(y.device) == "meta": + self.canary = True + raise ValueError("Erroring out on meta device") + x = (x + 2) * y + x = backend.numpy.stack([x, x], axis=-1) + return x + + instance = Container() + out = backend.compute_output_spec( + instance.example_meta_fn, backend.KerasTensor((2, 3)) + ) + self.assertTrue(isinstance(out, backend.KerasTensor)) + self.assertTrue(instance.canary) + self.assertEqual(out.shape, (2, 3, 2)) diff --git a/keras_core/backend/torch/core.py b/keras_core/backend/torch/core.py index 3ea8051d9..d462f9436 100644 --- a/keras_core/backend/torch/core.py +++ b/keras_core/backend/torch/core.py @@ -214,21 +214,21 @@ def symbolic_call(fn, args, kwargs, fill_value): # which should give a "zero flop" way to trace shape, but does # not have universal support with torch operations. with device_scope("meta"): - args, kwargs = nest.map_structure( + meta_args, meta_kwargs = nest.map_structure( lambda x: convert_keras_tensor_to_torch(x, fill_value), (args, kwargs), ) - return fn(*args, **kwargs) + return fn(*meta_args, **meta_kwargs) except: with device_scope(get_default_device()): # If the `"meta"` device placement fails, fall back to tracing # eagerly with tensors on the default device. This will be # more robust, but more expensive. - args, kwargs = nest.map_structure( + eager_args, eager_kwargs = nest.map_structure( lambda x: convert_keras_tensor_to_torch(x, fill_value), (args, kwargs), ) - return fn(*args, **kwargs) + return fn(*eager_args, **eager_kwargs) with StatelessScope(): outputs = symbolic_call(fn, args, kwargs, fill_value=83) From c7b77b6a9333cd4a7755f801dc86a030c5963334 Mon Sep 17 00:00:00 2001 From: Francois Chollet Date: Thu, 29 Jun 2023 14:01:24 -0700 Subject: [PATCH 05/13] Minor fixes --- keras_core/backend/common/compute_output_spec_test.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/keras_core/backend/common/compute_output_spec_test.py b/keras_core/backend/common/compute_output_spec_test.py index d1314be39..fa30301d3 100644 --- a/keras_core/backend/common/compute_output_spec_test.py +++ b/keras_core/backend/common/compute_output_spec_test.py @@ -5,7 +5,7 @@ def example_fn(x): - x = (x + 2) * backend.numpy.ones(x.shape) + x = (x + 2) * backend.numpy.ones_like(x) x = backend.numpy.stack([x, x], axis=-1) return x @@ -54,3 +54,11 @@ def example_meta_fn(self, x): self.assertTrue(isinstance(out, backend.KerasTensor)) self.assertTrue(instance.canary) self.assertEqual(out.shape, (2, 3, 2)) + + instance = Container() + out = backend.compute_output_spec( + instance.example_meta_fn, backend.KerasTensor((2, None)) + ) + self.assertTrue(isinstance(out, backend.KerasTensor)) + self.assertTrue(instance.canary) + self.assertEqual(out.shape, (2, None, 2)) From 55c3800fe42945f77562e969a6cbfa8737d6bcaa Mon Sep 17 00:00:00 2001 From: Tirth Patel Date: Thu, 29 Jun 2023 21:01:44 +0000 Subject: [PATCH 06/13] Use detach instead of requires_grad_ for the torch backend (#417) --- keras_core/backend/torch/core.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/keras_core/backend/torch/core.py b/keras_core/backend/torch/core.py index d462f9436..4082024af 100644 --- a/keras_core/backend/torch/core.py +++ b/keras_core/backend/torch/core.py @@ -345,4 +345,6 @@ def while_loop( def stop_gradient(variable): - return variable.requires_grad_(False) + # We can't use `.requires_grad_(False)` here since it only + # works when the tensor is a leaf node in the graph. + return variable.detach() From 3a9508d17262a4197a7f1b236d0518ea3e4f5f1e Mon Sep 17 00:00:00 2001 From: Francois Chollet Date: Thu, 29 Jun 2023 15:38:20 -0700 Subject: [PATCH 07/13] Minor fixes --- keras_core/backend/tensorflow/optimizer.py | 2 ++ keras_core/layers/layer.py | 3 +-- keras_core/optimizers/base_optimizer.py | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/keras_core/backend/tensorflow/optimizer.py b/keras_core/backend/tensorflow/optimizer.py index e736ef5fa..d80135c9b 100644 --- a/keras_core/backend/tensorflow/optimizer.py +++ b/keras_core/backend/tensorflow/optimizer.py @@ -86,6 +86,8 @@ def apply_grad_to_update_var(var, grad): return self.update_step(grad, var, learning_rate) for grad, var in grads_and_vars: + if isinstance(var, backend.Variable): + var = var.value distribution.extended.update( var, apply_grad_to_update_var, args=(grad,), group=False ) diff --git a/keras_core/layers/layer.py b/keras_core/layers/layer.py index afe7ee4b6..4bff5c1d7 100644 --- a/keras_core/layers/layer.py +++ b/keras_core/layers/layer.py @@ -19,7 +19,6 @@ import inspect import warnings -import numpy as np from tensorflow import nest from keras_core import backend @@ -896,7 +895,7 @@ def save_own_variables(self, store): """ all_vars = self._trainable_variables + self._non_trainable_variables for i, v in enumerate(all_vars): - store[f"{i}"] = np.array(v) + store[f"{i}"] = v.numpy() def load_own_variables(self, store): """Loads the state of the layer. diff --git a/keras_core/optimizers/base_optimizer.py b/keras_core/optimizers/base_optimizer.py index 6afb1f03b..a1765a3ac 100644 --- a/keras_core/optimizers/base_optimizer.py +++ b/keras_core/optimizers/base_optimizer.py @@ -326,7 +326,7 @@ def learning_rate(self, learning_rate): def save_own_variables(self, store): """Get the state of this optimizer object.""" for i, variable in enumerate(self.variables): - store[str(i)] = np.array(variable) + store[str(i)] = variable.numpy() def load_own_variables(self, store): """Set the state of this optimizer object.""" From 87b97e59ea46a51fc3899da3343048fbc3412e98 Mon Sep 17 00:00:00 2001 From: Tirth Patel Date: Fri, 30 Jun 2023 04:53:55 +0000 Subject: [PATCH 08/13] Fix the get_device() function for the torch backend (#418) --- keras_core/backend/torch/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keras_core/backend/torch/core.py b/keras_core/backend/torch/core.py index 4082024af..ee84c7870 100644 --- a/keras_core/backend/torch/core.py +++ b/keras_core/backend/torch/core.py @@ -46,7 +46,7 @@ def get_default_device(): def get_device(): device = global_state.get_global_attribute("torch_device", None) if device is None: - get_default_device() + return get_default_device() return device From 87c201de97275311be962a3080b55005ff4f909e Mon Sep 17 00:00:00 2001 From: Francois Chollet Date: Thu, 29 Jun 2023 21:54:35 -0700 Subject: [PATCH 09/13] Fix code format --- keras_core/optimizers/base_optimizer.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/keras_core/optimizers/base_optimizer.py b/keras_core/optimizers/base_optimizer.py index a1765a3ac..5b1016901 100644 --- a/keras_core/optimizers/base_optimizer.py +++ b/keras_core/optimizers/base_optimizer.py @@ -1,8 +1,6 @@ import re import warnings -import numpy as np - from keras_core import backend from keras_core import initializers from keras_core import ops From 7698a97b4de8b192c270379e81682c5f06a0114c Mon Sep 17 00:00:00 2001 From: Chen Qian Date: Fri, 30 Jun 2023 13:14:13 -0700 Subject: [PATCH 10/13] Extend image classification benchmark to include Xception and ResNetV250 (#420) * Expand image classification benchmark * Extend image benchmark --------- Co-authored-by: chenmoneygithub --- .../efficient_net_image_classification_benchmark.py | 12 +++++++++--- .../layers/normalization/batch_normalization.py | 6 ++++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/benchmarks/model_benchmark/efficient_net_image_classification_benchmark.py b/benchmarks/model_benchmark/efficient_net_image_classification_benchmark.py index 01a574155..c5670febe 100644 --- a/benchmarks/model_benchmark/efficient_net_image_classification_benchmark.py +++ b/benchmarks/model_benchmark/efficient_net_image_classification_benchmark.py @@ -19,8 +19,8 @@ from model_benchmark.benchmark_utils import BenchmarkMetricsCallback import keras_core as keras -from keras_core.applications import EfficientNetV2B0 +flags.DEFINE_string("model", "EfficientNetV2B0", "The model to benchmark.") flags.DEFINE_integer("epochs", 1, "The number of epochs.") flags.DEFINE_integer("batch_size", 4, "Batch Size.") flags.DEFINE_string( @@ -35,6 +35,12 @@ IMAGE_SIZE = (224, 224) CHANNELS = 3 +MODEL_MAP = { + "EfficientNetV2B0": keras.applications.EfficientNetV2B0, + "Xception": keras.applications.Xception, + "ResNet50V2": keras.applications.ResNet50V2, +} + def load_data(): # Load cats vs dogs dataset, and split into train and validation sets. @@ -55,7 +61,6 @@ def preprocess_inputs(image, label): preprocess_inputs, num_parallel_calls=tf.data.AUTOTUNE ) .batch(FLAGS.batch_size) - .cache() .prefetch(tf.data.AUTOTUNE) ) val_dataset = ( @@ -68,8 +73,9 @@ def preprocess_inputs(image, label): def load_model(): + model_class = MODEL_MAP[FLAGS.model] # Load the EfficientNetV2B0 model and add a classification head. - model = EfficientNetV2B0(include_top=False, weights="imagenet") + model = model_class(include_top=False, weights="imagenet") classifier = keras.models.Sequential( [ keras.Input([IMAGE_SIZE[0], IMAGE_SIZE[1], CHANNELS]), diff --git a/keras_core/layers/normalization/batch_normalization.py b/keras_core/layers/normalization/batch_normalization.py index 4fe46a793..129aaaa95 100644 --- a/keras_core/layers/normalization/batch_normalization.py +++ b/keras_core/layers/normalization/batch_normalization.py @@ -221,8 +221,10 @@ def call(self, inputs, training=None, mask=None): ) ) else: - moving_mean = ops.reshape(self.moving_mean, broadcast_shape) - moving_variance = ops.reshape(self.moving_variance, broadcast_shape) + moving_mean = ops.cast(self.moving_mean, inputs.dtype) + moving_variance = ops.cast(self.moving_variance, inputs.dtype) + moving_mean = ops.reshape(moving_mean, broadcast_shape) + moving_variance = ops.reshape(moving_variance, broadcast_shape) outputs = (inputs - moving_mean) / ops.sqrt( moving_variance + self.epsilon ) From 67a6b7424ded9b70fd4393054cc3f2662587ebb6 Mon Sep 17 00:00:00 2001 From: Francois Chollet Date: Fri, 30 Jun 2023 13:18:03 -0700 Subject: [PATCH 11/13] Commit distributed training guides --- guides/custom_train_step_in_torch.py | 2 - guides/distributed_training_with_jax.py | 0 .../distributed_training_with_tensorflow.py | 274 +++++++++++++++++ guides/distributed_training_with_torch.py | 275 ++++++++++++++++++ 4 files changed, 549 insertions(+), 2 deletions(-) create mode 100644 guides/distributed_training_with_jax.py create mode 100644 guides/distributed_training_with_tensorflow.py create mode 100644 guides/distributed_training_with_torch.py diff --git a/guides/custom_train_step_in_torch.py b/guides/custom_train_step_in_torch.py index 056ae3aba..25859019f 100644 --- a/guides/custom_train_step_in_torch.py +++ b/guides/custom_train_step_in_torch.py @@ -460,8 +460,6 @@ def train_step(self, real_images): Let's test-drive it: """ -keras.config.disable_traceback_filtering() - # Prepare the dataset. We use both the training & test MNIST digits. batch_size = 64 (x_train, _), (x_test, _) = keras.datasets.mnist.load_data() diff --git a/guides/distributed_training_with_jax.py b/guides/distributed_training_with_jax.py new file mode 100644 index 000000000..e69de29bb diff --git a/guides/distributed_training_with_tensorflow.py b/guides/distributed_training_with_tensorflow.py new file mode 100644 index 000000000..4f84188e1 --- /dev/null +++ b/guides/distributed_training_with_tensorflow.py @@ -0,0 +1,274 @@ +""" +Title: Multi-GPU distributed training with TensorFlow +Author: [fchollet](https://twitter.com/fchollet) +Date created: 2020/04/28 +Last modified: 2023/06/29 +Description: Guide to multi-GPU training for Keras models with TensorFlow. +Accelerator: GPU +""" +""" +## Introduction + +There are generally two ways to distribute computation across multiple devices: + +**Data parallelism**, where a single model gets replicated on multiple devices or +multiple machines. Each of them processes different batches of data, then they merge +their results. There exist many variants of this setup, that differ in how the different +model replicas merge results, in whether they stay in sync at every batch or whether they +are more loosely coupled, etc. + +**Model parallelism**, where different parts of a single model run on different devices, +processing a single batch of data together. This works best with models that have a +naturally-parallel architecture, such as models that feature multiple branches. + +This guide focuses on data parallelism, in particular **synchronous data parallelism**, +where the different replicas of the model stay in sync after each batch they process. +Synchronicity keeps the model convergence behavior identical to what you would see for +single-device training. + +Specifically, this guide teaches you how to use the `tf.distribute` API to train Keras +models on multiple GPUs, with minimal changes to your code, +on multiple GPUs (typically 2 to 16) installed on a single machine (single host, +multi-device training). This is the most common setup for researchers and small-scale +industry workflows. +""" + +""" +## Setup +""" + +import os + +os.environ["KERAS_BACKEND"] = "tensorflow" + +import tensorflow as tf +import keras_core as keras + +""" +## Single-host, multi-device synchronous training + +In this setup, you have one machine with several GPUs on it (typically 2 to 16). Each +device will run a copy of your model (called a **replica**). For simplicity, in what +follows, we'll assume we're dealing with 8 GPUs, at no loss of generality. + +**How it works** + +At each step of training: + +- The current batch of data (called **global batch**) is split into 8 different +sub-batches (called **local batches**). For instance, if the global batch has 512 +samples, each of the 8 local batches will have 64 samples. +- Each of the 8 replicas independently processes a local batch: they run a forward pass, +then a backward pass, outputting the gradient of the weights with respect to the loss of +the model on the local batch. +- The weight updates originating from local gradients are efficiently merged across the 8 +replicas. Because this is done at the end of every step, the replicas always stay in +sync. + +In practice, the process of synchronously updating the weights of the model replicas is +handled at the level of each individual weight variable. This is done through a **mirrored +variable** object. + +**How to use it** + +To do single-host, multi-device synchronous training with a Keras model, you would use +the [`tf.distribute.MirroredStrategy` API]( + https://www.tensorflow.org/api_docs/python/tf/distribute/MirroredStrategy). +Here's how it works: + +- Instantiate a `MirroredStrategy`, optionally configuring which specific devices you +want to use (by default the strategy will use all GPUs available). +- Use the strategy object to open a scope, and within this scope, create all the Keras +objects you need that contain variables. Typically, that means **creating & compiling the +model** inside the distribution scope. In some cases, the first call to `fit()` may also +create variables, so it's a good idea to put your `fit()` call in the scope as well. +- Train the model via `fit()` as usual. + +Importantly, we recommend that you use `tf.data.Dataset` objects to load data +in a multi-device or distributed workflow. + +Schematically, it looks like this: + +```python +# Create a MirroredStrategy. +strategy = tf.distribute.MirroredStrategy() +print('Number of devices: {}'.format(strategy.num_replicas_in_sync)) + +# Open a strategy scope. +with strategy.scope(): + # Everything that creates variables should be under the strategy scope. + # In general this is only model construction & `compile()`. + model = Model(...) + model.compile(...) + + # Train the model on all available devices. + model.fit(train_dataset, validation_data=val_dataset, ...) + + # Test the model on all available devices. + model.evaluate(test_dataset) +``` + +Here's a simple end-to-end runnable example: +""" + + +def get_compiled_model(): + # Make a simple 2-layer densely-connected neural network. + inputs = keras.Input(shape=(784,)) + x = keras.layers.Dense(256, activation="relu")(inputs) + x = keras.layers.Dense(256, activation="relu")(x) + outputs = keras.layers.Dense(10)(x) + model = keras.Model(inputs, outputs) + model.compile( + optimizer=keras.optimizers.Adam(), + loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True), + metrics=[keras.metrics.SparseCategoricalAccuracy()], + ) + return model + + +def get_dataset(): + batch_size = 32 + num_val_samples = 10000 + + # Return the MNIST dataset in the form of a `tf.data.Dataset`. + (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data() + + # Preprocess the data (these are Numpy arrays) + x_train = x_train.reshape(-1, 784).astype("float32") / 255 + x_test = x_test.reshape(-1, 784).astype("float32") / 255 + y_train = y_train.astype("float32") + y_test = y_test.astype("float32") + + # Reserve num_val_samples samples for validation + x_val = x_train[-num_val_samples:] + y_val = y_train[-num_val_samples:] + x_train = x_train[:-num_val_samples] + y_train = y_train[:-num_val_samples] + return ( + tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch( + batch_size + ), + tf.data.Dataset.from_tensor_slices((x_val, y_val)).batch(batch_size), + tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size), + ) + + +# Create a MirroredStrategy. +strategy = tf.distribute.MirroredStrategy() +print("Number of devices: {}".format(strategy.num_replicas_in_sync)) + +# Open a strategy scope. +with strategy.scope(): + # Everything that creates variables should be under the strategy scope. + # In general this is only model construction & `compile()`. + model = get_compiled_model() + + # Train the model on all available devices. + train_dataset, val_dataset, test_dataset = get_dataset() + model.fit(train_dataset, epochs=2, validation_data=val_dataset) + + # Test the model on all available devices. + model.evaluate(test_dataset) + +""" +## Using callbacks to ensure fault tolerance + +When using distributed training, you should always make sure you have a strategy to +recover from failure (fault tolerance). The simplest way to handle this is to pass +`ModelCheckpoint` callback to `fit()`, to save your model +at regular intervals (e.g. every 100 batches or every epoch). You can then restart +training from your saved model. + +Here's a simple example: +""" + +# Prepare a directory to store all the checkpoints. +checkpoint_dir = "./ckpt" +if not os.path.exists(checkpoint_dir): + os.makedirs(checkpoint_dir) + + +def make_or_restore_model(): + # Either restore the latest model, or create a fresh one + # if there is no checkpoint available. + checkpoints = [ + checkpoint_dir + "/" + name for name in os.listdir(checkpoint_dir) + ] + if checkpoints: + latest_checkpoint = max(checkpoints, key=os.path.getctime) + print("Restoring from", latest_checkpoint) + return keras.models.load_model(latest_checkpoint) + print("Creating a new model") + return get_compiled_model() + + +def run_training(epochs=1): + # Create a MirroredStrategy. + strategy = tf.distribute.MirroredStrategy() + + # Open a strategy scope and create/restore the model + with strategy.scope(): + model = make_or_restore_model() + + callbacks = [ + # This callback saves a SavedModel every epoch + # We include the current epoch in the folder name. + keras.callbacks.ModelCheckpoint( + filepath=checkpoint_dir + "/ckpt-{epoch}.keras", + save_freq="epoch", + ) + ] + model.fit( + train_dataset, + epochs=epochs, + callbacks=callbacks, + validation_data=val_dataset, + verbose=2, + ) + + +# Running the first time creates the model +run_training(epochs=1) + +# Calling the same function again will resume from where we left off +run_training(epochs=1) + +""" +## `tf.data` performance tips + +When doing distributed training, the efficiency with which you load data can often become +critical. Here are a few tips to make sure your `tf.data` pipelines +run as fast as possible. + +**Note about dataset batching** + +When creating your dataset, make sure it is batched with the global batch size. +For instance, if each of your 8 GPUs is capable of running a batch of 64 samples, you +call use a global batch size of 512. + +**Calling `dataset.cache()`** + +If you call `.cache()` on a dataset, its data will be cached after running through the +first iteration over the data. Every subsequent iteration will use the cached data. The +cache can be in memory (default) or to a local file you specify. + +This can improve performance when: + +- Your data is not expected to change from iteration to iteration +- You are reading data from a remote distributed filesystem +- You are reading data from local disk, but your data would fit in memory and your +workflow is significantly IO-bound (e.g. reading & decoding image files). + +**Calling `dataset.prefetch(buffer_size)`** + +You should almost always call `.prefetch(buffer_size)` after creating a dataset. It means +your data pipeline will run asynchronously from your model, +with new samples being preprocessed and stored in a buffer while the current batch +samples are used to train the model. The next batch will be prefetched in GPU memory by +the time the current batch is over. +""" + +""" +That's it! +""" diff --git a/guides/distributed_training_with_torch.py b/guides/distributed_training_with_torch.py new file mode 100644 index 000000000..4ff00a78e --- /dev/null +++ b/guides/distributed_training_with_torch.py @@ -0,0 +1,275 @@ +""" +Title: Multi-GPU distributed training with PyTorch +Author: [fchollet](https://twitter.com/fchollet) +Date created: 2023/06/29 +Last modified: 2023/06/29 +Description: Guide to multi-GPU training for Keras models with PyTorch. +Accelerator: GPU +""" +""" +## Introduction + +There are generally two ways to distribute computation across multiple devices: + +**Data parallelism**, where a single model gets replicated on multiple devices or +multiple machines. Each of them processes different batches of data, then they merge +their results. There exist many variants of this setup, that differ in how the different +model replicas merge results, in whether they stay in sync at every batch or whether they +are more loosely coupled, etc. + +**Model parallelism**, where different parts of a single model run on different devices, +processing a single batch of data together. This works best with models that have a +naturally-parallel architecture, such as models that feature multiple branches. + +This guide focuses on data parallelism, in particular **synchronous data parallelism**, +where the different replicas of the model stay in sync after each batch they process. +Synchronicity keeps the model convergence behavior identical to what you would see for +single-device training. + +Specifically, this guide teaches you how to use the `tf.distribute` API to train Keras +models on multiple GPUs, with minimal changes to your code, +on multiple GPUs (typically 2 to 16) installed on a single machine (single host, +multi-device training). This is the most common setup for researchers and small-scale +industry workflows. +""" + +""" +## Setup + +Let's start by defining the function that creates the model that we will train, +and the function that creates the dataset we will train on (MNIST in this case). +""" + +import os + +os.environ["KERAS_BACKEND"] = "torch" + +import torch +import numpy as np +import keras_core as keras + + +def get_model(): + # Make a simple convnet with batch normalization and dropout. + inputs = keras.Input(shape=(28, 28, 1)) + x = keras.layers.Rescaling(1.0 / 255.0)(x) + x = keras.layers.Conv2D( + filters=12, kernel_size=3, padding="same", use_bias=False + )(x) + x = keras.layers.BatchNormalization(scale=False, center=True)(x) + x = keras.layers.ReLU()(x) + x = keras.layers.Conv2D( + filters=24, + kernel_size=6, + use_bias=False, + strides=2, + )(x) + x = keras.layers.BatchNormalization(scale=False, center=True)(x) + x = keras.layers.ReLU()(x) + x = keras.layers.Conv2D( + filters=32, + kernel_size=6, + padding="same", + strides=2, + name="large_k", + )(x) + x = keras.layers.BatchNormalization(scale=False, center=True)(x) + x = keras.layers.ReLU()(x) + x = keras.layers.GlobalAveragePooling2D()(x) + x = keras.layers.Dense(256, activation="relu")(x) + x = keras.layers.Dropout(0.5)(x) + outputs = keras.layers.Dense(10)(x) + model = keras.Model(inputs, outputs) + return model + + +def get_dataset(): + # Load the data and split it between train and test sets + (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data() + + # Scale images to the [0, 1] range + x_train = x_train.astype("float32") + x_test = x_test.astype("float32") + # Make sure images have shape (28, 28, 1) + x_train = np.expand_dims(x_train, -1) + x_test = np.expand_dims(x_test, -1) + print("x_train shape:", x_train.shape) + print(x_train.shape[0], "train samples") + print(x_test.shape[0], "test samples") + + # Create a TensorDataset + dataset = torch.utils.data.TensorDataset( + torch.from_numpy(x_train), torch.from_numpy(y_train) + ) + return dataset + + +""" +Next, let's define a simple PyTorch training loop that targets +a GPU (note the calls to `.cuda()`). +""" + + +def train_model(model, dataloader, num_epochs, optimizer, loss_fn): + for epoch in range(num_epochs): + running_loss = 0.0 + for batch_idx, (inputs, targets) in enumerate(dataloader): + inputs = inputs.cuda(non_blocking=True) + targets = targets.cuda(non_blocking=True) + + # Forward pass + outputs = model(inputs) + loss = loss_fn(outputs, targets) + + # Backward and optimize + optimizer.zero_grad() + loss.backward() + optimizer.step() + + running_loss += loss.item() + + # Print loss statistics + if (batch_idx + 1) % 10 == 0: + print( + f"Epoch {epoch + 1}/{num_epochs}, " + f"Batch {batch_idx + 1}/{len(dataloader)}, " + f"Loss: {running_loss / 10}" + ) + running_loss = 0.0 + + +""" +## Single-host, multi-device synchronous training + +In this setup, you have one machine with several GPUs on it (typically 2 to 16). Each +device will run a copy of your model (called a **replica**). For simplicity, in what +follows, we'll assume we're dealing with 8 GPUs, at no loss of generality. + +**How it works** + +At each step of training: + +- The current batch of data (called **global batch**) is split into 8 different +sub-batches (called **local batches**). For instance, if the global batch has 512 +samples, each of the 8 local batches will have 64 samples. +- Each of the 8 replicas independently processes a local batch: they run a forward pass, +then a backward pass, outputting the gradient of the weights with respect to the loss of +the model on the local batch. +- The weight updates originating from local gradients are efficiently merged across the 8 +replicas. Because this is done at the end of every step, the replicas always stay in +sync. + +In practice, the process of synchronously updating the weights of the model replicas is +handled at the level of each individual weight variable. This is done through a **mirrored +variable** object. + +**How to use it** + +To do single-host, multi-device synchronous training with a Keras model, you would use +the `torch.nn.parallel.DistributedDataParallel` module wrapper. +Here's how it works: + +- We use `torch.multiprocessing.spawn` to spawn multiple Python processes, one +per device. Each process will run the `per_device_launch_fn` function. +- The `per_device_launch_fn` function does the following: + - It uses `torch.distributed.dist.init_process_group` and `torch.cuda.set_device` + to configure the device to be used for that process. + - It uses `torch.utils.data.distributed.DistributedSampler` + and `torch.utils.data.DataLoader` to turn our data into a distributed data loader. + - It also uses `torch.nn.parallel.DistributedDataParallel` to turn our model into + a distributed PyTorch module. + - It then calls the `train_model` function. +- The `train_model` function will then run in each process, with the model using +a separate device in each process. + +Here's the flow, where each step is split into its own utility function: +""" + +# Config +num_gpu = torch.cuda.device_count() +num_epochs = 2 +batch_size = 64 +print(f"Running on {num_gpu} GPUs") + + +def setup_device(current_gpu_index, num_gpus): + # Device setup + os.environ["MASTER_ADDR"] = "keras-core-torch" + os.environ["MASTER_PORT"] = "56492" + device = torch.device("cuda:{}".format(current_gpu_index)) + torch.distributed.dist.init_process_group( + backend="nccl", + init_method="env://", + world_size=num_gpus, + rank=current_gpu_index, + ) + torch.cuda.set_device(device) + + +def cleanup(): + torch.distributed.dist.dist.destroy_process_group() + + +def prepare_dataloader(dataset, current_gpu_index, num_gpus, batch_size): + sampler = torch.utils.data.distributed.DistributedSampler( + dataset, + num_replicas=num_gpus, + rank=current_gpu_index, + shuffle=False, + ) + dataloader = torch.utils.data.DataLoader( + dataset, + sampler=sampler, + batch_size=batch_size, + shuffle=False, + ) + return dataloader + + +def per_device_launch_fn(current_gpu_index, num_gpu): + # Setup the process groups + setup_device(current_gpu_index, num_gpu) + + ################################################################# + ######## Writing a torch training loop for a Keras model ######## + ################################################################# + + dataset = get_dataset() + model = get_model() + + # prepare the dataloader + dataloader = prepare_dataloader( + dataset, current_gpu_index, num_gpu, batch_size + ) + + # Instantiate the torch optimizer + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) + + # Instantiate the torch loss function + loss_fn = torch.nn.CrossEntropyLoss() + + # Put model on device + model = model.to(current_gpu_index) + ddp_model = torch.nn.parallel.DistributedDataParallel( + model, device_ids=[current_gpu_index], output_device=current_gpu_index + ) + + train_model(ddp_model, dataloader, num_epochs, optimizer, loss_fn) + + cleanup() + + +""" +Time to spawn: +""" + +torch.multiprocessing.spawn( + per_device_launch_fn, + args=(num_gpu,), + nprocs=num_gpu, + join=True, +) + +""" +That's it! +""" From befaa987c7c4a28c65748dbf017fdf4cf9b8dcfb Mon Sep 17 00:00:00 2001 From: Francois Chollet Date: Fri, 30 Jun 2023 13:23:39 -0700 Subject: [PATCH 12/13] Touchups --- guides/distributed_training_with_torch.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/guides/distributed_training_with_torch.py b/guides/distributed_training_with_torch.py index 4ff00a78e..9e2212766 100644 --- a/guides/distributed_training_with_torch.py +++ b/guides/distributed_training_with_torch.py @@ -26,7 +26,8 @@ Synchronicity keeps the model convergence behavior identical to what you would see for single-device training. -Specifically, this guide teaches you how to use the `tf.distribute` API to train Keras +Specifically, this guide teaches you how to use PyTorch's `DistributedDataParallel` +module wrapper to train Keras models on multiple GPUs, with minimal changes to your code, on multiple GPUs (typically 2 to 16) installed on a single machine (single host, multi-device training). This is the most common setup for researchers and small-scale @@ -230,10 +231,6 @@ def per_device_launch_fn(current_gpu_index, num_gpu): # Setup the process groups setup_device(current_gpu_index, num_gpu) - ################################################################# - ######## Writing a torch training loop for a Keras model ######## - ################################################################# - dataset = get_dataset() model = get_model() From 7ee847dba126d785366abdf962db37ee0154d137 Mon Sep 17 00:00:00 2001 From: Francois Chollet Date: Fri, 30 Jun 2023 13:24:05 -0700 Subject: [PATCH 13/13] Touchups --- guides/distributed_training_with_torch.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/guides/distributed_training_with_torch.py b/guides/distributed_training_with_torch.py index 9e2212766..0e3374486 100644 --- a/guides/distributed_training_with_torch.py +++ b/guides/distributed_training_with_torch.py @@ -27,8 +27,7 @@ single-device training. Specifically, this guide teaches you how to use PyTorch's `DistributedDataParallel` -module wrapper to train Keras -models on multiple GPUs, with minimal changes to your code, +module wrapper to train Keras, with minimal changes to your code, on multiple GPUs (typically 2 to 16) installed on a single machine (single host, multi-device training). This is the most common setup for researchers and small-scale industry workflows.