[FEATURE] Update D2V, AutoTokenizer, and pretraining scripts #155

Merged · 11 commits · Mar 4, 2024
2 changes: 1 addition & 1 deletion EduNLP/Formula/Formula.py
@@ -8,7 +8,7 @@

 from .ast import str2ast, get_edges, link_variable

-CONST_MATHORD = {r"\pi"}
+CONST_MATHORD = {"\\pi"}

 __all__ = ["Formula", "FormulaGroup", "CONST_MATHORD", "link_formulas"]

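Note on the change above: the raw-string and escaped spellings are the same one-character escape, so the edit is purely stylistic. A quick check in Python:

# Both literals denote a backslash followed by "pi"; the set is unchanged.
assert r"\pi" == "\\pi"
assert {r"\pi"} == {"\\pi"}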
2 changes: 1 addition & 1 deletion EduNLP/I2V/__init__.py
@@ -2,4 +2,4 @@
 # 2021/8/1 @ tongshiwei

 from .i2v import I2V, get_pretrained_i2v
-from .i2v import D2V, W2V, Elmo, Bert, DisenQ, QuesNet
+from .i2v import D2V, W2V, Elmo, Bert, HfAuto, DisenQ, QuesNet
89 changes: 79 additions & 10 deletions EduNLP/I2V/i2v.py
@@ -11,10 +11,10 @@
 from longling import path_append
 from EduData import get_data
 from ..Tokenizer import Tokenizer, get_tokenizer
-from EduNLP.Pretrain import ElmoTokenizer, BertTokenizer, DisenQTokenizer, QuesNetTokenizer, Question
+from EduNLP.Pretrain import ElmoTokenizer, BertTokenizer, HfAutoTokenizer, DisenQTokenizer, QuesNetTokenizer, Question
 from EduNLP import logger

-__all__ = ["I2V", "D2V", "W2V", "Elmo", "Bert", "DisenQ", "QuesNet", "get_pretrained_i2v"]
+__all__ = ["I2V", "D2V", "W2V", "Elmo", "Bert", "HfAuto", "DisenQ", "QuesNet", "get_pretrained_i2v"]


class I2V(object):
@@ -51,8 +51,8 @@ class I2V(object):
     (...)
     >>> path = path_append(path, os.path.basename(path) + '.bin', to_str=True)
     >>> i2v = D2V("pure_text", "d2v", filepath=path, pretrained_t2v=False)
-    >>> i2v(item)
-    ([array([ ...dtype=float32)], None)
+    >>> i2v(item) # doctest: +SKIP
+    ([array([ ...dtype=float32)], [[array([ ...dtype=float32)]])

Returns
-------
@@ -69,6 +69,9 @@ def __init__(self, tokenizer, t2v, *args, tokenizer_kwargs: dict = None,
         if tokenizer == 'bert':
             self.tokenizer = BertTokenizer.from_pretrained(
                 **tokenizer_kwargs if tokenizer_kwargs is not None else {})
+        elif tokenizer == 'hf_auto':
+            self.tokenizer = HfAutoTokenizer.from_pretrained(
+                **tokenizer_kwargs if tokenizer_kwargs is not None else {})
         elif tokenizer == 'quesnet':
             self.tokenizer = QuesNetTokenizer.from_pretrained(
                 **tokenizer_kwargs if tokenizer_kwargs is not None else {})
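For reference, a minimal sketch of how the new 'hf_auto' branch is exercised; the checkpoint directory is hypothetical, and tokenizer_config_dir is the only keyword the PR itself passes (see HfAuto.from_pretrained below):

from EduNLP.Pretrain import HfAutoTokenizer

# Hypothetical directory holding an AutoTokenizer-compatible checkpoint.
tokenizer = HfAutoTokenizer.from_pretrained(tokenizer_config_dir="path/to/checkpoint")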
@@ -189,8 +192,8 @@ class D2V(I2V):
     (...)
     >>> path = path_append(path, os.path.basename(path) + '.bin', to_str=True)
     >>> i2v = D2V("pure_text","d2v",filepath=path, pretrained_t2v = False)
-    >>> i2v(item)
-    ([array([ ...dtype=float32)], None)
+    >>> i2v(item) # doctest: +SKIP
+    # ([array([ ...dtype=float32)], [[array([ ...dtype=float32)]])

Returns
-------
@@ -221,7 +224,7 @@ def infer_vector(self, items, tokenize=True, key=lambda x: x, *args,
"""
tokens = self.tokenize(items, key=key) if tokenize is True else items
tokens = [token for token in tokens]
return self.t2v(tokens, *args, **kwargs), None
return self.t2v(tokens, *args, **kwargs), self.t2v.infer_tokens(tokens, *args, **kwargs)

@classmethod
def from_pretrained(cls, name, model_dir=MODEL_DIR, *args, **kwargs):
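With this change D2V returns token-level vectors as its second element instead of None. A minimal sketch, assuming a local d2v checkpoint (the path and item are illustrative):

from EduNLP.I2V import D2V

item = {"stem": "If x + 2 = 5, what is x?"}  # illustrative item
i2v = D2V("pure_text", "d2v", filepath="path/to/d2v.bin", pretrained_t2v=False)
item_vectors, token_vectors = i2v(item)  # second element was previously None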
@@ -426,6 +429,71 @@ def from_pretrained(cls, name, model_dir=MODEL_DIR, device='cpu', *args, **kwarg
                    tokenizer_kwargs=tokenizer_kwargs)


class HfAuto(I2V):
    """
    The model aims to transfer items and tokens to vector with a HuggingFace AutoModel.

    Bases
    -------
    I2V

    Parameters
    -----------
    tokenizer: str
        the tokenizer name
    t2v: str
        the name of token2vector model
    args:
        the parameters passed to t2v
    tokenizer_kwargs: dict
        the parameters passed to tokenizer
    pretrained_t2v: bool
        True: use pretrained t2v model
        False: use your own t2v model
    kwargs:
        the parameters passed to t2v

    Returns
    -------
    i2v model: HfAuto
    """

    def infer_vector(self, items: Tuple[List[str], List[dict], str, dict],
                     *args, key=lambda x: x, return_tensors='pt', **kwargs) -> tuple:
        """
        Convert items to vectors. The model must be loaded before calling this function.

        Parameters
        -----------
        items : str or dict or list
            the item of question, or question list
        return_tensors: str
            tensor type used in tokenizer
        args:
            the parameters passed to t2v
        kwargs:
            the parameters passed to t2v

        Returns
        --------
        vector: list
        """
        is_batch = isinstance(items, list)
        items = items if is_batch else [items]
        inputs = self.tokenize(items, key=key, return_tensors=return_tensors)
        return self.t2v.infer_vector(inputs, *args, **kwargs), self.t2v.infer_tokens(inputs, *args, **kwargs)

    @classmethod
    def from_pretrained(cls, name, model_dir=MODEL_DIR, device='cpu', *args, **kwargs):
        model_path = path_append(model_dir, get_pretrained_model_info(name)[0].split('/')[-1], to_str=True)
        for i in [".tar.gz", ".tar.bz2", ".tar.bz", ".tar.tgz", ".tar", ".tgz", ".zip", ".rar"]:
            model_path = model_path.replace(i, "")
        logger.info("model_path: %s" % model_path)
        tokenizer_kwargs = {"tokenizer_config_dir": model_path}
        return cls("hf_auto", name, pretrained_t2v=True, model_dir=model_dir, device=device,
                   tokenizer_kwargs=tokenizer_kwargs)
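A hedged usage sketch for the new wrapper, assuming its positional arguments follow the same (tokenizer, t2v) pattern as the other I2V subclasses; the checkpoint path is hypothetical:

from EduNLP.I2V import HfAuto

i2v = HfAuto("hf_auto", "hf_auto", pretrained_t2v=False,
             tokenizer_kwargs={"tokenizer_config_dir": "path/to/checkpoint"})
item_vector, token_vectors = i2v.infer_vector("If x + 2 = 5, what is x?")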


class DisenQ(I2V):
"""
The model aims to transfer item and tokens to vector with DisenQ.
@@ -542,6 +610,7 @@ def from_pretrained(cls, name, model_dir=MODEL_DIR, device='cpu', *args, **kwarg
"w2v": W2V,
"d2v": D2V,
"bert": Bert,
"hf_auto": HfAuto,
"disenq": DisenQ,
"quesnet": QuesNet,
"elmo": Elmo
@@ -579,13 +648,13 @@ def get_pretrained_i2v(name, model_dir=MODEL_DIR, device='cpu'):
     >>> (); i2v = get_pretrained_i2v("d2v_test_256", "examples/test_model/d2v"); () # doctest: +SKIP
     (...)
     >>> print(i2v(item)) # doctest: +SKIP
-    ([array([ ...dtype=float32)], None)
+    ([array([ ...dtype=float32)], [[array([ ...dtype=float32)]])
     """
     pretrained_models = get_all_pretrained_models()
     if name not in pretrained_models:
         raise KeyError(
             "Unknown model name %s, use one of the provided models: %s" % (name, ", ".join(pretrained_models))
         )
-    _, t2v = get_pretrained_model_info(name)
-    _class, *params = MODEL_MAP[t2v], name
+    _, i2v = get_pretrained_model_info(name)
+    _class, *params = MODEL_MAP[i2v], name
     return _class.from_pretrained(*params, model_dir=model_dir, device=device)
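The rename reflects that the second value returned by get_pretrained_model_info is the I2V class key used for MODEL_MAP dispatch, not a t2v name. A sketch of the resulting flow, reusing the model name from the doctest above:

from EduNLP.I2V import get_pretrained_i2v

# "d2v_test_256" resolves through MODEL_MAP to the D2V wrapper.
i2v = get_pretrained_i2v("d2v_test_256", model_dir="examples/test_model/d2v")
item_vectors, token_vectors = i2v({"stem": "If x + 2 = 5, what is x?"})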
1 change: 1 addition & 0 deletions EduNLP/ModelZoo/__init__.py
@@ -1,5 +1,6 @@
 from .utils import *
 from .bert import *
+from .hf_model import *
 from .rnn import *
 from .disenqnet import *
 from .quesnet import *
1 change: 1 addition & 0 deletions EduNLP/ModelZoo/hf_model/__init__.py
@@ -0,0 +1 @@
from .hf_model import *
165 changes: 165 additions & 0 deletions EduNLP/ModelZoo/hf_model/hf_model.py
@@ -0,0 +1,165 @@
import torch
from torch import nn
import json
import os
from transformers import AutoModel, PretrainedConfig, AutoConfig
from typing import List
from EduNLP.utils.log import logger
from ..base_model import BaseModel
from ..utils import PropertyPredictionOutput, KnowledgePredictionOutput
from ..rnn.harnn import HAM


__all__ = ["HfModelForPropertyPrediction", "HfModelForKnowledgePrediction"]


class HfModelForPropertyPrediction(BaseModel):
    def __init__(self, pretrained_model_dir=None, head_dropout=0.5, init=True):
        super(HfModelForPropertyPrediction, self).__init__()
        bert_config = AutoConfig.from_pretrained(pretrained_model_dir)
        if init:
            logger.info(f'Load AutoModel from checkpoint: {pretrained_model_dir}')
            self.model = AutoModel.from_pretrained(pretrained_model_dir)
        else:
            logger.info(f'Load AutoModel from config: {pretrained_model_dir}')
            self.model = AutoModel(bert_config)
        self.hidden_size = self.model.config.hidden_size
        self.head_dropout = head_dropout
        self.dropout = nn.Dropout(head_dropout)
        self.classifier = nn.Linear(self.hidden_size, 1)
        self.sigmoid = nn.Sigmoid()
        self.criterion = nn.MSELoss()

        self.config = {k: v for k, v in locals().items() if k not in ["self", "__class__", "bert_config"]}
        self.config['architecture'] = 'HfModelForPropertyPrediction'
        self.config = PretrainedConfig.from_dict(self.config)

    def forward(self,
                input_ids=None,
                attention_mask=None,
                token_type_ids=None,
                labels=None):
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        item_embeds = outputs.last_hidden_state[:, 0, :]
        item_embeds = self.dropout(item_embeds)

        logits = self.sigmoid(self.classifier(item_embeds)).squeeze(1)
        loss = None
        if labels is not None:
            loss = self.criterion(logits, labels)
        return PropertyPredictionOutput(
            loss=loss,
            logits=logits,
        )

    @classmethod
    def from_config(cls, config_path, **kwargs):
        config_path = os.path.join(os.path.dirname(config_path), 'model_config.json')
        with open(config_path, "r", encoding="utf-8") as rf:
            model_config = json.load(rf)
            model_config['pretrained_model_dir'] = os.path.dirname(config_path)
            model_config.update(kwargs)
            return cls(
                pretrained_model_dir=model_config['pretrained_model_dir'],
                head_dropout=model_config.get("head_dropout", 0.5),
                init=model_config.get('init', False)
            )

    def save_config(self, config_dir):
        config_path = os.path.join(config_dir, "model_config.json")
        with open(config_path, "w", encoding="utf-8") as wf:
            json.dump(self.config.to_dict(), wf, ensure_ascii=False, indent=2)
        self.model.config.save_pretrained(config_dir)


class HfModelForKnowledgePrediction(BaseModel):
    def __init__(self,
                 pretrained_model_dir=None,
                 num_classes_list: List[int] = None,
                 num_total_classes: int = None,
                 head_dropout=0.5,
                 flat_cls_weight=0.5,
                 attention_unit_size=256,
                 fc_hidden_size=512,
                 beta=0.5,
                 init=True
                 ):
        super(HfModelForKnowledgePrediction, self).__init__()
        bert_config = AutoConfig.from_pretrained(pretrained_model_dir)
        if init:
            logger.info(f'Load AutoModel from checkpoint: {pretrained_model_dir}')
            self.model = AutoModel.from_pretrained(pretrained_model_dir)
        else:
            logger.info(f'Load AutoModel from config: {pretrained_model_dir}')
            self.model = AutoModel(bert_config)
        self.hidden_size = self.model.config.hidden_size
        self.head_dropout = head_dropout
        self.dropout = nn.Dropout(head_dropout)
        self.sigmoid = nn.Sigmoid()
        self.criterion = nn.MSELoss()
        self.flat_classifier = nn.Linear(self.hidden_size, num_total_classes)
        self.ham_classifier = HAM(
            num_classes_list=num_classes_list,
            num_total_classes=num_total_classes,
            sequence_model_hidden_size=self.model.config.hidden_size,
            attention_unit_size=attention_unit_size,
            fc_hidden_size=fc_hidden_size,
            beta=beta,
            dropout_rate=head_dropout
        )
        self.flat_cls_weight = flat_cls_weight
        self.num_classes_list = num_classes_list
        self.num_total_classes = num_total_classes

        self.config = {k: v for k, v in locals().items() if k not in ["self", "__class__", "bert_config"]}
        self.config['architecture'] = 'HfModelForKnowledgePrediction'
        self.config = PretrainedConfig.from_dict(self.config)

    def forward(self,
                input_ids=None,
                attention_mask=None,
                token_type_ids=None,
                labels=None):
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        item_embeds = outputs.last_hidden_state[:, 0, :]
        item_embeds = self.dropout(item_embeds)
        tokens_embeds = outputs.last_hidden_state
        tokens_embeds = self.dropout(tokens_embeds)
        flat_logits = self.sigmoid(self.flat_classifier(item_embeds))
        ham_outputs = self.ham_classifier(tokens_embeds)
        ham_logits = self.sigmoid(ham_outputs.scores)
        logits = self.flat_cls_weight * flat_logits + (1 - self.flat_cls_weight) * ham_logits
        loss = None
        if labels is not None:
            labels = torch.sum(torch.nn.functional.one_hot(labels, num_classes=self.num_total_classes), dim=1)
            labels = labels.float()
            loss = self.criterion(logits, labels)
        return KnowledgePredictionOutput(
            loss=loss,
            logits=logits,
        )

    @classmethod
    def from_config(cls, config_path, **kwargs):
        config_path = os.path.join(os.path.dirname(config_path), 'model_config.json')
        with open(config_path, "r", encoding="utf-8") as rf:
            model_config = json.load(rf)
            model_config['pretrained_model_dir'] = os.path.dirname(config_path)
            model_config.update(kwargs)
            return cls(
                pretrained_model_dir=model_config['pretrained_model_dir'],
                head_dropout=model_config.get("head_dropout", 0.5),
                num_classes_list=model_config.get('num_classes_list'),
                num_total_classes=model_config.get('num_total_classes'),
                flat_cls_weight=model_config.get('flat_cls_weight', 0.5),
                attention_unit_size=model_config.get('attention_unit_size', 256),
                fc_hidden_size=model_config.get('fc_hidden_size', 512),
                beta=model_config.get('beta', 0.5),
                init=model_config.get('init', False)
            )

    def save_config(self, config_dir):
        config_path = os.path.join(config_dir, "model_config.json")
        with open(config_path, "w", encoding="utf-8") as wf:
            json.dump(self.config.to_dict(), wf, ensure_ascii=False, indent=2)
        self.model.config.save_pretrained(config_dir)
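Hedged usage sketches for the two new heads; the checkpoint name and label sizes below are illustrative, not from the PR:

import torch
from transformers import AutoTokenizer
from EduNLP.ModelZoo.hf_model import HfModelForPropertyPrediction, HfModelForKnowledgePrediction

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")  # any AutoModel-compatible checkpoint
batch = tokenizer(["If x + 2 = 5, what is x?"], return_tensors="pt")

# Property (e.g. difficulty) prediction: one sigmoid output per item, MSE loss.
prop_model = HfModelForPropertyPrediction(pretrained_model_dir="bert-base-uncased")
out = prop_model(**batch, labels=torch.tensor([0.7]))
print(out.loss, out.logits.shape)  # logits: (1,)

# Knowledge prediction: labels are per-item knowledge-point indices that the
# forward pass turns into a multi-hot target over num_total_classes.
kp_model = HfModelForKnowledgePrediction(
    pretrained_model_dir="bert-base-uncased",
    num_classes_list=[2, 4, 10],  # illustrative per-level class counts (sums to 16)
    num_total_classes=16,
)
out = kp_model(**batch, labels=torch.tensor([[1, 5, 12]]))
print(out.loss, out.logits.shape)  # logits: (1, 16)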