diff --git a/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java b/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java index 6a15366ad970..301dde5c54bc 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java @@ -152,7 +152,7 @@ public class InternalResourceGroup private final CounterStat timeBetweenStartsSec = new CounterStat(); @GuardedBy("root") - private AtomicLong lastRunningQueryStartTime = new AtomicLong(); + private AtomicLong lastRunningQueryStartTime = new AtomicLong(currentTimeMillis()); @GuardedBy("root") private AtomicBoolean isDirty = new AtomicBoolean(); @@ -758,7 +758,13 @@ private void startInBackground(ManagedQueryExecution query) } updateEligibility(); executor.execute(query::startWaitingForResources); - lastRunningQueryStartTime.set(currentTimeMillis()); + group = this; + long lastRunningQueryStartTimeMillis = currentTimeMillis(); + lastRunningQueryStartTime.set(lastRunningQueryStartTimeMillis); + while (group.parent.isPresent()) { + group.parent.get().lastRunningQueryStartTime.set(lastRunningQueryStartTimeMillis); + group = group.parent.get(); + } } } @@ -863,7 +869,6 @@ protected boolean internalStartNext() if (subGroup == null) { return false; } - boolean started = subGroup.internalStartNext(); if (started) { long currentTime = System.currentTimeMillis(); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java b/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java index 7e666160dd4d..43d2891f692e 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java +++ 
b/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java @@ -100,7 +100,7 @@ public final class InternalResourceGroupManager private final ResourceGroupService resourceGroupService; private final AtomicReference> resourceGroupRuntimeInfos = new AtomicReference<>(ImmutableMap.of()); private final AtomicReference> resourceGroupRuntimeInfosSnapshot = new AtomicReference<>(ImmutableMap.of()); - private final AtomicLong lastUpdatedResourceGroupRuntimeInfo = new AtomicLong(0L); + private final AtomicLong lastUpdatedResourceGroupRuntimeInfo = new AtomicLong(-1L); private final double concurrencyThreshold; private final Duration resourceGroupRuntimeInfoRefreshInterval; private final boolean isResourceManagerEnabled; diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java index ebb638f3e99b..d4d013b3cb25 100644 --- a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java @@ -37,6 +37,8 @@ public class ResourceManagerConfig private int resourceManagerExecutorThreads = 1000; private Duration proxyAsyncTimeout = new Duration(60, SECONDS); private Duration memoryPoolFetchInterval = new Duration(1, SECONDS); + private Duration resourceGroupServiceCacheExpireInterval = new Duration(10, SECONDS); + private Duration resourceGroupServiceCacheRefreshInterval = new Duration(1, SECONDS); @MinDuration("1ms") public Duration getQueryExpirationTimeout() @@ -195,4 +197,30 @@ public ResourceManagerConfig setMemoryPoolFetchInterval(Duration memoryPoolFetch this.memoryPoolFetchInterval = memoryPoolFetchInterval; return this; } + + @MinDuration("1ms") + public Duration getResourceGroupServiceCacheExpireInterval() + { + return resourceGroupServiceCacheExpireInterval; + } + + 
@Config("resource-manager.resource-group-service-cache-expire-interval") + public ResourceManagerConfig setResourceGroupServiceCacheExpireInterval(Duration resourceGroupServiceCacheExpireInterval) + { + this.resourceGroupServiceCacheExpireInterval = resourceGroupServiceCacheExpireInterval; + return this; + } + + @MinDuration("1ms") + public Duration getResourceGroupServiceCacheRefreshInterval() + { + return resourceGroupServiceCacheRefreshInterval; + } + + @Config("resource-manager.resource-group-service-cache-refresh-interval") + public ResourceManagerConfig setResourceGroupServiceCacheRefreshInterval(Duration resourceGroupServiceCacheRefreshInterval) + { + this.resourceGroupServiceCacheRefreshInterval = resourceGroupServiceCacheRefreshInterval; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerResourceGroupService.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerResourceGroupService.java index 264f2be285bc..af059bd3bc9a 100644 --- a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerResourceGroupService.java +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerResourceGroupService.java @@ -20,6 +20,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; +import io.airlift.units.Duration; import javax.inject.Inject; @@ -31,7 +32,7 @@ import static com.google.common.base.Throwables.throwIfInstanceOf; import static com.google.common.cache.CacheLoader.asyncReloading; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; public class ResourceManagerResourceGroupService implements ResourceGroupService @@ -44,13 +45,16 @@ public class ResourceManagerResourceGroupService @Inject public ResourceManagerResourceGroupService( @ForResourceManager DriftClient 
resourceManagerClient, + ResourceManagerConfig resourceManagerConfig, InternalNodeManager internalNodeManager) { this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerService is null"); this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null"); + Duration cacheExpireDuration = requireNonNull(resourceManagerConfig, "resourceManagerConfig is null").getResourceGroupServiceCacheExpireInterval(); + Duration cacheRefreshDuration = resourceManagerConfig.getResourceGroupServiceCacheRefreshInterval(); this.cache = CacheBuilder.newBuilder() - .expireAfterWrite(10, SECONDS) - .refreshAfterWrite(1, SECONDS) + .expireAfterWrite(cacheExpireDuration.roundTo(MILLISECONDS), MILLISECONDS) + .refreshAfterWrite(cacheRefreshDuration.roundTo(MILLISECONDS), MILLISECONDS) .build(asyncReloading(new CacheLoader>() { @Override public List load(InternalNode internalNode) diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java index d7527669aa02..bb1ef2a84a3e 100644 --- a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java @@ -43,7 +43,9 @@ public void testDefaults() .setNodeHeartbeatInterval(new Duration(1, SECONDS)) .setQueryHeartbeatInterval(new Duration(1, SECONDS)) .setProxyAsyncTimeout(new Duration(60, SECONDS)) - .setMemoryPoolFetchInterval(new Duration(1, SECONDS))); + .setMemoryPoolFetchInterval(new Duration(1, SECONDS)) + .setResourceGroupServiceCacheExpireInterval(new Duration(10, SECONDS)) + .setResourceGroupServiceCacheRefreshInterval(new Duration(1, SECONDS))); } @Test @@ -62,6 +64,8 @@ public void testExplicitPropertyMappings() .put("resource-manager.query-heartbeat-interval", "75m") .put("resource-manager.proxy-async-timeout", "345m") 
.put("resource-manager.memory-pool-fetch-interval", "6m") + .put("resource-manager.resource-group-service-cache-expire-interval", "1m") + .put("resource-manager.resource-group-service-cache-refresh-interval", "10m") .build(); ResourceManagerConfig expected = new ResourceManagerConfig() @@ -76,7 +80,9 @@ public void testExplicitPropertyMappings() .setNodeHeartbeatInterval(new Duration(25, MINUTES)) .setQueryHeartbeatInterval(new Duration(75, MINUTES)) .setProxyAsyncTimeout(new Duration(345, MINUTES)) - .setMemoryPoolFetchInterval(new Duration(6, MINUTES)); + .setMemoryPoolFetchInterval(new Duration(6, MINUTES)) + .setResourceGroupServiceCacheExpireInterval(new Duration(1, MINUTES)) + .setResourceGroupServiceCacheRefreshInterval(new Duration(10, MINUTES)); assertFullMapping(properties, expected); } diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerResourceGroupService.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerResourceGroupService.java index 610fb3123f06..5ff05f42bc80 100644 --- a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerResourceGroupService.java +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerResourceGroupService.java @@ -34,7 +34,8 @@ public void testGetResourceGroupInfo() { TestingResourceManagerClient resourceManagerClient = new TestingResourceManagerClient(); InMemoryNodeManager nodeManager = new InMemoryNodeManager(); - ResourceManagerResourceGroupService service = new ResourceManagerResourceGroupService((addressSelectionContext, headers) -> resourceManagerClient, nodeManager); + ResourceManagerConfig resourceManagerConfig = new ResourceManagerConfig(); + ResourceManagerResourceGroupService service = new ResourceManagerResourceGroupService((addressSelectionContext, headers) -> resourceManagerClient, resourceManagerConfig, nodeManager); List resourceGroupInfos = service.getResourceGroupInfo(); 
assertNotNull(resourceGroupInfos); assertTrue(resourceGroupInfos.isEmpty()); diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/H2TestUtil.java b/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/H2TestUtil.java index 436cc0f4b990..8334d16b2a4e 100644 --- a/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/H2TestUtil.java +++ b/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/H2TestUtil.java @@ -24,6 +24,7 @@ import com.facebook.presto.resourceGroups.reloading.ReloadingResourceGroupConfigurationManager; import com.facebook.presto.spi.Plugin; import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.security.Identity; import com.facebook.presto.tests.DistributedQueryRunner; import com.facebook.presto.tpch.TpchPlugin; import com.google.common.collect.ImmutableList; @@ -62,6 +63,16 @@ public static Session adhocSession() .build(); } + public static Session testSession(Identity identity) + { + return testSessionBuilder() + .setCatalog("tpch") + .setSchema("sf100000") + .setSource("abc") + .setIdentity(identity) + .build(); + } + public static Session dashboardSession() { return testSessionBuilder() @@ -185,6 +196,8 @@ private static void setup(DistributedQueryRunner queryRunner, H2ResourceGroupsDa dao.insertResourceGroup(5, "dashboard-${USER}", "1MB", 1, 1, 1, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT); dao.insertResourceGroup(6, "no-queueing", "1MB", 0, 1, 1, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT_2); dao.insertResourceGroup(7, "explain", "1MB", 0, 1, 1, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT); + dao.insertResourceGroup(8, "test", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT); + dao.insertResourceGroup(9, "test-${USER}", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, 8L, TEST_ENVIRONMENT); 
dao.insertSelector(2, 10_000, "user.*", "test", null, null, null); dao.insertSelector(4, 1_000, "user.*", "(?i).*adhoc.*", null, null, null); dao.insertSelector(5, 100, "user.*", "(?i).*dashboard.*", null, null, null); @@ -192,8 +205,9 @@ private static void setup(DistributedQueryRunner queryRunner, H2ResourceGroupsDa dao.insertSelector(2, 1, "user.*", null, null, CLIENT_TAGS_CODEC.toJson(ImmutableList.of("tag1")), null); dao.insertSelector(6, 6, ".*", ".*", null, null, null); dao.insertSelector(7, 100_000, null, null, EXPLAIN.name(), null, null); + dao.insertSelector(9, 10_000, "user.*", "abc", null, null, null); - int expectedSelectors = 6; + int expectedSelectors = 7; if (environment.equals(TEST_ENVIRONMENT_2)) { expectedSelectors = 1; } diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/TestDistributedQueuesDb.java b/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/TestDistributedQueuesDb.java index 0310562e3adf..4a03bd5556c4 100644 --- a/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/TestDistributedQueuesDb.java +++ b/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/TestDistributedQueuesDb.java @@ -17,6 +17,7 @@ import com.facebook.presto.resourceGroups.db.H2ResourceGroupsDao; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.resourceGroups.ResourceGroupId; +import com.facebook.presto.spi.security.Identity; import com.facebook.presto.tests.DistributedQueryRunner; import com.google.common.collect.ImmutableMap; import org.testng.annotations.AfterMethod; @@ -24,6 +25,7 @@ import org.testng.annotations.Test; import java.util.Map; +import java.util.Optional; import static com.facebook.airlift.testing.Closeables.closeQuietly; import static com.facebook.presto.execution.QueryState.QUEUED; @@ -36,6 +38,7 @@ import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.dashboardSession; import static 
com.facebook.presto.execution.resourceGroups.db.H2TestUtil.getDao; import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.getDbConfigUrl; +import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.testSession; import static java.util.concurrent.TimeUnit.MILLISECONDS; // run single threaded to avoid creating multiple query runners at once @@ -51,12 +54,15 @@ public void setup() { String dbConfigUrl = getDbConfigUrl(); H2ResourceGroupsDao dao = getDao(dbConfigUrl); - queryRunner = createQueryRunner(dbConfigUrl, dao, ImmutableMap.of( - "query-manager.experimental.required-coordinators", "2", - "resource-manager.query-heartbeat-interval", "10ms", - "resource-group-runtimeinfo-refresh-interval", "100ms", - "concurrency-threshold-to-enable-resource-group-refresh", "0.1"), - 2); + ImmutableMap.Builder coordinatorProperties = new ImmutableMap.Builder<>(); + coordinatorProperties.put("query-manager.experimental.required-coordinators", "2"); + coordinatorProperties.put("resource-manager.query-heartbeat-interval", "10ms"); + coordinatorProperties.put("resource-group-runtimeinfo-refresh-interval", "100ms"); + coordinatorProperties.put("concurrency-threshold-to-enable-resource-group-refresh", "0"); + coordinatorProperties.put("resource-manager.resource-group-service-cache-expire-interval", "1s"); + coordinatorProperties.put("resource-manager.resource-group-service-cache-refresh-interval", "10ms"); + + queryRunner = createQueryRunner(dbConfigUrl, dao, coordinatorProperties.build(), 2); } @AfterMethod(alwaysRun = true) @@ -165,4 +171,35 @@ public void testDistributedQueue() cancelQuery(queryRunner, 0, thirdAdhocQuery); waitForQueryState(queryRunner, 0, firstDashboardQuery, RUNNING); } + + @Test(timeOut = 1_000) + public void testDistributedQueue_burstTraffic() + throws Exception + { + QueryId firstAdhocQuery = createQuery(queryRunner, 1, testSession(new Identity("user1", Optional.empty())), LONG_LASTING_QUERY); + + QueryId secondAdhocQuery = 
createQuery(queryRunner, 0, testSession(new Identity("user2", Optional.empty())), LONG_LASTING_QUERY); + + QueryId thirdAdhocQuery = createQuery(queryRunner, 1, testSession(new Identity("user3", Optional.empty())), LONG_LASTING_QUERY); + + QueryId fourthAdhocQuery = createQuery(queryRunner, 0, testSession(new Identity("user4", Optional.empty())), LONG_LASTING_QUERY); + + Map resourceGroupRuntimeInfoSnapshot; + int globalRunningQueries = 0; + int globalQueuedQueries = 0; + // Poll until the "global" group totals, summed over both coordinators, reach 3 running AND 1 queued; the @Test timeOut bounds the wait + do { + MILLISECONDS.sleep(100); + globalRunningQueries = 0; + globalQueuedQueries = 0; + for (int coordinator = 0; coordinator < 2; coordinator++) { + resourceGroupRuntimeInfoSnapshot = queryRunner.getCoordinator(coordinator).getResourceGroupManager().get().getResourceGroupRuntimeInfosSnapshot(); + ResourceGroupRuntimeInfo resourceGroupRuntimeInfo = resourceGroupRuntimeInfoSnapshot.get(new ResourceGroupId("global")); + if (resourceGroupRuntimeInfo != null) { + globalRunningQueries += resourceGroupRuntimeInfo.getDescendantRunningQueries(); + globalQueuedQueries += resourceGroupRuntimeInfo.getDescendantQueuedQueries(); + } + } + } while (globalRunningQueries != 3 || globalQueuedQueries != 1); + } } diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/TestQueuesDb.java b/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/TestQueuesDb.java index 8b3d4026c66e..79e757e7a49c 100644 --- a/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/TestQueuesDb.java +++ b/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/TestQueuesDb.java @@ -245,10 +245,10 @@ public void testSelectorPriority() assertEquals(resourceGroup.get().toString(), "global.user-user.dashboard-user"); // create a new resource group that rejects all queries submitted to it - dao.insertResourceGroup(8, "reject-all-queries", "1MB", 0, 0, 0, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT); + 
dao.insertResourceGroup(10, "reject-all-queries", "1MB", 0, 0, 0, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT); // add a new selector that has a higher priority than the existing dashboard selector and that routes queries to the "reject-all-queries" resource group - dao.insertSelector(8, 200, "user.*", "(?i).*dashboard.*", null, null, null); + dao.insertSelector(10, 200, "user.*", "(?i).*dashboard.*", null, null, null); // reload the configuration reloadingConfigurationManager.load();