From 8b6b68ad8a8e04f34e8373aa4ad7edbde5800abf Mon Sep 17 00:00:00 2001 From: Dan Galdi Date: Mon, 22 Jan 2024 16:03:39 -0500 Subject: [PATCH] Add try-catch where missing in DatasetManager for malformed datasets --- .../vdi/lib/s3/datasets/DatasetManager.kt | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/components/s3/src/main/kotlin/org/veupathdb/vdi/lib/s3/datasets/DatasetManager.kt b/components/s3/src/main/kotlin/org/veupathdb/vdi/lib/s3/datasets/DatasetManager.kt index 8e7a7108..9846db0b 100644 --- a/components/s3/src/main/kotlin/org/veupathdb/vdi/lib/s3/datasets/DatasetManager.kt +++ b/components/s3/src/main/kotlin/org/veupathdb/vdi/lib/s3/datasets/DatasetManager.kt @@ -104,7 +104,7 @@ class DatasetManager(private val s3Bucket: S3Bucket) { } var s3Object = objectStream.next() // Get our initial dataset ID, this will be used to detect when we've hit a new dataset. - val (initialUserID, initialDatasetID) = datasetIdFromS3Object(s3Object) + var (initialUserID, initialDatasetID) = datasetIdFromS3Object(s3Object) stagedObjects = mutableListOf(s3Object) while (objectStream.hasNext()) { s3Object = objectStream.next() @@ -121,7 +121,9 @@ class DatasetManager(private val s3Bucket: S3Bucket) { ) } catch (e: MalformedDatasetException) { Metrics.malformedDatasetFound.inc() + initialDatasetID = currDatasetID log.warn("Found a malformed dataset with ID $initialDatasetID.") + continue } // Set staged objects to contain the object belonging to new dataset. stagedObjects = mutableListOf(s3Object) @@ -140,6 +142,7 @@ class DatasetManager(private val s3Bucket: S3Bucket) { } catch (e: MalformedDatasetException) { Metrics.malformedDatasetFound.inc() log.warn("Found a malformed dataset with ID $initialDatasetID.") + return null } // Stream is exhausted. Indicate as much. stagedObjects = emptyList() @@ -171,14 +174,20 @@ class DatasetManager(private val s3Bucket: S3Bucket) { val (_, nextObjectDatasetID) = datasetIdFromS3Object(s3Object) // Check if the next object in the stream is in the same dataset as staged objects. if (currDatasetID == nextObjectDatasetID) { - // If so, add to staged objects + // If so, add to staged objects. stagedObjects = stagedObjects + s3Object } else { - // Otherwise, create the dataset directory and reset staged objects. - val pathFactory = S3DatasetPathFactory(currUserID, currDatasetID) - currentDataset = EagerlyLoadedDatasetDirectory(stagedObjects, currUserID, currDatasetID, pathFactory) - stagedObjects = mutableListOf(s3Object) - return currentDataset + try { + // Otherwise, create the dataset directory and reset staged objects. + val pathFactory = S3DatasetPathFactory(currUserID, currDatasetID) + currentDataset = EagerlyLoadedDatasetDirectory(stagedObjects, currUserID, currDatasetID, pathFactory) + stagedObjects = mutableListOf(s3Object) + return currentDataset + } catch (e: MalformedDatasetException) { + Metrics.malformedDatasetFound.inc() + log.warn("Found a malformed dataset with ID $currDatasetID.") + continue + } } } }