# sentiment.py
# Forked from NationalLibraryOfNorway/dhlab-app-sentiment
import logging
import numpy as np
import pandas as pd
import requests
from collections import Counter
from pathlib import Path
from typing import Generator, List, Tuple, Union
import dhlab as dh
from dhlab.api.dhlab_api import concordance, get_chunks_para
from dhlab.constants import BASE_URL
from dhlab.nbtokenizer import tokenize
# File handling util functions
def load_corpus_from_file(file_path):
"""Load a Corpus object from an excel or csv file."""
try:
corpus = (
dh.Corpus.from_df(pd.read_excel(file_path))
if file_path.endswith(".xlsx")
else dh.Corpus.from_csv(file_path)
)
except FileNotFoundError:
print("The corpus file must be a .csv or .xlsx file: ", file_path)
corpus = dh.Corpus()
return corpus
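# Illustrative usage (a sketch; "my_corpus.xlsx" is a hypothetical file):
#   >>> corpus = load_corpus_from_file("my_corpus.xlsx")
#   >>> len(corpus.frame)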
def load_sentiment_terms(fpath: str = None) -> pd.DataFrame:
    """Load a sentiment lexicon from a local file path or URL."""
    is_url = str(fpath).startswith("http")
    if fpath is None or (not is_url and not Path(fpath).exists()):
        print("File not found: ", fpath)
        return pd.DataFrame(columns=["terms"])
    return pd.read_csv(fpath, names=["terms"])
def load_norsentlex(save_resources=False) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Load the positive and negative sentiment lexicons from NorSentLex.

    - Github repo: [norsentlex](https://github.com/ltgoslo/norsentlex)
    - [Lexicon information in neural sentiment analysis:
      a multi-task learning approach](https://aclanthology.org/W19-6119) (Barnes et al., NoDaLiDa 2019)
    """
def get_fpath(sentiment):
rawpath = f"https://raw.githubusercontent.com/ltgoslo/norsentlex/master/Fullform/Fullform_{sentiment}_lexicon.txt"
localpath = Path(f"{sentiment}_lexicon.csv")
return localpath if localpath.exists() else rawpath
pos = load_sentiment_terms(get_fpath("Positive"))
neg = load_sentiment_terms(get_fpath("Negative"))
if save_resources:
        pos.to_csv("Positive_lexicon.csv", index=False, header=False)
        neg.to_csv("Negative_lexicon.csv", index=False, header=False)
return pos, neg
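# Illustrative usage (a sketch; downloads the lexicons on first run, then caches them locally):
#   >>> pos, neg = load_norsentlex(save_resources=True)
#   >>> pos.head()  # one fullform term per row in the "terms" column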
# Helper functions
def make_list(value) -> list:
    """Turn a string or list into a list.

    :param value: Can be a list, a single-valued string, a comma-separated string of values,
        or a multiline string of values separated by newlines.
    """
    if isinstance(value, str):
        if "\n" in value:
            newlist = value.strip().split("\n")
        elif "," in value:
            newlist = value.split(",")
        else:
            newlist = [value]
        return [v.strip() for v in newlist]
    assert isinstance(value, list)
    return value
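# Illustrative behaviour of the three accepted string forms:
#   >>> make_list("hus")
#   ['hus']
#   >>> make_list("hus, hjem")
#   ['hus', 'hjem']
#   >>> make_list("hus\nhjem\n")
#   ['hus', 'hjem']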
def strip_bold_annotation(text):
    """Remove ``<b>``/``</b>`` markup from concordance text."""
    return text.replace("<b>", "").replace("</b>", "")
def make_search_link(docid: str, search_term: str = None):
"""Create a URL to the online library view of the digital object, with the search term"""
link = f"https://www.nb.no/items/{docid}"
return link if search_term is None else f"{link}?searchText={search_term}"
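# Illustrative usage ("some_docid" is a hypothetical identifier):
#   >>> make_search_link("some_docid", "barnevern")
#   'https://www.nb.no/items/some_docid?searchText=barnevern'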
def add_urls(df):
    """Add a ``url`` column linking each row's document to its online library view."""
    df["url"] = df.apply(
        lambda row: make_search_link(row.loc["urn"], row.loc["word"]), axis=1
    )
    return df
def count_tokens(text):
    """Tokenize ``text`` and count occurrences of each lowercased token."""
    text = strip_bold_annotation(text)
    tokens = tokenize(text)
    newcoll = Counter(tok.lower() for tok in tokens if tok != "...")
    return pd.Series(newcoll, name="counts")
def strip_empty_cols(df: pd.DataFrame):
"""Remove columns without values from a dataframe."""
return df.dropna(axis=1, how="all").fillna("")
def group_index_terms(df: pd.DataFrame) -> pd.DataFrame:
    """Group duplicate index terms case-insensitively and sum up their frequency counts."""
    if hasattr(df, "frame"):
        df = df.frame
    if isinstance(df, pd.DataFrame):
        # Reduce to the single counts column so the groupby result can be re-framed
        df = df.iloc[:, 0]
    df = df.loc[df.index.str.isalpha()]
    df.index = df.index.str.lower()
    return df.groupby(df.index).sum().to_frame("counts")
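# Illustrative behaviour (a sketch): case variants merge and non-alphabetic tokens drop:
#   >>> counts = pd.Series([3, 2, 1], index=["Hus", "hus", "17"], name="counts")
#   >>> group_index_terms(counts)
#          counts
#   hus         5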
# Sentiment scoring functions: Number crunching
def count_terms_in_doc(
urns: List[str],
words: Union[list, str],
docid_column="dhlabid"
):
"""Similar functionality as ``dhlab.api.dhlab_api.get_document_frequencies``,
except the dataframe isn't pivoted.
"""
params = {"urns": urns, "words": make_list(words), "cutoff": 0}
cols = [docid_column, "word", "count", "urncount"]
    try:
        r = requests.post(f"{BASE_URL}/frequencies", json=params)
        r.raise_for_status()
        result = r.json()
        df = pd.DataFrame(result, columns=cols)
    except (requests.HTTPError, requests.exceptions.JSONDecodeError) as e:
        logging.error(f"Request for frequencies failed: {e}")
        logging.info("Returning empty dataframe instead of word counts")
        df = pd.DataFrame(columns=cols)
df = df.drop("urncount", axis=1)
# df = pd.pivot_table(df, values="count", index="word", columns="urn").fillna(0)
return df
def count_matching_tokens(token_counts, terms: pd.DataFrame) -> pd.DataFrame:
    """Combine word counts with a dataframe of lexicon terms, indexed by the matched terms."""
    return terms.join(token_counts, how="inner", on="terms").set_index("terms")
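# Illustrative usage: match token counts against a small lexicon sketch:
#   >>> tokens = pd.Series([4, 1], index=["god", "hus"], name="counts")
#   >>> lexicon = pd.DataFrame({"terms": ["god", "glad"]})
#   >>> count_matching_tokens(tokens, lexicon)
#          counts
#   terms
#   god         4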
def coll_sentiment(coll, word="barnevern", return_score_only=False):
"""Compute a sentiment score of positive and negative terms in `coll`.
The collocations of the ``word`` are used to count occurrences of positive and negative terms.
:param coll: a collocations dataframe or a dh.Corpus where ``word`` occurs.
:param str word: a word to estimate sentiment scores for
:param bool return_score_only: If True,
return a tuple with the absolute counts for positive and negative terms.
"""
if isinstance(coll, dh.Corpus):
coll = coll.coll(word).frame
coll = group_index_terms(coll)
# Data import
pos, neg = load_norsentlex()
positive_counts = count_matching_tokens(coll, pos)
negative_counts = count_matching_tokens(coll, neg)
if return_score_only:
return positive_counts.counts.sum(), negative_counts.counts.sum()
    # Terms in ``coll`` that match neither lexicon are counted as neutral
    neutral_counts = coll.drop(positive_counts.index.union(negative_counts.index))
positive_counts["sentiment"] = "pos"
negative_counts["sentiment"] = "neg"
neutral_counts["sentiment"] = "neutral"
return pd.concat([positive_counts, negative_counts, neutral_counts])
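# Illustrative usage (a sketch; builds a corpus via the dhlab API):
#   >>> corpus = dh.Corpus(doctype="digavis", freetext="year: 2005", limit=100)
#   >>> pos_count, neg_count = coll_sentiment(corpus, "barnevern", return_score_only=True)
#   >>> pos_count - neg_count  # crude net sentiment around the word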
def sentiment_by_place(
    cities=["Kristiansand", "Stavanger"],
    from_year=1999,
    to_year=2010
):
    """Yield one dataframe per city with yearly sentiment scores for "barnevern".

    For each year in ``[from_year, to_year)`` a newspaper corpus is built for the city,
    and the positive, negative, and net scores are collected in one row per year.
    """
    for city in cities:
lst = []
for year in range(from_year, to_year):
corpus = dh.Corpus(
doctype="digavis", freetext=f"city: {city} year: {year}", limit=1000
)
pos, neg = coll_sentiment(corpus, "barnevern", return_score_only=True)
lst.append(
pd.DataFrame(
[[pos, neg, pos - neg]],
index=[year],
columns=["positive", "negative", "sum"],
)
)
yield pd.concat(lst)
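# Illustrative usage (a sketch; one dataframe is yielded per city):
#   >>> for city_scores in sentiment_by_place(["Bergen"], from_year=2000, to_year=2002):
#   ...     print(city_scores)  # columns: positive, negative, sum; one row per year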
def score_sentiment(text, positive, negative):
    """Count positive and negative lexicon terms in ``text``.

    :return: a two-element list ``[positive_count, negative_count]``.
    """
context = count_tokens(text)
sent_counts = [
count_matching_tokens(context, sent_terms).counts.sum()
if not context.empty
else 0
for sent_terms in (positive, negative)
]
return sent_counts
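# Illustrative usage with the NorSentLex lexicons:
#   >>> pos, neg = load_norsentlex()
#   >>> score_sentiment("en god bok", pos, neg)  # -> [positive_count, negative_count]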
def count_and_score_target_words(corpus: pd.DataFrame, word: str):
"""Add word frequency and sentiment score for ``word`` in the given ``corpus``."""
if isinstance(corpus, dh.Corpus):
corpus = corpus.frame
    urnlist = corpus.urn.to_list()
    # Cap the number of concordance rows fetched (roughly 60 per document)
    limit = 60 * len(urnlist)
    docid_column = "dhlabid"
    conc = concordance(urnlist, word, window=200, limit=limit)
# FIXME: remove once concordance also returns dhlabid by default
conc = conc.rename(columns={"docid": docid_column}).drop("urn", axis=1)
word_freq = count_terms_in_doc(urnlist, [word])
    word_freq = word_freq.merge(conc, how="inner", on=docid_column)
pos, neg = load_norsentlex(save_resources=False)
word_freq[["positive", "negative"]] = word_freq.apply(
lambda x: score_sentiment(x.conc, pos, neg), axis=1, result_type="expand"
)
word_freq["sentimentscore"] = word_freq["positive"] - word_freq["negative"]
    df = corpus.merge(
        word_freq.drop(columns="conc"), how="inner", on=docid_column
    )
df = strip_empty_cols(df)
return df
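# Illustrative usage (a sketch; the corpus needs "urn" and "dhlabid" columns):
#   >>> corpus = dh.Corpus(doctype="digavis", freetext="year: 2005", limit=10)
#   >>> scored = count_and_score_target_words(corpus, "barnevern")
#   >>> scored[["word", "count", "sentimentscore"]].head()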
def compute_sentiment_analysis(*args, **kwargs):
"""Compute sentiment score on the input data."""
return count_and_score_target_words(*args, **kwargs)
# DUMPING GROUND
# Unnecessary function
def unpivot(frame):
    """Reshape a word-by-urn count matrix into a long-format dataframe.

    Util function adapted from the Pandas docs:
    https://pandas.pydata.org/pandas-docs/stable/user_guide/reshaping.html
    """
N, K = frame.shape
data = {
"count": frame.to_numpy().ravel("F"),
"urn": np.asarray(frame.columns).repeat(N),
"word": np.tile(np.asarray(frame.index), K),
}
return pd.DataFrame(data, columns=["word", "urn", "count"])
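# Illustrative behaviour: a 2x2 word-by-urn matrix becomes four (word, urn, count) rows:
#   >>> matrix = pd.DataFrame([[1, 0], [2, 3]], index=["god", "hus"], columns=["urn1", "urn2"])
#   >>> unpivot(matrix)  # long format with columns word, urn, count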
# Unnecessary function
def count_terms(corpus: dh.Corpus, search_terms: str):
"""wrapper to undo the pivot in the get_document_frequencies function."""
words = make_list(search_terms)
count_matrix = corpus.count(words).frame
flattened = unpivot(count_matrix)
non_null_counts = flattened.loc[flattened["count"] != 0.0]
return non_null_counts.reset_index(drop=True)
# Unnecessary function
def timestamp_generator(from_year: int, to_year: int) -> Generator:
    """Generate a ``YYYYMMDD`` timestamp per day in the period ``from_year``-``to_year``."""
    timestamp_range = pd.date_range(start=f"{from_year}-01-01", end=f"{to_year}-12-31")
    for date in timestamp_range:
        yield date.strftime("%Y%m%d")
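# Illustrative usage:
#   >>> next(timestamp_generator(2000, 2000))
#   '20000101'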
def get_context_bow(urn, word):
    """Build a bag-of-words from the paragraphs in ``urn`` where ``word`` occurs."""
    freq_col = "counts"
    token_col = "token"
    par_idx_col = "paragraph"
# Get a dataframe with all paragraphs in a URN and their word counts
chunks = get_chunks_para(urn)
total = [
{par_idx_col: i, token_col: token, freq_col: count}
for i, para in enumerate(chunks)
for token, count in para.items()
]
df = pd.DataFrame(total)
    # Filter the dataframe on the paragraphs that contain the search word
    df["lowercase"] = df[token_col].str.lower()
    matching_paragraphs = df[par_idx_col][df["lowercase"].str.match(word)]
    cdf = df[df[par_idx_col].isin(matching_paragraphs)]
    context = cdf.set_index(token_col)[freq_col]
    return group_index_terms(context)
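# Illustrative usage (a sketch; "some_urn" is a hypothetical identifier):
#   >>> bow = get_context_bow("some_urn", "barnevern")
#   >>> bow.sort_values("counts", ascending=False).head()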