Skip to content

Latest commit

 

History

History
168 lines (110 loc) · 3.29 KB

README.md

File metadata and controls

168 lines (110 loc) · 3.29 KB

Redis Job Queue

A simple Redis job queue.

crates.io Build Status

Documentation

https://docs.rs/rjq/

Enqueue jobs

use std::time::Duration;
use std::thread::sleep;

// Open the queue named "rjq" on the local Redis server.
let queue = rjq::Queue::new("redis://localhost/", "rjq");
let mut uuids = Vec::new();

// Enqueue ten jobs, pausing 100 ms before each one, and remember their UUIDs.
for _ in 0..10 {
    sleep(Duration::from_millis(100));
    let uuid = queue.enqueue(None, vec![], 30).unwrap();
    uuids.push(uuid);
}
// Wait three seconds before querying the jobs.
sleep(Duration::from_millis(3000));

// Print the status and result of every job, or the error if the result
// could not be fetched.
for uuid in &uuids {
    let status = queue.status(uuid).unwrap();
    match queue.result(uuid) {
        Ok(result) => println!("{} {:?} {:?}", uuid, status, result),
        failed => println!("{:?}", failed),
    }
}

Queue worker

use std::time::Duration;
use std::thread::sleep;

/// Error type used by the example worker function.
///
/// Every variant wraps an error from another crate. `#[from]` asks
/// `thiserror` to generate the matching `From` impl, so the wrapped error can
/// be converted with `?`, and `#[error("{0}")]` makes `Display` print the
/// wrapped error's own message.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// An error reported by the `redis` crate.
    #[error("{0}")]
    Redis(#[from] redis::RedisError),
    /// An error reported by `serde_json`.
    #[error("{0}")]
    Serde(#[from] serde_json::Error),
    /// An error reported by `rjq` itself.
    #[error("{0}")]
    Rjq(#[from] rjq::errors::Error),
}

/// Example worker function.
///
/// Sleeps for one second, prints the job's UUID and returns
/// `"hi from <uuid>"` as the job result. The job arguments are ignored.
fn process(uuid: String, _: Vec<String>) -> rjq::JobResult<Error> {
    let delay = Duration::from_millis(1000);
    sleep(delay);

    println!("{}", uuid);

    let greeting = format!("hi from {}", uuid);
    Ok(Some(greeting))
}

// Open the queue named "rjq" on the local Redis server.
let queue = rjq::Queue::new("redis://localhost/", "rjq");

// Worker settings, bound to locals named after the parameters of `work`
// in the order the call passes them.
let wait = Some(5);
let timeout = Some(10);
let freq = Some(1);
let expire = Some(30);
let fall = Some(false);
let infinite = Some(false);

queue
    .work(process, wait, timeout, freq, expire, fall, infinite)
    .unwrap();

Job status

QUEUED - job queued for further processing

RUNNING - job is being run by a worker

LOST - job was not finished in time

FINISHED - job has been successfully finished

FAILED - job has failed due to an error

Queue methods

Init queue

fn new(url: &str, name: &str) -> Queue;

url - redis URL

name - queue name

Returns queue

Drop queue jobs

fn drop(&self) -> Result<(), Box<Error>>;

Enqueue job

fn enqueue(&self, args: Vec<String>, expire: usize) -> Result<String, Box<Error>>;

args - job arguments

expire - time (in seconds) after which the job expires if no worker has started it

Returns job UUID

Get job status

fn status(&self, uuid: &str) -> Result<Status, Box<Error>>;

uuid - job unique identifier

Returns job status

Work on queue

fn work<F: Fn(String, Vec<String>) -> Result<String, Box<Error>> + Send + Sync + 'static>
    (&self,
     fun: F,
     wait: Option<usize>,
     timeout: Option<usize>,
     freq: Option<usize>,
     expire: Option<usize>,
     fall: Option<bool>,
     infinite: Option<bool>)
     -> Result<(), Box<Error>>;

fun - worker function

wait - time to wait for the next job to be popped from the queue

timeout - time limit (in seconds) within which the worker function should finish

freq - job status check frequency (times per second)

expire - time (in seconds) after which the job result expires

fall - panic to terminate the process if a job has been lost

infinite - process jobs indefinitely, one after another; otherwise only one job is processed

Get job result

fn result(&self, uuid: &str) -> Result<Option<String>, Box<Error>>;

uuid - job unique identifier

Returns job result

Run tests

cargo test