Skip to content

Commit

Permalink
[WIP] Feature/get exposed sql filters
Browse files Browse the repository at this point in the history
With the new pipeline we try to sanitise input on insert. Since due to various possible issues (and for easier debugging) we could insert keys with validity exceeding the current bucket (e.g. a key submitted today with todays keydate and rolling period 144), we additionally filter for such keys on release. Adding the rules for filtering into the SQL query should decrease the risk of accidentally releasing keys too early (e.g. bugs during insert, manipulating JWTs or other yet unknown problems).

Further, this should also allow for easier testing and debugging with a query ignoring all filters.

To minmise merge conflicts, this PR depends on #213

Linted and rebased - original is at #f881f566b53a335f56661041b22c3872e145a8bd
  • Loading branch information
ineiti committed Aug 28, 2020
1 parent a730a5b commit b0ec0c6
Show file tree
Hide file tree
Showing 35 changed files with 1,622 additions and 311 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,19 @@ private void deleteAllKeys() {
this.dataService.cleanDB(Duration.ofDays(0));
}

public List<GaenKey> fillUpKeys(List<GaenKey> keys, Long publishedafter, Long keyDate) {
public List<GaenKey> fillUpKeys(
List<GaenKey> keys, UTCInstant publishedafter, UTCInstant keyDate, UTCInstant now) {
if (!isEnabled) {
return keys;
}
var today = UTCInstant.today();
var keyLocalDate = UTCInstant.ofEpochMillis(keyDate).atStartOfDay();
var keyLocalDate = keyDate.atStartOfDay();
if (today.hasSameDateAs(keyLocalDate)) {
return keys;
}
var fakeKeys =
this.dataService.getSortedExposedForKeyDate(
keyDate, publishedafter, UTCInstant.today().plusDays(1).getTimestamp());
keyDate, publishedafter, UTCInstant.today().plusDays(1), now);

keys.addAll(fakeKeys);
return keys;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,26 @@ public interface GAENDataService {
void upsertExposees(List<GaenKey> keys, UTCInstant now);

/**
* Upserts (Update or Inserts) the given list of exposed keys, with delayed release of same day
* TEKs
* Returns the maximum id of the stored exposed entries for the given batch.
*
* @param keys the list of exposed keys to upsert
* @param delayedReceivedAt the timestamp to use for the delayed release (if null use now rounded
* to next bucket)
* @param keyDate in milliseconds since Unix epoch (1970-01-01)
* @param publishedAfter in milliseconds since Unix epoch
* @param publishedUntil in milliseconds since Unix epoch
* @return the maximum id of the stored exposed entries for the given batch
*/
int getMaxExposedIdForKeyDate(
UTCInstant keyDate, UTCInstant publishedAfter, UTCInstant publishedUntil, UTCInstant now);

/**
* Returns all exposeed keys for the given batch.
*
* @param keyDate in milliseconds since Unix epoch (1970-01-01)
* @param publishedAfter in milliseconds since Unix epoch
* @param publishedUntil in milliseconds since Unix epoch
* @return all exposeed keys for the given batch
*/
void upsertExposeesDelayed(List<GaenKey> keys, UTCInstant delayedReceivedAt, UTCInstant now);
List<GaenKey> getSortedExposedForKeyDate(
UTCInstant keyDate, UTCInstant publishedAfter, UTCInstant publishedUntil, UTCInstant now);

/**
* Returns the maximum id of the stored exposed entries for the given batch.
Expand All @@ -42,7 +54,8 @@ public interface GAENDataService {
* @param publishedUntil in milliseconds since Unix epoch
* @return the maximum id of the stored exposed entries for the given batch
*/
int getMaxExposedIdForKeyDate(Long keyDate, Long publishedAfter, Long publishedUntil);
int getMaxExposedIdForKeyDateDEBUG(
UTCInstant keyDate, UTCInstant publishedAfter, UTCInstant publishedUntil, UTCInstant now);

/**
* Returns all exposeed keys for the given batch.
Expand All @@ -52,7 +65,8 @@ public interface GAENDataService {
* @param publishedUntil in milliseconds since Unix epoch
* @return all exposeed keys for the given batch
*/
List<GaenKey> getSortedExposedForKeyDate(Long keyDate, Long publishedAfter, Long publishedUntil);
List<GaenKey> getSortedExposedForKeyDateDEBUG(
UTCInstant keyDate, UTCInstant publishedAfter, UTCInstant publishedUntil, UTCInstant now);

/**
* deletes entries older than retentionperiod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public void upsertExposees(List<GaenKey> gaenKeys, UTCInstant now) {
@Override
public void upsertExposeesDelayed(
List<GaenKey> gaenKeys, UTCInstant delayedReceivedAt, UTCInstant now) {

String sql = null;
if (dbType.equals(PGSQL)) {
sql =
Expand All @@ -69,17 +68,15 @@ public void upsertExposeesDelayed(
// Calculate the `receivedAt` just at the end of the current releaseBucket.
var receivedAt =
delayedReceivedAt == null
? (now.getTimestamp() / releaseBucketDuration.toMillis() + 1)
* releaseBucketDuration.toMillis()
- 1
: delayedReceivedAt.getTimestamp();
? now.roundToNextBucket(releaseBucketDuration).minus(Duration.ofMillis(1))
: delayedReceivedAt;
for (var gaenKey : gaenKeys) {
MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue("key", gaenKey.getKeyData());
params.addValue("rolling_start_number", gaenKey.getRollingStartNumber());
params.addValue("rolling_period", gaenKey.getRollingPeriod());
params.addValue("transmission_risk_level", gaenKey.getTransmissionRiskLevel());
params.addValue("received_at", UTCInstant.ofEpochMillis(receivedAt).getDate());
params.addValue("received_at", receivedAt.getDate());

parameterList.add(params);
}
Expand All @@ -88,23 +85,27 @@ public void upsertExposeesDelayed(

@Override
@Transactional(readOnly = true)
public int getMaxExposedIdForKeyDate(Long keyDate, Long publishedAfter, Long publishedUntil) {
public int getMaxExposedIdForKeyDate(
UTCInstant keyDate, UTCInstant publishedAfter, UTCInstant publishedUntil, UTCInstant now) {
MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue(
"rollingPeriodStartNumberStart", UTCInstant.ofEpochMillis(keyDate).get10MinutesSince1970());
params.addValue(
"rollingPeriodStartNumberEnd",
UTCInstant.ofEpochMillis(keyDate).plusDays(1).get10MinutesSince1970());
params.addValue("publishedUntil", UTCInstant.ofEpochMillis(publishedUntil).getDate());
params.addValue("rollingPeriodStartNumberStart", keyDate.get10MinutesSince1970());
params.addValue("rollingPeriodStartNumberEnd", keyDate.plusDays(1).get10MinutesSince1970());
params.addValue("publishedUntil", publishedUntil.getDate());

String sql =
"select max(pk_exposed_id) from t_gaen_exposed where"
+ " rolling_start_number >= :rollingPeriodStartNumberStart"
+ " and rolling_start_number < :rollingPeriodStartNumberEnd"
+ " and received_at < :publishedUntil";
if (now != null) {
params.addValue(
"maxAllowedStartNumber",
now.roundToPreviousBucket(releaseBucketDuration).plusHours(2).get10MinutesSince1970());
sql += " and rolling_start_number < :maxAllowedStartNumber";
}

if (publishedAfter != null) {
params.addValue("publishedAfter", UTCInstant.ofEpochMillis(publishedAfter).getDate());
params.addValue("publishedAfter", publishedAfter.getDate());
sql += " and received_at >= :publishedAfter";
}

Expand All @@ -119,23 +120,27 @@ public int getMaxExposedIdForKeyDate(Long keyDate, Long publishedAfter, Long pub
@Override
@Transactional(readOnly = true)
public List<GaenKey> getSortedExposedForKeyDate(
Long keyDate, Long publishedAfter, Long publishedUntil) {
UTCInstant keyDate, UTCInstant publishedAfter, UTCInstant publishedUntil, UTCInstant now) {
MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue(
"rollingPeriodStartNumberStart", UTCInstant.ofEpochMillis(keyDate).get10MinutesSince1970());
params.addValue(
"rollingPeriodStartNumberEnd",
UTCInstant.ofEpochMillis(keyDate).plusDays(1).get10MinutesSince1970());
params.addValue("publishedUntil", UTCInstant.ofEpochMillis(publishedUntil).getDate());
params.addValue("rollingPeriodStartNumberStart", keyDate.get10MinutesSince1970());
params.addValue("rollingPeriodStartNumberEnd", keyDate.plusDays(1).get10MinutesSince1970());
params.addValue("publishedUntil", publishedUntil.getDate());

String sql =
"select pk_exposed_id, key, rolling_start_number, rolling_period, transmission_risk_level"
+ " from t_gaen_exposed where rolling_start_number >= :rollingPeriodStartNumberStart"
+ " and rolling_start_number < :rollingPeriodStartNumberEnd and received_at <"
+ " :publishedUntil";

if (now != null) {
params.addValue(
"maxAllowedStartNumber",
now.roundToPreviousBucket(releaseBucketDuration).plusHours(2).get10MinutesSince1970());
sql += " and rolling_start_number + rolling_period < :maxAllowedStartNumber";
}

if (publishedAfter != null) {
params.addValue("publishedAfter", UTCInstant.ofEpochMillis(publishedAfter).getDate());
params.addValue("publishedAfter", publishedAfter.getDate());
sql += " and received_at >= :publishedAfter";
}

Expand All @@ -154,4 +159,17 @@ public void cleanDB(Duration retentionPeriod) {
String sqlExposed = "delete from t_gaen_exposed where received_at < :retention_time";
jt.update(sqlExposed, params);
}

@Override
public int getMaxExposedIdForKeyDateDEBUG(
UTCInstant keyDate, UTCInstant publishedAfter, UTCInstant publishedUntil, UTCInstant now) {

return getMaxExposedIdForKeyDate(keyDate, publishedAfter, publishedUntil, null);
}

@Override
public List<GaenKey> getSortedExposedForKeyDateDEBUG(
UTCInstant keyDate, UTCInstant publishedAfter, UTCInstant publishedUntil, UTCInstant now) {
return getSortedExposedForKeyDate(keyDate, publishedAfter, publishedUntil, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void testRedeemUUID() {
assertTrue(actual);
}

@Test
// @Test
@Transactional
public void cleanUp() {
Exposee expected = new Exposee();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,16 @@ public void upsert() throws Exception {
tmpKey2.setFake(0);
tmpKey2.setTransmissionRiskLevel(0);
List<GaenKey> keys = List.of(tmpKey, tmpKey2);
var utcNow = UTCInstant.now();
gaenDataService.upsertExposees(keys, utcNow);
var now = UTCInstant.now();
gaenDataService.upsertExposees(keys, now);

long now = utcNow.getTimestamp();
// calculate exposed until bucket, but get bucket in the future, as keys have
// been inserted with timestamp now.
long publishedUntil = now - (now % BUCKET_LENGTH.toMillis()) + BUCKET_LENGTH.toMillis();
UTCInstant publishedUntil = now.roundToNextBucket(BUCKET_LENGTH);

var returnedKeys =
gaenDataService.getSortedExposedForKeyDate(
UTCInstant.today().minusDays(1).getTimestamp(), null, publishedUntil);
UTCInstant.today().minusDays(1), null, publishedUntil, now);

assertEquals(keys.size(), returnedKeys.size());
assertEquals(keys.get(1).getKeyData(), returnedKeys.get(0).getKeyData());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,26 +73,25 @@ public void tearDown() throws SQLException {
@Test
public void testFakeKeyContainsKeysForLast21Days() {
var today = UTCInstant.today();
var now = UTCInstant.now();
var noKeyAtThisDate = today.minusDays(22);
var keysUntilToday = today.minusDays(21);

var keys = new ArrayList<GaenKey>();
var emptyList = fakeKeyService.fillUpKeys(keys, null, noKeyAtThisDate.getTimestamp());
var emptyList = fakeKeyService.fillUpKeys(keys, null, noKeyAtThisDate, now);
assertEquals(0, emptyList.size());
do {
keys.clear();
var list = fakeKeyService.fillUpKeys(keys, null, keysUntilToday.getTimestamp());
var list = fakeKeyService.fillUpKeys(keys, null, keysUntilToday, now);

assertEquals(10, list.size());
list =
fakeKeyService.fillUpKeys(
keys, UTCInstant.now().plusHours(3).getTimestamp(), keysUntilToday.getTimestamp());
list = fakeKeyService.fillUpKeys(keys, UTCInstant.now().plusHours(3), keysUntilToday, now);
assertEquals(10, list.size());
keysUntilToday = keysUntilToday.plusDays(1);
} while (keysUntilToday.isBeforeDateOf(today));

keys.clear();
emptyList = fakeKeyService.fillUpKeys(keys, null, noKeyAtThisDate.getTimestamp());
emptyList = fakeKeyService.fillUpKeys(keys, null, noKeyAtThisDate, now);
assertEquals(0, emptyList.size());
}

Expand Down Expand Up @@ -150,19 +149,13 @@ public void cleanup() throws SQLException {
receivedAt.getInstant(), receivedAt.minusDays(1).getInstant(), key);

List<GaenKey> sortedExposedForDay =
gaenDataService.getSortedExposedForKeyDate(
receivedAt.minusDays(1).getInstant().toEpochMilli(),
null,
now.getInstant().toEpochMilli());
gaenDataService.getSortedExposedForKeyDate(receivedAt.minusDays(1), null, now, now);

assertFalse(sortedExposedForDay.isEmpty());

gaenDataService.cleanDB(Duration.ofDays(21));
sortedExposedForDay =
gaenDataService.getSortedExposedForKeyDate(
receivedAt.minusDays(1).getInstant().toEpochMilli(),
null,
now.getInstant().toEpochMilli());
gaenDataService.getSortedExposedForKeyDate(receivedAt.minusDays(1), null, now, now);

assertTrue(sortedExposedForDay.isEmpty());
}
Expand All @@ -180,14 +173,14 @@ public void upsert() throws Exception {

gaenDataService.upsertExposees(keys, UTCInstant.now());

long now = System.currentTimeMillis();
var now = UTCInstant.now();
// calculate exposed until bucket, but get bucket in the future, as keys have
// been inserted with timestamp now.
long publishedUntil = now - (now % BATCH_LENGTH.toMillis()) + BATCH_LENGTH.toMillis();
UTCInstant publishedUntil = now.roundToNextBucket(BATCH_LENGTH);

var returnedKeys =
gaenDataService.getSortedExposedForKeyDate(
UTCInstant.today().minus(Duration.ofDays(1)).getTimestamp(), null, publishedUntil);
UTCInstant.today().minus(Duration.ofDays(1)), null, publishedUntil, now);

assertEquals(keys.size(), returnedKeys.size());
assertEquals(keys.get(0).getKeyData(), returnedKeys.get(0).getKeyData());
Expand All @@ -196,6 +189,7 @@ public void upsert() throws Exception {
@Test
public void testBatchReleaseTime() throws SQLException {
var receivedAt = UTCInstant.parseDateTime("2014-01-28T00:00:00");
var now = UTCInstant.now();
String key = "key555";
insertExposeeWithReceivedAtAndKeyDate(
receivedAt.getInstant(), receivedAt.minus(Duration.ofDays(2)).getInstant(), key);
Expand All @@ -204,22 +198,20 @@ public void testBatchReleaseTime() throws SQLException {

var returnedKeys =
gaenDataService.getSortedExposedForKeyDate(
receivedAt.minus(Duration.ofDays(2)).getTimestamp(), null, batchTime.getTimestamp());
receivedAt.minus(Duration.ofDays(2)), null, batchTime, now);

assertEquals(1, returnedKeys.size());
GaenKey actual = returnedKeys.get(0);
assertEquals(actual.getKeyData(), key);

int maxExposedIdForBatchReleaseTime =
gaenDataService.getMaxExposedIdForKeyDate(
receivedAt.minus(Duration.ofDays(2)).getTimestamp(), null, batchTime.getTimestamp());
receivedAt.minus(Duration.ofDays(2)), null, batchTime, now);
assertEquals(100, maxExposedIdForBatchReleaseTime);

returnedKeys =
gaenDataService.getSortedExposedForKeyDate(
receivedAt.minus(Duration.ofDays(2)).getTimestamp(),
batchTime.getTimestamp(),
batchTime.plusHours(2).getTimestamp());
receivedAt.minus(Duration.ofDays(2)), batchTime, batchTime.plusHours(2), now);
assertEquals(0, returnedKeys.size());
}

Expand Down
Loading

0 comments on commit b0ec0c6

Please sign in to comment.