diff --git a/recognition/ArcFace/dali_image_iter.py b/recognition/ArcFace/dali_image_iter.py new file mode 100644 index 000000000..700869c75 --- /dev/null +++ b/recognition/ArcFace/dali_image_iter.py @@ -0,0 +1,112 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import random +import logging +import sys +import numbers +import math +import sklearn +import datetime +import numpy as np +import cv2 + +import mxnet as mx +from mxnet import ndarray as nd +from mxnet import io +from mxnet import recordio + +from nvidia.dali.pipeline import Pipeline +import nvidia.dali.ops as ops +import nvidia.dali.types as types +from nvidia.dali.plugin.mxnet import DALIClassificationIterator + + +logger = logging.getLogger() + + +class HybridTrainPipe(Pipeline): + # TODO: 这里还要添加个数据增强,dali提供了很多基础的数据增强方式:https://docs.nvidia.com/deeplearning/dali/user-guide/docs/supported_ops.html?highlight=ops#support-table + # 因为太懒(菜)这里只添加了 random_mirror + def __init__(self, path_imgrec, batch_size, num_threads, device_id, num_gpus, initial_fill): + ''' + initial_fill: 太大会占用内存,太小导致单个 batch id 重复率高而 loss 下降太慢,测试了下 batch_size*1000 基本不影响到训练 + num_threads: 经测试,单核3.5GHz的U,hhd设置为3~4,ssd设置为5~6 + ''' + super(HybridTrainPipe, self).__init__(batch_size, num_threads, device_id, seed = 12 + device_id) + logging.info('loading recordio %s...', path_imgrec) + path_imgidx = path_imgrec[0:-4] + ".idx" + self.input = ops.MXNetReader(path = [path_imgrec], index_path=[path_imgidx], + random_shuffle = True, shard_id = device_id, num_shards = num_gpus, + prefetch_queue_depth = 5, initial_fill = initial_fill) + self.decode = ops.ImageDecoder(device = "mixed", output_type = types.RGB) + self.res = ops.Resize(device="gpu", resize_x=112, resize_y=112) + self.rrc = ops.RandomResizedCrop(device = "gpu", size = (112, 112)) + # self.cmnp = ops.CropMirrorNormalize(device = "gpu", + # dtype = types.FLOAT, + # output_layout = types.NCHW, + # mean = [0.485 * 255,0.456 * 255,0.406 * 255], + # std = [0.229 * 255,0.224 * 255,0.225 * 255]) + self.cmnp = ops.CropMirrorNormalize(device = "gpu", + dtype = types.FLOAT, + output_layout = types.NCHW) + self.coin = ops.CoinFlip(probability = 0.5) + + + def define_graph(self): + rng = self.coin() + self.jpegs, self.labels = self.input(name = "Reader") + # TODO: 这部分是问题最大的地方,原始的.rec开始和结尾都记录着其他信息, + # 一旦读到空图像会 raise RuntimeError,并提示 'pipline broken',无法 reset pipline, + # 尝试了加 try 啥的都不行,大佬看看有没有啥解决方案 + images = self.decode(self.jpegs) + images = self.res(images) + output = self.cmnp(images, mirror = rng) + return [output, self.labels] + + +if __name__ == "__main__": + path_imgrec = '/home/ps/data/src_data/glint360k/train.rec' + batch_size = 128 + N = 4 + # 多卡测试,速度和单卡一样,也是18000samples/s,可能主要卡在 SSD 读取速度上了,2080Ti GPU占用20%左右 + # 测试 HHD 8000 samples/s, SSD 18000 samples/s + # trainpipes = [HybridTrainPipe(path_imgidx, path_imgrec, batch_size=batch_size, num_threads=6, device_id = i, num_gpus = N) for i in range(N)] + # htp = trainpipes[0] + # 单卡测试 + htp = HybridTrainPipe(path_imgrec, batch_size, 6, device_id = 0, num_gpus = N, initial_fill = batch_size) + trainpipes = [htp] + + htp.build() + print("Training pipeline epoch size: {}".format(htp.epoch_size("Reader"))) + dali_train_iter = DALIClassificationIterator(trainpipes, htp.epoch_size("Reader")) + print([dali_train_iter.provide_data[0][:2]], [dali_train_iter.provide_label[0][:2]]) + import time + time_start = time.time() + batch_num = 0 + while True: + batch = dali_train_iter.next() + batch_num += 1 + # # print("batch num:", len(batch)) + # # # print("batch:", batch[0].asnumpy()) + # # print("elem num:", len(batch[0].data)) + # # print("image num:", batch[0].data[0].shape) + # # print("label num:", batch[0].label[0].shape) + # 查看图像结果 + # for image, label in zip(batch[0].data[0], batch[0].label[0]): + # # image = elem.data[0][0] + # # label = elem.data[0][1] + # # print(image) + # print(image.shape) + # print(label.asnumpy) + # cv2.imshow("image", image.asnumpy()) + # cv2.waitKey(0) + + time_now = time.time() + if time_now - time_start > 1 and batch_num > 0: + print("\r{:.2f} samples/s".format(batch_num*batch_size/(time_now - time_start)), end='') + batch_num = 0 + time_start = time_now + diff --git a/recognition/ArcFace/dali_parall_module_local_v1.py b/recognition/ArcFace/dali_parall_module_local_v1.py new file mode 100644 index 000000000..657ca54b5 --- /dev/null +++ b/recognition/ArcFace/dali_parall_module_local_v1.py @@ -0,0 +1,628 @@ +''' +@author: insightface +''' + +import logging +import copy +import time +import os + +import mxnet as mx +import numpy as np +from mxnet import context as ctx +from mxnet.initializer import Uniform +from mxnet.module.base_module import BaseModule +from mxnet.module.module import Module +from mxnet import metric +from mxnet.model import BatchEndParam +from mxnet import io +import mxnet.ndarray as nd +from config import config + + +class ParallModule(BaseModule): + def __init__(self, + symbol, + data_names, + label_names, + logger=logging, + context=ctx.cpu(), + work_load_list=None, + asymbol=None, + args=None): + super(ParallModule, self).__init__(logger=logger) + self._symbol = symbol + self._asymbol = asymbol + self._data_names = data_names + self._label_names = label_names + self._context = context + self._work_load_list = work_load_list + self._num_classes = config.num_classes + self._batch_size = args.batch_size + self._verbose = args.verbose + self._emb_size = config.emb_size + self._local_class_start = args.local_class_start + self._iter = 0 + + self._curr_module = None + + self._num_workers = config.num_workers + self._num_ctx = len(self._context) + self._ctx_num_classes = args.ctx_num_classes + self._nd_cache = {} + self._ctx_cpu = mx.cpu() + self._ctx_single_gpu = self._context[-1] + self._fixed_param_names = None + self._curr_module = Module(self._symbol, + self._data_names, + self._label_names, + logger=self.logger, + context=self._context, + work_load_list=self._work_load_list, + fixed_param_names=self._fixed_param_names) + self._arcface_modules = [] + self._ctx_class_start = [] + for i in range(len(self._context)): + + args._ctxid = i + _module = Module(self._asymbol(args), + self._data_names, + self._label_names, + logger=self.logger, + context=mx.gpu(i), + work_load_list=self._work_load_list, + fixed_param_names=self._fixed_param_names) + self._arcface_modules.append(_module) + _c = args.local_class_start + i * args.ctx_num_classes + self._ctx_class_start.append(_c) + self._usekv = False + if self._usekv: + self._distkv = mx.kvstore.create('dist_sync') + self._kvinit = {} + + def _reset_bind(self): + self.binded = False + self._curr_module = None + + @property + def data_names(self): + return self._data_names + + @property + def output_names(self): + return self._symbol.list_outputs() + + @property + def data_shapes(self): + assert self.binded + return self._curr_module.data_shapes + + @property + def label_shapes(self): + assert self.binded + return self._curr_module.label_shapes + + @property + def output_shapes(self): + assert self.binded + return self._curr_module.output_shapes + + def get_export_params(self): + assert self.binded and self.params_initialized + _g, _x = self._curr_module.get_params() + g = _g.copy() + x = _x.copy() + return g, x + + def get_params(self): + assert self.binded and self.params_initialized + _g, _x = self._curr_module.get_params() + g = _g.copy() + x = _x.copy() + for _module in self._arcface_modules: + _g, _x = _module.get_params() + ag = _g.copy() + ax = _x.copy() + g.update(ag) + x.update(ax) + return g, x + + def set_params(self, + arg_params, + aux_params, + allow_missing=False, + force_init=True, + allow_extra=False): + g = arg_params + x = aux_params + #ag = {} + #ax = {} + rk = [] + for k in g: + v = g[k] + if k.startswith('fc7'): + p1 = k.find('_') + p2 = k.rfind('_') + _ctxid = int(k[p1 + 1:p2]) + self._arcface_modules[_ctxid].set_params({k: v}, {}) + rk.append(k) + for k in rk: + del g[k] + self._curr_module.set_params(g, x) + #self._arcface_module.set_params(ag, ax) + + def init_params(self, + initializer=Uniform(0.01), + arg_params=None, + aux_params=None, + allow_missing=False, + force_init=False, + allow_extra=False): + if self.params_initialized and not force_init: + return + assert self.binded, 'call bind before initializing the parameters' + #TODO init the same weights with all work nodes + self._curr_module.init_params(initializer=initializer, + arg_params=arg_params, + aux_params=aux_params, + allow_missing=allow_missing, + force_init=force_init, + allow_extra=allow_extra) + for _module in self._arcface_modules: + #_initializer = initializer + _initializer = mx.init.Normal(0.01) + _module.init_params(initializer=_initializer, + arg_params=None, + aux_params=None, + allow_missing=allow_missing, + force_init=force_init, + allow_extra=allow_extra) + self.params_initialized = True + + def bind(self, + data_shapes, + label_shapes=None, + for_training=True, + inputs_need_grad=False, + force_rebind=False, + shared_module=None): + print('in_bind', self.params_initialized, data_shapes, label_shapes) + if self.params_initialized: + arg_params, aux_params = self.get_params() + + # force rebinding is typically used when one want to switch from + # training to prediction phase. + if force_rebind: + self._reset_bind() + + if self.binded: + self.logger.warning('Already binded, ignoring bind()') + return + + assert shared_module is None, 'shared_module for MutableModule is not supported' + self.for_training = for_training + self.inputs_need_grad = inputs_need_grad + self.binded = True + self._curr_module.bind(data_shapes, + label_shapes, + for_training, + inputs_need_grad, + force_rebind=False, + shared_module=None) + _data_shape = data_shapes[0][1] + print('_data_shape', _data_shape, label_shapes) + for _module in self._arcface_modules: + _module.bind( + [('data', + (_data_shape[0] * self._num_workers, self._emb_size))], + [('softmax_label', (_data_shape[0] * self._num_workers, ))], + for_training, + True, + force_rebind=False, + shared_module=None) + if self.params_initialized: + self.set_params(arg_params, aux_params) + + def init_optimizer(self, + kvstore='local', + optimizer='sgd', + optimizer_params=(('learning_rate', 0.01), ), + force_init=False): + assert self.binded and self.params_initialized + if self.optimizer_initialized and not force_init: + self.logger.warning('optimizer already initialized, ignoring.') + return + + self._curr_module.init_optimizer(kvstore, + optimizer, + optimizer_params, + force_init=force_init) + for _module in self._arcface_modules: + _module.init_optimizer(kvstore, + optimizer, + optimizer_params, + force_init=force_init) + self.optimizer_initialized = True + + def kv_push(self, key, value): + #if value.context!=mx.cpu(): + # value = value.as_in_context(mx.cpu()) + if not key in self._kvinit: + self._distkv.init(key, nd.zeros_like(value)) + self._kvinit[key] = 1 + self._distkv.push(key, value) + + #get fc1 and partial fc7 + def forward(self, data_batch, is_train=None): + #g,x = self.get_params() + #print('{fc7_weight[0][0]}', self._iter, g['fc7_0_weight'].asnumpy()[0][0]) + #print('{pre_fc1_weight[0][0]}', self._iter, g['pre_fc1_weight'].asnumpy()[0][0]) + + assert self.binded and self.params_initialized + self._curr_module.forward(data_batch, is_train=is_train) + if is_train: + self._iter += 1 + fc1, label = self._curr_module.get_outputs( + merge_multi_context=True) + global_fc1 = fc1 + self.global_label = label.as_in_context(self._ctx_cpu) + + for i, _module in enumerate(self._arcface_modules): + _label = self.global_label - self._ctx_class_start[i] + db_global_fc1 = io.DataBatch([global_fc1], [_label]) + _module.forward(db_global_fc1) #fc7 with margin + #print('forward end') + + def get_ndarray(self, context, name, shape): + key = "%s_%s" % (name, context) + #print(key) + if not key in self._nd_cache: + v = nd.zeros(shape=shape, ctx=context) + self._nd_cache[key] = v + else: + v = self._nd_cache[key] + return v + + def get_ndarray2(self, context, name, arr): + key = "%s_%s" % (name, context) + #print(key) + if not key in self._nd_cache: + v = nd.zeros(shape=arr.shape, ctx=context) + self._nd_cache[key] = v + else: + v = self._nd_cache[key] + arr.copyto(v) + return v + + def backward(self, out_grads=None): + #print('in backward') + assert self.binded and self.params_initialized + #tmp_ctx = self._ctx_cpu + tmp_ctx = self._ctx_single_gpu + fc7_outs = [] + ctx_fc7_max = self.get_ndarray(tmp_ctx, 'ctx_fc7_max', + (self._batch_size, len(self._context))) + #local_fc7_max = nd.zeros( (self.global_label.shape[0],1), ctx=mx.cpu()) + for i, _module in enumerate(self._arcface_modules): + _fc7 = _module.get_outputs(merge_multi_context=True)[0] + fc7_outs.append(_fc7) + _fc7_max = nd.max(_fc7, axis=1).as_in_context(tmp_ctx) + ctx_fc7_max[:, i] = _fc7_max + + local_fc7_max = self.get_ndarray(tmp_ctx, 'local_fc7_max', + (self._batch_size, 1)) + nd.max(ctx_fc7_max, axis=1, keepdims=True, out=local_fc7_max) + global_fc7_max = local_fc7_max + #local_fc7_sum = None + local_fc7_sum = self.get_ndarray(tmp_ctx, 'local_fc7_sum', + (self._batch_size, 1)) + local_fc7_sum[:, :] = 0.0 + for i, _module in enumerate(self._arcface_modules): + _max = self.get_ndarray2(fc7_outs[i].context, 'fc7_max', + global_fc7_max) + fc7_outs[i] = nd.broadcast_sub(fc7_outs[i], _max) + fc7_outs[i] = nd.exp(fc7_outs[i]) + _sum = nd.sum(fc7_outs[i], axis=1, + keepdims=True).as_in_context(tmp_ctx) + local_fc7_sum += _sum + global_fc7_sum = local_fc7_sum + + if self._iter % self._verbose == 0: + #_ctx = self._context[-1] + _ctx = self._ctx_cpu + _probs = [] + for i, _module in enumerate(self._arcface_modules): + _prob = self.get_ndarray2(_ctx, '_fc7_prob_%d' % i, + fc7_outs[i]) + _probs.append(_prob) + fc7_prob = self.get_ndarray( + _ctx, 'test_fc7_prob', + (self._batch_size, self._ctx_num_classes * len(self._context))) + nd.concat(*_probs, dim=1, out=fc7_prob) + fc7_pred = nd.argmax(fc7_prob, axis=1) + local_label = self.global_label - self._local_class_start + #local_label = self.get_ndarray2(_ctx, 'test_label', local_label) + _pred = nd.equal(fc7_pred, local_label) + print('{fc7_acc}', self._iter, nd.mean(_pred).asnumpy()[0]) + + #local_fc1_grad = [] + #fc1_grad_ctx = self._ctx_cpu + fc1_grad_ctx = self._ctx_single_gpu + local_fc1_grad = self.get_ndarray(fc1_grad_ctx, 'local_fc1_grad', + (self._batch_size, self._emb_size)) + local_fc1_grad[:, :] = 0.0 + + for i, _module in enumerate(self._arcface_modules): + _sum = self.get_ndarray2(fc7_outs[i].context, 'fc7_sum', + global_fc7_sum) + fc7_outs[i] = nd.broadcast_div(fc7_outs[i], _sum) + a = i * self._ctx_num_classes + b = (i + 1) * self._ctx_num_classes + _label = self.global_label - self._ctx_class_start[i] + _label = self.get_ndarray2(fc7_outs[i].context, 'label', _label) + onehot_label = self.get_ndarray( + fc7_outs[i].context, 'label_onehot', + (self._batch_size, self._ctx_num_classes)) + nd.one_hot(_label, + depth=self._ctx_num_classes, + on_value=1.0, + off_value=0.0, + out=onehot_label) + fc7_outs[i] -= onehot_label + _module.backward(out_grads=[fc7_outs[i]]) + #ctx_fc1_grad = _module.get_input_grads()[0].as_in_context(mx.cpu()) + ctx_fc1_grad = self.get_ndarray2(fc1_grad_ctx, + 'ctx_fc1_grad_%d' % i, + _module.get_input_grads()[0]) + local_fc1_grad += ctx_fc1_grad + + global_fc1_grad = local_fc1_grad + self._curr_module.backward(out_grads=[global_fc1_grad]) + + def update(self): + assert self.binded and self.params_initialized and self.optimizer_initialized + self._curr_module.update() + for i, _module in enumerate(self._arcface_modules): + _module.update() + mx.nd.waitall() + + def get_outputs(self, merge_multi_context=True): + assert self.binded and self.params_initialized + return self._curr_module.get_outputs( + merge_multi_context=merge_multi_context) + #return self._arcface_module.get_outputs(merge_multi_context=merge_multi_context) + + def get_input_grads(self, merge_multi_context=True): + assert self.binded and self.params_initialized and self.inputs_need_grad + return self._curr_module.get_input_grads( + merge_multi_context=merge_multi_context) + + def update_metric(self, eval_metric, labels): + assert self.binded and self.params_initialized + #self._curr_module.update_metric(eval_metric, labels) + #label = labels[0] + #print(label.shape) + #self._arcface_module.update_metric(eval_metric, labels) + + def install_monitor(self, mon): + """ Install monitor on all executors """ + assert self.binded + self._curr_module.install_monitor(mon) + + def forward_backward(self, data_batch): + """A convenient function that calls both ``forward`` and ``backward``.""" + self.forward(data_batch, is_train=True) # get fc1 and partial fc7 + self.backward() + + def fit(self, + train_data, + eval_data=None, + eval_metric='acc', + epoch_end_callback=None, + batch_end_callback=None, + kvstore='local', + optimizer='sgd', + optimizer_params=(('learning_rate', 0.01), ), + eval_end_callback=None, + eval_batch_end_callback=None, + initializer=Uniform(0.01), + arg_params=None, + aux_params=None, + allow_missing=False, + force_rebind=False, + force_init=False, + begin_epoch=0, + num_epoch=None, + validation_metric=None, + monitor=None, + sparse_row_id_fn=None): + """Trains the module parameters. + + Checkout `Module Tutorial `_ to see + a end-to-end use-case. + + Parameters + ---------- + train_data : DataIter + Train DataIter. + eval_data : DataIter + If not ``None``, will be used as validation set and the performance + after each epoch will be evaluated. + eval_metric : str or EvalMetric + Defaults to 'accuracy'. The performance measure used to display during training. + Other possible predefined metrics are: + 'ce' (CrossEntropy), 'f1', 'mae', 'mse', 'rmse', 'top_k_accuracy'. + epoch_end_callback : function or list of functions + Each callback will be called with the current `epoch`, `symbol`, `arg_params` + and `aux_params`. + batch_end_callback : function or list of function + Each callback will be called with a `BatchEndParam`. + kvstore : str or KVStore + Defaults to 'local'. + optimizer : str or Optimizer + Defaults to 'sgd'. + optimizer_params : dict + Defaults to ``(('learning_rate', 0.01),)``. The parameters for + the optimizer constructor. + The default value is not a dict, just to avoid pylint warning on dangerous + default values. + eval_end_callback : function or list of function + These will be called at the end of each full evaluation, with the metrics over + the entire evaluation set. + eval_batch_end_callback : function or list of function + These will be called at the end of each mini-batch during evaluation. + initializer : Initializer + The initializer is called to initialize the module parameters when they are + not already initialized. + arg_params : dict + Defaults to ``None``, if not ``None``, should be existing parameters from a trained + model or loaded from a checkpoint (previously saved model). In this case, + the value here will be used to initialize the module parameters, unless they + are already initialized by the user via a call to `init_params` or `fit`. + `arg_params` has a higher priority than `initializer`. + aux_params : dict + Defaults to ``None``. Similar to `arg_params`, except for auxiliary states. + allow_missing : bool + Defaults to ``False``. Indicates whether to allow missing parameters when `arg_params` + and `aux_params` are not ``None``. If this is ``True``, then the missing parameters + will be initialized via the `initializer`. + force_rebind : bool + Defaults to ``False``. Whether to force rebinding the executors if already bound. + force_init : bool + Defaults to ``False``. Indicates whether to force initialization even if the + parameters are already initialized. + begin_epoch : int + Defaults to 0. Indicates the starting epoch. Usually, if resumed from a + checkpoint saved at a previous training phase at epoch N, then this value should be + N+1. + num_epoch : int + Number of epochs for training. + sparse_row_id_fn : A callback function + The function takes `data_batch` as an input and returns a dict of + str -> NDArray. The resulting dict is used for pulling row_sparse + parameters from the kvstore, where the str key is the name of the param, + and the value is the row id of the param to pull. + + Examples + -------- + >>> # An example of using fit for training. + >>> # Assume training dataIter and validation dataIter are ready + >>> # Assume loading a previously checkpointed model + >>> sym, arg_params, aux_params = mx.model.load_checkpoint(model_prefix, 3) + >>> mod.fit(train_data=train_dataiter, eval_data=val_dataiter, optimizer='sgd', + ... optimizer_params={'learning_rate':0.01, 'momentum': 0.9}, + ... arg_params=arg_params, aux_params=aux_params, + ... eval_metric='acc', num_epoch=10, begin_epoch=3) + """ + assert num_epoch is not None, 'please specify number of epochs' + # TODO: 不理解为啥要加这个,加了这个中间一不小心中断连岂不是无法继续训练了? + # assert arg_params is None and aux_params is None + + # 跟之前的输入格式保持一致 + provide_label = [(train_data.provide_label[0][0], (train_data.provide_label[0][1][0],))] + self.bind(data_shapes=[train_data.provide_data[0][:2]], + label_shapes=provide_label, + for_training=True, + force_rebind=force_rebind) + if monitor is not None: + self.install_monitor(monitor) + self.init_params(initializer=initializer, + arg_params=arg_params, + aux_params=aux_params, + allow_missing=allow_missing, + force_init=force_init) + self.init_optimizer(kvstore=kvstore, + optimizer=optimizer, + optimizer_params=optimizer_params) + + if validation_metric is None: + validation_metric = eval_metric + if not isinstance(eval_metric, metric.EvalMetric): + eval_metric = metric.create(eval_metric) + epoch_eval_metric = copy.deepcopy(eval_metric) + + ################################################################################ + # training loop + ################################################################################ + for epoch in range(begin_epoch, num_epoch): + tic = time.time() + eval_metric.reset() + epoch_eval_metric.reset() + nbatch = 0 + # data_iter = iter(train_data) + end_of_batch = False + # next_data_batch = next(data_iter) + while not end_of_batch: + # data_batch = next_data_batch + # batch = train_data.next() + try: + batch = train_data.next() + except StopIteration: + end_of_batch = True + except Exception as e: + # TODO: 一旦程序到这就回不去了,train_data.reset()会提示数据没有到结尾无法 reset, orz + print("something error raised, start a new epoch, error:") + print(e) + end_of_batch = True + continue + data_batch = io.DataBatch([batch[0].data[0]], [batch[0].label[0][..., 0]], batch[0].data[0].shape[0]) + if monitor is not None: + monitor.tic() + self.forward_backward(data_batch) + self.update() + assert not isinstance(data_batch, list) + + #if isinstance(data_batch, list): + # #print('XXX') + # self.update_metric(eval_metric, + # [db.label for db in data_batch], + # pre_sliced=True) + # self.update_metric(epoch_eval_metric, + # [db.label for db in data_batch], + # pre_sliced=True) + #else: + # #print('before update metric') + # self.update_metric(eval_metric, data_batch.label) + # self.update_metric(epoch_eval_metric, data_batch.label) + #labels = data_batch.label + #labels = [self.global_label] + #self.update_metric(eval_metric, labels) + #self.update_metric(epoch_eval_metric, labels) + + # 去除了手动 prefetch,在初始化 dali 时已设置了 prefetch 深度 + # try: + # # pre fetch next batch + # next_data_batch = next(data_iter) + # self.prepare(next_data_batch, + # sparse_row_id_fn=sparse_row_id_fn) + # except StopIteration: + # end_of_batch = True + + if monitor is not None: + monitor.toc_print() + + #if end_of_batch: + # eval_name_vals = epoch_eval_metric.get_name_value() + + if batch_end_callback is not None: + batch_end_params = BatchEndParam(epoch=epoch, + nbatch=nbatch, + eval_metric=None, + locals=locals()) + batch_end_callback(batch_end_params) + #for callback in _as_list(batch_end_callback): + # callback(batch_end_params) + nbatch += 1 + + # one epoch of training is finished + #for name, val in eval_name_vals: + # self.logger.info('Epoch[%d] Train-%s=%f', epoch, name, val) + toc = time.time() + self.logger.info('Epoch[%d] Time cost=%.3f', epoch, (toc - tic)) + + # sync aux params across devices + arg_params, aux_params = self.get_params() + self.set_params(arg_params, aux_params) + + # end of 1 epoch, reset the data-iter for another epoch + train_data.reset() diff --git a/recognition/ArcFace/dali_train_parall.py b/recognition/ArcFace/dali_train_parall.py new file mode 100644 index 000000000..4ae089cb0 --- /dev/null +++ b/recognition/ArcFace/dali_train_parall.py @@ -0,0 +1,464 @@ +''' +@author: insightface +''' + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import sys +import math +import random +import logging +import pickle +import sklearn +import numpy as np +from image_iter import FaceImageIter +import mxnet as mx +from mxnet import ndarray as nd +import argparse +import mxnet.optimizer as optimizer +sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'common')) +import flops_counter +from config import config, default, generate_config +import verification +sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'symbol')) +import fresnet +import fmobilefacenet +import fmobilenet +import fmnasnet +import fdensenet +import vargfacenet +from dali_parall_module_local_v1 import ParallModule + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +args = None + + +def parse_args(): + parser = argparse.ArgumentParser(description='Train parall face network') + # general + parser.add_argument('--dataset', + default=default.dataset, + help='dataset config') + parser.add_argument('--network', + default=default.network, + help='network config') + parser.add_argument('--loss', default=default.loss, help='loss config') + args, rest = parser.parse_known_args() + generate_config(args.network, args.dataset, args.loss) + parser.add_argument('--models-root', + default=default.models_root, + help='root directory to save model.') + parser.add_argument('--pretrained', + default=default.pretrained, + help='pretrained model to load') + parser.add_argument('--pretrained-epoch', + type=int, + default=default.pretrained_epoch, + help='pretrained epoch to load') + parser.add_argument( + '--ckpt', + type=int, + default=default.ckpt, + help= + 'checkpoint saving option. 0: discard saving. 1: save when necessary. 2: always save' + ) + parser.add_argument( + '--verbose', + type=int, + default=default.verbose, + help='do verification testing and model saving every verbose batches') + parser.add_argument('--lr', + type=float, + default=default.lr, + help='start learning rate') + parser.add_argument('--lr-steps', + type=str, + default=default.lr_steps, + help='steps of lr changing') + parser.add_argument('--wd', + type=float, + default=default.wd, + help='weight decay') + parser.add_argument('--mom', + type=float, + default=default.mom, + help='momentum') + parser.add_argument('--frequent', + type=int, + default=default.frequent, + help='') + parser.add_argument('--per-batch-size', + type=int, + default=default.per_batch_size, + help='batch size in each context') + parser.add_argument('--kvstore', + type=str, + default=default.kvstore, + help='kvstore setting') + parser.add_argument('--worker-id', + type=int, + default=0, + help='worker id for dist training, starts from 0') + parser.add_argument('--extra-model-name', + type=str, + default='', + help='extra model name') + args = parser.parse_args() + return args + + +def get_symbol_embedding(embedding=None): + if embedding is None: + embedding = eval(config.net_name).get_symbol() + all_label = mx.symbol.Variable('softmax_label') + #embedding = mx.symbol.BlockGrad(embedding) + all_label = mx.symbol.BlockGrad(all_label) + out_list = [embedding, all_label] + out = mx.symbol.Group(out_list) + return out + + +def get_symbol_arcface(args): + embedding = mx.symbol.Variable('data') + all_label = mx.symbol.Variable('softmax_label') + gt_label = all_label + is_softmax = True + #print('call get_sym_arcface with', args, config) + _weight = mx.symbol.Variable("fc7_%d_weight" % args._ctxid, + shape=(args.ctx_num_classes, config.emb_size), + lr_mult=config.fc7_lr_mult, + wd_mult=config.fc7_wd_mult) + if config.loss_name == 'softmax': #softmax + fc7 = mx.sym.FullyConnected(data=embedding, + weight=_weight, + no_bias=True, + num_hidden=args.ctx_num_classes, + name='fc7_%d' % args._ctxid) + elif config.loss_name == 'margin_softmax': + _weight = mx.symbol.L2Normalization(_weight, mode='instance') + nembedding = mx.symbol.L2Normalization(embedding, + mode='instance', + name='fc1n_%d' % args._ctxid) + fc7 = mx.sym.FullyConnected(data=nembedding, + weight=_weight, + no_bias=True, + num_hidden=args.ctx_num_classes, + name='fc7_%d' % args._ctxid) + if config.loss_m1 != 1.0 or config.loss_m2 != 0.0 or config.loss_m3 != 0.0: + gt_one_hot = mx.sym.one_hot(gt_label, + depth=args.ctx_num_classes, + on_value=1.0, + off_value=0.0) + if config.loss_m1 == 1.0 and config.loss_m2 == 0.0: + _one_hot = gt_one_hot * config.loss_m3 + fc7 = fc7 - _one_hot + else: + fc7_onehot = fc7 * gt_one_hot + cos_t = fc7_onehot + t = mx.sym.arccos(cos_t) + if config.loss_m1 != 1.0: + t = t * config.loss_m1 + if config.loss_m2 != 0.0: + t = t + config.loss_m2 + margin_cos = mx.sym.cos(t) + if config.loss_m3 != 0.0: + margin_cos = margin_cos - config.loss_m3 + margin_fc7 = margin_cos + margin_fc7_onehot = margin_fc7 * gt_one_hot + diff = margin_fc7_onehot - fc7_onehot + fc7 = fc7 + diff + fc7 = fc7 * config.loss_s + + out_list = [] + out_list.append(fc7) + if config.loss_name == 'softmax': #softmax + out_list.append(gt_label) + out = mx.symbol.Group(out_list) + return out + + +def train_net(args): + #_seed = 727 + #random.seed(_seed) + #np.random.seed(_seed) + #mx.random.seed(_seed) + ctx = [] + cvd = os.environ['CUDA_VISIBLE_DEVICES'].strip() + if len(cvd) > 0: + for i in range(len(cvd.split(','))): + ctx.append(mx.gpu(i)) + if len(ctx) == 0: + ctx = [mx.cpu()] + print('use cpu') + else: + print('gpu num:', len(ctx)) + if len(args.extra_model_name) == 0: + prefix = os.path.join( + args.models_root, + '%s-%s-%s' % (args.network, args.loss, args.dataset), 'model') + else: + prefix = os.path.join( + args.models_root, '%s-%s-%s-%s' % + (args.network, args.loss, args.dataset, args.extra_model_name), + 'model') + prefix_dir = os.path.dirname(prefix) + print('prefix', prefix) + if not os.path.exists(prefix_dir): + os.makedirs(prefix_dir) + args.ctx_num = len(ctx) + if args.per_batch_size == 0: + args.per_batch_size = 128 + args.batch_size = args.per_batch_size * args.ctx_num + args.rescale_threshold = 0 + args.image_channel = config.image_shape[2] + config.batch_size = args.batch_size + config.per_batch_size = args.per_batch_size + data_dir = config.dataset_path + path_imgrec = None + path_imglist = None + image_size = config.image_shape[0:2] + assert len(image_size) == 2 + assert image_size[0] == image_size[1] + print('image_size', image_size) + print('num_classes', config.num_classes) + path_imgrec = os.path.join(data_dir, "train.rec") + + data_shape = (args.image_channel, image_size[0], image_size[1]) + + num_workers = config.num_workers + global_num_ctx = num_workers * args.ctx_num + if config.num_classes % global_num_ctx == 0: + args.ctx_num_classes = config.num_classes // global_num_ctx + else: + args.ctx_num_classes = config.num_classes // global_num_ctx + 1 + args.local_num_classes = args.ctx_num_classes * args.ctx_num + args.local_class_start = args.local_num_classes * args.worker_id + + #if len(args.partial)==0: + # local_classes_range = (0, args.num_classes) + #else: + # _vec = args.partial.split(',') + # local_classes_range = (int(_vec[0]), int(_vec[1])) + + #args.partial_num_classes = local_classes_range[1] - local_classes_range[0] + #args.partial_start = local_classes_range[0] + + print('Called with argument:', args, config) + mean = None + + begin_epoch = 0 + base_lr = args.lr + base_wd = args.wd + base_mom = args.mom + arg_params = None + aux_params = None + if len(args.pretrained) == 0: + esym = get_symbol_embedding() + asym = get_symbol_arcface + else: + #assert False + print('loading', args.pretrained, args.pretrained_epoch) + pretrain_esym, arg_params, aux_params = mx.model.load_checkpoint( + args.pretrained, args.pretrained_epoch) + esym = get_symbol_embedding(pretrain_esym) + asym = get_symbol_arcface + + if config.count_flops: + all_layers = esym.get_internals() + _sym = all_layers['fc1_output'] + FLOPs = flops_counter.count_flops(_sym, + data=(1, 3, image_size[0], + image_size[1])) + _str = flops_counter.flops_str(FLOPs) + print('Network FLOPs: %s' % _str) + + # if config.num_workers == 1: + # from dali_parall_module_local_v1 import ParallModule + # else: + # from parall_module_dist import ParallModule + + model = ParallModule( + context=ctx, + symbol=esym, + data_names=['data'], + label_names=['softmax_label'], + asymbol=asym, + args=args, + ) + val_dataiter = None + # TODO: if config.use_dali: + if True: + from dali_image_iter import HybridTrainPipe + from nvidia.dali.plugin.mxnet import DALIClassificationIterator + # trainpipes = [HybridTrainPipe(path_imgrec, args.batch_size, num_threads=4, device_id = i, num_gpus = 4) for i in range(2)] + # htp = trainpipes[0] + htp = HybridTrainPipe(path_imgrec, args.batch_size, 4, 0, 4, args.batch_size*1000) + trainpipes = [htp] + htp.build() + print("Training pipeline epoch size: {}".format(htp.epoch_size("Reader"))) + dali_train_iter = DALIClassificationIterator(trainpipes, htp.epoch_size("Reader")) + train_dataiter = dali_train_iter + else: + train_dataiter = FaceImageIter( + batch_size=args.batch_size, + data_shape=data_shape, + path_imgrec=path_imgrec, + shuffle=True, + rand_mirror=config.data_rand_mirror, + mean=mean, + cutoff=config.data_cutoff, + color_jittering=config.data_color, + images_filter=config.data_images_filter, + ) + + if config.net_name == 'fresnet' or config.net_name == 'fmobilefacenet': + initializer = mx.init.Xavier(rnd_type='gaussian', + factor_type="out", + magnitude=2) #resnet style + else: + initializer = mx.init.Xavier(rnd_type='uniform', + factor_type="in", + magnitude=2) + + _rescale = 1.0 / args.batch_size + opt = optimizer.SGD(learning_rate=base_lr, + momentum=base_mom, + wd=base_wd, + rescale_grad=_rescale) + _cb = mx.callback.Speedometer(args.batch_size, args.frequent) + + ver_list = [] + ver_name_list = [] + for name in config.val_targets: + path = os.path.join(data_dir, name + ".bin") + if os.path.exists(path): + data_set = verification.load_bin(path, image_size) + ver_list.append(data_set) + ver_name_list.append(name) + print('ver', name) + + def ver_test(nbatch): + results = [] + for i in range(len(ver_list)): + acc1, std1, acc2, std2, xnorm, embeddings_list = verification.test( + ver_list[i], model, args.batch_size, 10, None, None) + print('[%s][%d]XNorm: %f' % (ver_name_list[i], nbatch, xnorm)) + #print('[%s][%d]Accuracy: %1.5f+-%1.5f' % (ver_name_list[i], nbatch, acc1, std1)) + print('[%s][%d]Accuracy-Flip: %1.5f+-%1.5f' % + (ver_name_list[i], nbatch, acc2, std2)) + results.append(acc2) + return results + + highest_acc = [0.0, 0.0] #lfw and target + #for i in range(len(ver_list)): + # highest_acc.append(0.0) + global_step = [0] + save_step = [0] + lr_steps = [int(x) for x in args.lr_steps.split(',')] + print('lr_steps', lr_steps) + + def _batch_callback(param): + #global global_step + global_step[0] += 1 + mbatch = global_step[0] + for step in lr_steps: + if mbatch == step: + opt.lr *= 0.1 + print('lr change to', opt.lr) + break + + _cb(param) + if mbatch % 1000 == 0: + print('lr-batch-epoch:', opt.lr, param.nbatch, param.epoch) + + if mbatch >= 0 and mbatch % args.verbose == 0: + acc_list = ver_test(mbatch) + save_step[0] += 1 + msave = save_step[0] + do_save = False + is_highest = False + if len(acc_list) > 0: + #lfw_score = acc_list[0] + #if lfw_score>highest_acc[0]: + # highest_acc[0] = lfw_score + # if lfw_score>=0.998: + # do_save = True + score = sum(acc_list) + if acc_list[-1] >= highest_acc[-1]: + if acc_list[-1] > highest_acc[-1]: + is_highest = True + else: + if score >= highest_acc[0]: + is_highest = True + highest_acc[0] = score + highest_acc[-1] = acc_list[-1] + #if lfw_score>=0.99: + # do_save = True + if is_highest: + do_save = True + if args.ckpt == 0: + do_save = False + elif args.ckpt == 2: + do_save = True + elif args.ckpt == 3: + msave = 1 + + if do_save: + # print('saving', msave) + # arg, aux = model.get_export_params() + # all_layers = model.symbol.get_internals() + # _sym = all_layers['fc1_output'] + # mx.model.save_checkpoint(prefix, msave, _sym, arg, aux) + + print('saving', msave) + arg, aux = model.get_params() + # TODO: 这里求加个保存全部参数的方法,方便继续训练,我这么改不知道有没有问题 + if config.ckpt_embedding: + all_layers = model.symbol.get_internals() + _sym = all_layers['fc1_output'] + _arg = {} + for k in arg: + if not k.startswith('fc7'): + _arg[k] = arg[k] + mx.model.save_checkpoint(prefix, msave, _sym, _arg, aux) + else: + mx.model.save_checkpoint(prefix, msave, model.symbol, arg, + aux) + + print('[%d]Accuracy-Highest: %1.5f' % (mbatch, highest_acc[-1])) + if config.max_steps > 0 and mbatch > config.max_steps: + sys.exit(0) + + epoch_cb = None + # train_dataiter = mx.io.PrefetchingIter(train_dataiter) + + model.fit( + train_dataiter, + begin_epoch=begin_epoch, + num_epoch=999999, + eval_data=val_dataiter, + #eval_metric = eval_metrics, + kvstore=args.kvstore, + optimizer=opt, + #optimizer_params = optimizer_params, + initializer=initializer, + arg_params=arg_params, + aux_params=aux_params, + allow_missing=True, + batch_end_callback=_batch_callback, + epoch_end_callback=epoch_cb) + + +def main(): + global args + args = parse_args() + train_net(args) + + +if __name__ == '__main__': + main() diff --git a/recognition/partial_fc/mxnet/evaluation/example.sh b/recognition/partial_fc/mxnet/evaluation/example.sh old mode 100644 new mode 100755 index a132a7769..0e96c877b --- a/recognition/partial_fc/mxnet/evaluation/example.sh +++ b/recognition/partial_fc/mxnet/evaluation/example.sh @@ -1,9 +1,14 @@ +#!/bin/bash + +# run `python ijb.py --help` for more information python -u ijb.py \ ---model-prefix /anxiang/opensource/model/celeb360k_final0.1/model \ ---image-path /data/anxiang/datasets/IJB_release/IJBC \ ---result-dir /anxiang/opensource/results/test \ +--model-prefix ./models/y1-cosface-glink360/model \ +--image-path /data/IJB_release/IJBC \ +--result-dir ./results/test \ --model-epoch 0 \ --gpu 0,1,2,3,4,5,6,7 \ --target IJBC \ - --job cosface \ ---batch-size 256 \ No newline at end of file +--job cosface \ +--batch-size 256 \ +-es 128 + diff --git a/recognition/partial_fc/mxnet/evaluation/ijb.py b/recognition/partial_fc/mxnet/evaluation/ijb.py index 4651f65cb..4719be7b8 100644 --- a/recognition/partial_fc/mxnet/evaluation/ijb.py +++ b/recognition/partial_fc/mxnet/evaluation/ijb.py @@ -32,6 +32,7 @@ parser.add_argument('--gpu', default='0', type=str, help='gpu id') parser.add_argument('--batch-size', default=128, type=int, help='') parser.add_argument('--job', default='insightface', type=str, help='job name') +parser.add_argument('-es', '--emb-size', type=int, help='embedding size') parser.add_argument('--target', default='IJBC', type=str, @@ -123,6 +124,8 @@ def batchify_fn(data): batchify_fn=batchify_fn) symbol, arg_params, aux_params = mx.module.module.load_checkpoint( prefix, epoch) + all_layers = symbol.get_internals() + symbol = all_layers['fc1_output'] # init model list for i in range(num_ctx): @@ -325,7 +328,7 @@ def read_score(path): args.model_epoch, dataset, args.batch_size, - size=512) + size=args.emb_size) faceness_scores = [] for each_line in files: