
Commit

add the example of fl, fcil and sedna_fl example
Signed-off-by: Marchons <[email protected]>
Marchons committed Sep 8, 2024
1 parent 3ef8cc1 commit 885f77d
Showing 51 changed files with 753 additions and 6 deletions.
File renamed without changes.
File renamed without changes.
@@ -0,0 +1,49 @@
import abc
from copy import deepcopy
from typing import List

import numpy as np
from sedna.algorithms.aggregation.aggregation import BaseAggregation
from sedna.common.class_factory import ClassType, ClassFactory


@ClassFactory.register(ClassType.FL_AGG, "FedAvg")
class FedAvg(BaseAggregation, abc.ABC):
    """Federated averaging algorithm."""

    def __init__(self):
        super(FedAvg, self).__init__()

    def aggregate(self, clients: List):
        """
        Calculate the sample-weighted average of the client weights.

        Parameters
        ----------
        clients: List
            All clients in the federated learning job

        Returns
        -------
        update_weights : Array-like
            Final weights used to update the model layers
        """
        print("aggregation....")
        if not len(clients):
            return self.weights
        self.total_size = sum([c.num_samples for c in clients])
        old_weight = [np.zeros(np.array(c).shape) for c in
                      next(iter(clients)).weights]
        updates = []
        for inx, row in enumerate(old_weight):
            for c in clients:
                row += (np.array(c.weights[inx]) * c.num_samples
                        / self.total_size)
            updates.append(row.tolist())
        self.weights = deepcopy(updates)
        print("finish aggregation....")
        return updates
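A quick way to sanity-check the aggregation above is to reproduce its sample-weighted average on two mock clients. This is a minimal standalone sketch, not part of the commit: SimpleNamespace stands in for the real client objects, which only need the weights and num_samples attributes that FedAvg.aggregate reads.

# Minimal local sketch of what FedAvg.aggregate computes (not part of this
# commit). The client stub is hypothetical; the aggregator only reads the
# `weights` and `num_samples` attributes of each client.
from types import SimpleNamespace

import numpy as np

clients = [
    SimpleNamespace(weights=[np.ones((2, 2)), np.zeros(3)], num_samples=10),
    SimpleNamespace(weights=[3 * np.ones((2, 2)), np.ones(3)], num_samples=30),
]

total = sum(c.num_samples for c in clients)
# per-layer weighted average: sum_i (n_i / N) * w_i
avg = [sum(np.array(c.weights[k]) * c.num_samples / total for c in clients)
       for k in range(len(clients[0].weights))]
print(avg[0])  # 2.5 everywhere: (1*10 + 3*30) / 40
print(avg[1])  # 0.75 everywhere: (0*10 + 1*30) / 40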
@@ -0,0 +1,49 @@
algorithm:
  # paradigm name; string type;
  # currently the options of value are as follows:
  #   1> "singletasklearning"
  #   2> "incrementallearning"
  #   3> "federatedclassincrementallearning"
  paradigm_type: "federatedclassincrementallearning"
  fl_data_setting:
    # ratio of training dataset; float type;
    # the default value is 0.8.
    train_ratio: 1.0
    # the method of splitting dataset; string type; optional;
    # currently the options of value are as follows:
    #   1> "default": the dataset is evenly divided based on train_ratio;
    splitting_method: "default"
    label_data_ratio: 1.0
    data_partition: "iid"
  # the url address of the initial network for pre-training; string type; optional;
  initial_model_url: "/home/wyd/ianvs/project/init_model/cnn.pb"

  # algorithm module configuration in the paradigm; list type;
  modules:
    # kind of algorithm module; string type;
    # currently the options of value are as follows:
    #   1> "basemodel"
    - type: "basemodel"
      # name of python module; string type;
      # example: basemodel.py has a BaseModel class whose alias is "fcil" for this benchmarking;
      name: "fcil"
      # the url address of python module; string type;
      url: "./examples/cifar100/federated_class_incremental_learning/fedavg/algorithm/basemodel.py"

      # hyperparameters configuration for the python module; list type;
      hyperparameters:
        # name of the hyperparameter; string type;
        - batch_size:
            values:
              - 32
        - learning_rate:
            values:
              - 0.001
        - epochs:
            values:
              - 1
    - type: "aggregation"
      name: "FedAvg"
      url: "./examples/cifar100/federated_class_incremental_learning/fedavg/algorithm/aggregation.py"
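For reference, the hyperparameters block above ultimately reaches the registered basemodel as keyword arguments (BaseModel(**kwargs) in basemodel.py below). The expansion is done by ianvs itself, which can also sweep over multiple values; the following standalone sketch only illustrates the shape of the data by taking the first value of each entry, and the file path is hypothetical.

# Standalone sketch (not part of the commit) of how the hyperparameters block
# flattens into the kwargs that reach BaseModel(**kwargs).
import yaml  # PyYAML

with open("algorithm.yaml") as f:  # hypothetical path to the file above
    algo = yaml.safe_load(f)["algorithm"]

base = next(m for m in algo["modules"] if m["type"] == "basemodel")
kwargs = {name: spec["values"][0]
          for hp in base["hyperparameters"]
          for name, spec in hp.items()}
print(kwargs)  # {'batch_size': 32, 'learning_rate': 0.001, 'epochs': 1}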

@@ -0,0 +1,137 @@
import os
import zipfile
import logging

import keras
import numpy as np
import tensorflow as tf
from sedna.common.class_factory import ClassType, ClassFactory

from resnet import resnet10
from network import NetWork, incremental_learning

__all__ = ["BaseModel"]
os.environ['BACKEND_TYPE'] = 'KERAS'
logging.getLogger().setLevel(logging.INFO)


@ClassFactory.register(ClassType.GENERAL, alias='fcil')
class BaseModel:
    def __init__(self, **kwargs):
        self.kwargs = kwargs
        print(f"kwargs: {kwargs}")
        self.batch_size = kwargs.get('batch_size', 1)
        print(f"batch_size: {self.batch_size}")
        self.epochs = kwargs.get('epochs', 1)
        # the algorithm config names this hyperparameter "learning_rate"
        self.lr = kwargs.get('learning_rate', kwargs.get('lr', 0.001))
        self.optimizer = keras.optimizers.SGD(learning_rate=self.lr)
        self.old_task_id = -1
        self.fe = resnet10(10)
        logging.info(type(self.fe))
        self.model = NetWork(100, self.fe)
        self._init_model()

    def _init_model(self):
        self.model.compile(optimizer='sgd',
                           loss='sparse_categorical_crossentropy',
                           metrics=['accuracy'])
        # run one dummy step so the model variables are built
        x = np.random.rand(1, 32, 32, 3)
        y = np.random.randint(0, 10, 1)
        self.model.fit(x, y, epochs=1)

    def load(self, model_url=None):
        logging.info(f"load model from {model_url}")
        extra_model_path = os.path.basename(model_url) + "/model"
        with zipfile.ZipFile(model_url, 'r') as zip_ref:
            zip_ref.extractall(extra_model_path)
        self.model = tf.saved_model.load(extra_model_path)

    def _initialize(self):
        logging.info("initialize finished")

    def get_weights(self):
        logging.info("get_weights")
        weights = [layer.tolist() for layer in self.model.get_weights()]
        logging.info(len(weights))
        return weights

    def set_weights(self, weights):
        weights = [np.array(layer) for layer in weights]
        self.model.set_weights(weights)
        logging.info("----------finish set weights-------------")

    def save(self, model_path=""):
        logging.info("save model")

    def model_info(self, model_path, result, relpath):
        logging.info("model info")
        return {}

    def train(self, train_data, valid_data, **kwargs):
        round = kwargs.get("round", -1)
        task_id = kwargs.get("task_id", -1)
        task_size = kwargs.get("task_size", 10)
        self.model.compile(optimizer=self.optimizer,
                           loss='sparse_categorical_crossentropy',
                           metrics=['accuracy'])
        logging.info(f"train data: {train_data[0].shape} {train_data[1].shape}")
        train_db = self.data_process(train_data)
        logging.info(train_db)
        for epoch in range(self.epochs):
            total_loss = 0
            total_num = 0
            logging.info(f"Epoch {epoch + 1} / {self.epochs}")
            logging.info("-" * 50)
            for x, y in train_db:
                with tf.GradientTape() as tape:
                    logits = self.model(x, training=True)
                    loss = tf.reduce_mean(
                        keras.losses.sparse_categorical_crossentropy(
                            y, logits, from_logits=True))
                grads = tape.gradient(loss, self.model.trainable_variables)
                self.optimizer.apply(grads, self.model.trainable_variables)
                # older Keras versions instead use:
                # self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
                total_loss += loss
                total_num += 1

            logging.info(f"train round {round}: Epoch {epoch + 1} "
                         f"avg loss: {total_loss / total_num}")
        logging.info(f"finish round {round} train")
        self.eval(train_data, round)
        return {"num_samples": train_data[0].shape[0]}

    def predict(self, data, **kwargs):
        result = {}
        for sample in data.x:
            x = np.load(sample)
            logits = self.model(x, training=False)
            pred = tf.cast(tf.argmax(logits, axis=1), tf.int32)
            result[sample] = pred.numpy()
        logging.info("finish predict")
        return result

    def eval(self, data, round, **kwargs):
        total_num = 0
        total_correct = 0
        data = self.data_process(data)
        for i, (x, y) in enumerate(data):
            logits = self.model(x, training=False)
            pred = tf.argmax(logits, axis=1)
            pred = tf.cast(pred, dtype=tf.int32)
            pred = tf.reshape(pred, y.shape)
            correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
            correct = tf.reduce_sum(correct)
            total_num += x.shape[0]
            total_correct += int(correct)
        logging.info(f"total_correct: {total_correct}, total_num: {total_num}")
        acc = total_correct / total_num
        del total_correct
        logging.info(f"finish round {round} evaluate, acc: {acc}")
        return acc

    def data_process(self, data, **kwargs):
        assert data is not None, "data is None"
        # data[0].shape = (N, 32, 32, 3), data[1].shape = (N, 1)
        return (tf.data.Dataset.from_tensor_slices((data[0], data[1]))
                .shuffle(100000)
                .batch(self.batch_size))
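To see how BaseModel and FedAvg fit together, here is a hand-wired sketch of a single federated round. It is not part of the commit and assumes the sibling modules (basemodel.py, aggregation.py, resnet.py, network.py) and their sedna/Keras/TensorFlow dependencies import cleanly; the SimpleNamespace clients are stand-ins exposing only the fields FedAvg.aggregate reads.

# Hand-wired sketch of one federated round (not part of the commit).
from types import SimpleNamespace

import numpy as np
from basemodel import BaseModel
from aggregation import FedAvg

clients, models = [], []
for _ in range(2):
    model = BaseModel(batch_size=32, learning_rate=0.001, epochs=1)
    x = np.random.rand(64, 32, 32, 3).astype("float32")
    y = np.random.randint(0, 10, (64, 1))
    info = model.train((x, y), None, round=0)     # local training
    clients.append(SimpleNamespace(weights=model.get_weights(),
                                   num_samples=info["num_samples"]))
    models.append(model)

global_weights = FedAvg().aggregate(clients)       # sample-weighted average
for model in models:
    model.set_weights(global_weights)              # broadcast the new global model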
@@ -0,0 +1,68 @@
import keras
import numpy as np
import tensorflow as tf
from keras.layers import Dense

from resnet import resnet10


class NetWork(keras.Model):
    def __init__(self, num_classes, feature_extractor):
        super(NetWork, self).__init__()
        self.num_classes = num_classes
        self.feature = feature_extractor
        self.fc = Dense(num_classes, activation='softmax')

    def call(self, inputs):
        x = self.feature(inputs)
        x = self.fc(x)
        return x

    def feature_extractor(self, inputs):
        return self.feature.predict(inputs)

    def predict(self, fea_input):
        return self.fc(fea_input)

    def get_config(self):
        return {
            'num_classes': self.num_classes,
            'feature_extractor': self.feature,
        }

    @classmethod
    def from_config(cls, config):
        return cls(**config)


def incremental_learning(old_model: NetWork, num_class):
    new_model = NetWork(num_class, resnet10(num_class))
    # run one dummy step so the new model's variables are built
    x = np.random.rand(1, 32, 32, 3)
    y = np.random.randint(0, num_class, 1)
    new_model.compile(optimizer='sgd',
                      loss='sparse_categorical_crossentropy',
                      metrics=['accuracy'])
    new_model.fit(x, y, epochs=1)
    print(old_model.fc.units, new_model.fc.units)
    # copy feature-extractor layers from the old model
    for layer in old_model.layers:
        if hasattr(new_model.feature, layer.name):
            new_model.feature.__setattr__(layer.name, layer)
    # widen the classifier head: keep the old columns, zero-pad the new classes
    # (the original note assumed the initial number of classes is 10)
    if num_class > old_model.fc.units:
        original_use_bias = hasattr(old_model.fc, 'bias')
        print("original_use_bias", original_use_bias)
        init_class = old_model.fc.units
        new_model.fc.kernel.assign(
            tf.pad(old_model.fc.kernel,
                   [[0, 0], [0, num_class - init_class]]))
        if original_use_bias:
            new_model.fc.bias.assign(
                tf.pad(old_model.fc.bias,
                       [[0, num_class - init_class]]))
    new_model.build((None, 32, 32, 3))
    return new_model


def copy_model(model: NetWork):
    cfg = model.get_config()
    return model.from_config(cfg)
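The head-widening step in incremental_learning above amounts to zero-padding the Dense kernel (and bias) along the class axis, so logits for the old classes are unchanged and the new classes start from zero. A standalone illustration of just that step, with illustrative sizes:

# Standalone illustration of the classifier-widening step (illustrative sizes,
# not part of the commit): pad a (features, old_classes) kernel to
# (features, new_classes) so old-class outputs are preserved.
import numpy as np
import tensorflow as tf

features, old_classes, new_classes = 64, 10, 20
old_kernel = tf.random.normal((features, old_classes))
old_bias = tf.zeros((old_classes,))

new_kernel = tf.pad(old_kernel, [[0, 0], [0, new_classes - old_classes]])
new_bias = tf.pad(old_bias, [[0, new_classes - old_classes]])

x = tf.random.normal((1, features))
old_logits = tf.matmul(x, old_kernel) + old_bias
new_logits = tf.matmul(x, new_kernel) + new_bias
# the first 10 outputs match; the 10 new classes start at zero
print(np.allclose(old_logits.numpy(), new_logits.numpy()[:, :old_classes]))  # True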
