diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java index 0ecbe4d5b8ded..b4b139ca3062e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java @@ -370,6 +370,14 @@ protected OperationCostValidator.ExpectedProbe always( return expect(true, cost); } + /** + * Always run a metrics operation. + * @return a probe. + */ + protected OperationCostValidator.ExpectedProbe always() { + return OperationCostValidator.always(); + } + /** * A metric diff which must hold when the fs is keeping markers. * @param cost expected cost diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index 63b25f9c8874b..25ffc8fda81cb 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -52,6 +52,8 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_DEFAULT; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; @@ -60,10 +62,12 @@ import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED; import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST; +import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; @@ -84,6 +88,11 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { private int fileLength; + /** + * Is prefetching enabled? + */ + private boolean prefetching; + public ITestS3AOpenCost() { super(true); } @@ -111,6 +120,7 @@ public void setup() throws Exception { writeTextFile(fs, testFile, TEXT, true); testFileStatus = fs.getFileStatus(testFile); fileLength = (int)testFileStatus.getLen(); + prefetching = prefetching(); } /** @@ -161,7 +171,11 @@ public void testOpenFileWithStatusOfOtherFS() throws Throwable { @Test public void testStreamIsNotChecksummed() throws Throwable { describe("Verify that an opened stream is not checksummed"); + + // if prefetching is enabled, skip this test + assumeNoPrefetching(); S3AFileSystem fs = getFileSystem(); + // open the file try (FSDataInputStream in = verifyMetrics(() -> fs.openFile(testFile) @@ -173,12 +187,6 @@ public void testStreamIsNotChecksummed() throws Throwable { always(NO_HEAD_OR_LIST), with(STREAM_READ_OPENED, 0))) { - // if prefetching is enabled, skip this test - final InputStream wrapped = in.getWrappedStream(); - if (!(wrapped instanceof S3AInputStream)) { - skip("Not an S3AInputStream: " + wrapped); - } - // open the stream. in.read(); // now examine the innermost stream and make sure it doesn't have a checksum @@ -239,16 +247,20 @@ public void testOpenFileLongerLengthReadFully() throws Throwable { try (FSDataInputStream in = openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) { byte[] out = new byte[(int) (longLen)]; - intercept(EOFException.class, () -> in.readFully(0, out)); + intercept(EOFException.class, () -> { + in.readFully(0, out); + return in; + }); in.seek(longLen - 1); assertEquals("read past real EOF on " + in, -1, in.read()); return in.toString(); } }, + always(), // two GET calls were made, one for readFully, // the second on the read() past the EOF // the operation has got as far as S3 - with(STREAM_READ_OPENED, 1 + 1)); + probe(!prefetching(), STREAM_READ_OPENED, 1 + 1)); // now on a new stream, try a full read from after the EOF verifyMetrics(() -> { @@ -293,15 +305,19 @@ private FSDataInputStream openFile(final long longLen, String policy) public void testReadPastEOF() throws Throwable { // set a length past the actual file length + describe("read() up to the end of the real file"); + assumeNoPrefetching(); + final int extra = 10; int longLen = fileLength + extra; try (FSDataInputStream in = openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { for (int i = 0; i < fileLength; i++) { Assertions.assertThat(in.read()) - .describedAs("read() at %d", i) + .describedAs("read() at %d from stream %s", i, in) .isEqualTo(TEXT.charAt(i)); } + LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics())); } // now open and read after the EOF; this is @@ -323,10 +339,12 @@ public void testReadPastEOF() throws Throwable { .describedAs("read() at %d", p) .isEqualTo(-1); } + LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics())); return in.toString(); } }, - with(Statistic.ACTION_HTTP_GET_REQUEST, extra)); + always(), + probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra)); } /** @@ -353,10 +371,12 @@ public void testPositionedReadableReadFullyPastEOF() throws Throwable { return in; }); assertS3StreamClosed(in); - return "readFully past EOF"; + return "readFully past EOF with statistics" + + ioStatisticsToPrettyString(in.getIOStatistics()); } }, - with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open + always(), + probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open } /** @@ -370,6 +390,7 @@ public void testPositionedReadableReadPastEOF() throws Throwable { int longLen = fileLength + extra; describe("PositionedReadable.read() past the end of the file"); + assumeNoPrefetching(); verifyMetrics(() -> { try (FSDataInputStream in = @@ -388,10 +409,11 @@ public void testPositionedReadableReadPastEOF() throws Throwable { // stream is closed as part of this failure assertS3StreamClosed(in); - return "PositionedReadable.read()) past EOF"; + return "PositionedReadable.read()) past EOF with " + in; } }, - with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open + always(), + probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open } /** @@ -405,7 +427,8 @@ public void testVectorReadPastEOF() throws Throwable { final int extra = 10; int longLen = fileLength + extra; - describe("Vector read past the end of the file"); + describe("Vector read past the end of the file, expecting an EOFException"); + verifyMetrics(() -> { try (FSDataInputStream in = openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { @@ -420,10 +443,29 @@ public void testVectorReadPastEOF() throws Throwable { TimeUnit.SECONDS, range.getData()); assertS3StreamClosed(in); - return "vector read past EOF"; + return "vector read past EOF with " + in; } }, - with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); + always(), + probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); + } + + /** + * Probe the FS for supporting prefetching. + * @return true if the fs has prefetching enabled. + */ + private boolean prefetching() { + return getFileSystem().getConf().getBoolean( + PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT); + } + + /** + * Skip the test if prefetching is enabled. + */ + private void assumeNoPrefetching(){ + if (prefetching) { + skip("Prefetching is enabled"); + } } /** @@ -431,20 +473,26 @@ public void testVectorReadPastEOF() throws Throwable { * @param in input stream */ private static void assertS3StreamClosed(final FSDataInputStream in) { - S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream(); - Assertions.assertThat(s3ain.isObjectStreamOpen()) - .describedAs("stream is open") - .isFalse(); + final InputStream wrapped = in.getWrappedStream(); + if (wrapped instanceof S3AInputStream) { + S3AInputStream s3ain = (S3AInputStream) wrapped; + Assertions.assertThat(s3ain.isObjectStreamOpen()) + .describedAs("stream is open: %s", s3ain) + .isFalse(); + } } /** - * Assert that the inner S3 Stream is open. + * Assert that the inner S3 Stream is closed. * @param in input stream */ private static void assertS3StreamOpen(final FSDataInputStream in) { - S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream(); - Assertions.assertThat(s3ain.isObjectStreamOpen()) - .describedAs("stream is closed") - .isTrue(); + final InputStream wrapped = in.getWrappedStream(); + if (wrapped instanceof S3AInputStream) { + S3AInputStream s3ain = (S3AInputStream) wrapped; + Assertions.assertThat(s3ain.isObjectStreamOpen()) + .describedAs("stream is closed: %s", s3ain) + .isTrue(); + } } }