#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import sys, logging, math
from optparse import OptionParser
from nltk.stem import RegexpStemmer
from collections import Counter
from operator import itemgetter
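# Reads a text file, splits it into chunks on empty lines, and for every
# word computes the Pearson product-moment correlation between its
# per-chunk frequency and the per-chunk frequency of a given key word,
# then prints the most strongly correlated words.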
# strip punctuation characters from tokens
reg_stemmer_signs = RegexpStemmer(r'(\'|\?|!|;|:|\.|,|"|\]|\[|\(|\)|-)+')
# strip simple plural suffixes ("-es", "-s")
reg_stemmer_suffs = RegexpStemmer(r'(es|s)\b')
# report the ten words with max PPMC to the key word
k_max = 10
# split the file on empty lines and yield each chunk that
# contains the key word, as a list of cleaned, stemmed tokens
def make_chunk(file_obj, key):
    stop_words = frozenset([
        'and', 'the', 'this', 'of', 'thou', 'not', 'is', 'be', 'to',
        'for', 'shall', 'will', 'in', 'with', 'are', 'a', 'you', 'do',
        'me', 'but', 'thee', 'so', 'as', 'it', 'hi', 'by', 'or',
        'from', 'on', 'no', 'then', 'nor', 'thine', 'thy', 'at',
        'if', 'was',
    ])
    chunk = []
    for line in file_obj:
        line = line.strip()
        if line:
            # drop all-uppercase tokens (e.g. speaker names)
            words = [w.lower() for w in line.split() if not w.isupper()]
            # strip punctuation before the isalpha() check, otherwise
            # tokens like "word." would be discarded instead of cleaned
            words = [reg_stemmer_signs.stem(w) for w in words]
            # clean from stop words and non-alphabetic values
            words = [w for w in words if w.isalpha() and w not in stop_words]
            # simple suffix stemming; drop words stemmed to nothing
            words = [reg_stemmer_suffs.stem(w) for w in words]
            chunk.extend(w for w in words if w)
        else:
            # an empty line ends the current chunk
            if key in chunk:
                yield chunk
            chunk = []
    # don't lose the last chunk if the file has no trailing empty line
    if key in chunk:
        yield chunk
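# The input is expected to be plain text in which empty lines separate
# the chunks ("experiments") whose word frequencies are compared.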
if __name__ == "__main__":
    usage = "usage: %prog [options] -f FILE -k key_word"
    parser = OptionParser(usage)
    parser.add_option("-f", action="store", type="string", dest="file", metavar="[REQUIRED]",
                      help="path to file with texts to process")
    parser.add_option("-k", type="string", dest="key_word", metavar="[REQUIRED]",
                      help="key word relative to which correlations are estimated")
    parser.add_option("-v", action="store_true", dest="verbose", default=False,
                      help="be verbose")
    (options, args) = parser.parse_args()
    # set up logging before the options check so the logger
    # already exists if the check fails
    formatter = logging.Formatter('%(message)s')
    logger = logging.getLogger('')
    logger.setLevel(logging.INFO)
    ch = logging.StreamHandler(sys.stdout)
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    if options.verbose:
        logger.setLevel(logging.DEBUG)
    if options.file is None or options.key_word is None:
        parser.print_help()
        sys.exit(1)
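    # example invocation (shakespeare.txt and the key word are hypothetical):
    #   ./pearson_correlate.py -f shakespeare.txt -k love -v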
    key_freq_per_exp = {}
    key_freq_total = 0
    unique_words_freq_total = Counter()
    unique_words_freq_per_exp = {}
    n = 0
    try:
        with open(options.file, "rt", encoding="utf-8") as infile:
            for experiment in make_chunk(infile, options.key_word):
                key_freq_per_exp[n] = experiment.count(options.key_word)
                key_freq_total += key_freq_per_exp[n]
                logger.debug('#' + str(n) + '; key word: ' + options.key_word.upper() +
                             '; frequency per exp: ' + str(key_freq_per_exp[n]))
                cur_freqs = Counter(experiment)       # unique per experiment
                unique_words_freq_total += cur_freqs  # unique total
                unique_words_freq_per_exp[n] = cur_freqs
                n += 1
        # guard against a file that never contains the key word
        if n == 0:
            logger.error('key word "' + options.key_word + '" not found')
            sys.exit(1)
        avg_x = key_freq_total / n
        logger.debug('AVG f for ' + options.key_word.upper() + ' = ' + str(avg_x) + '\n')
        # build the average frequency of each unique word over all chunks
        avg_y_values = {}
        logger.debug('Average frequency for each unique word per experiment:\n')
        for word, value in unique_words_freq_total.most_common():
            avg_y_values[word] = value / n
            logger.debug(word + ' --> ' + str(round(avg_y_values[word], 5)))
        # the key word's per-chunk deltas from its average frequency
        x_subs_dict = {}
        for i in range(n):
            subs_x = key_freq_per_exp[i] - avg_x
            x_subs_dict[i] = (subs_x, subs_x ** 2)
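        # Pearson's r between key-word frequencies x_i and word frequencies
        # y_i over the n chunks:
        #   r = sum((x_i - avg_x) * (y_i - avg_y))
        #       / sqrt(sum((x_i - avg_x)**2) * sum((y_i - avg_y)**2))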
        # calculate Pearson's product-moment coefficient for every other word
        correlations = {}
        for word in unique_words_freq_total:
            if word == options.key_word:
                continue
            logger.debug('\nPPMC for ' + word.upper() + ':')
            covariance = 0.0
            sigma_sum_x = 0.0
            sigma_sum_y = 0.0
            for i in range(n):
                # a word absent from a chunk has frequency 0 there
                y_freq = unique_words_freq_per_exp[i].get(word, 0)
                delta_x, delta_x_squared = x_subs_dict[i]
                delta_y = y_freq - avg_y_values[word]
                covariance += delta_y * delta_x
                sigma_sum_x += delta_x_squared
                sigma_sum_y += delta_y ** 2
            denominator = math.sqrt(sigma_sum_x * sigma_sum_y)
            if denominator == 0:
                # a word with constant per-chunk frequency has undefined correlation
                continue
            r = covariance / denominator
            correlations[word] = r
            logger.debug('>>> ' + str(round(r, 5)))
        print('Top ' + str(k_max) + ' words correlated with "' + options.key_word + '":\n')
        print('\t{0:11} {1:5}\n'.format('word', 'PPMC'))
        for word, value in sorted(correlations.items(), key=itemgetter(1), reverse=True)[:k_max]:
            print('\t{0:11} {1:5}'.format(word, round(value, 5)))
    except Exception as err:
        logger.error(str(err))
        sys.exit(1)