Read kafka checked #20

Merged · 26 commits · Jul 9, 2024
Changes from 22 commits
Commits
4047e18
basic parsing of checked data from kafka
lcoram Jun 14, 2024
07944f5
also parse text data
lcoram Jun 14, 2024
1c4e4f5
connect to db and try to write kvdata to flags schema
lcoram Jun 19, 2024
29220cb
code improvements based on clippy
lcoram Jun 19, 2024
012e321
code review and better printouts
lcoram Jun 20, 2024
88f69cd
try to improve code, avoid unsafe unwraps
lcoram Jun 21, 2024
84af1a8
use channel to send between kafka and db write threads
lcoram Jun 25, 2024
1bc4b9a
simplify code, use tosql for db insert
lcoram Jun 25, 2024
16711a8
convert 0 back to none for kvalobsid, use manuel's suggestion to dese…
lcoram Jun 27, 2024
64690a3
if else for checking kafka consumer errors
lcoram Jul 2, 2024
7fc5d8d
Transform level and sensor directly during deserialization
Lun4m Jul 4, 2024
f78bea1
Change wrong datatypes
Lun4m Jul 4, 2024
b308412
Extract parsing logic in separate function
Lun4m Jul 4, 2024
9fca373
Extract read and insert logic in separate function
Lun4m Jul 4, 2024
5fbc671
Fix small inconsistency
Lun4m Jul 4, 2024
b8e7fde
Move package to module
Lun4m Jul 4, 2024
79add79
match consumer.poll()
Lun4m Jul 4, 2024
b14754c
Add explanation comment
Lun4m Jul 4, 2024
5d192dd
Add kafka test template
Lun4m Jul 4, 2024
8281fea
Minor changes
Lun4m Jul 8, 2024
984d9a2
Improve kafka test
Lun4m Jul 8, 2024
20788ac
Don't skip formatting
Lun4m Jul 8, 2024
24b7a7a
Fix argument parsing
Lun4m Jul 8, 2024
1ca0852
timeseries can't be a primary key here!
Lun4m Jul 9, 2024
aa0b558
Change field names and obstime parsing
Lun4m Jul 9, 2024
94fce6a
Add unique constraint and update row on conflict
Lun4m Jul 9, 2024
571 changes: 254 additions & 317 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -18,7 +18,9 @@ bytes = "1.5.0"
chrono = { version = "0.4.31", features = ["serde"] }
csv = "1.3.0"
futures = "0.3.28"
kafka = "0.10.0"
postgres-types = { version = "0.2.6", features = ["derive", "with-chrono-0_4"] }
quick-xml = { version = "0.35.0", features = [ "serialize", "overlapped-lists" ] }
rand = "0.8.5"
rand_distr = "0.4.3"
regex = "1.10.2"
5 changes: 1 addition & 4 deletions api/src/lib.rs
@@ -118,11 +118,8 @@ async fn latest_handler(

Ok(Json(LatestResp { data }))
}
pub async fn run(connect_string: &str) {
// set up postgres connection pool
let manager = PostgresConnectionManager::new_from_stringlike(connect_string, NoTls).unwrap();
let pool = bb8::Pool::builder().build(manager).await.unwrap();

pub async fn run(pool: PgConnectionPool) {
// build our application with routes
let app = Router::new()
.route(
9 changes: 8 additions & 1 deletion api/src/main.rs
@@ -1,3 +1,6 @@
use bb8_postgres::PostgresConnectionManager;
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() {
let args: Vec<String> = std::env::args().collect();
@@ -12,5 +15,9 @@ async fn main() {
connect_string.push_str(&args[4])
}

lard_api::run(&connect_string).await;
// set up postgres connection pool
let manager = PostgresConnectionManager::new_from_stringlike(connect_string, NoTls).unwrap();
let pool = bb8::Pool::builder().build(manager).await.unwrap();

lard_api::run(pool).await;
}
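
Note: run now takes a PgConnectionPool instead of a connect string, with the pool built in main and passed in. The PgConnectionPool alias itself isn't part of this diff; judging from the bb8/tokio_postgres setup above, it is presumably something like the following sketch (an assumption, not code from this PR):

pub type PgConnectionPool =
    bb8::Pool<bb8_postgres::PostgresConnectionManager<tokio_postgres::NoTls>>;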
13 changes: 13 additions & 0 deletions db/flags.sql
@@ -0,0 +1,13 @@
CREATE SCHEMA IF NOT EXISTS flags;

CREATE TABLE IF NOT EXISTS flags.kvdata (
timeseries INT4 PRIMARY KEY REFERENCES public.timeseries,
obstime TIMESTAMPTZ NOT NULL,
original REAL NULL, -- could decide not to store this in the future? (KDVH migration will not contain this)
corrected REAL NULL,
controlinfo TEXT NULL,
useinfo TEXT NULL,
cfailed INT4 NULL
);
CREATE INDEX IF NOT EXISTS kvdata_obstime_index ON flags.kvdata (obstime);
-- the timeseries will get an index since it is the primary key
2 changes: 2 additions & 0 deletions ingestion/Cargo.toml
@@ -14,6 +14,8 @@ bytes.workspace = true
chrono.workspace = true
csv.workspace = true
futures.workspace = true
kafka.workspace = true
quick-xml.workspace = true
regex.workspace = true
serde.workspace = true
thiserror.workspace = true
321 changes: 321 additions & 0 deletions ingestion/src/kvkafka.rs
@@ -0,0 +1,321 @@
use chrono::{DateTime, NaiveDateTime, Utc};
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use serde::{Deserialize, Deserializer};
use thiserror::Error;
use tokio::sync::mpsc;

use crate::PgConnectionPool;

#[derive(Error, Debug)]
pub enum Error {
#[error("parsing xml error: {0}")]
IssueParsingXML(String),
#[error("parsing time error: {0}")]
IssueParsingTime(#[from] chrono::ParseError),
#[error("kafka returned an error: {0}")]
Kafka(#[from] kafka::Error),
#[error("postgres returned an error: {0}")]
Database(#[from] tokio_postgres::Error),
#[error(
"no Timeseries ID found for this data - station {}, param {}",
station,
param
)]
TimeseriesMissing { station: i32, param: i32 },
#[error("error while deserializing message: {0}")]
Deserialize(#[from] quick_xml::DeError),
}

#[derive(Debug, Deserialize)]
/// Represents <KvalobsData>...</KvalobsData>
struct KvalobsData {
station: Vec<Stations>,
}
#[derive(Debug, Deserialize)]
/// Represents <station>...</station>
struct Stations {
#[serde(rename = "@val")]
val: i32,
typeid: Vec<Typeid>,
}
#[derive(Debug, Deserialize)]
/// Represents <typeid>...</typeid>
struct Typeid {
#[serde(rename = "@val")]
val: i32,
obstime: Vec<Obstime>,
}
#[derive(Debug, Deserialize)]
/// Represents <obstime>...</obstime>
struct Obstime {
#[serde(rename = "@val")]
val: String, // avoiding parsing time at this point...
tbtime: Vec<Tbtime>,
}
#[derive(Debug, Deserialize)]
/// Represents <tbtime>...</tbtime>
struct Tbtime {
#[serde(rename = "@val")]
_val: String, // avoiding parsing time at this point...
_kvtextdata: Option<Vec<Kvtextdata>>,
sensor: Vec<Sensor>,
}
/// Represents <kvtextdata>...</kvtextdata>
#[derive(Debug, Deserialize)]
struct Kvtextdata {
_paramid: Option<i32>,
_original: Option<String>,
}
#[derive(Debug, Deserialize)]
/// Represents <sensor>...</sensor>
struct Sensor {
#[serde(rename = "@val", deserialize_with = "zero_to_none")]
val: Option<i32>,
level: Vec<Level>,
}
/// Represents <level>...</level>
#[derive(Debug, Deserialize)]
struct Level {
#[serde(rename = "@val", deserialize_with = "zero_to_none")]
val: Option<i32>,
kvdata: Option<Vec<Kvdata>>,
}
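
// An illustrative message shape these structs map onto. The values are hypothetical and the
// layout is reconstructed from the struct nesting above, not taken from a real kvalobs message:
//
// <KvalobsData>
//   <station val="18700">
//     <typeid val="501">
//       <obstime val="2024-07-09 06:00:00">
//         <tbtime val="2024-07-09 06:02:31">
//           <sensor val="0">
//             <level val="0">
//               <kvdata paramid="211">
//                 <original>12.5</original>
//                 <corrected>12.5</corrected>
//                 <controlinfo>1000000000000000</controlinfo>
//                 <useinfo>7000000000000000</useinfo>
//               </kvdata>
//             </level>
//           </sensor>
//         </tbtime>
//       </obstime>
//     </typeid>
//   </station>
// </KvalobsData>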

// Change the sensor and level back to null if they are 0
// 0 is the default for kvalobs, but through obsinn it's actually just missing
fn zero_to_none<'de, D>(des: D) -> Result<Option<i32>, D::Error>
where
D: Deserializer<'de>,
{
Option::deserialize(des).map(|opt| match opt {
Some("0") | Some("") | None => None,
Some(val) => Some(val.parse::<i32>().unwrap()),
})
}
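// e.g. val="0" or val="" deserializes to None, while val="2" becomes Some(2)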

/// Represents <kvdata>...</kvdata>
#[derive(Debug, Deserialize)]
pub struct Kvdata {
#[serde(rename = "@paramid")]
paramid: i32,
#[serde(default, deserialize_with = "optional")]
original: Option<f32>,
#[serde(default, deserialize_with = "optional")]
corrected: Option<f32>,
#[serde(default, deserialize_with = "optional")]
controlinfo: Option<String>,
#[serde(default, deserialize_with = "optional")]
useinfo: Option<String>,
#[serde(default, deserialize_with = "optional")]
cfailed: Option<i32>,
}

// If the field is either empty or missing it should deserialize to None.
// The latter is ensured by the #[serde(default)] macro,
// while this function takes care of the former case.
fn optional<'de, D, T>(des: D) -> Result<Option<T>, D::Error>
where
D: Deserializer<'de>,
T: std::str::FromStr,
<T as std::str::FromStr>::Err: std::fmt::Debug,
{
Option::deserialize(des).map(|opt| match opt {
Some("") | None => None,
Some(val) => Some(val.parse::<T>().unwrap()),
})
}
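// e.g. a missing <cfailed> or an empty <cfailed></cfailed> both become None,
// while <cfailed>3</cfailed> becomes Some(3)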

#[derive(Debug, Deserialize)]
struct KvalobsId {
station: i32,
paramid: i32,
typeid: i32,
sensor: Option<i32>,
level: Option<i32>,
}

#[derive(Debug)]
pub struct Msg {
kvid: KvalobsId,
obstime: DateTime<Utc>,
kvdata: Kvdata,
}

pub async fn read_and_insert(pool: PgConnectionPool, group_string: String) {
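// bounded channel: if db inserts fall behind, the kafka reader blocks on send (backpressure)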
let (tx, mut rx) = mpsc::channel(10);

tokio::spawn(async move {
read_kafka(group_string, tx).await;
});

let client = pool.get().await.expect("Couldn't connect to database");
while let Some(msg) = rx.recv().await {
if let Err(e) = insert_kvdata(&client, msg).await {
eprintln!("Database insert error: {e}");
}
}
}

pub async fn parse_message(message: &[u8], tx: &mpsc::Sender<Msg>) -> Result<(), Error> {
// do some basic trimming / processing of the raw message
// received from the kafka queue
let xmlmsg = std::str::from_utf8(message)
.map_err(|_| Error::IssueParsingXML("couldn't convert message from utf8".to_string()))?
.trim()
.replace(['\n', '\\'], "");

// do some checking / further processing of message
if !xmlmsg.starts_with("<?xml") {
return Err(Error::IssueParsingXML(
"kv2kvdata must be xml starting with '<?xml'".to_string(),
));
}

let kvalobs_xmlmsg = match xmlmsg.find("?>") {
Some(loc) => &xmlmsg[(loc + 2)..],
None => {
return Err(Error::IssueParsingXML(
"couldn't find end of xml tag '?>'".to_string(),
))
}
};
let item: KvalobsData = quick_xml::de::from_str(kvalobs_xmlmsg)?;

// get the useful stuff out of this struct
for station in item.station {
for typeid in station.typeid {
for obstime in typeid.obstime {
// TODO: should we return on error here
let obs_time =
NaiveDateTime::parse_from_str(&obstime.val, "%Y-%m-%d %H:%M:%S")?.and_utc();
// TODO: or continue/break?
// let obs_time =
// match NaiveDateTime::parse_from_str(&obstime.val, "%Y-%m-%d %H:%M:%S") {
// Ok(time) => time.and_utc(),
// Err(e) => {
// eprintln!("{e}");
// break; // continue;
// }
// };
for tbtime in obstime.tbtime {
// NOTE: this is "table time" which can vary from the actual observation time,
// it's the first time it entered the db in kvalobs
// currently not using it
// TODO: Do we want to handle text data at all, it doesn't seem to be QCed
// if let Some(textdata) = tbtime.kvtextdata {...}
for sensor in tbtime.sensor {
for level in sensor.level {
if let Some(kvdata) = level.kvdata {
for data in kvdata {
let msg = Msg {
kvid: KvalobsId {
station: station.val,
paramid: data.paramid,
typeid: typeid.val,
sensor: sensor.val,
level: level.val,
},
obstime: obs_time,
kvdata: data,
};
tx.send(msg).await.unwrap();
}
}
}
}
}
}
}
}

Ok(())
}

async fn read_kafka(group_name: String, tx: mpsc::Sender<Msg>) {
// NOTE: reading from the 4 redundant kafka queues, but only reading the checked data (other topics exist)
let mut consumer = Consumer::from_hosts(vec![
"kafka2-a1.met.no:9092".to_owned(),
"kafka2-a2.met.no:9092".to_owned(),
"kafka2-b1.met.no:9092".to_owned(),
"kafka2-b2.met.no:9092".to_owned(),
])
.with_topic_partitions("kvalobs.production.checked".to_owned(), &[0, 1])
.with_fallback_offset(FetchOffset::Earliest)
.with_group(group_name)
.with_offset_storage(Some(GroupOffsetStorage::Kafka))
.create()
.expect("failed to create consumer");

// Consume the kafka queue infinitely
loop {
// https://docs.rs/kafka/latest/src/kafka/consumer/mod.rs.html#155
// poll asks for next available chunk of data as a MessageSet
match consumer.poll() {
Ok(sets) => {
for msgset in sets.iter() {
for msg in msgset.messages() {
if let Err(e) = parse_message(msg.value, &tx).await {
eprintln!("{}", e);
}
}
if let Err(e) = consumer.consume_messageset(msgset) {
eprintln!("{}", e);
}
}
consumer
.commit_consumed()
.expect("could not commit offset in consumer"); // ensure we keep offset
}
Err(e) => {
eprintln!("{}\nRetrying in 5 seconds...", Error::Kafka(e));
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
}
}

pub async fn insert_kvdata(
client: &tokio_postgres::Client,
Msg {
kvid,
obstime,
kvdata,
}: Msg,
) -> Result<(), Error> {
// what timeseries is this?
// NOTE: alternatively could use conn.query_one, since we want exactly one response
let tsid: i32 = client
.query(
"SELECT timeseries FROM labels.met
WHERE station_id = $1 \
AND param_id = $2 \
AND type_id = $3 \
AND (($4::int IS NULL AND lvl IS NULL) OR (lvl = $4)) \
AND (($5::int IS NULL AND sensor IS NULL) OR (sensor = $5))",
&[
&kvid.station,
&kvid.paramid,
&kvid.typeid,
&kvid.level,
&kvid.sensor,
],
)
.await?
.first()
.ok_or(Error::TimeseriesMissing {
station: kvid.station,
param: kvid.paramid,
})?
.get(0);

// write the data into the db
// kvdata derives ToSql therefore options should be nullable
// https://docs.rs/postgres-types/latest/postgres_types/trait.ToSql.html#nullability
client.execute(
"INSERT INTO flags.kvdata (timeseries, obstime, original, corrected, controlinfo, useinfo, cfailed)
VALUES($1, $2, $3, $4, $5, $6, $7)",
&[&tsid, &obstime, &kvdata.original, &kvdata.corrected, &kvdata.controlinfo, &kvdata.useinfo, &kvdata.cfailed],
).await?;

Ok(())
}
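
The diff above doesn't show how read_and_insert gets wired into the ingestion binary. A minimal sketch of how it could be called, assuming the crate is named lard_ingestion and exposes kvkafka as a module (both assumptions, not taken from this PR):

use bb8_postgres::PostgresConnectionManager;
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() {
    // hypothetical connect string and consumer group; real values would come from args/config
    let manager = PostgresConnectionManager::new_from_stringlike(
        "host=localhost user=lard dbname=lard",
        NoTls,
    )
    .unwrap();
    let pool = bb8::Pool::builder().build(manager).await.unwrap();

    lard_ingestion::kvkafka::read_and_insert(pool, "lard-consumer-group".to_string()).await;
}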