4
4
mod migrations;
5
5
mod prop_val_sub_index;
6
6
mod query_index;
7
- mod reference_index;
8
7
#[ cfg( test) ]
9
8
pub mod test;
9
+ mod val_prop_sub_index;
10
10
11
11
use std:: {
12
12
collections:: { HashMap , HashSet } ,
13
13
sync:: { Arc , Mutex } ,
14
14
} ;
15
15
16
- use tracing:: { info, instrument, trace } ;
16
+ use tracing:: { info, instrument} ;
17
17
18
18
use crate :: {
19
19
atoms:: IndexAtom ,
20
20
commit:: CommitResponse ,
21
- db:: reference_index :: key_to_atom ,
21
+ db:: val_prop_sub_index :: find_in_val_prop_sub_index ,
22
22
endpoints:: { default_endpoints, Endpoint } ,
23
23
errors:: { AtomicError , AtomicResult } ,
24
24
resources:: PropVals ,
25
25
storelike:: { Query , QueryResult , Storelike } ,
26
- Atom , Resource , Value ,
26
+ values:: SortableValue ,
27
+ Atom , Resource ,
27
28
} ;
28
29
29
30
use self :: {
@@ -34,9 +35,9 @@ use self::{
34
35
} ,
35
36
query_index:: {
36
37
check_if_atom_matches_watched_query_filters, query_indexed, update_indexed_member,
37
- watch_collection , IndexIterator , QueryFilter ,
38
+ IndexIterator , QueryFilter ,
38
39
} ,
39
- reference_index :: { add_atom_to_reference_index, remove_atom_from_reference_index} ,
40
+ val_prop_sub_index :: { add_atom_to_reference_index, remove_atom_from_reference_index} ,
40
41
} ;
41
42
42
43
// A function called by the Store when a Commit is accepted
@@ -474,29 +475,45 @@ impl Storelike for Db {
474
475
#[ instrument( skip( self ) ) ]
475
476
fn query ( & self , q : & Query ) -> AtomicResult < QueryResult > {
476
477
if let Ok ( res) = query_indexed ( self , q) {
478
+ // TODO: Maybe this is not the best check.
479
+ // If nothing is found, this may indicate both that the query is not indexed,
480
+ // or that there are simply no results.
481
+ // Probably should use `q_filter.is_watched` to check if the query is indexed.
477
482
if res. count > 0 {
478
483
// Yay, we have a cache hit!
479
- // We don't have to perform a (more expansive) TPF query + sorting
484
+ // We don't have to create the indexes, so we can return early.
480
485
return Ok ( res) ;
481
486
}
482
487
}
483
488
484
489
let q_filter: QueryFilter = q. into ( ) ;
485
490
486
491
// Maybe make this optional?
487
- watch_collection ( self , & q_filter ) ?;
492
+ q_filter . watch ( self ) ?;
488
493
489
494
info ! ( filter = ?q_filter, "Building query index" ) ;
490
495
491
496
let atoms: IndexIterator = match ( & q. property , q. value . as_ref ( ) ) {
492
497
( Some ( prop) , val) => find_in_prop_val_sub_index ( self , prop, val) ,
493
498
( None , None ) => self . all_index_atoms ( q. include_external ) ,
494
- ( None , Some ( _ ) ) => todo ! ( ) ,
499
+ ( None , Some ( val ) ) => find_in_val_prop_sub_index ( self , val , None ) ,
495
500
} ;
496
501
497
502
for a in atoms {
498
503
let atom = a?;
499
- update_indexed_member ( self , & q_filter, & atom. subject , & atom. value , false ) ?;
504
+ let sort_val: SortableValue = if let Some ( sort) = & q_filter. sort_by {
505
+ if & atom. property == sort {
506
+ atom. sort_value
507
+ } else {
508
+ // Find the sort value in the store
509
+ let sort_atom = self . get_value ( & atom. subject , sort) ?;
510
+ sort_atom. to_sortable_string ( )
511
+ }
512
+ } else {
513
+ atom. sort_value
514
+ } ;
515
+
516
+ update_indexed_member ( self , & q_filter, & atom. subject , & sort_val, false ) ?;
500
517
}
501
518
502
519
// Retry the same query!
@@ -562,111 +579,6 @@ impl Storelike for Db {
562
579
fn set_default_agent ( & self , agent : crate :: agents:: Agent ) {
563
580
self . default_agent . lock ( ) . unwrap ( ) . replace ( agent) ;
564
581
}
565
-
566
- // TPF implementation that used the index_value cache, far more performant than the StoreLike implementation
567
- #[ instrument( skip( self ) ) ]
568
- fn tpf (
569
- & self ,
570
- q_subject : Option < & str > ,
571
- q_property : Option < & str > ,
572
- q_value : Option < & Value > ,
573
- // Whether resources from outside the store should be searched through
574
- include_external : bool ,
575
- ) -> AtomicResult < Vec < Atom > > {
576
- trace ! ( "tpf" ) ;
577
- let mut vec: Vec < Atom > = Vec :: new ( ) ;
578
-
579
- let hassub = q_subject. is_some ( ) ;
580
- let hasprop = q_property. is_some ( ) ;
581
- let hasval = q_value. is_some ( ) ;
582
-
583
- // Simply return all the atoms
584
- if !hassub && !hasprop && !hasval {
585
- for resource in self . all_resources ( include_external) {
586
- for ( property, value) in resource. get_propvals ( ) {
587
- vec. push ( Atom :: new (
588
- resource. get_subject ( ) . clone ( ) ,
589
- property. clone ( ) ,
590
- value. clone ( ) ,
591
- ) )
592
- }
593
- }
594
- return Ok ( vec) ;
595
- }
596
-
597
- // If the value is a resourcearray, check if it is inside
598
- let val_equals = |val : & str | {
599
- let q = q_value. unwrap ( ) . to_sortable_string ( ) ;
600
- val == q || {
601
- if val. starts_with ( '[' ) {
602
- match crate :: parse:: parse_json_array ( val) {
603
- Ok ( vec) => return vec. contains ( & q) ,
604
- Err ( _) => return val == q,
605
- }
606
- }
607
- false
608
- }
609
- } ;
610
-
611
- // Find atoms matching the TPF query in a single resource
612
- let mut find_in_resource = |resource : & Resource | {
613
- let subj = resource. get_subject ( ) ;
614
- for ( prop, val) in resource. get_propvals ( ) . iter ( ) {
615
- if hasprop && q_property. as_ref ( ) . unwrap ( ) == prop {
616
- if hasval {
617
- if val_equals ( & val. to_string ( ) ) {
618
- vec. push ( Atom :: new ( subj. into ( ) , prop. into ( ) , val. clone ( ) ) )
619
- }
620
- break ;
621
- } else {
622
- vec. push ( Atom :: new ( subj. into ( ) , prop. into ( ) , val. clone ( ) ) )
623
- }
624
- break ;
625
- } else if hasval && !hasprop && val_equals ( & val. to_string ( ) ) {
626
- vec. push ( Atom :: new ( subj. into ( ) , prop. into ( ) , val. clone ( ) ) )
627
- }
628
- }
629
- } ;
630
-
631
- match q_subject {
632
- Some ( sub) => match self . get_resource ( sub) {
633
- Ok ( resource) => {
634
- if hasprop | hasval {
635
- find_in_resource ( & resource) ;
636
- Ok ( vec)
637
- } else {
638
- Ok ( resource. to_atoms ( ) )
639
- }
640
- }
641
- Err ( _) => Ok ( vec) ,
642
- } ,
643
- None => {
644
- if hasval {
645
- let key_prefix = if hasprop {
646
- format ! ( "{}\n {}\n " , q_value. unwrap( ) , q_property. unwrap( ) )
647
- } else {
648
- format ! ( "{}\n " , q_value. unwrap( ) )
649
- } ;
650
- for item in self . reference_index . scan_prefix ( key_prefix) {
651
- let ( k, _v) = item?;
652
- let key_string = String :: from_utf8 ( k. to_vec ( ) ) ?;
653
- // WARNING: Converts all Atoms to Strings, the datatype is lost here
654
- let atom = key_to_atom ( & key_string) ?;
655
- // NOTE: This means we'll include random values that start with the current server URL, including paragraphs for example.
656
- if include_external || atom. subject . starts_with ( self . get_server_url ( ) ) {
657
- vec. push ( atom)
658
- }
659
- }
660
- return Ok ( vec) ;
661
- }
662
- // TODO: Add an index for searching only by property
663
- for resource in self . all_resources ( include_external) {
664
- find_in_resource ( & resource) ;
665
- }
666
- Ok ( vec)
667
- }
668
- }
669
- }
670
582
}
671
583
672
584
fn corrupt_db_message ( subject : & str ) -> String {
0 commit comments