placed mappers/reducers back in top dir
python's relative imports don't play nice with copying files to hadoop
workers prior to map-reduce jobs, so I'm going to put everything back in
one giant top-level directory and pretend like that doesn't bother me.
zbsimon committed May 25, 2015
1 parent c2ffe6c commit b396fe9
Showing 17 changed files with 501 additions and 0 deletions.
35 changes: 35 additions & 0 deletions claims_mapper.py
@@ -0,0 +1,35 @@
#!/usr/bin/env python

import sys
import json
import argparse
import map_reduce_utils as mru

# this should become an arg to map_claims
INPUT_KV_DELIM = '"~~'


def map_claims(input=sys.stdin, output=sys.stdout,
               kv_delim=INPUT_KV_DELIM, stop_words_file=None):
    """
    (patent_id"~~claim_text) --> (patent_id) (claim words)
    emits each patent id as the key and its cleaned claim text as the value
    """
    if stop_words_file is not None:
        # parse the stop words file once rather than once per input line
        stop_words = json.loads(open(stop_words_file).read())
    else:
        stop_words = None
    for line in input:
        key, value = line.strip().split(kv_delim)
        patent_id = key.strip()
        if stop_words is not None:
            contents = mru.clean_text(value, stop_words)
        else:
            contents = mru.clean_text(value)
key = {'filename': patent_id}
contents = {'words': [word for word in contents]}
mru.reducer_emit(key, contents, output)


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--stop-words', dest='stop_words_file')
args = parser.parse_args()
stop_words_file = args.stop_words_file
if stop_words_file is not None:
map_claims(stop_words_file=stop_words_file)
else:
map_claims()
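As a quick illustration of the input shape map_claims expects, each line joins a patent id and its claim text with the "~~ delimiter. A minimal sketch of splitting such a line; the id and claim text below are invented:

# Hedged example: the sample id and claim text are made up.
INPUT_KV_DELIM = '"~~'
line = 'US1234567"~~A fastener comprising a threaded shaft and a head.\n'
key, value = line.strip().split(INPUT_KV_DELIM)
print(key.strip())  # US1234567
print(value)        # raw claim text, later cleaned by mru.clean_text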
55 changes: 55 additions & 0 deletions contents_mapper.py
@@ -0,0 +1,55 @@
#!/usr/bin/env python

from __future__ import print_function
import os
import sys
import argparse
import map_reduce_utils as mru

"""
(file_contents) --> (file_name) (file_contents)
for each line from stdin consisting of a document in the corpus, emits
a key-value pair to stdout with a key of the corresponding filename
and a value of the file contents cleaned with
map_reduce_utils.clean_text
"""


def map_contents(input=sys.stdin, output=sys.stdout, stop_words=None):
for line in input:
docname = os.environ['mapreduce_map_input_file']
if stop_words is None:
contents = mru.clean_text(line)
else:
contents = mru.clean_text(line, stop_words)
key = {'filename': docname}
value = {'words': [word for word in contents]}
# we emit as if we were a reducer since the contents don't get put
# through a reducer
mru.reducer_emit(key, value, output)


def words_in_file(filename):
results = []
with open(filename, 'r') as f:
for line in f:
words = line.split()
results += words
return results


if __name__ == '__main__':
formatter = argparse.ArgumentDefaultsHelpFormatter
parser = argparse.ArgumentParser(formatter_class=formatter)
# default stopwords list is in NLTK
stop_words_help = 'the list of stop words to filter out. If none, '
stop_words_help += 'sklearn.feature_extraction.text stop words are used'
parser.add_argument('-s', '--stop-words', default=None,
help=stop_words_help, dest='stop_words')
args = parser.parse_args()
if args.stop_words is not None:
stop_words_list = words_in_file(args.stop_words)
map_contents(stop_words=stop_words_list)
else:
map_contents()
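Because map_contents reads the document name from the mapreduce_map_input_file environment variable, which Hadoop Streaming sets for each input split, a local run has to provide it by hand. A minimal sketch, assuming map_reduce_utils is importable and writes plain text records to the given output stream; the filename and text are invented:

import io
import os
from contents_mapper import map_contents

# Hadoop Streaming normally sets this for each input split; here it is faked.
os.environ['mapreduce_map_input_file'] = 'corpus/doc_0001.txt'
src = io.StringIO(u'The quick brown fox jumps over the lazy dog\n')
out = io.StringIO()
map_contents(input=src, output=out)
print(out.getvalue())  # one cleaned record keyed by the document name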
26 changes: 26 additions & 0 deletions corp_freq_map.py
@@ -0,0 +1,26 @@
#!/usr/bin/env python

from __future__ import print_function
import sys
import map_reduce_utils as mru


def map_corpus_frequency(input=sys.stdin, output=sys.stdout):
"""
(word filename) (n N) --> (word) (filename n N 1)
emits a line for each unique word in each file to be consumed
    by corp_freq_red to find the number of occurrences of each
unique word throughout the entire corpus.
"""
for in_key, in_value in mru.json_loader(input):
out_key = {'word': in_key['word']}
out_value = {'filename': in_key['filename'],
'word_freq': in_value['word_freq'],
'doc_size': in_value['doc_size'],
'count': 1}
mru.mapper_emit(out_key, out_value, output)


if __name__ == '__main__':
map_corpus_frequency()
31 changes: 31 additions & 0 deletions corp_freq_red.py
@@ -0,0 +1,31 @@
#!/usr/bin/env python

from __future__ import print_function
from sys import stdout
import map_reduce_utils as mru


def reduce_corpus_frequency(input=mru.reducer_stream(), output=stdout):
"""
(word) (filename n N 1) --> (word filename) (n N m)
    sums up the number of occurrences of each unique word throughout
the corpus and emits this sum for each document that the word
occurs in.
"""
for in_key, key_stream in input:
corpus_frequency = 0
values = []
for in_value in key_stream:
corpus_frequency += in_value['count']
values.append(in_value)
for value in values:
out_key = {'word': in_key['word'], 'filename': value['filename']}
out_value = {'word_freq': value['word_freq'],
'doc_size': value['doc_size'],
'corp_freq': corpus_frequency}
mru.reducer_emit(out_key, out_value, output)


if __name__ == '__main__':
reduce_corpus_frequency()
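To make the buffering above concrete, here is a stand-alone sketch of the aggregation with invented filenames and counts; the mru plumbing is omitted:

# 'fox' appears in two invented documents; the counts of 1 sum to its
# corpus frequency, which is then attached to each document's record.
values = [
    {'filename': 'a.txt', 'word_freq': 2, 'doc_size': 10, 'count': 1},
    {'filename': 'b.txt', 'word_freq': 1, 'doc_size': 8, 'count': 1},
]
corpus_frequency = sum(v['count'] for v in values)  # 2
for v in values:
    print({'word': 'fox', 'filename': v['filename']},
          {'word_freq': v['word_freq'], 'doc_size': v['doc_size'],
           'corp_freq': corpus_frequency})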
15 changes: 15 additions & 0 deletions corpus_size_map.py
@@ -0,0 +1,15 @@
#!/usr/bin/env python

import sys
import map_reduce_utils as mru


def map_corpus_size(input=sys.stdin, output=sys.stdout):
    """
    (key) (value) --> (1) (1)
    emits a constant key-value pair for each input record so that
    corpus_size_red can sum them to find the size of the corpus
    """
    for in_key, in_value in mru.json_loader(input):
out_key = {'count': 1}
out_value = {'count': 1}
mru.mapper_emit(out_key, out_value, output)


if __name__ == '__main__':
map_corpus_size()
18 changes: 18 additions & 0 deletions corpus_size_red.py
@@ -0,0 +1,18 @@
#!/usr/bin/env python

import sys
import map_reduce_utils as mru


def reduce_corpus_size(input=mru.reducer_stream(), output=sys.stdout):
    """
    (1) (1) --> ('corpus size') (count)
    counts the records emitted by corpus_size_map and emits that single
    total as the size of the corpus
    """
    corpus_size = 0
for in_key, key_stream in input:
for in_value in key_stream:
corpus_size += 1
out_key = 'corpus size'
out_value = corpus_size
mru.reducer_emit(out_key, out_value, output)


if __name__ == '__main__':
reduce_corpus_size()
29 changes: 29 additions & 0 deletions cos_sim_map.py
@@ -0,0 +1,29 @@
#!/usr/bin/env python

from __future__ import print_function
import sys
import map_reduce_utils as mru


def map_cosine_similarity(input=sys.stdin, output=sys.stdout):
"""
(word) (file1 file2 tfidf1*tfidf2) --> (file1 file2) (tfidf1*tfidf2)
for each word common to two documents, removes the word from the
key/value pair and replaces it with the two filenames so that we can
sum up the values for each pair of documents in the reducer.
"""
for in_key, in_value in mru.json_loader(input):
file1 = in_value['file1']
file2 = in_value['file2']
# we want to ensure that (file1 file2) and (file2 file1) get
# sent to the same reducer, so we order them alphabetically
if file1 > file2:
file1, file2 = file2, file1
out_key = {'file1': file1, 'file2': file2}
out_value = {'product': in_value['product']}
mru.mapper_emit(out_key, out_value, output)


if __name__ == '__main__':
map_cosine_similarity()
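The swap above canonicalizes the pair so that both orderings of the same two documents reduce together. A small sketch of that ordering with invented filenames:

def canonical_pair(file1, file2):
    # lexicographic ordering, mirroring the swap in map_cosine_similarity
    return (file2, file1) if file1 > file2 else (file1, file2)

assert canonical_pair('b.txt', 'a.txt') == canonical_pair('a.txt', 'b.txt')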
25 changes: 25 additions & 0 deletions cos_sim_red.py
@@ -0,0 +1,25 @@
#!/usr/bin/env python

from __future__ import print_function
from sys import stdout
import map_reduce_utils as mru


def reduce_cosine_similarity(input=mru.reducer_stream(), output=stdout):
"""
(file1 file2) (tfidf1*tfidf2) --> (file1 file2) (cosine_similarity(f1, f2))
sums up the products of the tfidf values of words common between every
pair of documents to produce the cosine similarity of the two documents
"""
for in_key, key_stream in input:
sum_for_docs = 0
for in_value in key_stream:
sum_for_docs += in_value['product']
out_key = {'file1': in_key['file1'], 'file2': in_key['file2']}
out_value = {'cos_similarity': sum_for_docs}
mru.reducer_emit(out_key, out_value, output)


if __name__ == '__main__':
reduce_cosine_similarity()
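If the per-document tf-idf vectors are first scaled to unit length (as normalize_reducer does), summing the per-word products here is exactly the cosine of the angle between the two document vectors. A hedged numeric sketch with invented vectors:

# Two unit-length tf-idf vectors over a tiny invented vocabulary.
doc1 = {'fox': 0.6, 'dog': 0.8}
doc2 = {'fox': 1.0}
products = [doc1[w] * doc2[w] for w in doc1 if w in doc2]
print(sum(products))  # 0.6, the cosine similarity of the two documents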
17 changes: 17 additions & 0 deletions normalize_mapper.py
@@ -0,0 +1,17 @@
#!/usr/bin/env python

import sys
import map_reduce_utils as mru


def normalize_mapper(input=sys.stdin, output=sys.stdout):
    """
    (word filename) (metrics) --> (uid) (metrics ngram)
    re-keys each record by document so that all of a document's ngrams
    arrive at the same reducer, where their metrics can be normalized
    """
    for in_key, in_value in mru.json_loader(input):
ngram = in_key['word']
uid = in_key['filename']
out_key = {'uid': uid}
in_value['ngram'] = ngram
out_value = in_value
mru.mapper_emit(out_key, out_value, output)

if __name__ == '__main__':
normalize_mapper()
29 changes: 29 additions & 0 deletions normalize_reducer.py
@@ -0,0 +1,29 @@
#!/usr/bin/env python

from __future__ import print_function
import sys
import math
import map_reduce_utils as mru

KEYS_TO_NORMALIZE = ['tfidf', 'log idf', 'idf', 'tf', 'tf log idf']


def normalize_reducer(input=mru.reducer_stream(), output=sys.stdout,
                      keys_to_normalize=KEYS_TO_NORMALIZE):
    """
    (uid) (metrics ngram) --> (uid ngram) (normalized metrics)
    for each document, divides every metric in keys_to_normalize by the
    L2 norm of that metric across all of the document's ngrams
    """
    for in_key, key_stream in input:
normalize_factors = {to_factor: 0.0 for to_factor in keys_to_normalize}
terms_to_normalize = []
for in_value in key_stream:
terms_to_normalize.append(in_value)
            normalize_factors = {k: v + in_value[k] ** 2
                                 for k, v in normalize_factors.items()}
for term in terms_to_normalize:
out_key = {'uid': in_key['uid'], 'ngram': term['ngram']}
out_value = term
del out_value['ngram']
for key in keys_to_normalize:
out_value[key] /= math.sqrt(normalize_factors[key])
mru.reducer_emit(out_key, out_value, output)

if __name__ == '__main__':
normalize_reducer()
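The L2 normalization applied above scales each metric so that its values across a document's ngrams form a unit vector. A minimal sketch for a single metric, with invented numbers:

import math

# Two invented ngrams in one document, normalizing only the 'tfidf' metric.
terms = [{'ngram': 'fox', 'tfidf': 0.3}, {'ngram': 'dog', 'tfidf': 0.4}]
norm = math.sqrt(sum(t['tfidf'] ** 2 for t in terms))  # 0.5
print([(t['ngram'], t['tfidf'] / norm) for t in terms])  # fox -> 0.6, dog -> 0.8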
44 changes: 44 additions & 0 deletions tf_idf_map.py
@@ -0,0 +1,44 @@
#!/usr/bin/env python

from __future__ import print_function
import sys
import argparse
import map_reduce_utils as mru
from math import log


def map_tf_idf(corpus_size, input=sys.stdin, output=sys.stdout):
"""
(word file_name) (n N m) --> (word file_name) (tfidf)
    computes the tf-idf metric for each word in each file in the corpus,
    which is defined as the term frequency multiplied by the inverse
    document frequency. The term frequency is the proportion of the words
    in the document that are a given word. The inverse document frequency
    is the number of documents in the corpus divided by the number of
    documents in which the word appears.
"""

for in_key, in_value in mru.json_loader(input):
n = in_value['word_freq']
N = in_value['doc_size']
m = in_value['corp_freq']
D = corpus_size
tf = float(n) / float(N)
idf = (float(D) / float(m))
log_idf = log(idf, 10)
tfidf = tf * idf
tf_log_idf = tf * log_idf
# in_key == out_key
out_value = {'tfidf': tfidf, 'tf log idf': tf_log_idf,
'log idf': log_idf, 'idf': idf, 'tf': tf,
'word frequency': n, 'document length': N,
'corpus frequency': m, 'corpus size': D}
mru.reducer_emit(in_key, out_value, output)


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--corpus_size', dest='s', type=int)
args = parser.parse_args()
corpus_size = args.s
map_tf_idf(corpus_size)
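Plugging invented counts into the formulas above: a word that appears n=3 times in a 100-word document and occurs in m=5 of the D=50 corpus documents gives the values below.

from math import log

n, N, m, D = 3, 100, 5, 50       # invented example counts
tf = float(n) / float(N)         # 0.03
idf = float(D) / float(m)        # 10.0
log_idf = log(idf, 10)           # 1.0
print(tf * idf, tf * log_idf)    # tfidf = 0.3, tf log idf = 0.03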
26 changes: 26 additions & 0 deletions word_count_map.py
@@ -0,0 +1,26 @@
#!/usr/bin/env python

from __future__ import print_function
import sys
import map_reduce_utils as mru


def map_word_count(input=sys.stdin, output=sys.stdout):
"""
(word filename) (n) --> (filename) (word n)
for each word in each document, emits the document name as the key
and the word and the number of occurrences in that file as the value
"""

for in_key, in_value in mru.json_loader(input):
filename = in_key['filename']
word = in_key['word']
word_frequency = in_value['word_freq']
out_key = {'filename': filename}
out_value = {'word': word, 'word_freq': word_frequency}
mru.mapper_emit(out_key, out_value, output)


if __name__ == '__main__':
map_word_count()
29 changes: 29 additions & 0 deletions word_count_red.py
@@ -0,0 +1,29 @@
#!/usr/bin/env python

from __future__ import print_function
import sys
import map_reduce_utils as mru


def reduce_word_count(input=mru.reducer_stream(), output=sys.stdout):
"""
(file_name) (word word_freq) --> (word file_name) (n N)
sums up the total number of words in each document and emits
    that sum for each word along with the number of occurrences of that
word in the given document
"""

for in_key, key_stream in input:
doc_size = 0
values = []
for in_value in key_stream:
values.append(in_value)
doc_size += in_value['word_freq']
for value in values:
out_key = {'word': value['word'], 'filename': in_key['filename']}
out_value = {'word_freq': value['word_freq'], 'doc_size': doc_size}
mru.reducer_emit(out_key, out_value, output)

if __name__ == '__main__':
reduce_word_count()