diff --git a/deeppavlov/_meta.py b/deeppavlov/_meta.py index 1d9fbe7a1b..794f709ee1 100644 --- a/deeppavlov/_meta.py +++ b/deeppavlov/_meta.py @@ -1,4 +1,4 @@ -__version__ = '1.1.0' +__version__ = '1.1.1' __author__ = 'Neural Networks and Deep Learning lab, MIPT' __description__ = 'An open source library for building end-to-end dialog systems and training chatbots.' __keywords__ = ['NLP', 'NER', 'SQUAD', 'Intents', 'Chatbot'] diff --git a/deeppavlov/configs/multitask/mt_glue.json b/deeppavlov/configs/multitask/mt_glue.json new file mode 100644 index 0000000000..ca6e529213 --- /dev/null +++ b/deeppavlov/configs/multitask/mt_glue.json @@ -0,0 +1,287 @@ +{ + "dataset_reader": { + "class_name": "multitask_reader", + "task_defaults": { + "class_name": "huggingface_dataset_reader", + "path": "glue", + "train": "train", + "valid": "validation" + }, + "tasks": { + "cola": {"name": "cola"}, + "sst2": {"name": "sst2"}, + "qqp": {"name": "qqp"}, + "mrpc": {"name": "mrpc"}, + "rte": {"name": "rte"}, + "mnli": { + "name": "mnli", + "valid": "validation_matched" + }, + "qnli": {"name": "qnli"}, + "stsb": {"name": "stsb"} + } + }, + "dataset_iterator": { + "class_name": "multitask_iterator", + "num_train_epochs": "{NUM_TRAIN_EPOCHS}", + "gradient_accumulation_steps": "{GRADIENT_ACC_STEPS}", + "seed": 42, + "task_defaults": { + "class_name": "huggingface_dataset_iterator", + "label": "label", + "use_label_name": false, + "seed": 42 + }, + "tasks": { + "cola": { + "features": ["sentence"] + }, + "sst2": { + "features": ["sentence"] + }, + "qqp": { + "features": ["question1", "question2"] + }, + "mrpc": { + "features": ["sentence1", "sentence2"] + }, + "rte": { + "features": ["sentence1", "sentence2"] + }, + "mnli": { + "features": ["premise", "hypothesis"] + }, + "qnli": { + "features": ["question", "sentence"] + }, + "stsb": { + "features": ["sentence1", "sentence2"] + } + } + }, + "chainer": { + "in": ["x_cola", "x_sst2", "x_qqp", "x_mrpc", "x_rte", "x_mnli", "x_qnli", "x_stsb"], + "in_y": ["y_cola", "y_sst2", "y_qqp", "y_mrpc", "y_rte", "y_mnli", "y_qnli", "y_stsb" + ], + "pipe": [ + { + "class_name": "multitask_pipeline_preprocessor", + "possible_keys_to_extract": [0, 1], + "preprocessor": "TorchTransformersPreprocessor", + "vocab_file": "{BACKBONE}", + "max_seq_length": 128, + "do_lower_case": true, + "n_task": 8, + "in": ["x_cola", "x_sst2", "x_qqp", "x_mrpc", "x_rte", "x_mnli", "x_qnli", "x_stsb"], + "out": [ + "bert_features_cola", + "bert_features_sst2", + "bert_features_qqp", + "bert_features_mrpc", + "bert_features_rte", + "bert_features_mnli", + "bert_features_qnli", + "bert_features_stsb" + ] + }, + { + "id": "multitask_transformer", + "class_name": "multitask_transformer", + "optimizer_parameters": {"lr": 2e-5}, + "gradient_accumulation_steps": "{GRADIENT_ACC_STEPS}", + "learning_rate_drop_patience": 2, + "learning_rate_drop_div": 2.0, + "return_probas": true, + "backbone_model": "{BACKBONE}", + "save_path": "{MODEL_PATH}", + "load_path": "{MODEL_PATH}", + "tasks": { + "cola": { + "type": "classification", + "options": 2 + }, + "sst2": { + "type": "classification", + "options": 2 + }, + "qqp": { + "type": "classification", + "options": 2 + }, + "mrpc": { + "type": "classification", + "options": 2 + }, + "rte": { + "type": "classification", + "options": 2 + }, + "mnli": { + "type": "classification", + "options": 3 + }, + "qnli": { + "type": "classification", + "options": 2 + }, + "stsb": { + "type": "regression", + "options": 1 + } + }, + "in": [ + "bert_features_cola", + "bert_features_sst2", + "bert_features_qqp", + "bert_features_mrpc", + "bert_features_rte", + "bert_features_mnli", + "bert_features_qnli", + "bert_features_stsb" + ], + "in_y": ["y_cola", "y_sst2", "y_qqp", "y_mrpc", "y_rte", "y_mnli", "y_qnli", "y_stsb"], + "out": [ + "y_cola_pred_probas", + "y_sst2_pred_probas", + "y_qqp_pred_probas", + "y_mrpc_pred_probas", + "y_rte_pred_probas", + "y_mnli_pred_probas", + "y_qnli_pred_probas", + "y_stsb_pred" + ] + }, + { + "in": [ + "y_cola_pred_probas", + "y_sst2_pred_probas", + "y_qqp_pred_probas", + "y_mrpc_pred_probas", + "y_rte_pred_probas", + "y_mnli_pred_probas", + "y_qnli_pred_probas" + ], + "out": [ + "y_cola_pred_ids", + "y_sst2_pred_ids", + "y_qqp_pred_ids", + "y_mrpc_pred_ids", + "y_rte_pred_ids", + "y_mnli_pred_ids", + "y_qnli_pred_ids" + ], + "class_name": "proba2labels", + "max_proba": true + } + ], + "out": [ + "y_cola_pred_probas", + "y_sst2_pred_probas", + "y_qqp_pred_probas", + "y_mrpc_pred_probas", + "y_rte_pred_probas", + "y_mnli_pred_probas", + "y_qnli_pred_probas", + "y_stsb_pred", + "y_cola_pred_ids", + "y_sst2_pred_ids", + "y_qqp_pred_ids", + "y_mrpc_pred_ids", + "y_rte_pred_ids", + "y_mnli_pred_ids", + "y_qnli_pred_ids", + "y_stsb_pred" + ] + }, + "train": { + "epochs": "{NUM_TRAIN_EPOCHS}", + "batch_size": 32, + "metrics": [ + { + "name": "multitask_accuracy", + "inputs": [ + "y_rte", + "y_mnli", + "y_qnli", + "y_mrpc", + "y_cola", + "y_sst2", + "y_qqp", + "y_rte_pred_ids", + "y_mnli_pred_ids", + "y_qnli_pred_ids", + "y_mrpc_pred_ids", + "y_cola_pred_ids", + "y_sst2_pred_ids", + "y_qqp_pred_ids" + ] + }, + { + "name": "accuracy", + "alias": "accuracy_mrpc", + "inputs": ["y_mrpc", "y_mrpc_pred_ids"] + }, + { + "name": "accuracy", + "alias": "accuracy_rte", + "inputs": ["y_rte", "y_rte_pred_ids"] + }, + { + "name": "accuracy", + "alias": "accuracy_mnli", + "inputs": ["y_mnli", "y_mnli_pred_ids"] + }, + { + "name": "accuracy", + "alias": "accuracy_qnli", + "inputs": ["y_qnli", "y_qnli_pred_ids"] + }, + { + "name": "accuracy", + "alias": "accuracy_sst", + "inputs": ["y_sst2", "y_sst2_pred_ids"] + }, + { + "name": "accuracy", + "alias": "accuracy_cola", + "inputs": ["y_cola", "y_cola_pred_ids"] + }, + { + "name": "accuracy", + "alias": "accuracy_qqp", + "inputs": ["y_qqp", "y_qqp_pred_ids"] + }, + { + "name": "pearson_correlation", + "alias": "pearson_correlation_stsb", + "inputs": ["y_stsb", "y_stsb_pred"] + }, + { + "name": "spearman_correlation", + "alias": "spearman_correlation_stsb", + "inputs": ["y_stsb", "y_stsb_pred"] + } + ], + "validation_patience": 3, + "val_every_n_epochs": 1, + "log_every_n_epochs": 1, + "show_examples": false, + "evaluation_targets": ["valid"], + "class_name": "torch_trainer" + }, + "metadata": { + "variables": { + "BACKBONE": "bert-base-uncased", + "MODELS_PATH": "~/.deeppavlov/models/glue", + "MODEL_PATH": "{MODELS_PATH}/8task", + "NUM_TRAIN_EPOCHS": 5, + "GRADIENT_ACC_STEPS": 1 + }, + "download": [ + { + "url": "http://files.deeppavlov.ai/deeppavlov_data/multitask/glue.tar.gz", + "subdir": "{MODELS_PATH}" + } + ] + } +} diff --git a/deeppavlov/configs/multitask/multitask_example.json b/deeppavlov/configs/multitask/multitask_example.json new file mode 100644 index 0000000000..cf8cf7ad47 --- /dev/null +++ b/deeppavlov/configs/multitask/multitask_example.json @@ -0,0 +1,241 @@ +{ + "dataset_reader": { + "class_name": "multitask_reader", + "task_defaults": { + "class_name": "huggingface_dataset_reader", + "path": "glue", + "train": "train", + "valid": "validation", + "test": "test" + }, + "tasks": { + "cola": {"name": "cola"}, + "rte": {"name": "rte"}, + "stsb": {"name": "stsb"}, + "copa": { + "path": "super_glue", + "name": "copa" + }, + "conll": { + "class_name": "conll2003_reader", + "use_task_defaults": false, + "data_path": "{DOWNLOADS_PATH}/conll2003/", + "dataset_name": "conll2003", + "provide_pos": false + } + } + }, + "dataset_iterator": { + "class_name": "multitask_iterator", + "num_train_epochs": "{NUM_TRAIN_EPOCHS}", + "gradient_accumulation_steps": "{GRADIENT_ACC_STEPS}", + "seed": 42, + "task_defaults": { + "class_name": "huggingface_dataset_iterator", + "label": "label", + "use_label_name": false, + "seed": 42 + }, + "tasks": { + "cola": { + "features": ["sentence"] + }, + "rte": { + "features": ["sentence1", "sentence2"] + }, + "stsb": { + "features": ["sentence1", "sentence2"] + }, + "copa": { + "features": ["contexts", "choices"] + }, + "conll": { + "class_name": "basic_classification_iterator", + "seed": 42, + "use_task_defaults": false + } + } + }, + "chainer": { + "in": ["x_cola", "x_rte", "x_stsb", "x_copa", "x_conll"], + "in_y": ["y_cola", "y_rte", "y_stsb", "y_copa", "y_conll"], + "pipe": [ + { + "class_name": "multitask_pipeline_preprocessor", + "possible_keys_to_extract": [0, 1], + "preprocessors": [ + "TorchTransformersPreprocessor", + "TorchTransformersPreprocessor", + "TorchTransformersPreprocessor", + "TorchTransformersMultiplechoicePreprocessor", + "TorchTransformersNerPreprocessor" + ], + "do_lower_case": true, + "n_task": 5, + "vocab_file": "{BACKBONE}", + "max_seq_length": 200, + "max_subword_length": 15, + "token_masking_prob": 0.0, + "return_features": true, + "in": ["x_cola", "x_rte", "x_stsb", "x_copa", "x_conll"], + "out": [ + "bert_features_cola", + "bert_features_rte", + "bert_features_stsb", + "bert_features_copa", + "bert_features_conll" + ] + }, + { + "id": "vocab_conll", + "class_name": "simple_vocab", + "unk_token": ["O"], + "pad_with_zeros": true, + "save_path": "{MODELS_PATH}/tag.dict", + "load_path": "{MODELS_PATH}/tag.dict", + "fit_on": ["y_conll"], + "in": ["y_conll"], + "out": ["y_ids_conll"] + }, + { + "id": "multitask_transformer", + "class_name": "multitask_transformer", + "optimizer_parameters": {"lr": 2e-5}, + "gradient_accumulation_steps": "{GRADIENT_ACC_STEPS}", + "learning_rate_drop_patience": 2, + "learning_rate_drop_div": 2.0, + "return_probas": true, + "backbone_model": "{BACKBONE}", + "save_path": "{MODEL_PATH}", + "load_path": "{MODEL_PATH}", + "tasks": { + "cola": { + "type": "classification", + "options": 2 + }, + "rte": { + "type": "classification", + "options": 2 + }, + "stsb": { + "type": "regression", + "options": 1 + }, + "copa": { + "type": "multiple_choice", + "options": 2 + }, + "conll": { + "type": "sequence_labeling", + "options": "#vocab_conll.len" + } + }, + "in": [ + "bert_features_cola", + "bert_features_rte", + "bert_features_stsb", + "bert_features_copa", + "bert_features_conll" + ], + "in_y": ["y_cola", "y_rte", "y_stsb", "y_copa", "y_ids_conll"], + "out": [ + "y_cola_pred_probas", + "y_rte_pred_probas", + "y_stsb_pred", + "y_copa_pred_probas", + "y_conll_pred_ids" + ] + }, + { + "in": ["y_cola_pred_probas"], + "out": ["y_cola_pred_ids"], + "class_name": "proba2labels", + "max_proba": true + }, + { + "in": ["y_rte_pred_probas"], + "out": ["y_rte_pred_ids"], + "class_name": "proba2labels", + "max_proba": true + }, + { + "in": ["y_copa_pred_probas"], + "out": ["y_copa_pred_ids"], + "class_name": "proba2labels", + "max_proba": true + }, + { + "in": ["y_conll_pred_ids"], + "out": ["y_conll_pred_labels"], + "ref": "vocab_conll" + } + ], + "out": ["y_cola_pred_ids", "y_rte_pred_ids", "y_stsb_pred", "y_copa_pred_ids", "y_conll_pred_labels"] + }, + "train": { + "epochs": "{NUM_TRAIN_EPOCHS}", + "batch_size": 32, + "metrics": [ + { + "name": "multitask_accuracy", + "inputs": ["y_rte", "y_cola", "y_copa", "y_rte_pred_ids", "y_cola_pred_ids", "y_copa_pred_ids"] + }, + { + "name": "ner_f1", + "inputs": ["y_conll", "y_conll_pred_labels"] + }, + { + "name": "ner_token_f1", + "inputs": ["y_conll", "y_conll_pred_labels"] + }, + { + "name": "accuracy", + "alias": "accuracy_cola", + "inputs": ["y_cola", "y_cola_pred_ids"] + }, + { + "name": "accuracy", + "alias": "accuracy_rte", + "inputs": ["y_rte", "y_rte_pred_ids"] + }, + { + "name": "accuracy", + "alias": "accuracy_copa", + "inputs": ["y_copa", "y_copa_pred_ids"] + }, + { + "name": "pearson_correlation", + "alias": "pearson_stsb", + "inputs": ["y_stsb", "y_stsb_pred"] + }, + { + "name": "spearman_correlation", + "alias": "spearman_stsb", + "inputs": ["y_stsb", "y_stsb_pred"] + } + ], + "validation_patience": 3, + "val_every_n_epochs": 1, + "log_every_n_epochs": 1, + "show_examples": false, + "evaluation_targets": ["valid"], + "class_name": "torch_trainer" + }, + "metadata": { + "variables": { + "ROOT_PATH": "~/.deeppavlov", + "MODELS_PATH": "{ROOT_PATH}/models/multitask_example", + "DOWNLOADS_PATH": "{ROOT_PATH}/downloads", + "BACKBONE": "distilbert-base-uncased", + "MODEL_PATH": "{MODELS_PATH}/{BACKBONE}", + "NUM_TRAIN_EPOCHS": 5, + "GRADIENT_ACC_STEPS": 1 + }, + "download": [ + { + "url": "http://files.deeppavlov.ai/deeppavlov_data/multitask/multitask_example.tar.gz", + "subdir": "{MODELS_PATH}" + } + ] + } +} diff --git a/deeppavlov/core/commands/train.py b/deeppavlov/core/commands/train.py index ac8d4c578c..a72956401f 100644 --- a/deeppavlov/core/commands/train.py +++ b/deeppavlov/core/commands/train.py @@ -18,7 +18,7 @@ from deeppavlov.core.commands.utils import expand_path, import_packages, parse_config from deeppavlov.core.common.errors import ConfigError -from deeppavlov.core.common.params import from_params +from deeppavlov.core.common.params import resolve from deeppavlov.core.common.registry import get_model from deeppavlov.core.data.data_fitting_iterator import DataFittingIterator from deeppavlov.core.data.data_learning_iterator import DataLearningIterator @@ -50,20 +50,19 @@ def read_data_by_config(config: dict): raise ConfigError("No dataset reader is provided in the JSON config.") reader = get_model(reader_config.pop('class_name'))() - data_path = reader_config.pop('data_path', '') + data_path = reader_config.get('data_path') if isinstance(data_path, list): - data_path = [expand_path(x) for x in data_path] - else: - data_path = expand_path(data_path) - - return reader.read(data_path, **reader_config) + reader_config['data_path'] = [expand_path(path) for path in data_path] + elif data_path is not None: + reader_config['data_path'] = expand_path(data_path) + return reader.read(**reader_config) def get_iterator_from_config(config: dict, data: dict): """Create iterator (from config) for specified data.""" - iterator_config = config['dataset_iterator'] - iterator: Union[DataLearningIterator, DataFittingIterator] = from_params(iterator_config, - data=data) + iterator_config = {k: resolve(v) for k, v in config['dataset_iterator'].items()} + iterator: Union[DataLearningIterator, DataFittingIterator] = get_model(iterator_config.pop('class_name'))( + **iterator_config, data=data) return iterator diff --git a/deeppavlov/core/common/metrics_registry.json b/deeppavlov/core/common/metrics_registry.json index c1f1a6c7a0..7632c22dda 100644 --- a/deeppavlov/core/common/metrics_registry.json +++ b/deeppavlov/core/common/metrics_registry.json @@ -40,4 +40,4 @@ "squad_v2_f1": "deeppavlov.metrics.squad_metrics:squad_v2_f1", "record_f1_score": "deeppavlov.metrics.record_metrics:record_f1_score", "record_em_score": "deeppavlov.metrics.record_metrics:record_em_score" -} \ No newline at end of file +} diff --git a/deeppavlov/core/common/params.py b/deeppavlov/core/common/params.py index 0ac0426c53..9d38d7de03 100644 --- a/deeppavlov/core/common/params.py +++ b/deeppavlov/core/common/params.py @@ -27,7 +27,7 @@ _refs = {} -def _resolve(val): +def resolve(val): if isinstance(val, str) and val.startswith('#'): component_id, *attributes = val[1:].split('.') try: @@ -44,7 +44,7 @@ def _resolve(val): def _init_param(param, mode): if isinstance(param, str): - param = _resolve(param) + param = resolve(param) elif isinstance(param, (list, tuple)): param = [_init_param(p, mode) for p in param] elif isinstance(param, dict): @@ -58,7 +58,7 @@ def _init_param(param, mode): def from_params(params: Dict, mode: str = 'infer', **kwargs) -> Union[Component, FunctionType]: """Builds and returns the Component from corresponding dictionary of parameters.""" # what is passed in json: - config_params = {k: _resolve(v) for k, v in params.items()} + config_params = {k: resolve(v) for k, v in params.items()} # get component by reference (if any) if 'ref' in config_params: diff --git a/deeppavlov/core/common/registry.json b/deeppavlov/core/common/registry.json index c45ddfdf81..6969143433 100644 --- a/deeppavlov/core/common/registry.json +++ b/deeppavlov/core/common/registry.json @@ -25,6 +25,10 @@ "line_reader": "deeppavlov.dataset_readers.line_reader:LineReader", "logit_ranker": "deeppavlov.models.doc_retrieval.logit_ranker:LogitRanker", "mask": "deeppavlov.models.preprocessors.mask:Mask", + "multitask_reader":"deeppavlov.dataset_readers.multitask_reader:MultiTaskReader", + "multitask_pipeline_preprocessor":"deeppavlov.models.preprocessors.multitask_preprocessor:MultiTaskPipelinePreprocessor", + "multitask_transformer":"deeppavlov.models.torch_bert.multitask_transformer:MultiTaskTransformer", + "multitask_iterator":"deeppavlov.dataset_iterators.multitask_iterator:MultiTaskIterator", "multi_squad_dataset_reader": "deeppavlov.dataset_readers.squad_dataset_reader:MultiSquadDatasetReader", "multi_squad_iterator": "deeppavlov.dataset_iterators.squad_iterator:MultiSquadIterator", "multi_squad_retr_iterator": "deeppavlov.dataset_iterators.squad_iterator:MultiSquadRetrIterator", diff --git a/deeppavlov/core/common/requirements_registry.json b/deeppavlov/core/common/requirements_registry.json index 0abdd3b308..f47311c0c7 100644 --- a/deeppavlov/core/common/requirements_registry.json +++ b/deeppavlov/core/common/requirements_registry.json @@ -105,6 +105,10 @@ "{DEEPPAVLOV_PATH}/requirements/pytorch.txt", "{DEEPPAVLOV_PATH}/requirements/transformers.txt" ], + "multitask_transformer": [ + "{DEEPPAVLOV_PATH}/requirements/pytorch.txt", + "{DEEPPAVLOV_PATH}/requirements/transformers.txt" + ], "torch_transformers_el_ranker": [ "{DEEPPAVLOV_PATH}/requirements/pytorch.txt", "{DEEPPAVLOV_PATH}/requirements/transformers.txt" @@ -138,6 +142,11 @@ "{DEEPPAVLOV_PATH}/requirements/torchcrf.txt", "{DEEPPAVLOV_PATH}/requirements/transformers.txt" ], + "multitask_pipeline_preprocessor": [ + "{DEEPPAVLOV_PATH}/requirements/pytorch.txt", + "{DEEPPAVLOV_PATH}/requirements/torchcrf.txt", + "{DEEPPAVLOV_PATH}/requirements/transformers.txt" + ], "torch_transformers_squad": [ "{DEEPPAVLOV_PATH}/requirements/pytorch.txt", "{DEEPPAVLOV_PATH}/requirements/transformers.txt" diff --git a/deeppavlov/core/data/simple_vocab.py b/deeppavlov/core/data/simple_vocab.py index 66efc4bede..7435a4b3ef 100644 --- a/deeppavlov/core/data/simple_vocab.py +++ b/deeppavlov/core/data/simple_vocab.py @@ -89,7 +89,10 @@ def _add_tokens_with_freqs(self, tokens, freqs): def __call__(self, batch, is_top=True, **kwargs): if isinstance(batch, Iterable) and not isinstance(batch, str): - looked_up_batch = [self(sample, is_top=False) for sample in batch] + if all([k is None for k in batch]): + return batch + else: + looked_up_batch = [self(sample, is_top=False) for sample in batch] else: return self[batch] if self._pad_with_zeros and is_top and not is_str_batch(looked_up_batch): diff --git a/deeppavlov/core/trainers/fit_trainer.py b/deeppavlov/core/trainers/fit_trainer.py index 1d069bad25..440667678b 100644 --- a/deeppavlov/core/trainers/fit_trainer.py +++ b/deeppavlov/core/trainers/fit_trainer.py @@ -169,7 +169,6 @@ def test(self, data: Iterable[Tuple[Collection[Any], Collection[Any]]], y_predicted = [y_predicted] for out, val in zip(outputs.values(), y_predicted): out += list(val) - if examples == 0: log.warning('Got empty data iterable for scoring') return {'eval_examples_count': 0, 'metrics': None, 'time_spent': str(datetime.timedelta(seconds=0))} @@ -177,7 +176,15 @@ def test(self, data: Iterable[Tuple[Collection[Any], Collection[Any]]], # metrics_values = [(m.name, m.fn(*[outputs[i] for i in m.inputs])) for m in metrics] metrics_values = [] for metric in metrics: - value = metric.fn(*[outputs[i] for i in metric.inputs]) + calculate_metric = True + for i in metric.inputs: + outputs[i] = [k for k in outputs[i] if k is not None] + if len(outputs[i]) == 0: + log.info(f'Metric {metric.alias} is not calculated due to absense of true and predicted samples') + calculate_metric = False + value = -1 + if calculate_metric: + value = metric.fn(*[outputs[i] for i in metric.inputs]) metrics_values.append((metric.alias, value)) report = { diff --git a/deeppavlov/core/trainers/utils.py b/deeppavlov/core/trainers/utils.py index 27e1d07647..f5bc768441 100644 --- a/deeppavlov/core/trainers/utils.py +++ b/deeppavlov/core/trainers/utils.py @@ -13,6 +13,7 @@ # limitations under the License. from collections import OrderedDict, namedtuple +from dataclasses import is_dataclass from functools import partial from json import JSONEncoder from typing import List, Tuple, Union, Iterable @@ -64,4 +65,6 @@ def default(self, obj): return int(obj) elif isinstance(obj, np.floating): return float(obj) + elif is_dataclass(obj): + return obj.__dict__ return JSONEncoder.default(self, obj) diff --git a/deeppavlov/dataset_iterators/huggingface_dataset_iterator.py b/deeppavlov/dataset_iterators/huggingface_dataset_iterator.py index 75a509fa17..dbd1261e6d 100644 --- a/deeppavlov/dataset_iterators/huggingface_dataset_iterator.py +++ b/deeppavlov/dataset_iterators/huggingface_dataset_iterator.py @@ -43,11 +43,15 @@ def preprocess(self, """ dataset = [] - for example in data: + for i in range(len(data)): # for example in data + example = data[i] if isinstance(features, str): feat = example[features] elif isinstance(features, list): - feat = tuple(example[f] for f in features) + try: + feat = tuple(example[f] for f in features) + except Exception as e: + raise Exception(f"{e} for example {example} while trying to find keys {features}") else: raise RuntimeError(f"features should be str or list, but found: {features}") lb = example[label] diff --git a/deeppavlov/dataset_iterators/multitask_iterator.py b/deeppavlov/dataset_iterators/multitask_iterator.py index 61e4939b5a..0d69c8a2a8 100644 --- a/deeppavlov/dataset_iterators/multitask_iterator.py +++ b/deeppavlov/dataset_iterators/multitask_iterator.py @@ -12,45 +12,301 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy import math +import random from logging import getLogger -from typing import Optional +from typing import Iterator, Optional, Tuple, Union +import numpy as np + +from deeppavlov.core.common.errors import ConfigError +from deeppavlov.core.common.params import from_params +from deeppavlov.core.common.registry import register from deeppavlov.core.data.data_learning_iterator import DataLearningIterator log = getLogger(__name__) -class RepeatBatchGenerator: - """Repeating dataset. If there is not enough elements in the dataset to form another batch, elements for the batch - are drawn in the beginning of the dataset. Optionally dataset is reshuffled before a repeat. +@register('multitask_iterator') +class MultiTaskIterator: + """ + Class merges data from several dataset iterators. When used for batch generation batches from + merged dataset iterators are united into one batch. If sizes of merged datasets are different + smaller datasets are repeated until their size becomes equal to the largest dataset. + + Args: + data: dictionary which keys are task names and values are dictionaries with fields + ``"train", "valid", "test"``. + num_train_epochs: number of training epochs + tasks: dictionary which keys are task names and values are init params of dataset iterators. If task has + key-value pair ``'use_task_defaults': False`` task_defaults for this task dataset iterator will be ignored. + batch_size: batch_size + sampling_mode: mode of sampling we use. It can be plain, uniform or anneal. + gradient_accumulation_steps: number of gradient accumulation steps. Default is 1 + steps_per_epoch: number of steps per epoch. Nesessary if gradient_accumulation_steps > 1 + iterator_class_name: name of iterator class. + use_label_name, seed, features - parameters for the iterator class + one_element_tuples: if True, tuple of x consisting of one element is returned in this element. Default: True + task_defaults: default task parameters. + seed - random seed for sampling + + Attributes: + data: dictionary of data with fields "train", "valid" and "test" (or some of them) + """ + + def __init__( + self, + data: dict, + num_train_epochs: int, + tasks: dict, + batch_size: int = 8, + sampling_mode: str = 'plain', + gradient_accumulation_steps: int = 1, + steps_per_epoch: int = 0, + one_element_tuples: bool = True, + task_defaults: dict = None, + seed: int = 42, + **kwargs + ): + if data.keys() != tasks.keys(): + raise ConfigError("Task names from dataset reader don't mach task names from dataset iterator: " + f"{data.keys()} != {tasks.keys()}.") + self.task_iterators = {} + if task_defaults is None: + task_defaults = dict() + for task_name, task_params in tasks.items(): + if task_params.pop('use_task_defaults', True) is True: + task_config = copy.deepcopy(task_defaults) + task_config.update(task_params) + else: + task_config = task_params + try: + self.task_iterators[task_name] = from_params(task_config, data=data[task_name]) + except Exception as e: + log.error(f'Failed to initialize dataset_iterator for "{task_name}" task. Make sure that all parameters' + 'from `task_defaults` and task parameters are correct.') + raise e + self.n_tasks = len(tasks.keys()) + self.num_train_epochs = num_train_epochs + self.steps_per_epoch = steps_per_epoch + self.gradient_accumulation_steps = gradient_accumulation_steps + self.epochs_done = 0 + self.steps_taken = 0 + self.task_id = None + self.sampling_mode = sampling_mode + self.data = { + "train": self._extract_data_type("train"), + "valid": self._extract_data_type("valid"), + "test": self._extract_data_type("test"), + } + for mode in ["train", "valid", "test"]: + log.info(f'For {mode}') + for task_name in self.data[mode]: + log.info(f'{task_name} has {len(self.data[mode][task_name])} examples') + self.train_sizes = self._get_data_size("train") + if steps_per_epoch == 0: + self.steps_per_epoch = sum(self.train_sizes) // batch_size + else: + self.steps_per_epoch = steps_per_epoch + + def is_nan(a): + return a != a + + for mode in ['train', 'valid', 'test']: + for task in self.data[mode]: + for i in range(len(self.data[mode][task]) - 1, -1, -1): + x = self.data[mode][task][i][0] + y = self.data[mode][task][i][1] + if is_nan(x) or any([is_nan(z) for z in x]) or is_nan(y): + log.info(f'NAN detected {self.data[mode][task][i - 1:i]}') + del self.data[mode][task][i] + log.info(f'NAN for mode {mode} task {task} element {i} CLEARED') + elif isinstance(x, tuple) and len(x) == 1 and one_element_tuples: + # x is a tuple consisting of 1 element. return it as string + self.data[mode][task][i] = (x[0], y) + self.max_task_data_len = dict() + for data_type in self.data: + sizes = self._get_data_size(data_type) + self.max_task_data_len[data_type] = max(sizes) + random.seed(seed) + + def _get_data_size(self, data_type): + """Returns list of sizes of each dataset for the given data_type: train,test or valid.""" + return [len(self.data[data_type][key]) for key in self.data[data_type]] + + def _get_probs(self, data_type): + """Returns sampling probabilities for different sampling modes - plain, uniform or anneal""" + if self.sampling_mode == 'uniform': + sizes = [1 for _ in self._get_data_size(data_type)] + # as we sample uniformly + s = sum(sizes) + probs = [p / s for p in sizes] + elif self.sampling_mode == 'plain': + sizes = self._get_data_size(data_type) + n_samples = sum(sizes) + probs = [p / n_samples for p in sizes] + elif self.sampling_mode == 'anneal': + alpha = 1.0 - 0.8 * (self.epochs_done / self.num_train_epochs) + annealed_sizes = [p ** alpha for p in self._get_data_size(data_type)] + n_samples = sum(annealed_sizes) + probs = [p / n_samples for p in annealed_sizes] + else: + raise ValueError(f'Unsupported sampling mode {self.sampling_mode}') + return probs + + def _extract_data_type(self, data_type): + """Function that merges data of the current data_type (e.g. train) from all task_iterators into one dict""" + dataset_part = {} + for task, iterator in self.task_iterators.items(): + dataset_part[task] = getattr(iterator, data_type) + return dataset_part + def _transform_before_yielding(self, x, y, batch_size): + """Function that transforms data from dataset before yielding""" + + if len(x) != len(y): + raise Exception(f'x has len {len(x)} but y has len {len(y)}') + new_x, new_y = [], [] + for i in range(batch_size): + x_tuple = tuple([x[t_id][i] for t_id in range(self.n_tasks)]) + y_tuple = tuple([y[t_id][i] for t_id in range(self.n_tasks)]) + if self.n_tasks == 1: + x_tuple = x_tuple[0] + y_tuple = y_tuple[0] + new_x.append(x_tuple) + new_y.append(y_tuple) + batches = (tuple(new_x), tuple(new_y)) + return batches + + def gen_batches(self, batch_size: int, data_type: str = "train", + shuffle: bool = None) -> Iterator[Tuple[tuple, tuple]]: + """ + Generates batches and expected output to train neural networks. + If there are not enough samples from any task, samples are padded with None + Args: + batch_size: number of samples in batch + data_type: can be either 'train', 'test', or 'valid' + shuffle: whether to shuffle dataset before batching + Yields: + A tuple of a batch of inputs and a batch of expected outputs. + Inputs and outputs are tuples. Element of inputs or outputs is a tuple which + elements are x values of merged tasks in the order tasks are present in + `tasks` argument of `__init__` method. + """ + + max_task_data_len = self.max_task_data_len[data_type] + log.info(f'Batch size {batch_size} with gradient accumulation steps {self.gradient_accumulation_steps}') + log.info(f'Efficient batch size {batch_size // self.gradient_accumulation_steps}') + batch_size = batch_size // self.gradient_accumulation_steps + + if data_type == "train": + generators = [ + SingleTaskBatchGenerator(iter_, batch_size, data_type, shuffle) + for iter_ in self.task_iterators.values() + ] + # probs only required while training + probs = self._get_probs("train") + for step in range(self.steps_per_epoch): + if (self.steps_taken + 1) % self.gradient_accumulation_steps == 0 or self.task_id is None: + self.task_id = np.random.choice(self.n_tasks, p=probs) + x = [[None for _ in range(batch_size)] for _ in range(self.n_tasks)] + y = [[None for _ in range(batch_size)] for _ in range(self.n_tasks)] + x[self.task_id], y[self.task_id] = generators[self.task_id].__next__() + if not all([s is None for s in x[self.task_id]]): + batch_to_yield = self._transform_before_yielding( + x, y, batch_size) + yield batch_to_yield + + self.epochs_done += 1 + # one additional step is taken while logging training metrics + self.steps_taken -= 1 + else: + eval_batch_size = 1 + x = [[None for _ in range(eval_batch_size)] for _ in range(self.n_tasks)] + y = [[None for _ in range(eval_batch_size)] for _ in range(self.n_tasks)] + generators = [ + SingleTaskBatchGenerator( + iter_, batch_size=eval_batch_size, data_type=data_type, shuffle=shuffle) + for iter_ in self.task_iterators.values() + ] + for step in range(max_task_data_len): + for task_id in range(self.n_tasks): + x[task_id], y[task_id] = generators[task_id].__next__() + + batches = self._transform_before_yielding(x, y, eval_batch_size) + yield batches + + def get_instances(self, data_type: str = "train"): + """ + Returns a tuple of inputs and outputs from all datasets. Lengths of + and outputs are equal to the size of the largest dataset. Smaller + datasets are padded with Nones until their sizes are equal to the size of the + largest dataset. + Args: + data_type: can be either 'train', 'test', or 'valid' + Returns: + A tuple of all inputs for a data type and all expected outputs + for a data type. + """ + + max_task_data_len = max( + [ + len(iter_.get_instances(data_type)[0]) + for iter_ in self.task_iterators.values() + ] + ) + x_instances = [] + y_instances = [] + for task_name, iter_ in self.task_iterators.items(): + x, y = iter_.get_instances(data_type) + n_repeats = math.ceil(max_task_data_len / len(x)) + x *= n_repeats + y *= n_repeats + x_instances.append(x[:max_task_data_len]) + y_instances.append(y[:max_task_data_len]) + error_msg = f'Len of x_instances {len(x_instances)} and y_instances {len(y_instances)} dont match' + if len(x_instances) != len(y_instances): + raise Exception(error_msg) + instances = (tuple(zip(*x_instances)), tuple(zip(*y_instances))) + return instances + + +class SingleTaskBatchGenerator: + """ + Batch generator for a single task. + If there are no elements in the dataset to form another batch, Nones are returned. Args: dataset_iterator: dataset iterator from which batches are drawn. batch_size: size fo the batch. data_type: "train", "valid", or "test" - shuffle: whether dataset will be shuffled before each repeat. + shuffle: whether dataset will be shuffled. n_batches: the number of batches that will be generated. - size_of_the_last_batch: used if dataset size is not evenly divisible by batch size. """ + def __init__( - self, - dataset_iterator: DataLearningIterator, - batch_size: int, - data_type: str, - shuffle: bool, - n_batches: Optional[int] = None, - size_of_last_batch: Optional[int] = None + self, + dataset_iterator: Union[DataLearningIterator], + batch_size: int, + data_type: str, + shuffle: bool, + n_batches: Optional[int] = None, + size_of_last_batch: Optional[int] = None, ): self.dataset_iterator = dataset_iterator self.batch_size = batch_size self.data_type = data_type self.shuffle = shuffle self.n_batches = n_batches - self.size_of_last_batch = self.batch_size if size_of_last_batch is None else size_of_last_batch - - self.inner_batch_size = math.gcd(len(self.dataset_iterator.data[data_type]), batch_size) - self.gen = self.dataset_iterator.gen_batches(self.inner_batch_size, self.data_type, self.shuffle) + self.size_of_last_batch = ( + self.batch_size if size_of_last_batch is None else size_of_last_batch) + + self.inner_batch_size = math.gcd( + len(self.dataset_iterator.data[data_type]), batch_size + ) + self.gen = self.dataset_iterator.gen_batches( + self.inner_batch_size, self.data_type, self.shuffle + ) self.batch_count = 0 def __iter__(self): @@ -63,18 +319,15 @@ def __next__(self): while len(x) < self.batch_size or len(y) < self.batch_size: try: xx, yy = next(self.gen) + x += xx + y += yy except StopIteration: - self.gen = self.dataset_iterator.gen_batches(self.inner_batch_size, self.data_type, self.shuffle) - continue - assert len(xx) == self.inner_batch_size and len(yy) == self.inner_batch_size, \ - "self.inner_batch_size equals greatest common divisor of dataset size and " \ - "required batch size so dataset size has to divisible by task batch size evenly." - x += xx - y += yy - assert len(x) == self.batch_size and len(y) == self.batch_size + x_nones = tuple([None for _ in range(self.batch_size)]) + y_nones = x_nones + return x_nones, y_nones + self.batch_count += 1 if self.batch_count == self.n_batches: x = x[:self.size_of_last_batch] y = y[:self.size_of_last_batch] return x, y - diff --git a/deeppavlov/dataset_readers/basic_classification_reader.py b/deeppavlov/dataset_readers/basic_classification_reader.py index c354d2dc11..81a6738492 100644 --- a/deeppavlov/dataset_readers/basic_classification_reader.py +++ b/deeppavlov/dataset_readers/basic_classification_reader.py @@ -76,7 +76,7 @@ def read(self, data_path: str, url: str = None, file_name = kwargs.get(data_type, '{}.{}'.format(data_type, format)) if file_name is None: continue - + file = Path(data_path).joinpath(file_name) if file.exists(): if format == 'csv': diff --git a/deeppavlov/dataset_readers/conll2003_reader.py b/deeppavlov/dataset_readers/conll2003_reader.py index 8d4bb63fc2..dd76547a4e 100644 --- a/deeppavlov/dataset_readers/conll2003_reader.py +++ b/deeppavlov/dataset_readers/conll2003_reader.py @@ -20,7 +20,8 @@ def read(self, provide_doc_ids: bool = False, iob: bool = False, iobes: bool = False, - docstart_token: str = None): + docstart_token: str = None, + *args, **kwargs): self.provide_pos = provide_pos self.provide_chunk = provide_chunk self.provide_doc_ids = provide_doc_ids diff --git a/deeppavlov/dataset_readers/huggingface_dataset_reader.py b/deeppavlov/dataset_readers/huggingface_dataset_reader.py index 0ea7e1b293..2e62bf966c 100644 --- a/deeppavlov/dataset_readers/huggingface_dataset_reader.py +++ b/deeppavlov/dataset_readers/huggingface_dataset_reader.py @@ -32,7 +32,6 @@ class HuggingFaceDatasetReader(DatasetReader): @overrides def read(self, - data_path: str, path: str, name: Optional[str] = None, train: Optional[str] = None, # for lidirus with no train @@ -42,7 +41,6 @@ def read(self, """Wraps datasets.load_dataset method Args: - data_path: DeepPavlov's data_path argument, is not used, but passed by trainer path: datasets.load_dataset path argument (e.g., `glue`) name: datasets.load_dataset name argument (e.g., `mrpc`) train: split name to use as training data. diff --git a/deeppavlov/dataset_readers/multitask_reader.py b/deeppavlov/dataset_readers/multitask_reader.py new file mode 100644 index 0000000000..6bb3fe511a --- /dev/null +++ b/deeppavlov/dataset_readers/multitask_reader.py @@ -0,0 +1,52 @@ +# Copyright 2017 Neural Networks and Deep Learning lab, MIPT +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +from logging import getLogger +from typing import Dict + +from deeppavlov.core.common.registry import get_model, register +from deeppavlov.core.data.dataset_reader import DatasetReader + +log = getLogger(__name__) + + +@register('multitask_reader') +class MultiTaskReader(DatasetReader): + """Class to read several datasets simultaneously.""" + + def read(self, tasks: Dict[str, Dict[str, dict]], task_defaults: dict = None, **kwargs): + """Creates dataset readers for tasks and returns what task dataset readers `read()` methods return. + + Args: + tasks: dictionary which keys are task names and values are dictionaries with param name - value pairs for + nested dataset readers initialization. If task has key-value pair ``'use_task_defaults': False``, + task_defaults for this task dataset reader will be ignored. + task_defaults: default task parameters. + + Returns: + dictionary which keys are task names and values are what task readers `read()` methods returned. + """ + data = dict() + if task_defaults is None: + task_defaults = dict() + for task_name, task_params in tasks.items(): + if task_params.pop('use_task_defaults', True) is True: + task_config = copy.deepcopy(task_defaults) + task_config.update(task_params) + else: + task_config = task_params + reader = get_model(task_config.pop('class_name'))() + data[task_name] = reader.read(**task_config) + return data diff --git a/deeppavlov/dataset_readers/squad_dataset_reader.py b/deeppavlov/dataset_readers/squad_dataset_reader.py index 078cf46d8b..cbd4ef0985 100644 --- a/deeppavlov/dataset_readers/squad_dataset_reader.py +++ b/deeppavlov/dataset_readers/squad_dataset_reader.py @@ -52,12 +52,12 @@ class SquadDatasetReader(DatasetReader): url_multi_squad = 'http://files.deeppavlov.ai/datasets/multiparagraph_squad.tar.gz' url_squad2 = 'http://files.deeppavlov.ai/datasets/squad-v2.0.tar.gz' - def read(self, dir_path: str, dataset: Optional[str] = 'SQuAD', url: Optional[str] = None, *args, **kwargs) \ + def read(self, data_path: str, dataset: Optional[str] = 'SQuAD', url: Optional[str] = None, *args, **kwargs) \ -> Dict[str, Dict[str, Any]]: """ Args: - dir_path: path to save data + data_path: path to save data dataset: default dataset names: ``'SQuAD'``, ``'SberSQuAD'`` or ``'MultiSQuAD'`` url: link to archive with dataset, use url argument if non-default dataset is used @@ -80,19 +80,19 @@ def read(self, dir_path: str, dataset: Optional[str] = 'SQuAD', url: Optional[st else: raise RuntimeError(f'Dataset {dataset} is unknown') - dir_path = Path(dir_path) + data_path = Path(data_path) if dataset == "SQuAD2.0": required_files = [f'{dt}-v2.0.json' for dt in ['train', 'dev']] else: required_files = [f'{dt}-v1.1.json' for dt in ['train', 'dev']] - dir_path.mkdir(parents=True, exist_ok=True) + data_path.mkdir(parents=True, exist_ok=True) - if not all((dir_path / f).exists() for f in required_files): - download_decompress(self.url, dir_path) + if not all((data_path / f).exists() for f in required_files): + download_decompress(self.url, data_path) dataset = {} for f in required_files: - with dir_path.joinpath(f).open('r', encoding='utf8') as fp: + with data_path.joinpath(f).open('r', encoding='utf8') as fp: data = json.load(fp) if f in {'dev-v1.1.json', 'dev-v2.0.json'}: dataset['valid'] = data @@ -118,12 +118,12 @@ class MultiSquadDatasetReader(DatasetReader): url_multi_squad_retr = 'http://files.deeppavlov.ai/datasets/multi_squad_retr_enwiki20161221.tar.gz' url_multi_squad_ru_retr = 'http://files.deeppavlov.ai/datasets/multi_squad_ru_retr.tar.gz' - def read(self, dir_path: str, dataset: Optional[str] = 'MultiSQuADRetr', url: Optional[str] = None, *args, + def read(self, data_path: str, dataset: Optional[str] = 'MultiSQuADRetr', url: Optional[str] = None, *args, **kwargs) -> Dict[str, Dict[str, Any]]: """ Args: - dir_path: path to save data + data_path: path to save data dataset: default dataset names: ``'MultiSQuADRetr'``, ``'MultiSQuADRuRetr'`` url: link to archive with dataset, use url argument if non-default dataset is used @@ -142,19 +142,19 @@ def read(self, dir_path: str, dataset: Optional[str] = 'MultiSQuADRetr', url: Op else: raise RuntimeError(f'Dataset {dataset} is unknown') - dir_path = Path(dir_path) + data_path = Path(data_path) required_files = [f'{dt}.jsonl' for dt in ['train', 'dev']] - if not dir_path.exists(): - dir_path.mkdir(parents=True) + if not data_path.exists(): + data_path.mkdir(parents=True) - if not all((dir_path / f).exists() for f in required_files): - download_decompress(self.url, dir_path) + if not all((data_path / f).exists() for f in required_files): + download_decompress(self.url, data_path) dataset = {} for f in required_files: if 'dev' in f: - dataset['valid'] = dir_path.joinpath(f) + dataset['valid'] = data_path.joinpath(f) else: - dataset['train'] = dir_path.joinpath(f) + dataset['train'] = data_path.joinpath(f) return dataset diff --git a/deeppavlov/metrics/accuracy.py b/deeppavlov/metrics/accuracy.py index 560d92ee71..86602ed508 100644 --- a/deeppavlov/metrics/accuracy.py +++ b/deeppavlov/metrics/accuracy.py @@ -61,8 +61,11 @@ def multitask_accuracy(*args) -> float: """ n = len(args) y_true_by_tasks, y_predicted_by_tasks = args[:n // 2], args[n // 2:] - y_true, y_predicted = list(zip(*y_true_by_tasks)), list(zip(*y_predicted_by_tasks)) - return accuracy(y_true, y_predicted) + answers = [] + for true, pred in zip(y_true_by_tasks, y_predicted_by_tasks): + answers.append(accuracy(true, pred)) + final_answer = sum(answers)/len(answers) + return final_answer @register_metric('multitask_sequence_accuracy') diff --git a/deeppavlov/metrics/record_metrics.py b/deeppavlov/metrics/record_metrics.py index b193eacd56..9711565253 100644 --- a/deeppavlov/metrics/record_metrics.py +++ b/deeppavlov/metrics/record_metrics.py @@ -26,7 +26,8 @@ def record_f1_score(record_examples: List[RecordNestedExample]): example_f1s = [] for answer in example.answers: example_f1s.append(exact_match_score(example.prediction, answer)) - f1_scores.append(max(example_f1s)) + if example_f1s: + f1_scores.append(max(example_f1s)) return np.mean(f1_scores) @@ -47,8 +48,9 @@ def record_em_score(record_examples: List[RecordNestedExample]): example_ems = [] for answer in example.answers: example_ems.append(string_f1_score(example.prediction, answer)) - em_scores.append(max(example_ems)) - return np.mean(em_scores) + if example_ems: + em_scores.append(max(example_ems)) + return np.mean(em_scores) if em_scores else -1 def normalize_answer(s): diff --git a/deeppavlov/models/classifiers/proba2labels.py b/deeppavlov/models/classifiers/proba2labels.py index 3fa485b32d..45e71e05d8 100644 --- a/deeppavlov/models/classifiers/proba2labels.py +++ b/deeppavlov/models/classifiers/proba2labels.py @@ -13,7 +13,6 @@ # limitations under the License. from logging import getLogger -from typing import List, Union import numpy as np @@ -56,31 +55,35 @@ def __init__(self, self.is_binary = is_binary def __call__(self, - data: Union[np.ndarray, - List[List[float]], - List[List[int]]], *args, - **kwargs) -> Union[List[List[int]], List[int]]: + **kwargs): """ Process probabilities to labels - Args: - data: list of vectors with probability distribution - + Every argument is a list of vectors with probability distribution Returns: - list of labels (only label classification) or list of lists of labels (multi-label classification) + list of labels (only label classification) or list of lists of labels (multi-label classification), + or list of the following lists (in multitask setting) for every argument """ - if self.confidence_threshold: - if self.is_binary: - return [int(el > self.confidence_threshold) for el in data] + answer = [] + log.debug(f'input {args}') + for data in args: + if all([k is None for k in data]): + answer.append([]) + elif self.confidence_threshold: + if self.is_binary: + answer.append([int(el > self.confidence_threshold) for el in data]) + else: + answer.append([list(np.where(np.array(d) > self.confidence_threshold)[0]) for d in data]) + elif self.max_proba: + answer.append([np.argmax(d) for d in data]) + elif self.top_n: + answer.append([np.argsort(d)[::-1][:self.top_n] for d in data]) else: - return [list(np.where(np.array(d) > self.confidence_threshold)[0]) - for d in data] - elif self.max_proba: - return [np.argmax(d) for d in data] - elif self.top_n: - return [np.argsort(d)[::-1][:self.top_n] for d in data] - else: - raise ConfigError("Proba2Labels requires one of three arguments: bool `max_proba` or " - "float `confidence_threshold` for multi-label classification or" - "integer `top_n` for choosing several labels with the highest probabilities") + raise ConfigError("Proba2Labels requires one of three arguments: bool `max_proba` or " + "float `confidence_threshold` for multi-label classification or" + "integer `top_n` for choosing several labels with the highest probabilities") + if len(args) == 1: # only one argument + answer = answer[0] + log.debug(f'output {answer}') + return answer diff --git a/deeppavlov/models/preprocessors/multitask_preprocessor.py b/deeppavlov/models/preprocessors/multitask_preprocessor.py new file mode 100644 index 0000000000..abf25b8586 --- /dev/null +++ b/deeppavlov/models/preprocessors/multitask_preprocessor.py @@ -0,0 +1,126 @@ +from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Iterable +from logging import getLogger + +from deeppavlov.core.common.registry import register +from deeppavlov.core.models.component import Component +from deeppavlov.models.preprocessors.torch_transformers_preprocessor import * + +log = getLogger(__name__) + + +@register('multitask_pipeline_preprocessor') +class MultiTaskPipelinePreprocessor(Component): + """ + Extracts out the task_id from the first index of each example for each task. + Then splits the input and performs tokenization + Params: + + vocab_file(str): vocabulary file for tokenization + do_lower_case(bool): if True, tokenization is lower-cased. Default: True + preprocessor(str): name of DeepPavlov class that is used for tokenization. + Default: TorchTransformersPreprocessor + preprocessors(List[str]): list of names of DeepPavlov classes that are used for tokenization. + Overrides preprocessor . The length of list must be equal to the number of tasks + max_seq_length(int): Maximum sequence length for tokenizer. Default: 512 + strict(bool): if True, we always try to split data assuming predefined modes as in multitask_example.json + If False, we go without splitting if we are not sure how to split the data. Default: False + print_first_example(bool): if True, we print the first input example after initialization. Default: False + """ + + def __init__(self, + vocab_file, + do_lower_case: bool = True, + preprocessor: str = 'TorchTransformersPreprocessor', + preprocessors: List[str] = None, + max_seq_length: int = 512, + strict=False, + print_first_example=False, + *args, **kwargs): + self.strict = strict + self.printed = False + self.print_first_example = print_first_example + self.prefix = '' + if preprocessors is None: + log.info( + f'Assuming the same preprocessor name for all : {preprocessor}') + self.preprocessor = eval(preprocessor)(vocab_file, do_lower_case, + max_seq_length, *args, **kwargs) + self.preprocessors = None + else: + for i in range(len(preprocessors)): + preprocessors[i] = eval(preprocessors[i]) + self.n_task = len(preprocessors) + self.preprocessors = [preprocessors[i](vocab_file=vocab_file, do_lower_case=do_lower_case, + max_seq_length=max_seq_length, + *args, **kwargs) for i in range(len(preprocessors))] + + def split(self, features): + if all([isinstance(k, str) for k in features]) or all([k is None for k in features]): + # single sentence classification + log.debug('Assuming single sentence classification') + texts_a, texts_b = features, None + elif all([isinstance(k, tuple) and len(k) == 2 for k in features]): + log.debug( + 'Assuming sentence pair classification or classification for multichoice') + texts_a, texts_b = [], [] + for feature in features: + text_a, text_b = feature + texts_a.append(text_a) + texts_b.append(text_b) + elif all([isinstance(k, list) for k in features]): + log.debug('Assuming ner classification') + texts_a, texts_b = list(features), None + else: + if self.strict: + raise Exception(f'Unsupported task data {features}') + else: + log.warning('Data not split.Going without splitting') + texts_a, texts_b = features, None + return texts_a, texts_b + + def __call__(self, *args): + """ + Returns batches of values from ``inp``. Every batch contains values that have same key from + ``keys_to_extract`` attribute. The order of elements of ``keys_to_extract`` is preserved. + + Args: + inp: A sequence of dictionaries with identical keys + + Returns: + A list of lists of values of dictionaries from ``inp`` + """ + self.n_task = len(args) + if self.preprocessors is None: + # Defining preprocessor list while we call the function, as only he + self.preprocessors = [self.preprocessor + for _ in range(self.n_task)] + answer = [] + for i in range(len(args)): + if all([j is None for j in args[i]]): + log.debug('All nones received') + answer.append([]) + else: + texts_a, texts_b = self.split(args[i]) + #log.debug(f'Preprocessor {self.preprocessors[i]}') + if all([j is None for j in texts_a]): + log.debug('All nones') + answer.append([]) + else: + if 'choice' in str(self.preprocessors[i]): + if isinstance(texts_a[0], str) and isinstance(texts_b[0],list): + for j in range(len(texts_b)): + texts_a[j] = [texts_a[j] for _ in range(len(texts_b[j]))] + if self.prefix: + for j in range(len(texts_a)): + texts_a[j] = [' '.join([self.prefix, text]) for text in texts_a[j]] + else: + if self.prefix: + texts_a = [' '.join([self.prefix, text]) for text in texts_a] + answer.append(self.preprocessors[i](texts_a, texts_b)) + if not self.printed and self.print_first_example: + print((texts_a, texts_b)) + print(answer[-1]) + self.printed = True + if answer == [[]]: + raise Exception('Empty answer') + return answer diff --git a/deeppavlov/models/preprocessors/torch_transformers_preprocessor.py b/deeppavlov/models/preprocessors/torch_transformers_preprocessor.py index 8bc2daec34..dc6e4dad28 100644 --- a/deeppavlov/models/preprocessors/torch_transformers_preprocessor.py +++ b/deeppavlov/models/preprocessors/torch_transformers_preprocessor.py @@ -59,10 +59,9 @@ def __init__(self, self.max_seq_length = max_seq_length if Path(vocab_file).is_file(): vocab_file = str(expand_path(vocab_file)) - self.tokenizer = AutoTokenizer(vocab_file=vocab_file, - do_lower_case=do_lower_case) + self.tokenizer = AutoTokenizer(vocab_file=vocab_file, do_lower_case=do_lower_case, **kwargs) else: - self.tokenizer = AutoTokenizer.from_pretrained(vocab_file, do_lower_case=do_lower_case) + self.tokenizer = AutoTokenizer.from_pretrained(vocab_file, do_lower_case=do_lower_case, **kwargs) def tokenize_mc_examples(self, contexts: List[List[str]], @@ -107,8 +106,9 @@ def __call__(self, texts_a: List[List[str]], texts_b: List[List[str]] = None) -> batch of :class:`transformers.data.processors.utils.InputFeatures` with subtokens, subtoken ids, \ subtoken mask, segment mask, or tuple of batch of InputFeatures and Batch of subtokens """ - - input_features = self.tokenize_mc_examples(texts_a, texts_b) + input_features = [] + if texts_a and texts_b and texts_a[0] and texts_b[0]: + input_features = self.tokenize_mc_examples(texts_a, texts_b) return input_features @@ -134,11 +134,11 @@ def __init__(self, max_seq_length: int = 512, **kwargs) -> None: self.max_seq_length = max_seq_length - self.tokenizer = AutoTokenizer.from_pretrained(vocab_file, do_lower_case=do_lower_case) + self.tokenizer = AutoTokenizer.from_pretrained(vocab_file, do_lower_case=do_lower_case, **kwargs) - def __call__(self, texts_a: List[str], texts_b: Optional[List[str]] = None) -> Union[List[InputFeatures], - Tuple[List[InputFeatures], - List[List[str]]]]: + def __call__(self, texts_a: List, texts_b: Optional[List[str]] = None) -> Union[List[InputFeatures], + Tuple[List[InputFeatures], + List[List[str]]]]: """Tokenize and create masks. texts_a and texts_b are separated by [SEP] token Args: @@ -152,6 +152,12 @@ def __call__(self, texts_a: List[str], texts_b: Optional[List[str]] = None) -> U # in case of iterator's strange behaviour if isinstance(texts_a, tuple): texts_a = list(texts_a) + elif isinstance(texts_a, str): + raise TypeError(f'Received string {texts_a} as an input! Check the iterator output') + elif texts_a == []: + return {} + + texts_a = [k for k in texts_a if k is not None] # handle dummy output input_features = self.tokenizer(text=texts_a, text_pair=texts_b, @@ -413,11 +419,11 @@ def __call__(self, questions_batch: List[List[str]], rels_batch: List[List[str]] token_type_ids_batch.append(encoding["token_type_ids"]) else: token_type_ids_batch.append([0]) - + input_features = {"input_ids": torch.LongTensor(input_ids_batch), "attention_mask": torch.LongTensor(attention_mask_batch), "token_type_ids": torch.LongTensor(token_type_ids_batch)} - + return input_features @@ -439,6 +445,7 @@ class TorchTransformersNerPreprocessor(Component): provide_subword_tags: output tags for subwords or for words subword_mask_mode: subword to select inside word tokens, can be "first" or "last" (default="first") + return_features: if True, returns answer in features format Attributes: max_seq_length: max sequence length in subtokens, including [SEP] and [CLS] tokens @@ -454,6 +461,7 @@ def __init__(self, token_masking_prob: float = 0.0, provide_subword_tags: bool = False, subword_mask_mode: str = "first", + return_features: bool = False, **kwargs): self._re_tokenizer = re.compile(r"[\w']+|[^\w ]") self.provide_subword_tags = provide_subword_tags @@ -468,6 +476,7 @@ def __init__(self, else: self.tokenizer = AutoTokenizer.from_pretrained(vocab_file, do_lower_case=do_lower_case) self.token_masking_prob = token_masking_prob + self.return_features = return_features def __call__(self, tokens: Union[List[List[str]], List[str]], @@ -533,9 +542,24 @@ def __call__(self, log.warning(f'Markers len: {len(swms)}, sum: {sum(swms)}') log.warning(f'Masks: {swms}') log.warning(f'Tags len: {len(ts)}\n Tags: {ts}') + if self.return_features: + feature_list = ({'input_ids': torch.Tensor(subword_tok_ids), + 'attention_mask': torch.Tensor(attention_mask), + 'token_type_ids': torch.Tensor(startofword_markers), + 'labels': torch.Tensor(nonmasked_tags)}) + return feature_list + else: return tokens, subword_tokens, subword_tok_ids, \ - attention_mask, startofword_markers, nonmasked_tags - return tokens, subword_tokens, subword_tok_ids, startofword_markers, attention_mask, tokens_offsets_batch + attention_mask, startofword_markers, nonmasked_tags + if self.return_features: + feature_list = ({'input_ids': torch.Tensor(subword_tok_ids), + 'attention_mask': torch.Tensor(attention_mask), + 'token_type_ids': torch.Tensor(startofword_markers) + }) + return feature_list + else: + return tokens, subword_tokens, subword_tok_ids, \ + startofword_markers, attention_mask, tokens_offsets_batch @staticmethod def _ner_bert_tokenize(tokens: List[str], @@ -684,6 +708,12 @@ def __call__(self, Returns: List[RecordNestedExample]: processed but not previously returned examples (may be empty in some cases) """ + if isinstance(y_pred_probas, list): + y_pred_probas = [k for k in y_pred_probas if k is not None] + y = [k for k in y if k is not None] + y_pred_probas = np.array(y_pred_probas) + if y == []: + return [] if not self.is_binary: # if we have outputs for both classes `0` and `1` y_pred_probas = y_pred_probas[:, 1] @@ -698,6 +728,7 @@ def __call__(self, if self.record_example_accumulator.examples_processed >= self.total_examples: # start over if all examples were processed self.reset_accumulator() + return self.record_example_accumulator.return_examples() def reset_accumulator(self): @@ -790,6 +821,7 @@ def return_examples(self) -> List[RecordNestedExample]: for index in indices_to_return: examples_to_return.append(self.nested_examples[index]) self.returned_indices.update(indices_to_return) + log.debug(f'Returning {examples_to_return}') return examples_to_return @staticmethod diff --git a/deeppavlov/models/torch_bert/multitask_transformer.py b/deeppavlov/models/torch_bert/multitask_transformer.py new file mode 100644 index 0000000000..cead7a99c5 --- /dev/null +++ b/deeppavlov/models/torch_bert/multitask_transformer.py @@ -0,0 +1,651 @@ +# Copyright 2017 Neural Networks and Deep Learning lab, MIPT +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Iterable +from logging import getLogger +from pathlib import Path +from typing import Dict, Optional + +import numpy as np +import torch +import torch.nn as nn +from overrides import overrides +from torch.nn import CrossEntropyLoss, MSELoss, BCEWithLogitsLoss +from transformers import AutoConfig, AutoModel + +from deeppavlov.core.common.errors import ConfigError +from deeppavlov.core.common.registry import register +from deeppavlov.core.models.torch_model import TorchModel +from deeppavlov.models.torch_bert.torch_transformers_sequence_tagger import token_from_subtoken, \ + token_labels_to_subtoken_labels + +log = getLogger(__name__) + + +class FocalLoss(nn.Module): + "Non weighted version of Focal Loss" + + def __init__(self, alpha=.5, gamma=2, categorical_loss=False, weight=None): + super(FocalLoss, self).__init__() + self.alpha = torch.tensor([alpha, 1 - alpha]).cuda() + self.gamma = gamma + self.categorical = categorical_loss + self.weight = weight + + def forward(self, inputs, targets): + if self.categorical: + loss = CrossEntropyLoss(weight=self.weight, reduction='none')(inputs, targets) + else: + loss = BCEWithLogitsLoss(weight=self.weight, reduction='none')(inputs, targets) + targets = targets.type(torch.long) + at = self.alpha.gather(0, targets.data.view(-1)) + pt = torch.exp(-loss) + F_loss = at * (1 - pt) ** self.gamma * loss + return F_loss.mean() + + +def SoftCrossEntropyLoss(inputs, targets): + logprobs = torch.nn.functional.log_softmax(inputs, dim=1) + return -(targets * logprobs).sum() / inputs.shape[0] + + +def we_transform_input(name): + return name in ['sequence_labeling', 'multiple_choice'] + + +class BertForMultiTask(nn.Module): + """ + BERT model for multiple choice,sequence labeling, ner, classification or regression + This module is composed of the BERT model with a linear layer on top of + the pooled output. + Params: + task_num_classes + task_types + backbone_model - na + """ + + def __init__(self, tasks_num_classes, multilabel, task_types, + weights, backbone_model='bert_base_uncased', + dropout=None, new_model=False,focal=False,one_hot_labels=None, + max_seq_len=320, model_takes_token_type_ids=True): + + super(BertForMultiTask, self).__init__() + config = AutoConfig.from_pretrained(backbone_model, output_hidden_states=True, output_attentions=True) + self.bert = AutoModel.from_pretrained(pretrained_model_name_or_path=backbone_model, + config=config) + self.classes = tasks_num_classes # classes for every task + self.weights = weights + self.multilabel = multilabel + self.new_model = new_model + self.one_hot_labels=one_hot_labels + self.model_takes_token_type_ids = model_takes_token_type_ids + if dropout is not None: + self.dropout = nn.Dropout(dropout) + elif hasattr(config, 'hidden_dropout_prob'): + self.dropout = nn.Dropout(config.hidden_dropout_prob) + elif hasattr(config, 'seq_classif_dropout'): + self.dropout = nn.Dropout(config.seq_classif_dropout) + elif hasattr(config, 'dropout'): + self.dropout = nn.Dropout(config.dropout) + else: + self.dropout = nn.Dropout(0) + self.max_seq_len = max_seq_len + self.activation = nn.Tanh() + self.task_types = task_types + self.focal=focal + OUT_DIM = config.hidden_size + if self.new_model and self.new_model!=2: + OUT_DIM = OUT_DIM * 2 + self.bert.final_classifier = nn.ModuleList( + [ + nn.Linear(OUT_DIM, num_labels) if self.task_types[i] not in ['multiple_choice', + 'regression', 'binary_head'] + else nn.Linear(OUT_DIM, 1) for i, num_labels in enumerate(self.classes) + ] + ) + if self.new_model:# or True: + self.bert.pooling_layer = nn.Linear(OUT_DIM, OUT_DIM) + else: + self.bert.pooler = nn.Linear(OUT_DIM, OUT_DIM) + + def get_logits(self, task_id, input_ids, attention_mask, token_type_ids): + name = self.task_types[task_id] + outputs = None + if we_transform_input(name): + input_ids = input_ids.view(-1, input_ids.size(-1)) + attention_mask = attention_mask.view(-1, attention_mask.size(-1)) + if token_type_ids is not None: + token_type_ids = token_type_ids.view(-1, token_type_ids.size(-1)) + if token_type_ids is None or not self.model_takes_token_type_ids: + outputs = self.bert(input_ids=input_ids.long(), + attention_mask=attention_mask.long()) + else: + try: + outputs = self.bert(input_ids=input_ids.long(), + token_type_ids=token_type_ids.long(), + attention_mask=attention_mask.long()) + except Exception as e: + if "forward() got an unexpected keyword argument 'token_type_ids'" in str(e): + outputs = self.bert(input_ids=input_ids.long(), + attention_mask=attention_mask.long()) + self.model_takes_token_type_ids=False + else: + raise e + if name == 'sequence_labeling': + return outputs.last_hidden_state + elif self.new_model == 2: + return outputs.last_hidden_state[:, task_id] + elif self.new_model: + return torch.cat([outputs.last_hidden_state[:, 0], outputs.last_hidden_state[:, task_id + 1]], axis=1) + else: + return outputs.last_hidden_state[:, 0] + + def predict_on_top(self, task_id, last_hidden_state, labels=None): + name = self.task_types[task_id] + if name == 'sequence_labeling': + # last hidden state is all token tensor + final_output = self.dropout(last_hidden_state) + logits = self.bert.final_classifier[task_id](final_output) + if labels is not None: + active_logits = logits.view(-1, self.classes[task_id]) + if self.multilabel[task_id]: + loss_fct = BCEWithLogitsLoss() + loss = loss_fct(active_logits, labels) + elif not self.multilabel[task_id]: + loss_fct = CrossEntropyLoss() + loss = loss_fct(active_logits, labels.view(-1)) + return loss, logits + else: + return logits + elif name in ['classification', 'regression', 'multiple_choice']: + # last hidden state is a first token tensor + if self.new_model: # or True: + pooled_output = self.bert.pooling_layer(last_hidden_state) + else: + pooled_output = self.bert.pooler(last_hidden_state) + pooled_output = self.activation(pooled_output) + pooled_output = self.dropout(pooled_output) + logits = self.bert.final_classifier[task_id](pooled_output) + if name == 'multiple_choice': + logits = logits.view((-1, self.classes[task_id])) + if labels is not None: + l1, l2 = len(logits), len(labels) + if len(logits) != len(labels): + raise Exception(f'Len of logits {l1} and labels {l2} not match') + if labels is not None: + if name != "regression": + if self.multilabel[task_id]: + loss_fct = BCEWithLogitsLoss() + loss = loss_fct(logits, labels) + elif not self.multilabel[task_id]: + if self.focal: + if self.weights[task_id] is None: + loss_fct = FocalLoss() + else: + loss_fct = FocalLoss(weight=torch.tensor([self.weights[task_id]]).cuda()) + loss = loss_fct(logits, labels.view(-1)) + elif self.one_hot_labels[task_id]: + loss_fct = SoftCrossEntropyLoss + loss = loss_fct(logits, labels) + else: + if self.weights[task_id] is None: + loss_fct = CrossEntropyLoss() + else: + loss_fct = CrossEntropyLoss(weight=torch.Tensor([self.weights[task_id]]).cuda()) + loss = loss_fct(logits, labels.view(-1)) + return loss, logits + elif name == "regression": + loss_fct = MSELoss() + loss = loss_fct(logits, labels.unsqueeze(1)) + return loss, logits + else: + return logits + elif name == 'binary_head': + last_hidden_state = self.dropout(last_hidden_state) + pooled_output = self.bert.pooler(last_hidden_state) + pooled_output = self.activation(pooled_output) + pooled_output = self.dropout(pooled_output) + logits = self.bert.final_classifier[task_id](pooled_output) + if labels is not None: + if self.focal: + if self.weights[task_id] is None: + loss_fct = FocalLoss() + else: + loss_fct = FocalLoss(weight=torch.tensor([self.weights[task_id]]).cuda()) + else: + if self.weights[task_id] is None: + loss_fct = BCEWithLogitsLoss() + else: + loss_fct = BCEWithLogitsLoss(weight=torch.Tensor([self.weights[task_id]]).cuda()) + if len(labels.shape) == 1 and len(logits.shape) == 2: + labels = labels.unsqueeze(1) + loss = loss_fct(logits, labels) + return loss, logits + else: + return logits + else: + raise Exception(f'Unsupported name {name}') + + def forward(self, task_id, input_ids, attention_mask, token_type_ids, labels=None): + last_hidden_state = self.get_logits(task_id, input_ids, attention_mask, token_type_ids) + return self.predict_on_top(task_id, last_hidden_state, labels) + + +@register('multitask_transformer') +class MultiTaskTransformer(TorchModel): + """ + Multi-Task transformer-agnostic model + Args: + tasks: Dict of task names along with the labels for each task, + max_seq_len(int): maximum length of the input token sequence. + optimizer(str): optimizer name defaults to AdamW, + optimizer_parameters(dict): optimizer parameters, + lr_scheduler(str): name of the lr scheduler,if it is used + lr_scheduler_parameters(dict): lr scheduler parameters for the scheduler, if the scheduler is used + gradient_accumulation_steps(default:1): number of gradient accumulation steps, + steps_per_epoch(int): number of steps taken per epoch. Specify if gradient_accumulation_steps > 1 + backbone_model(str): name of HuggingFace.Transformers backbone model. Default: 'bert-base-cased' + clip_norm(float): normalization: value for gradient clipping. Specify only if gradient clipping is used + one_hot_labels(default: False): set to true if using one hot labels, + multilabel(default: False): set to true for multilabel classification, + return_probas(default: False): set true to return prediction probabilities, + freeze_embeddings(default: False): set true to freeze BERT embeddings + dropout(default: None): dropout for the final model layer. + If not set, defaults to the parameter hidden_dropout_prob of original model + cuda_cache_size(default:3): predicts cache size. Recommended if we need classify one samples for many tasks. 0 if we don't use cache + cuda_cache(default:True): if True, store cache on GPU + seed(default:42): Torch manual_random_seed + """ + + def __init__( + self, + tasks: Dict[str, Dict], + max_seq_len: str = 320, + optimizer: str = "AdamW", + optimizer_parameters: dict = {"lr": 2e-5}, + lr_scheduler: Optional[str] = None, + lr_scheduler_parameters: dict = {}, + gradient_accumulation_steps: Optional[int] = 1, + steps_per_epoch: Optional[int] = None, + backbone_model: str = "bert-base-cased", + clip_norm: Optional[float] = None, + focal: bool = False, + return_probas: bool = False, + freeze_embeddings: bool = False, + new_model=False, + dropout: Optional[float] = None, + binary_threshold: float = 0.5, + seed: int = 42, + *args, + **kwargs, + ) -> None: + self.return_probas = return_probas + self.one_hot_labels = [] + self.clip_norm = clip_norm + self.task_names = list(tasks.keys()) + self.task_types = [] + self.backbone_model = backbone_model + self.max_seq_len = max_seq_len + self.tasks_num_classes = [] + self.task_names = [] + self.multilabel = [] + self.weights = [] + self.types_to_cache = [] + for task in tasks: + self.task_names.append(task) + self.tasks_num_classes.append(tasks[task].get('options', 1)) + self.weights.append(tasks[task].get('weight', None)) + self.task_types.append(tasks[task]['type']) + self.multilabel.append(tasks[task].get('multilabel', False)) + self.one_hot_labels.append(tasks[task].get('one_hot_labels', False)) + self.types_to_cache.append(tasks[task].get('type_to_cache', -1)) + if self.return_probas and 'sequence_labeling' in self.task_types: + log.warning( + f'Return_probas for sequence_labeling not supported yet. Returning ids for this task') + self.n_tasks = len(tasks) + self.train_losses = [[] for _ in self.task_names] + self.optimizer_name = optimizer + self.optimizer_parameters = optimizer_parameters + self.lr_scheduler_name = lr_scheduler + self.lr_scheduler_parameters = lr_scheduler_parameters + self.gradient_accumulation_steps = gradient_accumulation_steps + self.steps_per_epoch = steps_per_epoch + self.steps_taken = 0 + self.prev_id = None + self.printed = False + self.freeze_embeddings = freeze_embeddings + self.dropout = dropout + self.new_model = new_model + self.binary_threshold = binary_threshold + self.focal = focal + self._reset_cache() + torch.manual_seed(seed) + + super().__init__( + optimizer_parameters=self.optimizer_parameters, + lr_scheduler=self.lr_scheduler_name, + lr_scheduler_parameters=self.lr_scheduler_parameters, + **kwargs, + ) + + def _reset_cache(self): + self.preds_cache = {index_: None for index_ in self.types_to_cache if index_ != -1} + + @overrides + def init_from_opt(self) -> None: + """ + Initialize from scratch `self.model` with the architecture built + in `model_func (MultitaskBert)` method of this class along with + `self.optimizer` as `self.optimizer_name` from `torch.optim` and + parameters `self.optimizer_parameters`, optionally initialize + `self.lr_scheduler` as `self.lr_scheduler_name` from + `torch.optim.lr_scheduler` and parameters `self.lr_scheduler_parameters` + """ + + self.model = BertForMultiTask( + backbone_model=self.backbone_model, + tasks_num_classes=self.tasks_num_classes, + weights=self.weights, + multilabel=self.multilabel, + one_hot_labels=self.one_hot_labels, + task_types=self.task_types, + new_model=self.new_model, + focal=self.focal, + dropout=self.dropout) + self.model = self.model.to(self.device) + no_decay = ["bias", "gamma", "beta"] + base = ["attn"] + + def get_non_decay_params(model): return [ + p + for n, p in model.named_parameters() + if not any(nd in n for nd in no_decay) + and not any(nd in n for nd in base) + ] + + def get_decay_params(model): return [ + p + for n, p in model.named_parameters() + if any(nd in n for nd in no_decay) + and not any(nd in n for nd in base) + ] + + model_parameters = [ + { + "params": get_non_decay_params(self.model), + "weight_decay": 0.01, + }, + { + "params": get_decay_params(self.model), + "weight_decay": 0.0, + }, + ] + + self.optimizer = getattr(torch.optim, self.optimizer_name)( + model_parameters, **self.optimizer_parameters + ) + + if self.lr_scheduler_name: + self.lr_scheduler = getattr( + torch.optim.lr_scheduler, self.lr_scheduler_name + )(self.optimizer, **self.lr_scheduler_parameters) + + @overrides + def load(self, fname: Optional[str] = None) -> None: + """ + Loads weights. + """ + if fname is not None: + self.load_path = fname + + if self.load_path: + log.info(f"Load path {self.load_path} is given.") + if isinstance( + self.load_path, + Path) and not self.load_path.parent.is_dir(): + raise ConfigError("Provided load path is incorrect!") + + weights_path = Path(self.load_path.resolve()) + weights_path = weights_path.with_suffix(f".pth.tar") + if weights_path.exists(): + log.info(f"Load path {weights_path} exists.") + log.info( + f"Initializing `{self.__class__.__name__}` from saved.") + + # firstly, initialize with random weights and previously saved + # parameters + self.init_from_opt() + + # now load the weights, optimizer from saved + log.info(f"Loading weights from {weights_path}.") + checkpoint = torch.load(weights_path, map_location=self.device) + self.model.load_state_dict(checkpoint["model_state_dict"]) + self.optimizer.load_state_dict( + checkpoint["optimizer_state_dict"]) + self.epochs_done = checkpoint.get("epochs_done", 0) + else: + log.info( + f"Init from scratch. Load path {weights_path} does not exist.") + self.init_from_opt() + else: + log.info( + f"Init from scratch. Load path {self.load_path} is not provided.") + self.init_from_opt() + if self.freeze_embeddings: + for n, p in self.model.bert.named_parameters(): + if not ('final_classifier' in n or 'pool' in n): + p.requires_grad = False + log.info("Bert Embeddings Freezed") + + if self.device.type == "cuda" and torch.cuda.device_count() > 1: + self.model = torch.nn.DataParallel(self.model) + + def _make_input(self, task_features, task_id, labels=None): + batch_input_size = None + if len(task_features) == 1 and isinstance(task_features, list): + task_features = task_features[0] + + if isinstance(labels, Iterable) and all([k is None for k in labels]): + labels = None + _input = {} + element_list = ["input_ids", "attention_mask", "token_type_ids"] + for elem in element_list: + if elem in task_features: + _input[elem] = task_features[elem] + batch_input_size = _input[elem].shape[0] + elif hasattr(task_features, elem): + _input[elem] = getattr(task_features, elem) + batch_input_size = _input[elem].shape[0] + if elem in _input: + if we_transform_input(self.task_types[task_id]): + _input[elem] = _input[elem].view( + (-1, _input[elem].size(-1))) + + if labels is not None: + if self.one_hot_labels[task_id]: + _input['labels'] = torch.from_numpy(np.array(list(labels))) + elif self.task_types[task_id] in ["regression", "binary_head"]: + _input["labels"] = torch.tensor( + np.array(labels, dtype=float), dtype=torch.float32 + ) + elif self.task_types[task_id] == 'multiple_choice': + labels = torch.Tensor(labels).long() + _input['labels'] = labels + elif self.task_types[task_id] == 'sequence_labeling': + subtoken_labels = [token_labels_to_subtoken_labels(y_el, y_mask, input_mask) + for y_el, y_mask, input_mask in zip(labels, _input['token_type_ids'].numpy(), + _input['attention_mask'].numpy())] + _input['labels'] = torch.from_numpy( + np.array(subtoken_labels)).to(torch.int64) + else: + if not self.multilabel[task_id]: + _input["labels"] = torch.from_numpy(np.array(labels)) + elif self.multilabel[task_id]: + # We assume that labels already are one hot encoded + num_classes = self.tasks_num_classes[task_id] + _input['labels'] = torch.zeros((len(labels), num_classes)) + for i in range(len(labels)): + for label_ind in labels[i]: + _input['labels'][i][label_ind] = 1 + element_list = element_list + ['labels'] + for elem in element_list: + if elem not in _input: + _input[elem] = None + else: + _input[elem] = _input[elem].to(self.device) + if 'labels' in _input and self.task_types[task_id] != 'multiple_choice': + error_msg = f'Len of labels {len(_input["labels"])} does not match len of ids {len(_input["input_ids"])}' + if len(_input['labels']) != len(_input['input_ids']): + raise Exception(error_msg) + return _input, batch_input_size + + def __call__(self, *args): + """Make prediction for given features (texts). + Args: + features: batch of InputFeatures for all tasks + Returns: + predicted classes or probabilities of each class + """ + # IMPROVE ARGS CHECKING AFTER DEBUG + log.debug(f'Calling {args}') + self.validation_predictions = [None for _ in range(len(args))] + for task_id in range(len(self.task_names)): + if len(args[task_id]): + _input, batch_input_size = self._make_input(task_features=args[task_id], task_id=task_id) + + if 'input_ids' not in _input: + raise Exception(f'No input_ids in _input {_input}') + cache_key = self.types_to_cache[task_id] + if cache_key != -1 and self.preds_cache[cache_key] is not None: + last_hidden_state = self.preds_cache[cache_key] + else: + with torch.no_grad(): + if self.is_data_parallel: + last_hidden_state = self.model.module.get_logits(task_id, **_input) + else: + last_hidden_state = self.model.get_logits(task_id, **_input) + if cache_key != -1: + self.preds_cache[cache_key] = last_hidden_state + with torch.no_grad(): + if self.is_data_parallel: + logits = self.model.module.predict_on_top(task_id, last_hidden_state) + else: + logits = self.model.predict_on_top(task_id, last_hidden_state) + if self.task_types[task_id] == 'sequence_labeling': + y_mask = _input['token_type_ids'].cpu() + logits = token_from_subtoken(logits.cpu(), y_mask) + predicted_ids = torch.argmax(logits, dim=-1).int().tolist() + seq_lengths = torch.sum(y_mask, dim=1).int().tolist() + pred = [prediction[:max_seq_len] for max_seq_len, prediction in zip(seq_lengths, predicted_ids)] + elif self.task_types[task_id] in ['regression', 'binary_head']: + pred = logits[:, 0] + if self.task_types[task_id] == 'binary_head': + pred = torch.sigmoid(logits).squeeze(1) + if not self.return_probas: + pred = (pred > self.binary_threshold).int() + pred = pred.cpu().numpy() + else: + if self.multilabel[task_id]: + probs = torch.sigmoid(logits) + if self.return_probas: + pred = probs + pred = pred.cpu().numpy() + else: + numbers_of_sample, numbers_of_class = (probs > self.binary_threshold).nonzero(as_tuple=True) + numbers_of_sample, numbers_of_class = numbers_of_sample.cpu().numpy(), numbers_of_class.cpu().numpy() + pred = [[] for _ in range(len(logits))] + for sample_num, class_num in zip(numbers_of_sample, numbers_of_class): + pred[sample_num].append(int(class_num)) + else: + if self.multilabel[task_id]: + probs = torch.sigmoid(logits) + if self.return_probas: + pred = probs + pred = pred.cpu().numpy() + else: + numbers_of_sample, numbers_of_class = (probs > self.binary_threshold).nonzero(as_tuple=True) + numbers_of_sample, numbers_of_class = numbers_of_sample.cpu().numpy(), numbers_of_class.cpu().numpy() + pred = [[] for _ in range(len(logits))] + for sample_num, class_num in zip(numbers_of_sample, numbers_of_class): + pred[sample_num].append(int(class_num)) + else: + if self.return_probas: + pred = torch.softmax(logits, dim=-1) + else: + pred = torch.argmax(logits, dim=1) + pred = pred.cpu().numpy() + self.validation_predictions[task_id] = pred + if len(args) == 1: + return self.validation_predictions[0] + for i in range(len(self.validation_predictions)): + if self.validation_predictions[i] is None: + self.validation_predictions[i] = [] + self._reset_cache() + log.debug(self.validation_predictions) + return self.validation_predictions + + def train_on_batch(self, *args): + """Train model on given batch. + This method calls train_op using features and y (labels). + Args: + features: batch of InputFeatures + y: batch of labels (class id) + Returns: + dict with loss for each task + """ + log.debug(f'Training for {args}') + error_msg = f'Len of arguments {len(args)} is WRONG. ' \ + f'Correct is {2 * self.n_tasks} as n_tasks is {self.n_tasks}' + if len(args) != 2 * self.n_tasks: + raise Exception(error_msg) + ids_to_iterate = [k for k in range(self.n_tasks) if len(args[k]) > 0] + if len(ids_to_iterate) == 0: + raise Exception(f'No examples given! Given args {args}') + elif len(ids_to_iterate) > 1: + raise Exception('Samples from more than 1 task in train_on_batch') + task_id = ids_to_iterate[0] + _input, batch_size = self._make_input(task_features=args[task_id], task_id=task_id, + labels=args[task_id + self.n_tasks]) + if _input == {}: + raise Exception('Empty input!') + + if self.prev_id is None: + self.prev_id = task_id + elif self.prev_id != task_id and not self.printed: + log.info('Seen samples from different tasks') + self.printed = True + if 'token_type_ids' not in _input: + _input['token_type_ids'] = None + loss, logits = self.model(task_id=task_id, **_input) + if self.is_data_parallel: + loss = loss.mean() + loss = loss / self.gradient_accumulation_steps + loss.backward() + + # Clip the norm of the gradients to 1.0. + # This is to help prevent the "exploding gradients" problem. + if self.clip_norm: + torch.nn.utils.clip_grad_norm_( + self.model.parameters(), self.clip_norm) + + if (self.steps_taken + 1) % self.gradient_accumulation_steps == 0 or ( + self.steps_per_epoch is not None and (self.steps_taken + 1) % self.steps_per_epoch == 0): + self.optimizer.step() + if self.lr_scheduler: + self.lr_scheduler.step() # Update learning rate schedule + self.optimizer.zero_grad() + self.train_losses[task_id] = loss.item() + self.steps_taken += 1 + log.debug(f'train {task_id} {logits}') + return {"losses": self.train_losses} diff --git a/docs/apiref/dataset_iterators.rst b/docs/apiref/dataset_iterators.rst index e2b47ee805..a93d79053c 100644 --- a/docs/apiref/dataset_iterators.rst +++ b/docs/apiref/dataset_iterators.rst @@ -13,3 +13,6 @@ Concrete DatasetIterator classes. .. automodule:: deeppavlov.dataset_iterators.typos_iterator :members: + +.. automodule:: deeppavlov.dataset_iterators.multitask_iterator + :members: diff --git a/docs/apiref/dataset_readers.rst b/docs/apiref/dataset_readers.rst index a7ad0f6abc..570659957e 100644 --- a/docs/apiref/dataset_readers.rst +++ b/docs/apiref/dataset_readers.rst @@ -23,3 +23,6 @@ Concrete DatasetReader classes. .. automodule:: deeppavlov.dataset_readers.ubuntu_v2_reader :members: + +.. automodule:: deeppavlov.dataset_readers.multitask_reader + :members: diff --git a/docs/features/models/multitask_bert.rst b/docs/features/models/multitask_bert.rst new file mode 100644 index 0000000000..7344ecbd96 --- /dev/null +++ b/docs/features/models/multitask_bert.rst @@ -0,0 +1,231 @@ +Multi-task BERT in DeepPavlov +============================= + +Multi-task BERT in DeepPavlov is an implementation of BERT training algorithm published in the paper +`Multi-Task Deep Neural Networks for Natural Language Understanding `_. + +The idea is to share BERT body between several tasks. This is necessary if a model pipe has several +components using BERT and the amount of GPU memory is limited. Each task has its own 'head' part attached to the +output of the BERT encoder. If multi-task BERT has :math:`T` heads, one training iteration consists of + +- composing :math:`T` lists of examples, one for each task, + +- :math:`T` gradient steps, one gradient step for each task. + +By default, on every training steps lists of examples for all but one tasks are empty, as if in the original MT-DNN repository. + +When one of BERT heads is being trained, other heads' parameters do not change. On each training step both BERT head +and body parameters are modified. + +Currently multitask bert heads support classification, regression, NER and multiple choice tasks. + +At this page, multi-task BERT usage is explained on a toy configuration file of a model that is trained for the +single-sentence classification, sentence pair classification, regression, multiple choice and NER. +The config for this model is :config:`multitask_example `. + +Other examples of using multitask models can be found in :config:`mt_glue `. + +Train config +------------ + +When using ``multitask_transformer`` component, you can use the same inference file as the train file. + +Data reading and iteration is performed by :class:`~deeppavlov.dataset_readers.multitask_reader.MultiTaskReader` +and :class:`~deeppavlov.dataset_iterators.multitask_iterator.MultiTaskIterator`. These classes are composed +of task readers and iterators and generate batches that contain data from heterogeneous datasets. Example below +demonstrates the usage of multitask dataset reader: + +.. code:: json + + "dataset_reader": { + "class_name": "multitask_reader", + "task_defaults": { + "class_name": "huggingface_dataset_reader", + "path": "glue", + "train": "train", + "valid": "validation", + "test": "test" + }, + "tasks": { + "cola": {"name": "cola"}, + "copa": { + "path": "super_glue", + "name": "copa" + }, + "conll": { + "class_name": "conll2003_reader", + "use_task_defaults": false, + "data_path": "{DOWNLOADS_PATH}/conll2003/", + "dataset_name": "conll2003", + "provide_pos": false + } + } + } + +Nested dataset readers are listed in the ``tasks`` section. By default, default nested readers parameters are taken from +``task_defaults`` section. Values from the ``tasks`` could complement parameters, like ``name`` parameter in the +``dataset_reader.tasks.cola``, and could overwrite default parameter values, like ``path`` parameter from +``dataset_reader.tasks.copa``. In the ``dataset_reader.tasks.conll`` ``use_task_defaults`` is ``False``. This is special +parameter, that forces ``multitask_reader`` to ignore ``task_defaults`` while creating nested reader, which means that +dataset reader for ``conll`` task will use only parameters from ``dataset_reader.tasks.conll``. + +The same principle with default values applies to ``multitask_iterator``. + +Batches generated by ``multitask_iterator`` are tuples of two elements: inputs of the model and labels. +Both inputsand labels are lists of tuples. The inputs have following format: +``[(first_task_inputs[0], second_task_inputs[0],...), (first_task_inputs[1], second_task_inputs[1], ...), ...]`` +where ``first_task_inputs``, ``second_task_inputs``, and so on are x values of batches from task dataset iterators. +The labels in the second element have the similar format. + +If task datasets have different sizes, then for smaller datasets the lists are padded with ``None`` values. For example, +if the first task dataset inputs are ``[0, 1, 2, 3, 4, 5, 6]``, the second task dataset inputs are ``[7, 8, 9]``, +and the batch size is ``2``, then multi-task input mini-batches will be ``[(0, 7), (1, 8)]``, ``[(2, 9), (3, None)]``, +``[(4, None), (5, None)]``, ``[(6, None)]``. + +In this tutorial, there are 5 datasets. Considering the batch structure, ``chainer`` inputs in +:config:`multitask_example ` are: + +.. code:: json + + "in": ["x_cola", "x_rte", "x_stsb", "x_copa", "x_conll"], + "in_y": ["y_cola", "y_rte", "y_stsb", "y_copa", "y_conll"] + +Sometimes a task dataset iterator returns inputs or labels consisting of more than one element. For example, in the +model input element could consist of two strings. If there is a necessity to split such a variable, ``InputSplitter`` +component can be used. Data preparation in the multitask setting can be similar to the preparation in singletask setting +except for the names of the variables. + +For streamlining the code, however, ``input_splitter`` and ``tokenizer`` can be unified into the +``multitask_pipeline_preprocessor``. This preprocessor gets as a parameter ``preprocessor`` the one preprocessor class +name for all tasks, or gets the preprocessor name list as a parameter ``preprocessors``. After splitting input by +``possible_keys_to_extract``, every preprocessor (being initialized by the input beforehand) processes the input. +Note, that if ``strict`` parameter(default:False) is set to True, we always try to split data. Here is the definition of +``multitask_pipeline_preprocessor`` from the :config:`multitask_example `: + +.. code:: json + + "class_name": "multitask_pipeline_preprocessor", + "possible_keys_to_extract": [0, 1], + "preprocessors": [ + "TorchTransformersPreprocessor", + "TorchTransformersPreprocessor", + "TorchTransformersPreprocessor", + "TorchTransformersMultiplechoicePreprocessor", + "TorchTransformersNerPreprocessor" + ], + "do_lower_case": true, + "n_task": 5, + "vocab_file": "{BACKBONE}", + "max_seq_length": 200, + "max_subword_length": 15, + "token_masking_prob": 0.0, + "return_features": true, + "in": ["x_cola", "x_rte", "x_stsb", "x_copa", "x_conll"], + "out": [ + "bert_features_cola", + "bert_features_rte", + "bert_features_stsb", + "bert_features_copa", + "bert_features_conll" + ] + +The ``multitask_transformer`` component has common and task-specific parameters. Shared parameters are provided inside +the tasks parameter. The tasks is a dictionary that keys are task names and values are task-specific parameters (type, +options). Common parameters, are backbone_model(same parameter as in the tokenizer) and all parameters from torch_bert. +**The order of tasks MATTERS.** + +Here is the definition of ``multitask_transformer`` from the :config:`multitask_example `: + +.. code:: json + + "id": "multitask_transformer", + "class_name": "multitask_transformer", + "optimizer_parameters": {"lr": 2e-5}, + "gradient_accumulation_steps": "{GRADIENT_ACC_STEPS}", + "learning_rate_drop_patience": 2, + "learning_rate_drop_div": 2.0, + "return_probas": true, + "backbone_model": "{BACKBONE}", + "save_path": "{MODEL_PATH}", + "load_path": "{MODEL_PATH}", + "tasks": { + "cola": { + "type": "classification", + "options": 2 + }, + "rte": { + "type": "classification", + "options": 2 + }, + "stsb": { + "type": "regression", + "options": 1 + }, + "copa": { + "type": "multiple_choice", + "options": 2 + }, + "conll": { + "type": "sequence_labeling", + "options": "#vocab_conll.len" + } + }, + "in": [ + "bert_features_cola", + "bert_features_rte", + "bert_features_stsb", + "bert_features_copa", + "bert_features_conll" + ], + "in_y": ["y_cola", "y_rte", "y_stsb", "y_copa", "y_ids_conll"], + "out": [ + "y_cola_pred_probas", + "y_rte_pred_probas", + "y_stsb_pred", + "y_copa_pred_probas", + "y_conll_pred_ids" + ] + +Note that ``proba2labels`` can now take several arguments. + +.. code:: json + + { + "in":["y_cola_pred_probas", "y_rte_pred_probas", "y_copa_pred_probas"], + "out":["y_cola_pred_ids", "y_rte_pred_ids", "y_copa_pred_ids"], + "class_name":"proba2labels", + "max_proba":true + } + +You may need to create your own metric for early stopping. In this example, the target metric is an average of AUC ROC +for insults and sentiment tasks and F1 for NER task: + +.. code:: python + + from deeppavlov.metrics.roc_auc_score import roc_auc_score + + def roc_auc__roc_auc__ner_f1(true_onehot1, pred_probas1, true_onehot2, pred_probas2, ner_true3, ner_pred3): + roc_auc1 = roc_auc_score(true_onehot1, pred_probas1) + roc_auc2 = roc_auc_score(true_onehot2, pred_probas2) + ner_f1_3 = ner_f1(ner_true3, ner_pred3) / 100 + return (roc_auc1 + roc_auc2 + ner_f1_3) / 3 + +It he code above will be saved at ``custom_metric.py``, metric could be used in the config as +``custom_metric:roc_auc__roc_auc__ner_f1`` (``module.submodules:function_name`` reference format). + +You can make an inference-only config. In this config, there is no need in dataset reader and dataset iterator. +A ``train`` field and components preparing ``in_y`` are removed. In ``multitask_transformer`` component configuration +all training parameters (learning rate, optimizer, etc.) are omitted. + +Here are the results of ``deeppavlov/configs/multitask/mt_glue.json`` compared to the analogous single-task configs, +according to the test server. + ++-------------------+-------------+----------------+----------+---------------+-----------------------+---------------+------------+----------+----------+----------------+ +| Task | Score | CoLA | SST-2 | MRPC | STS-B | QQP | MNLI(m/mm) | QNLI | RTE | AX | ++-------------------+-------------+----------------+----------+---------------+-----------------------+---------------+------------+----------+----------+----------------+ +| Metric | from server | Matthew's Corr | Accuracy | F1 / Accuracy | Pearson/Spearman Corr | F1 / Accuracy | Accuracy | Accuracy | Accuracy | Matthew's Corr | ++===================+=============+================+==========+===============+=======================+===============+============+==========+==========+================+ +| Multitask config | 77.8 | 43.6 | 93.2 | 88.6/84.2 | 84.3/84.0 | 70.1/87.9 | 83.0/82.6 | 90.6 | 75.4 | 35.4 | ++-------------------+-------------+----------------+----------+---------------+-----------------------+---------------+------------+----------+----------+----------------+ +| Singletask config | 77.6 | 53.6 | 92.7 | 87.7/83.6 | 84.4/83.1 | 70.5/88.9 | 84.4/83.2 | 90.3 | 63.4 | 36.3 | ++-------------------+-------------+----------------+----------+---------------+-----------------------+---------------+------------+----------+----------+----------------+ diff --git a/docs/index.rst b/docs/index.rst index 391bbc58ab..dffd62807b 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -26,8 +26,8 @@ Welcome to DeepPavlov's documentation! :glob: :maxdepth: 1 :caption: Models - - + + Multitask BERT Context Question Answering Classification Named Entity Recognition