Skip to content

Commit

Permalink
feat: use single db connection (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcosentino11 authored Nov 14, 2023
1 parent 22d7fef commit bc882d6
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.aws.greengrass.mqttclient.v5.QOS;
import com.aws.greengrass.mqttclient.v5.UserProperty;
import com.aws.greengrass.testcommons.testutilities.GGExtension;
import com.aws.greengrass.util.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -38,7 +39,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.nio.file.Files.deleteIfExists;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -66,8 +66,8 @@ void beforeEach() throws InterruptedException, IOException {
}

@AfterEach
void afterEach() throws IOException {
stopNucleus(true);
void afterEach() {
kernel.shutdown();
}

@Test
Expand Down Expand Up @@ -183,7 +183,7 @@ void GIVEN_persistence_spool_plugin_with_messages_WHEN_nucleus_restarts_THEN_mes
spooler.addMessage(publishRequestFromPayload(payload3));

// restart nucleus
stopNucleus(false);
kernel.shutdown();
startNucleus();

// Read messages
Expand Down Expand Up @@ -243,8 +243,7 @@ void GIVEN_disk_spool_plugin_WHEN_operation_fails_with_database_corruption_THEN_

// Corrupt Database
try (RandomAccessFile f = new RandomAccessFile(spoolerDatabaseFile.toFile(), "rw")) {
f.seek(100);
f.writeBytes("Garbage");
f.writeBytes(Utils.generateRandomString(256));
}

// Fail to add second message
Expand All @@ -267,16 +266,6 @@ void startNucleus() throws InterruptedException, IOException {
spooler = new Spool(kernel.getContext().get(DeviceConfiguration.class), kernel);
}

void stopNucleus(boolean clearSpoolDb) throws IOException {
try {
if (clearSpoolDb) {
deleteIfExists(spoolerDatabaseFile);
}
} finally {
kernel.shutdown();
}
}

private void startKernelWithConfig() throws InterruptedException {
kernel = new Kernel();
CountDownLatch diskSpoolerRunning = new CountDownLatch(1);
Expand Down
21 changes: 14 additions & 7 deletions src/main/java/com/aws/greengrass/disk/spool/DiskSpool.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class DiskSpool extends PluginService implements CloudMessageSpool {

public static final String PERSISTENCE_SERVICE_NAME = "aws.greengrass.DiskSpooler";
private static final Logger logger = LogManager.getLogger(DiskSpool.class);
private static final String KV_MESSAGE_ID = "messageId";
private final DiskSpoolDAO dao;

@Inject
Expand All @@ -41,12 +42,11 @@ public SpoolMessage getMessageById(long id) {
return dao.getSpoolMessageById(id);
} catch (SQLException e) {
logger.atError()
.kv("messageId", id)
.kv(KV_MESSAGE_ID, id)
.cause(e)
.log("Failed to retrieve message by messageId");
return null;
}

}


Expand All @@ -58,10 +58,10 @@ public SpoolMessage getMessageById(long id) {
public void removeMessageById(long id) {
try {
dao.removeSpoolMessageById(id);
logger.atTrace().kv("MessageId", id).log("Removed message from Disk Spooler");
logger.atTrace().kv(KV_MESSAGE_ID, id).log("Removed message from Disk Spooler");
} catch (SQLException e) {
logger.atWarn()
.kv("messageId", id)
.kv(KV_MESSAGE_ID, id)
.cause(e)
.log("Failed to delete message by messageId");
}
Expand All @@ -76,7 +76,7 @@ public void removeMessageById(long id) {
public void add(long id, SpoolMessage message) throws IOException {
try {
dao.insertSpoolMessage(message);
logger.atTrace().kv("MessageId", id).log("Added message to Disk Spooler");
logger.atTrace().kv(KV_MESSAGE_ID, id).log("Added message to Disk Spooler");
} catch (SQLException e) {
throw new IOException(e);
}
Expand All @@ -94,10 +94,17 @@ public Iterable<Long> getAllMessageIds() throws IOException {
@Override
public void initializeSpooler() throws IOException {
try {
dao.initialize();
dao.setUpDatabase();
logger.atInfo().log("Finished setting up Database");
} catch (SQLException exception) {
throw new IOException(exception);
} catch (SQLException e) {
throw new IOException(e);
}
}

@Override
protected void shutdown() throws InterruptedException {
super.shutdown();
dao.close();
}
}
Loading

0 comments on commit bc882d6

Please sign in to comment.