Skip to content

Commit

Permalink
Fix storing of deltas by field
Browse files Browse the repository at this point in the history
  • Loading branch information
aleics committed Jan 18, 2025
1 parent 5aa4424 commit fe3ba2d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 17 deletions.
4 changes: 2 additions & 2 deletions benches/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ fn bench_apply_deltas(b: &mut Bencher) {
ENGINE.store_deltas(&NAME, &scope, &deltas).await.unwrap();
});

let scope = DeltaScope::date(DATE.next_day().unwrap());

b.iter(move || {
tokio_test::block_on(async {
let scope = DeltaScope::date(DATE.next_day().unwrap());

let query = QueryExecution::new()
.with_scope(scope)
.with_pagination(*PAGINATION);
Expand Down
35 changes: 20 additions & 15 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ pub struct EntityStorage {
data: Database<OwnedType<DataItemId>, SerdeBincode<DataItem>>,
indices: Database<Str, SerdeBincode<Index>>,
documents: Database<Str, SerdeBincode<RoaringBitmap>>,
deltas: Database<OwnedType<u64>, SerdeBincode<BTreeMap<i64, StoredDelta>>>,
deltas: Database<OwnedType<u64>, SerdeBincode<BTreeMap<i64, HashMap<String, StoredDelta>>>>,
index_descriptors: HashMap<String, TypeDescriptor>,
}

Expand Down Expand Up @@ -441,14 +441,16 @@ impl EntityStorage {
.range((Bound::Unbounded, Bound::Included(scope.timestamp)))
.fold(
HashMap::<String, StoredDelta>::new(),
|mut acc, (_, stored_delta)| {
if let Some(aggregated_delta) = acc.get_mut(&stored_delta.field_name) {
aggregated_delta.before.plus(&stored_delta.before);
aggregated_delta.after.plus(&stored_delta.after);
aggregated_delta.affected |= &stored_delta.affected;
} else {
acc.insert(stored_delta.field_name.clone(), stored_delta.clone());
};
|mut acc, (_, stored_deltas)| {
for (field_name, stored_delta) in stored_deltas {
if let Some(aggregated_delta) = acc.get_mut(field_name) {
aggregated_delta.before.plus(&stored_delta.before);
aggregated_delta.after.plus(&stored_delta.after);
aggregated_delta.affected |= &stored_delta.affected;
} else {
acc.insert(stored_delta.field_name.clone(), stored_delta.clone());
};
}

acc
},
Expand Down Expand Up @@ -539,6 +541,8 @@ impl EntityStorage {
let scope_id = scope.get_id();
let mut current = self.deltas.get(&txn, &scope_id)?.unwrap_or_default();

let stored_deltas = current.entry(scope.timestamp).or_default();

// Iterate over the deltas to create for each field name the before and after index
for delta in deltas {
let type_descriptor = self
Expand All @@ -551,12 +555,13 @@ impl EntityStorage {
)
});

let stored_delta = current
.entry(scope.timestamp)
.or_insert(StoredDelta::from_type(
delta.field_name.clone(),
type_descriptor,
));
let stored_delta =
stored_deltas
.entry(delta.field_name.clone())
.or_insert(StoredDelta::from_type(
delta.field_name.clone(),
type_descriptor,
));

let position = id_to_position(delta.id);

Expand Down

0 comments on commit fe3ba2d

Please sign in to comment.