-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathmod.rs
109 lines (92 loc) · 3.21 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
mod cast;
mod compare;
mod filter;
mod is_constant;
mod take;
use vortex_array::compute::{
CastFn, CompareFn, FilterKernelAdapter, IsConstantFn, KernelRef, ScalarAtFn, SliceFn, TakeFn,
scalar_at, slice,
};
use vortex_array::vtable::ComputeVTable;
use vortex_array::{Array, ArrayComputeImpl, ArrayRef};
use vortex_dtype::Nullability::{NonNullable, Nullable};
use vortex_dtype::datetime::TemporalMetadata;
use vortex_dtype::{DType, PType};
use vortex_error::{VortexResult, vortex_bail};
use vortex_scalar::Scalar;
use crate::timestamp::{self, TimestampParts};
use crate::{DateTimePartsArray, DateTimePartsEncoding};
/// Compute kernel registration for `DateTimePartsArray`.
impl ArrayComputeImpl for DateTimePartsArray {
/// Filter kernel: `DateTimePartsEncoding` wrapped in a `FilterKernelAdapter`.
// NOTE(review): the filter implementation presumably lives in the sibling `filter` module — confirm.
const FILTER: Option<KernelRef> = FilterKernelAdapter(DateTimePartsEncoding).some();
}
/// Compute vtable for the date-time-parts encoding.
///
/// Every accessor below returns `Some(self)`: the encoding itself is handed out as the
/// implementation of the corresponding compute function. `SliceFn` and `ScalarAtFn` are
/// implemented later in this file.
// NOTE(review): the cast/compare/is_constant/take implementations presumably live in the
// sibling `cast`, `compare`, `is_constant` and `take` modules — confirm.
impl ComputeVTable for DateTimePartsEncoding {
/// Returns this encoding as the cast implementation.
fn cast_fn(&self) -> Option<&dyn CastFn<&dyn Array>> {
Some(self)
}
/// Returns this encoding as the compare implementation.
fn compare_fn(&self) -> Option<&dyn CompareFn<&dyn Array>> {
Some(self)
}
/// Returns this encoding as the is-constant implementation.
fn is_constant_fn(&self) -> Option<&dyn IsConstantFn<&dyn Array>> {
Some(self)
}
/// Returns this encoding as the scalar-at implementation.
fn scalar_at_fn(&self) -> Option<&dyn ScalarAtFn<&dyn Array>> {
Some(self)
}
/// Returns this encoding as the slice implementation.
fn slice_fn(&self) -> Option<&dyn SliceFn<&dyn Array>> {
Some(self)
}
/// Returns this encoding as the take implementation.
fn take_fn(&self) -> Option<&dyn TakeFn<&dyn Array>> {
Some(self)
}
// TODO(joe): implement `between_fn`; this is used a lot.
}
impl SliceFn<&DateTimePartsArray> for DateTimePartsEncoding {
    /// Slices a `DateTimePartsArray` using the given `start` and `stop` bounds.
    ///
    /// The days, seconds and subseconds children are each sliced with the same bounds and
    /// then reassembled into a new `DateTimePartsArray` that carries the original dtype.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while slicing a child array or while constructing
    /// the resulting `DateTimePartsArray`.
    fn slice(
        &self,
        array: &DateTimePartsArray,
        start: usize,
        stop: usize,
    ) -> VortexResult<ArrayRef> {
        let dtype = array.dtype().clone();

        // Slice each component with identical bounds so the parts stay aligned.
        let days_part = slice(array.days(), start, stop)?;
        let seconds_part = slice(array.seconds(), start, stop)?;
        let subseconds_part = slice(array.subseconds(), start, stop)?;

        let sliced = DateTimePartsArray::try_new(dtype, days_part, seconds_part, subseconds_part)?;
        Ok(sliced.into_array())
    }
}
impl ScalarAtFn<&DateTimePartsArray> for DateTimePartsEncoding {
    /// Produces the scalar stored at `index`, typed with the array's extension dtype.
    ///
    /// If the position is not valid, a null scalar of the extension dtype is returned.
    /// Otherwise the days, seconds and subseconds values at `index` are each read as `i64`,
    /// merged by `timestamp::combine` using the time unit from the decoded
    /// `TemporalMetadata`, and wrapped in an extension scalar.
    ///
    /// # Errors
    ///
    /// Fails when the array's dtype is not an extension dtype, when `TemporalMetadata`
    /// cannot be decoded from the extension, or when reading, casting or combining any
    /// of the parts fails.
    fn scalar_at(&self, array: &DateTimePartsArray, index: usize) -> VortexResult<Scalar> {
        let ext = match array.dtype() {
            DType::Extension(ext) => ext.clone(),
            other => {
                vortex_bail!(
                    "DateTimePartsArray must have extension dtype, found {}",
                    other
                );
            }
        };

        let temporal_metadata = match TemporalMetadata::try_from(ext.as_ref()) {
            Ok(metadata) => metadata,
            Err(_) => {
                vortex_bail!(ComputeError: "must decode TemporalMetadata from extension metadata");
            }
        };

        // Invalid positions short-circuit to a null of the extension dtype.
        let valid = array.is_valid(index)?;
        if !valid {
            return Ok(Scalar::null(DType::Extension(ext)));
        }

        // Days are cast to a nullable i64 while the other two parts use non-nullable i64.
        // NOTE(review): presumably because the days child may itself be nullable — confirm.
        let days_scalar =
            scalar_at(array.days(), index)?.cast(&DType::Primitive(PType::I64, Nullable))?;
        let days: i64 = days_scalar.try_into()?;

        let seconds_scalar =
            scalar_at(array.seconds(), index)?.cast(&DType::Primitive(PType::I64, NonNullable))?;
        let seconds: i64 = seconds_scalar.try_into()?;

        let subseconds_scalar = scalar_at(array.subseconds(), index)?
            .cast(&DType::Primitive(PType::I64, NonNullable))?;
        let subseconds: i64 = subseconds_scalar.try_into()?;

        let parts = TimestampParts {
            days,
            seconds,
            subseconds,
        };
        let ts = timestamp::combine(parts, temporal_metadata.time_unit())?;

        Ok(Scalar::extension(ext, Scalar::from(ts)))
    }
}