Skip to content

Commit

Permalink
alternative mixed field aggregation collection
Browse files Browse the repository at this point in the history
instead of having multiple accessor in one AggregationWithAccessor split it into
multiple independent AggregationWithAccessor
  • Loading branch information
PSeitz committed Jul 25, 2023
1 parent c805f08 commit dcbdab8
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 166 deletions.
88 changes: 48 additions & 40 deletions src/aggregation/agg_req_with_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ pub struct AggregationWithAccessor {
pub(crate) accessor: Column<u64>,
pub(crate) str_dict_column: Option<StrColumn>,
pub(crate) field_type: ColumnType,
/// In case there are multiple types of fast fields, e.g. string and numeric.
/// Only used for term aggregations currently.
pub(crate) accessor2: Option<(Column<u64>, ColumnType)>,
pub(crate) sub_aggregation: AggregationsWithAccessor,
pub(crate) limits: ResourceLimitGuard,
pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
Expand All @@ -52,20 +49,31 @@ impl AggregationWithAccessor {
sub_aggregation: &Aggregations,
reader: &SegmentReader,
limits: AggregationLimits,
) -> crate::Result<AggregationWithAccessor> {
) -> crate::Result<Vec<AggregationWithAccessor>> {
let mut str_dict_column = None;
let mut accessor2 = None;
use AggregationVariants::*;
let (accessor, field_type) = match &agg.agg {
let acc_field_types: Vec<(Column, ColumnType)> = match &agg.agg {
Range(RangeAggregation {
field: field_name, ..
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
}) => vec![get_ff_reader(
reader,
field_name,
Some(get_numeric_or_date_column_types()),
)?],
Histogram(HistogramAggregation {
field: field_name, ..
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
}) => vec![get_ff_reader(
reader,
field_name,
Some(get_numeric_or_date_column_types()),
)?],
DateHistogram(DateHistogramAggregationReq {
field: field_name, ..
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
}) => vec![get_ff_reader(
reader,
field_name,
Some(get_numeric_or_date_column_types()),
)?],
Terms(TermsAggregation {
field: field_name, ..
}) => {
Expand All @@ -80,11 +88,7 @@ impl AggregationWithAccessor {
// ColumnType::IpAddr Unsupported
// ColumnType::DateTime Unsupported
];
let mut columns =
get_all_ff_reader_or_empty(reader, field_name, Some(&allowed_column_types))?;
let first = columns.pop().unwrap();
accessor2 = columns.pop();
first
get_all_ff_reader_or_empty(reader, field_name, Some(&allowed_column_types))?
}
Average(AverageAggregation { field: field_name })
| Count(CountAggregation { field: field_name })
Expand All @@ -95,33 +99,37 @@ impl AggregationWithAccessor {
let (accessor, field_type) =
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;

(accessor, field_type)
vec![(accessor, field_type)]
}
Percentiles(percentiles) => {
let (accessor, field_type) = get_ff_reader(
reader,
percentiles.field_name(),
Some(get_numeric_or_date_column_types()),
)?;
(accessor, field_type)
vec![(accessor, field_type)]
}
};

let sub_aggregation = sub_aggregation.clone();
Ok(AggregationWithAccessor {
accessor,
accessor2,
field_type,
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
&sub_aggregation,
reader,
&limits,
)?,
agg: agg.clone(),
str_dict_column,
limits: limits.new_guard(),
column_block_accessor: Default::default(),
})
let aggs: Vec<_> = acc_field_types
.into_iter()
.map(|(accessor, field_type)| {
Ok(AggregationWithAccessor {
accessor,
field_type,
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
sub_aggregation,
reader,
&limits,
)?,
agg: agg.clone(),
str_dict_column: str_dict_column.clone(),
limits: limits.new_guard(),
column_block_accessor: Default::default(),
})
})
.collect::<crate::Result<_>>()?;
Ok(aggs)
}
}

