diff --git a/query-engine/connectors/query-connector/src/query_arguments.rs b/query-engine/connectors/query-connector/src/query_arguments.rs index 2d8fc93452f7..2347e8ae22ad 100644 --- a/query-engine/connectors/query-connector/src/query_arguments.rs +++ b/query-engine/connectors/query-connector/src/query_arguments.rs @@ -71,15 +71,17 @@ impl QueryArguments { /// retrieved by the connector or if it requires the query engine to fetch a raw set /// of records and perform certain operations itself, in-memory. pub fn requires_inmemory_processing(&self) -> bool { - let has_distinct = self.distinct.is_some() + self.contains_unstable_cursor() || self.contains_null_cursor() + } + + pub fn requires_inmemory_distinct(&self) -> bool { + self.distinct.is_some() && !self .model() .dm .schema .connector - .has_capability(ConnectorCapability::Distinct); - - has_distinct || self.contains_unstable_cursor() || self.contains_null_cursor() + .has_capability(ConnectorCapability::Distinct) } /// An unstable cursor is a cursor that is used in conjunction with an unstable (non-unique) combination of orderBys. diff --git a/query-engine/core/src/interpreter/query_interpreters/inmemory_record_processor.rs b/query-engine/core/src/interpreter/query_interpreters/inmemory_record_processor.rs index a324b499d759..6e6074e07abf 100644 --- a/query-engine/core/src/interpreter/query_interpreters/inmemory_record_processor.rs +++ b/query-engine/core/src/interpreter/query_interpreters/inmemory_record_processor.rs @@ -1,6 +1,6 @@ -use connector::QueryArguments; +use connector::{Filter, QueryArguments}; use itertools::Itertools; -use prisma_models::{FieldSelection, ManyRecords, Record, SelectionResult}; +use prisma_models::{FieldSelection, ManyRecords, Model, OrderBy, Record, SelectionResult}; use std::ops::Deref; #[derive(Debug)] @@ -80,7 +80,7 @@ impl InMemoryRecordProcessor { records.records.first().map(|x| x.parent_id.is_some()).unwrap_or(false) } - fn apply_distinct(&self, mut records: ManyRecords) -> ManyRecords { + pub(crate) fn apply_distinct(&self, mut records: ManyRecords) -> ManyRecords { let field_names = &records.field_names; let distinct_selection = if let Some(ref distinct) = self.distinct { @@ -189,3 +189,53 @@ impl InMemoryRecordProcessor { self.take.or(self.skip).is_some() || self.cursor.is_some() } } + +pub struct InMemoryRecordProcessorBuilder { + args: QueryArguments, +} + +impl InMemoryRecordProcessorBuilder { + pub fn new(model: Model, order_by: Vec, ignore_skip: bool, ignore_take: bool) -> Self { + Self { + args: QueryArguments { + model, + cursor: None, + take: None, + skip: None, + filter: None, + order_by, + distinct: None, + ignore_skip, + ignore_take, + }, + } + } + pub fn _cursor(mut self, cursor: SelectionResult) -> Self { + self.args.cursor = Some(cursor); + self + } + + pub fn _take(mut self, take: i64) -> Self { + self.args.take = Some(take); + self + } + + pub fn _skip(mut self, skip: i64) -> Self { + self.args.skip = Some(skip); + self + } + + pub fn _filter(mut self, filter: Filter) -> Self { + self.args.filter = Some(filter); + self + } + + pub fn distinct(mut self, distinct: FieldSelection) -> Self { + self.args.distinct = Some(distinct); + self + } + + pub fn build(self) -> InMemoryRecordProcessor { + InMemoryRecordProcessor { args: self.args } + } +} diff --git a/query-engine/core/src/interpreter/query_interpreters/nested_read.rs b/query-engine/core/src/interpreter/query_interpreters/nested_read.rs index 238dd814f812..e7a4d4ca1a9b 100644 --- a/query-engine/core/src/interpreter/query_interpreters/nested_read.rs +++ b/query-engine/core/src/interpreter/query_interpreters/nested_read.rs @@ -1,4 +1,4 @@ -use super::{inmemory_record_processor::InMemoryRecordProcessor, read}; +use super::{inmemory_record_processor::InMemoryRecordProcessorBuilder, read}; use crate::{interpreter::InterpretationResult, query_ast::*}; use connector::{ self, filter::Filter, ConditionListValue, ConnectionLike, QueryArguments, RelAggregationRow, @@ -13,7 +13,25 @@ pub(crate) async fn m2m( parent_result: Option<&ManyRecords>, trace_id: Option, ) -> InterpretationResult<(ManyRecords, Option>)> { - let processor = InMemoryRecordProcessor::new_from_query_args(&mut query.args); + let inm_builder = InMemoryRecordProcessorBuilder::new( + query.args.model.clone(), + query.args.order_by.clone(), + query.args.ignore_skip, + query.args.ignore_take, + ); + + let processor = match query.args.distinct { + Some(ref fs) => { + if query.args.requires_inmemory_distinct() { + inm_builder.distinct(fs.clone()) + } else { + inm_builder + } + } + None => inm_builder, + } + .build(); + let parent_field = &query.parent_field; let child_link_id = parent_field.related_field().linking_fields(); @@ -138,7 +156,7 @@ pub async fn one2m( parent_field: &RelationFieldRef, parent_selections: Option>, parent_result: Option<&ManyRecords>, - mut query_args: QueryArguments, + query_args: QueryArguments, selected_fields: &FieldSelection, aggr_selections: Vec, trace_id: Option, @@ -192,10 +210,31 @@ pub async fn one2m( // If we're fetching related records from a single parent, then we can apply normal pagination instead of in-memory processing. // However, we can't just apply a LIMIT/OFFSET for multiple parents as we need N related records PER parent. // We could use ROW_NUMBER() but it requires further refactoring so we're still using in-memory processing for now. + + let req_inmem_distinct = query_args.requires_inmemory_distinct(); + let processor = if uniq_selections.len() == 1 && !query_args.requires_inmemory_processing() { None } else { - Some(InMemoryRecordProcessor::new_from_query_args(&mut query_args)) + let inm_builder = InMemoryRecordProcessorBuilder::new( + query_args.model.clone(), + query_args.order_by.clone(), + query_args.ignore_skip, + query_args.ignore_take, + ); + + let inm_builder = match query_args.distinct { + Some(ref fs) => { + if req_inmem_distinct { + inm_builder.distinct(fs.clone()) + } else { + inm_builder + } + } + None => inm_builder, + }; + + Some(inm_builder.build()) }; let mut scalars = { @@ -265,6 +304,12 @@ pub async fn one2m( } let scalars = if let Some(processor) = processor { + let scalars = if req_inmem_distinct { + processor.apply_distinct(scalars) + } else { + scalars + }; + processor.apply(scalars) } else { scalars diff --git a/query-engine/core/src/interpreter/query_interpreters/read.rs b/query-engine/core/src/interpreter/query_interpreters/read.rs index 5c3dd05e387e..3114d7d6648a 100644 --- a/query-engine/core/src/interpreter/query_interpreters/read.rs +++ b/query-engine/core/src/interpreter/query_interpreters/read.rs @@ -1,4 +1,4 @@ -use super::{inmemory_record_processor::InMemoryRecordProcessor, *}; +use super::{inmemory_record_processor::InMemoryRecordProcessorBuilder, *}; use crate::{interpreter::InterpretationResult, query_ast::*, result_ast::*}; use connector::{self, error::ConnectorError, ConnectionLike, RelAggregationRow, RelAggregationSelection}; use futures::future::{BoxFuture, FutureExt}; @@ -88,27 +88,31 @@ fn read_many( query: ManyRecordsQuery, trace_id: Option, ) -> BoxFuture<'_, InterpretationResult> { - // let req_inmem_proc = query.args.requires_inmemory_processing(); - - // let processor = if req_inmem_proc { - // let inm_builder = InMemoryRecordProcessorBuilder::new( - // query.args.model.clone(), - // query.args.order_by.clone(), - // query.args.ignore_skip.clone(), - // query.args.ignore_take.clone(), - // ); - - // let inmemory_record_processor = match query.args.distinct { - // Some(ref fs) => inm_builder.distinct(fs.clone()).build(), - // None => inm_builder.build(), - // }; - - // Some(inmemory_record_processor) - // } else { - // None - // }; + let req_inmem_distinct = query.args.requires_inmemory_distinct(); + + let processor = if query.args.requires_inmemory_processing() { + let inm_builder = InMemoryRecordProcessorBuilder::new( + query.args.model.clone(), + query.args.order_by.clone(), + query.args.ignore_skip, + query.args.ignore_take, + ); + + let inm_builder = match query.args.distinct { + Some(ref fs) => { + if req_inmem_distinct { + inm_builder.distinct(fs.clone()) + } else { + inm_builder + } + } + None => inm_builder, + }; - let processor: Option = None; + Some(inm_builder.build()) + } else { + None + }; let fut = async move { let scalars = tx @@ -122,6 +126,12 @@ fn read_many( .await?; let scalars = if let Some(p) = processor { + let scalars = if req_inmem_distinct { + p.apply_distinct(scalars) + } else { + scalars + }; + p.apply(scalars) } else { scalars