Skip to content

Commit

Permalink
feat(deleteMany): add limit option to deleteMany (#5105)
Browse files Browse the repository at this point in the history
  • Loading branch information
FGoessler authored Jan 8, 2025
1 parent 169dafa commit 66ff515
Show file tree
Hide file tree
Showing 18 changed files with 131 additions and 14 deletions.
24 changes: 24 additions & 0 deletions prisma-fmt/src/get_dmmf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5592,6 +5592,18 @@ mod tests {
"isList": false
}
]
},
{
"name": "limit",
"isRequired": false,
"isNullable": false,
"inputTypes": [
{
"type": "Int",
"location": "scalar",
"isList": false
}
]
}
],
"isNullable": false,
Expand Down Expand Up @@ -5954,6 +5966,18 @@ mod tests {
"isList": false
}
]
},
{
"name": "limit",
"isRequired": false,
"isNullable": false,
"inputTypes": [
{
"type": "Int",
"location": "scalar",
"isList": false
}
]
}
],
"isNullable": false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,32 @@ mod delete_many {
Ok(())
}

// "The delete many Mutation" should "delete max the number of items specified in the limit"
#[connector_test]
async fn should_delete_max_limit_items(runner: Runner) -> TestResult<()> {
create_row(&runner, r#"{ id: 1, title: "title1" }"#).await?;
create_row(&runner, r#"{ id: 2, title: "title2" }"#).await?;
create_row(&runner, r#"{ id: 3, title: "title3" }"#).await?;
create_row(&runner, r#"{ id: 4, title: "title4" }"#).await?;

assert_todo_count(&runner, 4).await?;

insta::assert_snapshot!(
run_query!(&runner, r#"mutation {
deleteManyTodo(
limit: 3
){
count
}
}"#),
@r###"{"data":{"deleteManyTodo":{"count":3}}}"###
);

assert_todo_count(&runner, 1).await?;

Ok(())
}

fn nested_del_many() -> String {
let schema = indoc! {
r#"model ZChild{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,15 @@ impl WriteOperations for MongoDbConnection {
&mut self,
model: &Model,
record_filter: connector_interface::RecordFilter,
limit: Option<i64>,
_traceparent: Option<TraceParent>,
) -> connector_interface::Result<usize> {
catch(write::delete_records(
&self.database,
&mut self.session,
model,
record_filter,
limit,
))
.await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,15 @@ impl WriteOperations for MongoDbTransaction<'_> {
&mut self,
model: &Model,
record_filter: connector_interface::RecordFilter,
limit: Option<i64>,
_traceparent: Option<TraceParent>,
) -> connector_interface::Result<usize> {
catch(write::delete_records(
&self.connection.database,
&mut self.connection.session,
model,
record_filter,
limit,
))
.await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pub async fn update_records<'conn>(
.collect::<crate::Result<Vec<_>>>()?
} else {
let filter = MongoFilterVisitor::new(FilterPrefix::default(), false).visit(record_filter.filter)?;
find_ids(coll.clone(), session, model, filter).await?
find_ids(coll.clone(), session, model, filter, None).await?
};

if ids.is_empty() {
Expand Down Expand Up @@ -228,13 +228,15 @@ pub async fn delete_records<'conn>(
session: &mut ClientSession,
model: &Model,
record_filter: RecordFilter,
limit: Option<i64>,
) -> crate::Result<usize> {
let coll = database.collection::<Document>(model.db_name());
let id_field = pick_singular_id(model);

let ids = if let Some(selectors) = record_filter.selectors {
selectors
.into_iter()
.take(limit.unwrap_or(i64::MAX) as usize)
.map(|p| {
(&id_field, p.values().next().unwrap())
.into_bson()
Expand All @@ -243,7 +245,7 @@ pub async fn delete_records<'conn>(
.collect::<crate::Result<Vec<_>>>()?
} else {
let filter = MongoFilterVisitor::new(FilterPrefix::default(), false).visit(record_filter.filter)?;
find_ids(coll.clone(), session, model, filter).await?
find_ids(coll.clone(), session, model, filter, limit).await?
};

if ids.is_empty() {
Expand Down Expand Up @@ -303,6 +305,7 @@ async fn find_ids(
session: &mut ClientSession,
model: &Model,
filter: MongoFilter,
limit: Option<i64>,
) -> crate::Result<Vec<Bson>> {
let id_field = model.primary_identifier();
let mut builder = MongoReadQueryBuilder::new(model.clone());
Expand All @@ -316,7 +319,10 @@ async fn find_ids(
builder.query = Some(filter);
};

let builder = builder.with_model_projection(id_field)?;
let mut builder = builder.with_model_projection(id_field)?;

builder.limit = limit;

let query = builder.build()?;
let docs = query.execute(collection, session).await?;
let ids = docs.into_iter().map(|mut doc| doc.remove("_id").unwrap()).collect();
Expand Down
1 change: 1 addition & 0 deletions query-engine/connectors/query-connector/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ pub trait WriteOperations {
&mut self,
model: &Model,
record_filter: RecordFilter,
limit: Option<i64>,
traceparent: Option<TraceParent>,
) -> crate::Result<usize>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,13 @@ where
&mut self,
model: &Model,
record_filter: RecordFilter,
limit: Option<i64>,
traceparent: Option<TraceParent>,
) -> connector::Result<usize> {
let ctx = Context::new(&self.connection_info, traceparent);
catch(
&self.connection_info,
write::delete_records(&self.inner, model, record_filter, &ctx),
write::delete_records(&self.inner, model, record_filter, limit, &ctx),
)
.await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ pub(crate) async fn delete_records(
conn: &dyn Queryable,
model: &Model,
record_filter: RecordFilter,
limit: Option<i64>,
ctx: &Context<'_>,
) -> crate::Result<usize> {
let filter_condition = FilterBuilder::without_top_level_joins().visit_filter(record_filter.clone().filter, ctx);
Expand All @@ -453,14 +454,23 @@ pub(crate) async fn delete_records(
let row_count = if record_filter.has_selectors() {
let ids: Vec<_> = record_filter.selectors.as_ref().unwrap().iter().collect();
let mut row_count = 0;
let mut remaining_limit = limit;

for delete in write::delete_many_from_ids_and_filter(model, ids.as_slice(), filter_condition, ctx) {
for delete in
write::delete_many_from_ids_and_filter(model, ids.as_slice(), filter_condition, remaining_limit, ctx)
{
row_count += conn.execute(delete).await?;
if let Some(old_remaining_limit) = remaining_limit {
let new_remaining_limit = old_remaining_limit - row_count as i64;
if new_remaining_limit <= 0 {
break;
}
remaining_limit = Some(new_remaining_limit);
}
}

row_count
} else {
conn.execute(write::delete_many_from_filter(model, filter_condition, ctx))
conn.execute(write::delete_many_from_filter(model, filter_condition, limit, ctx))
.await?
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,12 @@ impl WriteOperations for SqlConnectorTransaction<'_> {
&mut self,
model: &Model,
record_filter: RecordFilter,
limit: Option<i64>,
traceparent: Option<TraceParent>,
) -> connector::Result<usize> {
catch(&self.connection_info, async {
let ctx = Context::new(&self.connection_info, traceparent);
write::delete_records(self.inner.as_queryable(), model, record_filter, &ctx).await
write::delete_records(self.inner.as_queryable(), model, record_filter, limit, &ctx).await
})
.await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,32 @@ pub(crate) fn delete_returning(
pub(crate) fn delete_many_from_filter(
model: &Model,
filter_condition: ConditionTree<'static>,
limit: Option<i64>,
ctx: &Context<'_>,
) -> Query<'static> {
let condition = if let Some(limit) = limit {
let columns = model
.primary_identifier()
.as_scalar_fields()
.expect("primary identifier must contain scalar fields")
.into_iter()
.map(|f| f.as_column(ctx))
.collect::<Vec<_>>();

ConditionTree::from(
Row::from(columns.clone()).in_selection(
Select::from_table(model.as_table(ctx))
.columns(columns)
.so_that(filter_condition)
.limit(limit as usize),
),
)
} else {
filter_condition
};

Delete::from_table(model.as_table(ctx))
.so_that(filter_condition)
.so_that(condition)
.add_traceparent(ctx.traceparent)
.into()
}
Expand All @@ -238,14 +260,15 @@ pub(crate) fn delete_many_from_ids_and_filter(
model: &Model,
ids: &[&SelectionResult],
filter_condition: ConditionTree<'static>,
limit: Option<i64>,
ctx: &Context<'_>,
) -> Vec<Query<'static>> {
let columns: Vec<_> = ModelProjection::from(model.primary_identifier())
.as_columns(ctx)
.collect();

super::chunked_conditions(&columns, ids, ctx, |conditions| {
delete_many_from_filter(model, conditions.and(filter_condition.clone()), ctx)
delete_many_from_filter(model, conditions.and(filter_condition.clone()), limit, ctx)
})
}

Expand Down
6 changes: 4 additions & 2 deletions query-engine/core/src/interpreter/query_interpreters/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ async fn delete_one(

Ok(QueryResult::RecordSelection(Some(Box::new(selection))))
} else {
let result = tx.delete_records(&q.model, filter, traceparent).await?;
let result = tx.delete_records(&q.model, filter, None, traceparent).await?;
Ok(QueryResult::Count(result))
}
}
Expand Down Expand Up @@ -337,7 +337,9 @@ async fn delete_many(
q: DeleteManyRecords,
traceparent: Option<TraceParent>,
) -> InterpretationResult<QueryResult> {
let res = tx.delete_records(&q.model, q.record_filter, traceparent).await?;
let res = tx
.delete_records(&q.model, q.record_filter, q.limit, traceparent)
.await?;

Ok(QueryResult::Count(res))
}
Expand Down
1 change: 1 addition & 0 deletions query-engine/core/src/query_ast/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ pub struct DeleteRecordFields {
pub struct DeleteManyRecords {
pub model: Model,
pub record_filter: RecordFilter,
pub limit: Option<i64>,
}

#[derive(Debug, Clone)]
Expand Down
11 changes: 10 additions & 1 deletion query-engine/core/src/query_graph_builder/write/delete.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use super::*;
use crate::query_document::ParsedInputValue;
use crate::{
query_ast::*,
query_graph::{Node, QueryGraph, QueryGraphDependency},
ArgumentListLookup, FilteredQuery, ParsedField,
};
use psl::datamodel_connector::ConnectorCapability;
use query_structure::{Filter, Model};
use query_structure::{Filter, Model, PrismaValue};
use schema::{constants::args, QuerySchema};
use std::convert::TryInto;

Expand Down Expand Up @@ -110,12 +111,20 @@ pub fn delete_many_records(
Some(where_arg) => extract_filter(where_arg.value.try_into()?, &model)?,
None => Filter::empty(),
};
let limit = field
.arguments
.lookup(args::LIMIT)
.and_then(|limit_arg| match limit_arg.value {
ParsedInputValue::Single(PrismaValue::Int(i)) => Some(i),
_ => None,
});

let model_id = model.primary_identifier();
let record_filter = filter.clone().into();
let delete_many = WriteQuery::DeleteManyRecords(DeleteManyRecords {
model: model.clone(),
record_filter,
limit,
});

let delete_many_node = graph.create_node(Query::Write(delete_many));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub fn nested_delete(
let delete_many = WriteQuery::DeleteManyRecords(DeleteManyRecords {
model: child_model.clone(),
record_filter: or_filter.clone().into(),
limit: None,
});

let delete_many_node = graph.create_node(Query::Write(delete_many));
Expand Down Expand Up @@ -157,6 +158,7 @@ pub fn nested_delete_many(
let delete_many = WriteQuery::DeleteManyRecords(DeleteManyRecords {
model: child_model.clone(),
record_filter: RecordFilter::empty(),
limit: None,
});

let delete_many_node = graph.create_node(Query::Write(delete_many));
Expand Down
1 change: 1 addition & 0 deletions query-engine/core/src/query_graph_builder/write/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ pub fn emulate_on_delete_cascade(
let delete_query = WriteQuery::DeleteManyRecords(DeleteManyRecords {
model: dependent_model.clone(),
record_filter: RecordFilter::empty(),
limit: None,
});

let delete_dependents_node = graph.create_node(Query::Write(delete_query));
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ pub(crate) fn update_many_arguments(ctx: &QuerySchema, model: Model) -> Vec<Inpu
pub(crate) fn delete_many_arguments(ctx: &QuerySchema, model: Model) -> Vec<InputField<'_>> {
let where_arg = where_argument(ctx, &model);

vec![where_arg]
vec![
where_arg,
input_field(args::LIMIT, vec![InputType::int()], None).optional(),
]
}

/// Builds "many records where" arguments based on the given model and field.
Expand Down
3 changes: 3 additions & 0 deletions query-engine/schema/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub mod args {

// createMany-specific args
pub const SKIP_DUPLICATES: &str = "skipDuplicates";

// deleteMany-specific args
pub const LIMIT: &str = "limit";
}

pub mod operations {
Expand Down

0 comments on commit 66ff515

Please sign in to comment.