From c1e9aefc2449a4ea0ff3fa590cf1eb6c1cb484a2 Mon Sep 17 00:00:00 2001 From: GuoWei Ma Date: Mon, 9 Sep 2019 19:02:28 +0800 Subject: [PATCH] [FLINK-14465] Let StandaloneJobClusterEntryPoint use the user code class loader [FLINK-14465] The PackageProgram's constructor does not throw excpetion any more when jarFile is null. Introducing this change is because there may be no jarFile in perjob mode. All jars the user code depends on are in the classpaths. [FLINK-14465] ClassPathJobGraphRetriever creates PackagesProgram with user class paths. [FLINK-14465] StandaloneJobClusterEntryPoint uses "FLINK_HOME/usrlib" as the job's class path. The environment variable FLINK_HOME is set at Dockerfile. Link the FLINK_JOB_ARTIFACTS_DIR to the FLINK_HOME/job, which makes the FlinkUserClassloader load the user class in the standalone perjob mode. This closes #10076. --- .../apache/flink/client/cli/CliFrontend.java | 9 +- .../flink/client/program/PackagedProgram.java | 255 ++++++++---------- .../client/program/PackagedProgramUtils.java | 8 + .../cli/CliFrontendPackageProgramTest.java | 1 + .../flink/client/program/ClientTest.java | 15 +- .../program/ExecutionPlanCreationTest.java | 5 +- .../client/program/PackagedProgramTest.java | 17 +- flink-container/docker/Dockerfile | 3 +- flink-container/docker/docker-entrypoint.sh | 2 +- flink-container/pom.xml | 60 +++++ .../ClassPathJobGraphRetriever.java | 149 ++++++++-- .../StandaloneJobClusterEntryPoint.java | 10 +- ...assembly-test-user-classloader-job-jar.xml | 35 +++ ...mbly-test-user-classloader-job-lib-jar.xml | 35 +++ .../ClassPathJobGraphRetrieverTest.java | 181 ++++++++++--- .../entrypoint/testjar/TestJobInfo.java | 33 +++ .../testjar/TestUserClassLoaderJob.java | 41 +++ .../testjar/TestUserClassLoaderJobLib.java | 32 +++ .../flink/configuration/ConfigConstants.java | 3 + .../java/org/apache/flink/util/FileUtils.java | 28 ++ .../examples/wordcount/WordCount.java | 1 - .../avro/AvroExternalJarProgramITCase.java | 5 +- .../handlers/EntryClassQueryParameter.java | 4 +- .../webmonitor/handlers/JarListHandler.java | 2 +- .../handlers/ProgramArgQueryParameter.java | 4 +- .../handlers/ProgramArgsQueryParameter.java | 4 +- .../handlers/utils/JarHandlerUtils.java | 9 +- .../runtime/entrypoint/ClusterEntrypoint.java | 2 +- .../runtime/util/ClusterEntrypointUtils.java | 62 +++++ .../test/classloading/ClassLoaderITCase.java | 58 ++-- .../flink/yarn/YarnConfigurationITCase.java | 2 +- 31 files changed, 813 insertions(+), 262 deletions(-) create mode 100644 flink-container/src/test/assembly/test-assembly-test-user-classloader-job-jar.xml create mode 100644 flink-container/src/test/assembly/test-assembly-test-user-classloader-job-lib-jar.xml create mode 100644 flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestJobInfo.java create mode 100644 flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJob.java create mode 100644 flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJobLib.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/util/ClusterEntrypointUtils.java diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 6f6e933265380..9552978c35464 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -796,9 +796,12 @@ PackagedProgram buildProgram( jarFile = getJarFile(jarFilePath); } - PackagedProgram program = entryPointClass == null ? - new PackagedProgram(jarFile, classpaths, programArgs) : - new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs); + PackagedProgram program = PackagedProgram.newBuilder() + .setJarFile(jarFile) + .setUserClassPaths(classpaths) + .setEntryPointClassName(entryPointClass) + .setArguments(programArgs) + .build(); program.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings()); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index 2f765b14ed7ec..0a09367d3ae2d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -54,6 +54,10 @@ import java.util.jar.JarFile; import java.util.jar.Manifest; +import static org.apache.flink.client.program.PackagedProgramUtils.isPython; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * This class encapsulates represents a program, packaged in a jar file. It supplies * functionality to extract nested libraries, search for the program entry point, and extract @@ -92,88 +96,27 @@ public class PackagedProgram { */ private final boolean isPython; - /** - * Creates an instance that wraps the plan defined in the jar file using the given - * argument. - * - * @param jarFile - * The jar file which contains the plan and a Manifest which defines - * the program-class - * @param args - * Optional. The arguments used to create the pact plan, depend on - * implementation of the pact plan. See getDescription(). - * @throws ProgramInvocationException - * This invocation is thrown if the Program can't be properly loaded. Causes - * may be a missing / wrong class or manifest files. - */ - public PackagedProgram(File jarFile, String... args) throws ProgramInvocationException { - this(jarFile, Collections.emptyList(), null, args); - } - - /** - * Creates an instance that wraps the plan defined in the jar file using the given - * argument. - * - * @param jarFile - * The jar file which contains the plan and a Manifest which defines - * the program-class - * @param classpaths - * Additional classpath URLs needed by the Program. - * @param args - * Optional. The arguments used to create the pact plan, depend on - * implementation of the pact plan. See getDescription(). - * @throws ProgramInvocationException - * This invocation is thrown if the Program can't be properly loaded. Causes - * may be a missing / wrong class or manifest files. - */ - public PackagedProgram(File jarFile, List classpaths, String... args) throws ProgramInvocationException { - this(jarFile, classpaths, null, args); - } - /** * Creates an instance that wraps the plan defined in the jar file using the given * arguments. For generating the plan the class defined in the className parameter * is used. * - * @param jarFile - * The jar file which contains the plan. - * @param entryPointClassName - * Name of the class which generates the plan. Overrides the class defined - * in the jar file manifest - * @param args - * Optional. The arguments used to create the pact plan, depend on - * implementation of the pact plan. See getDescription(). - * @throws ProgramInvocationException - * This invocation is thrown if the Program can't be properly loaded. Causes - * may be a missing / wrong class or manifest files. + * @param jarFile The jar file which contains the plan. + * @param classpaths Additional classpath URLs needed by the Program. + * @param entryPointClassName Name of the class which generates the plan. Overrides the class defined + * in the jar file manifest + * @param args Optional. The arguments used to create the pact plan, depend on + * implementation of the pact plan. See getDescription(). + * @throws ProgramInvocationException This invocation is thrown if the Program can't be properly loaded. Causes + * may be a missing / wrong class or manifest files. */ - public PackagedProgram(File jarFile, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException { - this(jarFile, Collections.emptyList(), entryPointClassName, args); - } + private PackagedProgram(@Nullable File jarFile, List classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException { + checkNotNull(classpaths); + checkNotNull(args); + checkArgument(jarFile != null || entryPointClassName != null, "Either the jarFile or the entryPointClassName needs to be non-null."); - /** - * Creates an instance that wraps the plan defined in the jar file using the given - * arguments. For generating the plan the class defined in the className parameter - * is used. - * - * @param jarFile - * The jar file which contains the plan. - * @param classpaths - * Additional classpath URLs needed by the Program. - * @param entryPointClassName - * Name of the class which generates the plan. Overrides the class defined - * in the jar file manifest - * @param args - * Optional. The arguments used to create the pact plan, depend on - * implementation of the pact plan. See getDescription(). - * @throws ProgramInvocationException - * This invocation is thrown if the Program can't be properly loaded. Causes - * may be a missing / wrong class or manifest files. - */ - public PackagedProgram(File jarFile, List classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException { // Whether the job is a Python job. - isPython = entryPointClassName != null && (entryPointClassName.equals("org.apache.flink.client.python.PythonDriver") - || entryPointClassName.equals("org.apache.flink.client.python.PythonGatewayServer")); + isPython = isPython(entryPointClassName); URL jarFileUrl = null; if (jarFile != null) { @@ -184,12 +127,10 @@ public PackagedProgram(File jarFile, List classpaths, @Nullable String entr } checkJarFile(jarFileUrl); - } else if (!isPython) { - throw new IllegalArgumentException("The jar file must not be null."); } this.jarFile = jarFileUrl; - this.args = args == null ? new String[0] : args; + this.args = args; // if no entryPointClassName name was given, we try and look one up through the manifest if (entryPointClassName == null) { @@ -209,23 +150,6 @@ public PackagedProgram(File jarFile, List classpaths, @Nullable String entr } } - public PackagedProgram(Class entryPointClass, String... args) throws ProgramInvocationException { - this.jarFile = null; - this.args = args == null ? new String[0] : args; - - this.extractedTempLibraries = Collections.emptyList(); - this.classpaths = Collections.emptyList(); - this.userCodeClassLoader = entryPointClass.getClassLoader(); - - // load the entry point class - this.mainClass = entryPointClass; - isPython = entryPointClass.getCanonicalName().equals("org.apache.flink.client.python.PythonDriver"); - - if (!hasMainMethod(mainClass)) { - throw new ProgramInvocationException("The given program class does not have a main(String[]) method."); - } - } - public void setSavepointRestoreSettings(SavepointRestoreSettings savepointSettings) { this.savepointSettings = savepointSettings; } @@ -247,9 +171,8 @@ public String getMainClassName() { * may contain a description of the plan itself and its arguments. * * @return The description of the PactProgram's input parameters. - * @throws ProgramInvocationException - * This invocation is thrown if the Program can't be properly loaded. Causes - * may be a missing / wrong class or manifest files. + * @throws ProgramInvocationException This invocation is thrown if the Program can't be properly loaded. Causes + * may be a missing / wrong class or manifest files. */ @Nullable public String getDescription() throws ProgramInvocationException { @@ -258,17 +181,16 @@ public String getDescription() throws ProgramInvocationException { ProgramDescription descr; try { descr = InstantiationUtil.instantiate( - this.mainClass.asSubclass(ProgramDescription.class), ProgramDescription.class); + this.mainClass.asSubclass(ProgramDescription.class), ProgramDescription.class); } catch (Throwable t) { return null; } try { return descr.getDescription(); - } - catch (Throwable t) { + } catch (Throwable t) { throw new ProgramInvocationException("Error while getting the program description" + - (t.getMessage() == null ? "." : ": " + t.getMessage()), t); + (t.getMessage() == null ? "." : ": " + t.getMessage()), t); } } else { @@ -280,7 +202,7 @@ public String getDescription() throws ProgramInvocationException { * This method assumes that the context environment is prepared, or the execution * will be a local execution by default. */ - public void invokeInteractiveModeForExecution() throws ProgramInvocationException{ + public void invokeInteractiveModeForExecution() throws ProgramInvocationException { callMainMethod(mainClass, args); } @@ -314,8 +236,7 @@ public List getAllLibraries() { for (File tmpLib : this.extractedTempLibraries) { try { libs.add(tmpLib.getAbsoluteFile().toURI().toURL()); - } - catch (MalformedURLException e) { + } catch (MalformedURLException e) { throw new RuntimeException("URL is invalid. This should not happen.", e); } } @@ -367,10 +288,9 @@ private static boolean hasMainMethod(Class entryClass) { mainMethod = entryClass.getMethod("main", String[].class); } catch (NoSuchMethodException e) { return false; - } - catch (Throwable t) { + } catch (Throwable t) { throw new RuntimeException("Could not look up the main(String[]) method from the class " + - entryClass.getName() + ": " + t.getMessage(), t); + entryClass.getName() + ": " + t.getMessage(), t); } return Modifier.isStatic(mainMethod.getModifiers()) && Modifier.isPublic(mainMethod.getModifiers()); @@ -386,10 +306,9 @@ private static void callMainMethod(Class entryClass, String[] args) throws Pr mainMethod = entryClass.getMethod("main", String[].class); } catch (NoSuchMethodException e) { throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method."); - } - catch (Throwable t) { + } catch (Throwable t) { throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " + - entryClass.getName() + ": " + t.getMessage(), t); + entryClass.getName() + ": " + t.getMessage(), t); } if (!Modifier.isStatic(mainMethod.getModifiers())) { @@ -401,14 +320,11 @@ private static void callMainMethod(Class entryClass, String[] args) throws Pr try { mainMethod.invoke(null, (Object) args); - } - catch (IllegalArgumentException e) { + } catch (IllegalArgumentException e) { throw new ProgramInvocationException("Could not invoke the main method, arguments are not matching.", e); - } - catch (IllegalAccessException e) { + } catch (IllegalAccessException e) { throw new ProgramInvocationException("Access to the main method was denied: " + e.getMessage(), e); - } - catch (InvocationTargetException e) { + } catch (InvocationTargetException e) { Throwable exceptionInMethod = e.getTargetException(); if (exceptionInMethod instanceof Error) { throw (Error) exceptionInMethod; @@ -419,8 +335,7 @@ private static void callMainMethod(Class entryClass, String[] args) throws Pr } else { throw new ProgramInvocationException("The main method caused an error: " + exceptionInMethod.getMessage(), exceptionInMethod); } - } - catch (Throwable t) { + } catch (Throwable t) { throw new ProgramInvocationException("An error occurred while invoking the program's main method: " + t.getMessage(), t); } } @@ -468,10 +383,9 @@ private static String getEntryPointClassNameFromJar(URL jarFile) throws ProgramI return className; } else { throw new ProgramInvocationException("Neither a '" + MANIFEST_ATTRIBUTE_MAIN_CLASS + "', nor a '" + - MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS + "' entry was found in the jar file."); + MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS + "' entry was found in the jar file."); } - } - finally { + } finally { try { jar.close(); } catch (Throwable t) { @@ -486,20 +400,16 @@ private static Class loadMainClass(String className, ClassLoader cl) throws P contextCl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(cl); return Class.forName(className, false, cl); - } - catch (ClassNotFoundException e) { + } catch (ClassNotFoundException e) { throw new ProgramInvocationException("The program's entry point class '" + className + "' was not found in the jar file.", e); - } - catch (ExceptionInInitializerError e) { + } catch (ExceptionInInitializerError e) { throw new ProgramInvocationException("The program's entry point class '" + className + "' threw an error during initialization.", e); - } - catch (LinkageError e) { + } catch (LinkageError e) { throw new ProgramInvocationException("The program's entry point class '" + className + "' could not be loaded due to a linkage failure.", e); - } - catch (Throwable t) { + } catch (Throwable t) { throw new ProgramInvocationException("The program's entry point class '" + className + "' caused an exception during initialization: " + t.getMessage(), t); } finally { @@ -537,8 +447,7 @@ public static List extractContainedLibraries(URL jarFile) throws ProgramIn if (containedJarFileEntries.isEmpty()) { return Collections.emptyList(); - } - else { + } else { // go over all contained jar files final List extractedTempLibraries = new ArrayList(containedJarFileEntries.size()); final byte[] buffer = new byte[4096]; @@ -557,11 +466,10 @@ public static List extractContainedLibraries(URL jarFile) throws ProgramIn try { tempFile = File.createTempFile(rnd.nextInt(Integer.MAX_VALUE) + "_", name); tempFile.deleteOnExit(); - } - catch (IOException e) { + } catch (IOException e) { throw new ProgramInvocationException( "An I/O error occurred while creating temporary file to extract nested library '" + - entry.getName() + "'.", e); + entry.getName() + "'.", e); } extractedTempLibraries.add(tempFile); @@ -578,12 +486,10 @@ public static List extractContainedLibraries(URL jarFile) throws ProgramIn while ((numRead = in.read(buffer)) != -1) { out.write(buffer, 0, numRead); } - } - catch (IOException e) { + } catch (IOException e) { throw new ProgramInvocationException("An I/O error occurred while extracting nested library '" - + entry.getName() + "' to temporary file '" + tempFile.getAbsolutePath() + "'."); - } - finally { + + entry.getName() + "' to temporary file '" + tempFile.getAbsolutePath() + "'."); + } finally { if (out != null) { out.close(); } @@ -594,8 +500,7 @@ public static List extractContainedLibraries(URL jarFile) throws ProgramIn } incomplete = false; - } - finally { + } finally { if (incomplete) { deleteExtractedLibraries(extractedTempLibraries); } @@ -603,15 +508,14 @@ public static List extractContainedLibraries(URL jarFile) throws ProgramIn return extractedTempLibraries; } - } - catch (Throwable t) { + } catch (Throwable t) { throw new ProgramInvocationException("Unknown I/O error while extracting contained jar files.", t); - } - finally { + } finally { if (jar != null) { try { jar.close(); - } catch (Throwable t) {} + } catch (Throwable t) { + } } } } @@ -625,13 +529,64 @@ public static void deleteExtractedLibraries(List tempLibraries) { private static void checkJarFile(URL jarfile) throws ProgramInvocationException { try { ClientUtils.checkJarFile(jarfile); - } - catch (IOException e) { + } catch (IOException e) { throw new ProgramInvocationException(e.getMessage(), e); - } - catch (Throwable t) { + } catch (Throwable t) { throw new ProgramInvocationException("Cannot access jar file" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t); } } + /** + * A Builder For {@link PackagedProgram}. + */ + public static class Builder { + + @Nullable + private File jarFile; + + @Nullable + private String entryPointClassName; + + private String[] args = new String[0]; + + private List userClassPaths = Collections.emptyList(); + + public Builder setJarFile(@Nullable File jarFile) { + this.jarFile = jarFile; + return this; + } + + public Builder setUserClassPaths(List userClassPaths) { + this.userClassPaths = userClassPaths; + return this; + } + + public Builder setEntryPointClassName(@Nullable String entryPointClassName) { + this.entryPointClassName = entryPointClassName; + return this; + } + + public Builder setArguments(String... args) { + this.args = args; + return this; + } + + public PackagedProgram build() throws ProgramInvocationException { + if (jarFile == null && entryPointClassName == null) { + throw new IllegalArgumentException("The jarFile and entryPointClassName can not be null at the same time."); + } + return new PackagedProgram( + jarFile, + userClassPaths, + entryPointClassName, + args); + } + + private Builder() { + } + } + + public static Builder newBuilder() { + return new Builder(); + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java index fa9f8b0486d15..2f2719318a772 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -33,6 +33,9 @@ */ public class PackagedProgramUtils { + private static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"; + + private static final String PYTHON_GATEWAY_CLASS_NAME = "org.apache.flink.client.python.PythonGatewayServer"; /** * Creates a {@link JobGraph} with a specified {@link JobID} * from the given {@link PackagedProgram}. @@ -102,5 +105,10 @@ public static Pipeline getPipelineFromProgram(PackagedProgram prog, int parallel } } + public static Boolean isPython(String entryPointClassName) { + return (entryPointClassName != null) && + (entryPointClassName.equals(PYTHON_DRIVER_CLASS_NAME) || entryPointClassName.equals(PYTHON_GATEWAY_CLASS_NAME)); + } + private PackagedProgramUtils() {} } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java index 2efb8ca9bc258..2b64c8238fb34 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java @@ -97,6 +97,7 @@ public void testFileNotJarFile() throws Exception { ProgramOptions programOptions = mock(ProgramOptions.class); ExecutionConfigAccessor executionOptions = mock(ExecutionConfigAccessor.class); when(executionOptions.getJarFilePath()).thenReturn(getNonJarFilePath()); + when(programOptions.getProgramArgs()).thenReturn(new String[0]); try { frontend.buildProgram(programOptions, executionOptions); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 6c6ec1dfc85a8..78023d7cd2e08 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -99,7 +99,7 @@ public void setUp() throws Exception { public void testDetachedMode() throws Exception{ final ClusterClient clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster()); try { - PackagedProgram prg = new PackagedProgram(TestExecuteTwice.class); + PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build(); ClientUtils.executeProgram(clusterClient, prg, 1, true); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { @@ -109,7 +109,7 @@ public void testDetachedMode() throws Exception{ } try { - PackagedProgram prg = new PackagedProgram(TestEager.class); + PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build(); ClientUtils.executeProgram(clusterClient, prg, 1, true); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { @@ -119,7 +119,7 @@ public void testDetachedMode() throws Exception{ } try { - PackagedProgram prg = new PackagedProgram(TestGetRuntime.class); + PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build(); ClientUtils.executeProgram(clusterClient, prg, 1, true); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { @@ -129,7 +129,7 @@ public void testDetachedMode() throws Exception{ } try { - PackagedProgram prg = new PackagedProgram(TestGetAccumulator.class); + PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build(); ClientUtils.executeProgram(clusterClient, prg, 1, true); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { @@ -139,7 +139,7 @@ public void testDetachedMode() throws Exception{ } try { - PackagedProgram prg = new PackagedProgram(TestGetAllAccumulator.class); + PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build(); ClientUtils.executeProgram(clusterClient, prg, 1, true); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { @@ -194,7 +194,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable { @Test public void testGetExecutionPlan() throws ProgramInvocationException { - PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp"); + PackagedProgram prg = PackagedProgram.newBuilder() + .setEntryPointClassName(TestOptimizerPlan.class.getName()) + .setArguments("/dev/random", "/tmp") + .build(); Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, 1); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java index 1b52f377c7497..6343a2f22277f 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java @@ -49,7 +49,10 @@ public class ExecutionPlanCreationTest { @Test public void testGetExecutionPlan() { try { - PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp"); + PackagedProgram prg = PackagedProgram.newBuilder() + .setEntryPointClassName(TestOptimizerPlan.class.getName()) + .setArguments("/dev/random", "/tmp") + .build(); InetAddress mockAddress = InetAddress.getLocalHost(); InetSocketAddress mockJmAddress = new InetSocketAddress(mockAddress, 12345); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java index 7a7cf64a1ac34..c30d3f518c8e8 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java @@ -18,6 +18,7 @@ package org.apache.flink.client.program; +import org.apache.flink.client.cli.CliFrontendTestUtils; import org.apache.flink.configuration.ConfigConstants; import org.junit.Assert; @@ -28,10 +29,13 @@ import java.io.File; import java.io.FileOutputStream; import java.nio.file.Files; +import java.util.Collections; import java.util.List; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import static org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS; + /** * Tests for the {@link PackagedProgram}. */ @@ -57,8 +61,15 @@ public void testExtractContainedLibraries() throws Exception { Assert.assertArrayEquals(nestedJarContent, Files.readAllBytes(files.iterator().next().toPath())); } - private static final class NullOutputStream extends java.io.OutputStream { - @Override - public void write(int b) {} + @Test + public void testNotThrowExceptionWhenJarFileIsNull() throws Exception { + PackagedProgram.newBuilder() + .setUserClassPaths(Collections.singletonList(new File(CliFrontendTestUtils.getTestJarPath()).toURI().toURL())) + .setEntryPointClassName(TEST_JAR_MAIN_CLASS); + } + + @Test(expected = IllegalArgumentException.class) + public void testBuilderThrowExceptionIfjarFileAndEntryPointClassNameAreBothNull() throws ProgramInvocationException { + PackagedProgram.newBuilder().build(); } } diff --git a/flink-container/docker/Dockerfile b/flink-container/docker/Dockerfile index a68835edff20b..a0d3d8fb1bce2 100644 --- a/flink-container/docker/Dockerfile +++ b/flink-container/docker/Dockerfile @@ -28,6 +28,7 @@ ENV FLINK_LIB_DIR $FLINK_HOME/lib ENV FLINK_PLUGINS_DIR $FLINK_HOME/plugins ENV FLINK_OPT_DIR $FLINK_HOME/opt ENV FLINK_JOB_ARTIFACTS_DIR $FLINK_INSTALL_PATH/artifacts +ENV FLINK_USR_LIB_DIR $FLINK_HOME/usrlib ENV PATH $PATH:$FLINK_HOME/bin # flink-dist can point to a directory or a tarball on the local system @@ -51,7 +52,7 @@ ADD $job_artifacts/* $FLINK_JOB_ARTIFACTS_DIR/ RUN set -x && \ ln -s $FLINK_INSTALL_PATH/flink-[0-9]* $FLINK_HOME && \ - for jar in $FLINK_JOB_ARTIFACTS_DIR/*.jar; do [ -f "$jar" ] || continue; ln -s $jar $FLINK_LIB_DIR; done && \ + ln -s $FLINK_JOB_ARTIFACTS_DIR $FLINK_USR_LIB_DIR && \ if [ -n "$python_version" ]; then ln -s $FLINK_OPT_DIR/flink-python*.jar $FLINK_LIB_DIR; fi && \ if [ -f ${FLINK_INSTALL_PATH}/flink-shaded-hadoop* ]; then ln -s ${FLINK_INSTALL_PATH}/flink-shaded-hadoop* $FLINK_LIB_DIR; fi && \ addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \ diff --git a/flink-container/docker/docker-entrypoint.sh b/flink-container/docker/docker-entrypoint.sh index 0bf7c04fa93c0..0c1df000acb9b 100755 --- a/flink-container/docker/docker-entrypoint.sh +++ b/flink-container/docker/docker-entrypoint.sh @@ -19,7 +19,7 @@ ################################################################################ ### If unspecified, the hostname of the container is taken as the JobManager address -FLINK_HOME=${FLINK_HOME:-"/opt/flink/bin"} +FLINK_HOME=${FLINK_HOME:-"/opt/flink"} JOB_CLUSTER="job-cluster" TASK_MANAGER="task-manager" diff --git a/flink-container/pom.xml b/flink-container/pom.xml index 78dfe3d909c58..e94579b39d641 100644 --- a/flink-container/pom.xml +++ b/flink-container/pom.xml @@ -95,6 +95,66 @@ under the License. + + create-test-dependency-user-jar + process-test-classes + + single + + + + + org.apache.flink.container.entrypoint.testjar.TestUserClassLoaderJob + + + maven + false + + src/test/assembly/test-assembly-test-user-classloader-job-jar.xml + + + + + create-test-dependency-user-jar-depend + process-test-classes + + single + + + maven + false + + src/test/assembly/test-assembly-test-user-classloader-job-lib-jar.xml + + + + + + + + maven-clean-plugin + 2.5 + + + remove-externaltestclasses + process-test-classes + + clean + + + true + + + ${project.build.testOutputDirectory} + + **/testjar/TestUser*.class + + + + + diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java index 57808e3328506..9ed109a49023d 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java @@ -26,9 +26,12 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.container.entrypoint.JarManifestParser.JarFileWithEntryClass; +import org.apache.flink.runtime.entrypoint.component.AbstractUserClassPathJobGraphRetriever; import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkException; import org.slf4j.Logger; @@ -39,9 +42,14 @@ import java.io.File; import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; import java.util.NoSuchElementException; import java.util.function.Supplier; +import java.util.jar.JarEntry; +import java.util.jar.JarFile; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; @@ -50,7 +58,7 @@ * {@link JobGraphRetriever} which creates the {@link JobGraph} from a class * on the class path. */ -class ClassPathJobGraphRetriever implements JobGraphRetriever { +class ClassPathJobGraphRetriever extends AbstractUserClassPathJobGraphRetriever { private static final Logger LOG = LoggerFactory.getLogger(ClassPathJobGraphRetriever.class); @@ -69,26 +77,23 @@ class ClassPathJobGraphRetriever implements JobGraphRetriever { @Nonnull private final Supplier> jarsOnClassPath; - ClassPathJobGraphRetriever( - @Nonnull JobID jobId, - @Nonnull SavepointRestoreSettings savepointRestoreSettings, - @Nonnull String[] programArguments, - @Nullable String jobClassName) { - this(jobId, savepointRestoreSettings, programArguments, jobClassName, JarsOnClassPath.INSTANCE); - } + @Nullable + private final File userLibDirectory; - @VisibleForTesting - ClassPathJobGraphRetriever( - @Nonnull JobID jobId, - @Nonnull SavepointRestoreSettings savepointRestoreSettings, - @Nonnull String[] programArguments, - @Nullable String jobClassName, - @Nonnull Supplier> jarsOnClassPath) { + private ClassPathJobGraphRetriever( + @Nonnull JobID jobId, + @Nonnull SavepointRestoreSettings savepointRestoreSettings, + @Nonnull String[] programArguments, + @Nullable String jobClassName, + @Nonnull Supplier> jarsOnClassPath, + @Nullable File userLibDirectory) throws IOException { + super(userLibDirectory); + this.userLibDirectory = userLibDirectory; this.jobId = requireNonNull(jobId, "jobId"); this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings"); this.programArguments = requireNonNull(programArguments, "programArguments"); this.jobClassName = jobClassName; - this.jarsOnClassPath = requireNonNull(jarsOnClassPath, "jarsOnClassPath"); + this.jarsOnClassPath = requireNonNull(jarsOnClassPath); } @Override @@ -112,15 +117,28 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept private PackagedProgram createPackagedProgram() throws FlinkException { final String entryClass = getJobClassNameOrScanClassPath(); try { - final Class mainClass = getClass().getClassLoader().loadClass(entryClass); - return new PackagedProgram(mainClass, programArguments); - } catch (ClassNotFoundException | ProgramInvocationException e) { + return PackagedProgram.newBuilder() + .setUserClassPaths(new ArrayList<>(getUserClassPaths())) + .setEntryPointClassName(entryClass) + .setArguments(programArguments) + .build(); + } catch (ProgramInvocationException e) { throw new FlinkException("Could not load the provided entrypoint class.", e); } } private String getJobClassNameOrScanClassPath() throws FlinkException { if (jobClassName != null) { + if (userLibDirectory != null) { + // check that we find the entrypoint class in the user lib directory. + if (!userClassPathContainsJobClass(jobClassName)) { + throw new FlinkException( + String.format( + "Could not find the provided job class (%s) in the user lib directory (%s).", + jobClassName, + userLibDirectory)); + } + } return jobClassName; } @@ -131,10 +149,47 @@ private String getJobClassNameOrScanClassPath() throws FlinkException { } } + private boolean userClassPathContainsJobClass(String jobClassName) { + for (URL userClassPath : getUserClassPaths()) { + try (final JarFile jarFile = new JarFile(userClassPath.getFile())) { + if (jarContainsJobClass(jobClassName, jarFile)) { + return true; + } + } catch (IOException e) { + ExceptionUtils.rethrow( + e, + String.format( + "Failed to open user class path %s. Make sure that all files on the user class path can be accessed.", + userClassPath)); + } + } + return false; + } + + private boolean jarContainsJobClass(String jobClassName, JarFile jarFile) { + return jarFile + .stream() + .map(JarEntry::getName) + .filter(fileName -> fileName.endsWith(FileUtils.CLASS_FILE_EXTENSION)) + .map(FileUtils::stripFileExtension) + .map(fileName -> fileName.replaceAll(Pattern.quote(File.separator), FileUtils.PACKAGE_SEPARATOR)) + .anyMatch(name -> name.equals(jobClassName)); + } + private String scanClassPathForJobJar() throws IOException { - LOG.info("Scanning class path for job JAR"); - JarFileWithEntryClass jobJar = JarManifestParser.findOnlyEntryClass(jarsOnClassPath.get()); + final Iterable jars; + if (userLibDirectory == null) { + LOG.info("Scanning system class path for job JAR"); + jars = jarsOnClassPath.get(); + } else { + LOG.info("Scanning user class path for job JAR"); + jars = getUserClassPaths() + .stream() + .map(url -> new File(url.getFile())) + .collect(Collectors.toList()); + } + final JarFileWithEntryClass jobJar = JarManifestParser.findOnlyEntryClass(jars); LOG.info("Using {} as job jar", jobJar); return jobJar.getEntryClass(); } @@ -164,4 +219,56 @@ private static boolean notNullAndNotEmpty(String string) { } } + static class Builder { + + private final JobID jobId; + + private final SavepointRestoreSettings savepointRestoreSettings; + + private final String[] programArguments; + + @Nullable + private String jobClassName; + + @Nullable + private File userLibDirectory; + + private Supplier> jarsOnClassPath = JarsOnClassPath.INSTANCE; + + private Builder(JobID jobId, SavepointRestoreSettings savepointRestoreSettings, String[] programArguments) { + this.jobId = requireNonNull(jobId); + this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings); + this.programArguments = requireNonNull(programArguments); + } + + Builder setJobClassName(@Nullable String jobClassName) { + this.jobClassName = jobClassName; + return this; + } + + Builder setUserLibDirectory(File userLibDirectory) { + this.userLibDirectory = userLibDirectory; + return this; + } + + Builder setJarsOnClassPath(Supplier> jarsOnClassPath) { + this.jarsOnClassPath = jarsOnClassPath; + return this; + } + + ClassPathJobGraphRetriever build() throws IOException { + return new ClassPathJobGraphRetriever( + jobId, + savepointRestoreSettings, + programArguments, + jobClassName, + jarsOnClassPath, + userLibDirectory); + } + } + + static Builder newBuilder(JobID jobId, SavepointRestoreSettings savepointRestoreSettings, String[] programArguments) { + return new Builder(jobId, savepointRestoreSettings, programArguments); + } + } diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java index 8935f358d5237..b5be5c4151237 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -36,9 +36,11 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.IOException; import java.util.Optional; import static java.util.Objects.requireNonNull; +import static org.apache.flink.runtime.util.ClusterEntrypointUtils.tryFindUserLibDirectory; /** * {@link JobClusterEntrypoint} which is started with a job in a predefined @@ -74,10 +76,14 @@ private StandaloneJobClusterEntryPoint( } @Override - protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { + protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException { + final ClassPathJobGraphRetriever.Builder classPathJobGraphRetrieverBuilder = ClassPathJobGraphRetriever.newBuilder(jobId, savepointRestoreSettings, programArguments) + .setJobClassName(jobClassName); + tryFindUserLibDirectory().ifPresent(classPathJobGraphRetrieverBuilder::setUserLibDirectory); + return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory( StandaloneResourceManagerFactory.INSTANCE, - new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName)); + classPathJobGraphRetrieverBuilder.build()); } public static void main(String[] args) { diff --git a/flink-container/src/test/assembly/test-assembly-test-user-classloader-job-jar.xml b/flink-container/src/test/assembly/test-assembly-test-user-classloader-job-jar.xml new file mode 100644 index 0000000000000..ac08f684ead37 --- /dev/null +++ b/flink-container/src/test/assembly/test-assembly-test-user-classloader-job-jar.xml @@ -0,0 +1,35 @@ + + + + test-user-classloader-job-jar + + jar + + false + + + ${project.build.testOutputDirectory} + / + + org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJob.class + + + + diff --git a/flink-container/src/test/assembly/test-assembly-test-user-classloader-job-lib-jar.xml b/flink-container/src/test/assembly/test-assembly-test-user-classloader-job-lib-jar.xml new file mode 100644 index 0000000000000..b1513219044a5 --- /dev/null +++ b/flink-container/src/test/assembly/test-assembly-test-user-classloader-job-lib-jar.xml @@ -0,0 +1,35 @@ + + + + test-user-classloader-job-lib-jar + + jar + + false + + + ${project.build.testOutputDirectory} + / + + org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJobLib.class + + + + diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java index 433263dff4d2b..92228fd09a0ac 100644 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java @@ -22,21 +22,34 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.JarsOnClassPath; +import org.apache.flink.container.entrypoint.testjar.TestJobInfo; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; @@ -44,6 +57,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; /** * Tests for the {@link ClassPathJobGraphRetriever}. @@ -53,20 +67,69 @@ public class ClassPathJobGraphRetrieverTest extends TestLogger { @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + @ClassRule + public static final TemporaryFolder JOB_DIRS = new TemporaryFolder(); + private static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"}; + /* + * The directory structure used to test + * + * userDirHasEntryClass/ + * |_jarWithEntryClass + * |_jarWithoutEntryClass + * |_textFile + * + * userDirHasNotEntryClass/ + * |_jarWithoutEntryClass + * |_textFile + */ + + private static final Collection expectedURLs = new ArrayList<>(); + + private static File userDirHasEntryClass; + + private static File userDirHasNotEntryClass; + + @BeforeClass + public static void init() throws IOException { + final String textFileName = "test.txt"; + final String userDirHasEntryClassName = "_test_user_dir_has_entry_class"; + final String userDirHasNotEntryClassName = "_test_user_dir_has_not_entry_class"; + + userDirHasEntryClass = JOB_DIRS.newFolder(userDirHasEntryClassName); + final Path userJarPath = userDirHasEntryClass.toPath().resolve(TestJobInfo.JOB_JAR_PATH.toFile().getName()); + final Path userLibJarPath = + userDirHasEntryClass.toPath().resolve(TestJobInfo.JOB_LIB_JAR_PATH.toFile().getName()); + userDirHasNotEntryClass = JOB_DIRS.newFolder(userDirHasNotEntryClassName); + + //create files + Files.copy(TestJobInfo.JOB_JAR_PATH, userJarPath); + Files.copy(TestJobInfo.JOB_LIB_JAR_PATH, userLibJarPath); + Files.createFile(userDirHasEntryClass.toPath().resolve(textFileName)); + + Files.copy(TestJobInfo.JOB_LIB_JAR_PATH, userDirHasNotEntryClass.toPath().resolve(TestJobInfo.JOB_LIB_JAR_PATH.toFile().getName())); + Files.createFile(userDirHasNotEntryClass.toPath().resolve(textFileName)); + + final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); + Arrays.asList(userJarPath, userLibJarPath) + .stream() + .map(path -> FileUtils.relativizePath(workingDirectory, path)) + .map(FunctionUtils.uncheckedFunction(FileUtils::toURL)) + .forEach(expectedURLs::add); + } + @Test - public void testJobGraphRetrieval() throws FlinkException { + public void testJobGraphRetrieval() throws FlinkException, IOException { final int parallelism = 42; final Configuration configuration = new Configuration(); configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism); final JobID jobId = new JobID(); - final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever( - jobId, - SavepointRestoreSettings.none(), - PROGRAM_ARGUMENTS, - TestJob.class.getCanonicalName()); + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(jobId, SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + .setJobClassName(TestJob.class.getCanonicalName()) + .build(); final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration); @@ -76,15 +139,12 @@ public void testJobGraphRetrieval() throws FlinkException { } @Test - public void testJobGraphRetrievalFromJar() throws FlinkException, FileNotFoundException { + public void testJobGraphRetrievalFromJar() throws FlinkException, IOException { final File testJar = TestJob.getTestJobJar(); - final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever( - new JobID(), - SavepointRestoreSettings.none(), - PROGRAM_ARGUMENTS, - // No class name specified, but the test JAR "is" on the class path - null, - () -> Collections.singleton(testJar)); + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(new JobID(), SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + .setJarsOnClassPath(() -> Collections.singleton(testJar)) + .build(); final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(new Configuration()); @@ -92,17 +152,16 @@ public void testJobGraphRetrievalFromJar() throws FlinkException, FileNotFoundEx } @Test - public void testJobGraphRetrievalJobClassNameHasPrecedenceOverClassPath() throws FlinkException, FileNotFoundException { + public void testJobGraphRetrievalJobClassNameHasPrecedenceOverClassPath() throws FlinkException, IOException { final File testJar = new File("non-existing"); - final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever( - new JobID(), - SavepointRestoreSettings.none(), - PROGRAM_ARGUMENTS, - // Both a class name is specified and a JAR "is" on the class path - // The class name should have precedence. - TestJob.class.getCanonicalName(), - () -> Collections.singleton(testJar)); + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(new JobID(), SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + // Both a class name is specified and a JAR "is" on the class path + // The class name should have precedence. + .setJobClassName(TestJob.class.getCanonicalName()) + .setJarsOnClassPath(() -> Collections.singleton(testJar)) + .build(); final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(new Configuration()); @@ -110,16 +169,15 @@ public void testJobGraphRetrievalJobClassNameHasPrecedenceOverClassPath() throws } @Test - public void testSavepointRestoreSettings() throws FlinkException { + public void testSavepointRestoreSettings() throws FlinkException, IOException { final Configuration configuration = new Configuration(); final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath("foobar", true); final JobID jobId = new JobID(); - final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever( - jobId, - savepointRestoreSettings, - PROGRAM_ARGUMENTS, - TestJob.class.getCanonicalName()); + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(jobId, savepointRestoreSettings, PROGRAM_ARGUMENTS) + .setJobClassName(TestJob.class.getCanonicalName()) + .build(); final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration); @@ -160,6 +218,68 @@ public void testJarFromClassPathSupplier() throws IOException { assertThat(jarFiles, contains(file1, file2)); } + @Test + public void testJobGraphRetrievalFailIfJobDirDoesNotHaveEntryClass() throws IOException { + final File testJar = TestJob.getTestJobJar(); + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(new JobID(), SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + .setJarsOnClassPath(() -> Collections.singleton(testJar)) + .setUserLibDirectory(userDirHasNotEntryClass) + .build(); + try { + classPathJobGraphRetriever.retrieveJobGraph(new Configuration()); + Assert.fail("This case should throw exception !"); + } catch (FlinkException e) { + assertTrue(ExceptionUtils + .findThrowableWithMessage(e, "Failed to find job JAR on class path") + .isPresent()); + } + } + + @Test + public void testJobGraphRetrievalFailIfDoesNotFindTheEntryClassInTheJobDir() throws IOException { + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(new JobID(), SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + .setJobClassName(TestJobInfo.JOB_CLASS) + .setJarsOnClassPath(Collections::emptyList) + .setUserLibDirectory(userDirHasNotEntryClass) + .build(); + try { + classPathJobGraphRetriever.retrieveJobGraph(new Configuration()); + Assert.fail("This case should throw class not found exception!!"); + } catch (FlinkException e) { + assertTrue(ExceptionUtils + .findThrowableWithMessage(e, "Could not find the provided job class") + .isPresent()); + } + + } + + @Test + public void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass() throws IOException, FlinkException { + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(new JobID(), SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + .setJarsOnClassPath(Collections::emptyList) + .setUserLibDirectory(userDirHasEntryClass) + .build(); + final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(new Configuration()); + + assertThat(jobGraph.getClasspaths(), containsInAnyOrder(expectedURLs.toArray())); + } + + @Test + public void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass() throws IOException, FlinkException { + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(new JobID(), SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + .setJobClassName(TestJobInfo.JOB_CLASS) + .setJarsOnClassPath(Collections::emptyList) + .setUserLibDirectory(userDirHasEntryClass) + .build(); + final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(new Configuration()); + + assertThat(jobGraph.getClasspaths(), containsInAnyOrder(expectedURLs.toArray())); + } + private static String javaClassPath(String... entries) { String pathSeparator = System.getProperty(JarsOnClassPath.PATH_SEPARATOR); return String.join(pathSeparator, entries); @@ -175,5 +295,4 @@ private static Iterable setClassPathAndGetJarsOnClassPath(String classPath System.setProperty(JarsOnClassPath.JAVA_CLASS_PATH, originalClassPath); } } - } diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestJobInfo.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestJobInfo.java new file mode 100644 index 0000000000000..d682eccebc9a0 --- /dev/null +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestJobInfo.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint.testjar; + +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * The test job information. + */ +public class TestJobInfo { + + public static final String JOB_CLASS = "org.apache.flink.container.entrypoint.testjar.TestUserClassLoaderJob"; + public static final String JOB_LIB_CLASS = "org.apache.flink.container.entrypoint.testjar.TestUserClassLoaderJobLib"; + public static final Path JOB_JAR_PATH = Paths.get("target", "maven-test-user-classloader-job-jar.jar"); + public static final Path JOB_LIB_JAR_PATH = Paths.get("target", "maven-test-user-classloader-job-lib-jar.jar"); +} diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJob.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJob.java new file mode 100644 index 0000000000000..1f68db037fff2 --- /dev/null +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJob.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint.testjar; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; + +/** + * This class can used to test situation that the jar is not in the system classpath. + */ +public class TestUserClassLoaderJob { + public static void main(String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final DataStreamSource source = env.fromElements(new TestUserClassLoaderJobLib().getValue(), 1, 2, 3, 4); + final SingleOutputStreamOperator mapper = source.map(element -> 2 * element); + mapper.addSink(new DiscardingSink<>()); + + ParameterTool parameterTool = ParameterTool.fromArgs(args); + env.execute(TestUserClassLoaderJob.class.getCanonicalName() + "-" + parameterTool.getRequired("arg")); + } +} diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJobLib.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJobLib.java new file mode 100644 index 0000000000000..82f5a29b33d87 --- /dev/null +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJobLib.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint.testjar; + +/** + * This class is depended by {@link TestUserClassLoaderJob}. + */ +class TestUserClassLoaderJobLib { + + int getValue() { + return 0; + } + + public static void main(String[] args) { + } +} diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index d84b5626000cf..b156ec844e551 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -2021,6 +2021,9 @@ public final class ConfigConstants { /** The environment variable name which contains the Flink installation root directory. */ public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME"; + /** The user lib directory name. */ + public static final String DEFAULT_FLINK_USR_LIB_DIR = "usrlib"; + // ---------------------------- Encoding ------------------------------ public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java index 08ea86281d2a7..37150edc5e643 100644 --- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java @@ -84,6 +84,11 @@ public final class FileUtils { private static final String JAR_FILE_EXTENSION = "jar"; + public static final String CLASS_FILE_EXTENSION = "class"; + + public static final String PACKAGE_SEPARATOR = "."; + + // ------------------------------------------------------------------------ public static void writeCompletely(WritableByteChannel channel, ByteBuffer src) throws IOException { @@ -595,6 +600,16 @@ public static java.nio.file.Path getCurrentWorkingDirectory() { return Paths.get(System.getProperty("user.dir")); } + /** + * Checks whether the given file has a class extension. + * + * @param file to check + * @return true if the file has a class extension, otherwise false + */ + public static boolean isClassFile(java.nio.file.Path file) { + return CLASS_FILE_EXTENSION.equals(org.apache.flink.shaded.guava18.com.google.common.io.Files.getFileExtension(file.toString())); + } + /** * Checks whether the given file has a jar extension. * @@ -605,6 +620,19 @@ public static boolean isJarFile(java.nio.file.Path file) { return JAR_FILE_EXTENSION.equals(org.apache.flink.shaded.guava18.com.google.common.io.Files.getFileExtension(file.toString())); } + /** + * Remove the extension of the file name. + * @param fileName to strip + * @return the file name without extension + */ + public static String stripFileExtension(String fileName) { + final String extension = org.apache.flink.shaded.guava18.com.google.common.io.Files.getFileExtension(fileName); + if (!extension.isEmpty()) { + return fileName.substring(0, fileName.lastIndexOf(extension) - 1); + } + return fileName; + } + /** * Converts the given {@link java.nio.file.Path} into a file {@link URL}. The resulting url is * relative iff the given path is relative. diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java index ad34d71380333..56eb9b110c67d 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java @@ -84,7 +84,6 @@ public static void main(String[] args) throws Exception { System.out.println("Printing result to stdout. Use --output to specify output path."); counts.print(); } - // execute program env.execute("Streaming WordCount"); } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java index fa79a26e23fbb..281fb56ddc487 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java @@ -81,7 +81,10 @@ public void testExternalProgram() throws Exception { String testData = getClass().getResource(TEST_DATA_FILE).toString(); - PackagedProgram program = new PackagedProgram(new File(jarFile), new String[]{testData}); + PackagedProgram program = PackagedProgram.newBuilder() + .setJarFile(new File(jarFile)) + .setArguments(new String[]{testData}) + .build(); program.invokeInteractiveModeForExecution(); } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/EntryClassQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/EntryClassQueryParameter.java index e1c7c44e362f8..70935bb199da2 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/EntryClassQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/EntryClassQueryParameter.java @@ -18,11 +18,9 @@ package org.apache.flink.runtime.webmonitor.handlers; -import java.io.File; - /** * Query parameter specifying the name of the entry point class. - * @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, String, String...) + * @see org.apache.flink.client.program.PackagedProgram.Builder#setEntryPointClassName(String) */ public class EntryClassQueryParameter extends StringQueryParameter { public EntryClassQueryParameter() { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java index 359b65543cdea..dc3b0beb02e9a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java @@ -129,7 +129,7 @@ protected CompletableFuture handleRequest(@Nonnull HandlerRequest tryFindUserLibDirectory() { + final File flinkHomeDirectory = deriveFlinkHomeDirectoryFromLibDirectory(); + final File usrLibDirectory = new File(flinkHomeDirectory, ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR); + + if (!usrLibDirectory.isDirectory()) { + return Optional.empty(); + } + return Optional.of(usrLibDirectory); + } + + @Nullable + private static File deriveFlinkHomeDirectoryFromLibDirectory() { + final String libDirectory = System.getenv().get(ConfigConstants.ENV_FLINK_LIB_DIR); + + if (libDirectory == null) { + return null; + } else { + return new File(libDirectory).getParentFile(); + } + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java index 2f755193844d1..23280f0d54a6c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java @@ -140,7 +140,9 @@ public void tearDown() { @Test public void testCustomSplitJobWithCustomClassLoaderJar() throws ProgramInvocationException { - PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE)); + PackagedProgram inputSplitTestProg = PackagedProgram.newBuilder() + .setJarFile(new File(INPUT_SPLITS_PROG_JAR_FILE)) + .build(); TestEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -153,7 +155,9 @@ public void testCustomSplitJobWithCustomClassLoaderJar() throws ProgramInvocatio @Test public void testStreamingCustomSplitJobWithCustomClassLoader() throws ProgramInvocationException { - PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)); + PackagedProgram streamingInputSplitTestProg = PackagedProgram.newBuilder() + .setJarFile(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)) + .build(); TestStreamEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -167,7 +171,9 @@ public void testStreamingCustomSplitJobWithCustomClassLoader() throws ProgramInv @Test public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, ProgramInvocationException { URL classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL(); - PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE)); + PackagedProgram inputSplitTestProg2 = PackagedProgram.newBuilder() + .setJarFile(new File(INPUT_SPLITS_PROG_JAR_FILE)) + .build(); TestEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -181,7 +187,7 @@ public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, Pr @Test public void testStreamingClassloaderJobWithCustomClassLoader() throws ProgramInvocationException { // regular streaming job - PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE)); + PackagedProgram streamingProg = PackagedProgram.newBuilder().setJarFile(new File(STREAMING_PROG_JAR_FILE)).build(); TestStreamEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -196,7 +202,9 @@ public void testStreamingClassloaderJobWithCustomClassLoader() throws ProgramInv public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throws ProgramInvocationException { // checkpointed streaming job with custom classes for the checkpoint (FLINK-2543) // the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient. - PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE)); + PackagedProgram streamingCheckpointedProg = PackagedProgram.newBuilder() + .setJarFile(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE)) + .build(); TestStreamEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -230,13 +238,13 @@ public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throw @Test public void testKMeansJobWithCustomClassLoader() throws ProgramInvocationException { - PackagedProgram kMeansProg = new PackagedProgram( - new File(KMEANS_JAR_PATH), - new String[] { + PackagedProgram kMeansProg = PackagedProgram.newBuilder() + .setJarFile(new File(KMEANS_JAR_PATH)) + .setArguments(new String[] { KMeansData.DATAPOINTS, KMeansData.INITIAL_CENTERS, - "25" - }); + "25"}) + .build(); TestEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -249,7 +257,9 @@ public void testKMeansJobWithCustomClassLoader() throws ProgramInvocationExcepti @Test public void testUserCodeTypeJobWithCustomClassLoader() throws ProgramInvocationException { - PackagedProgram userCodeTypeProg = new PackagedProgram(new File(USERCODETYPE_JAR_PATH)); + PackagedProgram userCodeTypeProg = PackagedProgram.newBuilder() + .setJarFile(new File(USERCODETYPE_JAR_PATH)) + .build(); TestEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -265,12 +275,10 @@ public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOEx File checkpointDir = FOLDER.newFolder(); File outputDir = FOLDER.newFolder(); - final PackagedProgram program = new PackagedProgram( - new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH), - new String[] { - checkpointDir.toURI().toString(), - outputDir.toURI().toString() - }); + final PackagedProgram program = PackagedProgram.newBuilder() + .setJarFile(new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)) + .setArguments(new String[] { checkpointDir.toURI().toString(), outputDir.toURI().toString()}) + .build(); TestStreamEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -298,14 +306,14 @@ public void testDisposeSavepointWithCustomKvState() throws Exception { File checkpointDir = FOLDER.newFolder(); File outputDir = FOLDER.newFolder(); - final PackagedProgram program = new PackagedProgram( - new File(CUSTOM_KV_STATE_JAR_PATH), - new String[] { - String.valueOf(parallelism), - checkpointDir.toURI().toString(), - "5000", - outputDir.toURI().toString() - }); + final PackagedProgram program = PackagedProgram.newBuilder() + .setJarFile(new File(CUSTOM_KV_STATE_JAR_PATH)) + .setArguments(new String[] { + String.valueOf(parallelism), + checkpointDir.toURI().toString(), + "5000", + outputDir.toURI().toString()}) + .build(); TestStreamEnvironment.setAsContext( miniClusterResource.getMiniCluster(), diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java index dfab48ef3c96e..76370a22ba0b1 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java @@ -108,7 +108,7 @@ public void testFlinkContainerMemory() throws Exception { final File streamingWordCountFile = getTestJarPath("WindowJoin.jar"); - final PackagedProgram packagedProgram = new PackagedProgram(streamingWordCountFile); + final PackagedProgram packagedProgram = PackagedProgram.newBuilder().setJarFile(streamingWordCountFile).build(); final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, 1); try {