diff --git a/ambry-api/src/main/java/com/github/ambry/config/FrontendConfig.java b/ambry-api/src/main/java/com/github/ambry/config/FrontendConfig.java index 0684b038cd..f8acee764e 100644 --- a/ambry-api/src/main/java/com/github/ambry/config/FrontendConfig.java +++ b/ambry-api/src/main/java/com/github/ambry/config/FrontendConfig.java @@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static com.github.ambry.rest.RestUtils.*; + /** * Configuration parameters required by the Ambry frontend. @@ -44,7 +46,9 @@ public class FrontendConfig { public static final String CONTAINER_METRICS_AGGREGATED_ACCOUNTS = PREFIX + "container.metrics.aggregated.accounts"; public static final String ACCOUNT_STATS_STORE_FACTORY = PREFIX + "account.stats.store.factory"; public static final String CONTAINER_METRICS_ENABLED_REQUEST_TYPES = PREFIX + "container.metrics.enabled.request.types"; - public static final String CONTAINER_METRICS_ENABLED_GET_REQUEST_TYPES = PREFIX + "container.metrics.enabled.get.request.types"; + public static final String CONTAINER_METRICS_ENABLED_GET_REQUEST_TYPES = + PREFIX + "container.metrics.enabled.get.request.types"; + public static final String LIST_MAX_RESULTS = PREFIX + "list.max.results"; // Default values private static final String DEFAULT_ENDPOINT = "http://localhost:1174"; @@ -294,6 +298,14 @@ public class FrontendConfig { */ public final boolean oneHundredContinueEnable; + /** + * The maximum number of entries to return per response page when listing blobs. + * TODO: Remove the config in {@link MySqlNamedBlobDbConfig} later. 
+ */ + @Config(LIST_MAX_RESULTS) + @Default("1000") + public final int listMaxResults; + public FrontendConfig(VerifiableProperties verifiableProperties) { NettyConfig nettyConfig = new NettyConfig(verifiableProperties); cacheValiditySeconds = verifiableProperties.getLong("frontend.cache.validity.seconds", 365 * 24 * 60 * 60); @@ -357,6 +369,8 @@ public FrontendConfig(VerifiableProperties verifiableProperties) { Utils.splitString(verifiableProperties.getString(CONTAINER_METRICS_EXCLUDED_ACCOUNTS, ""), ","); containerMetricsAggregatedAccounts = Utils.splitString(verifiableProperties.getString(CONTAINER_METRICS_AGGREGATED_ACCOUNTS, ""), ","); + this.listMaxResults = + verifiableProperties.getIntInRange(LIST_MAX_RESULTS, DEFAULT_MAX_KEY_VALUE, 1, Integer.MAX_VALUE); } /** diff --git a/ambry-api/src/main/java/com/github/ambry/frontend/NamedBlobListEntry.java b/ambry-api/src/main/java/com/github/ambry/frontend/NamedBlobListEntry.java index 523ed786ab..9b9c468518 100644 --- a/ambry-api/src/main/java/com/github/ambry/frontend/NamedBlobListEntry.java +++ b/ambry-api/src/main/java/com/github/ambry/frontend/NamedBlobListEntry.java @@ -29,11 +29,13 @@ public class NamedBlobListEntry { private static final String EXPIRATION_TIME_MS_KEY = "expirationTimeMs"; private static final String BLOB_SIZE_KEY = "blobSize"; private static final String MODIFIED_TIME_MS_KEY = "modifiedTimeMs"; + private static final String IS_DIRECTORY_KEY = "isDirectory"; private final String blobName; private final long expirationTimeMs; private final long blobSize; private final long modifiedTimeMs; + private final boolean isDirectory; /** * Read a {@link NamedBlobRecord} from JSON. 
@@ -41,7 +43,8 @@ public class NamedBlobListEntry { */ public NamedBlobListEntry(JSONObject jsonObject) { this(jsonObject.getString(BLOB_NAME_KEY), jsonObject.optLong(EXPIRATION_TIME_MS_KEY, Utils.Infinite_Time), - jsonObject.optLong(BLOB_SIZE_KEY, 0), jsonObject.optLong(MODIFIED_TIME_MS_KEY, Utils.Infinite_Time)); + jsonObject.optLong(BLOB_SIZE_KEY, 0), jsonObject.optLong(MODIFIED_TIME_MS_KEY, Utils.Infinite_Time), + jsonObject.optBoolean(IS_DIRECTORY_KEY, false)); } /** @@ -49,20 +52,24 @@ public NamedBlobListEntry(JSONObject jsonObject) { * @param record the {@link NamedBlobRecord}. */ NamedBlobListEntry(NamedBlobRecord record) { - this(record.getBlobName(), record.getExpirationTimeMs(), record.getBlobSize(), record.getModifiedTimeMs()); + this(record.getBlobName(), record.getExpirationTimeMs(), record.getBlobSize(), record.getModifiedTimeMs(), + record.isDirectory()); } /** - * @param blobName the blob name within a container. + * @param blobName the blob name within a container. * @param expirationTimeMs the expiration time in milliseconds since epoch, or -1 if the blob should be permanent. 
* @param blobSize the size of the blob * @param modifiedTimeMs the modified time of the blob in milliseconds since epoch + * @param isDirectory whether the blob is a directory (virtual folder name separated by '/') */ - private NamedBlobListEntry(String blobName, long expirationTimeMs, long blobSize, long modifiedTimeMs) { + private NamedBlobListEntry(String blobName, long expirationTimeMs, long blobSize, long modifiedTimeMs, + boolean isDirectory) { this.blobName = blobName; this.expirationTimeMs = expirationTimeMs; this.blobSize = blobSize; this.modifiedTimeMs = modifiedTimeMs; + this.isDirectory = isDirectory; } /** @@ -103,6 +110,7 @@ public JSONObject toJson() { } jsonObject.put(BLOB_SIZE_KEY, blobSize); jsonObject.put(MODIFIED_TIME_MS_KEY, modifiedTimeMs); + jsonObject.put(IS_DIRECTORY_KEY, isDirectory); return jsonObject; } @@ -116,6 +124,10 @@ public boolean equals(Object o) { } NamedBlobListEntry that = (NamedBlobListEntry) o; return expirationTimeMs == that.expirationTimeMs && Objects.equals(blobName, that.blobName) - && modifiedTimeMs == that.modifiedTimeMs && blobSize == that.blobSize; + && modifiedTimeMs == that.modifiedTimeMs && blobSize == that.blobSize && isDirectory == that.isDirectory; + } + + public boolean isDirectory() { + return isDirectory; } } diff --git a/ambry-api/src/main/java/com/github/ambry/frontend/s3/S3MessagePayload.java b/ambry-api/src/main/java/com/github/ambry/frontend/s3/S3MessagePayload.java index 9929ab2f07..37123df467 100644 --- a/ambry-api/src/main/java/com/github/ambry/frontend/s3/S3MessagePayload.java +++ b/ambry-api/src/main/java/com/github/ambry/frontend/s3/S3MessagePayload.java @@ -14,12 +14,10 @@ */ package com.github.ambry.frontend.s3; -import java.util.ArrayList; -import java.util.List; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper; -import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; import 
com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.List; /** @@ -157,6 +155,7 @@ public String toString() { } } + @JsonInclude(JsonInclude.Include.NON_EMPTY) public static abstract class AbstractListBucketResult { @JacksonXmlProperty(localName = "Name") private String name; @@ -175,13 +174,17 @@ public static abstract class AbstractListBucketResult { private String encodingType; @JacksonXmlProperty(localName = "IsTruncated") private Boolean isTruncated; + // New optional field for CommonPrefixes with wrapping and nested Prefix elements + @JacksonXmlProperty(localName = "CommonPrefixes") + @JacksonXmlElementWrapper(useWrapping = false) + private List commonPrefixes; private AbstractListBucketResult() { } public AbstractListBucketResult(String name, String prefix, int maxKeys, int keyCount, String delimiter, - List contents, String encodingType, boolean isTruncated) { + List contents, String encodingType, boolean isTruncated, List commonPrefixes) { this.name = name; this.prefix = prefix; this.maxKeys = maxKeys; @@ -190,6 +193,7 @@ public AbstractListBucketResult(String name, String prefix, int maxKeys, int key this.contents = contents; this.encodingType = encodingType; this.isTruncated = isTruncated; + this.commonPrefixes = commonPrefixes; } public String getPrefix() { @@ -227,13 +231,23 @@ public String getName() { @Override public String toString() { return "Name=" + name + ", Prefix=" + prefix + ", MaxKeys=" + maxKeys + ", KeyCount=" + keyCount + ", Delimiter=" - + delimiter + ", Contents=" + contents + ", Encoding type=" + encodingType + ", IsTruncated=" + isTruncated; + + delimiter + ", Contents=" + contents + ", Encoding type=" + encodingType + ", IsTruncated=" + isTruncated + + ", CommonPrefixes=" + commonPrefixes; + } + + public List getCommonPrefixes() { + return commonPrefixes; + } + + public void setCommonPrefixes(List commonPrefixes) { + this.commonPrefixes = 
commonPrefixes; } } /** * ListBucketResult for listObjects API. */ + @JsonInclude(JsonInclude.Include.NON_EMPTY) public static class ListBucketResult extends AbstractListBucketResult { @JacksonXmlProperty(localName = "Marker") private String marker; @@ -245,8 +259,9 @@ private ListBucketResult() { } public ListBucketResult(String name, String prefix, int maxKeys, int keyCount, String delimiter, - List contents, String encodingType, String marker, String nextMarker, boolean isTruncated) { - super(name, prefix, maxKeys, keyCount, delimiter, contents, encodingType, isTruncated); + List contents, String encodingType, String marker, String nextMarker, boolean isTruncated, + List commonPrefixes) { + super(name, prefix, maxKeys, keyCount, delimiter, contents, encodingType, isTruncated, commonPrefixes); this.marker = marker; this.nextMarker = nextMarker; } @@ -268,6 +283,7 @@ public String toString() { /** * ListBucketResult for listObjectsV2 API. */ + @JsonInclude(JsonInclude.Include.NON_EMPTY) public static class ListBucketResultV2 extends AbstractListBucketResult { @JacksonXmlProperty(localName = "ContinuationToken") private String continuationToken; @@ -280,8 +296,8 @@ private ListBucketResultV2() { public ListBucketResultV2(String name, String prefix, int maxKeys, int keyCount, String delimiter, List contents, String encodingType, String continuationToken, String nextContinuationToken, - boolean isTruncated) { - super(name, prefix, maxKeys, keyCount, delimiter, contents, encodingType, isTruncated); + boolean isTruncated, List commonPrefixes) { + super(name, prefix, maxKeys, keyCount, delimiter, contents, encodingType, isTruncated, commonPrefixes); this.continuationToken = continuationToken; this.nextContinuationToken = nextContinuationToken; } @@ -322,7 +338,9 @@ public String getKey() { return key; } - public long getSize() { return size; } + public long getSize() { + return size; + } public String getLastModified() { return lastModified; @@ -359,11 +377,38 @@ public 
String toString() { } } + // Inner class for wrapping each Prefix inside CommonPrefixes + public static class Prefix { + @JacksonXmlProperty(localName = "Prefix") + private String prefix; + + public Prefix() { + } + + public Prefix(String prefix) { + this.prefix = prefix; + } + + public String getPrefix() { + return prefix; + } + + public void setPrefix(String prefix) { + this.prefix = prefix; + } + + @Override + public String toString() { + return "Prefix=" + prefix; + } + } + public static class S3BatchDeleteObjects { // Ensure that the "Delete" wrapper element is mapped correctly to the list of "Object" elements @JacksonXmlElementWrapper(useWrapping = false) // Avoids wrapping the element itself - @JacksonXmlProperty(localName = "Object") // Specifies that each element maps to an instance of S3BatchDeleteKeys + @JacksonXmlProperty(localName = "Object") + // Specifies that each element maps to an instance of S3BatchDeleteKeys private List objects; public List getObjects() { diff --git a/ambry-api/src/main/java/com/github/ambry/named/NamedBlobDb.java b/ambry-api/src/main/java/com/github/ambry/named/NamedBlobDb.java index 5c94ef076a..ee55389c0d 100644 --- a/ambry-api/src/main/java/com/github/ambry/named/NamedBlobDb.java +++ b/ambry-api/src/main/java/com/github/ambry/named/NamedBlobDb.java @@ -60,8 +60,8 @@ default CompletableFuture get(String accountName, String contai * @param pageToken if {@code null}, return the first page of {@link NamedBlobRecord}s that start with * {@code blobNamePrefix}. If set, use this as a token to resume reading additional pages of * records that start with the prefix. - * @param maxKey the maximum number of keys returned in the response. By default, the action returns up to listMaxResults - * which can be tuned by config. + * @param maxKey the maximum number of keys returned in the response. By default, the action returns up to + * listMaxResults which can be tuned by config. 
* @return a {@link CompletableFuture} that will eventually contain a {@link Page} of {@link NamedBlobRecord}s * starting with the specified prefix or an exception if an error occurred. */ diff --git a/ambry-api/src/main/java/com/github/ambry/named/NamedBlobRecord.java b/ambry-api/src/main/java/com/github/ambry/named/NamedBlobRecord.java index 48b11fc369..99ca9b85bd 100644 --- a/ambry-api/src/main/java/com/github/ambry/named/NamedBlobRecord.java +++ b/ambry-api/src/main/java/com/github/ambry/named/NamedBlobRecord.java @@ -30,6 +30,7 @@ public class NamedBlobRecord { private final String blobId; private final long blobSize; private long modifiedTimeMs; + private final boolean isDirectory; /** * @param accountName the account name. @@ -67,7 +68,7 @@ public NamedBlobRecord(String accountName, String containerName, String blobName */ public NamedBlobRecord(String accountName, String containerName, String blobName, String blobId, long expirationTimeMs, long version, long blobSize) { - this(accountName, containerName, blobName, blobId, expirationTimeMs, version, blobSize, 0); + this(accountName, containerName, blobName, blobId, expirationTimeMs, version, blobSize, 0, false); } /** @@ -79,9 +80,10 @@ public NamedBlobRecord(String accountName, String containerName, String blobName * @param version the version of this named blob. * @param blobSize the size of the blob. 
* @param modifiedTimeMs the modified time of the blob in milliseconds since epoch + * @param isDirectory whether the blob is a directory (virtual folder name separated by '/') */ public NamedBlobRecord(String accountName, String containerName, String blobName, String blobId, - long expirationTimeMs, long version, long blobSize, long modifiedTimeMs) { + long expirationTimeMs, long version, long blobSize, long modifiedTimeMs, boolean isDirectory) { this.accountName = accountName; this.containerName = containerName; this.blobName = blobName; @@ -90,6 +92,7 @@ public NamedBlobRecord(String accountName, String containerName, String blobName this.version = version; this.blobSize = blobSize; this.modifiedTimeMs = modifiedTimeMs; + this.isDirectory = isDirectory; } /** @@ -180,4 +183,11 @@ public long getModifiedTimeMs() { public void setModifiedTimeMs(long modifiedTimeMs) { this.modifiedTimeMs = modifiedTimeMs; } + + /** + * @return whether the blob is a directory (virtual folder name separated by '/') + */ + public boolean isDirectory() { + return isDirectory; + } } diff --git a/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTest.java b/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTest.java index d03092e86f..c9d865f090 100644 --- a/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTest.java +++ b/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTest.java @@ -739,6 +739,7 @@ private static VerifiableProperties buildFrontendVProps(File trustStoreFile, boo properties.setProperty(FrontendConfig.ENABLE_UNDELETE, Boolean.toString(enableUndelete)); properties.setProperty(FrontendConfig.NAMED_BLOB_DB_FACTORY, "com.github.ambry.frontend.TestNamedBlobDbFactory"); properties.setProperty(MySqlNamedBlobDbConfig.LIST_MAX_RESULTS, String.valueOf(NAMED_BLOB_LIST_RESULT_MAX)); + properties.setProperty(FrontendConfig.LIST_MAX_RESULTS, 
String.valueOf(NAMED_BLOB_LIST_RESULT_MAX)); return new VerifiableProperties(properties); } diff --git a/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTestBase.java b/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTestBase.java index a2340f308c..fe30755489 100644 --- a/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTestBase.java +++ b/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTestBase.java @@ -13,6 +13,7 @@ */ package com.github.ambry.frontend; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.ambry.account.Account; import com.github.ambry.account.AccountCollectionSerde; @@ -1847,15 +1848,18 @@ static class NamedBlobEntry { private long expirationTimeMs; private long blobSize; private long modifiedTimeMs; + @JsonProperty("isDirectory") + private boolean isDirectory; public NamedBlobEntry() { } - public NamedBlobEntry(String blobName, long expiration, long blobSize, long modifiedTimeMs) { + public NamedBlobEntry(String blobName, long expiration, long blobSize, long modifiedTimeMs, boolean isDirectory) { this.blobName = blobName; this.expirationTimeMs = expiration; this.blobSize = blobSize; this.modifiedTimeMs = modifiedTimeMs; + this.isDirectory = isDirectory; } public String getBlobName() { @@ -1889,6 +1893,14 @@ public long getModifiedTimeMs() { public void setModifiedTimeMs(long modifiedTimeMs) { this.modifiedTimeMs = modifiedTimeMs; } + + public boolean isDirectory() { + return isDirectory; + } + + public void setDirectory(boolean isDirectory) { + this.isDirectory = isDirectory; + } } /** diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java index 085f07fb47..cdefca2a21 100644 --- 
a/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java @@ -56,7 +56,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.github.ambry.frontend.DatasetVersionPath.*; import static com.github.ambry.frontend.Operations.*; import static com.github.ambry.rest.RestUtils.*; import static com.github.ambry.rest.RestUtils.Headers.*; @@ -219,7 +218,8 @@ public void start() throws InstantiationException { clusterMap, quotaManager); namedBlobListHandler = - new NamedBlobListHandler(securityService, namedBlobDb, accountAndContainerInjector, frontendMetrics); + new NamedBlobListHandler(securityService, namedBlobDb, accountAndContainerInjector, frontendMetrics, + frontendConfig); namedBlobPutHandler = new NamedBlobPutHandler(securityService, namedBlobDb, idConverter, idSigningService, router, accountAndContainerInjector, frontendConfig, frontendMetrics, clusterName, quotaManager, accountService, deleteBlobHandler); diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobListHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobListHandler.java index c10e3bb20c..1a02a8c5b5 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobListHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobListHandler.java @@ -17,6 +17,7 @@ import com.github.ambry.commons.Callback; import com.github.ambry.commons.CallbackUtils; +import com.github.ambry.config.FrontendConfig; import com.github.ambry.named.NamedBlobDb; import com.github.ambry.named.NamedBlobRecord; import com.github.ambry.rest.RestRequest; @@ -26,7 +27,11 @@ import com.github.ambry.rest.RestServiceException; import com.github.ambry.rest.RestUtils; import com.github.ambry.router.ReadableStreamChannel; +import com.github.ambry.utils.Utils; +import java.util.ArrayList; import 
java.util.GregorianCalendar; +import java.util.List; +import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,19 +50,25 @@ public class NamedBlobListHandler { private final NamedBlobDb namedBlobDb; private final AccountAndContainerInjector accountAndContainerInjector; private final FrontendMetrics frontendMetrics; + private final FrontendConfig frontendConfig; + private static final String DELIMITER = "/"; /** * Constructs a handler for handling requests for listing blobs in named blob accounts. + * * @param securityService the {@link SecurityService} to use. - * @param namedBlobDb the {@link NamedBlobDb} to use. + * @param namedBlobDb the {@link NamedBlobDb} to use. * @param frontendMetrics {@link FrontendMetrics} instance where metrics should be recorded. + * @param frontendConfig {@link FrontendConfig} instance from which to fetch configs. */ NamedBlobListHandler(SecurityService securityService, NamedBlobDb namedBlobDb, - AccountAndContainerInjector accountAndContainerInjector, FrontendMetrics frontendMetrics) { + AccountAndContainerInjector accountAndContainerInjector, FrontendMetrics frontendMetrics, + FrontendConfig frontendConfig) { this.securityService = securityService; this.namedBlobDb = namedBlobDb; this.accountAndContainerInjector = accountAndContainerInjector; this.frontendMetrics = frontendMetrics; + this.frontendConfig = frontendConfig; } /** @@ -132,10 +143,13 @@ private Callback securityPostProcessRequestCallback() { return buildCallback(frontendMetrics.listSecurityPostProcessRequestMetrics, securityCheckResult -> { NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs()); String maxKeys = getHeader(restRequest.getArgs(), MAXKEYS_PARAM_NAME, false); + // If DELIMITER is not equal to "/", then the directory grouping is not supported. 
+ String delimiter = getHeader(restRequest.getArgs(), DELIMITER_PARAM_NAME, false); CallbackUtils.callCallbackAfter( - namedBlobDb.list(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(), + listRecursively(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(), namedBlobPath.getBlobNamePrefix(), namedBlobPath.getPageToken(), - maxKeys == null ? null : Integer.parseInt(maxKeys)), listBlobsCallback()); + maxKeys == null ? frontendConfig.listMaxResults : Integer.parseInt(maxKeys), + delimiter != null && delimiter.equals(DELIMITER)), listBlobsCallback()); }, uri, LOGGER, finalCallback); } @@ -153,5 +167,183 @@ private Callback> listBlobsCallback() { finalCallback.onCompletion(channel, null); }, uri, LOGGER, finalCallback); } + + /** + * Top-level recursive method to aggregate results from the underlying NamedBlobDb.list() API + * using S3 list objects semantics. + *

+ * This method repeatedly invokes the underlying NamedBlobDb.list() API to retrieve pages of NamedBlobRecord + * objects and recursively merges them into an aggregated Page until the aggregated Page contains at + * most {@code maxKey} entries or no further pages exist. When grouping is enabled, entries are examined + * via {@link #extractDirectory(String, String)} to form aggregated directory entries. + * @param accountName the account name. + * @param containerName the container name. + * @param blobNamePrefix the blob name prefix. + * @param pageToken the token for the first page (null for initial call). + * @param maxKey the maximum number of entries to accumulate. + * @param groupDirectories flag indicating whether to group directories. + * @return a CompletableFuture that, when complete, contains an immutable Page with at most maxKey entries + * and an appropriate nextPageToken. + */ + public CompletableFuture> listRecursively(String accountName, String containerName, + String blobNamePrefix, String pageToken, Integer maxKey, boolean groupDirectories) { + + // Start with an empty aggregated Page. + Page initialAggregatedPage = new Page<>(new ArrayList<>(), null); + return listRecursivelyInternal(accountName, containerName, blobNamePrefix, pageToken, maxKey, groupDirectories, + initialAggregatedPage).thenApply( + finalPage -> new Page<>(finalPage.getEntries(), finalPage.getNextPageToken())); + } + + /** + * Internal recursive helper that aggregates pages returned by the underlying list() API into a single Page. + *

+ * This method calls the underlying NamedBlobDb.list() API with the given pageToken and merges the returned Page + * with the previously aggregated results (contained in {@code aggregatedPage}) using {@link this#mergePageResults}. + * If the aggregated Page is not yet full (i.e. contains fewer than {@code maxKey} entries) or the last entry is + * a directory whose blobName prefixes the next page token, the method recurses to merge further entries from that + * directory; otherwise, it returns the aggregated Page. + *

+ * @param accountName the account name. + * @param containerName the container name. + * @param blobNamePrefix the blob name prefix. + * @param pageToken the token for the current page. + * @param maxKey the maximum number of entries to accumulate. + * @param groupDirectories flag indicating whether to group directories. + * @param aggregatedPage the aggregated Page so far (immutable). + * @return a CompletableFuture containing a new aggregated Page (immutable) with updated entries and nextPageToken. + */ + private CompletableFuture> listRecursivelyInternal(String accountName, String containerName, + String blobNamePrefix, String pageToken, int maxKey, boolean groupDirectories, + Page aggregatedPage) { + + return namedBlobDb.list(accountName, containerName, blobNamePrefix, pageToken, maxKey) + .thenCompose(currentPage -> { + // Merge the current page into the aggregated page. + Page updatedAggregatedPage = + mergePageResults(aggregatedPage, currentPage, accountName, containerName, blobNamePrefix, maxKey, + groupDirectories); + String tokenToUse = updatedAggregatedPage.getNextPageToken(); + + // If no token is available, no further pages exist. Return the updated aggregated page. + // This is the final result + if (tokenToUse == null) { + return CompletableFuture.completedFuture(updatedAggregatedPage); + } + // If we haven't reached maxKey yet, continue fetching. + if (updatedAggregatedPage.getEntries().size() < maxKey) { + return listRecursivelyInternal(accountName, containerName, blobNamePrefix, tokenToUse, maxKey, + groupDirectories, updatedAggregatedPage); + } else { + // Aggregated page is full. + NamedBlobRecord lastRecord = + updatedAggregatedPage.getEntries().get(updatedAggregatedPage.getEntries().size() - 1); + // If the last record is a directory and tokenToUse starts with that directory's blobName, + // then continue fetching so that we merge further entries from that directory. 
+ if (lastRecord.isDirectory() && tokenToUse.startsWith(lastRecord.getBlobName())) { + return listRecursivelyInternal(accountName, containerName, blobNamePrefix, tokenToUse, maxKey, + groupDirectories, updatedAggregatedPage); + } else { + // Otherwise, return the updated aggregated page. This is the final result. + return CompletableFuture.completedFuture(updatedAggregatedPage); + } + } + }); + } + + /** + * Merges the current page of results into the aggregated Page. + *

+ * This method iterates over the entries of the provided {@code currentPage}. If {@code groupDirectories} is true, + * each entry is examined via {@link #extractDirectory(String, String)}. If a directory is detected, a new directory + * record is constructed and added to the aggregated results only if not already present. If adding an entry would + * cause the aggregated results to reach {@code maxKey} entries, the blobName of that entry is captured as the new + * page token, and processing of the current page stops. + *

+ * + * @param aggregatedPage the aggregated Page so far. + * @param currentPage the Page returned from the underlying list call. + * @param accountName the account name. + * @param containerName the container name. + * @param blobNamePrefix the blob name prefix. + * @param maxKey the maximum number of entries to accumulate. + * @param groupDirectories flag indicating whether to group directories. + * @return a new immutable Page whose entries are the merged entries (accumulator) and whose nextPageToken is + * either the blobName of the first unprocessed record or the underlying currentPage's token. + */ + private Page mergePageResults(Page aggregatedPage, + Page currentPage, String accountName, String containerName, String blobNamePrefix, int maxKey, + boolean groupDirectories) { + + // Start with a copy of the current aggregated entries. + List accumulator = new ArrayList<>(aggregatedPage.getEntries()); + String newToken = null; + List entries = currentPage.getEntries(); + + for (NamedBlobRecord record : entries) { + NamedBlobRecord recordToAdd = record; + if (groupDirectories) { + String directory = extractDirectory(record.getBlobName(), blobNamePrefix); + if (directory != null) { + // Skip duplicate directory records. + if (accumulator.stream() + .filter(NamedBlobRecord::isDirectory) + .anyMatch(existing -> existing.getBlobName().equals(directory))) { + continue; + } + recordToAdd = + new NamedBlobRecord(accountName, containerName, directory, null, Utils.Infinite_Time, 0, 0, 0, true); + } + } + // If adding this record would reach maxKey, capture its blobName as newToken and stop processing. + if (accumulator.size() == maxKey) { + newToken = record.getBlobName(); + break; + } + accumulator.add(recordToAdd); + } + // Determine the next page token: use newToken if set, else use the token from the underlying currentPage. + String tokenToUse = (newToken != null) ? 
newToken : currentPage.getNextPageToken(); + return new Page<>(accumulator, tokenToUse); + } + } + + /** + * Extracts the directory from the given blob name if possible. This method implements logic similar to S3 + * ListObjectsV2 when both a "Prefix" and a "Delimiter" ("/") are provided. It groups keys into common prefixes + * ("directories"). Below are some examples: + * 1. If blobName = "abc/def/ghi" and blobNamePrefix = "", then the directory is "abc/". + * 2. If blobName = "abc/def/ghi" and blobNamePrefix = "abc", then the directory is "abc/". + * 3. If blobName = "abc/def/ghi" and blobNamePrefix = "abc/", then the directory is "abc/def/". + * 4. If blobName = "abc//def/ghi" and blobNamePrefix = "abc/", then the directory is "abc//". + * @param blobName the blob name from the ResultSet. + * @param blobNamePrefix the prefix to remove (can be null). + * @return the directory (with a trailing '/') if found; otherwise, return null. + */ + private String extractDirectory(String blobName, String blobNamePrefix) { + + // Treat null blobNamePrefix as an empty string. + blobNamePrefix = (blobNamePrefix == null) ? "" : blobNamePrefix; + + // Since we assume blobNamePrefix is either empty or is contained in blobName, we directly compute the remainder. + String remainder = blobName.substring(blobNamePrefix.length()); + if (remainder.isEmpty()) { + return null; + } + + // If the remainder starts with the delimiter, then per S3 behavior the common prefix is the blobNamePrefix + // plus an extra delimiter. + if (remainder.startsWith(DELIMITER)) { + return blobNamePrefix + DELIMITER; + } + + // Otherwise, look for the delimiter in the remainder. + int index = remainder.indexOf(DELIMITER); + if (index == -1) { + return null; + } else { + // Return the blobNamePrefix plus the substring of the remainder up to and including the delimiter. 
+ return blobNamePrefix + remainder.substring(0, index + DELIMITER.length()); + } } } diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3ListHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3ListHandler.java index 5cd20e82e2..d13fe25496 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3ListHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3ListHandler.java @@ -31,18 +31,15 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.text.SimpleDateFormat; import java.time.Instant; -import java.time.LocalDateTime; import java.time.ZoneId; -import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; -import java.util.Calendar; -import java.util.Date; import java.util.GregorianCalendar; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import org.json.JSONObject; import org.json.JSONTokener; import org.slf4j.Logger; @@ -68,6 +65,7 @@ public class S3ListHandler extends S3BaseHandler { private final NamedBlobListHandler namedBlobListHandler; private final FrontendMetrics metrics; public static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX"); + private static final String DELIMITER = "/"; /** * Constructs a handler for handling s3 requests for listing blobs. @@ -115,10 +113,16 @@ private ReadableStreamChannel serializeAsXml(RestRequest restRequest, Page contentsList = new ArrayList<>(); - int keyCount = 0; + // Use LinkedHashSet to maintain order of directories. 
+ Set dirSet = new LinkedHashSet<>(); for (NamedBlobListEntry namedBlobRecord : namedBlobRecordPage.getEntries()) { + if (namedBlobRecord.isDirectory()) { + dirSet.add(namedBlobRecord.getBlobName()); + continue; + } String blobName = namedBlobRecord.getBlobName(); long blobSize = namedBlobRecord.getBlobSize(); long modifiedTimeMs = namedBlobRecord.getModifiedTimeMs(); @@ -128,23 +132,26 @@ private ReadableStreamChannel serializeAsXml(RestRequest restRequest, Page commonPrefixes = new ArrayList<>(); + for (String dir : dirSet) { + commonPrefixes.add(new Prefix(dir)); + } + if (LIST_TYPE_VERSION_2.equals(getHeader(restRequest.getArgs(), LIST_TYPE, false))) { ListBucketResultV2 resultV2 = - new ListBucketResultV2(containerName, prefix, maxKeysValue, keyCount, delimiter, contentsList, - encodingType, continuationToken, namedBlobRecordPage.getNextPageToken(), - namedBlobRecordPage.getNextPageToken() != null); + new ListBucketResultV2(containerName, prefix, maxKeysValue, keyCount, delimiter, contentsList, encodingType, + continuationToken, namedBlobRecordPage.getNextPageToken(), namedBlobRecordPage.getNextPageToken() != null, + delimiter != null && delimiter.equals(DELIMITER) ? commonPrefixes : null); LOGGER.debug("Sending response for S3 ListObjects {}", resultV2); // Serialize xml xmlMapper.writeValue(outputStream, resultV2); } else { ListBucketResult result = - new ListBucketResult(containerName, prefix, maxKeysValue, keyCount, delimiter, contentsList, - encodingType, marker, namedBlobRecordPage.getNextPageToken(), - namedBlobRecordPage.getNextPageToken() != null); + new ListBucketResult(containerName, prefix, maxKeysValue, keyCount, delimiter, contentsList, encodingType, + marker, namedBlobRecordPage.getNextPageToken(), namedBlobRecordPage.getNextPageToken() != null, + delimiter != null && delimiter.equals(DELIMITER) ? 
commonPrefixes : null); LOGGER.debug("Sending response for S3 ListObjects {}", result); // Serialize xml xmlMapper.writeValue(outputStream, result); diff --git a/ambry-frontend/src/test/java/com/github/ambry/frontend/FrontendRestRequestServiceTest.java b/ambry-frontend/src/test/java/com/github/ambry/frontend/FrontendRestRequestServiceTest.java index 31925b302c..62a3deb6f0 100644 --- a/ambry-frontend/src/test/java/com/github/ambry/frontend/FrontendRestRequestServiceTest.java +++ b/ambry-frontend/src/test/java/com/github/ambry/frontend/FrontendRestRequestServiceTest.java @@ -132,6 +132,7 @@ import org.mockito.Mockito; import com.fasterxml.jackson.core.type.TypeReference; +import static com.github.ambry.rest.RestUtils.*; import static com.github.ambry.rest.RestUtils.Headers.*; import static com.github.ambry.utils.TestUtils.*; import static org.junit.Assert.*; @@ -2704,7 +2705,7 @@ public void listNamedBlobsTest() throws Exception { new NamedBlobRecord(refAccount.getName(), refContainer.getName(), "blob1", "abc", Utils.Infinite_Time), new NamedBlobRecord(refAccount.getName(), refContainer.getName(), "blob2", "def", System.currentTimeMillis()), new NamedBlobRecord(refAccount.getName(), refContainer.getName(), "blob3", "ghi", Utils.Infinite_Time)); - Page page = new Page<>(blobs, "blob4"); + Page page = new Page<>(blobs, null); doListNamedBlobsTest("blob", null, page, null); doListNamedBlobsTest("blob", "blob1", page, null); @@ -2736,7 +2737,8 @@ private void doListNamedBlobsTest(String prefix, String pageToken, Page> future = new CompletableFuture<>(); future.completeExceptionally(new RestServiceException("NamedBlobDb error", expectedErrorCode)); @@ -2746,7 +2748,7 @@ private void doListNamedBlobsTest(String prefix, String pageToken, Page response = Page.fromJson(new JSONObject(new String(restResponseChannel.getResponseBody())), NamedBlobListEntry::new); assertEquals("Unexpected blobs returned", diff --git 
a/ambry-frontend/src/test/java/com/github/ambry/frontend/S3ListHandlerTest.java b/ambry-frontend/src/test/java/com/github/ambry/frontend/S3ListHandlerTest.java index c425097392..a80c054439 100644 --- a/ambry-frontend/src/test/java/com/github/ambry/frontend/S3ListHandlerTest.java +++ b/ambry-frontend/src/test/java/com/github/ambry/frontend/S3ListHandlerTest.java @@ -42,14 +42,13 @@ import com.github.ambry.router.ReadableStreamChannel; import com.github.ambry.utils.TestUtils; import java.nio.ByteBuffer; -import java.time.Instant; -import java.time.ZoneId; import java.time.ZonedDateTime; -import java.time.format.DateTimeParseException; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedList; import java.util.Properties; +import java.util.Set; import org.json.JSONObject; import org.junit.Test; @@ -88,7 +87,7 @@ public S3ListHandlerTest() throws Exception { } @Test - public void listS3BlobsTest() throws Exception { + public void listObjectsTest() throws Exception { // 1. Put a named blob String PREFIX = "directory-name"; @@ -110,8 +109,8 @@ public void listS3BlobsTest() throws Exception { // 2. Get list of blobs by sending matching s3 request String s3_list_request_uri = - S3_PREFIX + SLASH + account.getName() + SLASH + container.getName() + SLASH + "?prefix=" + PREFIX - + "&delimiter=/" + "&Marker=/" + "&max-keys=1" + "&encoding-type=url"; + S3_PREFIX + SLASH + account.getName() + SLASH + container.getName() + SLASH + "?prefix=" + PREFIX + "&Marker=/" + + "&max-keys=1" + "&encoding-type=url"; request = FrontendRestRequestServiceTest.createRestRequest(RestMethod.GET, s3_list_request_uri, new JSONObject(), null); request.setArg(InternalKeys.REQUEST_PATH, @@ -123,16 +122,13 @@ public void listS3BlobsTest() throws Exception { // 3. 
Verify results ReadableStreamChannel readableStreamChannel = futureResult.get(); ByteBuffer byteBuffer = ((ByteBufferReadableStreamChannel) readableStreamChannel).getContent(); - ListBucketResult listBucketResult = - xmlMapper.readValue(byteBuffer.array(), ListBucketResult.class); + ListBucketResult listBucketResult = xmlMapper.readValue(byteBuffer.array(), ListBucketResult.class); assertEquals("Mismatch on status", ResponseStatus.Ok, restResponseChannel.getStatus()); - assertEquals("Mismatch in content type", XML_CONTENT_TYPE, - restResponseChannel.getHeader(Headers.CONTENT_TYPE)); + assertEquals("Mismatch in content type", XML_CONTENT_TYPE, restResponseChannel.getHeader(Headers.CONTENT_TYPE)); Contents contents = listBucketResult.getContents().get(0); assertEquals("Mismatch in key name", KEY_NAME, contents.getKey()); assertEquals("Mismatch in key count", 1, listBucketResult.getKeyCount()); assertEquals("Mismatch in prefix", PREFIX, listBucketResult.getPrefix()); - assertEquals("Mismatch in delimiter", "/", listBucketResult.getDelimiter()); assertEquals("Mismatch in max key count", 1, listBucketResult.getMaxKeys()); assertEquals("Mismatch in encoding type", "url", listBucketResult.getEncodingType()); @@ -150,8 +146,8 @@ public void listS3BlobsTest() throws Exception { // 5. 
Get list of blobs with continuation-token s3_list_request_uri = - S3_PREFIX + SLASH + account.getName() + SLASH + container.getName() + SLASH + "?prefix=" + PREFIX - + "&delimiter=/" + "&marker=" + KEY_NAME + "&max-keys=1" + "&encoding-type=url"; + S3_PREFIX + SLASH + account.getName() + SLASH + container.getName() + SLASH + "?prefix=" + PREFIX + "&marker=" + + KEY_NAME + "&max-keys=1" + "&encoding-type=url"; request = FrontendRestRequestServiceTest.createRestRequest(RestMethod.GET, s3_list_request_uri, new JSONObject(), null); request.setArg(InternalKeys.REQUEST_PATH, @@ -171,11 +167,11 @@ public void listS3BlobsTest() throws Exception { assertEquals("Mismatch in key count", 1, listBucketResult.getKeyCount()); assertEquals("Mismatch in next token", KEY_NAME, listBucketResult.getMarker()); assertEquals("Mismatch in next token", KEY_NAME1, listBucketResult.getNextMarker()); - assertEquals("Mismatch in IsTruncated", true, listBucketResult.getIsTruncated()); + assertTrue("Mismatch in IsTruncated", listBucketResult.getIsTruncated()); } @Test - public void listObjectV2S3BlobsTest() throws Exception { + public void listObjectsV2Test() throws Exception { // 1. Put a named blob String PREFIX = "directory-name"; String KEY_NAME = PREFIX + SLASH + "key_name"; @@ -210,7 +206,7 @@ public void listObjectV2S3BlobsTest() throws Exception { // 3. 
Get list of blobs by sending matching s3 list object v2 request String s3_list_request_uri = S3_PREFIX + SLASH + account.getName() + SLASH + container.getName() + SLASH + "?list-type=2" + "&prefix=" - + "&delimiter=/" + "&continuation-token=/" + "&encoding-type=url"; + + "&continuation-token=/" + "&encoding-type=url"; request = FrontendRestRequestServiceTest.createRestRequest(RestMethod.GET, s3_list_request_uri, new JSONObject(), null); request.setArg(InternalKeys.REQUEST_PATH, @@ -229,19 +225,19 @@ public void listObjectV2S3BlobsTest() throws Exception { assertEquals("Mismatch in key name", KEY_NAME, listBucketResultV2.getContents().get(0).getKey()); assertEquals("Mismatch in key name", KEY_NAME1, listBucketResultV2.getContents().get(1).getKey()); assertEquals("Mismatch in key count", 2, listBucketResultV2.getKeyCount()); - assertEquals("Mismatch in delimiter", "/", listBucketResultV2.getDelimiter()); assertEquals("Mismatch in encoding type", "url", listBucketResultV2.getEncodingType()); assertEquals("Mismatch in size", BLOB_SIZE, listBucketResultV2.getContents().get(0).getSize()); + assertNull("No common prefixes should be present", listBucketResultV2.getCommonPrefixes()); // Verify the modified timestamp is formatted correctly String lastModified = listBucketResultV2.getContents().get(0).getLastModified(); - assertNotEquals( "Last modified should not be -1", "-1", lastModified); + assertNotEquals("Last modified should not be -1", "-1", lastModified); // Attempt to parse the string. This should throw DateTimeParseException if the format is incorrect. ZonedDateTime.parse(lastModified, S3ListHandler.TIMESTAMP_FORMATTER); // 4. 
Get list of blobs with continuation-token s3_list_request_uri = S3_PREFIX + SLASH + account.getName() + SLASH + container.getName() + SLASH + "?list-type=2" + "&prefix=" - + "&delimiter=/" + "&continuation-token=" + KEY_NAME + "&max-keys=1" + "&encoding-type=url"; + + "&continuation-token=" + KEY_NAME + "&max-keys=1" + "&encoding-type=url"; request = FrontendRestRequestServiceTest.createRestRequest(RestMethod.GET, s3_list_request_uri, new JSONObject(), null); request.setArg(InternalKeys.REQUEST_PATH, @@ -262,6 +258,230 @@ public void listObjectV2S3BlobsTest() throws Exception { assertEquals("Mismatch in next token", KEY_NAME1, listBucketResultV2.getNextContinuationToken()); } + @Test + public void listObjectsV2EmptyResultTest() throws Exception { + // Use a prefix that does not match any blobs. + String prefix = "nonexistent/"; + String listUri = + S3_PREFIX + SLASH + account.getName() + SLASH + container.getName() + "/?list-type=2&max-keys=10&prefix=" + + prefix; + RestRequest listRequest = + FrontendRestRequestServiceTest.createRestRequest(RestMethod.GET, listUri, new JSONObject(), null); + listRequest.setArg(InternalKeys.REQUEST_PATH, + RequestPath.parse(listRequest, frontendConfig.pathPrefixesToRemove, CLUSTER_NAME)); + RestResponseChannel listResponse = new MockRestResponseChannel(); + FutureResult futureResult = new FutureResult<>(); + s3ListHandler.handle(listRequest, listResponse, futureResult::done); + + ReadableStreamChannel channel = futureResult.get(); + ByteBuffer buffer = ((ByteBufferReadableStreamChannel) channel).getContent(); + ListBucketResultV2 result = xmlMapper.readValue(buffer.array(), ListBucketResultV2.class); + + // Expect zero entries and no NextContinuationToken. 
+ assertEquals("Expected zero entries", 0, result.getKeyCount()); + assertNull("Expected no next token", result.getNextContinuationToken()); + } + + @Test + public void listObjectsV2DirectoryGroupingTest() throws Exception { + // This test creates blobs in 5 directories: + // dir1/file1.txt, dir1/file2.txt, + // dir2/file1.txt, dir2/file2.txt, + // dir3/file1.txt, dir3/file2.txt, + // dir4/file1.txt, dir4/file2.txt, + // dir5/file1.txt, dir5/file2.txt + // With grouping enabled (empty prefix and delimiter="/"), S3 ListObjectsV2 would group them into: + // dir1/, dir2/, dir3/, dir4/, dir5/ + // We set max-keys=2 so that the first aggregated page contains two directory entries, + // and the NextContinuationToken is the blobName of the first unprocessed directory. + + int numDirectories = 5; + int filesPerDirectory = 2; + String baseDir = "dir"; + for (int i = 1; i <= numDirectories; i++) { + for (int j = 1; j <= filesPerDirectory; j++) { + String key = baseDir + i + SLASH + "file" + j + ".txt"; + String requestPath = NAMED_BLOB_PREFIX + SLASH + account.getName() + SLASH + container.getName() + SLASH + key; + JSONObject headers = new JSONObject(); + FrontendRestRequestServiceTest.setAmbryHeadersForPut(headers, TestUtils.TTL_SECS, container.isCacheable(), + SERVICE_ID, CONTENT_TYPE, OWNER_ID, null, null, null); + byte[] content = TestUtils.getRandomBytes(1024); + RestRequest putRequest = FrontendRestRequestServiceTest.createRestRequest(RestMethod.PUT, requestPath, headers, + new LinkedList<>(Arrays.asList(ByteBuffer.wrap(content), null))); + putRequest.setArg(InternalKeys.REQUEST_PATH, + RequestPath.parse(putRequest, frontendConfig.pathPrefixesToRemove, CLUSTER_NAME)); + RestResponseChannel putResponse = new MockRestResponseChannel(); + FutureResult putResult = new FutureResult<>(); + namedBlobPutHandler.handle(putRequest, putResponse, putResult::done); + putResult.get(); + } + } + + // Issue a GET request that maps to an S3 ListObjectsV2 request. 
+ // Using max-keys=2 and delimiter="/" (with empty prefix) will trigger directory grouping via listRecursively. + String s3ListRequestUri = + S3_PREFIX + SLASH + account.getName() + SLASH + container.getName() + "/?list-type=2&max-keys=2&delimiter=/"; + RestRequest listRequest = + FrontendRestRequestServiceTest.createRestRequest(RestMethod.GET, s3ListRequestUri, new JSONObject(), null); + listRequest.setArg(InternalKeys.REQUEST_PATH, + RequestPath.parse(listRequest, frontendConfig.pathPrefixesToRemove, CLUSTER_NAME)); + RestResponseChannel listResponse = new MockRestResponseChannel(); + FutureResult futureResult = new FutureResult<>(); + s3ListHandler.handle(listRequest, listResponse, futureResult::done); + + // Deserialize XML response. + ReadableStreamChannel channel = futureResult.get(); + ByteBuffer buffer = ((ByteBufferReadableStreamChannel) channel).getContent(); + ListBucketResultV2 resultPage1 = xmlMapper.readValue(buffer.array(), ListBucketResultV2.class); + + // Expect the first page to contain 2 directory entries: "dir1/" and "dir2/". + Set expectedDirsPage1 = new HashSet<>(Arrays.asList("dir1/", "dir2/")); + Set actualDirsPage1 = new HashSet<>(); + resultPage1.getCommonPrefixes().forEach(p -> actualDirsPage1.add(p.getPrefix())); + assertEquals("First page directory grouping mismatch", expectedDirsPage1, actualDirsPage1); + + // NextContinuationToken should be "dir3/file1.txt" (the first unprocessed blob, not the directory name). + assertNotNull("Expected NextContinuationToken", resultPage1.getNextContinuationToken()); + assertEquals("NextContinuationToken mismatch", "dir3/file1.txt", resultPage1.getNextContinuationToken()); + + // Now, simulate fetching the second page by using the NextContinuationToken. 
+ String s3ListRequestUri2 = S3_PREFIX + SLASH + account.getName() + SLASH + container.getName() + + "/?list-type=2&max-keys=2&delimiter=/&continuation-token=dir3/file1.txt"; + RestRequest listRequest2 = + FrontendRestRequestServiceTest.createRestRequest(RestMethod.GET, s3ListRequestUri2, new JSONObject(), null); + listRequest2.setArg(InternalKeys.REQUEST_PATH, + RequestPath.parse(listRequest2, frontendConfig.pathPrefixesToRemove, CLUSTER_NAME)); + listResponse = new MockRestResponseChannel(); + futureResult = new FutureResult<>(); + s3ListHandler.handle(listRequest2, listResponse, futureResult::done); + channel = futureResult.get(); + buffer = ((ByteBufferReadableStreamChannel) channel).getContent(); + ListBucketResultV2 resultPage2 = xmlMapper.readValue(buffer.array(), ListBucketResultV2.class); + + // Expect the second page to contain "dir3/" and "dir4/" and NextContinuationToken "dir5/file1.txt". + Set expectedDirsPage2 = new HashSet<>(Arrays.asList("dir3/", "dir4/")); + Set actualDirsPage2 = new HashSet<>(); + resultPage2.getCommonPrefixes().forEach(p -> actualDirsPage2.add(p.getPrefix())); + assertEquals("Second page directory grouping mismatch", expectedDirsPage2, actualDirsPage2); + assertEquals("Second page token mismatch", "dir5/file1.txt", resultPage2.getNextContinuationToken()); + + // Finally, fetch the third page using token "dir5/file1.txt". 
+ String s3ListRequestUri3 = S3_PREFIX + SLASH + account.getName() + SLASH + container.getName() + + "/?list-type=2&max-keys=2&delimiter=/&continuation-token=dir5/file1.txt"; + RestRequest listRequest3 = + FrontendRestRequestServiceTest.createRestRequest(RestMethod.GET, s3ListRequestUri3, new JSONObject(), null); + listRequest3.setArg(InternalKeys.REQUEST_PATH, + RequestPath.parse(listRequest3, frontendConfig.pathPrefixesToRemove, CLUSTER_NAME)); + listResponse = new MockRestResponseChannel(); + futureResult = new FutureResult<>(); + s3ListHandler.handle(listRequest3, listResponse, futureResult::done); + channel = futureResult.get(); + buffer = ((ByteBufferReadableStreamChannel) channel).getContent(); + ListBucketResultV2 resultPage3 = xmlMapper.readValue(buffer.array(), ListBucketResultV2.class); + + // Expect the third page to contain only "dir5/" and no NextContinuationToken. + Set expectedDirsPage3 = new HashSet<>(Collections.singletonList("dir5/")); + Set actualDirsPage3 = new HashSet<>(); + resultPage3.getCommonPrefixes().forEach(p -> actualDirsPage3.add(p.getPrefix())); + assertEquals("Third page directory grouping mismatch", expectedDirsPage3, actualDirsPage3); + assertNull("Expected no NextContinuationToken on third page", resultPage3.getNextContinuationToken()); + } + + @Test + public void listObjectsV2MixedGroupingTest() throws Exception { + // Insert blobs that yield both grouped and ungrouped entries. 
+ // Grouped keys (will be merged into directories): + // "group1/file1.txt" and "group1/file2.txt" → aggregated as "group1/" + // "group2/file1.txt" and "group2/file2.txt" → aggregated as "group2/" + // Ungrouped keys (no '/' in key, so appear as-is): + // "ungrouped1.txt" + // "ungrouped2.txt" + + String[] keysToPut = + {"group1/file1.txt", "group1/file2.txt", "group2/file1.txt", "group2/file2.txt", "ungrouped1.txt", + "ungrouped2.txt"}; + + for (String key : keysToPut) { + String requestPath = NAMED_BLOB_PREFIX + SLASH + account.getName() + SLASH + container.getName() + SLASH + key; + JSONObject headers = new JSONObject(); + FrontendRestRequestServiceTest.setAmbryHeadersForPut(headers, TestUtils.TTL_SECS, container.isCacheable(), + SERVICE_ID, CONTENT_TYPE, OWNER_ID, null, null, null); + byte[] content = TestUtils.getRandomBytes(1024); + RestRequest putRequest = FrontendRestRequestServiceTest.createRestRequest(RestMethod.PUT, requestPath, headers, + new LinkedList<>(Arrays.asList(ByteBuffer.wrap(content), null))); + putRequest.setArg(InternalKeys.REQUEST_PATH, + RequestPath.parse(putRequest, frontendConfig.pathPrefixesToRemove, CLUSTER_NAME)); + RestResponseChannel putResponse = new MockRestResponseChannel(); + FutureResult putResult = new FutureResult<>(); + namedBlobPutHandler.handle(putRequest, putResponse, putResult::done); + putResult.get(); + } + + // Issue an S3 GET request with grouping enabled. + // Use list-type=2, delimiter="/", max-keys=3, and empty prefix. 
+ String listUri = + S3_PREFIX + SLASH + account.getName() + SLASH + container.getName() + "/?list-type=2&max-keys=3&delimiter=/"; + RestRequest listRequest = + FrontendRestRequestServiceTest.createRestRequest(RestMethod.GET, listUri, new JSONObject(), null); + listRequest.setArg(InternalKeys.REQUEST_PATH, + RequestPath.parse(listRequest, frontendConfig.pathPrefixesToRemove, CLUSTER_NAME)); + RestResponseChannel listResponse = new MockRestResponseChannel(); + FutureResult futureResult = new FutureResult<>(); + s3ListHandler.handle(listRequest, listResponse, futureResult::done); + + // Deserialize XML response into ListBucketResultV2. + ReadableStreamChannel channel = futureResult.get(); + ByteBuffer buffer = ((ByteBufferReadableStreamChannel) channel).getContent(); + ListBucketResultV2 resultPage1 = xmlMapper.readValue(buffer.array(), ListBucketResultV2.class); + + // With max-keys=3 and grouping enabled, the first page should contain: + // - Aggregated directory "group1/" + // - Aggregated directory "group2/" + // - Individual key "ungrouped1.txt" + // and NextContinuationToken should be "ungrouped2.txt". 
+ Set expectedEntriesPage1 = new HashSet<>(Arrays.asList("group1/", "group2/", "ungrouped1.txt")); + Set actualEntriesPage1 = new HashSet<>(); + if (resultPage1.getContents() != null) { + resultPage1.getContents().forEach(c -> actualEntriesPage1.add(c.getKey())); + } + if (resultPage1.getCommonPrefixes() != null) { + resultPage1.getCommonPrefixes().forEach(p -> actualEntriesPage1.add(p.getPrefix())); + } + + assertEquals("Mismatch in number of directory entries", 2, resultPage1.getCommonPrefixes().size()); + assertEquals("Mismatch in number of key entries", 1, resultPage1.getContents().size()); + assertEquals("Mismatch in values of entries in page 1", expectedEntriesPage1, actualEntriesPage1); + assertNotNull("Expected NextContinuationToken on page 1", resultPage1.getNextContinuationToken()); + assertEquals("Page 1 NextContinuationToken mismatch", "ungrouped2.txt", resultPage1.getNextContinuationToken()); + + // Now, fetch the second page using the continuation token. + String continuationToken = resultPage1.getNextContinuationToken(); + String listUri2 = S3_PREFIX + SLASH + account.getName() + SLASH + container.getName() + + "/?list-type=2&max-keys=3&delimiter=/&continuation-token=" + continuationToken; + RestRequest listRequest2 = + FrontendRestRequestServiceTest.createRestRequest(RestMethod.GET, listUri2, new JSONObject(), null); + listRequest2.setArg(InternalKeys.REQUEST_PATH, + RequestPath.parse(listRequest2, frontendConfig.pathPrefixesToRemove, CLUSTER_NAME)); + listResponse = new MockRestResponseChannel(); + futureResult = new FutureResult<>(); + s3ListHandler.handle(listRequest2, listResponse, futureResult::done); + ReadableStreamChannel channel2 = futureResult.get(); + ByteBuffer buffer2 = ((ByteBufferReadableStreamChannel) channel2).getContent(); + ListBucketResultV2 resultPage2 = xmlMapper.readValue(buffer2.array(), ListBucketResultV2.class); + + // Expect the second page to contain the remaining entry: "ungrouped2.txt" + Set expectedEntriesPage2 = new 
HashSet<>(Collections.singletonList("ungrouped2.txt")); + Set actualEntriesPage2 = new HashSet<>(); + if (resultPage2.getContents() != null) { + resultPage2.getContents().forEach(c -> actualEntriesPage2.add(c.getKey())); + } + assertNull("Directory entries must not be present", resultPage2.getCommonPrefixes()); + assertEquals("Mismatch in number of key entries on page 2", 1, resultPage2.getContents().size()); + assertEquals("Aggregated page 2 entries mismatch", expectedEntriesPage2, actualEntriesPage2); + // There should be no NextContinuationToken. + assertNull("Expected no NextContinuationToken on page 2", resultPage2.getNextContinuationToken()); + } + /** * Initates a {@link NamedBlobPutHandler} and a {@link S3ListHandler} */ @@ -284,7 +504,8 @@ private void setup() throws Exception { ambryIdConverterFactory.getIdConverter(), idSigningService, router, injector, frontendConfig, metrics, CLUSTER_NAME, QuotaTestUtils.createDummyQuotaManager(), ACCOUNT_SERVICE, null); NamedBlobListHandler namedBlobListHandler = - new NamedBlobListHandler(securityServiceFactory.getSecurityService(), namedBlobDb, injector, metrics); + new NamedBlobListHandler(securityServiceFactory.getSecurityService(), namedBlobDb, injector, metrics, + frontendConfig); s3ListHandler = new S3ListHandler(namedBlobListHandler, metrics); } } diff --git a/ambry-frontend/src/test/java/com/github/ambry/frontend/TestNamedBlobDb.java b/ambry-frontend/src/test/java/com/github/ambry/frontend/TestNamedBlobDb.java index fbcc5dd7ae..1cfce1dac8 100644 --- a/ambry-frontend/src/test/java/com/github/ambry/frontend/TestNamedBlobDb.java +++ b/ambry-frontend/src/test/java/com/github/ambry/frontend/TestNamedBlobDb.java @@ -95,6 +95,10 @@ public CompletableFuture> list(String accountName, String return future; } + if (blobNamePrefix == null) { + blobNamePrefix = ""; + } + TreeMap>>> allNamedBlobsInContainer = allRecords.get(accountName).get(containerName); NavigableMap>>> nextMap; diff --git 
a/ambry-named-mysql/src/integration-test/java/com/github/ambry/named/MySqlNamedBlobDbIntegrationTest.java b/ambry-named-mysql/src/integration-test/java/com/github/ambry/named/MySqlNamedBlobDbIntegrationTest.java index ed3df2ff92..aa96e7e941 100644 --- a/ambry-named-mysql/src/integration-test/java/com/github/ambry/named/MySqlNamedBlobDbIntegrationTest.java +++ b/ambry-named-mysql/src/integration-test/java/com/github/ambry/named/MySqlNamedBlobDbIntegrationTest.java @@ -380,12 +380,13 @@ public void testListNamedBlobs() throws Exception { } catch (Exception e) { throw new RuntimeException(e); } - } - ); + }); // List named blob should only put out valid ones without empty entries. - Page page = namedBlobDb.list(account.getName(), container.getName(), blobNamePrefix, null, null).get(); - assertEquals("List named blob entries should match the valid records", validRecords, new HashSet<>(page.getEntries())); + Page page = + namedBlobDb.list(account.getName(), container.getName(), blobNamePrefix, null, null).get(); + assertEquals("List named blob entries should match the valid records", validRecords, + new HashSet<>(page.getEntries())); assertNull("Next page token should be null", page.getNextPageToken()); } diff --git a/ambry-named-mysql/src/main/java/com/github/ambry/named/MySqlNamedBlobDb.java b/ambry-named-mysql/src/main/java/com/github/ambry/named/MySqlNamedBlobDb.java index 450c605d02..990e474d2e 100644 --- a/ambry-named-mysql/src/main/java/com/github/ambry/named/MySqlNamedBlobDb.java +++ b/ambry-named-mysql/src/main/java/com/github/ambry/named/MySqlNamedBlobDb.java @@ -715,10 +715,9 @@ private Page run_list_v2(String accountName, String containerNa Timestamp deletionTime = resultSet.getTimestamp(4); long blobSize = resultSet.getLong(5); Timestamp modifiedTime = resultSet.getTimestamp(6); - entries.add( new NamedBlobRecord(accountName, containerName, blobName, blobId, timestampToMs(deletionTime), version, - blobSize, timestampToMs(modifiedTime))); + blobSize, 
timestampToMs(modifiedTime), false)); } return new Page<>(entries, nextContinuationToken); } diff --git a/ambry-tools/src/main/java/com/github/ambry/tools/perf/NamedBlobMysqlDatabasePerf.java b/ambry-tools/src/main/java/com/github/ambry/tools/perf/NamedBlobMysqlDatabasePerf.java index 274584d5e6..847e85ebba 100644 --- a/ambry-tools/src/main/java/com/github/ambry/tools/perf/NamedBlobMysqlDatabasePerf.java +++ b/ambry-tools/src/main/java/com/github/ambry/tools/perf/NamedBlobMysqlDatabasePerf.java @@ -542,8 +542,9 @@ public void run() { String containerName = String.format(CONTAINER_NAME_FORMAT, HUGE_LIST_CONTAINER_ID); String token = null; for (int i = 0; i < 100; i++) { - token = - namedBlobDb.list(accountName, containerName, HUGE_LIST_COMMON_PREFIX, token, null).get().getNextPageToken(); + token = namedBlobDb.list(accountName, containerName, HUGE_LIST_COMMON_PREFIX, token, null) + .get() + .getNextPageToken(); } System.out.println("PerformanceTestWorker " + id + " finishes listing for huge records"); }