Skip to content

Commit

Permalink
perf(MonitorAllTripJob): Fetch only all trip ids instead of all trips.
Browse files Browse the repository at this point in the history
  • Loading branch information
binh-dam-ibigroup committed Nov 30, 2023
1 parent a1e9352 commit cf944be
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ MonitoredTrip preCreateHook(MonitoredTrip monitoredTrip, Request req) {
@Override
MonitoredTrip postCreateHook(MonitoredTrip monitoredTrip, Request req) {
try {
MonitoredTripLocks.lock(monitoredTrip);
MonitoredTripLocks.lock(monitoredTrip.id);
return runCheckMonitoredTrip(monitoredTrip);
} catch (Exception e) {
// FIXME: an error happened while checking the trip, but the trip was saved to the DB, so return the raw
// trip as it was saved in the db?
return monitoredTrip;
} finally {
MonitoredTripLocks.unlock(monitoredTrip);
MonitoredTripLocks.unlock(monitoredTrip.id);
}
}

Expand Down Expand Up @@ -170,7 +170,7 @@ MonitoredTrip preUpdateHook(MonitoredTrip monitoredTrip, MonitoredTrip preExisti
// the raw trip as it was saved in the db before the check monitored trip job ran?
return monitoredTrip;
} finally {
MonitoredTripLocks.unlock(monitoredTrip);
MonitoredTripLocks.unlock(monitoredTrip.id);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.opentripplanner.middleware.tripmonitor.jobs;

import org.opentripplanner.middleware.models.MonitoredTrip;
import com.mongodb.BasicDBObject;
import org.opentripplanner.middleware.persistence.Persistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -30,8 +30,8 @@ public void run() {
LOG.info("MonitorAllTripsJob started");
// analyze all trips

// create a blocking queue of monitored trips to process
BlockingQueue<MonitoredTrip> tripAnalysisQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_SIZE);
// create a blocking queue of monitored trip IDs to process
BlockingQueue<String> tripAnalysisQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_SIZE);

// create an Atomic Boolean for TripAnalyzer threads to check whether the queue is actually depleted
AtomicBoolean queueDepleted = new AtomicBoolean();
Expand All @@ -47,11 +47,13 @@ public void run() {
}

try {
// request all monitored trips from the mongo collection
for (MonitoredTrip monitoredTrip : Persistence.monitoredTrips.getAll()) {
// attempt to add trip to tripAnalysisQueue until a spot opens up in the queue. If the timeout is
// request all monitored trips from the Mongo collection
// TODO: Filter out trips that would be skipped by the CheckMonitoredTrip.
BasicDBObject tripFilter = new BasicDBObject();
for (String tripId : Persistence.monitoredTrips.getDistinctFieldValues("_id", tripFilter, String.class)) {
// attempt to add trip ID to tripAnalysisQueue until a spot opens up in the queue. If the timeout is
// exceeded, an InterruptedException is throw.
tripAnalysisQueue.offer(monitoredTrip, BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
tripAnalysisQueue.offer(tripId, BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}

// wait for queue to deplete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ public class MonitoredTripLocks {
/** the amount of time in milliseconds to wait to check if a lock has been released */
private static final int LOCK_CHECK_WAIT_MILLIS = 500;

private static final ConcurrentHashMap<MonitoredTrip, Boolean> locks = new ConcurrentHashMap();
private static final ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap();

/**
* Locks the given MonitoredTrip
*/
public static void lock(MonitoredTrip trip) {
locks.put(trip, true);
public static void lock(String tripId) {
locks.put(tripId, true);
}

/**
* Removes a lock for a given MonitoredTrip
*/
public static void unlock(MonitoredTrip trip) {
locks.remove(trip);
public static void unlock(String tripId) {
locks.remove(tripId);
}

/**
* Returns true if a lock exists for the given MonitoredTrip
*/
public static boolean isLocked(MonitoredTrip trip) {
return locks.containsKey(trip);
public static boolean isLocked(String tripId) {
return locks.containsKey(tripId);
}

/**
Expand All @@ -48,7 +48,7 @@ public static boolean isLocked(MonitoredTrip trip) {
public static void lockTripForUpdating(MonitoredTrip monitoredTrip, Request req) {
// Wait for any existing CheckMonitoredTrip jobs to complete before proceeding
String busyMessage = "A trip monitor check prevented the trip from being updated. Please try again in a moment.";
if (isLocked(monitoredTrip)) {
if (isLocked(monitoredTrip.id)) {
int timeWaitedMillis = 0;
do {
try {
Expand All @@ -59,17 +59,17 @@ public static void lockTripForUpdating(MonitoredTrip monitoredTrip, Request req)
timeWaitedMillis += LOCK_CHECK_WAIT_MILLIS;

// if the lock has been released, exit this wait loop
if (!isLocked(monitoredTrip)) break;
if (!isLocked(monitoredTrip.id)) break;
} while (timeWaitedMillis <= MAX_UNLOCKING_WAIT_TIME_MILLIS);
}

// If a lock still exists, prevent the update
if (isLocked(monitoredTrip)) {
if (isLocked(monitoredTrip.id)) {
logMessageAndHalt(req, HttpStatus.INTERNAL_SERVER_ERROR_500, busyMessage);
return;
}

// lock the trip so that the a CheckMonitoredTrip job won't concurrently analyze/update the trip.
lock(monitoredTrip);
lock(monitoredTrip.id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ public class TripAnalyzer implements Runnable {
private final int BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS = 250;

private final AtomicBoolean analyzerIsIdle;
private final BlockingQueue<MonitoredTrip> tripAnalysisQueue;
private final BlockingQueue<String> tripAnalysisQueue;
private final AtomicBoolean queueDepleted;

public TripAnalyzer(
BlockingQueue<MonitoredTrip> tripAnalysisQueue,
BlockingQueue<String> tripAnalysisQueue,
AtomicBoolean queueDepleted,
AtomicBoolean analyzerIsIdle
) {
Expand All @@ -34,10 +34,10 @@ public void run() {
while (!queueDepleted.get()) {
analyzerIsIdle.set(false);

// get the next monitored trip from the queue
MonitoredTrip trip;
// get the next monitored trip ID from the queue
String tripId;
try {
trip = tripAnalysisQueue.poll(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
tripId = tripAnalysisQueue.poll(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("TripAnalyzer thread interrupted");
e.printStackTrace();
Expand All @@ -48,23 +48,22 @@ public void run() {

// The implementation of the ArrayBlockingQueue can result in null items being returned if the wait is
// exceeded on an empty queue. Therefore, check if the trip is null and if so, wait and then continue.
if (trip == null) {
if (tripId == null) {
Thread.sleep(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS);
analyzerIsIdle.set(true);
continue;
}

// verify that a lock hasn't been placed on trip by another trip analyzer task
if (MonitoredTripLocks.isLocked(trip)) {
LOG.warn("Skipping trip analysis due to existing lock on trip: {}", trip);
if (MonitoredTripLocks.isLocked(tripId)) {
LOG.warn("Skipping trip analysis due to existing lock on trip: {}", tripId);
analyzerIsIdle.set(true);
continue;
}

// Refetch the trip from the database. This is to ensure the trip has any updates made to the trip
// between when the trip was placed in the analysis queue and the current time.
String tripId = trip.id;
trip = Persistence.monitoredTrips.getById(tripId);
MonitoredTrip trip = Persistence.monitoredTrips.getById(tripId);
if (trip == null) {
// trip was deleted between the time when it was placed in the queue and the current time. Don't
// analyze the trip.
Expand All @@ -76,7 +75,7 @@ public void run() {
LOG.info("Analyzing trip {}", tripId);

// place lock on trip
MonitoredTripLocks.lock(trip);
MonitoredTripLocks.lock(tripId);

/////// BEGIN TRIP ANALYSIS
try {
Expand All @@ -88,7 +87,7 @@ public void run() {
LOG.info("Finished analyzing trip {}", tripId);

// remove lock on trip
MonitoredTripLocks.unlock(trip);
MonitoredTripLocks.unlock(tripId);

analyzerIsIdle.set(true);
}
Expand Down

0 comments on commit cf944be

Please sign in to comment.