Merge branch 'avro-refactor'

avro-refactor became master, so I'm just merging it back in so we can
work there

Conflicts:
	README.md
	cpp/Makefile
	cpp/cosine_similarity.cpp
zbsimon committed May 24, 2015
2 parents 7964e30 + 508752a commit 379f74c
Showing 45 changed files with 2,046 additions and 523 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -1 +1,3 @@
out/
*~
*.pyc
113 changes: 110 additions & 3 deletions README.md
@@ -14,27 +14,47 @@ python materialize_nltk_corpus.py inaugral

# Set the appropriate environment variables
```sh
source ./hadoop-streaming-env.sh
source ./settings.sh
```
or

Or, just set the variables by hand:
```sh
export HADOOP_VERSION= # the version of hadoop you are using, e.g. 2.5.1
export AVRO_VERSION= # if you are using avro, the version, e.g. 1.7.7
export HADOOP_HOME= # the location of your hadoop installation
export RELATIVE_PATH_JAR= # location of hadoop streaming jar in HADOOP_HOME
export NLTK_HOME= # the location of your corpus, mappers and reducers
export AVRO_JAR= # if you are using avro, the jar location
```
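
For reference, a filled-in version of these variables might look like the following; every path and file name below is illustrative and depends on your installation:
```sh
export HADOOP_VERSION=2.5.1
export AVRO_VERSION=1.7.7
export HADOOP_HOME=/usr/local/hadoop
export RELATIVE_PATH_JAR=share/hadoop/tools/lib/hadoop-streaming-2.5.1.jar
export NLTK_HOME=$HOME/nltk-hadoop
export AVRO_JAR=$HOME/jars/avro-mapred-1.7.7.jar
```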

On the sampa cluster, you may also need to execute
```sh
source /shared/patents/settings.sh
```
so that hadoop, Linuxbrew, python packages, and nltk data work correctly.

You may also want to ensure that the mapper and reducer scripts are executable.
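
One way to do this, assuming the mappers and reducers live under `$NLTK_HOME` as described above (adjust the glob to your layout):
```sh
chmod +x $NLTK_HOME/*.py
```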


# Run the MapReduce jobs to produce output
(Note: this depends on avro, nltk, and scikit-learn.)

```sh
./mapred_tfidf --input INPUT_DIR --output OUTPUT_DIR
```

* run with the `--help` flag to view all options
* run with `--force` to automatically overwrite intermediate directories (see the example below)
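
For example, to rerun the pipeline and let it overwrite the intermediate directories left behind by a previous run:
```sh
./mapred_tfidf --input INPUT_DIR --output OUTPUT_DIR --force
```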


On the cluster, this can all be done by executing `./run.sh`, which sets the
appropriate environment variables and uses the appropriate hdfs directories:
```bash
cd /shared/patents/nltk-hadoop
./run.sh
```

See the cosine similarities of all documents:
```sh
ls $OUTPUT_DIR/part-*
@@ -52,6 +72,91 @@ with `nose` installed,
nosetests
```

# Write a new map / reduce job and run it
Hadoop streaming accepts any command as a mapper or reducer, but to use the `map_reduce_utils` module, the basic pattern is as follows:

First, write a mapper like the abstract one below:

```python
#!/usr/bin/env python

import sys

import map_reduce_utils as mru


def mapper():
    for in_key, in_value in mru.json_loader():
        out_key = {}  # the key that is emitted by hadoop as json
        out_value = {}  # the value that is emitted by hadoop as json
        mru.mapper_emit(out_key, out_value, sys.stdout)


if __name__ == '__main__':
    mapper()  # feel free to pass arguments here as well
```

Then, write a reducer similar to:

```python
#!/usr/bin/env python

import sys

import map_reduce_utils as mru


def reducer():
    for in_key, key_stream in mru.reducer_stream():
        values = []  # will contain each value associated with in_key
        for in_value in key_stream:
            values.append(in_value)
        # now, values contains all of the values stored as dicts, so we can
        # do our "reduction" with arbitrary python. note that you don't need
        # to store all of the in_values if, for example, we only need a
        # running sum
        out_key = {}  # the key that is emitted by hadoop as json
        out_value = {}  # the value that is emitted by hadoop as json
        mru.reducer_emit(out_key, out_value, sys.stdout)
        # you can also emit more than one key-value pair here, for example
        # one for each key-value pair where key = in_key:
        for value in values:
            out_key = {}  # the key that is emitted by hadoop as json
            out_value = {}  # the value that is emitted by hadoop as json
            mru.reducer_emit(out_key, out_value, sys.stdout)


if __name__ == '__main__':
    reducer()  # feel free to pass arguments here as well
```
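
Before submitting a job to hadoop, it can help to smoke-test a mapper and reducer locally by imitating the streaming pipeline in a shell. This is only a sketch: it assumes `sample_input` is a small file whose records are in the format that `mru.json_loader` and `mru.reducer_stream` expect:
```sh
cat sample_input | ./mapper.py | sort | ./reducer.py
```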

Now, in your main driver (let's call it `run_hadoop.py` for future reference),
invoke your mapper and reducer:

```python
import map_reduce_utils as mru

# input_dir contains the lines piped into the mapper, output_dir is where the
# results will be placed; both names here are placeholders for your own paths
input_dir = 'INPUT_DIR'
output_dir = 'OUTPUT_DIR'
mru.run_map_reduce_job('mapper.py', 'reducer.py', input_dir, output_dir)

# note that we can pass arguments or arbitrary commands as mappers and reducers
# and use the output of one job as the input of the next job to chain MR jobs
second_MR_job_output_dir = 'SECOND_OUTPUT_DIR'
mru.run_map_reduce_job('second_mapper.py --arg 1', 'wc -l',
                       output_dir, second_MR_job_output_dir)
```

Before running the previous code, however, remember to define the
appropriate environment variables. For example, in a shell, run:
```sh
source settings.sh
python run_hadoop.py
```

Note that
* You don't need to use avro and json. If you want, you can specify the input and output format when invoking `map_reduce_utils.run_map_reduce_job`, as well as the tokenizers for the generators in both the mapper and reducer.
* You can run just a map job (i.e. no reducer) with `map_reduce_utils.run_map_job` (see the sketch below).
* To see a concrete example of a mapper and reducer, look at `word_join_map.py` and `word_join_red.py`.
* To see a concrete example of invoking a hadoop job, look at `mapred_tfidf.py`.
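
Below is a minimal sketch of a map-only job. The call signature of `run_map_job` is an assumption, modeled on `run_map_reduce_job` with the reducer argument dropped; check `map_reduce_utils.py` for the actual parameters.
```python
import map_reduce_utils as mru

# map-only job: no reducer runs, so the mapper's output is the job's output.
# the directory names are placeholders and the signature is assumed, not verified
mru.run_map_job('mapper.py', 'map_only_input', 'map_only_output')
```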

# The TFIDF Metric
After cleaning and stemming a document, we obtain a list of words, `d`, for that document. The tfidf score of a word `w` in `d` is defined as follows:
@@ -60,7 +165,9 @@ After cleaning and stemming a document, we obtain a list of words, `d`, for that
* let `D` be the number of documents in the corpus
* let `m` be the number of documents in which the word `w` appears at least once
* `tf = n / N` (tf is the 'term frequency' of the word)
* `idf = log(D / m)` (idf is the 'inverse document frequency' of the word)
* `idf = D / m` (idf is the 'inverse document frequency' of the word)
* `log_idf = log(D / m)` (log_idf is the log inverse document frequency)
* `tfidf = tf*idf`
* `tf_log_idf = tf*log_idf`

These naming conventions are used in certain places in the codebase, for example in the docstrings for many mapper and reducer functions.
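
As a small worked example of these formulas (the numbers are made up, and `n` and `N` are taken in their usual term-frequency sense: the count of `w` in `d` and the length of `d`): suppose `w` occurs 3 times in a 100-word document and appears in 10 of the 1000 documents in the corpus.
```python
from math import log

# toy numbers, purely illustrative
n = 3.0      # occurrences of w in d
N = 100.0    # total number of words in d
D = 1000.0   # number of documents in the corpus
m = 10.0     # number of documents containing w

tf = n / N                 # 0.03
idf = D / m                # 100.0
log_idf = log(D / m)       # ~4.605
tfidf = tf * idf           # 3.0
tf_log_idf = tf * log_idf  # ~0.138
```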
27 changes: 0 additions & 27 deletions contents_mapper.py

This file was deleted.

23 changes: 0 additions & 23 deletions corp_freq_map.py

This file was deleted.

39 changes: 0 additions & 39 deletions corp_freq_red.py

This file was deleted.

40 changes: 0 additions & 40 deletions cos_sim_red.py

This file was deleted.

9 changes: 7 additions & 2 deletions cpp/Makefile
@@ -3,6 +3,8 @@ OPTFLAGS=-O3

CPPFLAGS=\
-I/sampa/share/gcc-4.8.2/src/boost_1_55_0 \
-I/sampa/share/eigen-3.2.4/include \
-I/sampa/share/gflags/include \
-I/sampa/share/avro/avrocpp-1.7.7/include \
-I$(HADOOP_INSTALL)/include

@@ -19,12 +21,15 @@ LDFLAGS=\
-L$(HADOOP_INSTALL)/lib/native

LDLIBS=\
-lhdfs \
-Wl,-Bstatic \
-lavrocpp_s \
-lgflags \
-Wl,-Bdynamic \
-lhdfs \
-lboost_iostreams \
-lpthread \
-lsqlite3 \
-lcrypto \
-lssl

cosine_similarity: cosine_similarity.cpp


