diff --git a/db/flags.sql b/db/flags.sql index 3fbb369a..bdb8f50f 100644 --- a/db/flags.sql +++ b/db/flags.sql @@ -7,9 +7,9 @@ CREATE TABLE IF NOT EXISTS flags.kvdata ( corrected REAL NULL, controlinfo TEXT NULL, useinfo TEXT NULL, - cfailed INT4 NULL + cfailed INT4 NULL, + CONSTRAINT unique_kvdata_timeseries_obstime UNIQUE (timeseries, obstime) ); --- TODO: Probably should define unique constraint on (timeseries, obstime) as we have in public.data? --- Can kvkafka resend data with same (timeseries, obstime)? -CREATE INDEX IF NOT EXISTS kvdata_obtime_index ON flags.kvdata (obstime); + +CREATE INDEX IF NOT EXISTS kvdata_obtime_index ON flags.kvdata (obstime); CREATE INDEX IF NOT EXISTS kvdata_timeseries_index ON flags.kvdata USING HASH (timeseries); diff --git a/ingestion/src/kvkafka.rs b/ingestion/src/kvkafka.rs index 0087163e..0a99eebf 100644 --- a/ingestion/src/kvkafka.rs +++ b/ingestion/src/kvkafka.rs @@ -283,11 +283,11 @@ pub async fn insert_kvdata( // NOTE: alternately could use conn.query_one, since we want exactly one response let tsid: i32 = client .query( - "SELECT timeseries FROM labels.met - WHERE station_id = $1 \ - AND param_id = $2 \ - AND type_id = $3 \ - AND (($4::int IS NULL AND lvl IS NULL) OR (lvl = $4)) \ + "SELECT timeseries FROM labels.met + WHERE station_id = $1 + AND param_id = $2 + AND type_id = $3 + AND (($4::int IS NULL AND lvl IS NULL) OR (lvl = $4)) AND (($5::int IS NULL AND sensor IS NULL) OR (sensor = $5))", &[ &kvid.station, @@ -305,7 +305,14 @@ pub async fn insert_kvdata( // write the data into the db client.execute( "INSERT INTO flags.kvdata (timeseries, obstime, original, corrected, controlinfo, useinfo, cfailed) - VALUES($1, $2, $3, $4, $5, $6, $7)", + VALUES($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT ON CONSTRAINT unique_kvdata_timeseries_obstime + DO UPDATE SET + original = EXCLUDED.original, + corrected = EXCLUDED.corrected, + controlinfo = EXCLUDED.controlinfo, + useinfo = EXCLUDED.useinfo, + cfailed = EXCLUDED.cfailed", &[&tsid, &obstime, &kvdata.original, &kvdata.corrected, &kvdata.controlinfo, &kvdata.useinfo, &kvdata.cfailed], ).await?;