Skip to content

Commit

Permalink
Gets the correct filesystem when writing an rfile (apache#5296)
Browse files Browse the repository at this point in the history
* Gets the correct filesystem when writing an rfile

Modified the rfile client code to get the filesystem based on the path
being written or read.  This covers the case of having multiple
filesystems defined in hadoop config. 

---------

Co-authored-by: Daniel Roberts ddanielr <[email protected]>
  • Loading branch information
keith-turner and ddanielr authored Jan 31, 2025
1 parent f8fb872 commit 51e152a
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,23 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class FSConfArgs {

FileSystem fs;
Configuration conf;

FileSystem getFileSystem(Path path) throws IOException {
if (fs == null) {
return path.getFileSystem(getConf());
}
return fs;
}

FileSystem getFileSystem() throws IOException {
if (fs == null) {
fs = FileSystem.get(getConf());
return FileSystem.get(getConf());
}
return fs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ RFileSource[] getSources() throws IOException {
if (sources == null) {
sources = new RFileSource[paths.length];
for (int i = 0; i < paths.length; i++) {
sources[i] = new RFileSource(getFileSystem().open(paths[i]),
getFileSystem().getFileStatus(paths[i]).getLen());
sources[i] = new RFileSource(getFileSystem(paths[i]).open(paths[i]),
getFileSystem(paths[i]).getFileStatus(paths[i]).getLen());
}
} else {
for (int i = 0; i < sources.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public RFileWriter build() throws IOException {
visCacheSize);
} else {
return new RFileWriter(fileops.newWriterBuilder()
.forFile(out.path.toString(), out.getFileSystem(), out.getConf(), cs)
.forFile(out.path.toString(), out.getFileSystem(out.path), out.getConf(), cs)
.withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.security.SecureRandom;
import java.util.AbstractMap;
import java.util.ArrayList;
Expand Down Expand Up @@ -67,6 +68,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -840,4 +843,42 @@ public void testMultipleFilesAndCache() throws Exception {
assertEquals(testData, toMap(scanner));
scanner.close();
}

@Test
public void testFileSystemFromUri() throws Exception {
String localFsClass = "LocalFileSystem";

String remoteFsHost = "127.0.0.5:8080";
String fileUri = "hdfs://" + remoteFsHost + "/bulk-xyx/file1.rf";
// There was a bug in the code where the default hadoop file system was always used. This test
// checks that the hadoop filesystem used it based on the URI and not the default filesystem. In
// this env the default file system is the local hadoop file system.
var exception =
assertThrows(ConnectException.class, () -> RFile.newWriter().to(fileUri).build());
assertTrue(exception.getMessage().contains("to " + remoteFsHost
+ " failed on connection exception: java.net.ConnectException: Connection refused"));
// Ensure the DistributedFileSystem was used.
assertTrue(Arrays.stream(exception.getStackTrace())
.anyMatch(ste -> ste.getClassName().contains(DistributedFileSystem.class.getName())));
assertTrue(Arrays.stream(exception.getStackTrace())
.noneMatch(ste -> ste.getClassName().contains(localFsClass)));

var exception2 = assertThrows(RuntimeException.class, () -> {
var scanner = RFile.newScanner().from(fileUri).build();
scanner.iterator();
});
assertTrue(exception2.getMessage().contains("to " + remoteFsHost
+ " failed on connection exception: java.net.ConnectException: Connection refused"));
assertTrue(Arrays.stream(exception2.getCause().getStackTrace())
.anyMatch(ste -> ste.getClassName().contains(DistributedFileSystem.class.getName())));
assertTrue(Arrays.stream(exception2.getCause().getStackTrace())
.noneMatch(ste -> ste.getClassName().contains(localFsClass)));

// verify the assumptions this test is making about the local filesystem being the default.
var exception3 = assertThrows(IllegalArgumentException.class,
() -> FileSystem.get(new Configuration()).open(new Path(fileUri)));
assertTrue(exception3.getMessage().contains("Wrong FS: " + fileUri + ", expected: file:///"));
assertTrue(Arrays.stream(exception3.getStackTrace())
.anyMatch(ste -> ste.getClassName().contains(localFsClass)));
}
}

0 comments on commit 51e152a

Please sign in to comment.