# text_processing.py
import re

import nltk
from nltk.stem.snowball import SnowballStemmer
from nltk.util import ngrams
from nltk.corpus import stopwords

stemmer = SnowballStemmer('english')
# =============================================================================
# Notes
# =============================================================================
# These helpers expect a pandas DataFrame and operate on a single text column.
# remove_stopwords needs the NLTK 'stopwords' corpus and tokenize_only needs
# the 'punkt' tokenizer models.
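# A minimal one-time setup sketch, assuming network access (nltk.download is
# the standard NLTK data installer):
#   import nltk
#   nltk.download('stopwords')
#   nltk.download('punkt')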
# =============================================================================
# Text Cleaning
# =============================================================================
def remove_text(pattern, df, col):
    '''Uses a regex pattern to find and delete matching text in a column.'''
    string = re.compile(pattern)
    df[col] = df[col].str.replace(string, '', regex=True)
    return df
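# Illustrative use (sketch; 'text' is a hypothetical column name):
#   import pandas as pd
#   df = pd.DataFrame({'text': ['see http://example.com for details']})
#   df = remove_text(r'(http.*?)\s', df, 'text')
#   # df['text'][0] -> 'see for details'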
def drop_empty_posts(df, col):
    '''Drops rows in which the given column is null or an empty string.'''
    df = df[df[col].notna() & (df[col] != '')]
    return df
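# Illustrative use (sketch; hypothetical column name):
#   df = pd.DataFrame({'text': ['keep me', '', None]})
#   df = drop_empty_posts(df, 'text')
#   # only the 'keep me' row remains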
def remove_whitespace(df, col):
    '''Replaces newlines, tabs, and carriage returns with spaces, collapses
    runs of whitespace, and strips the ends of each string.'''
    # Both str.maketrans arguments must be the same length
    t = str.maketrans('\n\t\r', '   ')
    df[col] = df[col].str.translate(t)
    df[col] = df[col].str.replace(r'\s+', ' ', regex=True).str.strip()
    return df
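# Illustrative use (sketch):
#   df = pd.DataFrame({'text': ['  line one\nline two\t ']})
#   df = remove_whitespace(df, 'text')
#   # df['text'][0] -> 'line one line two'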
def text_to_lower(df, col):
    '''Lowercases every string in the column.'''
    df[col] = df[col].str.lower()
    return df
def remove_punct(df, col):
    '''Deletes every character that is not a word character or whitespace.'''
    df[col] = df[col].str.replace(r'[^\w\s]', '', regex=True)
    return df
def remove_stopwords(df, col):
    '''Drops NLTK English stopwords from every string in the column.'''
    stop = set(stopwords.words('english'))
    df[col] = df[col].apply(lambda s: ' '.join(word for word in s.split() if word not in stop))
    return df
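# Illustrative use (sketch; assumes the NLTK stopwords corpus is installed):
#   df = pd.DataFrame({'text': ['this is a fine wine']})
#   df = remove_stopwords(df, 'text')
#   # df['text'][0] -> 'fine wine'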
def tokenize_only(text):
    '''Splits a string into sentences, then into lowercased word tokens.'''
    return [word.lower() for sent in nltk.sent_tokenize(text) for word in nltk.word_tokenize(sent)]

def tokenize_and_stem(text):
    '''Tokenizes a string and Snowball-stems each token.'''
    tokens = tokenize_only(text)
    return [stemmer.stem(t) for t in tokens]
def tokenize_and_stem_and_bigram(text):
    '''Returns stemmed unigrams followed by space-joined stemmed bigrams.'''
    tokens = tokenize_and_stem(text)
    bigram_list = [' '.join(bigram) for bigram in ngrams(tokens, 2)]
    return tokens + bigram_list
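# Illustrative output (sketch; exact stems depend on the Snowball stemmer):
#   tokenize_and_stem_and_bigram('fruity red wines')
#   # -> ['fruiti', 'red', 'wine', 'fruiti red', 'red wine']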
def text_process_pipeline(df, col, quotes=True, links=True, PGP=True, punctuation=True, whitespace=True, lower=True, stopwords=True):
    '''Runs the cleaning steps above on one column, controlled by the flags.'''
    if quotes:
        print('Removing "Quote from: <author name> <date>" headers from column')
        df = remove_text(r'Quote.*?(?<=\d\d:\d\d\s)am|Quote.*?(?<=\d\d:\d\d\s)pm', df, col)
    if links:
        print('Removing web links from column')
        df = remove_text(r'(http.*?)\s', df, col)
    if PGP:
        print('Removing PGP keys, signatures, and messages from column')
        # Non-greedy match so multiple blocks in one post are removed individually
        df = remove_text(r'-----BEGIN PGP PUBLIC KEY BLOCK-----[\s\S]*?-----END PGP PUBLIC KEY BLOCK-----', df, col)
        df = remove_text(r'-----BEGIN PGP SIGNATURE-----[\s\S]*?-----END PGP SIGNATURE-----', df, col)
        df = remove_text(r'-----BEGIN PGP MESSAGE-----[\s\S]*?-----END PGP MESSAGE-----', df, col)
    if punctuation:
        print('Removing punctuation from column')
        df = remove_punct(df, col)
    if lower:
        print('Making all text in column lowercase')
        df = text_to_lower(df, col)
    if stopwords:
        print('Removing empty strings/nulls and stopwords from column')
        df = drop_empty_posts(df, col)
        df = remove_stopwords(df, col)
        df = drop_empty_posts(df, col)
    if whitespace:
        print('Removing extra whitespace from column')
        df = remove_whitespace(df, col)
    df.reset_index(inplace=True)
    df.rename(columns={'index': 'post_id'}, inplace=True)
    return df
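# End-to-end usage sketch (hypothetical DataFrame and column names):
#   posts = pd.DataFrame({'text': ['Great value! See http://example.com today', None]})
#   posts = text_process_pipeline(posts, 'text')
#   # posts gains a 'post_id' column; 'text' is cleaned, lowercased, and
#   # stripped of stopwords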
# =============================================================================
# Text Features
# =============================================================================
def avg_word_len(sentence):
    '''Takes a single string and returns the average word length, rounded to
    two decimal places; returns 0 for an empty string.'''
    try:
        words = sentence.split()
        return round(sum(len(word) for word in words) / len(words), 2)
    except ZeroDivisionError:
        # sentence contained no words
        return 0
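# Smoke-test sketch for the whole pipeline; the DataFrame contents below are
# invented for illustration, and pandas plus the NLTK data are assumed to be
# installed.
if __name__ == '__main__':
    import pandas as pd
    demo = pd.DataFrame({'text': ['A lovely, balanced red!\nSee http://example.com now', '']})
    demo = text_process_pipeline(demo, 'text')
    print(demo)
    print(avg_word_len('a lovely balanced red'))  # -> 4.5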