-
Notifications
You must be signed in to change notification settings - Fork 80
Makes sure we don't pull the whole corpus into memory when training #23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
fe6d329
8e81df1
e5f7d44
94db87d
61e7fc7
464abcc
8cf84b0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
package com.medallia.word2vec.neuralnetwork; | ||
|
||
import com.google.common.collect.Iterables; | ||
import com.google.common.collect.Iterators; | ||
import com.google.common.collect.Lists; | ||
import com.google.common.collect.Multiset; | ||
import com.google.common.util.concurrent.Futures; | ||
import com.google.common.util.concurrent.ListenableFuture; | ||
|
@@ -15,8 +17,11 @@ | |
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.NoSuchElementException; | ||
import java.util.concurrent.ArrayBlockingQueue; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
/** Parent class for training word2vec's neural network */ | ||
|
@@ -51,7 +56,7 @@ public abstract class NeuralNetworkTrainer { | |
/** | ||
* In the C version, this includes the </s> token that replaces a newline character | ||
*/ | ||
int numTrainedTokens; | ||
long numTrainedTokens; | ||
|
||
/* The following includes shared state that is updated per worker thread */ | ||
|
||
|
@@ -151,28 +156,33 @@ public interface NeuralNetworkModel { | |
} | ||
|
||
/** @return Trained NN model */ | ||
public NeuralNetworkModel train(Iterable<List<String>> sentences) throws InterruptedException { | ||
ListeningExecutorService ex = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(config.numThreads)); | ||
|
||
public NeuralNetworkModel train(final Iterable<List<String>> sentences) throws InterruptedException { | ||
// Create an executor that runs as many threads as are defined in the config, and blocks if | ||
// you're trying to run more. This is to make sure we don't read the entire corpus into | ||
// memory. | ||
final ListeningExecutorService ex = | ||
MoreExecutors.listeningDecorator( | ||
new ThreadPoolExecutor(config.numThreads, config.numThreads, | ||
0L, TimeUnit.MILLISECONDS, | ||
new ArrayBlockingQueue<Runnable>(config.numThreads), | ||
new ThreadPoolExecutor.CallerRunsPolicy())); | ||
|
||
int numSentences = Iterables.size(sentences); | ||
numTrainedTokens += numSentences; | ||
// Partition the sentences evenly amongst the threads | ||
Iterable<List<List<String>>> partitioned = Iterables.partition(sentences, numSentences / config.numThreads + 1); | ||
|
||
// Partition the sentences into batches | ||
final Iterable<List<List<String>>> batched = Iterables.partition(sentences, 1024); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, I played around with this during implementation and IIRC, it actually does have an effect on the output word vectors since the learning rate is updated concurrently by the workers ... Can you explain why this is necessary? Maybe we can add a flag to toggle behavior on it if it's truly necessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's necessary because in the old version, each of the inner lists is completely materialized in memory, all at the same time. If I have 10M sentences, and 10 processors, it makes 10 lists of 1M sentences each, and keep them in memory. Unless your entire corpus fits in memory, this will never run. I looked at the code that updates the learning rate. It looks like the problem is that this change might affect which sentences get used when the learning rate is high, and which ones get used when it's lower? If sentences get processed in a different order, doesn't that also change the neural net, and therefore the gradients, that are encountered by each sentence? Either way, that wasn't deterministic before, and it still isn't. I'd suspect that the number of cores affects the output as well, regardless of whether it runs the old version of this code or the new one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, the C version got around this by having each thread read from the file directly and using a buffer that made repeated passes over the file for each iteration. Do you mind verifying the output vectors remain similar before and after this change on the text8 sample on e.g. 20 threads? If they differ drastically, then let's add a flag to the Word2VecTrainerBuilder to enable this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I ran Curiously, it runs single-threaded, both in master and in my branch. I suspect that's why there are no differences. Why does it run single-threaded? |
||
|
||
try { | ||
listener.update(Stage.TRAIN_NEURAL_NETWORK, 0.0); | ||
for (int iter = config.iterations; iter > 0; iter--) { | ||
List<CallableVoid> tasks = new ArrayList<>(); | ||
List<ListenableFuture<?>> futures = new ArrayList<>(64); | ||
int i = 0; | ||
for (final List<List<String>> batch : partitioned) { | ||
tasks.add(createWorker(i, iter, batch)); | ||
for (final List<List<String>> batch : batched) { | ||
futures.add(ex.submit(createWorker(i, iter, batch))); | ||
i++; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||
|
||
List<ListenableFuture<?>> futures = new ArrayList<>(tasks.size()); | ||
for (CallableVoid task : tasks) | ||
futures.add(ex.submit(task)); | ||
try { | ||
Futures.allAsList(futures).get(); | ||
} catch (ExecutionException e) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Neat trick, but let's leave a comment to what we're trying to accomplish here. If I understand correctly, the overall idea is to have executor.submit block if there are no available threads to avoid materializing the sentences in memory before they are needed. The ArrayBlockingQueue and CallerRunsPolicy is one way to accomplish this.
Any reason why the blocking queue starts with size 8 instead of config.numThreads?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct. I'll add a comment.
The queue size could be
config.numThreads
, but it's not really connected to the number of processors. It's just connected to the amount of overhead there is in creating these threads. In principle, a queue size of 1 should do, but I tried that and it was slower. I'm worried that if I set it to the number of processors, I'll run out of memory on a machine with lots of cores.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, that's not correct. The queue size matters when the main thread is running a task due to the
CallerRunsPolicy
. So it is connected to the number of processors. I changed it.