Removing remaining database triggers #2230

Open · wants to merge 1 commit into base: master
@@ -169,8 +169,8 @@ WHERE ds.id IN (SELECT id FROM ids)
LEFT JOIN label_values lv ON dataset.id = lv.dataset_id
LEFT JOIN label ON label.id = label_id
""";

//@formatter:on

@Inject
EntityManager em;

@@ -95,25 +95,34 @@ public class RunServiceImpl implements RunService {

//@formatter:off
private static final String FIND_AUTOCOMPLETE = """
SELECT * FROM (
SELECT DISTINCT jsonb_object_keys(q) AS key
FROM run, jsonb_path_query(run.data, ? ::jsonpath) q
WHERE jsonb_typeof(q) = 'object') AS keys
WHERE keys.key LIKE CONCAT(?, '%');
""";
protected static final String FIND_RUNS_WITH_URI = """
SELECT id, testid
FROM run
WHERE NOT trashed
AND (data->>'$schema' = ?1
OR (CASE
WHEN jsonb_typeof(data) = 'object' THEN ?1 IN (SELECT values.value->>'$schema' FROM jsonb_each(data) as values)
WHEN jsonb_typeof(data) = 'array' THEN ?1 IN (SELECT jsonb_array_elements(data)->>'$schema')
ELSE false
END)
OR (metadata IS NOT NULL AND ?1 IN (SELECT jsonb_array_elements(metadata)->>'$schema'))
)
""";
SELECT * FROM (
SELECT DISTINCT jsonb_object_keys(q) AS key
FROM run, jsonb_path_query(run.data, ? ::jsonpath) q
WHERE jsonb_typeof(q) = 'object') AS keys
WHERE keys.key LIKE CONCAT(?, '%');
""";
private static final String FIND_RUNS_WITH_URI = """
SELECT id, testid
FROM run
WHERE NOT trashed
AND (data->>'$schema' = ?1
OR (CASE
WHEN jsonb_typeof(data) = 'object' THEN ?1 IN (SELECT values.value->>'$schema' FROM jsonb_each(data) as values)
WHEN jsonb_typeof(data) = 'array' THEN ?1 IN (SELECT jsonb_array_elements(data)->>'$schema')
ELSE false
END)
OR (metadata IS NOT NULL AND ?1 IN (SELECT jsonb_array_elements(metadata)->>'$schema'))
)
""";

private static final String UPDATE_DATASET_SCHEMAS = """
WITH uris AS (
SELECT jsonb_array_elements(ds.data)->>'$schema' AS uri FROM dataset ds WHERE ds.id = ?1
), indexed as (
SELECT uri, row_number() over () - 1 as index FROM uris
) INSERT INTO dataset_schemas(dataset_id, uri, index, schema_id)
SELECT ?1 as dataset_id, indexed.uri, indexed.index, schema.id FROM indexed JOIN schema ON schema.uri = indexed.uri;
""";
//@formatter:on
private static final String[] CONDITION_SELECT_TERMINAL = { "==", "!=", "<>", "<", "<=", ">", ">=", " " };
private static final String CHANGE_ACCESS = "UPDATE run SET owner = ?, access = ? WHERE id = ?";
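For orientation, UPDATE_DATASET_SCHEMAS is the Java-side replacement for the dropped ds_after_insert trigger: it pulls every $schema URI out of the dataset's JSON array, numbers them 0-based via row_number(), and inserts one dataset_schemas row per URI that matches a registered schema (unknown URIs are silently dropped by the inner JOIN). A minimal sketch of the same logic, using hypothetical helpers (extractSchemaUris, findSchemaIdByUri, insertDatasetSchema) that are not part of this codebase:

    // Illustrative only: what UPDATE_DATASET_SCHEMAS computes for one dataset.
    // For data = [{"$schema":"urn:a"},{"$schema":"urn:b"}] it produces rows
    // (datasetId, "urn:a", 0, idOf("urn:a")) and (datasetId, "urn:b", 1, idOf("urn:b")).
    List<String> uris = extractSchemaUris(datasetId);           // jsonb_array_elements(data)->>'$schema'
    for (int index = 0; index < uris.size(); index++) {         // row_number() over () - 1
        Integer schemaId = findSchemaIdByUri(uris.get(index));  // JOIN schema ON schema.uri = indexed.uri
        if (schemaId != null) {
            insertDatasetSchema(datasetId, uris.get(index), index, schemaId);
        }
    }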
@@ -188,46 +197,71 @@ void onNewOrUpdatedSchema(int schemaId) {
log.errorf("Cannot process schema add/update: cannot load schema %d", schemaId);
return;
}
processNewOrUpdatedSchema(schema);
}

@Transactional
void processNewOrUpdatedSchema(SchemaDAO schema) {
// we don't have to care about races with new runs
clearRunAndDatasetSchemas(schema.id);
findRunsWithUri(schema.uri, (runId, testId) -> {
log.debugf("Recalculate Datasets for run %d - schema %d (%s) changed", runId, schema.id, schema.uri);
onNewOrUpdatedSchemaForRun(runId, schema.id);
});
}

void findRunsWithUri(String uri, BiConsumer<Integer, Integer> consumer) {
ScrollableResults<RunFromUri> results = session.createNativeQuery(FIND_RUNS_WITH_URI, Tuple.class).setParameter(1, uri)
try (ScrollableResults<RunFromUri> results = session.createNativeQuery(FIND_RUNS_WITH_URI, Tuple.class)
.setParameter(1, uri)
.setTupleTransformer((tuple, aliases) -> {
RunFromUri r = new RunFromUri();
r.id = (int) tuple[0];
r.testId = (int) tuple[1];
return r;
})
.setFetchSize(100)
.scroll(ScrollMode.FORWARD_ONLY);
while (results.next()) {
RunFromUri r = results.get();
consumer.accept(r.id, r.testId);
.scroll(ScrollMode.FORWARD_ONLY)) {
while (results.next()) {
RunFromUri r = results.get();
consumer.accept(r.id, r.testId);
}
}
}
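The try-with-resources block above is the substantive fix in this hunk: ScrollableResults holds an open database cursor, and the previous version never closed it. A hedged usage sketch (the URI is invented):

    // Sketch: iterate all non-trashed runs referencing a schema URI without
    // loading them all into memory; the cursor closes when the method returns.
    findRunsWithUri("urn:example:my-schema:1.0",
            (runId, testId) -> log.debugf("run %d (test %d) references the schema", runId, testId));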

/**
* Keep the run_schemas table up to date with the associated schemas and
* queue a recalculation of the run's datasets. The recalculation is not
* required when creating a new run, as the datasets are created by the
* upload process itself; it is required when a Schema is updated.
* @param runId id of the run
* @param schemaId id of the schema
*/
@WithRoles(extras = Roles.HORREUM_SYSTEM)
@Transactional
void onNewOrUpdatedSchemaForRun(int runId, int schemaId) {
em.createNativeQuery("SELECT update_run_schemas(?1)::text").setParameter(1, runId).getSingleResult();
//clear validation error tables by schemaId
updateRunSchemas(runId);

// clear validation error tables by schemaId
em.createNativeQuery("DELETE FROM dataset_validationerrors WHERE schema_id = ?1")
.setParameter(1, schemaId).executeUpdate();
em.createNativeQuery("DELETE FROM run_validationerrors WHERE schema_id = ?1")
.setParameter(1, schemaId).executeUpdate();

Util.registerTxSynchronization(tm, txStatus -> mediator.queueRunRecalculation(runId));
// transform(runId, true);
}
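Util.registerTxSynchronization defers queueRunRecalculation until the surrounding transaction completes, so the recalculation only ever observes committed state. A speculative sketch of what such a helper presumably does with the JTA API (the real Util implementation may differ):

    // Assumes jakarta.transaction.{TransactionManager, Synchronization, RollbackException,
    // SystemException} and java.util.function.IntConsumer; runs the action in
    // afterCompletion, passing it the final transaction status.
    static void registerTxSynchronization(TransactionManager tm, IntConsumer action) {
        try {
            tm.getTransaction().registerSynchronization(new Synchronization() {
                @Override
                public void beforeCompletion() {
                    // nothing to do before commit
                }

                @Override
                public void afterCompletion(int txStatus) {
                    action.accept(txStatus);
                }
            });
        } catch (RollbackException | SystemException e) {
            throw new IllegalStateException("Could not register tx synchronization", e);
        }
    }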

@Transactional
void updateRunSchemas(int runId) {
em.createNativeQuery("SELECT update_run_schemas(?1)::text").setParameter(1, runId).getSingleResult();
}

@Transactional
public void updateDatasetSchemas(int datasetId) {
em.createNativeQuery(UPDATE_DATASET_SCHEMAS).setParameter(1, datasetId).executeUpdate();
}

@Transactional
void clearRunAndDatasetSchemas(int schemaId) {
// clear old run and dataset schemas associations
em.createNativeQuery("DELETE FROM run_schemas WHERE schemaid = ?1")
.setParameter(1, schemaId).executeUpdate();
em.createNativeQuery("DELETE FROM dataset_schemas WHERE schema_id = ?1")
.setParameter(1, schemaId).executeUpdate();
}

@PermitAll
@@ -336,13 +370,13 @@ public JsonNode getMetadata(int id, String schemaUri) {
@Override
// TODO: it would be nicer to use @FormParams but fetchival on client side doesn't support that
public void updateAccess(int id, String owner, Access access) {
Query query = em.createNativeQuery(CHANGE_ACCESS);
query.setParameter(1, owner);
query.setParameter(2, access.ordinal());
query.setParameter(3, id);
if (query.executeUpdate() != 1) {
int updatedRecords = RunDAO.update("owner = ?1, access = ?2 WHERE id = ?3", owner, access, id);
if (updatedRecords != 1) {
throw ServiceException.serverError("Access change failed (missing permissions?)");
}

// propagate the same change to all datasets belonging to the run
DatasetDAO.update("owner = ?1, access = ?2 WHERE run.id = ?3", owner, access, id);
}
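RunDAO.update uses the Panache shorthand: the string is everything after "UPDATE RunDAO SET", and the return value is the count of affected rows, which the guard compares to 1. For comparison, a sketch of the explicit JPQL this roughly expands to (not taken from this codebase):

    // Roughly equivalent explicit JPQL bulk update.
    int updatedRecords = em.createQuery("UPDATE RunDAO SET owner = ?1, access = ?2 WHERE id = ?3")
            .setParameter(1, owner)
            .setParameter(2, access)
            .setParameter(3, id)
            .executeUpdate();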

@RolesAllowed(Roles.UPLOADER)
@@ -670,6 +704,7 @@ public RunPersistence addAuthenticated(RunDAO run, TestDAO test) {
}
log.debugf("Upload flushed, run ID %d", run.id);

updateRunSchemas(run.id);
mediator.newRun(RunMapper.from(run));
List<Integer> datasetIds = transform(run.id, false);
if (mediator.testMode())
@@ -991,6 +1026,7 @@ private void trashInternal(int id, boolean trashed) {
run.trashed = false;
run.persistAndFlush();
transform(id, true);
updateRunSchemas(run.id);
} else
throw ServiceException.badRequest("Not possible to un-trash a run that's not referenced to a Test");
}
@@ -1017,7 +1053,8 @@ public void updateDescription(int id, String description) {
throw ServiceException.notFound("Run not found: " + id);
}
run.description = description;
run.persistAndFlush();
// propagate the same change to all datasets belonging to the run
DatasetDAO.update("description = ?1 WHERE run.id = ?2", description, run.id);
}

@RolesAllowed(Roles.TESTER)
@@ -1071,7 +1108,7 @@ public Map<Integer, String> updateSchema(int id, String path, String schemaUri)
.distinct()
.collect(
Collectors.toMap(
tuple -> ((Integer) tuple.get("key")).intValue(),
tuple -> (Integer) tuple.get("key"),
tuple -> ((String) tuple.get("value"))));

em.flush();
@@ -1377,6 +1414,9 @@ List<Integer> transform(int runId, boolean isRecalculation) {
*/
private Integer createDataset(DatasetDAO ds, boolean isRecalculation) {
ds.persistAndFlush();
// re-create the dataset_schemas associations
updateDatasetSchemas(ds.id);

if (isRecalculation) {
try {
Dataset.EventNew event = new Dataset.EventNew(DatasetMapper.from(ds), true);
@@ -71,8 +71,8 @@
public class SchemaServiceImpl implements SchemaService {
private static final Logger log = Logger.getLogger(SchemaServiceImpl.class);

//@formatter:off
private static final String FETCH_SCHEMAS_RECURSIVE =
//@formatter:off
private static final String FETCH_SCHEMAS_RECURSIVE =
"""
WITH RECURSIVE refs(uri) AS
(
@@ -86,7 +86,7 @@ SELECT substring(jsonb_path_query(schema, '$.**.\"$ref\" ? (! (@ starts with \"#
FROM schema
INNER JOIN refs ON schema.uri = refs.uri
""";
//@formatter:on
//@formatter:on

private static final JsonSchemaFactory JSON_SCHEMA_FACTORY = new JsonSchemaFactory.Builder()
.defaultMetaSchemaIri(JsonMetaSchema.getV4().getIri())
@@ -160,13 +160,6 @@ public Integer add(Schema schemaDTO) {
em.flush();
if (!Objects.equals(schema.uri, existing.uri) ||
!Objects.equals(schema.schema, existing.schema)) {
//We need to delete from run_schemas and dataset_schemas as they will be recreated
//when we create new datasets psql will still create new entries in dataset_schemas
// https://github.com/Hyperfoil/Horreum/blob/master/horreum-backend/src/main/resources/db/changeLog.xml#L2522
em.createNativeQuery("DELETE FROM run_schemas WHERE schemaid = ?1")
.setParameter(1, schema.id).executeUpdate();
em.createNativeQuery("DELETE FROM dataset_schemas WHERE schema_id = ?1")
.setParameter(1, schema.id).executeUpdate();
newOrUpdatedSchema(schema);
}
} else {
@@ -710,7 +703,7 @@ public Integer addOrUpdateLabel(int schemaId, Label labelDTO) {
}
existing.name = label.name;

//When we clear extractors we should also delete label_values
// when we clear extractors we should also delete label_values
em.createNativeQuery(
"DELETE FROM dataset_view WHERE dataset_id IN (SELECT dataset_id FROM label_values WHERE label_id = ?1)")
.setParameter(1, existing.id).executeUpdate();
@@ -865,7 +858,7 @@ public void importSchema(ObjectNode node) {
}

boolean newSchema = true;
SchemaDAO schema = null;
SchemaDAO schema;
if (importSchema.id != null) {
//first check if this schema exists
schema = SchemaDAO.findById(importSchema.id);
@@ -917,6 +910,7 @@ public void importSchema(ObjectNode node) {
//let's wrap flush in a try/catch, if we get any role issues at commit we can give a sane msg
try {
em.flush();
newOrUpdatedSchema(schema);
} catch (Exception e) {
throw ServiceException.serverError("Failed to persist Schema: " + e.getMessage());
}
44 changes: 44 additions & 0 deletions horreum-backend/src/main/resources/db/changeLog.xml
@@ -4796,4 +4796,48 @@
$$ LANGUAGE plpgsql;
</sql>
</changeSet>
<changeSet id="128" author="lampajr">
<validCheckSum>ANY</validCheckSum>
<sql>
-- drop triggers
DROP TRIGGER IF EXISTS rs_after_run_untrash ON run;
DROP TRIGGER IF EXISTS rs_after_run_update ON run;
DROP TRIGGER IF EXISTS before_schema_update ON schema;
DROP TRIGGER IF EXISTS ds_after_insert ON dataset;

-- drop functions
DROP FUNCTION rs_after_run_update;
DROP FUNCTION before_schema_update_func;
DROP FUNCTION ds_after_dataset_insert_func;
</sql>
</changeSet>
<changeSet id="129" author="lampajr">
<validCheckSum>ANY</validCheckSum>
<sql>
-- drop triggers
DROP TRIGGER IF EXISTS after_run_update_non_data ON run;
DROP TRIGGER IF EXISTS delete_run_validations ON run;

-- drop functions
DROP FUNCTION after_run_update_non_data_func;
DROP FUNCTION delete_run_validations;
</sql>
</changeSet>
<changeSet id="130" author="lampajr">
<validCheckSum>ANY</validCheckSum>
<sql>
-- drop triggers
DROP TRIGGER IF EXISTS lv_before_update ON label;
DROP TRIGGER IF EXISTS lv_after_update ON label;
DROP TRIGGER IF EXISTS recalc_labels ON label_recalc_queue;

-- drop functions
DROP FUNCTION lv_before_label_update_func;
DROP FUNCTION lv_after_label_update_func;
DROP FUNCTION recalc_label_values;

-- drop table as no longer used
DROP TABLE label_recalc_queue;
</sql>
</changeSet>
</databaseChangeLog>
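One way to sanity-check changesets 128-130 after a migration is to ask PostgreSQL which non-internal triggers remain on the affected tables; a hedged sketch using the system catalogs (not part of this PR, expected to return no rows once the triggers are gone):

    // Speculative verification helper: list user triggers still attached to
    // the tables whose triggers the changesets above drop.
    List<?> leftover = em.createNativeQuery("""
            SELECT t.tgname, c.relname
            FROM pg_trigger t
            JOIN pg_class c ON c.oid = t.tgrelid
            WHERE NOT t.tgisinternal
              AND c.relname IN ('run', 'schema', 'dataset', 'label')
            """).getResultList();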
@@ -404,7 +404,7 @@ public void testMissingRules(TestInfo info) throws InterruptedException {
em.clear();

pollMissingDataRuleResultsByDataset(thirdEvent.datasetId, 1);
trashRun(thirdRunId, test.id);
trashRun(thirdRunId, test.id, true);
pollMissingDataRuleResultsByDataset(thirdEvent.datasetId, 0);

alertingService.checkMissingDataset();
@@ -237,16 +237,6 @@ public static Test createExampleTest(String testName, Integer datastoreID) {
return test;
}

public static List<View> createExampleViews(int testId) {
View defaultView = new View();
defaultView.name = "Default";
defaultView.testId = testId;
defaultView.components = new ArrayList<>();
defaultView.components.add(new io.hyperfoil.tools.horreum.api.data.ViewComponent("Some column", null, "foo"));

return Collections.singletonList(defaultView);
}

public static String getAccessToken(String userName, String... groups) {
return Jwt.preferredUserName(userName)
.groups(new HashSet<>(Arrays.asList(groups)))
@@ -616,10 +606,12 @@ protected ArrayNode jsonArray(String... items) {
return array;
}

protected BlockingQueue<Integer> trashRun(int runId, Integer testId) throws InterruptedException {
protected BlockingQueue<Integer> trashRun(int runId, Integer testId, boolean trashed) throws InterruptedException {
BlockingQueue<Integer> trashedQueue = serviceMediator.getEventQueue(AsyncEventChannels.RUN_TRASHED, testId);
jsonRequest().post("/api/run/" + runId + "/trash").then().statusCode(204);
assertEquals(runId, trashedQueue.poll(10, TimeUnit.SECONDS));
jsonRequest().post("/api/run/" + runId + "/trash?isTrashed=" + trashed).then().statusCode(204);
if (trashed) {
assertEquals(runId, trashedQueue.poll(10, TimeUnit.SECONDS));
}
return trashedQueue;
}
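With the new flag the helper covers both directions; a usage sketch of a trash/un-trash round trip (run and test ids assumed to exist):

    // Trash the run and block until the RUN_TRASHED event arrives...
    trashRun(runId, test.id, true);
    // ...then un-trash it; the helper skips the event assertion in that direction.
    trashRun(runId, test.id, false);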
