conformance-profiles: Database table and worker to process data #78

Merged · 1 commit · Jul 22, 2020
24 changes: 23 additions & 1 deletion docs/stage/profile-normalised-data.md
@@ -1,3 +1,25 @@
# Profile Normalised Data

Not Written Yet
This will profile all normalised data against the data profiles and store the results in the database.

To run this:

`$ node ./src/bin/profile-normalised-data.js`

It can be stopped at any time; it will not leave the database in a bad state or lose too much work.

When restarted, it will pick up where it left off: it only selects items of normalised data that do not yet have a result row for each profile.

## Database Storage & Errors

It will store the results of this in the `normalised_data_profile_results` table.

Rows are stored per item of normalised data and per profile,
so you should expect this table to have 3 times as many rows as `normalised_data` (if there are 3 profiles).
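A rough way to check progress is to compare the two tables directly. A sketch (illustrative only; deleted items are never profiled, so the counts will not line up exactly):

```sql
-- Illustrative: compare profile result rows against the number of live items
SELECT
  (SELECT COUNT(*) FROM normalised_data WHERE data_deleted = FALSE) AS live_items,
  (SELECT COUNT(*) FROM normalised_data_profile_results) AS profile_result_rows;
```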

For any data profile and normalised data item, there are 4 states:

* no row - we haven't tried to run the check yet
* a row with `checked=FALSE` - we tried to run the check but an error occurred. See `error_checking_message`.
* a row with `checked=TRUE` and nothing in `results` - we checked it and the data passed the check!
* a row with `checked=TRUE` and things in `results` - we checked it and the data failed the check. See `results`.
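
For example, a per-profile summary of these states could be queried with something like this (illustrative, and assuming a passing check stores an empty JSON array in `results`):

```sql
-- Illustrative: count rows in each state, per profile
SELECT profile_name,
       checked,
       (results IS NOT NULL AND results <> '[]'::jsonb) AS failed,
       COUNT(*) AS item_count
FROM normalised_data_profile_results
GROUP BY profile_name, checked, failed;
```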
4 changes: 2 additions & 2 deletions src/bin/heroku.js
@@ -3,6 +3,7 @@ import spider from '../lib/spider.js';
import download_raw_all_publisher_feeds from '../lib/download-raw.js';
import normalise_data_all_publisher_feeds from '../lib/normalise-data.js';
import validate_raw_data_all from '../lib/validate-raw-data.js';
import profile_normalised_data_all from '../lib/profile-normalised-data.js';
import Settings from '../lib/settings.js';
import tls from 'tls';
import Utils from '../lib/utils.js';
@@ -19,8 +20,7 @@ spider(Settings.spiderDataCatalogStartURL);
download_raw_all_publisher_feeds();
validate_raw_data_all();
normalise_data_all_publisher_feeds();
// TODO need to add data profile work here, when it exists

profile_normalised_data_all();

// When a Heroku worker ends, Heroku starts a new one. https://devcenter.heroku.com/articles/dynos#restarting
// When there is no work to be done, we don't want the worker to be constantly checking as the worker starts, ends, starts, ends, etc in a loop
5 changes: 5 additions & 0 deletions src/bin/profile-normalised-data.js
@@ -0,0 +1,5 @@
#!/usr/bin/env node
import profile_normalised_data_all from '../lib/profile-normalised-data.js';


profile_normalised_data_all();
9 changes: 9 additions & 0 deletions src/lib/database-migrations/011-data-profiles.sql
@@ -0,0 +1,9 @@
CREATE TABLE normalised_data_profile_results (
normalised_data_id BIGINT NOT NULL,
profile_name TEXT NOT NULL,
checked BOOLEAN NOT NULL,
error_checking_message TEXT NULL,
results JSONB NULL,
PRIMARY KEY(normalised_data_id, profile_name),
CONSTRAINT normalised_data_profile_results_normalised_data_id FOREIGN KEY (normalised_data_id) REFERENCES normalised_data(id)
);
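
For illustration, a passed check and a check that errored might be recorded like this (hypothetical values; the `normalised_data_id`s must reference existing `normalised_data` rows):

```sql
-- Hypothetical example rows: one passed check, one where checking itself failed
INSERT INTO normalised_data_profile_results
    (normalised_data_id, profile_name, checked, error_checking_message, results)
VALUES
    (1, 'core', TRUE, NULL, '[]'::jsonb),
    (2, 'accessibility', FALSE, 'could not apply profile', NULL);
```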
1 change: 1 addition & 0 deletions src/lib/database.js
@@ -24,6 +24,7 @@ async function delete_database() {
client = await database_pool.connect();
await client.query('DROP TABLE IF EXISTS download_raw_errors CASCADE');
await client.query('DROP TABLE IF EXISTS spider_data_catalog_error CASCADE');
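// the profile results table has a foreign key into normalised_data, so it is dropped first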
await client.query('DROP TABLE IF EXISTS normalised_data_profile_results CASCADE');
await client.query('DROP TABLE IF EXISTS normalised_data CASCADE');
await client.query('DROP TABLE IF EXISTS raw_data CASCADE');
await client.query('DROP TABLE IF EXISTS publisher_feed CASCADE');
9 changes: 7 additions & 2 deletions src/lib/normalise-data.js
@@ -70,14 +70,19 @@ async function store_normalised_callback(raw_data_id, normalised_events) {
normalised_event.parentId
];

await client.query(
const res = await client.query(
'INSERT INTO normalised_data (raw_data_id, data_id, data_deleted, data, data_kind, raw_data_parent_id) ' +
'VALUES ($1, $2, \'f\', $3, $4, $5) ' +
'ON CONFLICT (data_id) DO UPDATE SET ' +
'raw_data_id=$1, data_id=$2, data=$3, data_kind=$4, raw_data_parent_id=$5, updated_at=(now() at time zone \'utc\'), data_deleted=\'f\'' ,
'raw_data_id=$1, data_id=$2, data=$3, data_kind=$4, raw_data_parent_id=$5, updated_at=(now() at time zone \'utc\'), data_deleted=\'f\'' +
' RETURNING id',
query_data
);

// Because we have updated the data, the results of the profile checks are now stale. Delete them so they are recalculated.
// (We only need to do this on UPDATE, not INSERT, but we can't tell the difference here.)
await client.query('DELETE FROM normalised_data_profile_results WHERE normalised_data_id=$1', [res.rows[0].id]);

}

await client.query(
87 changes: 87 additions & 0 deletions src/lib/profile-normalised-data.js
@@ -0,0 +1,87 @@
import { database_pool } from './database.js';
import Settings from './settings.js';
import apply_data_profile from './util-data-profile.js';



async function profile_normalised_data_all() {
for(var profile_name of Settings.dataProfiles) {
// no await - run all profiles concurrently
profile_normalised_data_all_for_profile(profile_name);
};
}

async function profile_normalised_data_all_for_profile(profile_name) {

const select_sql = 'SELECT normalised_data.* FROM normalised_data '+
'LEFT JOIN normalised_data_profile_results '+
'ON normalised_data_profile_results.normalised_data_id = normalised_data.id AND normalised_data_profile_results.profile_name=$1 '+
'WHERE normalised_data_profile_results.normalised_data_id IS NULL AND normalised_data.data_deleted=FALSE '+
'ORDER BY normalised_data.updated_at ASC LIMIT 10';

Collaborator: You can use back ticks ` for multi-line strings rather than concatenation at run time.

Author: ah ta
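
For illustration, the same query written as a template literal (the extra whitespace is insignificant to Postgres):

```js
const select_sql = `
    SELECT normalised_data.* FROM normalised_data
    LEFT JOIN normalised_data_profile_results
        ON normalised_data_profile_results.normalised_data_id = normalised_data.id
        AND normalised_data_profile_results.profile_name = $1
    WHERE normalised_data_profile_results.normalised_data_id IS NULL
        AND normalised_data.data_deleted = FALSE
    ORDER BY normalised_data.updated_at ASC LIMIT 10`;
```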

while(true) {

Collaborator: I usually try to make sure there is some kind of safety catch in a while (true); might be worth adding if there is a way to do that.

Author: Point - the catch() block should have a break in it. That will apply in several places though, so I'll do that in another PR.
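
A sketch of what the loop below could look like with that safety catch added (an assumed follow-up, not part of this PR): a `break` in the catch block stops a persistently failing query from spinning forever.

```js
while (true) {
    let rows = [];
    const client = await database_pool.connect();
    try {
        const res = await client.query(select_sql, [profile_name]);
        if (res.rows.length === 0) {
            break; // nothing left to profile - we are done
        }
        rows = res.rows;
    } catch (error) {
        console.error(error);
        break; // safety catch: leave the loop rather than retry a failing query forever
    } finally {
        client.release();
    }
    for (const row of rows) {
        await profile_normalised_data_for_item_for_profile(row, profile_name);
    }
}
```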

let rows = []

// Step 1 - load data to process
// open the database connection and make sure we CLOSE it after this step, so it is not held open unnecessarily while items are processed
const client = await database_pool.connect();
try {
const res_find_raw_data = await client.query(select_sql, [profile_name]);
if (res_find_raw_data.rows.length == 0) {
break;
}
// Make sure we just store raw data and no database cursors, etc
for (var raw_data of res_find_raw_data.rows) {
rows.push(raw_data)
}
} catch(error) {
console.error("ERROR validate_raw_data_all");
console.error(error);
} finally {
client.release()
}

// Step 2 - process each item of data we got
for (var raw_data of rows) {
await profile_normalised_data_for_item_for_profile(raw_data, profile_name);
}

}

}

async function profile_normalised_data_for_item_for_profile(normalised_data, profile_name) {

//console.log("Profiling Normalised Data id "+ normalised_data.id + " for Profile " + profile_name);

const results = await apply_data_profile(normalised_data.data, profile_name);

const client = await database_pool.connect();
try {
if (results.done) {
await client.query(
"INSERT INTO normalised_data_profile_results (normalised_data_id, profile_name, checked, results) VALUES ($1, $2, TRUE, $3)",
[normalised_data.id, profile_name, JSON.stringify(results.results)]
);
} else {
await client.query(
"INSERT INTO normalised_data_profile_results (normalised_data_id, profile_name, checked, error_checking_message) VALUES ($1, $2, FALSE, $3)",
[normalised_data.id, profile_name, results.error]
);
}
} catch(error) {
console.error("ERROR profile_normalised_data_for_item_for_profile");
console.error(error);
} finally {
client.release()
}

}


export {
profile_normalised_data_all,
};

export default profile_normalised_data_all;
4 changes: 3 additions & 1 deletion src/lib/settings.js
@@ -30,7 +30,9 @@ const Settings = {
// Some publishers may only support older versions
"tlsDefaultMinimumVersion": "TLSv1",

"herokuWorkerMinimumCycleHours": 6
"herokuWorkerMinimumCycleHours": 6,

"dataProfiles": ["core", "accessibility", "socialrx"]

}
