Skip to content

Commit

Permalink
Fix resource group concurrency for multi coordinator
Browse files Browse the repository at this point in the history
When a burst of traffic comes to coordinator, it ended up running more than allowed queries.
Two reasons for that:
1. In multi coordinator, we were not stamping last running query for non leaf resource groups, which lead to
   shouldWaitForResourceManagerUpdate to return always true for non leaf resource groups. So if the traffic is coming from
   lot of different resource groups, coordinator end up running less than allowed in each resource group but at root level it
   ends up running more.
2. ResourceManagerResourceGroupService cache end up having stale resource group info which also end up allowing coordinators
   to run more than allowed queries at a cluster level.
As part of this diff we are fixing 1 by stamping last running query to all it's parent resource groups. And to address 2, making
cache refresh rate and expiration configerable.
  • Loading branch information
swapsmagic authored and tdcmeehan committed Nov 10, 2021
1 parent 3fdbd18 commit 48fadf1
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public class InternalResourceGroup
private final CounterStat timeBetweenStartsSec = new CounterStat();

@GuardedBy("root")
private AtomicLong lastRunningQueryStartTime = new AtomicLong();
private AtomicLong lastRunningQueryStartTime = new AtomicLong(currentTimeMillis());
@GuardedBy("root")
private AtomicBoolean isDirty = new AtomicBoolean();

Expand Down Expand Up @@ -758,7 +758,13 @@ private void startInBackground(ManagedQueryExecution query)
}
updateEligibility();
executor.execute(query::startWaitingForResources);
lastRunningQueryStartTime.set(currentTimeMillis());
group = this;
long lastRunningQueryStartTimeMillis = currentTimeMillis();
lastRunningQueryStartTime.set(lastRunningQueryStartTimeMillis);
while (group.parent.isPresent()) {
group.parent.get().lastRunningQueryStartTime.set(lastRunningQueryStartTimeMillis);
group = group.parent.get();
}
}
}

Expand Down Expand Up @@ -863,7 +869,6 @@ protected boolean internalStartNext()
if (subGroup == null) {
return false;
}

