Options to skip small files and not recurse on input paths #90

Open
wants to merge 11 commits into
base: master
115 changes: 87 additions & 28 deletions src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java
@@ -9,8 +9,10 @@
import com.hadoop.mapreduce.LzoIndexOutputFormat;
import com.hadoop.mapreduce.LzoSplitInputFormat;
import com.hadoop.mapreduce.LzoSplitRecordReader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -25,57 +27,108 @@

public class DistributedLzoIndexer extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(DistributedLzoIndexer.class);
private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension();

private static final String LZO_EXTENSION = new LzopCodec().getDefaultExtension();

private static final String LZO_SKIP_INDEXING_SMALL_FILES_KEY = "lzo_skip_indexing_small_files";
Collaborator

Nit: I think it's a common practice for hadoop and related code bases to use dots (".") as separators for config keys; e.g. "lzo_skip_indexing_small_files" -> "lzo.skip-indexing-small-files". By the same token, how about "lzo.skip-indexing-small-files.size" instead of "lzo_small_file_size", and "lzo.recursive-indexing.enabled" instead of "lzo_recursive_indexing"?

Another nit: let's pair each key definition and its default.

Another nit: have an empty line between the static members and the instance members.
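
To make the naming and pairing suggestions concrete, here is a minimal sketch. The key strings are the ones proposed above; the class name `IndexerConfigKeys`, the shortened constant names, and the use of `java.util.Properties` as a stand-in for Hadoop's `Configuration` are assumptions made only to keep the example self-contained.

```java
import java.util.Properties;

class IndexerConfigKeys {
  // Each key sits directly above its default so the pair is read together.
  static final String SKIP_SMALL_FILES_KEY = "lzo.skip-indexing-small-files";
  static final boolean SKIP_SMALL_FILES_DEFAULT = false;

  static final String SMALL_FILE_SIZE_KEY = "lzo.skip-indexing-small-files.size";
  static final long SMALL_FILE_SIZE_DEFAULT = 0L;

  static final String RECURSIVE_KEY = "lzo.recursive-indexing.enabled";
  static final boolean RECURSIVE_DEFAULT = true;

  // Hypothetical helpers standing in for Configuration.getBoolean / getLong.
  static boolean getBoolean(Properties props, String key, boolean defaultValue) {
    String value = props.getProperty(key);
    return value == null ? defaultValue : Boolean.parseBoolean(value.trim());
  }

  static long getLong(Properties props, String key, long defaultValue) {
    String value = props.getProperty(key);
    return value == null ? defaultValue : Long.parseLong(value.trim());
  }
}
```

An unset key falls back to the default declared right below it, which is the main readability gain of pairing them.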

private static final String LZO_SMALL_FILE_SIZE_KEY = "lzo_small_file_size";
private static final String LZO_RECURSIVE_INDEXING_KEY = "lzo_recursive_indexing";
private static final boolean LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT = false;
private static final boolean LZO_RECURSIVE_INDEXING_DEFAULT = true;
private static final long LZO_SMALL_FILE_SIZE_DEFAULT = 0;
private boolean lzoSkipIndexingSmallFiles = LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT;
private boolean lzoRecursiveIndexing = LZO_RECURSIVE_INDEXING_DEFAULT;
private long lzoSmallFileSize = LZO_SMALL_FILE_SIZE_DEFAULT;

private static final String TEMP_FILE_EXTENSION = "/_temporary";

private Configuration conf = getConf();

/**
* Accepts paths which don't end in TEMP_FILE_EXTENSION
*/
private final PathFilter nonTemporaryFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
return !path.toString().endsWith("/_temporary");
return !path.toString().endsWith(TEMP_FILE_EXTENSION);
}
};

private void walkPath(Path path, PathFilter pathFilter, List<Path> accumulator) {
/**
* Returns whether a file should be considered small enough to skip indexing.
*/
private boolean isSmallFile(FileStatus status) {
return status.getLen() < lzoSmallFileSize;
}

private void visitPath(Path path, PathFilter pathFilter, List<Path> accumulator, boolean recursive) {
Collaborator

Since we have lzoRecursiveIndexing as a member variable, we don't need argument "recursive" for this method.

try {
FileSystem fs = path.getFileSystem(getConf());
FileSystem fs = path.getFileSystem(this.conf);
Collaborator

Nit: the style of this code does not use the "this." notation. Can we remove the "this" reference from the changes?

FileStatus fileStatus = fs.getFileStatus(path);

if (fileStatus.isDir()) {
FileStatus[] children = fs.listStatus(path, pathFilter);
for (FileStatus childStatus : children) {
walkPath(childStatus.getPath(), pathFilter, accumulator);
}
} else if (path.toString().endsWith(LZO_EXTENSION)) {
Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX);
if (fs.exists(lzoIndexPath)) {
// If the index exists and is of nonzero size, we're already done.
// We re-index a file with a zero-length index, because every file has at least one block.
if (fs.getFileStatus(lzoIndexPath).getLen() > 0) {
LOG.info("[SKIP] LZO index file already exists for " + path);
return;
} else {
LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)");
accumulator.add(path);
if (fileStatus.isDirectory()) {
if (recursive) {
Collaborator

This requires a discussion. This behavior of "recursive" is a little surprising to me. If I'm not mistaken (from things like FileSystem API), recursive=false means still processing direct file children of the directory but not traversing into subdirectories. But this patch would skip the directory entirely.

I think it may be good to be consistent and consider direct file children still. Thoughts? Either way, I think we need some comments in the code to clarify what it means.

Author

You're correct. We should be consistent with Hadoop's definition of recursive and still process the direct (depth 1) child files of a directory when recursion is disabled.
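
The semantics agreed on here can be sketched as follows. This is only an illustration under stated assumptions: it uses `java.nio.file` on the local file system instead of Hadoop's `FileSystem`, the class name `ShallowOrDeepWalk` is hypothetical, and the `.lzo` extension is hard-coded rather than taken from `LzopCodec`.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class ShallowOrDeepWalk {
  // Collects .lzo files under root. With recursive=false, the direct file
  // children of root are still collected, but subdirectories are not entered.
  static List<Path> collect(Path root, boolean recursive) throws IOException {
    List<Path> accumulator = new ArrayList<>();
    visit(root, recursive, true, accumulator);
    return accumulator;
  }

  private static void visit(Path path, boolean recursive, boolean isInputPath,
                            List<Path> accumulator) throws IOException {
    if (Files.isDirectory(path)) {
      // A directory named on the command line is always listed once;
      // nested directories are entered only when recursion is enabled.
      if (!isInputPath && !recursive) {
        return;
      }
      try (DirectoryStream<Path> children = Files.newDirectoryStream(path)) {
        for (Path child : children) {
          visit(child, recursive, false, accumulator);
        }
      }
    } else if (path.getFileName().toString().endsWith(".lzo")) {
      accumulator.add(path);
    }
  }
}
```

Tracking whether a path came from the command line is one way to distinguish "list this directory" from "descend further"; passing a depth counter would work equally well.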

FileStatus[] children = fs.listStatus(path, pathFilter);
for (FileStatus childStatus : children) {
visitPath(childStatus.getPath(), pathFilter, accumulator, recursive);
Collaborator

In the recursive case, I think we are calling Path.getFileSystem() and getFileStatus() redundantly. I know this is carried over from the original, but I think it's a little wasteful. How about something like this?

private void visitPath(FileStatus fileStatus, FileSystem fs, PathFilter pathFilter, List<Path> accumulator) {
  ...
  if (fileStatus.isDirectory()) {
    if (recursive) {
      FileStatus[] children = fs.listStatus(fileStatus.getPath(), pathFilter);
      for (FileStatus childStatus : children) {
        visitPath(childStatus, fs, pathFilter, accumulator);
    ...
}

}
} else {
// If no index exists, we need to index the file.
LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)");
accumulator.add(path);
LOG.info("[SKIP] Path " + path + " is a directory and recursion is not enabled.");
}
} else if (shouldIndexPath(fileStatus, fs)) {
accumulator.add(path);
}
} catch (IOException ioe) {
LOG.warn("Error walking path: " + path, ioe);
}
}

private boolean shouldIndexPath(FileStatus fileStatus, FileSystem fs) throws IOException {
Path path = fileStatus.getPath();
if (path.toString().endsWith(LZO_EXTENSION)) {
if (this.lzoSkipIndexingSmallFiles && isSmallFile(fileStatus)) {
LOG.info("[SKIP] Skip indexing small files enabled and " + path + " is too small");
return false;
}

Path lzoIndexPath = new Path(path, LzoIndex.LZO_INDEX_SUFFIX);
Collaborator

Just curious, I noticed this changed from "path.suffix(LzoIndex.LZO_INDEX_SUFFIX)" to "new Path(path, LzoIndex.LZO_INDEX_SUFFIX)". Is this an intentional change? In what way do these differ?

Author

I think they're roughly the same. I'm more used to using the constructor. The constructor does some additional resolution of the URIs for parent/child and normalization of the paths. The suffix method does this:

public Path suffix(String suffix) {
    return new Path(this.getParent(), this.getName() + suffix);
}

Which does seem like it should also work, so I will switch back to using .suffix here since it is what already works.
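
For anyone comparing the two approaches later, the distinction between "append to the file name" and "resolve as a child" can be illustrated with the JDK's own `java.nio.file.Path`. This is a sketch by analogy, not Hadoop code: the class name `IndexPathDemo` is hypothetical, the `.index` value is assumed for illustration, and whether Hadoop's two-argument `Path` constructor behaves like `resolve` here should be checked against the Hadoop `Path` documentation.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class IndexPathDemo {
  // Assumed value for illustration only; the real constant lives in LzoIndex.
  static final String INDEX_SUFFIX = ".index";

  // Appends the suffix to the file name, keeping the same parent directory.
  // This mirrors the suffix(...) implementation quoted above.
  static Path suffixStyle(Path file) {
    return file.resolveSibling(file.getFileName().toString() + INDEX_SUFFIX);
  }

  // Treats the suffix as a child entry underneath the file path itself.
  static Path childStyle(Path file) {
    return file.resolve(INDEX_SUFFIX);
  }

  public static void main(String[] args) {
    Path file = Paths.get("/data/part-00000.lzo");
    System.out.println(suffixStyle(file));
    System.out.println(childStyle(file));
  }
}
```

In this analogy the first form yields a sibling of the data file, while the second yields a path nested under it, so the two are not interchangeable when the goal is an index file next to the `.lzo` file.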

if (fs.exists(lzoIndexPath)) {
// If the index exists and is of nonzero size, we're already done.
// We re-index a file with a zero-length index, because every file has at least one block.
if (fileStatus.getLen() > 0) {
Collaborator

This is definitely incorrect. We should check the length of the index file, not the original lzo file. "fileStatus" is for the lzo file. See the corresponding line in the existing code.
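
A minimal sketch of the intended check, assuming local files via `java.nio.file` rather than Hadoop's `FileSystem`, a hypothetical class name `IndexLengthCheck`, and an assumed `.index` suffix. The point is only that the length being tested belongs to the index file, not to the `.lzo` file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class IndexLengthCheck {
  // Returns true when the .lzo file still needs an index: either no index
  // file exists, or the index file exists but is zero length.
  static boolean needsIndexing(Path lzoFile) throws IOException {
    Path indexFile =
        lzoFile.resolveSibling(lzoFile.getFileName().toString() + ".index");
    if (!Files.exists(indexFile)) {
      return true;
    }
    // Inspect the index file's size here, not the size of lzoFile.
    return Files.size(indexFile) == 0;
  }
}
```

With the length taken from the data file instead, any non-empty `.lzo` file with an empty index would be reported as already indexed.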

LOG.info("[SKIP] LZO index file already exists for " + path);
return false;
} else {
LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)");
return true;
}
} else {
// If no index exists, we need to index the file.
LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)");
return true;
}
}
return false;
}

public int run(String[] args) throws Exception {
if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) {
printUsage();
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}

this.lzoSkipIndexingSmallFiles =
this.conf.getBoolean(LZO_SKIP_INDEXING_SMALL_FILES_KEY, LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT);

this.lzoSmallFileSize =
this.conf.getLong(LZO_SMALL_FILE_SIZE_KEY, LZO_SMALL_FILE_SIZE_DEFAULT);

// Find paths to index based on recursive/not
this.lzoRecursiveIndexing = this.conf.getBoolean(LZO_RECURSIVE_INDEXING_KEY, LZO_RECURSIVE_INDEXING_DEFAULT);
List<Path> inputPaths = new ArrayList<Path>();
for (String strPath: args) {
walkPath(new Path(strPath), nonTemporaryFilter, inputPaths);
for (String strPath : args) {
visitPath(new Path(strPath), nonTemporaryFilter, inputPaths, this.lzoRecursiveIndexing);
}

if (inputPaths.isEmpty()) {
@@ -84,7 +137,7 @@ public int run(String[] args) throws Exception {
return 0;
}

Job job = new Job(getConf());
Job job = new Job(this.conf);
job.setJobName("Distributed Lzo Indexer " + Arrays.toString(args));

job.setOutputKeyClass(Path.class);
@@ -134,7 +187,13 @@ public static void main(String[] args) throws Exception {
System.exit(exitCode);
}

public static void printUsage() {
System.err.println("Usage: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer <file.lzo | directory> [file2.lzo directory3 ...]");
public void printUsage() {
String usage =
"Command: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer <file.lzo | directory> [file2.lzo directory3 ...]" +
"\nConfiguration options: \"key\" [values] <default> description" +
"\n" + LZO_SKIP_INDEXING_SMALL_FILES_KEY + " [true,false] <" + LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT + "> When indexing, skip files smaller than " + LZO_SMALL_FILE_SIZE_KEY + " bytes." +
"\n" + LZO_SMALL_FILE_SIZE_KEY + " [long] <" +LZO_SMALL_FILE_SIZE_DEFAULT + "> When indexing, skip files smaller than this number of bytes if " + LZO_SKIP_INDEXING_SMALL_FILES_KEY + " is true." +
"\n" + LZO_RECURSIVE_INDEXING_KEY + " [true,false] <" + LZO_RECURSIVE_INDEXING_DEFAULT + "> Look for files to index recursively from paths on command line.";
System.err.println(usage);
}
}