95
95
import javax .annotation .Nullable ;
96
96
97
97
import java .io .File ;
98
+ import java .io .FileOutputStream ;
98
99
import java .io .IOException ;
99
- import java .io .InputStream ;
100
100
import java .net .InetSocketAddress ;
101
101
import java .net .Socket ;
102
102
import java .net .SocketAddress ;
114
114
import java .util .concurrent .Executors ;
115
115
import java .util .concurrent .TimeUnit ;
116
116
import java .util .concurrent .TimeoutException ;
117
+ import java .util .jar .JarOutputStream ;
118
+ import java .util .jar .Manifest ;
117
119
import java .util .stream .Collectors ;
118
120
119
121
import static org .apache .flink .kubernetes .operator .config .FlinkConfigBuilder .FLINK_VERSION ;
126
128
public abstract class AbstractFlinkService implements FlinkService {
127
129
128
130
private static final Logger LOG = LoggerFactory .getLogger (AbstractFlinkService .class );
129
- private static final String NOOP_JAR_FILENAME = "noop .jar" ;
131
+ private static final String EMPTY_JAR_FILENAME = "empty .jar" ;
130
132
131
133
protected final KubernetesClient kubernetesClient ;
132
134
protected final FlinkConfigManager configManager ;
133
135
private final ExecutorService executorService ;
134
136
protected final ArtifactManager artifactManager ;
135
- private final String noopJarPath ;
137
+ private final String emptyJar ;
136
138
137
139
public AbstractFlinkService (
138
140
KubernetesClient kubernetesClient , FlinkConfigManager configManager ) {
@@ -142,7 +144,7 @@ public AbstractFlinkService(
142
144
this .executorService =
143
145
Executors .newFixedThreadPool (
144
146
4 , new ExecutorThreadFactory ("Flink-RestClusterClient-IO" ));
145
- this .noopJarPath = copyNoopJar ();
147
+ this .emptyJar = createEmptyJar ();
146
148
}
147
149
148
150
protected abstract PodList getJmPodList (String namespace , String clusterId );
@@ -717,7 +719,7 @@ private String findJarURI(JobSpec jobSpec) {
717
719
if (jobSpec .getJarURI () != null ) {
718
720
return jobSpec .getJarURI ();
719
721
} else {
720
- return noopJarPath ;
722
+ return emptyJar ;
721
723
}
722
724
}
723
725
@@ -836,22 +838,19 @@ private void validateHaMetadataExists(Configuration conf) {
836
838
}
837
839
}
838
840
839
- private String copyNoopJar () {
841
+ private String createEmptyJar () {
840
842
try {
841
- InputStream noopJarSource =
842
- AbstractFlinkService .class
843
- .getClassLoader ()
844
- .getResourceAsStream (NOOP_JAR_FILENAME );
843
+ String emptyJarPath =
844
+ Files .createTempDirectory ("flink" ).toString () + "/" + EMPTY_JAR_FILENAME ;
845
845
846
- String noopJarDestination =
847
- Files .createTempDirectory ("flink" ).toString () + "/" + NOOP_JAR_FILENAME ;
846
+ LOG .debug ("Creating empty jar to {}" , emptyJarPath );
847
+ JarOutputStream target =
848
+ new JarOutputStream (new FileOutputStream (emptyJarPath ), new Manifest ());
849
+ target .close ();
848
850
849
- LOG .debug ("Copying noop jar to {}" , noopJarDestination );
850
- org .apache .commons .io .FileUtils .copyToFile (noopJarSource , new File (noopJarDestination ));
851
-
852
- return noopJarDestination ;
851
+ return emptyJarPath ;
853
852
} catch (Exception e ) {
854
- throw new RuntimeException ("Failed to copy noop jar" , e );
853
+ throw new RuntimeException ("Failed to create empty jar" , e );
855
854
}
856
855
}
857
856
}
0 commit comments