forked from databendlabs/databend
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream_limit.rs
63 lines (54 loc) · 1.73 KB
/
stream_limit.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Copyright 2020 The FuseQuery Authors.
//
// Code is licensed under AGPL License, Version 3.0.
use std::task::{Context, Poll};
use futures::stream::{Stream, StreamExt};
use crate::datablocks::DataBlock;
use crate::datastreams::SendableDataBlockStream;
use crate::error::FuseQueryResult;
/// Stream adapter that caps the total number of rows yielded from an
/// upstream block stream.
///
/// Blocks are forwarded until `limit` rows have been produced; the block that
/// crosses the limit is cut down to the rows that still fit.
pub struct LimitStream {
/// Upstream stream of data blocks that is polled for input.
input: SendableDataBlockStream,
/// Maximum total number of rows this stream may emit.
limit: usize,
/// Number of rows accounted for so far; reaches `limit` once the budget is used up.
current: usize,
}
impl LimitStream {
    /// Wraps `input` so that the resulting stream yields at most `limit` rows
    /// in total.
    ///
    /// The row counter starts at zero. This constructor cannot fail as
    /// written; the `FuseQueryResult` return type is kept for callers.
    pub fn try_create(input: SendableDataBlockStream, limit: usize) -> FuseQueryResult<Self> {
        Ok(LimitStream {
            input,
            limit,
            current: 0,
        })
    }

    /// Applies the remaining row budget to `block` and updates the counter.
    ///
    /// Returns:
    /// * `Ok(None)` when the limit has already been reached,
    /// * `Ok(Some(block.clone()))` when the whole block fits into the
    ///   remaining budget,
    /// * `Ok(Some(truncated))` when only a prefix of the block fits; every
    ///   column is cut to the remaining row count and the counter is pinned
    ///   to `limit`.
    pub fn limit(&mut self, block: &DataBlock) -> FuseQueryResult<Option<DataBlock>> {
        // Work with the remaining budget instead of `current + rows` so the
        // comparison below cannot overflow `usize`.
        let remaining = self.limit.saturating_sub(self.current);
        if remaining == 0 {
            return Ok(None);
        }

        let rows = block.num_rows();
        if rows <= remaining {
            // The block fits entirely (including an exact fit): pass it
            // through untouched rather than rebuilding every column.
            self.current += rows;
            return Ok(Some(block.clone()));
        }

        // Only the first `remaining` rows fit; after this the budget is spent.
        self.current = self.limit;
        let limited_columns = (0..block.num_columns())
            .map(|i| arrow::compute::limit(block.column(i), remaining))
            .collect::<Vec<_>>();

        Ok(Some(DataBlock::create(
            block.schema().clone(),
            limited_columns,
        )))
    }
}
impl Stream for LimitStream {
    type Item = FuseQueryResult<DataBlock>;

    /// Polls the upstream stream and applies the row limit to each block.
    ///
    /// Once the limit has been reached the stream ends immediately, without
    /// polling the upstream again. Errors and end-of-stream from the upstream
    /// are forwarded unchanged while rows are still wanted.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        ctx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Short-circuit when the budget is spent (this also covers
        // `limit == 0`): polling the upstream here would pull and discard an
        // extra block, and would leave this stream pending for as long as a
        // slow or unbounded upstream is pending.
        if self.current >= self.limit {
            return Poll::Ready(None);
        }

        self.input.poll_next_unpin(ctx).map(|x| match x {
            // `limit` yields `Ok(None)` when nothing more may be emitted;
            // `transpose` turns that into the end-of-stream `None`.
            Some(Ok(ref v)) => self.limit(v).transpose(),
            other => other,
        })
    }
}