Commit

WIP
Signed-off-by: Maximilian Chrzan <[email protected]>
Signed-off-by: mchrza <[email protected]>
mchrza committed Sep 25, 2024
1 parent 9508dea commit 5b7fc70
Showing 4 changed files with 40 additions and 80 deletions.
@@ -22,20 +22,13 @@
import com.here.xyz.jobs.Job;
import com.here.xyz.jobs.datasets.DatasetDescription;
import com.here.xyz.jobs.datasets.Files;
import com.here.xyz.jobs.datasets.files.Csv;
import com.here.xyz.jobs.datasets.files.FileFormat;
import com.here.xyz.jobs.datasets.files.GeoJson;

import com.here.xyz.jobs.steps.CompilationStepGraph;
import com.here.xyz.jobs.steps.JobCompiler.CompilationError;
import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles;
import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles.EntityPerLine;
import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles.Format;

import java.util.HashSet;
import java.util.Set;

import static com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles.Format.GEOJSON;

public class ExportToFiles implements JobCompilationInterceptor {
public static Set<Class<? extends DatasetDescription.Space>> allowedSourceTypes = new HashSet<>(Set.of(DatasetDescription.Space.class));

@@ -49,18 +42,8 @@ public CompilationStepGraph compile(Job job) {
DatasetDescription.Space source = (DatasetDescription.Space) job.getSource();
String spaceId = source.getId();

final FileFormat targetFormat = ((Files) job.getTarget()).getOutputSettings().getFormat();

Format outputStepFormat;
if (targetFormat instanceof GeoJson)
outputStepFormat = GEOJSON;
else
throw new CompilationError("Unsupported export file format: " + targetFormat.getClass().getSimpleName());

ExportSpaceToFiles exportToFilesStep = new ExportSpaceToFiles() //Perform export
.withSpaceId(spaceId)
.withFormat(outputStepFormat)
.withEntityPerLine(getEntityPerLine(targetFormat))
.withJobId(job.getId());

return compileImportSteps(exportToFilesStep);
@@ -70,9 +53,4 @@ public static CompilationStepGraph compileImportSteps(ExportSpaceToFiles exportT
return (CompilationStepGraph) new CompilationStepGraph()
.addExecution(exportToFilesStep);
}

private ExportSpaceToFiles.EntityPerLine getEntityPerLine(FileFormat format) {
return EntityPerLine.valueOf((format instanceof GeoJson geoJson
? geoJson.getEntityPerLine() : ((Csv) format).getEntityPerLine()).toString());
}
}
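For orientation, a minimal sketch of how the simplified interceptor now assembles the step graph. Only ExportSpaceToFiles, withSpaceId, withJobId, CompilationStepGraph and compileImportSteps are taken from the diff above; the literal ids are placeholders, and the snippet is not part of the commit itself.

//Hedged usage sketch: the export step now only carries the space id and the job id,
//format and entityPerLine are no longer set by the compiler interceptor.
ExportSpaceToFiles exportStep = new ExportSpaceToFiles()
    .withSpaceId("some-space-id") //placeholder id
    .withJobId("some-job-id");    //placeholder id

//compileImportSteps wraps the step into a single-execution CompilationStepGraph.
CompilationStepGraph graph = ExportToFiles.compileImportSteps(exportStep);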
@@ -30,6 +30,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

@@ -46,7 +47,7 @@ public class ExportSpaceToFiles extends SpaceBasedStep<ExportSpaceToFiles> {
private static final Logger logger = LogManager.getLogger();

//Defines how many features a source layer needs to have to start parallelization.
private final int PARALLELIZTATION_MIN_THRESHOLD = 10;
private final int PARALLELIZTATION_MIN_THRESHOLD = 10; //TODO: put back to 500k
//Defines how many export threads are used
private final int PARALLELIZTATION_THREAD_COUNT = 8;

@@ -56,28 +57,30 @@ public class ExportSpaceToFiles extends SpaceBasedStep<ExportSpaceToFiles> {
@JsonView({Internal.class, Static.class})
private double overallNeededAcus = -1;

@JsonView({Internal.class, Static.class})
private EntityPerLine entityPerLine = EntityPerLine.Feature;

private Format format = Format.GEOJSON;

private Phase phase;

// //Geometry-Filters
// private Geometry geometry;
// private int radius = -1;
// private boolean clipOnFilterGeometry;
//
// //Content-Filters
// private String propertyFilter;
// private SpaceContext context;
// private String targetVersion;
//
// //Partitioning
// private String partitionKey;
// //Required if partitionKey=tileId
// private Integer targetLevel;
// private boolean clipOnPartitions;

/**
* TODO:
* Geometry-Filters
* private Geometry geometry;
* private int radius = -1;
* private boolean clipOnFilterGeometry;
*
* Content-Filters
* private String propertyFilter;
* private SpaceContext context;
* private String targetVersion;
*
* Version Filter:
* private VersionRef versionRef;
*
* Partitioning - part of EMR?
* private String partitionKey;
* --Required if partitionKey=tileId
* private Integer targetLevel;
* private boolean clipOnPartitions;
*/

public enum Format {
CSV_JSON_WKB,
@@ -89,32 +92,6 @@ public enum Phase {
VALIDATE
}

public void setFormat(Format format) {
this.format = format;
}

public ExportSpaceToFiles withFormat(Format format) {
setFormat(format);
return this;
}

public EntityPerLine getEntityPerLine() {
return entityPerLine;
}

public void setEntityPerLine(EntityPerLine entityPerLine) {
this.entityPerLine = entityPerLine;
}

public ExportSpaceToFiles withEntityPerLine(EntityPerLine entityPerLine) {
setEntityPerLine(entityPerLine);
return this;
}

public Phase getPhase() {
return phase;
}

@JsonView({Internal.class, Static.class})
private StatisticsResponse statistics = null;

@@ -172,11 +149,13 @@ public boolean validate() throws ValidationException {

@Override
public void execute() throws Exception {

statistics = loadSpaceStatistics(getSpaceId(), EXTENSION);
calculatedThreadCount = (statistics.getCount().getValue() > PARALLELIZTATION_MIN_THRESHOLD) ? PARALLELIZTATION_THREAD_COUNT : 1;

List<String> s3FileNames = generateS3PathList(calculatedThreadCount);
runWriteQuerySync(buildTemporaryTableForExportQuery(getSchema(db())), db(), 0);

for (int i = 0; i < calculatedThreadCount; i++) {
logger.info("Start export thread number: {}", i);
runReadQueryAsync(buildExportQuery(i), db(), 0);
@@ -202,6 +181,16 @@ protected boolean onAsyncFailure() {
//TODO
return super.onAsyncFailure();
}

private List<String> generateS3PathList(int cnt) {
List<String> list = new ArrayList<>();

for (int i = 0; i < cnt; i++) {
list.add(outputS3Prefix(true, false) + "/" + UUID.randomUUID());
}

return list;
}

private SQLQuery buildTemporaryTableForExportQuery(String schema) {
return new SQLQuery("""
@@ -245,10 +234,4 @@ PERFORM export_to_s3_perform(
.withNamedParameter("format", format.name())
.withNamedParameter("filesize", 0);
}

//TODO: De-duplicate once CSV is removed (see GeoJson format class)
public enum EntityPerLine {
Feature,
FeatureCollection
}
}
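Taken out of the step framework, the fan-out logic that execute() applies above can be sketched as follows. This is a self-contained illustration only: the constant values and the one-random-file-per-thread naming mirror the diff, while the output prefix is passed in as a plain string instead of calling outputS3Prefix(), and the class name is made up for the sketch.

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class ExportFanOutSketch {
  //Mirrors PARALLELIZTATION_MIN_THRESHOLD (the diff carries a TODO to raise it back to 500k).
  static final long MIN_FEATURES_FOR_PARALLELIZATION = 10;
  //Mirrors PARALLELIZTATION_THREAD_COUNT.
  static final int THREAD_COUNT = 8;

  //Parallelize only when the source space is large enough, otherwise export in a single thread.
  static int calculateThreadCount(long featureCount) {
    return featureCount > MIN_FEATURES_FOR_PARALLELIZATION ? THREAD_COUNT : 1;
  }

  //One randomly named object per export thread below the job's output prefix,
  //analogous to generateS3PathList() above.
  static List<String> generateS3PathList(String outputPrefix, int threadCount) {
    List<String> paths = new ArrayList<>();
    for (int i = 0; i < threadCount; i++)
      paths.add(outputPrefix + "/" + UUID.randomUUID());
    return paths;
  }
}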
@@ -223,9 +223,9 @@ public int getEstimatedExecutionSeconds() {
@Override
public ExecutionMode getExecutionMode() {
//CSV is not supported in SYNC mode
//if (format == CSV_JSON_WKB || format == CSV_GEOJSON)
if (format == CSV_JSON_WKB || format == CSV_GEOJSON)
return ASYNC;
// return getUncompressedUploadBytesEstimation() > MAX_INPUT_BYTES_FOR_SYNC_IMPORT ? ASYNC : SYNC;
return getUncompressedUploadBytesEstimation() > MAX_INPUT_BYTES_FOR_SYNC_IMPORT ? ASYNC : SYNC;
}

@Override
@@ -72,7 +72,6 @@ public void testExportSpaceToFilesStep() throws Exception {

LambdaBasedStep step = new ExportSpaceToFiles()
.withSpaceId(SPACE_ID)
.withFormat(ExportSpaceToFiles.Format.GEOJSON)
.withJobId(JOB_ID);

sendLambdaStepRequest(step, START_EXECUTION);
