diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 7a459d7ddbe2..91e4bdab2e90 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import java.io.IOException; import java.net.URI; import java.sql.Timestamp; @@ -1592,67 +1593,71 @@ boolean deepStoreUploadExecutorPendingSegmentsIsEmpty() { /** * Delete tmp segments for realtime table with low level consumer, split commit and async deletion is enabled. - * @param tableNameWithType - * @param segmentsZKMetadata * @return number of deleted orphan temporary segments - * */ - public long deleteTmpSegments(String tableNameWithType, List segmentsZKMetadata) { + public int deleteTmpSegments(String realtimeTableName, List segmentsZKMetadata) + throws IOException { Preconditions.checkState(!_isStopping, "Segment manager is stopping"); - if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { - return 0L; - } - - TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); - if (tableConfig == null) { - LOGGER.warn("Failed to find table config for table: {}, skipping deletion of tmp segments", tableNameWithType); - return 0L; - } - - if (!isTmpSegmentAsyncDeletionEnabled()) { - return 0L; + // NOTE: Do not delete the file if it is used as download URL. This could happen when user uses temporary file to + // backfill segment. + Set downloadUrls = Sets.newHashSetWithExpectedSize(segmentsZKMetadata.size()); + for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) { + if (segmentZKMetadata.getStatus() == Status.DONE) { + downloadUrls.add(segmentZKMetadata.getDownloadUrl()); + } } - Set deepURIs = segmentsZKMetadata.stream().filter(meta -> meta.getStatus() == Status.DONE - && !CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(meta.getDownloadUrl())).map( - SegmentZKMetadata::getDownloadUrl).collect( - Collectors.toSet()); - - String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName); PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme()); - long deletedTmpSegments = 0; - try { - for (String filePath : pinotFS.listFiles(tableDirURI, false)) { - // prepend scheme + int numDeletedTmpSegments = 0; + for (String filePath : pinotFS.listFiles(tableDirURI, false)) { + if (isTmpAndCanDelete(filePath, downloadUrls, pinotFS)) { URI uri = URIUtils.getUri(filePath); - if (isTmpAndCanDelete(uri, deepURIs, pinotFS)) { - LOGGER.info("Deleting temporary segment file: {}", uri); + String canonicalPath = uri.toString(); + LOGGER.info("Deleting temporary segment file: {}", canonicalPath); + try { if (pinotFS.delete(uri, true)) { - LOGGER.info("Succeed to delete file: {}", uri); - deletedTmpSegments++; + LOGGER.info("Deleted temporary segment file: {}", canonicalPath); + numDeletedTmpSegments++; } else { - LOGGER.warn("Failed to delete file: {}", uri); + LOGGER.warn("Failed to delete temporary segment file: {}", canonicalPath); } + } catch (Exception e) { + LOGGER.error("Caught exception while deleting temporary segment file: {}", canonicalPath, e); } } - } catch (Exception e) { - LOGGER.warn("Caught exception while deleting temporary files for table: {}", rawTableName, e); } - return deletedTmpSegments; + return numDeletedTmpSegments; } - private boolean isTmpAndCanDelete(URI uri, Set deepURIs, PinotFS pinotFS) - throws Exception { - long lastModified = pinotFS.lastModified(uri); + private boolean isTmpAndCanDelete(String filePath, Set downloadUrls, PinotFS pinotFS) { + if (!SegmentCompletionUtils.isTmpFile(filePath)) { + return false; + } + // Prepend scheme + URI uri = URIUtils.getUri(filePath); + String canonicalPath = uri.toString(); + // NOTE: Do not delete the file if it is used as download URL. This could happen when user uses temporary file to + // backfill segment. + if (downloadUrls.contains(canonicalPath)) { + return false; + } + long lastModified; + try { + lastModified = pinotFS.lastModified(uri); + } catch (Exception e) { + LOGGER.error("Caught exception while getting last modified time for file: {}, ineligible for delete", + canonicalPath, e); + return false; + } if (lastModified <= 0) { - LOGGER.warn("file {} modification time {} is not positive, ineligible for delete", uri.toString(), lastModified); + LOGGER.warn("Last modified time for file: {} is not positive: {}, ineligible for delete", canonicalPath, + lastModified); return false; } - String uriString = uri.toString(); - return SegmentCompletionUtils.isTmpFile(uriString) && !deepURIs.contains(uriString) - && getCurrentTimeMs() - lastModified > _controllerConf.getTmpSegmentRetentionInSeconds() * 1000L; + return getCurrentTimeMs() - lastModified > _controllerConf.getTmpSegmentRetentionInSeconds() * 1000L; } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index b8460a406a11..88f1bc6ee692 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -153,13 +153,17 @@ private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig str List segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName); // Delete tmp segments - try { - long numDeleteTmpSegments = _llcRealtimeSegmentManager.deleteTmpSegments(realtimeTableName, segmentsZKMetadata); - LOGGER.info("Deleted {} tmp segments for table: {}", numDeleteTmpSegments, realtimeTableName); - _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.DELETED_TMP_SEGMENT_COUNT, - numDeleteTmpSegments); - } catch (Exception e) { - LOGGER.error("Failed to delete tmp segments for table: {}", realtimeTableName, e); + if (_llcRealtimeSegmentManager.isTmpSegmentAsyncDeletionEnabled()) { + try { + long startTimeMs = System.currentTimeMillis(); + int numDeletedTmpSegments = _llcRealtimeSegmentManager.deleteTmpSegments(realtimeTableName, segmentsZKMetadata); + LOGGER.info("Deleted {} tmp segments for table: {} in {}ms", numDeletedTmpSegments, realtimeTableName, + System.currentTimeMillis() - startTimeMs); + _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.DELETED_TMP_SEGMENT_COUNT, + numDeletedTmpSegments); + } catch (Exception e) { + LOGGER.error("Failed to delete tmp segments for table: {}", realtimeTableName, e); + } } // Update the total document count gauge diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 435303f5e90d..bed25bb16c12 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -1046,7 +1046,8 @@ public void testUploadToSegmentStore() } @Test - public void testDeleteTmpSegmentFiles() throws Exception { + public void testDeleteTmpSegmentFiles() + throws Exception { // turn on knobs for async deletion of tmp files ControllerConf config = new ControllerConf(); config.setDataDir(TEMP_DIR.toString()); @@ -1069,19 +1070,19 @@ public void testDeleteTmpSegmentFiles() throws Exception { PinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager( helixResourceManager, config); - long deletedTmpSegCount; + int numDeletedTmpSegments; // case 1: the segmentMetadata download uri is identical to the uri of the tmp segment. Should not delete when(segZKMeta.getStatus()).thenReturn(Status.DONE); when(segZKMeta.getDownloadUrl()).thenReturn(SCHEME + tableDir + "/" + segmentFileName); - deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, Collections.singletonList(segZKMeta)); + numDeletedTmpSegments = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, Collections.singletonList(segZKMeta)); assertTrue(segmentFile.exists()); - assertEquals(0L, deletedTmpSegCount); + assertEquals(numDeletedTmpSegments, 0); // case 2: download url is empty, indicating the tmp segment is absolutely orphan. Delete the file when(segZKMeta.getDownloadUrl()).thenReturn(METADATA_URI_FOR_PEER_DOWNLOAD); - deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, Collections.singletonList(segZKMeta)); + numDeletedTmpSegments = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, Collections.singletonList(segZKMeta)); assertFalse(segmentFile.exists()); - assertEquals(1L, deletedTmpSegCount); + assertEquals(numDeletedTmpSegments, 1); } ////////////////////////////////////////////////////////////////////////////////// diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java index c7198eded79f..b47bf147fca2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java @@ -20,15 +20,12 @@ import java.util.UUID; import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class SegmentCompletionUtils { private SegmentCompletionUtils() { } - private static final Logger LOGGER = LoggerFactory.getLogger(SegmentCompletionUtils.class); // Used to create temporary segment file names private static final String TMP = ".tmp.";