Skip to content

Commit

Permalink
Add try-catch where missing in DatasetManager for malformed datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
dmgaldi committed Jan 22, 2024
1 parent 9c48076 commit 8b6b68a
Showing 1 changed file with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class DatasetManager(private val s3Bucket: S3Bucket) {
}
var s3Object = objectStream.next()
// Get our initial dataset ID, this will be used to detect when we've hit a new dataset.
val (initialUserID, initialDatasetID) = datasetIdFromS3Object(s3Object)
var (initialUserID, initialDatasetID) = datasetIdFromS3Object(s3Object)
stagedObjects = mutableListOf(s3Object)
while (objectStream.hasNext()) {
s3Object = objectStream.next()
Expand All @@ -121,7 +121,9 @@ class DatasetManager(private val s3Bucket: S3Bucket) {
)
} catch (e: MalformedDatasetException) {
Metrics.malformedDatasetFound.inc()
initialDatasetID = currDatasetID
log.warn("Found a malformed dataset with ID $initialDatasetID.")
continue
}
// Set staged objects to contain the object belonging to new dataset.
stagedObjects = mutableListOf(s3Object)
Expand All @@ -140,6 +142,7 @@ class DatasetManager(private val s3Bucket: S3Bucket) {
} catch (e: MalformedDatasetException) {
Metrics.malformedDatasetFound.inc()
log.warn("Found a malformed dataset with ID $initialDatasetID.")
return null
}
// Stream is exhausted. Indicate as much.
stagedObjects = emptyList()
Expand Down Expand Up @@ -171,14 +174,20 @@ class DatasetManager(private val s3Bucket: S3Bucket) {
val (_, nextObjectDatasetID) = datasetIdFromS3Object(s3Object)
// Check if the next object in the stream is in the same dataset as staged objects.
if (currDatasetID == nextObjectDatasetID) {
// If so, add to staged objects
// If so, add to staged objects.
stagedObjects = stagedObjects + s3Object
} else {
// Otherwise, create the dataset directory and reset staged objects.
val pathFactory = S3DatasetPathFactory(currUserID, currDatasetID)
currentDataset = EagerlyLoadedDatasetDirectory(stagedObjects, currUserID, currDatasetID, pathFactory)
stagedObjects = mutableListOf(s3Object)
return currentDataset
try {
// Otherwise, create the dataset directory and reset staged objects.
val pathFactory = S3DatasetPathFactory(currUserID, currDatasetID)
currentDataset = EagerlyLoadedDatasetDirectory(stagedObjects, currUserID, currDatasetID, pathFactory)
stagedObjects = mutableListOf(s3Object)
return currentDataset
} catch (e: MalformedDatasetException) {
Metrics.malformedDatasetFound.inc()
log.warn("Found a malformed dataset with ID $currDatasetID.")
continue
}
}
}
}
Expand Down

0 comments on commit 8b6b68a

Please sign in to comment.