Skip to content

Commit 306f8d5

Browse files
committed
Update sum and grid to take residuals into account
1 parent cd54ce1 commit 306f8d5

File tree

5 files changed

+151
-47
lines changed

5 files changed

+151
-47
lines changed

crates/modelardb_compression/src/models/gorilla.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -171,17 +171,10 @@ impl Gorilla {
171171
/// Gorilla's compression method for floating-point values. If `maybe_model_last_value` is provided,
172172
/// it is assumed the first value in `values` is compressed against it instead of being stored in
173173
/// full, i.e., uncompressed.
174-
pub fn sum(
175-
start_time: Timestamp,
176-
end_time: Timestamp,
177-
timestamps: &[u8],
178-
values: &[u8],
179-
maybe_model_last_value: Option<Value>,
180-
) -> Value {
174+
pub fn sum(length: usize, values: &[u8], maybe_model_last_value: Option<Value>) -> Value {
181175
// This function replicates code from gorilla::grid() as it isn't necessary
182176
// to store the univariate ids, timestamps, and values in arrays for a sum.
183177
// So any changes to the decompression must be mirrored in gorilla::grid().
184-
let length = models::len(start_time, end_time, timestamps);
185178
let mut bits = BitReader::try_new(values).unwrap();
186179
let mut leading_zeros = u8::MAX;
187180
let mut trailing_zeros: u8 = 0;
@@ -369,7 +362,7 @@ mod tests {
369362
fn test_sum(values in collection::vec(ProptestValue::ANY, 0..50)) {
370363
prop_assume!(!values.is_empty());
371364
let compressed_values = compress_values_using_gorilla(&values);
372-
let sum = sum(1, values.len() as i64, &values.len().to_be_bytes(), &compressed_values, None);
365+
let sum = sum(values.len(), &compressed_values, None);
373366
let expected_sum = aggregate::sum(&ValueArray::from_iter_values(values)).unwrap();
374367
prop_assert!(models::equal_or_nan(expected_sum as f64, sum as f64));
375368
}
@@ -414,7 +407,7 @@ mod tests {
414407
let error_bound = ErrorBound::try_new(0.0).unwrap();
415408
let mut model_type = Gorilla::new(error_bound);
416409
model_type.compress_values(values);
417-
model_type.compressed_values()
410+
model_type.compressed_values.finish()
418411
}
419412

420413
fn slice_of_value_equal(values_one: &[Value], values_two: &[Value]) -> bool {

crates/modelardb_compression/src/models/mod.rs

Lines changed: 121 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ pub mod timestamps;
2727
use std::mem;
2828

2929
use arrow::array::ArrayBuilder;
30-
use modelardb_common::errors::ModelarDbError;
3130
use modelardb_common::types::{
3231
Timestamp, TimestampBuilder, UnivariateId, UnivariateIdBuilder, Value, ValueBuilder,
3332
};
@@ -99,6 +98,7 @@ pub fn len(start_time: Timestamp, end_time: Timestamp, timestamps: &[u8]) -> usi
9998
}
10099

101100
/// Compute the sum of the values for a time series segment whose values are represented by a model.
101+
#[allow(clippy::too_many_arguments)]
102102
pub fn sum(
103103
model_type_id: u8,
104104
start_time: Timestamp,
@@ -107,15 +107,52 @@ pub fn sum(
107107
min_value: Value,
108108
max_value: Value,
109109
values: &[u8],
110+
residuals: &[u8],
110111
) -> Value {
111-
match model_type_id {
112-
PMC_MEAN_ID => pmc_mean::sum(start_time, end_time, timestamps, min_value),
113-
SWING_ID => swing::sum(
114-
start_time, end_time, timestamps, min_value, max_value, values,
112+
// Extract the number of residuals stored.
113+
let residuals_length = if residuals.is_empty() {
114+
0
115+
} else {
116+
// The number of residuals is stored as the last byte.
117+
residuals[residuals.len() - 1] as usize
118+
};
119+
120+
let model_length = len(start_time, end_time, timestamps) - residuals_length;
121+
122+
// Computes the sum from the model.
123+
let (model_last_value, model_sum) = match model_type_id {
124+
PMC_MEAN_ID => {
125+
let value =
126+
CompressedSegmentBuilder::decode_values_for_pmc_mean(min_value, max_value, values);
127+
(value, pmc_mean::sum(model_length, value))
128+
}
129+
SWING_ID => {
130+
let (first_value, last_value) =
131+
CompressedSegmentBuilder::decode_values_for_swing(min_value, max_value, values);
132+
(
133+
last_value,
134+
swing::sum(
135+
start_time,
136+
end_time,
137+
timestamps,
138+
first_value,
139+
last_value,
140+
residuals_length,
141+
),
142+
)
143+
}
144+
GORILLA_ID => (
145+
f32::NAN, // A segment with values compressed by Gorilla never has residuals.
146+
gorilla::sum(model_length, values, None),
115147
),
116-
// TODO: take residuals stored as part of the segment into account when refactoring optimizer.
117-
GORILLA_ID => gorilla::sum(start_time, end_time, timestamps, values, None),
118148
_ => panic!("Unknown model type."),
149+
};
150+
151+
// Compute the sum from the residuals.
152+
if residuals.is_empty() {
153+
model_sum
154+
} else {
155+
model_sum + gorilla::sum(residuals_length, residuals, Some(model_last_value))
119156
}
120157
}
121158

@@ -137,21 +174,15 @@ pub fn grid(
137174
timestamp_builder: &mut TimestampBuilder,
138175
value_builder: &mut ValueBuilder,
139176
) {
140-
// Extract the number of residuals stored.
141-
let residuals_length = if residuals.is_empty() {
142-
0
143-
} else {
144-
// The number of residuals are stored as the last byte.
145-
residuals[residuals.len() - 1]
146-
};
147-
148-
// Decompress all of the timestamps and create a slice for the model's timestamps.
149-
let model_timestamps_start_index = timestamp_builder.values_slice().len();
150-
timestamps::decompress_all_timestamps(start_time, end_time, timestamps, timestamp_builder);
151-
let model_timestamps_end_index =
152-
timestamp_builder.values_slice().len() - residuals_length as usize;
153-
let model_timestamps =
154-
&timestamp_builder.values_slice()[model_timestamps_start_index..model_timestamps_end_index];
177+
// Decompress the timestamps.
178+
let (model_timestamps, residuals_timestamps) =
179+
decompress_all_timestamps_and_split_into_models_and_residuals(
180+
start_time,
181+
end_time,
182+
timestamps,
183+
residuals,
184+
timestamp_builder,
185+
);
155186

156187
// Reconstruct the values from the model.
157188
match model_type_id {
@@ -192,21 +223,49 @@ pub fn grid(
192223
}
193224

194225
// Reconstruct the values from the residuals.
195-
if residuals_length > 0 {
196-
// The first value in residuals are compressed against models last value.
226+
if !residuals.is_empty() {
197227
let model_last_value = value_builder.values_slice()[value_builder.len() - 1];
198228

199229
gorilla::grid(
200230
univariate_id,
201231
&residuals[..residuals.len() - 1],
202232
univariate_id_builder,
203-
&timestamp_builder.values_slice()[model_timestamps_end_index..],
233+
residuals_timestamps,
204234
value_builder,
205235
Some(model_last_value),
206236
);
207237
}
208238
}
209239

240+
/// Decompress the timestamps stored as `start_time`, `end_time`, and `timestamps`, add them to
241+
/// `timestamp_builder`, and return slices to the model's timestamps and the residuals' timestamps.
242+
fn decompress_all_timestamps_and_split_into_models_and_residuals<'a>(
243+
start_time: Timestamp,
244+
end_time: Timestamp,
245+
timestamps: &'a [u8],
246+
residuals: &'a [u8],
247+
timestamp_builder: &'a mut TimestampBuilder,
248+
) -> (&'a [Timestamp], &'a [Timestamp]) {
249+
// Extract the number of residuals stored.
250+
let residuals_length = if residuals.is_empty() {
251+
0
252+
} else {
253+
// The number of residuals is stored as the last byte.
254+
residuals[residuals.len() - 1]
255+
};
256+
257+
let model_timestamps_start_index = timestamp_builder.values_slice().len();
258+
timestamps::decompress_all_timestamps(start_time, end_time, timestamps, timestamp_builder);
259+
let model_timestamps_end_index =
260+
timestamp_builder.values_slice().len() - residuals_length as usize;
261+
262+
let model_timestamps =
263+
&timestamp_builder.values_slice()[model_timestamps_start_index..model_timestamps_end_index];
264+
let residuals_timestamps = &timestamp_builder.values_slice()[model_timestamps_end_index..];
265+
266+
(model_timestamps, residuals_timestamps)
267+
}
268+
210269
#[cfg(test)]
211270
mod tests {
212271
use super::*;
@@ -298,4 +357,41 @@ mod tests {
298357
prop_assert!(!equal_or_nan(v1, v2));
299358
}
300359
}
360+
361+
// Tests for decompress_all_timestamps_and_split_into_models_and_residuals().
362+
#[test]
363+
fn test_decompress_all_timestamps_and_split_into_models_and_residuals_no_residuals() {
364+
let mut timestamp_builder = TimestampBuilder::new();
365+
366+
let (model_timestamps, residuals_timestamps) =
367+
decompress_all_timestamps_and_split_into_models_and_residuals(
368+
100,
369+
500,
370+
&[5],
371+
&[],
372+
&mut timestamp_builder,
373+
);
374+
375+
// Type aliases cannot be used as a constructor, so &[Timestamp] is not possible.
376+
let expected_residuals_timestamps: &[Timestamp] = &[];
377+
assert_eq!(model_timestamps, &[100, 200, 300, 400, 500]);
378+
assert_eq!(residuals_timestamps, expected_residuals_timestamps);
379+
}
380+
381+
#[test]
382+
fn test_decompress_all_timestamps_and_split_into_models_and_residuals_with_residuals() {
383+
let mut timestamp_builder = TimestampBuilder::new();
384+
385+
let (model_timestamps, residuals_timestamps) =
386+
decompress_all_timestamps_and_split_into_models_and_residuals(
387+
100,
388+
500,
389+
&[5],
390+
&[2],
391+
&mut timestamp_builder,
392+
);
393+
394+
assert_eq!(model_timestamps, &[100, 200, 300]);
395+
assert_eq!(residuals_timestamps, &[400, 500]);
396+
}
301397
}

crates/modelardb_compression/src/models/pmc_mean.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ impl PMCMean {
9696

9797
/// Compute the sum of the values for a time series segment whose values are
9898
/// represented by a model of type PMC-Mean.
99-
pub fn sum(start_time: Timestamp, end_time: Timestamp, timestamps: &[u8], value: Value) -> Value {
100-
models::len(start_time, end_time, timestamps) as Value * value
99+
pub fn sum(model_length: usize, value: Value) -> Value {
100+
model_length as Value * value
101101
}
102102

103103
/// Reconstruct the values for the `timestamps` without matching values in
@@ -251,8 +251,7 @@ mod tests {
251251
proptest! {
252252
#[test]
253253
fn test_sum(value in ProptestValue::ANY) {
254-
let sum = sum(1657734000, 1657734540, &[10], value);
255-
prop_assert!(models::equal_or_nan(sum as f64, (10.0 * value) as f64));
254+
prop_assert!(models::equal_or_nan(sum(10, value) as f64, (10.0 * value) as f64));
256255
}
257256
}
258257

crates/modelardb_compression/src/models/swing.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@
2424
//! [ModelarDB paper]: https://www.vldb.org/pvldb/vol11/p1688-jensen.pdf
2525
2626
use modelardb_common::schemas::COMPRESSED_METADATA_SIZE_IN_BYTES;
27-
use modelardb_common::types::{Timestamp, UnivariateId, UnivariateIdBuilder, Value, ValueBuilder};
27+
use modelardb_common::types::{
28+
Timestamp, TimestampBuilder, UnivariateId, UnivariateIdBuilder, Value, ValueBuilder,
29+
};
2830

29-
use crate::models::ErrorBound;
30-
use crate::models::{self, timestamps};
31+
use super::timestamps;
32+
use crate::models::{self, ErrorBound};
3133

3234
/// The state the Swing model type needs while fitting a model to a time series
3335
/// segment.
@@ -214,6 +216,7 @@ pub fn sum(
214216
timestamps: &[u8],
215217
first_value: Value,
216218
last_value: Value,
219+
residuals_length: usize,
217220
) -> Value {
218221
let (slope, intercept) =
219222
compute_slope_and_intercept(start_time, first_value as f64, end_time, last_value as f64);
@@ -225,9 +228,20 @@ pub fn sum(
225228
let length = models::len(start_time, end_time, timestamps);
226229
(average * length as f64) as Value
227230
} else {
228-
// TODO: decompress timestamps instead of just casting them when refactoring the optimizer.
231+
let mut timestamp_builder = TimestampBuilder::new();
232+
233+
timestamps::decompress_all_timestamps(
234+
start_time,
235+
end_time,
236+
timestamps,
237+
&mut timestamp_builder,
238+
);
239+
240+
let timestamps = timestamp_builder.finish();
241+
let model_timestamps_end_index = timestamps.len() - residuals_length;
242+
229243
let mut sum: f64 = 0.0;
230-
for timestamp in timestamps {
244+
for timestamp in &timestamps.values()[0..model_timestamps_end_index] {
231245
sum += slope * (*timestamp as f64) + intercept;
232246
}
233247
sum as Value
@@ -461,7 +475,7 @@ mod tests {
461475
first_value in num::i32::ANY.prop_map(i32_to_value),
462476
last_value in num::i32::ANY.prop_map(i32_to_value),
463477
) {
464-
let sum = sum(START_TIME, END_TIME, &[], first_value, last_value);
478+
let sum = sum(START_TIME, END_TIME, &[], first_value, last_value, 0);
465479
prop_assert_eq!(sum, first_value + last_value);
466480
}
467481
}

crates/modelardb_server/src/optimizer/model_simple_aggregates.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,7 @@ impl PhysicalExpr for ModelSumPhysicalExpr {
696696
min_values,
697697
max_values,
698698
values,
699-
_residuals,
699+
residuals,
700700
_error_array
701701
);
702702

@@ -718,6 +718,7 @@ impl PhysicalExpr for ModelSumPhysicalExpr {
718718
min_value,
719719
max_value,
720720
values,
721+
residuals.values(),
721722
);
722723
}
723724

@@ -838,7 +839,7 @@ impl PhysicalExpr for ModelAvgPhysicalExpr {
838839
min_values,
839840
max_values,
840841
values,
841-
_residuals,
842+
residuals,
842843
_error_array
843844
);
844845

@@ -861,6 +862,7 @@ impl PhysicalExpr for ModelAvgPhysicalExpr {
861862
min_value,
862863
max_value,
863864
values,
865+
residuals.values(),
864866
);
865867

866868
count += modelardb_compression::len(start_time, end_time, timestamps);

0 commit comments

Comments
 (0)