Expand All @@ -141,15 +149,15 @@ pub(crate) fn get_aggs_with_segment_accessor_and_validate(
) -> crate::Result<AggregationsWithAccessor> {
let mut aggss = Vec::new();
for (key, agg) in aggs.iter() {
aggss.push((
key.to_string(),
AggregationWithAccessor::try_from_agg(
agg,
agg.sub_aggregation(),
reader,
limits.clone(),
)?,
));
let aggs = AggregationWithAccessor::try_from_agg(
agg,
agg.sub_aggregation(),
reader,
limits.clone(),
)?;
for agg in aggs {
aggss.push((key.to_string(), agg));
}
}
Ok(AggregationsWithAccessor::from_data(
VecWithNames::from_entries(aggss),
Expand Down
2 changes: 1 addition & 1 deletion src/aggregation/bucket/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ mod tests {
SegmentRangeCollector::from_req_and_validate(
&req,
&mut Default::default(),
&mut AggregationLimits::default().new_guard(),
&AggregationLimits::default().new_guard(),
field_type,
0,
)
Expand Down
104 changes: 0 additions & 104 deletions src/aggregation/bucket/term_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,110 +224,6 @@ impl TermBuckets {
}
}

/// The composite collector is used, when we have different types under one field, to support a term
/// aggregation on both.
#[derive(Clone, Debug)]
pub struct SegmentTermCollectorComposite {
term_agg1: SegmentTermCollector, // field type 1, e.g. strings
term_agg2: SegmentTermCollector, // field type 2, e.g. u64
accessor_idx: usize,
}
impl SegmentAggregationCollector for SegmentTermCollectorComposite {
fn add_intermediate_aggregation_result(
self: Box<Self>,
agg_with_accessor: &AggregationsWithAccessor,
results: &mut IntermediateAggregationResults,
) -> crate::Result<()> {
let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string();
let agg_with_accessor = &agg_with_accessor.aggs.values[self.accessor_idx];

let bucket = self
.term_agg1
.into_intermediate_bucket_result(agg_with_accessor)?;
results.push(
name.to_string(),
IntermediateAggregationResult::Bucket(bucket),
)?;
let bucket = self
.term_agg2
.into_intermediate_bucket_result(agg_with_accessor)?;
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;

Ok(())
}

#[inline]
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
self.term_agg1.collect_block(&[doc], agg_with_accessor)?;
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
self.term_agg2.collect_block(&[doc], agg_with_accessor)?;
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
Ok(())
}

#[inline]
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
self.term_agg1.collect_block(docs, agg_with_accessor)?;
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
self.term_agg2.collect_block(docs, agg_with_accessor)?;
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);

Ok(())
}

fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
self.term_agg1.flush(agg_with_accessor)?;
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
self.term_agg2.flush(agg_with_accessor)?;
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);

Ok(())
}
}

impl SegmentTermCollectorComposite {
/// Swaps the accessor and field type with the second accessor and field type.
/// This way we can use the same code for both aggregations.
fn swap_accessor(&self, aggregations: &mut AggregationWithAccessor) {
if let Some(accessor) = aggregations.accessor2.as_mut() {
std::mem::swap(&mut accessor.0, &mut aggregations.accessor);
std::mem::swap(&mut accessor.1, &mut aggregations.field_type);
}
}

pub(crate) fn from_req_and_validate(
req: &TermsAggregation,
sub_aggregations: &mut AggregationsWithAccessor,
field_type: ColumnType,
field_type2: ColumnType,
accessor_idx: usize,
) -> crate::Result<Self> {
Ok(Self {
term_agg1: SegmentTermCollector::from_req_and_validate(
req,
sub_aggregations,
field_type,
accessor_idx,
)?,
term_agg2: SegmentTermCollector::from_req_and_validate(
req,
sub_aggregations,
field_type2,
accessor_idx,
)?,
accessor_idx,
})
}
}

/// The collector puts values from the fast field into the correct buckets and does a conversion to
/// the correct datatype.
#[derive(Clone, Debug)]
Expand Down
27 changes: 6 additions & 21 deletions src/aggregation/segment_agg_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use super::metric::{
SegmentPercentilesCollector, SegmentStatsCollector, SegmentStatsType, StatsAggregation,
SumAggregation,
};
use crate::aggregation::bucket::SegmentTermCollectorComposite;

pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug {
fn add_intermediate_aggregation_result(
Expand Down Expand Up @@ -81,26 +80,12 @@ pub(crate) fn build_single_agg_segment_collector(
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
use AggregationVariants::*;
match &req.agg.agg {
Terms(terms_req) => {
if let Some(acc2) = req.accessor2.as_ref() {
Ok(Box::new(
SegmentTermCollectorComposite::from_req_and_validate(
terms_req,
&mut req.sub_aggregation,
req.field_type,
acc2.1,
accessor_idx,
)?,
))
} else {
Ok(Box::new(SegmentTermCollector::from_req_and_validate(
terms_req,
&mut req.sub_aggregation,
req.field_type,
accessor_idx,
)?))
}
}
Terms(terms_req) => Ok(Box::new(SegmentTermCollector::from_req_and_validate(
terms_req,
&mut req.sub_aggregation,
req.field_type,
accessor_idx,
)?)),
Range(range_req) => Ok(Box::new(SegmentRangeCollector::from_req_and_validate(
range_req,
&mut req.sub_aggregation,
Expand Down

0 comments on commit dcbdab8

Please sign in to comment.