From 5b7fc70a91cb61b303d69deac4eb02721befd62e Mon Sep 17 00:00:00 2001 From: mchrza Date: Wed, 25 Sep 2024 09:43:35 +0200 Subject: [PATCH] WIP Signed-off-by: Maximilian Chrzan Signed-off-by: mchrza --- .../jobs/steps/compiler/ExportToFiles.java | 24 +---- .../impl/transport/ExportSpaceToFiles.java | 91 ++++++++----------- .../impl/transport/ImportFilesToSpace.java | 4 +- .../xyz/jobs/steps/impl/ExportStepTest.java | 1 - 4 files changed, 40 insertions(+), 80 deletions(-) diff --git a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/compiler/ExportToFiles.java b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/compiler/ExportToFiles.java index bff8de1e27..bcc4095d73 100644 --- a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/compiler/ExportToFiles.java +++ b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/compiler/ExportToFiles.java @@ -22,20 +22,13 @@ import com.here.xyz.jobs.Job; import com.here.xyz.jobs.datasets.DatasetDescription; import com.here.xyz.jobs.datasets.Files; -import com.here.xyz.jobs.datasets.files.Csv; -import com.here.xyz.jobs.datasets.files.FileFormat; -import com.here.xyz.jobs.datasets.files.GeoJson; + import com.here.xyz.jobs.steps.CompilationStepGraph; -import com.here.xyz.jobs.steps.JobCompiler.CompilationError; import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles; -import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles.EntityPerLine; -import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles.Format; import java.util.HashSet; import java.util.Set; -import static com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles.Format.GEOJSON; - public class ExportToFiles implements JobCompilationInterceptor { public static Set> allowedSourceTypes = new HashSet<>(Set.of(DatasetDescription.Space.class)); @@ -49,18 +42,8 @@ public CompilationStepGraph compile(Job job) { DatasetDescription.Space source = (DatasetDescription.Space) job.getSource(); String spaceId = source.getId(); - final FileFormat targetFormat = ((Files) job.getTarget()).getOutputSettings().getFormat(); - - Format outputStepFormat; - if (targetFormat instanceof GeoJson) - outputStepFormat = GEOJSON; - else - throw new CompilationError("Unsupported export file format: " + targetFormat.getClass().getSimpleName()); - ExportSpaceToFiles exportToFilesStep = new ExportSpaceToFiles() //Perform import .withSpaceId(spaceId) - .withFormat(outputStepFormat) - .withEntityPerLine(getEntityPerLine(targetFormat)) .withJobId(job.getId()); return compileImportSteps(exportToFilesStep); @@ -70,9 +53,4 @@ public static CompilationStepGraph compileImportSteps(ExportSpaceToFiles exportT return (CompilationStepGraph) new CompilationStepGraph() .addExecution(exportToFilesStep); } - - private ExportSpaceToFiles.EntityPerLine getEntityPerLine(FileFormat format) { - return EntityPerLine.valueOf((format instanceof GeoJson geoJson - ? geoJson.getEntityPerLine() : ((Csv) format).getEntityPerLine()).toString()); - } } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportSpaceToFiles.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportSpaceToFiles.java index 11747464ac..452b3ed022 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportSpaceToFiles.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportSpaceToFiles.java @@ -30,6 +30,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -46,7 +47,7 @@ public class ExportSpaceToFiles extends SpaceBasedStep { private static final Logger logger = LogManager.getLogger(); //Defines how many features a source layer need to have to start parallelization. - private final int PARALLELIZTATION_MIN_THRESHOLD = 10; + private final int PARALLELIZTATION_MIN_THRESHOLD = 10; //TODO: put back to 500k //Defines how many export threads are getting used private final int PARALLELIZTATION_THREAD_COUNT = 8; @@ -56,28 +57,30 @@ public class ExportSpaceToFiles extends SpaceBasedStep { @JsonView({Internal.class, Static.class}) private double overallNeededAcus = -1; - @JsonView({Internal.class, Static.class}) - private EntityPerLine entityPerLine = EntityPerLine.Feature; - private Format format = Format.GEOJSON; - private Phase phase; - -// //Geometry-Filters -// private Geometry geometry; -// private int radius = -1; -// private boolean clipOnFilterGeometry; -// -// //Content-Filters -// private String propertyFilter; -// private SpaceContext context; -// private String targetVersion; -// -// //Partitioning -// private String partitionKey; -// //Required if partitionKey=tileId -// private Integer targetLevel; -// private boolean clipOnPartitions; + + /** + * TODO: + * Geometry-Filters + * private Geometry geometry; + * private int radius = -1; + * private boolean clipOnFilterGeometry; + * + * Content-Filters + * private String propertyFilter; + * private SpaceContext context; + * private String targetVersion; + * + * Version Filter: + * private VersionRef versionRef; + * + * Partitioning - part of EMR? + * private String partitionKey; + * --Required if partitionKey=tileId + * private Integer targetLevel; + * private boolean clipOnPartitions; + */ public enum Format { CSV_JSON_WKB, @@ -89,32 +92,6 @@ public enum Phase { VALIDATE } - public void setFormat(Format format) { - this.format = format; - } - - public ExportSpaceToFiles withFormat(Format format) { - setFormat(format); - return this; - } - - public EntityPerLine getEntityPerLine() { - return entityPerLine; - } - - public void setEntityPerLine(EntityPerLine entityPerLine) { - this.entityPerLine = entityPerLine; - } - - public ExportSpaceToFiles withEntityPerLine(EntityPerLine entityPerLine) { - setEntityPerLine(entityPerLine); - return this; - } - - public Phase getPhase() { - return phase; - } - @JsonView({Internal.class, Static.class}) private StatisticsResponse statistics = null; @@ -172,11 +149,13 @@ public boolean validate() throws ValidationException { @Override public void execute() throws Exception { + statistics = loadSpaceStatistics(getSpaceId(), EXTENSION); calculatedThreadCount = (statistics.getCount().getValue() > PARALLELIZTATION_MIN_THRESHOLD) ? PARALLELIZTATION_THREAD_COUNT : 1; + List s3FileNames = generateS3PathList(calculatedThreadCount); runWriteQuerySync(buildTemporaryTableForExportQuery(getSchema(db())), db(), 0); - + for (int i = 0; i < calculatedThreadCount; i++) { logger.info("Start export thread number: {}", i); runReadQueryAsync(buildExportQuery(i), db(), 0); @@ -202,6 +181,16 @@ protected boolean onAsyncFailure() { //TODO return super.onAsyncFailure(); } + + private List generateS3PathList(int cnt){ + List list = new ArrayList<>(); + + for (int i = 0; i < cnt; i++) { + list.add(outputS3Prefix(true,false) + "/" +UUID.randomUUID()); + } + + return list; + } private SQLQuery buildTemporaryTableForExportQuery(String schema) { return new SQLQuery(""" @@ -245,10 +234,4 @@ PERFORM export_to_s3_perform( .withNamedParameter("format", format.name()) .withNamedParameter("filesize", 0); } - - //TODO: De-duplicate once CSV was removed (see GeoJson format class) - public enum EntityPerLine { - Feature, - FeatureCollection - } } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ImportFilesToSpace.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ImportFilesToSpace.java index df6e9a9681..c73495ad16 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ImportFilesToSpace.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ImportFilesToSpace.java @@ -223,9 +223,9 @@ public int getEstimatedExecutionSeconds() { @Override public ExecutionMode getExecutionMode() { //CSV is not supported in SYNC mode - //if (format == CSV_JSON_WKB || format == CSV_GEOJSON) + if (format == CSV_JSON_WKB || format == CSV_GEOJSON) return ASYNC; -// return getUncompressedUploadBytesEstimation() > MAX_INPUT_BYTES_FOR_SYNC_IMPORT ? ASYNC : SYNC; + return getUncompressedUploadBytesEstimation() > MAX_INPUT_BYTES_FOR_SYNC_IMPORT ? ASYNC : SYNC; } @Override diff --git a/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/ExportStepTest.java b/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/ExportStepTest.java index 9b485a587c..0cf6caafcc 100644 --- a/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/ExportStepTest.java +++ b/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/ExportStepTest.java @@ -72,7 +72,6 @@ public void testExportSpaceToFilesStep() throws Exception { LambdaBasedStep step = new ExportSpaceToFiles() .withSpaceId(SPACE_ID) - .withFormat(ExportSpaceToFiles.Format.GEOJSON) .withJobId(JOB_ID); sendLambdaStepRequest(step, START_EXECUTION);