Skip to content

Commit

Permalink
fix: allow fractional seconds in periodicUpload timer to go faster th…
Browse files Browse the repository at this point in the history
…an 1 second (#203)
  • Loading branch information
MikeDombo authored Jun 1, 2023
1 parent f5e882d commit ed24ea1
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,19 @@ void GIVEN_periodicUploadIntervalSec_config_WHEN_value_is_reset_and_replaced_THE
Exception {
setupKernel();

assertThat(() -> logManagerService.getPeriodicUpdateIntervalSec(), eventuallyEval(is(60),
assertThat(() -> logManagerService.getPeriodicUpdateIntervalSec(), eventuallyEval(is(60d),
Duration.ofSeconds(30)));

logManagerService.getConfig().find(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC).remove();
assertThat(() -> logManagerService.getPeriodicUpdateIntervalSec(),
eventuallyEval(is(LogManagerService.DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC), Duration.ofSeconds(30)));

logManagerService.getConfig().lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC).withValue(600);
assertThat(() -> logManagerService.getPeriodicUpdateIntervalSec(), eventuallyEval(is(600),
assertThat(() -> logManagerService.getPeriodicUpdateIntervalSec(), eventuallyEval(is(600d),
Duration.ofSeconds(30)));

logManagerService.getConfig().lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC).withValue(0.0001d);
assertThat(() -> logManagerService.getPeriodicUpdateIntervalSec(), eventuallyEval(is(0.0001d),
Duration.ofSeconds(30)));
}

Expand Down Expand Up @@ -489,4 +493,4 @@ void GIVEN_runtimeConfiguration_withOldProcessingFileFormat_WHEN_onStartUp_THEN_
}};
assertEquals(expected, processingFiles.toMap());
}
}
}
23 changes: 13 additions & 10 deletions src/main/java/com/aws/greengrass/logmanager/LogManagerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public class LogManagerService extends PluginService {
public static final String DELETE_LOG_FILES_AFTER_UPLOAD_CONFIG_TOPIC_NAME = "deleteLogFileAfterCloudUpload";
public static final String UPLOAD_TO_CW_CONFIG_TOPIC_NAME = "uploadToCloudWatch";
public static final String MULTILINE_PATTERN_CONFIG_TOPIC_NAME = "multiLineStartPattern";
public static final int DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC = 300;
public static final double DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC = 300;
public static final int MAX_CACHE_INACTIVE_TIME_SECONDS = 60 * 60 * 24; // 1 day

private final List<Consumer<EventType>> serviceStatusListeners = new ArrayList<>();
Expand All @@ -124,7 +124,7 @@ public class LogManagerService extends PluginService {
private final CloudWatchAttemptLogsProcessor logsProcessor;
private final AtomicBoolean isCurrentlyUploading = new AtomicBoolean(false);
@Getter
private int periodicUpdateIntervalSec;
private double periodicUpdateIntervalSec;
private final Path workDir;

/**
Expand Down Expand Up @@ -156,16 +156,15 @@ public class LogManagerService extends PluginService {
}

private void handlePeriodicUploadIntervalSecConfig(Topics topics) {
int periodicUploadIntervalSecInput = Coerce.toInt(topics.lookup(CONFIGURATION_CONFIG_KEY,
LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)
.dflt(DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC)
.toPOJO());
double periodicUploadIntervalSecInput =
Coerce.toDouble(topics.findOrDefault(Double.toString(DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC),
CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC));

if (periodicUploadIntervalSecInput > 0) {
periodicUpdateIntervalSec = periodicUploadIntervalSecInput;
} else {
logger.atWarn().log("Invalid config value, {}, for periodicUploadIntervalSec. Must be an "
+ "integer greater than 0. Using default value of 300 (5 minutes)",
logger.atWarn().log("Invalid config value, {}, for periodicUploadIntervalSec. Must be a "
+ "number greater than 0. Using default value of 300 (5 minutes)",
periodicUploadIntervalSecInput);
periodicUpdateIntervalSec = DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC;
}
Expand Down Expand Up @@ -674,7 +673,7 @@ private void processLogsAndUpload() throws InterruptedException {
//TODO: this is only done for passing the current text. But in practise, we don`t need to intentionally
// sleep here.
if (!isCurrentlyUploading.compareAndSet(false, true)) {
TimeUnit.SECONDS.sleep(periodicUpdateIntervalSec);
sleepFractionalSeconds(periodicUpdateIntervalSec);
continue;
}
List<ComponentLogFileInformation> componentMetadata = new ArrayList<>();
Expand Down Expand Up @@ -762,10 +761,14 @@ private void processLogsAndUpload() throws InterruptedException {
emitEventStatus(EventType.ALL_COMPONENTS_PROCESSED);
// after handle one cycle, we sleep for interval to avoid seamless scanning and processing next cycle.
// TODO, do not use lazy sleep. Use scheduler to unblock the thread.
TimeUnit.SECONDS.sleep(periodicUpdateIntervalSec);
sleepFractionalSeconds(periodicUpdateIntervalSec);
}
}

private void sleepFractionalSeconds(double timeToSleep) throws InterruptedException {
TimeUnit.MICROSECONDS.sleep(Math.round(timeToSleep * TimeUnit.SECONDS.toMicros(1)));
}

public void registerEventStatusListener(Consumer<EventType> callback) {
serviceStatusListeners.add(callback);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.lenient;
Expand Down Expand Up @@ -280,9 +281,9 @@ public void cleanup() throws InterruptedException {
void GIVEN_system_log_files_to_be_uploaded_WHEN_merger_merges_THEN_we_get_all_log_files()
throws InterruptedException, IOException {
mockDefaultPersistedState();
Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1");
when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))
.thenReturn(periodicUpdateIntervalMsTopic);
Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1");
when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)))
.thenReturn(periodicUpdateIntervalSecTopic);
when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt());

Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null);
Expand Down Expand Up @@ -334,9 +335,9 @@ void GIVEN_system_log_files_to_be_uploaded_WHEN_merger_merges_THEN_we_get_all_lo
void GIVEN_user_component_log_files_to_be_uploaded_with_required_config_as_array_WHEN_merger_merges_THEN_we_get_all_log_files()
throws InterruptedException, UnsupportedInputTypeException, IOException {
mockDefaultPersistedState();
Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1");
when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))
.thenReturn(periodicUpdateIntervalMsTopic);
Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1");
when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)))
.thenReturn(periodicUpdateIntervalSecTopic);
when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt());

Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null);
Expand Down Expand Up @@ -385,11 +386,11 @@ void GIVEN_user_component_log_files_to_be_uploaded_with_required_config_as_array

@Test
void GIVEN_user_component_log_files_to_be_uploaded_with_required_config_WHEN_merger_merges_THEN_we_get_all_log_files()
throws InterruptedException, UnsupportedInputTypeException, IOException {
throws InterruptedException, IOException {
mockDefaultPersistedState();
Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1");
when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))
.thenReturn(periodicUpdateIntervalMsTopic);
Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1");
when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)))
.thenReturn(periodicUpdateIntervalSecTopic);
when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt());

Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null);
Expand Down Expand Up @@ -437,11 +438,11 @@ void GIVEN_user_component_log_files_to_be_uploaded_with_required_config_WHEN_mer

@Test
void GIVEN_user_component_log_files_to_be_uploaded_with_all_config_WHEN_merger_merges_THEN_we_get_all_log_files()
throws InterruptedException, UnsupportedInputTypeException, IOException {
throws InterruptedException, IOException {
mockDefaultPersistedState();
Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1");
when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))
.thenReturn(periodicUpdateIntervalMsTopic);
Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1");
when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)))
.thenReturn(periodicUpdateIntervalSecTopic);
when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt());

Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null);
Expand Down Expand Up @@ -490,11 +491,11 @@ void GIVEN_user_component_log_files_to_be_uploaded_with_all_config_WHEN_merger_m

@Test
void GIVEN_multiple_user_components_log_files_to_be_uploaded_with_all_config_WHEN_merger_merges_THEN_we_get_all_log_files()
throws InterruptedException, UnsupportedInputTypeException, IOException {
throws InterruptedException, IOException {
mockDefaultPersistedState();
Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1");
when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))
.thenReturn(periodicUpdateIntervalMsTopic);
Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1");
when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)))
.thenReturn(periodicUpdateIntervalSecTopic);
when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt());

Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null);
Expand Down Expand Up @@ -564,9 +565,9 @@ void GIVEN_null_config_WHEN_config_is_processed_THEN_no_component_config_is_adde
ignoreExceptionOfType(context1, MismatchedInputException.class);
Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null);
when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics);
Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3");
when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))
.thenReturn(periodicUpdateIntervalMsTopic);
Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3");
when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)))
.thenReturn(periodicUpdateIntervalSecTopic);
Topics logsUploaderConfigTopic = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null);
when(config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC))
.thenReturn(logsUploaderConfigTopic);
Expand All @@ -583,9 +584,9 @@ void GIVEN_cloud_watch_attempt_handler_WHEN_attempt_completes_THEN_successfully_
mockDefaultPersistedState();
LogFile processingFile = createLogFileWithSize(directoryPath.resolve("testlogs1.log").toUri(), 1061);
LogFile lastProcessedFile = createLogFileWithSize(directoryPath.resolve("testlogs2.log").toUri(), 2943);
Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1000");
when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))
.thenReturn(periodicUpdateIntervalMsTopic);
Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1000");
when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)))
.thenReturn(periodicUpdateIntervalSecTopic);
Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null);
when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics);
Topics logsUploaderConfigTopics = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null);
Expand Down Expand Up @@ -743,9 +744,9 @@ void GIVEN_cloud_watch_attempt_handler_WHEN_attempt_completes_THEN_successfully_
void GIVEN_some_system_files_uploaded_and_another_partially_uploaded_WHEN_merger_merges_THEN_sets_the_start_position_correctly()
throws InterruptedException, IOException {
mockDefaultPersistedState();
Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3");
when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))
.thenReturn(periodicUpdateIntervalMsTopic);
Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3");
when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)))
.thenReturn(periodicUpdateIntervalSecTopic);
when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt());

Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null);
Expand Down Expand Up @@ -800,9 +801,9 @@ void GIVEN_some_system_files_uploaded_and_another_partially_uploaded_WHEN_merger
void GIVEN_a_partially_uploaded_file_but_rotated_WHEN_merger_merges_THEN_sets_the_start_position_correctly()
throws InterruptedException, IOException {
mockDefaultPersistedState();
Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3");
when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))
.thenReturn(periodicUpdateIntervalMsTopic);
Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3");
when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)))
.thenReturn(periodicUpdateIntervalSecTopic);
when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt());

Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null);
Expand Down Expand Up @@ -851,9 +852,9 @@ void GIVEN_a_partially_uploaded_file_but_rotated_WHEN_merger_merges_THEN_sets_th
@Test
void GIVEN_persisted_data_WHEN_log_uploader_initialises_THEN_correctly_sets_the_persisted_data() throws
IOException {
Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3");
when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))
.thenReturn(periodicUpdateIntervalMsTopic);
Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3");
when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)))
.thenReturn(periodicUpdateIntervalSecTopic);
Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null);
when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics);
Topics logsUploaderConfigTopics = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null);
Expand Down

0 comments on commit ed24ea1

Please sign in to comment.