@@ -23,8 +23,10 @@ use std::pin::Pin;
23
23
use std:: sync:: Arc ;
24
24
use std:: task:: { Context as StdTaskContext , Poll } ;
25
25
26
+ use datafusion:: arrow:: array:: StringArray ;
26
27
use datafusion:: arrow:: datatypes:: SchemaRef ;
27
28
use datafusion:: arrow:: record_batch:: RecordBatch ;
29
+ use datafusion:: arrow:: temporal_conversions;
28
30
use datafusion:: error:: { DataFusionError , Result } ;
29
31
use datafusion:: execution:: context:: TaskContext ;
30
32
use datafusion:: physical_plan:: expressions:: PhysicalSortExpr ;
@@ -35,6 +37,7 @@ use datafusion::physical_plan::{
35
37
} ;
36
38
use futures:: stream:: Stream ;
37
39
use futures:: StreamExt ;
40
+ use modelardb_common:: types:: { TimestampArray , ValueArray } ;
38
41
39
42
/// A column the [`GeneratedAsExec`] must add to each of the [`RecordBatches`](RecordBatch) using
40
43
/// [`GeneratedAsStream`] with the location it must be at and the [`PhysicalExpr`] that compute it.
@@ -195,6 +198,49 @@ impl GeneratedAsStream {
195
198
baseline_metrics,
196
199
}
197
200
}
201
+
202
+ /// Format and return the first row in `batch` that causes `physical_expr` to fail when
203
+ /// generating values. If `physical_expr` never fails the string Unknown Row is returned, and a
204
+ /// [`DataFusionError`] is returned if `batch` is from a normal table.
205
+ fn failing_row ( batch : & RecordBatch , physical_expr : & Arc < dyn PhysicalExpr > ) -> Result < String > {
206
+ for row_index in 0 ..batch. num_rows ( ) {
207
+ if physical_expr. evaluate ( & batch. slice ( row_index, 1 ) ) . is_err ( ) {
208
+ let schema = batch. schema ( ) ;
209
+ let mut formatted_values = Vec :: with_capacity ( batch. num_columns ( ) ) ;
210
+
211
+ for column_index in 0 ..batch. num_columns ( ) {
212
+ let name = schema. field ( column_index) . name ( ) ;
213
+ let column = batch. column ( column_index) ;
214
+
215
+ if let Some ( timestamps) = column. as_any ( ) . downcast_ref :: < TimestampArray > ( ) {
216
+ // Store a readable version of timestamp if it is in the time interval that
217
+ // can be represented by a NaiveDateTime, otherwise the integer is stored.
218
+ let timestamp = timestamps. value ( row_index) ;
219
+ let formated_value = if let Some ( naive_date_time) =
220
+ temporal_conversions:: timestamp_ms_to_datetime ( timestamp)
221
+ {
222
+ format ! ( "{name}: {}" , naive_date_time)
223
+ } else {
224
+ format ! ( "{name}: {}" , timestamp)
225
+ } ;
226
+ formatted_values. push ( formated_value) ;
227
+ } else if let Some ( fields) = column. as_any ( ) . downcast_ref :: < ValueArray > ( ) {
228
+ formatted_values. push ( format ! ( "{name}: {}" , fields. value( row_index) ) ) ;
229
+ } else if let Some ( tags) = column. as_any ( ) . downcast_ref :: < StringArray > ( ) {
230
+ formatted_values. push ( format ! ( "{name}: {}" , tags. value( row_index) ) ) ;
231
+ } else {
232
+ // The method has been called for a table with unsupported column types.
233
+ return Err ( DataFusionError :: Internal ( "Not a model table." . to_owned ( ) ) ) ;
234
+ }
235
+ }
236
+
237
+ return Ok ( formatted_values. join ( ", " ) ) ;
238
+ }
239
+ }
240
+
241
+ // physical_expr never failed for any of the rows in batch, e.g., if random values are used.
242
+ Ok ( "Unknown Row" . to_owned ( ) )
243
+ }
198
244
}
199
245
200
246
impl Stream for GeneratedAsStream {
@@ -227,8 +273,17 @@ impl Stream for GeneratedAsStream {
227
273
columns. push ( generated_column. into_array ( batch. num_rows ( ) ) ) ;
228
274
generated_columns += 1 ;
229
275
} else {
276
+ let column_name = self . schema . field ( column_to_generate. index ) . name ( ) ;
277
+
230
278
// unwrap() is safe as it is only executed if a column was not generated.
231
- return Poll :: Ready ( Some ( Err ( maybe_generated_column. err ( ) . unwrap ( ) ) ) ) ;
279
+ let physical_expr = & column_to_generate. physical_expr ;
280
+ let failing_row = Self :: failing_row ( & batch, physical_expr) ?;
281
+ let cause = maybe_generated_column. err ( ) . unwrap ( ) ;
282
+
283
+ let error = format ! (
284
+ "Failed to compute '{column_name}' for {{{failing_row}}} due to: {cause}"
285
+ ) ;
286
+ return Poll :: Ready ( Some ( Err ( DataFusionError :: Execution ( error) ) ) ) ;
232
287
} ;
233
288
}
234
289
0 commit comments