Skip to content

Commit

Permalink
Revert "perf: use BatchedSubscriber for authZ changes (#1196)"
Browse files Browse the repository at this point in the history
This reverts commit 0ce078e.
  • Loading branch information
junfuchen99 committed May 13, 2022
1 parent 00eaed4 commit 2ee9ae2
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -390,70 +390,69 @@ public void onStreamClosed() {

@SuppressWarnings({"PMD.CloseResource", "PMD.AvoidCatchingGenericException"})
@Test
void GIVEN_ConfigStoreEventStreamClient_WHEN_adding_new_leaf_node_to_existing_container_node_THEN_config_is_updated3()
throws Exception {
void GIVEN_ConfigStoreEventStreamClient_WHEN_adding_new_leaf_node_to_existing_container_node_THEN_config_is_updated3() throws Exception {
LogConfig.getRootLogConfig().setLevel(Level.DEBUG);
Topics configuration = kernel.findServiceTopic("ServiceName").createInteriorChild(CONFIGURATION_CONFIG_KEY);
configuration.createInteriorChild("SomeContainerKeyToUpdate").createLeafChild("SomeContainerValue")
.withValue("InitialValue");
configuration.createInteriorChild("SomeContainerKeyToUpdate").createLeafChild("SomeContainerValue").withValue("InitialValue");
Topics configToUpdate = configuration.lookupTopics("SomeContainerKeyToUpdate");
CountDownLatch cdl = new CountDownLatch(1);
CountDownLatch subscriptionLatch = new CountDownLatch(1);
try (AutoCloseable c = TestUtils.createCloseableLogListener(m -> {
Slf4jLogAdapter.addGlobalListener(m -> {
if (m.getMessage().contains("subscribed to configuration update")) {
subscriptionLatch.countDown();
}
})) {
SubscribeToConfigurationUpdateRequest subscribe = new SubscribeToConfigurationUpdateRequest();
subscribe.setComponentName("ServiceName");
subscribe.setKeyPath(Collections.singletonList("SomeContainerKeyToUpdate"));
CompletableFuture<SubscribeToConfigurationUpdateResponse> fut =
greengrassCoreIPCClient.subscribeToConfigurationUpdate(subscribe,
Optional.of(new StreamResponseHandler<ConfigurationUpdateEvents>() {
@Override
public void onStreamEvent(ConfigurationUpdateEvents event) {
assertNotNull(event.getConfigurationUpdateEvent());
assertEquals("ServiceName",
event.getConfigurationUpdateEvent().getComponentName());
assertNotNull(event.getConfigurationUpdateEvent().getKeyPath());
cdl.countDown();
}

@Override
public boolean onStreamError(Throwable error) {
logger.atError().log("Received stream error.", error);
return false;
}

@Override
public void onStreamClosed() {

}
})).getResponse();
fut.get(3, TimeUnit.SECONDS);
});
SubscribeToConfigurationUpdateRequest subscribe = new SubscribeToConfigurationUpdateRequest();
subscribe.setComponentName("ServiceName");
subscribe.setKeyPath(Collections.singletonList("SomeContainerKeyToUpdate"));
CompletableFuture<SubscribeToConfigurationUpdateResponse> fut =
greengrassCoreIPCClient.subscribeToConfigurationUpdate(subscribe, Optional.of(new StreamResponseHandler<ConfigurationUpdateEvents>() {
@Override
public void onStreamEvent(ConfigurationUpdateEvents event) {
assertNotNull(event.getConfigurationUpdateEvent());
assertEquals("ServiceName", event.getConfigurationUpdateEvent().getComponentName());
assertNotNull(event.getConfigurationUpdateEvent().getKeyPath());
cdl.countDown();
}

assertTrue(subscriptionLatch.await(20, TimeUnit.SECONDS));
@Override
public boolean onStreamError(Throwable error) {
logger.atError().log("Received stream error.", error);
return false;
}

@Override
public void onStreamClosed() {

// count down 1 is during the call to subscribe
CountDownLatch configUpdated = new CountDownLatch(2);
configToUpdate.subscribe((what, node) -> configUpdated.countDown());
kernel.getContext().waitForPublishQueueToClear();

Map<String, Object> map2 = new HashMap<>();
map2.put("SomeNewChild", "NewValue");
List<String> l = new ArrayList<>();
l.add("SomeContainerKeyToUpdate");
Instant now = Instant.now();
UpdateConfigurationRequest updateConfigurationRequest = new UpdateConfigurationRequest();
updateConfigurationRequest.setKeyPath(l);
updateConfigurationRequest.setValueToMerge(map2);
updateConfigurationRequest.setTimestamp(now);
greengrassCoreIPCClient.updateConfiguration(updateConfigurationRequest, Optional.empty()).getResponse().get(50, TimeUnit.SECONDS);
assertTrue(configUpdated.await(TIMEOUT_FOR_CONFIG_STORE_SECONDS, TimeUnit.SECONDS));
assertTrue(cdl.await(TIMEOUT_FOR_CONFIG_STORE_SECONDS, TimeUnit.SECONDS));
Topic topic = (Topic) configToUpdate.getChild("SomeNewChild");
assertEquals("NewValue", topic.getOnce());
}
})).getResponse();
try {
fut.get(3, TimeUnit.SECONDS);
} catch (Exception e) {
logger.atError().setCause(e).log("Error when subscribing to component updates");
fail("Caught exception when subscribing to component updates");
}

assertTrue(subscriptionLatch.await(20, TimeUnit.SECONDS));

CountDownLatch configUpdated = new CountDownLatch(1);
configToUpdate.subscribe((what, node) -> configUpdated.countDown());

Map<String, Object> map2 = new HashMap<>();
map2.put("SomeNewChild", "NewValue");
List<String> l = new ArrayList<>();
l.add("SomeContainerKeyToUpdate");
Instant now = Instant.now();
UpdateConfigurationRequest updateConfigurationRequest = new UpdateConfigurationRequest();
updateConfigurationRequest.setKeyPath(l);
updateConfigurationRequest.setValueToMerge(map2);
updateConfigurationRequest.setTimestamp(now);
greengrassCoreIPCClient.updateConfiguration(updateConfigurationRequest, Optional.empty()).getResponse().get(50, TimeUnit.SECONDS);
assertTrue(configUpdated.await(TIMEOUT_FOR_CONFIG_STORE_SECONDS, TimeUnit.SECONDS));
assertTrue(cdl.await(TIMEOUT_FOR_CONFIG_STORE_SECONDS, TimeUnit.SECONDS));
Topic topic = (Topic) configToUpdate.getChild("SomeNewChild");
assertEquals("NewValue", topic.getOnce());

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.aws.greengrass.lifecyclemanager.Kernel;
import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.LogManager;
import com.aws.greengrass.util.BatchedSubscriber;
import com.aws.greengrass.util.LockScope;
import com.aws.greengrass.util.Utils;
import lombok.NonNull;
Expand Down Expand Up @@ -87,8 +86,7 @@ public AuthorizationHandler(Kernel kernel, AuthorizationModule authModule,
this.kernel = kernel;
this.authModule = authModule;
// Adding TES component and operation before it's default policies are fetched
componentToOperationsMap.put(TOKEN_EXCHANGE_SERVICE_TOPICS, new HashSet<>(
Collections.singletonList(AUTHZ_TES_OPERATION)));
componentToOperationsMap.put(TOKEN_EXCHANGE_SERVICE_TOPICS, new HashSet<>(Arrays.asList(AUTHZ_TES_OPERATION)));
componentToOperationsMap.put(PUB_SUB_SERVICE_NAME, new HashSet<>(Arrays.asList(PUBLISH_TO_TOPIC,
SUBSCRIBE_TO_TOPIC, ANY_REGEX)));
componentToOperationsMap.put(MQTT_PROXY_SERVICE_NAME, new HashSet<>(Arrays.asList(PUBLISH_TO_IOT_CORE,
Expand All @@ -110,58 +108,57 @@ public AuthorizationHandler(Kernel kernel, AuthorizationModule authModule,
}

// Subscribe to future auth config updates
new BatchedSubscriber(this.kernel.getConfig().lookupTopics(SERVICES_NAMESPACE_TOPIC), (why, newv) -> {
if (WhatHappened.interiorAdded.equals(why) || WhatHappened.timestampUpdated.equals(why)) {
return true;
}
if (newv == null) {
return false;
}
this.kernel.getConfig().lookupTopics(SERVICES_NAMESPACE_TOPIC).subscribe(
(why, newv) -> {
if (newv == null) {
return;
}

//If there is a childChanged event, it has to be the 'accessControl' Topic that has bubbled up
//If there is a childRemoved event, it could be the component is removed, or either the
//'accessControl' Topic or/the 'parameters' Topics that has bubbled up, so we need to handle and
//filter out all other WhatHappeneds
if (WhatHappened.childRemoved.equals(why) || WhatHappened.removed.equals(why)) {
// Either a service or a parameter block or acl subkey
if (!newv.parent.getName().equals(SERVICES_NAMESPACE_TOPIC) && !newv.getName()
.equals(CONFIGURATION_CONFIG_KEY) && !newv.getName().equals(ACCESS_CONTROL_NAMESPACE_TOPIC)
&& !newv.childOf(ACCESS_CONTROL_NAMESPACE_TOPIC)) {
return true;
}
} else if (!newv.childOf(ACCESS_CONTROL_NAMESPACE_TOPIC) && !newv.getName()
.equals(ACCESS_CONTROL_NAMESPACE_TOPIC)) {
// for all other WhatHappened cases we only care about access control change
return true;
}
return false;
}, (why) -> {
// TODO: [V243584397]: Partial policy reload
// For now, reload all policies
Map<String, List<AuthorizationPolicy>> reloadedPolicies =
policyParser.parseAllAuthorizationPolicies(kernel);

// Load default policies
reloadedPolicies.putAll(getDefaultPolicies());

try (LockScope scope = LockScope.lock(rwLock.writeLock())) {
for (Map.Entry<String, List<AuthorizationPolicy>> primaryPolicyList
: componentToAuthZConfig.entrySet()) {
String policyType = primaryPolicyList.getKey();
if (!reloadedPolicies.containsKey(policyType)) {
//If the policyType already exists and was not reparsed correctly and/or removed from
//the newly parsed list, delete it from our store since it is now an unwanted relic
componentToAuthZConfig.remove(policyType);
authModule.deletePermissionsWithDestination(policyType);
//If there is a childChanged event, it has to be the 'accessControl' Topic that has bubbled up
//If there is a childRemoved event, it could be the component is removed, or either the
//'accessControl' Topic or/the 'parameters' Topics that has bubbled up, so we need to handle and
//filter out all other WhatHappeneds
if (WhatHappened.childRemoved.equals(why) || WhatHappened.removed.equals(why)) {
// Either a service or a parameter block or acl subkey
if (!newv.parent.getName().equals(SERVICES_NAMESPACE_TOPIC)
&& !newv.getName().equals(CONFIGURATION_CONFIG_KEY)
&& !newv.getName().equals(ACCESS_CONTROL_NAMESPACE_TOPIC)
&& !newv.childOf(ACCESS_CONTROL_NAMESPACE_TOPIC)) {
return;
}
} else if (!newv.childOf(ACCESS_CONTROL_NAMESPACE_TOPIC)
&& !newv.getName().equals(ACCESS_CONTROL_NAMESPACE_TOPIC)) {
// for all other WhatHappened cases we only care about access control change
return;
}
}

//Now we reload the policies that reflect the current state of the Nucleus config
for (Map.Entry<String, List<AuthorizationPolicy>> acl : reloadedPolicies.entrySet()) {
this.loadAuthorizationPolicies(acl.getKey(), acl.getValue(), true);
}
}
}).subscribe();
// TODO: [V243584397]: Partial policy reload
// For now, reload all policies
Map<String, List<AuthorizationPolicy>> reloadedPolicies = policyParser
.parseAllAuthorizationPolicies(kernel);

// Load default policies
reloadedPolicies.putAll(getDefaultPolicies());

try (LockScope scope = LockScope.lock(rwLock.writeLock())) {

for (Map.Entry<String, List<AuthorizationPolicy>> primaryPolicyList :
componentToAuthZConfig.entrySet()) {
String policyType = primaryPolicyList.getKey();
if (!reloadedPolicies.containsKey(policyType)) {
//If the policyType already exists and was not reparsed correctly and/or removed from
//the newly parsed list, delete it from our store since it is now an unwanted relic
componentToAuthZConfig.remove(policyType);
authModule.deletePermissionsWithDestination(policyType);
}
}

//Now we reload the policies that reflect the current state of the Nucleus config
for (Map.Entry<String, List<AuthorizationPolicy>> acl : reloadedPolicies.entrySet()) {
this.loadAuthorizationPolicies(acl.getKey(), acl.getValue(), true);
}
}
});
}

/**
Expand Down
9 changes: 1 addition & 8 deletions src/main/java/com/aws/greengrass/dependency/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public void run() {
}
}
};
private static final Crashable doNothing = () -> {};
// magical
private boolean shuttingDown = false;
// global state change notification
Expand Down Expand Up @@ -305,14 +304,8 @@ public Throwable runOnPublishQueueAndWait(Crashable r) {
return ret.get();
}

@SuppressWarnings("checkstyle:MissingJavadocMethod")
public void waitForPublishQueueToClear() {
// Run on the publish queue until it is empty. When the queue is empty that doesn't mean that
// all jobs have finished processing though, so we run it once again at the end.
do {
runOnPublishQueueAndWait(doNothing);
} while (!serialized.isEmpty());
runOnPublishQueueAndWait(doNothing);
runOnPublishQueueAndWait(() -> {});
}

private boolean onPublishThread() {
Expand Down
Loading

0 comments on commit 2ee9ae2

Please sign in to comment.