Skip to content

Commit

Permalink
WIP emr
Browse files Browse the repository at this point in the history
Signed-off-by: Maximilian Chrzan <[email protected]>
Signed-off-by: mchrza <[email protected]>
  • Loading branch information
mchrza committed Oct 11, 2024
1 parent 3129b74 commit aa7cf22
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,19 @@ private List<NamedState> compileExecutions(List<StepExecution> executions, State
private NamedState<TaskState.Builder> compile(Step<?> step, State.Builder previousState) {
NamedState<TaskState.Builder> state = new NamedState<>(step.getClass().getSimpleName() + "." + step.getId(),
TaskState.builder());
if (step instanceof RunEmrJob emrStep && Config.instance.LOCALSTACK_ENDPOINT == null)
compile(emrStep, state);

if (step instanceof RunEmrJob emrStep) {
if( Config.instance.LOCALSTACK_ENDPOINT == null)
compile(emrStep, state);
else {
//Inject defaults for local execution
emrStep.setSparkParams( "--add-exports=java.base/java.nio=ALL-UNNAMED "
+ "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED "
+ "--add-exports=java.base/java.lang.invoke=ALL-UNNAMED "
+ "--add-exports=java.base/java.util=ALL-UNNAMED " + emrStep.getSparkParams());
compile((LambdaBasedStep<?>) emrStep, state);
}
}
else if (step instanceof LambdaBasedStep lambdaStep)
compile(lambdaStep, state);
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.SYNC;

import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.here.xyz.jobs.steps.inputs.Input;
import com.here.xyz.jobs.steps.outputs.DownloadUrl;
Expand Down Expand Up @@ -86,8 +87,15 @@ public void execute() throws Exception {

scriptParams.set(0, localTmpInputsFolder);
scriptParams.set(1, localTmpOutputsFolder);
scriptParams.add("--local");

sparkParams = sparkParams.replace("$localJarPath$", localJarPath);
sparkParams = "java -Xshare:off --add-exports=java.base/java.nio=ALL-UNNAMED "
+ "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED "
+ "--add-exports=java.base/java.lang.invoke=ALL-UNNAMED "
+ "--add-exports=java.base/java.util=ALL-UNNAMED "
+ sparkParams;

List<String> emrParams = new ArrayList<>(List.of(sparkParams.split(" ")));
emrParams.addAll(scriptParams);

Expand Down Expand Up @@ -244,11 +252,11 @@ private String copyFileFromS3ToLocal(String s3Path) {
createLocalFolder(Paths.get(s3Path).getParent().toString(), false);
Files.copy(jarStream, Paths.get(getLocalTmpPath(s3Path)));
jarStream.close();
}
catch (FileAlreadyExistsException e) {
} catch (FileAlreadyExistsException e) {
logger.info("File: '{}' already exists locally - skip download.", s3Path);
}
catch (IOException e) {
}catch (AmazonS3Exception e){
throw new RuntimeException("Can't download File: '" + s3Path + "' for local copy!", e);
} catch (IOException e) {
throw new RuntimeException("Can't copy File: '" + s3Path + "'!", e);
}
return getLocalTmpPath(s3Path);
Expand Down

0 comments on commit aa7cf22

Please sign in to comment.