From 51e152a85d8406ba1e8c88de5cedb56d94f71120 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 31 Jan 2025 15:47:57 -0500 Subject: [PATCH] Gets the correct filesystem when writing an rfile (#5296) * Gets the correct filesystem when writing an rfile Modified the rfile client code to get the filesystem based on the path being written or read. This covers the case of having multiple filesystems defined in hadoop config. --------- Co-authored-by: Daniel Roberts ddanielr --- .../core/client/rfile/FSConfArgs.java | 10 ++++- .../client/rfile/RFileScannerBuilder.java | 4 +- .../core/client/rfile/RFileWriterBuilder.java | 2 +- .../core/client/rfile/RFileClientTest.java | 41 +++++++++++++++++++ 4 files changed, 53 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java index 4062b4c02b0..58aa64671ea 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java @@ -22,15 +22,23 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; class FSConfArgs { FileSystem fs; Configuration conf; + FileSystem getFileSystem(Path path) throws IOException { + if (fs == null) { + return path.getFileSystem(getConf()); + } + return fs; + } + FileSystem getFileSystem() throws IOException { if (fs == null) { - fs = FileSystem.get(getConf()); + return FileSystem.get(getConf()); } return fs; } diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java index 02b88d1d62f..d9ad691cace 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java @@ -56,8 +56,8 @@ RFileSource[] getSources() throws IOException { if (sources == null) { sources = new RFileSource[paths.length]; for (int i = 0; i < paths.length; i++) { - sources[i] = new RFileSource(getFileSystem().open(paths[i]), - getFileSystem().getFileStatus(paths[i]).getLen()); + sources[i] = new RFileSource(getFileSystem(paths[i]).open(paths[i]), + getFileSystem(paths[i]).getFileStatus(paths[i]).getLen()); } } else { for (int i = 0; i < sources.length; i++) { diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java index be1850c8c32..6382d568b5f 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java @@ -119,7 +119,7 @@ public RFileWriter build() throws IOException { visCacheSize); } else { return new RFileWriter(fileops.newWriterBuilder() - .forFile(out.path.toString(), out.getFileSystem(), out.getConf(), cs) + .forFile(out.path.toString(), out.getFileSystem(out.path), out.getConf(), cs) .withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize); } } diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java index ec07de4ac07..716803b3563 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java @@ -27,6 +27,7 @@ import java.io.File; import java.io.IOException; +import java.net.ConnectException; import java.security.SecureRandom; import java.util.AbstractMap; import java.util.ArrayList; @@ -67,6 +68,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; @@ -840,4 +843,42 @@ public void testMultipleFilesAndCache() throws Exception { assertEquals(testData, toMap(scanner)); scanner.close(); } + + @Test + public void testFileSystemFromUri() throws Exception { + String localFsClass = "LocalFileSystem"; + + String remoteFsHost = "127.0.0.5:8080"; + String fileUri = "hdfs://" + remoteFsHost + "/bulk-xyx/file1.rf"; + // There was a bug in the code where the default hadoop file system was always used. This test + // checks that the hadoop filesystem used it based on the URI and not the default filesystem. In + // this env the default file system is the local hadoop file system. + var exception = + assertThrows(ConnectException.class, () -> RFile.newWriter().to(fileUri).build()); + assertTrue(exception.getMessage().contains("to " + remoteFsHost + + " failed on connection exception: java.net.ConnectException: Connection refused")); + // Ensure the DistributedFileSystem was used. + assertTrue(Arrays.stream(exception.getStackTrace()) + .anyMatch(ste -> ste.getClassName().contains(DistributedFileSystem.class.getName()))); + assertTrue(Arrays.stream(exception.getStackTrace()) + .noneMatch(ste -> ste.getClassName().contains(localFsClass))); + + var exception2 = assertThrows(RuntimeException.class, () -> { + var scanner = RFile.newScanner().from(fileUri).build(); + scanner.iterator(); + }); + assertTrue(exception2.getMessage().contains("to " + remoteFsHost + + " failed on connection exception: java.net.ConnectException: Connection refused")); + assertTrue(Arrays.stream(exception2.getCause().getStackTrace()) + .anyMatch(ste -> ste.getClassName().contains(DistributedFileSystem.class.getName()))); + assertTrue(Arrays.stream(exception2.getCause().getStackTrace()) + .noneMatch(ste -> ste.getClassName().contains(localFsClass))); + + // verify the assumptions this test is making about the local filesystem being the default. + var exception3 = assertThrows(IllegalArgumentException.class, + () -> FileSystem.get(new Configuration()).open(new Path(fileUri))); + assertTrue(exception3.getMessage().contains("Wrong FS: " + fileUri + ", expected: file:///")); + assertTrue(Arrays.stream(exception3.getStackTrace()) + .anyMatch(ste -> ste.getClassName().contains(localFsClass))); + } }