llama.py

from transformers import AutoTokenizer, LlamaForCausalLM
import torch
from GPU_get import get_gpu

dividing_line_len = 50
model_dir, device, tokenizer, model = None, None, None, None
history = []        # list of [user_text, response] pairs shared by the dialogue functions
init_state = False  # guards init() against repeated initialization
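
# GPU_get is a sibling module of this repo and is not shown in this file. From
# its use in init() below, get_gpu(fraction) presumably scans the visible CUDA
# devices and returns an identifier accepted by device_map (e.g. "cuda:0") for
# one with enough free memory, or None if no GPU qualifies. A minimal sketch of
# such a helper, purely an assumption about what GPU_get provides:
#
#     def get_gpu(min_free_fraction):
#         for i in range(torch.cuda.device_count()):
#             free, total = torch.cuda.mem_get_info(i)  # bytes free, bytes total
#             if free / total >= min_free_fraction:
#                 return f"cuda:{i}"
#         return None
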
def stdout_message(message):
    """Print a status message framed by '=' dividing lines."""
    dividing_line = "=" * dividing_line_len
    print(dividing_line + "\n" + message + "\n" + dividing_line)

def errout_message(message):
    """Print an error message framed by '!' dividing lines, then terminate the process."""
    dividing_line = "!" * dividing_line_len
    print(dividing_line + "\n" + message + "\n" + dividing_line)
    exit()
def clear_history():
    """Reset the shared conversation history."""
    global history
    history = []
def init(dir_input="", device_input=None):
    """Load tokenizer and model from dir_input onto a GPU (auto-selected unless device_input is given)."""
    global init_state
    if init_state:
        return
    init_state = True
    stdout_message("Setting up directory and device")
    global device, model_dir, tokenizer, model
    if dir_input == "":
        errout_message("Please provide model directory")
    model_dir = dir_input
    if device_input is None:
        device = get_gpu(0.32)
    else:
        device = device_input
    if device is None:
        errout_message("No GPU available")
    else:
        stdout_message(f"Device: {device}")
    tokenizer = AutoTokenizer.from_pretrained(model_dir)
    model = LlamaForCausalLM.from_pretrained(model_dir, torch_dtype='auto', device_map=device)
    stdout_message(f"Finished loading model on device: {device}")
def dialogue(input_text):
    """Single-turn chat: generate a response to input_text without using prior history."""
    messages = [
        {'role': 'system', 'content': ''},
        {'role': 'user', 'content': input_text}
    ]
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )
    model_input = tokenizer([text], return_tensors='pt').to(device)
    attention_mask = torch.ones(model_input.input_ids.shape, dtype=torch.long, device=device)
    generated_ids = model.generate(
        model_input.input_ids,
        max_new_tokens=512,
        attention_mask=attention_mask,
        pad_token_id=tokenizer.eos_token_id,
    )
    # Strip the prompt tokens so only the newly generated completion is decoded.
    generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(model_input.input_ids, generated_ids)]
    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    history.append([input_text, response])
    return response
def dialogue_with_history(input_text):
    """Multi-turn chat: replay every prior turn from history before the new prompt."""
    messages = [{'role': 'system', 'content': ''}]
    for user_text, response in history:
        messages.append({'role': 'user', 'content': user_text})
        messages.append({'role': 'assistant', 'content': response})
    messages.append({'role': 'user', 'content': input_text})
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )
    model_input = tokenizer([text], return_tensors='pt').to(device)
    attention_mask = torch.ones(model_input.input_ids.shape, dtype=torch.long, device=device)
    generated_ids = model.generate(
        model_input.input_ids,
        max_new_tokens=512,
        attention_mask=attention_mask,
        pad_token_id=tokenizer.eos_token_id,
    )
    # Strip the prompt tokens so only the newly generated completion is decoded.
    generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(model_input.input_ids, generated_ids)]
    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    history.append([input_text, response])
    return response
def forward(input_text):
    """Run one forward pass over the history-augmented prompt; returns the raw model output (logits, etc.) rather than decoded text."""
    messages = [{'role': 'system', 'content': ''}]
    for user_text, response in history:
        messages.append({'role': 'user', 'content': user_text})
        messages.append({'role': 'assistant', 'content': response})
    messages.append({'role': 'user', 'content': input_text})
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )
    model_input = tokenizer([text], return_tensors='pt').to(device)
    attention_mask = torch.ones(model_input.input_ids.shape, dtype=torch.long, device=device)
    # Call the model directly rather than model.forward() so module hooks still run.
    output = model(
        model_input.input_ids,
        attention_mask=attention_mask,
    )
    return output
def text2hidden_state(text):
    """Encode text and return the last-layer hidden state at the EOS position as a NumPy array of shape (1, hidden_size)."""
    model_input = tokenizer([text], return_tensors='pt', add_special_tokens=True).to(device)
    # Append an explicit EOS token so there is a well-defined position to read the sentence representation from.
    input_ids = torch.cat(
        (model_input.input_ids, torch.tensor([[tokenizer.eos_token_id]], device=device)),
        dim=1
    )
    attention_mask = torch.ones(input_ids.shape, dtype=torch.long, device=device)
    output = model(input_ids, attention_mask=attention_mask, output_hidden_states=True)
    # Take the last EOS occurrence; calling .item() on all matches would crash if the tokenizer emitted an extra EOS.
    eos_position = (input_ids == tokenizer.eos_token_id).nonzero(as_tuple=True)[1][-1].item()
    torch.cuda.empty_cache()
    return output.hidden_states[-1][:, eos_position, :].detach().float().cpu().numpy()
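
# A minimal usage sketch. The checkpoint path below is hypothetical; any local
# Llama chat checkpoint directory should work, provided get_gpu finds a free GPU.
if __name__ == "__main__":
    init(dir_input="/path/to/llama-checkpoint")           # hypothetical path
    print(dialogue("Hello, who are you?"))                # single turn (still appended to history)
    print(dialogue_with_history("What did I just ask?"))  # replays prior turns
    vec = text2hidden_state("An example sentence.")
    print(vec.shape)                                      # (1, hidden_size)
    clear_history()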