boolean started = subGroup.internalStartNext();
if (started) {
long currentTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public final class InternalResourceGroupManager<C>
private final ResourceGroupService resourceGroupService;
private final AtomicReference<Map<ResourceGroupId, ResourceGroupRuntimeInfo>> resourceGroupRuntimeInfos = new AtomicReference<>(ImmutableMap.of());
private final AtomicReference<Map<ResourceGroupId, ResourceGroupRuntimeInfo>> resourceGroupRuntimeInfosSnapshot = new AtomicReference<>(ImmutableMap.of());
private final AtomicLong lastUpdatedResourceGroupRuntimeInfo = new AtomicLong(0L);
private final AtomicLong lastUpdatedResourceGroupRuntimeInfo = new AtomicLong(-1L);
private final double concurrencyThreshold;
private final Duration resourceGroupRuntimeInfoRefreshInterval;
private final boolean isResourceManagerEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class ResourceManagerConfig
private int resourceManagerExecutorThreads = 1000;
private Duration proxyAsyncTimeout = new Duration(60, SECONDS);
private Duration memoryPoolFetchInterval = new Duration(1, SECONDS);
private Duration resourceGroupServiceCacheExpireInterval = new Duration(10, SECONDS);
private Duration resourceGroupServiceCacheRefreshInterval = new Duration(1, SECONDS);

@MinDuration("1ms")
public Duration getQueryExpirationTimeout()
Expand Down Expand Up @@ -195,4 +197,28 @@ public ResourceManagerConfig setMemoryPoolFetchInterval(Duration memoryPoolFetch
this.memoryPoolFetchInterval = memoryPoolFetchInterval;
return this;
}

public Duration getResourceGroupServiceCacheExpireInterval()
{
return resourceGroupServiceCacheExpireInterval;
}

@Config("resource-manager.resource-group-service-cache-expire-interval")
public ResourceManagerConfig setResourceGroupServiceCacheExpireInterval(Duration resourceGroupServiceCacheExpireInterval)
{
this.resourceGroupServiceCacheExpireInterval = resourceGroupServiceCacheExpireInterval;
return this;
}

public Duration getResourceGroupServiceCacheRefreshInterval()
{
return resourceGroupServiceCacheRefreshInterval;
}

@Config("resource-manager.resource-group-service-cache-refresh-interval")
public ResourceManagerConfig setResourceGroupServiceCacheRefreshInterval(Duration resourceGroupServiceCacheRefreshInterval)
{
this.resourceGroupServiceCacheRefreshInterval = resourceGroupServiceCacheRefreshInterval;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import io.airlift.units.Duration;

import javax.inject.Inject;

Expand All @@ -31,7 +32,7 @@
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.cache.CacheLoader.asyncReloading;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class ResourceManagerResourceGroupService
implements ResourceGroupService
Expand All @@ -44,13 +45,16 @@ public class ResourceManagerResourceGroupService
@Inject
public ResourceManagerResourceGroupService(
@ForResourceManager DriftClient<ResourceManagerClient> resourceManagerClient,
ResourceManagerConfig resourceManagerConfig,
InternalNodeManager internalNodeManager)
{
this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerService is null");
this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
Duration cacheExpireDuration = requireNonNull(resourceManagerConfig, "resourceManagerConfig is null").getResourceGroupServiceCacheExpireInterval();
Duration cacheRefreshDuration = resourceManagerConfig.getResourceGroupServiceCacheRefreshInterval();
this.cache = CacheBuilder.newBuilder()
.expireAfterWrite(10, SECONDS)
.refreshAfterWrite(1, SECONDS)
.expireAfterWrite(cacheExpireDuration.roundTo(MILLISECONDS), MILLISECONDS)
.refreshAfterWrite(cacheRefreshDuration.roundTo(MILLISECONDS), MILLISECONDS)
.build(asyncReloading(new CacheLoader<InternalNode, List<ResourceGroupRuntimeInfo>>() {
@Override
public List<ResourceGroupRuntimeInfo> load(InternalNode internalNode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public void testDefaults()
.setNodeHeartbeatInterval(new Duration(1, SECONDS))
.setQueryHeartbeatInterval(new Duration(1, SECONDS))
.setProxyAsyncTimeout(new Duration(60, SECONDS))
.setMemoryPoolFetchInterval(new Duration(1, SECONDS)));
.setMemoryPoolFetchInterval(new Duration(1, SECONDS))
.setResourceGroupServiceCacheExpireInterval(new Duration(10, SECONDS))
.setResourceGroupServiceCacheRefreshInterval(new Duration(1, SECONDS)));
}

@Test
Expand All @@ -62,6 +64,8 @@ public void testExplicitPropertyMappings()
.put("resource-manager.query-heartbeat-interval", "75m")
.put("resource-manager.proxy-async-timeout", "345m")
.put("resource-manager.memory-pool-fetch-interval", "6m")
.put("resource-manager.resource-group-service-cache-expire-interval", "1m")
.put("resource-manager.resource-group-service-cache-refresh-interval", "10m")
.build();

ResourceManagerConfig expected = new ResourceManagerConfig()
Expand All @@ -76,7 +80,9 @@ public void testExplicitPropertyMappings()
.setNodeHeartbeatInterval(new Duration(25, MINUTES))
.setQueryHeartbeatInterval(new Duration(75, MINUTES))
.setProxyAsyncTimeout(new Duration(345, MINUTES))
.setMemoryPoolFetchInterval(new Duration(6, MINUTES));
.setMemoryPoolFetchInterval(new Duration(6, MINUTES))
.setResourceGroupServiceCacheExpireInterval(new Duration(1, MINUTES))
.setResourceGroupServiceCacheRefreshInterval(new Duration(10, MINUTES));

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public void testGetResourceGroupInfo()
{
TestingResourceManagerClient resourceManagerClient = new TestingResourceManagerClient();
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
ResourceManagerResourceGroupService service = new ResourceManagerResourceGroupService((addressSelectionContext, headers) -> resourceManagerClient, nodeManager);
ResourceManagerConfig resourceManagerConfig = new ResourceManagerConfig();
ResourceManagerResourceGroupService service = new ResourceManagerResourceGroupService((addressSelectionContext, headers) -> resourceManagerClient, resourceManagerConfig, nodeManager);
List<ResourceGroupRuntimeInfo> resourceGroupInfos = service.getResourceGroupInfo();
assertNotNull(resourceGroupInfos);
assertTrue(resourceGroupInfos.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.resourceGroups.reloading.ReloadingResourceGroupConfigurationManager;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.security.Identity;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -62,6 +63,16 @@ public static Session adhocSession()
.build();
}

public static Session testSession(Identity identity)
{
return testSessionBuilder()
.setCatalog("tpch")
.setSchema("sf100000")
.setSource("abc")
.setIdentity(identity)
.build();
}

public static Session dashboardSession()
{
return testSessionBuilder()
Expand Down Expand Up @@ -185,15 +196,18 @@ private static void setup(DistributedQueryRunner queryRunner, H2ResourceGroupsDa
dao.insertResourceGroup(5, "dashboard-${USER}", "1MB", 1, 1, 1, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
dao.insertResourceGroup(6, "no-queueing", "1MB", 0, 1, 1, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT_2);
dao.insertResourceGroup(7, "explain", "1MB", 0, 1, 1, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT);
dao.insertResourceGroup(8, "test", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
dao.insertResourceGroup(9, "test-${USER}", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, 8L, TEST_ENVIRONMENT);
dao.insertSelector(2, 10_000, "user.*", "test", null, null, null);
dao.insertSelector(4, 1_000, "user.*", "(?i).*adhoc.*", null, null, null);
dao.insertSelector(5, 100, "user.*", "(?i).*dashboard.*", null, null, null);
dao.insertSelector(4, 10, "user.*", null, null, CLIENT_TAGS_CODEC.toJson(ImmutableList.of("tag1", "tag2")), null);
dao.insertSelector(2, 1, "user.*", null, null, CLIENT_TAGS_CODEC.toJson(ImmutableList.of("tag1")), null);
dao.insertSelector(6, 6, ".*", ".*", null, null, null);
dao.insertSelector(7, 100_000, null, null, EXPLAIN.name(), null, null);
dao.insertSelector(9, 10_000, "user.*", "abc", null, null, null);

int expectedSelectors = 6;
int expectedSelectors = 7;
if (environment.equals(TEST_ENVIRONMENT_2)) {
expectedSelectors = 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import com.facebook.presto.resourceGroups.db.H2ResourceGroupsDao;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.security.Identity;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.Map;
import java.util.Optional;

import static com.facebook.airlift.testing.Closeables.closeQuietly;
import static com.facebook.presto.execution.QueryState.QUEUED;
Expand All @@ -36,6 +38,7 @@
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.dashboardSession;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.getDao;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.getDbConfigUrl;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.testSession;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

// run single threaded to avoid creating multiple query runners at once
Expand All @@ -51,12 +54,15 @@ public void setup()
{
String dbConfigUrl = getDbConfigUrl();
H2ResourceGroupsDao dao = getDao(dbConfigUrl);
queryRunner = createQueryRunner(dbConfigUrl, dao, ImmutableMap.of(
"query-manager.experimental.required-coordinators", "2",
"resource-manager.query-heartbeat-interval", "10ms",
"resource-group-runtimeinfo-refresh-interval", "100ms",
"concurrency-threshold-to-enable-resource-group-refresh", "0.1"),
2);
ImmutableMap.Builder<String, String> coordinatorProperties = new ImmutableMap.Builder<>();
coordinatorProperties.put("query-manager.experimental.required-coordinators", "2");
coordinatorProperties.put("resource-manager.query-heartbeat-interval", "10ms");
coordinatorProperties.put("resource-group-runtimeinfo-refresh-interval", "100ms");
coordinatorProperties.put("concurrency-threshold-to-enable-resource-group-refresh", "0");
coordinatorProperties.put("resource-manager.resource-group-service-cache-expire-interval", "1s");
coordinatorProperties.put("resource-manager.resource-group-service-cache-refresh-interval", "10ms");

queryRunner = createQueryRunner(dbConfigUrl, dao, coordinatorProperties.build(), 2);
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -165,4 +171,34 @@ public void testDistributedQueue()
cancelQuery(queryRunner, 0, thirdAdhocQuery);
waitForQueryState(queryRunner, 0, firstDashboardQuery, RUNNING);
}

@Test(timeOut = 1_000)
public void testDistributedQueue_burstTraffic()
throws Exception
{
QueryId firstAdhocQuery = createQuery(queryRunner, 1, testSession(new Identity("user1", Optional.empty())), LONG_LASTING_QUERY);

QueryId secondAdhocQuery = createQuery(queryRunner, 0, testSession(new Identity("user2", Optional.empty())), LONG_LASTING_QUERY);

QueryId thirdAdhocQuery = createQuery(queryRunner, 1, testSession(new Identity("user3", Optional.empty())), LONG_LASTING_QUERY);

QueryId fourthAdhocQuery = createQuery(queryRunner, 0, testSession(new Identity("user4", Optional.empty())), LONG_LASTING_QUERY);

Map<ResourceGroupId, ResourceGroupRuntimeInfo> resourceGroupRuntimeInfoSnapshot;
int globalRunningQueries = 0;
int globalQueriedQueries = 0;
do {
MILLISECONDS.sleep(100);
globalRunningQueries = 0;
globalQueriedQueries = 0;
for (int coordinator = 0; coordinator < 2; coordinator++) {
resourceGroupRuntimeInfoSnapshot = queryRunner.getCoordinator(coordinator).getResourceGroupManager().get().getResourceGroupRuntimeInfosSnapshot();
ResourceGroupRuntimeInfo resourceGroupRuntimeInfo = resourceGroupRuntimeInfoSnapshot.get(new ResourceGroupId("global"));
if (resourceGroupRuntimeInfo != null) {
globalRunningQueries += resourceGroupRuntimeInfo.getDescendantRunningQueries();
globalQueriedQueries += resourceGroupRuntimeInfo.getDescendantQueuedQueries();
}
}
} while (globalRunningQueries != 3 && globalQueriedQueries != 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,10 @@ public void testSelectorPriority()
assertEquals(resourceGroup.get().toString(), "global.user-user.dashboard-user");

// create a new resource group that rejects all queries submitted to it
dao.insertResourceGroup(8, "reject-all-queries", "1MB", 0, 0, 0, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
dao.insertResourceGroup(10, "reject-all-queries", "1MB", 0, 0, 0, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);

// add a new selector that has a higher priority than the existing dashboard selector and that routes queries to the "reject-all-queries" resource group
dao.insertSelector(8, 200, "user.*", "(?i).*dashboard.*", null, null, null);
dao.insertSelector(10, 200, "user.*", "(?i).*dashboard.*", null, null, null);

// reload the configuration
reloadingConfigurationManager.load();
Expand Down

0 comments on commit 48fadf1

Please sign in to comment.