Skip to content

Commit

Permalink
feat: add WS station fetcher (#135)
Browse files Browse the repository at this point in the history
* refactor(station): replace timed http request with websockets

* feat(station): send InitMessage on open connection

* refactor: Use jakarta websockets

* feat: re-add http fetcher & move WS to own package

* fix(ws-fetcher): catch client creation error

* feat(ws-fetcher): open WS after setting handlers

* feat(ws-fetcher): retrieve missing values for detections from DB & try to fix simplePositioner cursedness with threading

* fix(simplePositioner): make handle function synchronised

* fix(gradle): replace impl with jersey one
is more in line with what we use for our server impl

* fix(ws-fetcher): use lombok annotation

* fix(ws-fetcher): baton mac's to uppercase

* ingrease message size

---------

Co-authored-by: FKD13 <[email protected]>
  • Loading branch information
NuttyShrimp and FKD13 authored Apr 22, 2024
1 parent 29daae2 commit dfd4510
Show file tree
Hide file tree
Showing 13 changed files with 518 additions and 206 deletions.
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ dependencies {
'org.eclipse.jetty.websocket:websocket-jetty-api:' + jettyVersion,
'org.eclipse.jetty.websocket:websocket-jetty-server:' + jettyVersion,
)

// Websocket client libs
compileOnly 'jakarta.websocket:jakarta.websocket-client-api:2.2.0-M1'
// Impl for jakarta websocket clients
implementation 'org.eclipse.jetty.websocket:websocket-jakarta-client:11.0.20'

// Database
implementation('com.h2database:h2:2.2.220')
implementation('org.postgresql:postgresql:42.7.3')
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/telraam/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import telraam.logic.lapper.slapper.Slapper;
import telraam.logic.positioner.Positioner;
import telraam.logic.positioner.simple.SimplePositioner;
import telraam.station.Fetcher;
import telraam.station.FetcherFactory;
import telraam.station.websocket.WebsocketFetcher;
import telraam.util.AcceptedLapsUtil;
import telraam.websocket.WebSocketConnection;

Expand Down Expand Up @@ -144,9 +145,10 @@ public void run(AppConfiguration configuration, Environment environment) {
positioners.add(new SimplePositioner(this.database));

// Start fetch thread for each station
FetcherFactory fetcherFactory = new FetcherFactory(this.database, lappers, positioners);
StationDAO stationDAO = this.database.onDemand(StationDAO.class);
for (Station station : stationDAO.getAll()) {
new Thread(() -> new Fetcher(this.database, station, lappers, positioners).fetch()).start();
new Thread(() -> fetcherFactory.create(station).fetch()).start();
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/main/java/telraam/database/daos/DetectionDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ INSERT INTO detection (station_id, baton_id, timestamp, rssi, battery, remote_id
@GetGeneratedKeys({"id"})
int insertAll(@BindBean List<Detection> detection);

@SqlBatch("""
INSERT INTO detection (station_id, baton_id, timestamp, rssi, battery, remote_id, uptime_ms, timestamp_ingestion) \
VALUES (:stationId, (SELECT id FROM baton WHERE mac = :batonMac), :timestamp, :rssi, :battery, :remoteId, :uptimeMs, :timestampIngestion)
""")
@GetGeneratedKeys({"id", "baton_id"})
@RegisterBeanMapper(Detection.class)
List<Detection> insertAllWithoutBaton(@BindBean List<Detection> detection, @Bind("batonMac") List<String> batonMac);

@SqlQuery("SELECT * FROM detection WHERE id = :id")
@RegisterBeanMapper(Detection.class)
Optional<Detection> getById(@Bind("id") int id);
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/telraam/database/models/Detection.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

import java.sql.Timestamp;

@Setter @Getter @NoArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class Detection {
private Integer id;
private Integer batonId;
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/telraam/database/models/Team.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter @Setter @NoArgsConstructor
import java.util.Objects;

@Getter
@Setter
@NoArgsConstructor
public class Team {
private Integer id;
private String name;
Expand All @@ -19,4 +23,8 @@ public Team(String name, int batonId) {
this.name = name;
this.batonId = batonId;
}

public boolean equals(Team obj) {
return Objects.equals(id, obj.getId());
}
}
35 changes: 17 additions & 18 deletions src/main/java/telraam/logic/positioner/simple/SimplePositioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@
import telraam.logic.positioner.PositionSender;
import telraam.logic.positioner.Positioner;

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

public class SimplePositioner implements Positioner {
private static final Logger logger = Logger.getLogger(SimplePositioner.class.getName());
private final int QUEUE_SIZE = 50;
private final int MIN_RSSI = -85;
private final int DEBOUNCE_TIMEOUT = 1;
private boolean debounceScheduled;
private final ScheduledExecutorService scheduler;
private static final Logger logger = Logger.getLogger(SimplePositioner.class.getName());
private final PositionSender positionSender;
private final Map<Integer, Team> batonIdToTeam;
private final Map<Team, CircularQueue<Detection>> teamDetections;
private final Map<Integer, CircularQueue<Detection>> teamDetections;
private final List<Integer> stations;
private final Map<Team, Position> teamPositions;
private final Map<Integer, Position> teamPositions;

public SimplePositioner(Jdbi jdbi) {
this.debounceScheduled = false;
Expand All @@ -45,14 +45,14 @@ public SimplePositioner(Jdbi jdbi) {

TeamDAO teamDAO = jdbi.onDemand(TeamDAO.class);
List<Team> teams = teamDAO.getAll();
for (Team team: teams) {
teamDetections.put(team, new CircularQueue<>(QUEUE_SIZE));
teamPositions.put(team, new Position(team.getId()));
for (Team team : teams) {
teamDetections.put(team.getId(), new CircularQueue<>(QUEUE_SIZE));
teamPositions.put(team.getId(), new Position(team.getId()));
}
List<BatonSwitchover> switchovers = jdbi.onDemand(BatonSwitchoverDAO.class).getAll();
switchovers.sort(Comparator.comparing(BatonSwitchover::getTimestamp));

for (BatonSwitchover switchover: switchovers) {
for (BatonSwitchover switchover : switchovers) {
batonIdToTeam.put(switchover.getNewBatonId(), teamDAO.getById(switchover.getTeamId()).get());
}

Expand All @@ -63,13 +63,13 @@ public SimplePositioner(Jdbi jdbi) {

public void calculatePositions() {
logger.info("SimplePositioner: Calculating positions...");
for (Map.Entry<Team, CircularQueue<Detection>> entry: teamDetections.entrySet()) {
for (Map.Entry<Integer, CircularQueue<Detection>> entry : teamDetections.entrySet()) {
List<Detection> detections = teamDetections.get(entry.getKey());
detections.sort(Comparator.comparing(Detection::getTimestamp));

int currentStationRssi = MIN_RSSI;
int currentStationPosition = 0;
for (Detection detection: detections) {
for (Detection detection : detections) {
if (detection.getRssi() > currentStationRssi) {
currentStationRssi = detection.getRssi();
currentStationPosition = detection.getStationId();
Expand All @@ -84,21 +84,20 @@ public void calculatePositions() {
logger.info("SimplePositioner: Done calculating positions");
}

public void handle(Detection detection) {
public synchronized void handle(Detection detection) {
Team team = batonIdToTeam.get(detection.getBatonId());
teamDetections.get(team).add(detection);
teamDetections.get(team.getId()).add(detection);

if (! debounceScheduled) {
if (!debounceScheduled) {
debounceScheduled = true;
scheduler.schedule(() -> {
try {
calculatePositions();
} catch (Exception e) {
logger.severe(e.getMessage());
logger.log(Level.SEVERE, e.getMessage(), e);
}
debounceScheduled = false;
}, DEBOUNCE_TIMEOUT, TimeUnit.SECONDS);
}
}

}
182 changes: 7 additions & 175 deletions src/main/java/telraam/station/Fetcher.java
Original file line number Diff line number Diff line change
@@ -1,182 +1,14 @@
package telraam.station;

import org.jdbi.v3.core.Jdbi;
import telraam.database.daos.BatonDAO;
import telraam.database.daos.DetectionDAO;
import telraam.database.daos.StationDAO;
import telraam.database.models.Baton;
import telraam.database.models.Detection;
import telraam.database.models.Station;
import telraam.logic.lapper.Lapper;
import telraam.logic.positioner.Positioner;
import telraam.station.models.RonnyDetection;
import telraam.station.models.RonnyResponse;

import java.io.IOException;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class Fetcher {
private final Set<Lapper> lappers;
private final Set<Positioner> positioners;
private Station station;

private final BatonDAO batonDAO;
private final DetectionDAO detectionDAO;
private final StationDAO stationDAO;

private final HttpClient client = HttpClient.newHttpClient();
private final Logger logger = Logger.getLogger(Fetcher.class.getName());

public interface Fetcher {
//Timeout to wait for before sending the next request after an error.
private final static int ERROR_TIMEOUT_MS = 2000;
int ERROR_TIMEOUT_MS = 2000;
//Timeout for a request to a station.
private final static int REQUEST_TIMEOUT_S = 10;
int REQUEST_TIMEOUT_S = 10;
//Full batch size, if this number of detections is reached, more are probably available immediately.
private final static int FULL_BATCH_SIZE = 1000;
int FULL_BATCH_SIZE = 1000;
//Timeout when result has less than a full batch of detections.
private final static int IDLE_TIMEOUT_MS = 4000; // Wait 4 seconds


public Fetcher(Jdbi database, Station station, Set<Lapper> lappers, Set<Positioner> positioners) {
this.batonDAO = database.onDemand(BatonDAO.class);
this.detectionDAO = database.onDemand(DetectionDAO.class);
this.stationDAO = database.onDemand(StationDAO.class);

this.lappers = lappers;
this.positioners = positioners;
this.station = station;
}

public void fetch() {
logger.info("Running Fetcher for station(" + this.station.getId() + ")");
JsonBodyHandler<RonnyResponse> bodyHandler = new JsonBodyHandler<>(RonnyResponse.class);

while (true) {
//Update the station to account for possible changes in the database
this.stationDAO.getById(station.getId()).ifPresentOrElse(
station -> this.station = station,
() -> this.logger.severe("Can't update station from database.")
);

//Get last detection id
int lastDetectionId = 0;
Optional<Detection> lastDetection = detectionDAO.latestDetectionByStationId(this.station.getId());
if (lastDetection.isPresent()) {
lastDetectionId = lastDetection.get().getRemoteId();
}

//Create URL
URI url;
try {
url = new URI(station.getUrl() + "/detections/" + lastDetectionId);
} catch (URISyntaxException ex) {
this.logger.severe(ex.getMessage());
try {
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
} catch (InterruptedException e) {
logger.severe(e.getMessage());
}
continue;
}

//Create request
HttpRequest request;
try {
request = HttpRequest.newBuilder()
.uri(url)
.version(HttpClient.Version.HTTP_1_1)
.timeout(Duration.ofSeconds(Fetcher.REQUEST_TIMEOUT_S))
.build();
} catch (IllegalArgumentException e) {
logger.severe(e.getMessage());
try {
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
} catch (InterruptedException ex) {
logger.severe(ex.getMessage());
}
continue;
}

//Do request
HttpResponse<Supplier<RonnyResponse>> response;
try {
try {
response = this.client.send(request, bodyHandler);
} catch (ConnectException | HttpConnectTimeoutException ex) {
this.logger.severe("Could not connect to " + request.uri());
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
continue;
} catch (IOException e) {
logger.severe(e.getMessage());
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
continue;
}
} catch (InterruptedException e) {
logger.severe(e.getMessage());
continue;
}

//Check response state
if (response.statusCode() != 200) {
this.logger.warning(
"Unexpected status code(" + response.statusCode() + ") when requesting " + url + " for station(" + this.station.getName() + ")"
);
continue;
}

//Fetch all batons and create a map by batonMAC
Map<String, Baton> baton_mac_map = batonDAO.getAll().stream()
.collect(Collectors.toMap(b -> b.getMac().toUpperCase(), Function.identity()));

//Insert detections
List<Detection> new_detections = new ArrayList<>();
List<RonnyDetection> detections = response.body().get().detections;
for (RonnyDetection detection : detections) {
if (baton_mac_map.containsKey(detection.mac.toUpperCase())) {
var baton = baton_mac_map.get(detection.mac.toUpperCase());
new_detections.add(new Detection(
baton.getId(),
station.getId(),
detection.rssi,
detection.battery,
detection.uptimeMs,
detection.id,
new Timestamp((long) (detection.detectionTimestamp * 1000)),
new Timestamp(System.currentTimeMillis())
));
}
}
if (!new_detections.isEmpty()) {
detectionDAO.insertAll(new_detections);
new_detections.forEach((detection) -> {
lappers.forEach((lapper) -> lapper.handle(detection));
positioners.forEach((positioner) -> positioner.handle(detection));
});
}

this.logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size());
int IDLE_TIMEOUT_MS = 4000; // Wait 4 seconds

//If few detections are retrieved from the station, wait for some time.
if (detections.size() < Fetcher.FULL_BATCH_SIZE) {
try {
Thread.sleep(Fetcher.IDLE_TIMEOUT_MS);
} catch (InterruptedException e) {
logger.severe(e.getMessage());
}
}
}
}
}
void fetch();
}
Loading

0 comments on commit dfd4510

Please sign in to comment.