# -*- coding:utf-8 -*-
# Filename: PerceptronTagger.py
# Author: hankcs
# Date: 2016-09-03 PM2:22
from __future__ import absolute_import

import logging
import os
import pickle
import random
from collections import defaultdict

from AveragedPerceptron import AveragedPerceptron
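
# Interface assumed of AveragedPerceptron (inferred from how it is used
# below, not verified against that module): a mutable `classes` set and a
# `weights` dict, plus predict(features) -> best tag,
# update(truth, guess, features) for the perceptron update, and
# average_weights() to average the weights once training ends.
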
PICKLE = "data/trontagger-0.1.0.pickle"


class PerceptronTagger(object):
    '''Greedy Averaged Perceptron tagger, as implemented by Matthew Honnibal.
    See more implementation details here:
    http://honnibal.wordpress.com/2013/09/11/a-good-part-of-speechpos-tagger-in-about-200-lines-of-python/

    :param load: Load the pickled model upon instantiation.
    '''

    START = ['-START-', '-START2-']
    END = ['-END-', '-END2-']
    AP_MODEL_LOC = os.path.join(os.path.dirname(__file__), PICKLE)

    def __init__(self, load=True):
        self.model = AveragedPerceptron()
        self.tagdict = {}
        self.classes = set()
        if load:
            self.load(self.AP_MODEL_LOC)

    def tag(self, corpus):
        '''Tag the string ``corpus`` and return a flat list of (word, tag) tuples.'''
        # Assume an untokenized corpus has '\n' between sentences and ' ' between words.
        s_split = lambda t: t.split('\n')
        w_split = lambda s: s.split()

        def split_sents(corpus):
            for s in s_split(corpus):
                yield w_split(s)

        tokens = []
        for words in split_sents(corpus):
            # Reset the tag history at the start of each sentence, mirroring
            # training, so tags cannot leak across sentence boundaries.
            prev, prev2 = self.START
            context = self.START + [self._normalize(w) for w in words] + self.END
            for i, word in enumerate(words):
                tag = self.tagdict.get(word)
                if not tag:
                    features = self._get_features(i, word, context, prev, prev2)
                    tag = self.model.predict(features)
                tokens.append((word, tag))
                prev2 = prev
                prev = tag
        return tokens
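
    # Example (tag values depend on the trained model; those shown are only
    # illustrative): tagger.tag('how are you ?\nfine thanks .') returns one
    # flat list like [('how', 'WRB'), ('are', 'VBP'), ('you', 'PRP'),
    # ('?', '.'), ('fine', 'JJ'), ('thanks', 'NNS'), ('.', '.')].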

    def train(self, sentences, save_loc=None, nr_iter=5):
        '''Train a model from sentences, and save it at ``save_loc``. ``nr_iter``
        controls the number of Perceptron training iterations.

        :param sentences: A list of (words, tags) tuples.
        :param save_loc: If not ``None``, saves a pickled model in this location.
        :param nr_iter: Number of training iterations.
        '''
        self._make_tagdict(sentences)
        self.model.classes = self.classes
        for iter_ in range(nr_iter):
            c = 0  # correct guesses this iteration
            n = 0  # total words seen this iteration
            for words, tags in sentences:
                prev, prev2 = self.START
                context = self.START + [self._normalize(w) for w in words] \
                          + self.END
                for i, word in enumerate(words):
                    guess = self.tagdict.get(word)
                    if not guess:
                        feats = self._get_features(i, word, context, prev, prev2)
                        guess = self.model.predict(feats)
                        self.model.update(tags[i], guess, feats)
                    prev2 = prev
                    prev = guess
                    c += guess == tags[i]
                    n += 1
            random.shuffle(sentences)
            logging.info("Iter {0}: {1}/{2}={3}".format(iter_, c, n, _pc(c, n)))
        self.model.average_weights()
        # Pickle as a binary file
        if save_loc is not None:
            with open(save_loc, 'wb') as f:
                pickle.dump((self.model.weights, self.tagdict, self.classes),
                            f, -1)
        return None
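
    # A minimal illustration of the expected input format, with made-up tags:
    #   tagger.train([(['I', 'like', 'it'], ['PRP', 'VBP', 'PRP'])],
    #                save_loc=None, nr_iter=5)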

    def load(self, loc):
        '''Load a pickled model from ``loc``.'''
        try:
            with open(loc, 'rb') as f:
                w_td_c = pickle.load(f)
        except IOError:
            msg = "Missing trontagger.pickle file."
            raise IOError(msg)
        self.model.weights, self.tagdict, self.classes = w_td_c
        self.model.classes = self.classes
        return None

    def _normalize(self, word):
        '''Normalization used in pre-processing.

        - All words are lower cased
        - Hyphenated words (unless the hyphen is leading) become !HYPHEN
        - Four-digit numbers, e.g. years, become !YEAR
        - Other words starting with a digit become !DIGITS

        :rtype: str
        '''
        if '-' in word and word[0] != '-':
            return '!HYPHEN'
        elif word.isdigit() and len(word) == 4:
            return '!YEAR'
        elif word[0].isdigit():
            return '!DIGITS'
        else:
            return word.lower()
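
    # Examples of the mapping above: '2016' -> '!YEAR', '3rd' -> '!DIGITS',
    # 'co-op' -> '!HYPHEN', 'The' -> 'the'.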

    def _get_features(self, i, word, context, prev, prev2):
        '''Map tokens into a feature representation, implemented as a
        {hashable: float} dict. If the features change, a new model must be
        trained.
        '''
        def add(name, *args):
            features[' '.join((name,) + tuple(args))] += 1

        i += len(self.START)
        features = defaultdict(int)
        # It's useful to have a constant feature, which acts sort of like a prior
        add('bias')
        add('i suffix', word[-3:])
        add('i pref1', word[0])
        add('i-1 tag', prev)
        add('i-2 tag', prev2)
        add('i tag+i-2 tag', prev, prev2)
        add('i word', context[i])
        add('i-1 tag+i word', prev, context[i])
        add('i-1 word', context[i - 1])
        add('i-1 suffix', context[i - 1][-3:])
        add('i-2 word', context[i - 2])
        add('i+1 word', context[i + 1])
        add('i+1 suffix', context[i + 1][-3:])
        add('i+2 word', context[i + 2])
        return features
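
    # For word='questions' at i=0 with prev='-START-', prev2='-START2-', the
    # returned dict contains keys such as 'bias', 'i suffix ons', 'i pref1 q',
    # 'i-1 tag -START-', and 'i-2 tag -START2-', each mapped to a count of 1.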

    def _make_tagdict(self, sentences):
        '''Make a tag dictionary for single-tag words.'''
        counts = defaultdict(lambda: defaultdict(int))
        for words, tags in sentences:
            for word, tag in zip(words, tags):
                counts[word][tag] += 1
                self.classes.add(tag)
        freq_thresh = 20
        ambiguity_thresh = 0.97
        for word, tag_freqs in counts.items():
            tag, mode = max(tag_freqs.items(), key=lambda item: item[1])
            n = sum(tag_freqs.values())
            # Don't add rare words to the tag dictionary
            # Only add quite unambiguous words
            if n >= freq_thresh and (float(mode) / n) >= ambiguity_thresh:
                self.tagdict[word] = tag
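
    # With these thresholds, a word seen 25 times, all 25 as 'NN', is frozen
    # to 'NN' (25 >= 20 and 25/25 >= 0.97), while a word seen 25 times with
    # only 23 'NN' is left to the model (23/25 = 0.92 < 0.97).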


def _pc(n, d):
    return (float(n) / d) * 100


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    tagger = PerceptronTagger(False)
    try:
        tagger.load(PICKLE)
        print(tagger.tag('how are you ?'))
        logging.info('Start testing...')
        right = 0.0
        total = 0.0
        sentence = ([], [])
        for line in open('data/test.txt'):
            params = line.split()
            if len(params) != 2:
                continue
            sentence[0].append(params[0])
            sentence[1].append(params[1])
            if params[0] == '.':
                text = ''
                words = sentence[0]
                tags = sentence[1]
                for i, word in enumerate(words):
                    text += word
                    if i < len(words) - 1:  # no trailing space after the last word
                        text += ' '
                outputs = tagger.tag(text)
                assert len(tags) == len(outputs)
                total += len(tags)
                for o, t in zip(outputs, tags):
                    if o[1].strip() == t:
                        right += 1
                sentence = ([], [])
        logging.info("Precision : %f", right / total)
    except IOError:
        logging.info('Reading corpus...')
        training_data = []
        sentence = ([], [])
        for line in open('data/train.txt'):
            params = line.strip().split('\t')  # strip the newline so tags stay clean
            if len(params) != 2:
                continue
            sentence[0].append(params[0])
            sentence[1].append(params[1])
            if params[0] == '.':
                training_data.append(sentence)
                sentence = ([], [])
        logging.info('training corpus size : %d', len(training_data))
        logging.info('Start training...')
        tagger.train(training_data, save_loc=PICKLE)