Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed pipeline #5

Merged
merged 4 commits into from
Apr 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ jobs:

- name: Run cargo check
uses: actions-rs/cargo@v1
continue-on-error: true # WARNING: only for this example, remove it!
with:
command: check

Expand All @@ -45,7 +44,6 @@ jobs:

- name: Run cargo test
uses: actions-rs/cargo@v1
continue-on-error: true # WARNING: only for this example, remove it!
with:
command: test

Expand All @@ -66,14 +64,12 @@ jobs:

- name: Run cargo fmt
uses: actions-rs/cargo@v1
continue-on-error: true # WARNING: only for this example, remove it!
with:
command: fmt
args: --all -- --check

- name: Run cargo clippy
uses: actions-rs/cargo@v1
continue-on-error: true # WARNING: only for this example, remove it!
with:
command: clippy
args: -- -D warnings
11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
[package]
name = "tokio_interval_buffer"
version = "0.1.0"
version = "0.2.0"
authors = ["Victor 'Trangar' Koenders <[email protected]>"]
description = "Easy enum wrapper that implements all traits that the wrapped objects implement"
homepage = "https://github.com/trangar/tokio_interval_buffer"
repository = "https://github.com/trangar/tokio_interval_buffer"
edition = "2018"
edition = "2021"
license = "MIT"
readme = "readme.md"

[dependencies]
futures = "0.3"
tokio = "1.0"

[dependencies.tokio]
version = "1.0"
default-features = false
features = ["time"]

[dev-dependencies]
irc = "0.15"
failure = "0.1"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
112 changes: 54 additions & 58 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,40 @@
//!
//! This is useful for when you receive streaming data but want to parse it in bulk.
//!
//! ```rust
//! extern crate irc;
//! extern crate failure;
//! extern crate tokio_interval_buffer;
//! extern crate futures;
//! extern crate tokio;
//!
//! ```rust,no_run
//! use futures::prelude::*;
//! use irc::client::prelude::*;
//! use tokio_interval_buffer::IntervalBuffer;
//!
//! fn main() {
//! tokio::run(futures::future::lazy(|| {
//! let client = IrcClient::from_config(Config {
//! nickname: Some(String::from("...")),
//! server: Some(String::from("...")),
//! channels: Some(vec![String::from("...")]),
//! ..Default::default()
//! })
//! .expect("Could not create an irc client");
//! #[tokio::main]
//! async fn main() {
//! let mut client = Client::from_config(Config {
//! nickname: Some(String::from("...")),
//! server: Some(String::from("...")),
//! channels: vec![String::from("...")],
//! ..Default::default()
//! })
//! .await
//! .expect("Could not create an irc client");
//!
//! // Take the IRC stream and process all the messages every 10 seconds
//! let buffered_receiver = IntervalBuffer::<_, _, failure::Error>::new(
//! client
//! .stream()
//! .map_err(|e| failure::format_err!("Client stream error: {:?}", e)),
//! std::time::Duration::from_secs(10),
//! );
//! // Take the IRC stream and process all the messages every 10 seconds
//! let mut buffered_receiver = IntervalBuffer::<_, _, failure::Error>::new(
//! client.stream().unwrap().map_err(|e| e.into()),
//! std::time::Duration::from_secs(10),
//! );
//!
//! buffered_receiver
//! .for_each(|b| {
//! println!("Buffer: {:?}", b);
//! Ok(())
//! })
//! .map_err(|e| {
//! println!("Buffered receiver error: {:?}", e);
//! })
//! }));
//! while let Some(item) = buffered_receiver.next().await {
//! println!("Buffer: {:?}", item);
//! }
//! }
//! ```

use futures::prelude::*;
use std::time::Duration;
use tokio::timer::Interval;
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::time::Interval;

/// This buffer takes a stream and an interval, and will emit a Vec<Stream::Item> every interval.
///
Expand All @@ -56,8 +47,8 @@ use tokio::timer::Interval;
/// In the future I want to be able to configure a `.map_err` function for this.
pub struct IntervalBuffer<Stream, Item, Error, Container: Insertable<Item> = Vec<Item>>
where
Stream: futures::Stream<Item = Item, Error = Error>,
Error: From<tokio::timer::Error>,
Stream: futures::Stream<Item = Result<Item, Error>>,
Error: From<tokio::time::error::Error>,
{
stream: Stream,
timer: Interval,
Expand Down Expand Up @@ -99,8 +90,8 @@ impl<T> Insertable<T> for Vec<T> {

impl<Stream, Item, Error, Container> IntervalBuffer<Stream, Item, Error, Container>
where
Stream: futures::Stream<Item = Item, Error = Error>,
Error: From<tokio::timer::Error>,
Stream: futures::Stream<Item = Result<Item, Error>>,
Error: From<tokio::time::error::Error>,
Container: Insertable<Item>,
{
/// Create a new IntervalBuffer with a default container. This will simply call `new_with_container(.., .., Container::default())`. See that function for more informaiton.
Expand All @@ -117,9 +108,9 @@ where
///
/// If either the stream or the internal timer emits an error, this stream will emit an error.
///
/// If the stream ends (by returning `Ok(Async::Ready(None))`), this stream will immediately return. The internal timer will not be polled.
/// If the stream ends (by returning `Ok(Poll::Ready(None))`), this stream will immediately return. The internal timer will not be polled.
pub fn new_with_container(stream: Stream, interval: Duration, container: Container) -> Self {
let timer = Interval::new_interval(interval);
let timer = tokio::time::interval(interval);
IntervalBuffer {
stream,
timer,
Expand All @@ -143,31 +134,36 @@ where
impl<Stream, Item, Error, Container> futures::Stream
for IntervalBuffer<Stream, Item, Error, Container>
where
Stream: futures::Stream<Item = Item, Error = Error>,
Error: From<tokio::timer::Error>,
Stream: futures::Stream<Item = Result<Item, Error>>,
Error: From<tokio::time::error::Error>,
Container: Insertable<Item>,
{
type Item = Container;
type Error = Error;
type Item = Result<Container, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let inner = unsafe { Pin::get_unchecked_mut(self) };
let mut stream = unsafe { Pin::new_unchecked(&mut inner.stream) };

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
match self.stream.poll()? {
Async::NotReady => break,
Async::Ready(Some(v)) => self.buffer.insert(v),
Async::Ready(None) => return Ok(Async::Ready(None)),
match stream.as_mut().poll_next(cx) {
Poll::Pending => break,
Poll::Ready(Some(Ok(v))) => inner.buffer.insert(v),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(None) => return Poll::Ready(None),
}
}

if let Async::Ready(_) = self.timer.poll()? {
let result = self.buffer.return_content_and_clear();
if result.is_some() {
Ok(Async::Ready(result))
} else {
Ok(Async::NotReady)
let mut timer = unsafe { Pin::new_unchecked(&mut inner.timer) };
match timer.poll_tick(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => {
let result = inner.buffer.return_content_and_clear();
if let Some(container) = result {
Poll::Ready(Some(Ok(container)))
} else {
Poll::Pending
}
}
} else {
Ok(Async::NotReady)
}
}
}