Commit
Propagate parent spans for dataloader
iamvigneshwars committed Apr 11, 2024
1 parent 8f2e1f0 commit 2c4b6b6
Showing 1 changed file with 81 additions and 33 deletions.
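For context, the change applies one pattern across all of the dataloaders: each loader captures Span::current() at construction time (whatever span is active when add_data_loaders builds the loaders and hands them the database connection) and stores it as parent_span; each batched load() then opens a span explicitly parented to that stored span, so the work the DataLoader defers and batches still attaches to the trace that requested it. A minimal sketch of the pattern, using illustrative names (ExampleLoader, load_batch) that are not part of this repository:

use tracing::{info_span, Span};

// Stand-in for the dataloaders in this commit: the constructor captures the
// span that is current at construction time.
struct ExampleLoader {
    parent_span: Span,
}

impl ExampleLoader {
    fn new() -> Self {
        // DataLoader batches run later, outside the constructing span, so
        // calling Span::current() inside `load` would not point back at it.
        Self {
            parent_span: Span::current(),
        }
    }

    async fn load(&self, keys: &[u32]) -> Vec<u32> {
        // Explicitly parent the batch span on the captured span.
        let span = info_span!(parent: &self.parent_span, "load_batch");
        let _guard = span.enter();
        keys.to_vec()
    }
}

In the diff below the same idea appears as the parent_span field added to every loader struct, the Span::current() capture in each new(), and the tracing::info_span!(parent: &self.parent_span, ...) call at the top of each load().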
114 changes: 81 additions & 33 deletions processed_data/src/graphql/mod.rs
@@ -18,7 +18,7 @@ use models::{
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use std::collections::HashMap;
use std::time::Duration;
-use tracing::instrument;
+use tracing::{instrument, Span};
use url::Url;

use self::entities::AutoProcProgram;
@@ -31,7 +31,6 @@ pub trait AddDataLoadersExt {
}

impl AddDataLoadersExt for async_graphql::Request {
#[instrument(name = "add_data_loaders", skip(self))]
fn add_data_loaders(self, database: DatabaseConnection) -> Self {
self.data(DataLoader::new(
ProcessedDataLoader::new(database.clone()),
@@ -86,88 +85,119 @@ pub fn root_schema_builder() -> SchemaBuilder<Query, EmptyMutation, EmptySubscri
#[derive(Debug, Clone, Default)]
pub struct Query;

-pub struct ProcessedDataLoader(DatabaseConnection);
-pub struct ProcessingJobDataLoader(DatabaseConnection);
-pub struct ProcessingJobParameterDataLoader(DatabaseConnection);
-pub struct AutoProcIntegrationDataLoader(DatabaseConnection);
-pub struct AutoProcProgramDataLoader(DatabaseConnection);
-pub struct AutoProcDataLoader(DatabaseConnection);
-pub struct AutoProcScalingDataLoader(DatabaseConnection);
-pub struct AutoProcScalingOverall(DatabaseConnection);
-pub struct AutoProcScalingInnerShell(DatabaseConnection);
-pub struct AutoProcScalingOuterShell(DatabaseConnection);
+pub struct ProcessedDataLoader {
+database: DatabaseConnection,
+parent_span: Span,
+}
+pub struct ProcessingJobDataLoader {
+database: DatabaseConnection,
+parent_span: Span,
+}
+pub struct ProcessingJobParameterDataLoader {
+database: DatabaseConnection,
+parent_span: Span,
+}
+pub struct AutoProcIntegrationDataLoader {
+database: DatabaseConnection,
+parent_span: Span,
+}
+pub struct AutoProcProgramDataLoader {
+database: DatabaseConnection,
+parent_span: Span,
+}
+pub struct AutoProcDataLoader {
+database: DatabaseConnection,
+parent_span: Span,
+}
+pub struct AutoProcScalingDataLoader {
+database: DatabaseConnection,
+parent_span: Span,
+}
+pub struct AutoProcScalingOverall {
+database: DatabaseConnection,
+parent_span: Span,
+}
+pub struct AutoProcScalingInnerShell {
+database: DatabaseConnection,
+parent_span: Span,
+}
+pub struct AutoProcScalingOuterShell {
+database: DatabaseConnection,
+parent_span: Span,
+}

impl ProcessingJobDataLoader {
fn new(database: DatabaseConnection) -> Self {
-Self(database)
+Self{database, parent_span: Span::current()}
}
}

impl ProcessedDataLoader {
fn new(database: DatabaseConnection) -> Self {
-Self(database)
+Self{database, parent_span: Span::current()}
}
}

impl ProcessingJobParameterDataLoader {
fn new(database: DatabaseConnection) -> Self {
-Self(database)
+Self{database, parent_span: Span::current()}
}
}

impl AutoProcIntegrationDataLoader {
fn new(database: DatabaseConnection) -> Self {
-Self(database)
+Self{database, parent_span: Span::current()}
}
}

impl AutoProcProgramDataLoader {
fn new(database: DatabaseConnection) -> Self {
-Self(database)
+Self{database, parent_span: Span::current()}
}
}

impl AutoProcDataLoader {
fn new(database: DatabaseConnection) -> Self {
-Self(database)
+Self{database, parent_span: Span::current()}
}
}

impl AutoProcScalingDataLoader {
fn new(database: DatabaseConnection) -> Self {
-Self(database)
+Self{database, parent_span: Span::current()}
}
}

impl AutoProcScalingOverall {
fn new(database: DatabaseConnection) -> Self {
-Self(database)
+Self{database, parent_span: Span::current()}
}
}

impl AutoProcScalingInnerShell {
fn new(database: DatabaseConnection) -> Self {
-Self(database)
+Self{database, parent_span: Span::current()}
}
}

impl AutoProcScalingOuterShell {
fn new(database: DatabaseConnection) -> Self {
-Self(database)
+Self{database, parent_span: Span::current()}
}
}

impl Loader<u32> for ProcessedDataLoader {
type Value = DataProcessing;
type Error = async_graphql::Error;

#[instrument(name = "load_processed_data", skip(self))]
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
+let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
+let _span = span.enter();
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = data_collection_file_attachment::Entity::find()
.filter(data_collection_file_attachment::Column::DataCollectionId.is_in(keys_vec))
-.all(&self.0)
+.all(&self.database)
.await?;

for record in records {
@@ -187,11 +217,13 @@ impl Loader<u32> for ProcessingJobDataLoader {

#[instrument(name = "load_processing_job", skip(self))]
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
+let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
+let _span = span.enter();
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = processing_job::Entity::find()
.filter(processing_job::Column::DataCollectionId.is_in(keys_vec))
-.all(&self.0)
+.all(&self.database)
.await?;

for record in records {
@@ -213,11 +245,13 @@ impl Loader<u32> for ProcessingJobParameterDataLoader {

#[instrument(name = "load_processing_job_parameter", skip(self))]
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
+let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
+let _span = span.enter();
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = processing_job_parameter::Entity::find()
.filter(processing_job_parameter::Column::ProcessingJobId.is_in(keys_vec))
-.all(&self.0)
+.all(&self.database)
.await?;

for record in records {
@@ -239,11 +273,13 @@ impl Loader<u32> for AutoProcIntegrationDataLoader {

#[instrument(name = "load_auto_proc_integration", skip(self))]
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
+let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
+let _span = span.enter();
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = auto_proc_integration::Entity::find()
.filter(auto_proc_integration::Column::DataCollectionId.is_in(keys_vec))
-.all(&self.0)
+.all(&self.database)
.await?;

for record in records {
@@ -265,11 +301,13 @@ impl Loader<u32> for AutoProcProgramDataLoader {

#[instrument(name = "load_auto_proc_program", skip(self))]
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
+let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
+let _span = span.enter();
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = auto_proc_program::Entity::find()
.filter(auto_proc_program::Column::AutoProcProgramId.is_in(keys_vec))
-.all(&self.0)
+.all(&self.database)
.await?;

for record in records {
@@ -288,11 +326,13 @@ impl Loader<u32> for AutoProcDataLoader {

#[instrument(name = "load_auto_proc", skip(self))]
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
+let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
+let _span = span.enter();
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = auto_proc::Entity::find()
.filter(auto_proc::Column::AutoProcProgramId.is_in(keys_vec))
-.all(&self.0)
+.all(&self.database)
.await?;

for record in records {
@@ -311,11 +351,13 @@ impl Loader<u32> for AutoProcScalingDataLoader {

#[instrument(name = "load_auto_proc_scaling", skip(self))]
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
+let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
+let _span = span.enter();
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = auto_proc_scaling::Entity::find()
.filter(auto_proc_scaling::Column::AutoProcId.is_in(keys_vec))
-.all(&self.0)
+.all(&self.database)
.await?;

for record in records {
@@ -334,12 +376,14 @@ impl Loader<u32> for AutoProcScalingOverall {

#[instrument(name = "load_auto_proc_scaling_statics", skip(self))]
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
+let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
+let _span = span.enter();
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = auto_proc_scaling_statistics::Entity::find()
.filter(auto_proc_scaling_statistics::Column::AutoProcScalingId.is_in(keys_vec))
.filter(auto_proc_scaling_statistics::Column::ScalingStatisticsType.eq("overall"))
-.all(&self.0)
+.all(&self.database)
.await?;

for record in records {
@@ -358,12 +402,14 @@ impl Loader<u32> for AutoProcScalingInnerShell {

#[instrument(name = "load_auto_proc_scaling_statics", skip(self))]
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
+let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
+let _span = span.enter();
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = auto_proc_scaling_statistics::Entity::find()
.filter(auto_proc_scaling_statistics::Column::AutoProcScalingId.is_in(keys_vec))
.filter(auto_proc_scaling_statistics::Column::ScalingStatisticsType.eq("innerShell"))
-.all(&self.0)
+.all(&self.database)
.await?;

for record in records {
@@ -382,12 +428,14 @@ impl Loader<u32> for AutoProcScalingOuterShell {

#[instrument(name = "load_auto_proc_scaling_statics", skip(self))]
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
+let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
+let _span = span.enter();
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = auto_proc_scaling_statistics::Entity::find()
.filter(auto_proc_scaling_statistics::Column::AutoProcScalingId.is_in(keys_vec))
.filter(auto_proc_scaling_statistics::Column::ScalingStatisticsType.eq("outerShell"))
-.all(&self.0)
+.all(&self.database)
.await?;

for record in records {