Skip to content

Commit

Permalink
Avoid repeated run of setDefaultPipelineOptionsOnce in TestPipeline.r…
Browse files Browse the repository at this point in the history
…un and TestPipelineOptions.create
  • Loading branch information
Abacn committed Oct 10, 2024
1 parent 42cad40 commit 514c8cb
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 0
"modification": 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Ordering;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeMultimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Client-facing {@link FileSystem} utility. */
@SuppressWarnings({
Expand Down Expand Up @@ -558,18 +560,22 @@ static FileSystem getFileSystemInternal(String scheme) {
return rval;
}

private static final Logger LOG = LoggerFactory.getLogger(FileSystems.class);

/** ******************************** METHODS FOR REGISTRATION ********************************* */

/**
* Sets the default configuration in workers.
*
* <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
*
* <p>This is expected only to be used by runners after {@code Pipeline.run}, or in tests.
* <p>Outside workers, wherever the Beam FileSystem API is used (e.g. test methods, user code
* executed during pipeline submission), consider using {@link #registerFileSystemsOnce} if
* registering filesystem schemes is the main goal.
*/
@Internal
public static void setDefaultPipelineOptions(PipelineOptions options) {
checkNotNull(options, "options");
checkNotNull(options, "options cannot be null");
long id = options.getOptionsId();
int nextRevision = options.revision();

Expand All @@ -593,6 +599,23 @@ public static void setDefaultPipelineOptions(PipelineOptions options) {
}
}

/**
 * Registers file systems a single time, doing nothing if registration has already happened.
 *
 * <p>Delegates to {@link #setDefaultPipelineOptions} only while {@code FILESYSTEM_REVISION} holds
 * no value yet; on every later call it returns immediately without touching the given options.
 *
 * <p>Used internally by test setup to avoid repeated filesystem registrations (which involve
 * expensive ServiceLoader calls) when multiple pipelines and PipelineOptions objects are
 * initialized, as is common during test execution.
 *
 * @param options the pipeline options forwarded to {@link #setDefaultPipelineOptions} on the
 *     first registration
 */
@Internal
public static synchronized void registerFileSystemsOnce(PipelineOptions options) {
  final boolean alreadyRegistered = FILESYSTEM_REVISION.get() != null;
  if (alreadyRegistered) {
    // A revision has been recorded before, so the registration is skipped.
    return;
  }
  setDefaultPipelineOptions(options);
}

@VisibleForTesting
static Map<String, FileSystem> verifySchemesAreUnique(
PipelineOptions options, Set<FileSystemRegistrar> registrars) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public PipelineResult runWithAdditionalOptionArgs(List<String> additionalArgs) {
}
newOptions.setStableUniqueNames(CheckEnabled.ERROR);

FileSystems.setDefaultPipelineOptions(options);
FileSystems.registerFileSystemsOnce(options);
return run(newOptions);
} catch (IOException e) {
throw new RuntimeException(
Expand Down Expand Up @@ -515,7 +515,7 @@ public static PipelineOptions testingPipelineOptions() {
}
options.setStableUniqueNames(CheckEnabled.ERROR);

FileSystems.setDefaultPipelineOptions(options);
FileSystems.registerFileSystemsOnce(options);
return options;
} catch (IOException e) {
throw new RuntimeException(
Expand Down

0 comments on commit 514c8cb

Please sign in to comment.