From dcbdab80f26a53dc5004e2c87499d3f9dc4fe70e Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 25 Jul 2023 13:03:51 +0800 Subject: [PATCH] alternative mixed field aggregation collection instead of having multiple accessor in one AggregationWithAccessor split it into multiple independent AggregationWithAccessor --- src/aggregation/agg_req_with_accessor.rs | 88 ++++++++++--------- src/aggregation/bucket/range.rs | 2 +- src/aggregation/bucket/term_agg.rs | 104 ----------------------- src/aggregation/segment_agg_result.rs | 27 ++---- 4 files changed, 55 insertions(+), 166 deletions(-) diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs index 5ba2ce12cd..b9750299e2 100644 --- a/src/aggregation/agg_req_with_accessor.rs +++ b/src/aggregation/agg_req_with_accessor.rs @@ -37,9 +37,6 @@ pub struct AggregationWithAccessor { pub(crate) accessor: Column, pub(crate) str_dict_column: Option, pub(crate) field_type: ColumnType, - /// In case there are multiple types of fast fields, e.g. string and numeric. - /// Only used for term aggregations currently. - pub(crate) accessor2: Option<(Column, ColumnType)>, pub(crate) sub_aggregation: AggregationsWithAccessor, pub(crate) limits: ResourceLimitGuard, pub(crate) column_block_accessor: ColumnBlockAccessor, @@ -52,20 +49,31 @@ impl AggregationWithAccessor { sub_aggregation: &Aggregations, reader: &SegmentReader, limits: AggregationLimits, - ) -> crate::Result { + ) -> crate::Result> { let mut str_dict_column = None; - let mut accessor2 = None; use AggregationVariants::*; - let (accessor, field_type) = match &agg.agg { + let acc_field_types: Vec<(Column, ColumnType)> = match &agg.agg { Range(RangeAggregation { field: field_name, .. - }) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?, + }) => vec![get_ff_reader( + reader, + field_name, + Some(get_numeric_or_date_column_types()), + )?], Histogram(HistogramAggregation { field: field_name, .. - }) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?, + }) => vec![get_ff_reader( + reader, + field_name, + Some(get_numeric_or_date_column_types()), + )?], DateHistogram(DateHistogramAggregationReq { field: field_name, .. - }) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?, + }) => vec![get_ff_reader( + reader, + field_name, + Some(get_numeric_or_date_column_types()), + )?], Terms(TermsAggregation { field: field_name, .. }) => { @@ -80,11 +88,7 @@ impl AggregationWithAccessor { // ColumnType::IpAddr Unsupported // ColumnType::DateTime Unsupported ]; - let mut columns = - get_all_ff_reader_or_empty(reader, field_name, Some(&allowed_column_types))?; - let first = columns.pop().unwrap(); - accessor2 = columns.pop(); - first + get_all_ff_reader_or_empty(reader, field_name, Some(&allowed_column_types))? } Average(AverageAggregation { field: field_name }) | Count(CountAggregation { field: field_name }) @@ -95,7 +99,7 @@ impl AggregationWithAccessor { let (accessor, field_type) = get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?; - (accessor, field_type) + vec![(accessor, field_type)] } Percentiles(percentiles) => { let (accessor, field_type) = get_ff_reader( @@ -103,25 +107,29 @@ impl AggregationWithAccessor { percentiles.field_name(), Some(get_numeric_or_date_column_types()), )?; - (accessor, field_type) + vec![(accessor, field_type)] } }; - let sub_aggregation = sub_aggregation.clone(); - Ok(AggregationWithAccessor { - accessor, - accessor2, - field_type, - sub_aggregation: get_aggs_with_segment_accessor_and_validate( - &sub_aggregation, - reader, - &limits, - )?, - agg: agg.clone(), - str_dict_column, - limits: limits.new_guard(), - column_block_accessor: Default::default(), - }) + let aggs: Vec<_> = acc_field_types + .into_iter() + .map(|(accessor, field_type)| { + Ok(AggregationWithAccessor { + accessor, + field_type, + sub_aggregation: get_aggs_with_segment_accessor_and_validate( + sub_aggregation, + reader, + &limits, + )?, + agg: agg.clone(), + str_dict_column: str_dict_column.clone(), + limits: limits.new_guard(), + column_block_accessor: Default::default(), + }) + }) + .collect::>()?; + Ok(aggs) } } @@ -141,15 +149,15 @@ pub(crate) fn get_aggs_with_segment_accessor_and_validate( ) -> crate::Result { let mut aggss = Vec::new(); for (key, agg) in aggs.iter() { - aggss.push(( - key.to_string(), - AggregationWithAccessor::try_from_agg( - agg, - agg.sub_aggregation(), - reader, - limits.clone(), - )?, - )); + let aggs = AggregationWithAccessor::try_from_agg( + agg, + agg.sub_aggregation(), + reader, + limits.clone(), + )?; + for agg in aggs { + aggss.push((key.to_string(), agg)); + } } Ok(AggregationsWithAccessor::from_data( VecWithNames::from_entries(aggss), diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index fda49141c1..a50761e63b 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -465,7 +465,7 @@ mod tests { SegmentRangeCollector::from_req_and_validate( &req, &mut Default::default(), - &mut AggregationLimits::default().new_guard(), + &AggregationLimits::default().new_guard(), field_type, 0, ) diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index adfa01673e..752f4657e2 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -224,110 +224,6 @@ impl TermBuckets { } } -/// The composite collector is used, when we have different types under one field, to support a term -/// aggregation on both. -#[derive(Clone, Debug)] -pub struct SegmentTermCollectorComposite { - term_agg1: SegmentTermCollector, // field type 1, e.g. strings - term_agg2: SegmentTermCollector, // field type 2, e.g. u64 - accessor_idx: usize, -} -impl SegmentAggregationCollector for SegmentTermCollectorComposite { - fn add_intermediate_aggregation_result( - self: Box, - agg_with_accessor: &AggregationsWithAccessor, - results: &mut IntermediateAggregationResults, - ) -> crate::Result<()> { - let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string(); - let agg_with_accessor = &agg_with_accessor.aggs.values[self.accessor_idx]; - - let bucket = self - .term_agg1 - .into_intermediate_bucket_result(agg_with_accessor)?; - results.push( - name.to_string(), - IntermediateAggregationResult::Bucket(bucket), - )?; - let bucket = self - .term_agg2 - .into_intermediate_bucket_result(agg_with_accessor)?; - results.push(name, IntermediateAggregationResult::Bucket(bucket))?; - - Ok(()) - } - - #[inline] - fn collect( - &mut self, - doc: crate::DocId, - agg_with_accessor: &mut AggregationsWithAccessor, - ) -> crate::Result<()> { - self.term_agg1.collect_block(&[doc], agg_with_accessor)?; - self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]); - self.term_agg2.collect_block(&[doc], agg_with_accessor)?; - self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]); - Ok(()) - } - - #[inline] - fn collect_block( - &mut self, - docs: &[crate::DocId], - agg_with_accessor: &mut AggregationsWithAccessor, - ) -> crate::Result<()> { - self.term_agg1.collect_block(docs, agg_with_accessor)?; - self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]); - self.term_agg2.collect_block(docs, agg_with_accessor)?; - self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]); - - Ok(()) - } - - fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> { - self.term_agg1.flush(agg_with_accessor)?; - self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]); - self.term_agg2.flush(agg_with_accessor)?; - self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]); - - Ok(()) - } -} - -impl SegmentTermCollectorComposite { - /// Swaps the accessor and field type with the second accessor and field type. - /// This way we can use the same code for both aggregations. - fn swap_accessor(&self, aggregations: &mut AggregationWithAccessor) { - if let Some(accessor) = aggregations.accessor2.as_mut() { - std::mem::swap(&mut accessor.0, &mut aggregations.accessor); - std::mem::swap(&mut accessor.1, &mut aggregations.field_type); - } - } - - pub(crate) fn from_req_and_validate( - req: &TermsAggregation, - sub_aggregations: &mut AggregationsWithAccessor, - field_type: ColumnType, - field_type2: ColumnType, - accessor_idx: usize, - ) -> crate::Result { - Ok(Self { - term_agg1: SegmentTermCollector::from_req_and_validate( - req, - sub_aggregations, - field_type, - accessor_idx, - )?, - term_agg2: SegmentTermCollector::from_req_and_validate( - req, - sub_aggregations, - field_type2, - accessor_idx, - )?, - accessor_idx, - }) - } -} - /// The collector puts values from the fast field into the correct buckets and does a conversion to /// the correct datatype. #[derive(Clone, Debug)] diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index f3cb1a60aa..e5be68dc61 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -15,7 +15,6 @@ use super::metric::{ SegmentPercentilesCollector, SegmentStatsCollector, SegmentStatsType, StatsAggregation, SumAggregation, }; -use crate::aggregation::bucket::SegmentTermCollectorComposite; pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug { fn add_intermediate_aggregation_result( @@ -81,26 +80,12 @@ pub(crate) fn build_single_agg_segment_collector( ) -> crate::Result> { use AggregationVariants::*; match &req.agg.agg { - Terms(terms_req) => { - if let Some(acc2) = req.accessor2.as_ref() { - Ok(Box::new( - SegmentTermCollectorComposite::from_req_and_validate( - terms_req, - &mut req.sub_aggregation, - req.field_type, - acc2.1, - accessor_idx, - )?, - )) - } else { - Ok(Box::new(SegmentTermCollector::from_req_and_validate( - terms_req, - &mut req.sub_aggregation, - req.field_type, - accessor_idx, - )?)) - } - } + Terms(terms_req) => Ok(Box::new(SegmentTermCollector::from_req_and_validate( + terms_req, + &mut req.sub_aggregation, + req.field_type, + accessor_idx, + )?)), Range(range_req) => Ok(Box::new(SegmentRangeCollector::from_req_and_validate( range_req, &mut req.sub_aggregation,