cleaned up commit 327f5de for pull request
mapred_tfidf.py:
        switched argument parsing to argparse

        if a MapReduce job would overwrite existing directories, users are
        now asked whether they want to continue

        added '--force' flag to automatically force directory overwrites

map_reduce_utils: 'numeric' words such as '14th' are filtered out

mappers/reducers:
        added documentation for each script, removed unneeded
        conditionals, added helper methods for print formatting

        word_count_red no longer has to re-read each file to see how
        long it is

        cos_sim_map now ensures that two documents are sent to the
        same reducer regardless of which order they arrive in.
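
For context, a minimal sketch (not part of the commit) of the key-ordering fix described in the last item; canonical_key is a hypothetical helper used only to illustrate why alphabetical ordering sends both arrival orders to the same reducer:

# Hadoop Streaming partitions mapper output by key, so 'a.txt b.txt' and
# 'b.txt a.txt' would otherwise hash to different reducers. Putting the pair
# into a canonical order first makes both forms produce the same key.
def canonical_key(doc1, doc2):
    if doc1 > doc2:
        doc1, doc2 = doc2, doc1
    return '%s %s' % (doc1, doc2)

assert canonical_key('b.txt', 'a.txt') == canonical_key('a.txt', 'b.txt')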
zbsimon committed Nov 24, 2014
1 parent 327f5de commit a4a23fa
Showing 16 changed files with 251 additions and 75 deletions.
16 changes: 8 additions & 8 deletions compare_texts.py
100644 → 100755
@@ -1,10 +1,10 @@
import os
import nltk
import string
import sys
import pylab
import numpy as np
import networkx as nx
import argparse
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from nltk.stem.porter import PorterStemmer
@@ -21,10 +21,8 @@ def get_texts(dir):
return texts

def stem_tokens(src, stemmer):
stemmed = []
for item in src:
stemmed.append(stemmer.stem(item))
return stemmed
return [stemmer.stem(item) for item in src]


def tokenize(text):
tokens = nltk.word_tokenize(text)
@@ -53,10 +51,12 @@ def plot_graph(g):


if __name__ == '__main__':
desc = 'compute the tf-idf cosine similarity of a set of documents'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('--input', default='./corpus', dest='input_dir')
args = vars(parser.parse_args())
input_dir = args['input_dir']

input_dir = './corpus'
if len(sys.argv) >= 2:
input_dir = sys.argv[1]
texts = get_texts(input_dir)
similarities = get_similarities(texts)
sim_graph = get_similarity_graph(similarities)
10 changes: 10 additions & 0 deletions contents_mapper.py
@@ -1,8 +1,18 @@
#!/usr/bin/env python

import os
import sys
from map_reduce_utils import clean_text

"""
(file_contents) --> (file_name) (file_contents)
for each line from stdin consisting of a document in the corpus, emits
a key-value pair to stdout with a key of the corresponding filename
and a value of the file contents cleaned with
map_reduce_utils.clean_text
"""

for line in sys.stdin:
docname = os.environ['mapreduce_map_input_file']
contents = clean_text(line)
8 changes: 8 additions & 0 deletions corp_freq_map.py
@@ -2,6 +2,14 @@

import sys

"""
(word file_name) (n N) --> (word) (file_name n N 1)
emits a line for each unique word in each file to be consumed
by corp_freq_red to find the number of occurrences of each
unique word throughout the entire corpus.
"""

for line in sys.stdin:
key, value = line.strip().split('\t')
word, docname = key.strip().split()
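
As an illustration only (not part of the diff), this is the reshaping corp_freq_map.py performs on one hypothetical input line; the word, filename and counts are made up, and the exact tab placement is inferred from how corp_freq_red.py parses its input.

line = 'fish doc1.txt\t3 120'            # key: (word file_name), value: (n N)
key, value = line.strip().split('\t')
word, docname = key.strip().split()
print '%s\t%s %s 1' % (word, docname, value.strip())
# emits: fish<TAB>doc1.txt 3 120 1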
23 changes: 10 additions & 13 deletions corp_freq_red.py
@@ -2,13 +2,17 @@

import sys

"""
(word) (file_name n N 1) --> (word file_name) (n N m)
sums up the number of occurrences of each unique word throughout
the corpus and emits this sum for each document that the word
occurs in.
"""


def print_results(count, files):
# This is only printing the first element in files, but the output
# has the same word in two different lines, so it must be the case
# that this is getting
for string in files:
# i think we're not putting a tab in the correct place
print '%s %s' % (string, count)

processed_files = []
@@ -19,29 +23,22 @@ def print_results(count, files):


for line in sys.stdin:
# do same thing as in step A, but keep a list of all documents that
# are processed and their n, N values, then when we write out
# to stdout, we can do it for each one of these
key, value = line.strip().split('\t')
word = key.strip()
docname, word_count, doc_count, count = value.strip().split()
count = int(count)
# add document/word combo to processed files
processed_combo = '%s %s\t%s %s' % (word, docname, word_count, doc_count)
count = int(count)
if cur_word == word:
cur_count += count
processed_files.append(processed_combo)
else:
if cur_word:
# This is getting hit on the first time a word is seen,
# we want it on the last one, so need to check before
# adding counts, print there
if cur_word is not None:
print_results(cur_count, processed_files)
cur_word = word
cur_count = count
processed_files = []
processed_files.append(processed_combo)

if cur_word == word and cur_word is not None:
if cur_word is not None:
print_results(cur_count, processed_files)
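
Purely illustrative (not part of the diff): given two hypothetical mapper lines for the word 'fish', print_results appends the corpus-wide count (here 2, since 'fish' appeared in two documents) to each document's entry.

count = 2    # 'fish' appeared in two documents across the corpus
for combo in ['fish doc1.txt\t3 120', 'fish doc2.txt\t5 200']:
    print '%s %s' % (combo, count)
# emits: fish doc1.txt<TAB>3 120 2
#        fish doc2.txt<TAB>5 200 2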
17 changes: 15 additions & 2 deletions cos_sim_map.py
@@ -1,8 +1,21 @@
#!/usr/bin/env python
import sys

"""
(word) (file1 file2 tfidf1*tfidf2) --> (file1 file2) (tfidf1*tfidf2)
for each word common to two documents, removes the word from the
key/value pair and replaces it with the two filenames so that we can
sum up the values for each pair of documents in the reducer.
"""

for line in sys.stdin:
key, value = line.strip().split('\t')
doc1, doc2, product = value.strip().split()
product = float(product)
print '%s %s\t%.16f' % (doc1, doc2, product)

# we want to ensure that (doc1 doc2) and (doc2 doc1) get
# sent to the same reducer, so we order them alphabetically
if doc1 > doc2:
doc1, doc2 = doc2, doc1

print '%s %s\t%s' % (doc1, doc2, product)
15 changes: 13 additions & 2 deletions cos_sim_red.py
@@ -1,9 +1,20 @@
#!/usr/bin/env python
import sys

"""
(file1 file2) (tfidf1*tfidf2) --> (file1 file2) (cosine_similarity(f1, f2))
sums up the products of the tfidf values of words common between every
pair of documents to produce the cosine similarity of the two documents
"""

cur_sum = 0
cur_docs = (None, None) # will become (doc1, doc2)


def print_result(doc1, doc2, sum_for_docs):
print '%s %s\t%.16f' % (doc1, doc2, sum_for_docs)

for line in sys.stdin:
key, value = line.strip().split('\t')
doc1, doc2 = key.strip().split()
@@ -12,9 +23,9 @@
cur_sum += product
else:
if cur_docs[0] is not None and cur_docs[1] is not None:
print '%s %s\t%.16f' % (cur_docs[0], cur_docs[1], cur_sum)
print_result(cur_docs[0], cur_docs[1], cur_sum)
cur_docs = (doc1, doc2)
cur_sum = 0

if cur_docs[0] is not None and cur_docs[1] is not None:
print '%s %s\t%.16f' % (cur_docs[0], cur_docs[1], cur_sum)
print_result(cur_docs[0], cur_docs[1], cur_sum)
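
One note on what the running sum means (not part of the diff, and an assumption about the upstream pipeline): if the per-document tf-idf vectors are L2-normalized before the word join, the sum of per-word products is exactly the cosine similarity; otherwise it is only the unnormalized dot product. A tiny sketch with made-up numbers:

tfidf_doc1 = {'fish': 0.8, 'cat': 0.6}   # hypothetical, unit length
tfidf_doc2 = {'fish': 0.6, 'dog': 0.8}   # hypothetical, unit length
common = set(tfidf_doc1) & set(tfidf_doc2)
print sum(tfidf_doc1[w] * tfidf_doc2[w] for w in common)   # 0.48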
20 changes: 19 additions & 1 deletion map_reduce_utils.py
@@ -1,14 +1,32 @@
#!/usr/bin/env python

from nltk.stem.porter import PorterStemmer
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS as stopwords
import string
import re

"""
map_reduce_utils contains helper functions that are used in multiple
map-reduce tasks.
"""


def clean_text(text):
# TODO remove words w/ numerals, e.g. '14th'
"""
returns a 'cleaned' version of text by filtering out all words
that don't contain strictly alphabetic characters, converting
all words to lowercase, filtering out common stopwords, and
stemming each word using porter stemming.
"""
stemmer = PorterStemmer()
result = text.lower()
result = result.translate(None, string.punctuation)
result = result.replace('\n', ' ')
result = result.split()

# filter out 'numeric' words such as '14th'
is_alpha = re.compile('^[a-z]+$')
result = filter(lambda word: is_alpha.match(word), result)

result = [stemmer.stem(word) for word in result]
return filter(lambda word: word not in stopwords, result)
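
A quick usage sketch for the updated clean_text (not part of the diff); the exact stems come from NLTK's Porter stemmer, so the output noted below is approximate.

from map_reduce_utils import clean_text

print clean_text('The 14th and 15th amendments')
# numeric tokens ('14th', '15th') are removed by the new is_alpha check,
# stopwords ('the', 'and') are dropped, and what remains is lowercased and
# stemmed; the result is roughly ['amend']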
100 changes: 74 additions & 26 deletions mapred_tfidf.py
@@ -1,9 +1,22 @@
#!/usr/bin/env python

from __future__ import print_function
import os
import subprocess
import sys
import argparse
import shutil
import sys


class MapReduceError(Exception):
""" error raised when a map reduce job fails"""

def __init__(self, value, source):
self.value = value
self.source = source

def __str__(self):
return repr(self.value)


def run_map_job(mapper, input_dir, output_dir):
@@ -22,11 +35,11 @@ def run_map_job(mapper, input_dir, output_dir):
-output $NLTK_HOME/{2} \
-file {3}\
'''.format(mapper, input_dir, output_dir, map_file).strip()

try:
subprocess.check_call(command, env=env, shell=True)
except subprocess.CalledProcessError:
print 'ERROR: Map job %s failed' % mapper
raise
except subprocess.CalledProcessError as e:
raise MapReduceError('Map job {0} failed'.format(mapper), e)


def run_map_reduce_job(mapper, reducer, input_dir, output_dir):
@@ -49,72 +62,107 @@ def run_map_reduce_job(mapper, reducer, input_dir, output_dir):
command = command.strip()
try:
subprocess.check_call(command, env=env, shell=True)
except subprocess.CalledProcessError:
print 'ERROR: Map-Reduce job %s, %s failed' % (mapper, reducer)
raise
except subprocess.CalledProcessError as e:
err_msg = 'ERROR: Map-Reduce job {0}, {1} failed'
raise MapReduceError(err_msg.format(mapper, reducer), e)

if __name__ == '__main__':
# directories where we will store intermediate results
word_join_dir = 'joined_words'
tfidf_dir = 'tfidf'
corpora_frequency_dir = 'corpora_freq'
word_count_dir = 'word_count'
word_frequency_dir = 'word_freq'
clean_content_dir = 'file_contents'

directories = [clean_content_dir, word_frequency_dir,
word_count_dir, corpora_frequency_dir,
tfidf_dir, word_join_dir]

desc = ''' computes the tf-idf cosine similarity metric for a set
of documents using map reduce streaming. Set appropriate
paths in hadoop-streaming-env.sh and 'source' it before
running this script, or set the corresponding environment
variables manually.'''

parser = argparse.ArgumentParser(description=desc)

input_help = 'The relative path of the corpus to use as input'
parser.add_argument('--input', default='corpora',
dest='input_dir', help=input_help)

output_help = 'The relative path where the results will be placed'
parser.add_argument('--output', default='similarities',
dest='output_dir', help=output_help)

force_help = 'If set, silently overwrite output & intermediate dirs: '
force_help += ' '.join(directories)
parser.add_argument('--force', default=False, dest='force',
help=force_help, action='store_true')
args = vars(parser.parse_args())
input_dir = args['input_dir']
output_dir = args['output_dir']
force = args['force']
directories.append(output_dir)

dirs_to_overwrite = filter(os.path.exists, directories)
if not force and len(dirs_to_overwrite) > 0:
print('The following directories will be overwritten:')
print('\t', '\n\t'.join(dirs_to_overwrite))
response = raw_input('Continue? [y/n] ')
if response not in ['y', 'yes', 'Y', 'Yes']:
exit()

# check to see that environment variables have been set
env = os.environ.copy()
input_dir = "inaugural"
if len(sys.argv) > 1:
input_dir = sys.argv[1]
try:
val = env['NLTK_HOME']
except KeyError:
print 'ERROR: Please run "source ./hadoop-streaming-env.sh"'
raise
except KeyError as e:
err_msg = '''
ERROR: environment variable NLTK_HOME undefined
have you run "source hadoop-streaming-env.sh"?
'''
print(err_msg, file=sys.stderr)
raise e

# we need the size of the corpora to do tfidf:
corp = './' + input_dir
corp_files = [f for f in os.listdir(corp) if os.path.isfile(corp+'/'+f)]
corpora_len = len(corp_files)
print 'CORP_SIZE: ', corpora_len

# TODO: probably shouldn't clobber these dirs in case there's
# anything in them

# do an MR job to clean/stem file contents
clean_content_dir = 'file_contents'
run_map_job('contents_mapper.py', input_dir, clean_content_dir)

# calculate word frequency
word_frequency_dir = 'word_freq'
run_map_reduce_job('word_freq_map.py',
'word_freq_red.py',
clean_content_dir,
word_frequency_dir)

# calculate word count for each document
word_count_dir = 'word_count'
run_map_reduce_job('word_count_map.py',
'word_count_red.py',
word_frequency_dir,
word_count_dir)

# calculate word frequency in corpora
corpora_frequency_dir = 'corpora_freq'
run_map_reduce_job('corp_freq_map.py',
'corp_freq_red.py',
word_count_dir,
corpora_frequency_dir)

# now, calculate tfidf scores
tfidf_dir = 'tfidf'
run_map_job('tf_idf_map.py {0}'.format(corpora_len),
corpora_frequency_dir,
tfidf_dir)

# join on words for cosine similarity
word_join_dir = 'joined_words'
run_map_reduce_job('word_join_map.py',
'word_join_red.py',
tfidf_dir,
word_join_dir)

# now, sum up the products to get the cosine similarities
output_dir = "output"
if len(sys.argv) > 2:
output_dir = sys.argv[3]
run_map_reduce_job('cos_sim_map.py',
'cos_sim_red.py',
word_join_dir,
