Skip to content

Makes sure we don't pull the whole corpus into memory when training #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.medallia.word2vec.neuralnetwork;

import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Multiset;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -15,8 +17,11 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Parent class for training word2vec's neural network */
Expand Down Expand Up @@ -51,7 +56,7 @@ public abstract class NeuralNetworkTrainer {
/**
* In the C version, this includes the </s> token that replaces a newline character
*/
int numTrainedTokens;
long numTrainedTokens;

/* The following includes shared state that is updated per worker thread */

Expand Down Expand Up @@ -151,28 +156,33 @@ public interface NeuralNetworkModel {
}

/** @return Trained NN model */
public NeuralNetworkModel train(Iterable<List<String>> sentences) throws InterruptedException {
ListeningExecutorService ex = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(config.numThreads));

public NeuralNetworkModel train(final Iterable<List<String>> sentences) throws InterruptedException {
// Create an executor that runs as many threads as are defined in the config, and blocks if
// you're trying to run more. This is to make sure we don't read the entire corpus into
// memory.
final ListeningExecutorService ex =
MoreExecutors.listeningDecorator(
new ThreadPoolExecutor(config.numThreads, config.numThreads,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neat trick, but let's leave a comment to what we're trying to accomplish here. If I understand correctly, the overall idea is to have executor.submit block if there are no available threads to avoid materializing the sentences in memory before they are needed. The ArrayBlockingQueue and CallerRunsPolicy is one way to accomplish this.

Any reason why the blocking queue starts with size 8 instead of config.numThreads?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct. I'll add a comment.

The queue size could be config.numThreads, but it's not really connected to the number of processors. It's just connected to the amount of overhead there is in creating these threads. In principle, a queue size of 1 should do, but I tried that and it was slower. I'm worried that if I set it to the number of processors, I'll run out of memory on a machine with lots of cores.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, that's not correct. The queue size matters when the main thread is running a task due to the CallerRunsPolicy. So it is connected to the number of processors. I changed it.

0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(config.numThreads),
new ThreadPoolExecutor.CallerRunsPolicy()));

int numSentences = Iterables.size(sentences);
numTrainedTokens += numSentences;
// Partition the sentences evenly amongst the threads
Iterable<List<List<String>>> partitioned = Iterables.partition(sentences, numSentences / config.numThreads + 1);

// Partition the sentences into batches
final Iterable<List<List<String>>> batched = Iterables.partition(sentences, 1024);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I played around with this during implementation and IIRC, it actually does have an effect on the output word vectors since the learning rate is updated concurrently by the workers ...

Can you explain why this is necessary? Maybe we can add a flag to toggle behavior on it if it's truly necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's necessary because in the old version, each of the inner lists is completely materialized in memory, all at the same time. If I have 10M sentences, and 10 processors, it makes 10 lists of 1M sentences each, and keep them in memory. Unless your entire corpus fits in memory, this will never run.

I looked at the code that updates the learning rate. It looks like the problem is that this change might affect which sentences get used when the learning rate is high, and which ones get used when it's lower? If sentences get processed in a different order, doesn't that also change the neural net, and therefore the gradients, that are encountered by each sentence?

Either way, that wasn't deterministic before, and it still isn't. I'd suspect that the number of cores affects the output as well, regardless of whether it runs the old version of this code or the new one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, the C version got around this by having each thread read from the file directly and using a buffer that made repeated passes over the file for each iteration.

Do you mind verifying the output vectors remain similar before and after this change on the text8 sample on e.g. 20 threads?

If they differ drastically, then let's add a flag to the Word2VecTrainerBuilder to enable this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran Word2VecExamples with and without my changes, and there are no differences at all.

Curiously, it runs single-threaded, both in master and in my branch. I suspect that's why there are no differences.

Why does it run single-threaded?


try {
listener.update(Stage.TRAIN_NEURAL_NETWORK, 0.0);
for (int iter = config.iterations; iter > 0; iter--) {
List<CallableVoid> tasks = new ArrayList<>();
List<ListenableFuture<?>> futures = new ArrayList<>(64);
int i = 0;
for (final List<List<String>> batch : partitioned) {
tasks.add(createWorker(i, iter, batch));
for (final List<List<String>> batch : batched) {
futures.add(ex.submit(createWorker(i, iter, batch)));
i++;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This for loop would pull the entire training data into memory, because every worker contains a batch, and all workers are instantiated before the first one starts working.


List<ListenableFuture<?>> futures = new ArrayList<>(tasks.size());
for (CallableVoid task : tasks)
futures.add(ex.submit(task));
try {
Futures.allAsList(futures).get();
} catch (ExecutionException e) {
Expand Down