implement cosine similarity in mapreduce streaming

Moved tf-idf and cosine similarity computation to MapReduce streaming.
mapred_tfidf.py is the main runnable file.
Be sure to run "source hadoop-streaming-env.sh" before running mapred_tfidf.py,
and make the map and reduce scripts executable.
zbsimon committed Nov 17, 2014
1 parent e56eefd commit 327f5de
Showing 16 changed files with 362 additions and 9 deletions.
9 changes: 9 additions & 0 deletions contents_mapper.py
@@ -0,0 +1,9 @@
#!/usr/bin/env python
import os
import sys
from map_reduce_utils import clean_text

bmyerz (Member) commented on Nov 21, 2014:

add brief descriptions of what each of these mapper/reducer scripts does

for line in sys.stdin:
    docname = os.environ['mapreduce_map_input_file']
    contents = clean_text(line)
    print docname, '\t', ' '.join(map(str, contents))
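To address the request above for brief descriptions, a module docstring along these lines could be added at the top of each script (a sketch, not part of this commit); for contents_mapper.py, for example:

#!/usr/bin/env python
"""
contents_mapper.py (sketch of a docstring): map step that reads one document
per input line, cleans and stems it with map_reduce_utils.clean_text, and
emits 'docname <TAB> cleaned words' for the downstream word-frequency job.
"""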
8 changes: 8 additions & 0 deletions corp_freq_map.py
@@ -0,0 +1,8 @@
#!/usr/bin/env python

import sys

for line in sys.stdin:
    key, value = line.strip().split('\t')
    word, docname = key.strip().split()
    print '%s\t%s %s %s' % (word, docname, value, 1)
47 changes: 47 additions & 0 deletions corp_freq_red.py
@@ -0,0 +1,47 @@
#!/usr/bin/env python

import sys


def print_results(count, files):
    # This is only printing the first element in files, but the output
    # has the same word in two different lines, so it must be the case
    # that this is getting
    for string in files:
        # i think we're not putting a tab in the correct place

bmyerz (Member) commented on Nov 21, 2014:

what does this comment mean? Does this comment still hold?

zbsimon (Author, Contributor) replied on Nov 24, 2014:

Whoops. Nope, forgot to remove this, sorry.

        print '%s %s' % (string, count)

processed_files = []
cur_word = None
cur_count = 0

word = None


for line in sys.stdin:
    # do same thing as in step A, but keep a list of all documents that
    # are processed and their n, N values, then when we write out
    # to stdout, we can do it for each one of these
    key, value = line.strip().split('\t')
    word = key.strip()
    docname, word_count, doc_count, count = value.strip().split()
    count = int(count)
    # add document/word combo to processed files
    processed_combo = '%s %s\t%s %s' % (word, docname, word_count, doc_count)
    if cur_word == word:
        cur_count += count
        processed_files.append(processed_combo)
    else:
        if cur_word:
            # This is getting hit on the first time a word is seen,
            # we want it on the last one, so need to check before
            # adding counts, print there
            print_results(cur_count, processed_files)
        cur_word = word
        cur_count = count
        processed_files = []
        processed_files.append(processed_combo)

if cur_word == word and cur_word is not None:

bmyerz (Member) commented on Nov 21, 2014:

This somehow seems more confusing than necessary. Is there a different way to structure the logic or use a well-named flag instead of this condition in two places?

zbsimon (Author, Contributor) replied on Nov 24, 2014:

I cleaned this up a little in the other branch. We don't need the word comparison at the end. The check for None is still needed in the loop for the first time through the loop. It's needed at the end only to guard against a situation where there is no input, so we could potentially get rid of it. If we wanted.

    print_results(cur_count, processed_files)
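Following up on the thread above, here is a sketch (not part of this commit) of one way to restructure this reducer so the end-of-group check lives in a single helper; flush is a hypothetical name:

#!/usr/bin/env python
# Sketch only: same accumulation as corp_freq_red.py, with the group-change
# test kept in one place via a small flush() helper.
import sys


def print_results(count, files):
    for string in files:
        print '%s %s' % (string, count)


def flush(word, count, files):
    # No-op until at least one word has been seen (e.g. on empty input).
    if word is not None:
        print_results(count, files)


cur_word, cur_count, processed_files = None, 0, []

for line in sys.stdin:
    key, value = line.strip().split('\t')
    word = key.strip()
    docname, word_count, doc_count, count = value.strip().split()
    combo = '%s %s\t%s %s' % (word, docname, word_count, doc_count)
    if word != cur_word:
        flush(cur_word, cur_count, processed_files)
        cur_word, cur_count, processed_files = word, 0, []
    cur_count += int(count)
    processed_files.append(combo)

flush(cur_word, cur_count, processed_files)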
8 changes: 8 additions & 0 deletions cos_sim_map.py
@@ -0,0 +1,8 @@
#!/usr/bin/env python
import sys

for line in sys.stdin:
    key, value = line.strip().split('\t')
    doc1, doc2, product = value.strip().split()
    product = float(product)
    print '%s %s\t%.16f' % (doc1, doc2, product)

bmyerz (Member) commented on Nov 21, 2014:

is it necessary to convert the product between string -> float -> string?
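If the upstream job already writes the product with the precision the reducer needs, a pass-through version of this mapper (a sketch, not what the commit does) could skip the str -> float -> str round trip:

#!/usr/bin/env python
import sys

for line in sys.stdin:
    key, value = line.strip().split('\t')
    doc1, doc2, product = value.strip().split()
    # re-emit the product untouched; no string -> float -> string conversion
    print '%s %s\t%s' % (doc1, doc2, product)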

20 changes: 20 additions & 0 deletions cos_sim_red.py
@@ -0,0 +1,20 @@
#!/usr/bin/env python
import sys

cur_sum = 0
cur_docs = (None, None) # will become (doc1, doc2)

for line in sys.stdin:
    key, value = line.strip().split('\t')
    doc1, doc2 = key.strip().split()
    product = float(value)
    if (doc1, doc2) == cur_docs:
        cur_sum += product
    else:
        if cur_docs[0] is not None and cur_docs[1] is not None:
            print '%s %s\t%.16f' % (cur_docs[0], cur_docs[1], cur_sum)
        cur_docs = (doc1, doc2)
        cur_sum = product

if cur_docs[0] is not None and cur_docs[1] is not None:
    print '%s %s\t%.16f' % (cur_docs[0], cur_docs[1], cur_sum)

bmyerz (Member) commented on Nov 21, 2014:

this duplicate print code merits a function
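A small helper along these lines (a sketch; emit_similarity is an illustrative name, not part of this commit) would remove the duplicated print:

def emit_similarity(docs, total):
    # Print a (doc1, doc2) pair and its accumulated dot product, skipping
    # the (None, None) sentinel used before the first input line.
    if docs[0] is not None and docs[1] is not None:
        print '%s %s\t%.16f' % (docs[0], docs[1], total)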

14 changes: 14 additions & 0 deletions map_reduce_utils.py
@@ -0,0 +1,14 @@
from nltk.stem.porter import PorterStemmer
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS as stopwords
import string


def clean_text(text):
    # TODO remove words w/ numerals, e.g. '14th'

bmyerz (Member) commented on Nov 21, 2014:

now issue #1

    stemmer = PorterStemmer()
    result = text.lower()
    result = result.translate(None, string.punctuation)
    result = result.replace('\n', ' ')
    result = result.split()
    result = [stemmer.stem(word) for word in result]
    return filter(lambda word: word not in stopwords, result)
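For the numerals TODO above (now issue #1), one option (a sketch, not part of this commit; remove_numeral_words is an illustrative name) is to drop any token containing a digit before stemming:

import re


def remove_numeral_words(words):
    # Sketch: discard tokens such as '14th' or '2nd' that contain digits.
    return [w for w in words if not re.search(r'\d', w)]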
121 changes: 121 additions & 0 deletions mapred_tfidf.py
@@ -0,0 +1,121 @@
#!/usr/bin/env python

import os
import subprocess
import sys
import shutil


def run_map_job(mapper, input_dir, output_dir):
    env = os.environ.copy()
    # we have to pass the specific files as well to allow for
    # arguments to the mapper and reducer
    map_file = '$NLTK_HOME/' + mapper.strip().split()[0]
    map_file = mapper.strip().split()[0]
    if os.path.exists('./' + output_dir):
        shutil.rmtree('./' + output_dir)

bmyerz (Member) commented on Nov 21, 2014:

this seems possibly hazardous. Perhaps you can have a force-remove flag and without it it warns and exits?
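A guard along these lines could implement that suggestion (a sketch; remove_output_dir and the force flag are hypothetical, not part of this commit):

import os
import shutil
import sys


def remove_output_dir(output_dir, force=False):
    # Sketch: only clobber an existing output directory when force=True;
    # otherwise warn and exit so previous results are not silently deleted.
    path = './' + output_dir
    if os.path.exists(path):
        if not force:
            sys.exit('refusing to overwrite %s; pass --force to allow it' % path)
        shutil.rmtree(path)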

    command = '''
    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$RELATIVE_PATH_JAR \
    -D mapred.job.reduces=0 \
    -mapper "$NLTK_HOME/{0}" \
    -input $NLTK_HOME/{1} \
    -output $NLTK_HOME/{2} \
    -file {3}\
    '''.format(mapper, input_dir, output_dir, map_file).strip()
    try:
        subprocess.check_call(command, env=env, shell=True)
    except subprocess.CalledProcessError:
        print 'ERROR: Map job %s failed' % mapper
        raise

bmyerz (Member) commented on Nov 21, 2014:

It's better to raise an exception containing the message, and typically ERROR should not be printed to stdout
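A sketch of that suggestion (check_call_or_raise is an illustrative name, not part of this commit):

import subprocess


def check_call_or_raise(command, env, job_desc):
    # Sketch: run the streaming command and surface failures as an
    # exception with a message, rather than printing ERROR to stdout.
    try:
        subprocess.check_call(command, env=env, shell=True)
    except subprocess.CalledProcessError as e:
        raise RuntimeError('%s failed: %s' % (job_desc, e))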



def run_map_reduce_job(mapper, reducer, input_dir, output_dir):
    env = os.environ.copy()
    # we have to pass the specific files as well to allow for
    # arguments to the mapper and reducer
    map_file = '$NLTK_HOME/' + mapper.strip().split()[0]
    red_file = '$NLTK_HOME/' + reducer.strip().split()[0]
    if os.path.exists('./' + output_dir):
        shutil.rmtree('./' + output_dir)

bmyerz (Member) commented on Nov 21, 2014:

consider above

    command = '''
    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$RELATIVE_PATH_JAR \
    -mapper "$NLTK_HOME/{0}" \
    -reducer "$NLTK_HOME/{1}" \
    -input $NLTK_HOME/{2} \
    -output $NLTK_HOME/{3} \
    -file {4} \
    -file {5}
    '''.format(mapper, reducer, input_dir, output_dir, map_file, red_file)
    command = command.strip()
    try:
        subprocess.check_call(command, env=env, shell=True)
    except subprocess.CalledProcessError:
        print 'ERROR: Map-Reduce job %s, %s failed' % (mapper, reducer)
        raise

bmyerz (Member) commented on Nov 21, 2014:

exception as above


if __name__ == '__main__':
    env = os.environ.copy()
    input_dir = "inaugural"
    if len(sys.argv) > 1:
        input_dir = sys.argv[1]

bmyerz (Member) commented on Nov 21, 2014:

this is the main script, so it's worth using argparse with named arguments and description here
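An argparse front end could look roughly like this (a sketch; the argument names are illustrative, not part of this commit):

import argparse


def parse_args():
    # Sketch: named arguments with defaults matching the current behavior.
    parser = argparse.ArgumentParser(
        description='Compute tf-idf and pairwise cosine similarity of a '
                    'document corpus using Hadoop streaming jobs.')
    parser.add_argument('--input', default='inaugural',
                        help='directory containing the corpus documents')
    parser.add_argument('--output', default='output',
                        help='directory to write cosine similarities to')
    return parser.parse_args()

The __main__ block could then read args.input and args.output instead of indexing sys.argv.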

    try:
        val = env['NLTK_HOME']
    except KeyError:
        print 'ERROR: Please run "source ./hadoop-streaming-env.sh"'
        raise

    # we need the size of the corpora to do tfidf:
    corp = './' + input_dir
    corp_files = [f for f in os.listdir(corp) if os.path.isfile(corp+'/'+f)]
    corpora_len = len(corp_files)
    print 'CORP_SIZE: ', corpora_len

    # TODO: probably shouldn't clobber these dirs in case there's
    # anything in them

bmyerz (Member) commented on Nov 21, 2014:

agreed


    # do an MR job to clean/stem file contents
    clean_content_dir = 'file_contents'
    run_map_job('contents_mapper.py', input_dir, clean_content_dir)

    # calculate word frequency
    word_frequency_dir = 'word_freq'
    run_map_reduce_job('word_freq_map.py',
                       'word_freq_red.py',
                       clean_content_dir,
                       word_frequency_dir)

    # calculate word count for each document
    word_count_dir = 'word_count'
    run_map_reduce_job('word_count_map.py',
                       'word_count_red.py',
                       word_frequency_dir,
                       word_count_dir)

    # calculate word frequency in corpora
    corpora_frequency_dir = 'corpora_freq'
    run_map_reduce_job('corp_freq_map.py',
                       'corp_freq_red.py',
                       word_count_dir,
                       corpora_frequency_dir)

    # now, calculate tfidf scores
    tfidf_dir = 'tfidf'
    run_map_job('tf_idf_map.py {0}'.format(corpora_len),
                corpora_frequency_dir,
                tfidf_dir)

    # join on words for cosine similarity
    word_join_dir = 'joined_words'
    run_map_reduce_job('word_join_map.py',
                       'word_join_red.py',
                       tfidf_dir,
                       word_join_dir)

    # now, sum up the products to get the cosine similarities
    output_dir = "output"
    if len(sys.argv) > 2:
        output_dir = sys.argv[2]

bmyerz (Member) commented on Nov 21, 2014:

definitely hoist the argv parsing up so you can just use output_dir here

    run_map_reduce_job('cos_sim_map.py',
                       'cos_sim_red.py',
                       word_join_dir,
                       output_dir)
22 changes: 13 additions & 9 deletions materialize_nltk_corpus.py
100644 → 100755
@@ -6,27 +6,31 @@
 corpusname = "inaugural"
 if len(sys.argv) >= 2:
-    corpusname = sys.argv[1]
+    corpusname = sys.argv[1]

 filelim = 4
 if len(sys.argv) >= 3:
-    filelim = int(sys.argv[2])
+    filelim = int(sys.argv[2])

-corpus = getattr(nltk.corpus, corpusname)
+
+corpus = getattr(nltk.corpus, corpusname)

 def mkdir_p(path):
     try:
         os.makedirs(path)
-    except OSError as exc:
+    except OSError as exc:
         if exc.errno == errno.EEXIST and os.path.isdir(path):
             pass
-        else: raise
+        else:
+            raise

 path = "./%s" % corpusname
 mkdir_p(path)


-for i in range(0,filelim):
-    fid = corpus.fileids()[i]
-    with open("%s/%s"%(path,fid), 'w') as out:
-        out.write(corpus.raw(fid))
+for i in range(0, filelim):
+    fid = corpus.fileids()[i]
+    with open("%s/%s" % (path, fid), 'w') as out:
+        # need to remove new lines here so MR interprets each file
+        # as a single input
+        out.write(corpus.raw(fid).replace('\n', ' '))
19 changes: 19 additions & 0 deletions tf_idf_map.py
@@ -0,0 +1,19 @@
#!/usr/bin/env python
import sys
from math import log

bmyerz (Member) commented on Nov 21, 2014:

brief descrip


corpora_size = sys.argv[1]


for line in sys.stdin:
    key, value = line.strip().split('\t')
    n, N, m = value.strip().split()
    n = int(n)
    N = int(N)
    m = int(m)
    D = corpora_size
    tf = float(n) / float(N)
    idf = log((float(D) / float(m)), 10)
    tfidf = tf * idf
    print '%s\t%.16f' % (key, tfidf)
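A possible brief description for this script (a sketch of a docstring, not part of this commit), matching the formula the loop computes:

#!/usr/bin/env python
"""
tf_idf_map.py (sketch of a docstring): for each 'word docname' key with
values n (occurrences of word in the doc), N (total words in the doc) and
m (number of docs containing word), emit the tf-idf score
tf * idf = (n / N) * log10(D / m), where D is the corpus size passed as
the first command-line argument.
"""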
Empty file added tf_idf_red.py
Empty file.
8 changes: 8 additions & 0 deletions word_count_map.py
@@ -0,0 +1,8 @@
#!/usr/bin/env python

import sys

for line in sys.stdin:
    key, value = line.strip().split('\t')
    word, docname = key.strip().split()
    print '%s\t%s %s' % (docname, word, value)
23 changes: 23 additions & 0 deletions word_count_red.py
@@ -0,0 +1,23 @@
#!/usr/bin/env python
from map_reduce_utils import clean_text

import sys

for line in sys.stdin:
    key, value = line.strip().split('\t')
    docname = key
    word, count = value.strip().split()
    # This is ... not very generic but not sure how to pass
    # variables from the script running all of the jobs to this one

bmyerz (Member) commented on Nov 21, 2014:

environment variable should work if you use the subprocess methods' env= argument
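A sketch of that idea (DOC_PATH_PREFIX is an illustrative name, not part of this commit); note that for a real streaming job the variable may also need to be forwarded to the task processes, e.g. via the streaming -cmdenv option:

import os

# Driver side (sketch): put the value into the environment handed to the job.
env = os.environ.copy()
env['DOC_PATH_PREFIX'] = 'file:'  # hypothetical variable name

# Reducer side (sketch): read it back instead of hard-coding 'file:'.
prefix = os.environ.get('DOC_PATH_PREFIX', 'file:')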

    path = docname.split('file:', 1)[1]
    # also, would be better to not re-read file every time, but seems like
    # this is done in practice.

bmyerz (Member) commented on Nov 21, 2014:

are you referring to the fact that between every job you are forced by hadoop streaming to read from stdin and write to stdout instead of chaining? Or something else?

zbsimon (Author, Contributor) replied on Nov 24, 2014:

This was referring to the fact that we were re-reading the contents of each document again in order to find the length of each document. This is fixed in my branch though.


    try:
        contents = open(path, 'r').read()
        contents = clean_text(contents)
    except:
        with open('errors.txt', 'a+') as f:
            f.write(docname.split('file:', 1)[1])
    document_word_count = len(contents)
    print '%s %s\t%s %s' % (word, docname, count, document_word_count)
8 changes: 8 additions & 0 deletions word_freq_map.py
@@ -0,0 +1,8 @@
#!/usr/bin/env python

import sys

for line in sys.stdin:
    file_name, words = line.strip().split('\t')
    for word in words.strip().split():
        print '%s %s\t%s' % (word, file_name, 1)
24 changes: 24 additions & 0 deletions word_freq_red.py
@@ -0,0 +1,24 @@
#!/usr/bin/env python

import sys

cur_word = None
cur_file = None
cur_count = 0
word = None

for line in sys.stdin:
    key, value = line.strip().split('\t')
    word, filename = key.strip().split()
    count = int(value)
    if ((cur_word == word) and (filename == cur_file)):

bmyerz (Member) commented on Nov 21, 2014:

This pattern of checking for change in sorted tuples' keys seems to be common in the reducers. Can you abstract it into an iterator over keys that you can use everywhere?

Same goes for the parsing, which has similar structure in all of the scripts

        cur_count += count
    else:
        if cur_word:
            print '%s %s\t%s' % (cur_word, cur_file, cur_count)
        cur_count = count
        cur_word = word
        cur_file = filename

if cur_word == word and cur_word is not None:
    print '%s %s\t%s' % (cur_word, cur_file, cur_count)
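Regarding the suggestion above, a shared helper along these lines (a sketch; reducer_stream is an illustrative name and nothing like it is in this commit) could centralize both the parsing and the key-change detection:

import sys
from itertools import groupby


def reducer_stream(stream=sys.stdin):
    # Sketch: parse 'key<TAB>value' lines and yield (key, list_of_values)
    # groups, relying on Hadoop streaming having sorted the input by key.
    parsed = (line.rstrip('\n').split('\t', 1) for line in stream)
    for key, group in groupby(parsed, key=lambda kv: kv[0]):
        yield key, [value for _, value in group]

word_freq_red.py could then reduce to roughly: for key, values in reducer_stream(): print '%s\t%s' % (key, sum(int(v) for v in values)).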
7 changes: 7 additions & 0 deletions word_join_map.py
@@ -0,0 +1,7 @@
#!/usr/bin/env python
import sys

for line in sys.stdin:
    key, value = line.strip().split('\t')
    word, doc = key.strip().split()
    print '%s\t%s %s' % (word, doc, value)