From 327f5de0199d2d4a0e745f50ee427e7cb76a1ab6 Mon Sep 17 00:00:00 2001
From: Zach Simon
Date: Sun, 16 Nov 2014 23:16:47 -0800
Subject: [PATCH] implement cosine similarity in mapreduce streaming

Moved tfidf and cosine similarity computation to map reduce streaming.

mapred_tfidf.py is the main runnable file. Be sure to run
"source hadoop-streaming-env.sh" before running mapred_tfidf.py, and make
the map and reduce scripts executable.
---
 contents_mapper.py         |   9 +++
 corp_freq_map.py           |   8 +++
 corp_freq_red.py           |  47 ++++++++++++++
 cos_sim_map.py             |   8 +++
 cos_sim_red.py             |  20 ++++++
 map_reduce_utils.py        |  14 +++++
 mapred_tfidf.py            | 121 +++++++++++++++++++++++++++++++++++++
 materialize_nltk_corpus.py |  22 ++++---
 tf_idf_map.py              |  19 ++++++
 tf_idf_red.py              |   0
 word_count_map.py          |   8 +++
 word_count_red.py          |  23 +++++++
 word_freq_map.py           |   8 +++
 word_freq_red.py           |  24 ++++++++
 word_join_map.py           |   7 +++
 word_join_red.py           |  33 ++++++++++
 16 files changed, 362 insertions(+), 9 deletions(-)
 create mode 100755 contents_mapper.py
 create mode 100755 corp_freq_map.py
 create mode 100755 corp_freq_red.py
 create mode 100755 cos_sim_map.py
 create mode 100755 cos_sim_red.py
 create mode 100755 map_reduce_utils.py
 create mode 100755 mapred_tfidf.py
 mode change 100644 => 100755 materialize_nltk_corpus.py
 create mode 100755 tf_idf_map.py
 create mode 100755 tf_idf_red.py
 create mode 100755 word_count_map.py
 create mode 100755 word_count_red.py
 create mode 100755 word_freq_map.py
 create mode 100755 word_freq_red.py
 create mode 100755 word_join_map.py
 create mode 100755 word_join_red.py

diff --git a/contents_mapper.py b/contents_mapper.py
new file mode 100755
index 0000000..58fad60
--- /dev/null
+++ b/contents_mapper.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python
+import os
+import sys
+from map_reduce_utils import clean_text
+
+for line in sys.stdin:
+    docname = os.environ['mapreduce_map_input_file']
+    contents = clean_text(line)
+    print docname, '\t', ' '.join(map(str, contents))
diff --git a/corp_freq_map.py b/corp_freq_map.py
new file mode 100755
index 0000000..a7793bd
--- /dev/null
+++ b/corp_freq_map.py
@@ -0,0 +1,8 @@
+#!/usr/bin/env python
+
+import sys
+
+for line in sys.stdin:
+    key, value = line.strip().split('\t')
+    word, docname = key.strip().split()
+    print '%s\t%s %s %s' % (word, docname, value, 1)
diff --git a/corp_freq_red.py b/corp_freq_red.py
new file mode 100755
index 0000000..14960ba
--- /dev/null
+++ b/corp_freq_red.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+
+import sys
+
+
+def print_results(count, files):
+    # each string in files is 'word docname<TAB>word_count doc_count'; emit
+    # every one of them again with the number of documents in the corpus
+    # that contain the word appended
+    for string in files:
+        # the key/value tab is already embedded in each buffered string
+        print '%s %s' % (string, count)
+
+processed_files = []
+cur_word = None
+cur_count = 0
+
+word = None
+
+
+for line in sys.stdin:
+    # as in the other reducers, accumulate a count for the current key, but
+    # also buffer every document (with its n and N values) that contains
+    # the word so each can be re-emitted with the corpus-wide count
+    key, value = line.strip().split('\t')
+    word = key.strip()
+    docname, word_count, doc_count, count = value.strip().split()
+    count = int(count)
+    # add document/word combo to processed files
+    processed_combo = '%s %s\t%s %s' % (word, docname, word_count, doc_count)
+    count = int(count)
+    if cur_word == word:
+        cur_count += count
+        processed_files.append(processed_combo)
+    else:
+        if cur_word:
+            # the word has just changed, so flush everything buffered for
+            # the previous word (cur_word is None on the very first input
+            # line, in which case there is nothing to flush yet)
+            print_results(cur_count, processed_files)
+        cur_word = word
+        cur_count = count
+        processed_files = []
+        processed_files.append(processed_combo)
+
+if cur_word == word and cur_word is not None:
+    print_results(cur_count, processed_files)
diff --git a/cos_sim_map.py b/cos_sim_map.py
new file mode 100755
index 0000000..c2822af
--- /dev/null
+++ b/cos_sim_map.py
@@ -0,0 +1,8 @@
+#!/usr/bin/env python
+import sys
+
+for line in sys.stdin:
+    key, value = line.strip().split('\t')
+    doc1, doc2, product = value.strip().split()
+    product = float(product)
+    print '%s %s\t%.16f' % (doc1, doc2, product)
diff --git a/cos_sim_red.py b/cos_sim_red.py
new file mode 100755
index 0000000..fe174a1
--- /dev/null
+++ b/cos_sim_red.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+import sys
+
+cur_sum = 0
+cur_docs = (None, None)  # will become (doc1, doc2)
+
+for line in sys.stdin:
+    key, value = line.strip().split('\t')
+    doc1, doc2 = key.strip().split()
+    product = float(value)
+    if (doc1, doc2) == cur_docs:
+        cur_sum += product
+    else:
+        if cur_docs[0] is not None and cur_docs[1] is not None:
+            print '%s %s\t%.16f' % (cur_docs[0], cur_docs[1], cur_sum)
+        cur_docs = (doc1, doc2)
+        cur_sum = product
+
+if cur_docs[0] is not None and cur_docs[1] is not None:
+    print '%s %s\t%.16f' % (cur_docs[0], cur_docs[1], cur_sum)
diff --git a/map_reduce_utils.py b/map_reduce_utils.py
new file mode 100755
index 0000000..03a1886
--- /dev/null
+++ b/map_reduce_utils.py
@@ -0,0 +1,14 @@
+from nltk.stem.porter import PorterStemmer
+from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS as stopwords
+import string
+
+
+def clean_text(text):
+    # TODO remove words w/ numerals, e.g. '14th'
+    stemmer = PorterStemmer()
+    result = text.lower()
+    result = result.translate(None, string.punctuation)
+    result = result.replace('\n', ' ')
+    result = result.split()
+    result = [stemmer.stem(word) for word in result]
+    return filter(lambda word: word not in stopwords, result)
diff --git a/mapred_tfidf.py b/mapred_tfidf.py
new file mode 100755
index 0000000..7306e71
--- /dev/null
+++ b/mapred_tfidf.py
@@ -0,0 +1,121 @@
+#!/usr/bin/env python
+
+import os
+import subprocess
+import sys
+import shutil
+
+
+def run_map_job(mapper, input_dir, output_dir):
+    env = os.environ.copy()
+    # we have to pass the specific files as well to allow for
+    # arguments to the mapper and reducer
+    # strip any arguments so that -file ships just the script itself
+    map_file = mapper.strip().split()[0]
+    if os.path.exists('./' + output_dir):
+        shutil.rmtree('./' + output_dir)
+    command = '''
+    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$RELATIVE_PATH_JAR \
+    -D mapred.job.reduces=0 \
+    -mapper "$NLTK_HOME/{0}" \
+    -input $NLTK_HOME/{1} \
+    -output $NLTK_HOME/{2} \
+    -file {3}\
+    '''.format(mapper, input_dir, output_dir, map_file).strip()
+    try:
+        subprocess.check_call(command, env=env, shell=True)
+    except subprocess.CalledProcessError:
+        print 'ERROR: Map job %s failed' % mapper
+        raise
+
+
+def run_map_reduce_job(mapper, reducer, input_dir, output_dir):
+    env = os.environ.copy()
+    # we have to pass the specific files as well to allow for
+    # arguments to the mapper and reducer
+    map_file = '$NLTK_HOME/' + mapper.strip().split()[0]
+    red_file = '$NLTK_HOME/' + reducer.strip().split()[0]
+    if os.path.exists('./' + output_dir):
+        shutil.rmtree('./' + output_dir)
+    command = '''
+    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$RELATIVE_PATH_JAR \
+    -mapper "$NLTK_HOME/{0}" \
+    -reducer "$NLTK_HOME/{1}" \
+    -input $NLTK_HOME/{2} \
+    -output $NLTK_HOME/{3} \
+    -file {4} \
+    -file {5}
+    '''.format(mapper, reducer, input_dir, output_dir, map_file, red_file)
+    command = command.strip()
+    try:
+        subprocess.check_call(command, env=env, shell=True)
+    except subprocess.CalledProcessError:
+        print 'ERROR: Map-Reduce job %s, %s failed' % (mapper, reducer)
+        raise
+
+if __name__ == '__main__':
+    env = os.environ.copy()
+    input_dir = "inaugural"
+    if len(sys.argv) > 1:
+        input_dir = sys.argv[1]
+    try:
+        val = env['NLTK_HOME']
+    except KeyError:
+        print 'ERROR: Please run "source ./hadoop-streaming-env.sh"'
+        raise
+
+    # we need the size of the corpora to do tfidf:
+    corp = './' + input_dir
+    corp_files = [f for f in os.listdir(corp) if os.path.isfile(corp+'/'+f)]
+    corpora_len = len(corp_files)
+    print 'CORP_SIZE: ', corpora_len
+
+    # TODO: probably shouldn't clobber these dirs in case there's
+    # anything in them
+
+    # do an MR job to clean/stem file contents
+    clean_content_dir = 'file_contents'
+    run_map_job('contents_mapper.py', input_dir, clean_content_dir)
+
+    # calculate word frequency
+    word_frequency_dir = 'word_freq'
+    run_map_reduce_job('word_freq_map.py',
+                       'word_freq_red.py',
+                       clean_content_dir,
+                       word_frequency_dir)
+
+    # calculate word count for each document
+    word_count_dir = 'word_count'
+    run_map_reduce_job('word_count_map.py',
+                       'word_count_red.py',
+                       word_frequency_dir,
+                       word_count_dir)
+
+    # calculate word frequency in corpora
+    corpora_frequency_dir = 'corpora_freq'
+    run_map_reduce_job('corp_freq_map.py',
+                       'corp_freq_red.py',
+                       word_count_dir,
+                       corpora_frequency_dir)
+
+    # now, calculate tfidf scores
+    tfidf_dir = 'tfidf'
+    run_map_job('tf_idf_map.py {0}'.format(corpora_len),
+                corpora_frequency_dir,
+                tfidf_dir)
+
+    # join on words for cosine similarity
+    word_join_dir = 'joined_words'
+    run_map_reduce_job('word_join_map.py',
+                       'word_join_red.py',
+                       tfidf_dir,
+                       word_join_dir)
+
+    # now, sum up the products to get the cosine similarities
+    output_dir = "output"
+    if len(sys.argv) > 2:
+        output_dir = sys.argv[2]
+    run_map_reduce_job('cos_sim_map.py',
+                       'cos_sim_red.py',
+                       word_join_dir,
+                       output_dir)
diff --git a/materialize_nltk_corpus.py b/materialize_nltk_corpus.py
old mode 100644
new mode 100755
index e0a0708..31c39dd
--- a/materialize_nltk_corpus.py
+++ b/materialize_nltk_corpus.py
@@ -6,27 +6,31 @@
 corpusname = "inaugural"
 
 if len(sys.argv) >= 2:
-	corpusname = sys.argv[1]
+    corpusname = sys.argv[1]
 
 filelim = 4
 if len(sys.argv) >= 3:
-	filelim = int(sys.argv[2])
+    filelim = int(sys.argv[2])
+
+corpus = getattr(nltk.corpus, corpusname)
 
-corpus = getattr(nltk.corpus, corpusname)
 
 def mkdir_p(path):
     try:
         os.makedirs(path)
-    except OSError as exc: 
+    except OSError as exc:
         if exc.errno == errno.EEXIST and os.path.isdir(path):
             pass
-        else: raise
+        else:
+            raise
 
 path = "./%s" % corpusname
 mkdir_p(path)
 
-for i in range(0,filelim):
-	fid = corpus.fileids()[i]
-	with open("%s/%s"%(path,fid), 'w') as out:
-		out.write(corpus.raw(fid))
+for i in range(0, filelim):
+    fid = corpus.fileids()[i]
+    with open("%s/%s" % (path, fid), 'w') as out:
+        # need to remove new lines here so MR interprets each file
+        # as a single input
+        out.write(corpus.raw(fid).replace('\n', ' '))
 
diff --git a/tf_idf_map.py b/tf_idf_map.py
new file mode 100755
index 0000000..15405ae
--- /dev/null
+++ b/tf_idf_map.py
@@ -0,0 +1,19 @@
+#!/usr/bin/env python
+import sys
+from math import log
+
+
+corpora_size = sys.argv[1]
+
+
+for line in sys.stdin:
+    key, value = line.strip().split('\t')
+    n, N, m = value.strip().split()
+    n = int(n)
+    N = int(N)
+    m = int(m)
+    D = corpora_size
+    tf = float(n) / float(N)
+    idf = log((float(D) / float(m)), 10)
+    tfidf = tf * idf
+    print '%s\t%.16f' % (key, tfidf)
diff --git a/tf_idf_red.py b/tf_idf_red.py
new file mode 100755
index 0000000..e69de29
diff --git a/word_count_map.py b/word_count_map.py
new file mode 100755
index 0000000..db7a54f
--- /dev/null
+++ b/word_count_map.py
@@ -0,0 +1,8 @@
+#!/usr/bin/env python
+
+import sys
+
+for line in sys.stdin:
+    key, value = line.strip().split('\t')
+    word, docname = key.strip().split()
+    print '%s\t%s %s' % (docname, word, value)
diff --git a/word_count_red.py b/word_count_red.py
new file mode 100755
index 0000000..dbd2241
--- /dev/null
+++ b/word_count_red.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+from map_reduce_utils import clean_text
+
+import sys
+
+for line in sys.stdin:
+    key, value = line.strip().split('\t')
+    docname = key
+    word, count = value.strip().split()
+    # stripping the 'file:' prefix is not very generic, but there is no
+    # easy way to pass variables from the driver script into this reducer
+    path = docname.split('file:', 1)[1]
+    # it would also be better not to re-read the file for every word,
+    # but this appears to be common practice
+
+    try:
+        contents = clean_text(open(path, 'r').read())
+    except IOError:
+        with open('errors.txt', 'a+') as f:
+            f.write(path + '\n')
+        continue
+    document_word_count = len(contents)
+    print '%s %s\t%s %s' % (word, docname, count, document_word_count)
diff --git a/word_freq_map.py b/word_freq_map.py
new file mode 100755
index 0000000..df901da
--- /dev/null
+++ b/word_freq_map.py
@@ -0,0 +1,8 @@
+#!/usr/bin/env python
+
+import sys
+
+for line in sys.stdin:
+    file_name, words = line.strip().split('\t')
+    for word in words.strip().split():
+        print '%s %s\t%s' % (word, file_name, 1)
diff --git a/word_freq_red.py b/word_freq_red.py
new file mode 100755
index 0000000..92d34d8
--- /dev/null
+++ b/word_freq_red.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+
+import sys
+
+cur_word = None
+cur_file = None
+cur_count = 0
+word = None
+
+for line in sys.stdin:
+    key, value = line.strip().split('\t')
+    word, filename = key.strip().split()
+    count = int(value)
+    if ((cur_word == word) and (filename == cur_file)):
+        cur_count += count
+    else:
+        if cur_word:
+            print '%s %s\t%s' % (cur_word, cur_file, cur_count)
+        cur_count = count
+        cur_word = word
+        cur_file = filename
+
+if cur_word == word and cur_word is not None:
+    print '%s %s\t%s' % (cur_word, cur_file, cur_count)
diff --git a/word_join_map.py b/word_join_map.py
new file mode 100755
index 0000000..078ae5c
--- /dev/null
+++ b/word_join_map.py
@@ -0,0 +1,7 @@
+#!/usr/bin/env python
+import sys
+
+for line in sys.stdin:
+    key, value = line.strip().split('\t')
+    word, doc = key.strip().split()
+    print '%s\t%s %s' % (word, doc, value)
diff --git a/word_join_red.py b/word_join_red.py
new file mode 100755
index 0000000..fa7b042
--- /dev/null
+++ b/word_join_red.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+import sys
+
+cur_word = None
+word = None
+matching_docs = []
+
+
+def print_results(docs, word):
+    for doc1 in docs:
+        for doc2 in docs:
+            if doc1 != doc2:
+                # TODO
+                print '%s\t%s %s %.16f' % (word, doc1[0], doc2[0],
+                                           doc1[1]*doc2[1])
+
+
+for line in sys.stdin:
+    key, value = line.strip().split('\t')
+    word = key.strip()
+    filename, tfidf = value.strip().split()
+    tfidf = float(tfidf)
+    if word == cur_word:
+        matching_docs.append((filename, tfidf))
+    else:
+        if cur_word:
+            print_results(matching_docs, cur_word)
+        cur_word = word
+        matching_docs = []
+        matching_docs.append((filename, tfidf))
+
+if cur_word == word:
+    print_results(matching_docs, cur_word)
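
A rough end-to-end usage sketch (assuming hadoop-streaming-env.sh exports
HADOOP_HOME, RELATIVE_PATH_JAR and NLTK_HOME as mapred_tfidf.py expects;
"inaugural" and "output" are the defaults built into the scripts):

    source hadoop-streaming-env.sh
    chmod +x *.py                                   # streaming scripts must be executable
    python materialize_nltk_corpus.py inaugural 4   # dump a small local corpus
    ./mapred_tfidf.py inaugural output              # run the full pipeline

Results are written under $NLTK_HOME/output; the intermediate directories
(file_contents, word_freq, word_count, corpora_freq, tfidf and joined_words)
are left in place for inspection.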