diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6aa1064 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target/ +**/*.rs.bk +Cargo.lock diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..a5075dd --- /dev/null +++ b/.travis.yml @@ -0,0 +1,34 @@ +language: rust +sudo: required +dist: trusty +cache: cargo + +services: + - rabbitmq + +matrix: + fast_finish: true + allow_failures: + - rust: nightly + include: + - rust: stable + env: RUSTFMT=YES + script: + - rustup component add rustfmt-preview + - cargo fmt -- --write-mode=diff + - rust: stable + - rust: beta + - rust: nightly + +before_install: + - sudo apt-get install -qq -y dnsmasq + - echo "listen-address=127.0.0.1" | sudo tee -a /etc/dnsmasq.conf > /dev/null + - echo "user=root" | sudo tee -a /etc/dnsmasq.conf > /dev/null + - sudo service dnsmasq restart + +script: + - cargo test --all + +notifications: + email: + on_success: never diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..373729d --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,5 @@ +[workspace] +members = [ + "batch", + "batch-codegen", +] diff --git a/README.md b/README.md new file mode 100644 index 0000000..e472fb1 --- /dev/null +++ b/README.md @@ -0,0 +1,227 @@ +# Batch + +A distributed task queue library written in Rust using RabbitMQ as a message broker. + +This library allows you to send a task to a RabbitMQ broker, so that a worker will be able +to pull it and execute the associated handler. It leverages the `futures` and `tokio-core` +crates to provide asynchronous I/O operations. + +## Usage + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +batch = "0.1" +``` + +> **Note**: Task serialization depends on [`serde`](https://serde.rs/), so you will have to add it to your project's dependencies as well. + +Then add this to your crate root: + +``` +#[macro_use] +extern crate batch; +``` + +Examples are available on [GitHub](https://github.com/kureuil/batch-rs/tree/master/batch/examples) or you can continue and read the Getting Started guide. + +## Getting started + +The first thing you'll want to do once you've installed `batch` is connect to a RabbitMQ broker. We'll start by creating a `Client`: + +```rust +extern crate batch; +extern crate tokio_core; + +use batch::ClientBuilder; +use tokio_core::reactor::Core; + +fn main() { + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let client = ClientBuilder::new() + .connection_url("amqp://localhost/%2f") + .handle(handle) + .build(); + + core.run(client).unwrap(); +} +``` + +Now, that we're connected to our broker, we'll create our first task. A task is a work of unit that you want to asynchronously, becuse handling synchronously isn't possible or wouldn't be ideal (e.g sending a mail from a web API). The easiest of creating a task, is by declaring a structure, and derive `Task` on it: + +```rust +#[macro_use] +extern crate batch; +#[macro_use] +extern crate serde; +extern crate tokio_core; + +use batch::ClientBuilder; +use tokio_core::reactor::Core; + +#[derive(Serialize, Deserialize, Task)] +#[task_routing_key = "hello-world"] +struct SayHello { + to: String, +} + +fn main() { + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let client = ClientBuilder::new() + .connection_url("amqp://localhost/%2f") + .handle(handle) + .build(); + + core.run(client).unwrap(); +} +``` + +> **Note**: you can see that in addition to `Task`, we're also deriving `serde`'s `Serialize` & `Deserialize` traits. This is necessary in order to safely send task over the network. + +> **Note**: When deriving `Task` we added the (mandatory) `task_routing_key` attribute, it is used by RabbitMQ to deliver your message to the right worker. + +Now that we have our task, we can send it to our message broker: + +```rust +#[macro_use] +extern crate batch; +extern crate futures; +#[macro_use] +extern crate serde; +extern crate tokio_core; + +use batch::{job, ClientBuilder}; +use futures::Future; +use tokio_core::reactor::Core; + +#[derive(Serialize, Deserialize, Task)] +#[task_routing_key = "hello-world"] +struct SayHello { + to: String, +} + +fn main() { + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let client = ClientBuilder::new() + .connection_url("amqp://localhost/%2f") + .handle(handle) + .build(); + + let send = client.and_then(|client| { + let task = SayHello { + to: "Ferris".into() + }; + + job(task).send(&client) + }); + + core.run(send).unwrap(); +} +``` + +Now that our task has been published to our broker, we'll need to fetch it and assign a function to this task. To do this, we'll create a new program, the *worker*: + +```rust +#[macro_use] +extern crate batch; +#[macro_use] +extern crate serde; +extern crate tokio_core; + +use batch::{queue, WorkerBuilder}; +use tokio_core::reactor::Core; + +#[derive(Serialize, Deserialize, Task)] +#[task_routing_key = "hello-world"] +struct SayHello { + to: String, +} + +fn main() { + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let queues = vec![queue("hello-world")]; + let worker = WorkerBuilder::new(()) + .connection_url("amqp://localhost/%2f") + .queues(queues) + .handle(handle) + .build() + .unwrap(); + + core.run(worker.run()).unwrap(); +} +``` + +In order to register our task on the worker, we'll need to make it executable by implementing the `Perform` trait: + +```rust +#[macro_use] +extern crate batch; +#[macro_use] +extern crate serde; +extern crate tokio_core; + +use batch::{queue, Perform, WorkerBuilder}; +use tokio_core::reactor::Core; + +#[derive(Serialize, Deserialize, Task)] +#[task_routing_key = "hello-world"] +struct SayHello { + to: String, +} + +impl Perform for SayHello { + type Context = (); + + fn perform(&self, _ctx: Self::Context) { + println!("Hello {}!", self.to); + } +} + +fn main() { + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let queues = vec![queue("hello-world")]; + let worker = WorkerBuilder::new(()) + .connection_url("amqp://localhost/%2f") + .queues(queues) + .handle(handle) + .task::() + .build() + .unwrap(); + + core.run(worker.run()).unwrap(); +} +``` + +We can now run our *worker* program and see the `Hello Ferris!` message displayed in the terminal. + +## Features + +* `codegen` *(enabled by default)*: Automatically re-exports the procedurals macros of `batch-codegen` from the `batch` crate. + +## License + +Licensed under either of + + * Apache License, Version 2.0 + ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license + ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +## Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be +dual licensed as above, without any additional terms or conditions. diff --git a/batch-codegen/Cargo.toml b/batch-codegen/Cargo.toml new file mode 100644 index 0000000..19d15f0 --- /dev/null +++ b/batch-codegen/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "batch-codegen" +description = "Procedural macros for the batch crate" +repository = "https://github.com/kureuil/batch-rs" +version = "0.1.0" +license = "MIT/Apache-2.0" +authors = ["Louis Person "] + +[lib] +proc-macro = true + +[dependencies] +syn = "0.12" +quote = "0.4" + +[features] +default = [] diff --git a/batch-codegen/LICENSE-APACHE b/batch-codegen/LICENSE-APACHE new file mode 100644 index 0000000..16fe87b --- /dev/null +++ b/batch-codegen/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/batch-codegen/LICENSE-MIT b/batch-codegen/LICENSE-MIT new file mode 100644 index 0000000..31aa793 --- /dev/null +++ b/batch-codegen/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/batch-codegen/src/lib.rs b/batch-codegen/src/lib.rs new file mode 100644 index 0000000..05eefbc --- /dev/null +++ b/batch-codegen/src/lib.rs @@ -0,0 +1,155 @@ +//! This crate provides a Batch's derive macro. +//! +//! ```rust,ignore +//! #[derive(Task)] +//! ``` + +#![deny(missing_debug_implementations)] +#![recursion_limit = "128"] + +extern crate proc_macro; +#[macro_use] +extern crate quote; +extern crate syn; + +use proc_macro::TokenStream; +use quote::{ToTokens, Tokens}; +use syn::{DeriveInput, Ident, Lit, Meta}; + +/// Macros 1.1 implementation of `#[derive(Task)]` +/// +/// This macro supports several attributes: +/// +/// * `task_name`: a unique ID for the task. +/// e.g: `#[task_name = "batch-rs:send-confirmation-email"]` +/// **default value**: The derived struct name +/// * `task_exchange`: the exchange this task will be published to. +/// e.g: `#[task_exchange = "batch.example"]` +/// **default value**: `""` +/// * `task_routing_key`: the routing key associated to the task. +/// e.g: `#[task_routing_key = "mailer"]` +/// * `task_timeout`: Number of seconds available for the task to execute. If the time limit is +/// exceeded, the task's process is killed and the task is marked as failed. +/// e.g: `#[task_timeout = "120"]` +/// **default value**: 900 (15 minutes) +/// * `task_retries`: Number of times the task should be retried in case of error. +/// e.g: `#[task_retries = "5"]` +/// **default value**: 2 +#[proc_macro_derive(Task, + attributes(task_name, task_exchange, task_routing_key, task_timeout, + task_retries))] +pub fn task_derive(input: TokenStream) -> TokenStream { + let input: DeriveInput = syn::parse(input).unwrap(); + let task_name = get_derive_name_attr(&input); + let task_exchange = get_derive_exchange_attr(&input); + let task_routing_key = get_derive_routing_key_attr(&input); + let task_timeout = get_derive_timeout_attr(&input); + let task_retries = get_derive_retries_attr(&input); + let name = &input.ident; + + let expanded = quote! { + impl ::batch::Task for #name { + + fn name() -> &'static str { + #task_name + } + + fn exchange() -> &'static str { + #task_exchange + } + + fn routing_key() -> &'static str { + #task_routing_key + } + + fn timeout() -> Option<::std::time::Duration> { + #task_timeout + } + + fn retries() -> u32 { + #task_retries + } + } + }; + expanded.into() +} + +fn get_derive_name_attr(input: &DeriveInput) -> Tokens { + let attr = { + let raw = get_str_attr_by_name(&input.attrs, "task_name"); + raw.unwrap_or_else(|| input.ident.as_ref().to_string()) + }; + attr.into_tokens() +} + +fn get_derive_exchange_attr(input: &DeriveInput) -> Tokens { + let attr = { + let raw = get_str_attr_by_name(&input.attrs, "task_exchange"); + raw.unwrap_or_else(|| "".to_string()) + }; + attr.into_tokens() +} + +fn get_derive_routing_key_attr(input: &DeriveInput) -> Tokens { + let attr = { + let raw = get_str_attr_by_name(&input.attrs, "task_routing_key"); + raw.expect("task_routing_key is a mandatory attribute when deriving Task") + }; + attr.into_tokens() +} + +fn get_derive_timeout_attr(input: &DeriveInput) -> Tokens { + let attr = { + let raw = get_str_attr_by_name(&input.attrs, "task_timeout"); + raw.unwrap_or_else(|| "900".to_string()) + }; + let timeout = attr.parse::() + .expect("Couldn't parse timeout as an unsigned integer"); + quote! { + ::std::option::Option::Some(::std::time::Duration::from_secs(#timeout)) + } +} + +fn get_derive_retries_attr(input: &DeriveInput) -> Tokens { + let attr = { + let raw = get_str_attr_by_name(&input.attrs, "task_retries"); + raw.unwrap_or_else(|| "2".to_string()) + }; + let retries = attr.parse::() + .expect("Couldn't parse retries as an unsigned integer"); + quote! { + #retries + } +} + +/// Gets the string value of an attribute by its name. +fn get_str_attr_by_name(haystack: &[syn::Attribute], needle: &str) -> Option { + let attr = get_raw_attr_by_name(haystack, needle); + attr.and_then(|attr| { + if let Lit::Str(literal) = attr { + Some(literal.value()) + } else { + None + } + }) +} + +/// Gets the raw value of an attribute by its name. +fn get_raw_attr_by_name(haystack: &[syn::Attribute], needle_raw: &str) -> Option { + let needle = Ident::from(needle_raw); + for attr in haystack { + let meta = match attr.interpret_meta() { + Some(meta) => meta, + None => continue, + }; + let nv = match meta { + Meta::NameValue(nv) => nv, + _ => continue, + }; + if nv.ident != needle { + continue; + } + return Some(nv.lit.clone()); + } + None +} diff --git a/batch/Cargo.toml b/batch/Cargo.toml new file mode 100644 index 0000000..6e0da94 --- /dev/null +++ b/batch/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "batch" +description = "Distributed task queue library based on RabbitMQ" +repository = "https://github.com/kureuil/batch-rs" +version = "0.1.0" +license = "MIT/Apache-2.0" +authors = ["Louis Person "] +readme = "README.md" + +[dependencies] +failure = "0.1.1" +futures = "0.1.17" +lapin-async = "0.10" +lapin-futures = "0.10" +lapin-futures-rustls = "0.10" +lapin-futures-tls-api = "0.6" +log = "0.4" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio-core = "0.1" +uuid = { version = "0.6", features = ["v4", "serde"] } +wait-timeout = "0.1.5" + +batch-codegen = { version = "0.1", path = "../batch-codegen", optional = true } + +[dev-dependencies] +env_logger = "0.5" + +[features] +default = ["codegen"] +codegen = ["batch-codegen"] diff --git a/batch/LICENSE-APACHE b/batch/LICENSE-APACHE new file mode 100644 index 0000000..16fe87b --- /dev/null +++ b/batch/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/batch/LICENSE-MIT b/batch/LICENSE-MIT new file mode 100644 index 0000000..31aa793 --- /dev/null +++ b/batch/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/batch/README.md b/batch/README.md new file mode 120000 index 0000000..32d46ee --- /dev/null +++ b/batch/README.md @@ -0,0 +1 @@ +../README.md \ No newline at end of file diff --git a/batch/examples/simple-client.rs b/batch/examples/simple-client.rs new file mode 100644 index 0000000..53c6791 --- /dev/null +++ b/batch/examples/simple-client.rs @@ -0,0 +1,37 @@ +#[macro_use] +extern crate batch; +extern crate env_logger; +extern crate futures; +#[macro_use] +extern crate serde; +extern crate tokio_core; + +use batch::{exchange, job, ClientBuilder}; +use futures::Future; +use tokio_core::reactor::Core; + +#[derive(Serialize, Deserialize, Task)] +#[task_routing_key = "hello-world"] +struct SayHello { + to: String, +} + +fn main() { + env_logger::init(); + println!("Starting RabbitMQ client example"); + let mut core = Core::new().unwrap(); + let handle = core.handle(); + let exchanges = vec![exchange("batch.example")]; + let client = ClientBuilder::new() + .connection_url("amqp://localhost/%2f") + .exchanges(exchanges) + .handle(handle) + .build(); + let send = client.and_then(|client| { + let task = SayHello { + to: "Ferris".into(), + }; + job(task).exchange("batch.example").send(&client) + }); + core.run(send).unwrap(); +} diff --git a/batch/examples/simple-worker.rs b/batch/examples/simple-worker.rs new file mode 100644 index 0000000..96950fb --- /dev/null +++ b/batch/examples/simple-worker.rs @@ -0,0 +1,41 @@ +#[macro_use] +extern crate batch; +extern crate env_logger; +#[macro_use] +extern crate serde; +extern crate tokio_core; + +use batch::{exchange, queue, Perform, WorkerBuilder}; +use tokio_core::reactor::Core; + +#[derive(Serialize, Deserialize, Task)] +#[task_routing_key = "hello-world"] +struct SayHello { + to: String, +} + +impl Perform for SayHello { + type Context = (); + + fn perform(&self, _ctx: Self::Context) { + println!("Hello {}", self.to); + } +} + +fn main() { + env_logger::init(); + println!("Starting RabbitMQ worker example"); + let mut core = Core::new().unwrap(); + let handle = core.handle(); + let exchanges = vec![exchange("batch.example")]; + let queues = vec![queue("hello-world").bind("batch.example", "hello-world")]; + let worker = WorkerBuilder::new(()) + .connection_url("amqp://localhost/%2f") + .exchanges(exchanges) + .queues(queues) + .handle(handle) + .task::() + .build() + .unwrap(); + core.run(worker.run()).unwrap(); +} diff --git a/batch/src/client.rs b/batch/src/client.rs new file mode 100644 index 0000000..c4c5a46 --- /dev/null +++ b/batch/src/client.rs @@ -0,0 +1,155 @@ +//! Batch client. + +use std::fmt; +use std::iter::FromIterator; +use std::sync::{Arc, Mutex}; +use std::result::Result as StdResult; + +use futures::{future, Future, IntoFuture}; +use tokio_core::reactor::Handle; + +use error::{Error, ErrorKind, Result}; +use job::Job; +use rabbitmq::{Exchange, ExchangeBuilder, RabbitmqBroker}; +use task::Task; + +/// A builder to ease the construction of `Client` instances. +#[derive(Debug)] +pub struct ClientBuilder { + connection_url: String, + exchanges: Vec, + handle: Option, +} + +impl Default for ClientBuilder { + fn default() -> ClientBuilder { + ClientBuilder { + connection_url: "amqp://localhost/%2f".into(), + exchanges: Vec::new(), + handle: None, + } + } +} + +impl ClientBuilder { + /// Create a new `ClientBuilder` instance. + pub fn new() -> Self { + ClientBuilder::default() + } + + /// Set the URL used to connect to `RabbitMQ`. + /// + /// The URL must be a valid AMQP connection URL (ex: `amqp://localhost/%2f`) using either the + /// `amqp` protocol or the `amqps` protocol. + /// + /// # Example + /// + /// ``` + /// use batch::ClientBuilder; + /// + /// let builder = ClientBuilder::new() + /// .connection_url("amqp://guest:guest@localhost:5672/%2f"); + /// ``` + pub fn connection_url(mut self, url: &str) -> Self { + self.connection_url = url.into(); + self + } + + /// Add exchanges to be declared when connecting to `RabbitMQ`. + /// + /// See `exchange` documentation. + /// + /// # Example + /// + /// ``` + /// use batch::{exchange, ClientBuilder}; + /// + /// let exchanges = vec![ + /// exchange("batch.example"), + /// ]; + /// let builder = ClientBuilder::new() + /// .exchanges(exchanges); + /// ``` + pub fn exchanges(mut self, exchanges: EIter) -> Self + where + EIter: IntoIterator, + { + self.exchanges + .extend(exchanges.into_iter().map(|e| e.build())); + self + } + + /// Set the `Handle` to the Tokio reactor that should be used by the `Worker`. + /// + /// # Example + /// + /// ``` + /// # extern crate batch; + /// # extern crate tokio_core; + /// # + /// use batch::ClientBuilder; + /// use tokio_core::reactor::Core; + /// + /// # fn main() { + /// let core = Core::new().unwrap(); + /// let handle = core.handle(); + /// let builder = ClientBuilder::new() + /// .handle(handle); + /// # } + /// ``` + pub fn handle(mut self, handle: Handle) -> Self { + self.handle = Some(handle); + self + } + + /// Build a new `Client` instance from this builder data. + pub fn build(self) -> Box> { + if self.handle.is_none() { + return Box::new(future::err(ErrorKind::NoHandle.into())); + } + let task = RabbitmqBroker::new_with_handle( + &self.connection_url, + self.exchanges, + vec![], + self.handle.unwrap(), + ).and_then(|broker| { + Ok(Client { + broker: Arc::new(broker), + }) + }); + Box::new(task) + } +} + +/// The `Client` is responsible for sending tasks to the broker. +#[derive(Debug)] +pub struct Client { + broker: Arc, +} + +impl Client { + /// Send a job to the client's message broker. + /// + /// Once a job is sent to the message broker, it is transmitted to a Worker currently + /// receiving jobs from the same broker. + pub(crate) fn send(&self, job: &Job) -> Box> { + let broker = Arc::clone(&self.broker); + let task = broker.send(job); + Box::new(task) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn assert_send() {} + + fn assert_sync() {} + + #[test] + fn test_auto_impl_traits() { + assert_send::(); + assert_sync::(); + } +} diff --git a/batch/src/error.rs b/batch/src/error.rs new file mode 100644 index 0000000..e42a2d3 --- /dev/null +++ b/batch/src/error.rs @@ -0,0 +1,143 @@ +//! Error and Result module. + +use std::fmt; +use std::result::Result as StdResult; +use failure::{Backtrace, Context, Fail}; + +/// `Error` type for the batch crate. Implements `Fail`. +#[derive(Debug)] +pub struct Error { + inner: Context, +} + +/// A set of errors that can occur interacting with queues & workers. +#[derive(Debug, Fail)] +pub enum ErrorKind { + /// Couldn't serialize `Task` or `Job`. + #[fail(display = "Couldn't serialize Task or Job: {}", _0)] + Serialization(#[cause] ::serde_json::Error), + + /// Couldn't deserialize `Task` or `Job`. + #[fail(display = "Couldn't deserialize Task or Job: {}", _0)] + Deserialization(#[cause] ::serde_json::Error), + + /// Couldn't create Tokio reactor + #[fail(display = "Couldn't create Tokio reactor: {}", _0)] + Reactor(#[cause] ::std::io::Error), + + /// Generic I/O error + #[fail(display = "Generic I/O error: {}", _0)] + Io(#[cause] ::std::io::Error), + + /// Couldn't spawn child process and execute given job. + #[fail(display = "Couldn't spawn child process and execute given job: {}", _0)] + SubProcessManagement(#[cause] ::std::io::Error), + + /// The broker wasn't provided any queue. + #[fail(display = "A handle to a Tokio reactor was not provided")] + NoHandle, + + /// The given URL's scheme is not handled. + #[fail(display = "This URL scheme is invalid: {}", _0)] + InvalidScheme(::std::string::String), + + /// An error occured in the RabbitMQ broker. + #[fail(display = "An error occured in the RabbitMQ broker: {}", _0)] + Rabbitmq(#[cause] ::std::io::Error), +} + +impl Error { + /// Returns the underlying `Kind` of this error + pub(crate) fn kind(&self) -> &ErrorKind { + self.inner.get_context() + } + + /// Returns true if the error is from the serialization of a `Job` or a `Task`. + pub fn is_serialization(&self) -> bool { + match *self.kind() { + ErrorKind::Serialization(_) => true, + _ => false, + } + } + + /// Returns true if the error is from the deserialization of a `Job` or a `Task`. + pub fn is_deserialization(&self) -> bool { + match *self.kind() { + ErrorKind::Deserialization(_) => true, + _ => false, + } + } + + /// Returns true if the error is from the underlying I/O event loop. + pub fn is_reactor(&self) -> bool { + match *self.kind() { + ErrorKind::Reactor(_) => true, + _ => false, + } + } + + /// Returns true if the error is a generic I/O one. + pub fn is_generic_io(&self) -> bool { + match *self.kind() { + ErrorKind::Io(_) => true, + _ => false, + } + } + + /// Returns true if the error is from the spawning or execution of a subprocess. + pub fn is_subprocess(&self) -> bool { + match *self.kind() { + ErrorKind::SubProcessManagement(_) => true, + _ => false, + } + } + + /// Returns true if the error is from a missing Tokio reactor handle. + pub fn is_no_handle(&self) -> bool { + match *self.kind() { + ErrorKind::NoHandle => true, + _ => false, + } + } + + /// Returns true if the error is from the `RabbitMQ` connection. + pub fn is_rabbitmq(&self) -> bool { + match *self.kind() { + ErrorKind::Rabbitmq(_) => true, + _ => false, + } + } +} + +impl Fail for Error { + fn cause(&self) -> Option<&Fail> { + self.inner.cause() + } + + fn backtrace(&self) -> Option<&Backtrace> { + self.inner.backtrace() + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&self.inner, f) + } +} + +impl From for Error { + fn from(inner: ErrorKind) -> Error { + Error { + inner: Context::new(inner), + } + } +} + +impl From> for Error { + fn from(inner: Context) -> Error { + Error { inner: inner } + } +} + +/// Result type returned from `batch` functions that can fail. +pub type Result = StdResult; diff --git a/batch/src/job.rs b/batch/src/job.rs new file mode 100644 index 0000000..078f8ba --- /dev/null +++ b/batch/src/job.rs @@ -0,0 +1,185 @@ +//! A serialized `Task` annotated with metadata. + +use std::fmt; +use std::result::Result as StdResult; +use std::time::Duration; + +use futures::Future; +use uuid::Uuid; + +use client::Client; +use error::{self, Error, Result}; +use rabbitmq::Exchange; +use task::Task; +use ser; + +/// A `Query` is responsible for publishing jobs to `RabbitMQ`. +pub struct Query +where + T: Task, +{ + task: T, + exchange: String, + routing_key: String, + timeout: Option, + retries: u32, +} + +impl fmt::Debug for Query +where + T: Task, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> StdResult<(), fmt::Error> { + write!( + f, + "Query {{ exchange: {:?} routing_key: {:?} timeout: {:?} retries: {:?} }}", + self.exchange, self.routing_key, self.timeout, self.retries + ) + } +} + +impl Query +where + T: Task, +{ + /// Create a new `Query` from a `Task` instance. + pub fn new(task: T) -> Self { + Query { + task, + exchange: T::exchange().into(), + routing_key: T::routing_key().into(), + timeout: T::timeout(), + retries: T::retries(), + } + } + + /// Set the exchange this task will be published to. + pub fn exchange(mut self, exchange: &str) -> Self { + self.exchange = exchange.into(); + self + } + + /// Set the routing key associated with this task. + pub fn routing_key(mut self, routing_key: &str) -> Self { + self.routing_key = routing_key.into(); + self + } + + /// Set the timeout associated to this task's execution. + pub fn timeout(mut self, timeout: Option) -> Self { + self.timeout = timeout; + self + } + + /// Set the number of allowed retries for this task. + pub fn retries(mut self, retries: u32) -> Self { + self.retries = retries; + self + } + + /// Send the job using the given client. + pub fn send(self, client: &Client) -> Box> { + let serialized = ser::to_vec(&self.task) + .map_err(error::ErrorKind::Serialization) + .unwrap(); + let job = Job { + uuid: Uuid::new_v4(), + name: String::from(T::name()), + queue: self.routing_key, + task: serialized, + timeout: self.timeout, + retries: self.retries, + }; + client.send(&job) + } +} + +/// Shorthand to create a new `Query` instance from a `Task`. +pub fn job(task: T) -> Query +where + T: Task, +{ + Query::new(task) +} + +/// A `Job` is a serialized `Task` with metadata about its status & how it should be executed. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct Job { + uuid: Uuid, + name: String, + queue: String, + task: Vec, + timeout: Option, + retries: u32, +} + +impl Job { + /// Returns the UUIDv4 of this job. + pub fn uuid(&self) -> &Uuid { + &self.uuid + } + + /// Returns the name of the task associated to this job. + pub fn name(&self) -> &str { + &self.name + } + + /// Returns the queue this job should be pushed to. + pub fn queue(&self) -> &str { + &self.queue + } + + /// Returns the raw serialized task this job is associated to. + pub fn task(&self) -> &[u8] { + &self.task + } + + /// Returns the timeout associated to this job. + pub fn timeout(&self) -> Option { + self.timeout + } + + /// Returns the number of retries this job is allowed. + pub fn retries(&self) -> u32 { + self.retries + } + + /// Returns the `Job` that should be sent if the execution failed. + /// + /// If `retries` was 0, the function returns `None` as nothing should be sent to + /// the broker. + pub fn failed(self) -> Option { + if self.retries() == 0 { + None + } else { + Some(Job { + retries: self.retries() - 1, + ..self + }) + } + } +} + +/// The different states a `Job` can be in. +#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub enum Status { + /// The job was created but it wasn't sent/received yet. + Pending, + /// The job was received by a worker that started executing it. + Started, + /// The job completed successfully. + Success, + /// The job didn't complete successfully, see attached `Failure` cause. + Failed(Failure), +} + +/// Stores the reason for a job failure. +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub enum Failure { + /// The task handler returned an error. + Error, + /// The task didn't complete in time. + Timeout, + /// The task crashed (panic, segfault, etc.) while executing. + Crash, +} diff --git a/batch/src/lib.rs b/batch/src/lib.rs new file mode 100644 index 0000000..54b5f8c --- /dev/null +++ b/batch/src/lib.rs @@ -0,0 +1,95 @@ +//! Batch is a distributed task queue library. +//! +//! This library allows you to send a task to a RabbitMQ broker, so that a worker will be able +//! to pull it and execute the associated handler. It leverages the `futures` and `tokio-core` +//! crates to provide asynchronous I/O operations. +//! +//! # Example +//! +//! ```rust +//! #[macro_use] +//! extern crate batch; +//! extern crate futures; +//! #[macro_use] +//! extern crate serde; +//! extern crate tokio_core; +//! +//! use batch::{exchange, job, ClientBuilder}; +//! use futures::Future; +//! use tokio_core::reactor::Core; +//! +//! #[derive(Serialize, Deserialize, Task)] +//! #[task_routing_key = "hello-world"] +//! struct SayHello { +//! to: String, +//! } +//! +//! fn main() { +//! let mut core = Core::new().unwrap(); +//! let handle = core.handle(); +//! +//! let exchanges = vec![ +//! exchange("batch.examples"), +//! ]; +//! let client = ClientBuilder::new() +//! .connection_url("amqp://localhost/%2f") +//! .exchanges(exchanges) +//! .handle(handle) +//! .build(); +//! let send = client.and_then(|client| { +//! let task = SayHello { +//! to: "Ferris".into(), +//! }; +//! +//! job(task).exchange("batch.example").send(&client) +//! }); +//! +//! core.run(send).unwrap(); +//! } +//! ``` + +#![deny(missing_debug_implementations)] +#![deny(missing_docs)] +#![allow(unused_imports)] +#![allow(unknown_lints)] + +#[macro_use] +extern crate failure; +extern crate futures; +extern crate lapin_async; +extern crate lapin_futures as lapin; +extern crate lapin_futures_rustls as lapin_rustls; +extern crate lapin_futures_tls_api as lapin_tls_api; +#[macro_use] +extern crate log; +#[macro_use] +extern crate serde; +extern crate serde_json; +extern crate tokio_core; +extern crate uuid; +extern crate wait_timeout; + +#[cfg(feature = "codegen")] +#[macro_use] +extern crate batch_codegen; + +#[cfg(feature = "codegen")] +#[doc(hidden)] +pub use batch_codegen::*; + +use serde_json::de; +use serde_json::ser; + +mod client; +mod error; +mod job; +mod rabbitmq; +mod task; +mod worker; + +pub use client::{Client, ClientBuilder}; +pub use error::Error; +pub use job::{job, Query}; +pub use rabbitmq::{exchange, queue, ExchangeBuilder, QueueBuilder}; +pub use task::{Perform, Task}; +pub use worker::{Worker, WorkerBuilder}; diff --git a/batch/src/rabbitmq.rs b/batch/src/rabbitmq.rs new file mode 100644 index 0000000..96485be --- /dev/null +++ b/batch/src/rabbitmq.rs @@ -0,0 +1,524 @@ +//! `RabbitMQ` broker implementation + +use std::collections::BTreeSet; +use std::cmp; +use std::fmt; +use std::io; +use std::iter::FromIterator; +use std::net::{self, ToSocketAddrs}; +use std::result::Result as StdResult; +use std::thread; + +use futures::{future, Async, Future, IntoFuture, Poll, Stream}; +use lapin::channel::{BasicConsumeOptions, BasicProperties, BasicPublishOptions, Channel, + ExchangeBindOptions, ExchangeDeclareOptions, QueueBindOptions, + QueueDeclareOptions}; +use lapin::client::{Client, ConnectionOptions}; +use lapin::types::FieldTable; +use lapin_async::queue::Message; +use lapin_rustls::AMQPConnectionRustlsExt; +use lapin_tls_api::AMQPStream; +use tokio_core::reactor::{Core, Handle}; +use tokio_core::net::TcpStream; + +use de; +use error::{Error, ErrorKind}; +use job::Job; +use ser; + +/// Declare the given queues to the given `Channel`. +fn declare_queues( + queues: Q, + channel: Channel, +) -> Box> +where + Q: IntoIterator + 'static, +{ + let task = future::loop_fn(queues.into_iter(), move |mut iter| { + let next = iter.next(); + let task: Box, Error = io::Error>> = + if let Some(queue) = next { + let binding_channel = channel.clone(); + let task = channel + .queue_declare(&queue.name, &queue.options, &queue.arguments) + .and_then(move |_| { + future::join_all(queue.bindings().clone().into_iter().map(move |b| { + binding_channel.queue_bind( + queue.name(), + &b.exchange, + &b.routing_key, + &QueueBindOptions::default(), + &FieldTable::new(), + ) + })) + }) + .and_then(|_| Ok(future::Loop::Continue(iter))); + Box::new(task) + } else { + Box::new(future::ok(future::Loop::Break(()))) + }; + task + }); + Box::new(task.map(|_| ())) +} + +/// Declare the given exchanges to the given `Channel`. +fn declare_exchanges( + exchanges: E, + channel: Channel, +) -> Box> +where + E: IntoIterator + 'static, +{ + let task = future::loop_fn(exchanges.into_iter(), move |mut iter| { + let next = iter.next(); + let task: Box, Error = io::Error>> = + if let Some(exchange) = next { + let binding_channel = channel.clone(); + let task = channel + .exchange_declare( + exchange.name(), + exchange.exchange_type(), + &exchange.options, + &exchange.arguments, + ) + .and_then(move |_| { + future::join_all(exchange.bindings().clone().into_iter().map(move |b| { + binding_channel.exchange_bind( + &b.exchange, + &exchange.name, + &b.routing_key, + &ExchangeBindOptions::default(), + &FieldTable::new(), + ) + })) + }) + .and_then(|_| Ok(future::Loop::Continue(iter))); + Box::new(task) + } else { + Box::new(future::ok(future::Loop::Break(()))) + }; + task + }); + Box::new(task.map(|_| ())) +} + +/// An AMQP based broker for the Batch distributed task queue. +#[derive(Clone)] +pub struct RabbitmqBroker { + exchanges: Vec, + queues: Vec, + publish_channel: Channel, + client: Client, +} + +impl fmt::Debug for RabbitmqBroker { + fn fmt(&self, f: &mut fmt::Formatter) -> StdResult<(), fmt::Error> { + write!( + f, + "RabbitmqBroker {{ exchanges: {:?} queues: {:?} }}", + self.exchanges, self.queues + ) + } +} + +impl RabbitmqBroker { + /// Create a `RabbitmqBroker` instance from a RabbitMQ URI and an explicit tokio handle. + pub fn new_with_handle( + connection_url: &str, + exchanges_iter: E, + queues_iter: Q, + handle: Handle, + ) -> Box> + where + E: IntoIterator, + Q: IntoIterator, + { + let exchanges = exchanges_iter.into_iter().collect::>(); + let exchanges_ = exchanges.clone(); + + let queues = queues_iter.into_iter().collect::>(); + let queues_ = queues.clone(); + + let task = connection_url + .connect(handle, |err| { + error!( + "An error occured in the RabbitMQ heartbeat handler: {}", + err + ) + }) + .and_then(|client| client.create_channel().join(future::ok(client))) + .and_then(move |(channel, client)| { + let channel_ = channel.clone(); + declare_exchanges(exchanges_, channel).map(|_| (channel_, client)) + }) + .and_then(move |(channel, client)| { + let channel_ = channel.clone(); + declare_queues(queues_, channel).map(|_| (channel_, client)) + }) + .and_then(move |(publish_channel, client)| { + future::ok(RabbitmqBroker { + client, + publish_channel, + queues, + exchanges, + }) + }) + .map_err(|e| ErrorKind::Rabbitmq(e).into()); + Box::new(task) + } + + /// Return a `Future` of a `Stream` of incoming jobs (see `Self::Stream`). + /// + /// This method consumes the current connection in order to avoid mixing publishing + /// and consuming jobs on the same connection (which more often than not leads to issues). + pub fn recv(self) -> Box> { + let consumer_exchanges = self.exchanges.clone(); + let consumer_queues = self.queues.clone(); + let queues = self.queues.clone(); + let task = self.client + .create_channel() + .and_then(|channel| { + let channel_ = channel.clone(); + declare_exchanges(consumer_exchanges, channel).map(|_| channel_) + }) + .and_then(|channel| { + let channel_ = channel.clone(); + declare_queues(consumer_queues, channel).map(|_| channel_) + }) + .and_then(|channel| { + let consumer_channel = channel.clone(); + future::join_all(queues.into_iter().map(move |queue| { + consumer_channel.basic_consume( + &queue.name, + &format!("batch-rs-consumer-{}", queue.name), + &BasicConsumeOptions::default(), + &FieldTable::new(), + ) + })).join(future::ok(channel)) + }) + .and_then(|(mut consumers, channel)| { + let initial: Box + Send> = + Box::new(consumers.pop().unwrap()); + let consumer = consumers + .into_iter() + .fold(initial, |acc, consumer| Box::new(acc.select(consumer))); + future::ok(RabbitmqStream { + channel, + stream: consumer, + }) + }) + .map_err(|e| ErrorKind::Rabbitmq(e).into()); + Box::new(task) + } + + /// Send a job to the broker. + /// + /// Returns a `Future` that completes once the job is sent to the broker. + pub fn send(&self, job: &Job) -> Box> { + let channel = self.publish_channel.clone(); + let serialized = match ser::to_vec(&job) { + Ok(serialized) => serialized, + Err(e) => return Box::new(future::err(ErrorKind::Serialization(e).into())), + }; + let task = channel + .basic_publish( + "", + job.queue(), + &serialized, + &BasicPublishOptions::default(), + BasicProperties::default(), + ) + .and_then(move |_| future::ok(())) + .map_err(|e| ErrorKind::Rabbitmq(e).into()); + Box::new(task) + } +} + +/// A `Consumer` of incoming jobs. +/// +/// The type of the stream is a tuple containing a `u64` which is a unique ID for the +/// job used when `ack`'ing or `reject`'ing it, and a `Job` instance. +pub struct RabbitmqStream { + channel: Channel, + stream: Box + Send>, +} + +impl fmt::Debug for RabbitmqStream { + fn fmt(&self, f: &mut fmt::Formatter) -> StdResult<(), fmt::Error> { + write!(f, "RabbitmqStream {{ }}") + } +} + +impl RabbitmqStream { + /// Acknowledge the successful execution of a `Task`. + /// + /// Returns a `Future` that completes once the `ack` is sent to the broker. + pub fn ack(&self, uid: u64) -> Box> { + let task = self.channel + .basic_ack(uid) + .map_err(|e| ErrorKind::Rabbitmq(e).into()); + Box::new(task) + } + + /// Reject the successful execution of a `Task`. + /// + /// Returns a `Future` that completes once the `reject` is sent to the broker. + pub fn reject(&self, uid: u64) -> Box> { + let task = self.channel + .basic_reject(uid, false) + .map_err(|e| ErrorKind::Rabbitmq(e).into()); + Box::new(task) + } +} + +impl Stream for RabbitmqStream { + type Item = (u64, Job); + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let async = match self.stream.poll() { + Ok(async) => async, + Err(e) => return Err(ErrorKind::Rabbitmq(e).into()), + }; + let option = match async { + Async::Ready(option) => option, + Async::NotReady => return Ok(Async::NotReady), + }; + let message = match option { + Some(message) => message, + None => return Ok(Async::Ready(None)), + }; + let job: Job = de::from_slice(&message.data).map_err(ErrorKind::Deserialization)?; + Ok(Async::Ready(Some((message.delivery_tag, job)))) + } +} + +/// A binding from a queue to an exchange, or from an exchange to an exchange. +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct Binding { + exchange: String, + routing_key: String, +} + +/// A `RabbitMQ` exchange. +#[derive(Clone, Debug)] +pub struct Exchange { + name: String, + exchange_type: String, + bindings: BTreeSet, + options: ExchangeDeclareOptions, + arguments: FieldTable, +} + +impl Default for Exchange { + fn default() -> Exchange { + Exchange { + name: "".into(), + exchange_type: "direct".into(), + bindings: BTreeSet::new(), + options: ExchangeDeclareOptions::default(), + arguments: FieldTable::new(), + } + } +} + +impl cmp::PartialEq for Exchange { + fn eq(&self, other: &Exchange) -> bool { + self.name == other.name + } +} + +impl cmp::Eq for Exchange {} + +impl cmp::PartialOrd for Exchange { + fn partial_cmp(&self, other: &Exchange) -> Option { + Some(self.cmp(other)) + } +} + +impl cmp::Ord for Exchange { + fn cmp(&self, other: &Exchange) -> cmp::Ordering { + self.name.cmp(&other.name) + } +} + +impl Exchange { + /// Returns the name associated to this `Exchange`. + pub fn name(&self) -> &str { + &self.name + } + + /// Returns the type associated to this `Exchange`. + pub fn exchange_type(&self) -> &str { + &self.exchange_type + } + + /// Returns the bindings associated to this `Exchange`. + pub(crate) fn bindings(&self) -> &BTreeSet { + &self.bindings + } +} + +/// A builder for `RabbitMQ` `Exchange`. +#[derive(Debug)] +pub struct ExchangeBuilder { + name: String, + bindings: BTreeSet, +} + +impl ExchangeBuilder { + /// Create a new `ExchangeBuilder` instance from the desired exchange name. + /// + /// # Example + /// + /// ``` + /// use batch::ExchangeBuilder; + /// + /// let builder = ExchangeBuilder::new("batch.example"); + /// ``` + pub fn new(name: &str) -> ExchangeBuilder { + ExchangeBuilder { + name: name.into(), + bindings: BTreeSet::new(), + } + } + + /// Binds this exchange to another exchange via a routing key. + /// + /// All of the messages posted to this exchange associated to the given routing key + /// are automatically sent to the given exchange. + /// + /// # Example + /// + /// ``` + /// use batch::ExchangeBuilder; + /// + /// let builder = ExchangeBuilder::new("batch.example") + /// .bind("batch.messaging", "hello-world"); + /// ``` + pub fn bind(mut self, exchange: &str, routing_key: &str) -> Self { + self.bindings.insert(Binding { + exchange: exchange.into(), + routing_key: routing_key.into(), + }); + self + } + + /// Build a new `Exchange` instance from this builder data. + pub(crate) fn build(self) -> Exchange { + Exchange { + name: self.name, + exchange_type: "direct".into(), + bindings: self.bindings, + options: ExchangeDeclareOptions::default(), + arguments: FieldTable::new(), + } + } +} + +/// Shorthand to create a new `ExchangeBuilder` instance. +pub fn exchange(name: &str) -> ExchangeBuilder { + ExchangeBuilder::new(name) +} + +/// A `RabbitMQ` queue. +#[derive(Clone, Debug)] +pub struct Queue { + name: String, + bindings: BTreeSet, + options: QueueDeclareOptions, + arguments: FieldTable, +} + +impl cmp::PartialEq for Queue { + fn eq(&self, other: &Queue) -> bool { + self.name == other.name + } +} + +impl cmp::Eq for Queue {} + +impl cmp::PartialOrd for Queue { + fn partial_cmp(&self, other: &Queue) -> Option { + Some(self.cmp(other)) + } +} + +impl cmp::Ord for Queue { + fn cmp(&self, other: &Queue) -> cmp::Ordering { + self.name.cmp(&other.name) + } +} + +impl Queue { + /// Returns the name associated to this `Queue`. + pub fn name(&self) -> &str { + &self.name + } + + /// Returns the bindings associated to this `Queue`. + pub(crate) fn bindings(&self) -> &BTreeSet { + &self.bindings + } +} + +/// A builder for `RabbitMQ` `Queue`. +#[derive(Debug)] +pub struct QueueBuilder { + name: String, + bindings: BTreeSet, +} + +impl QueueBuilder { + /// Create a new `QueueBuilder` from the desired queue name. + /// + /// # Example + /// + /// ``` + /// use batch::QueueBuilder; + /// + /// let queue = QueueBuilder::new("video-transcoding"); + /// ``` + pub fn new(name: &str) -> QueueBuilder { + QueueBuilder { + name: name.into(), + bindings: BTreeSet::new(), + } + } + + /// Bind this queue to an exchange via a routing key. + /// + /// # Example + /// + /// ``` + /// use batch::QueueBuilder; + /// + /// QueueBuilder::new("video-transcoding") + /// .bind("movies", "transcoding") + /// .bind("series", "transcoding") + /// .bind("anime", "transcoding"); + /// ``` + pub fn bind(mut self, exchange: &str, routing_key: &str) -> Self { + self.bindings.insert(Binding { + exchange: exchange.into(), + routing_key: routing_key.into(), + }); + self + } + + /// Create a new `Queue` instance from this builder data. + pub(crate) fn build(self) -> Queue { + Queue { + name: self.name, + bindings: self.bindings, + options: QueueDeclareOptions::default(), + arguments: FieldTable::new(), + } + } +} + +/// Shorthand to create a new `QueueBuilder` instance. +pub fn queue(name: &str) -> QueueBuilder { + QueueBuilder::new(name) +} diff --git a/batch/src/task.rs b/batch/src/task.rs new file mode 100644 index 0000000..a6c51f7 --- /dev/null +++ b/batch/src/task.rs @@ -0,0 +1,100 @@ +//! A trait representing an executable task. + +use serde::Serialize; +use serde::de::DeserializeOwned; +use std::time::Duration; + +use error::Result; + +/// An executable task and its related metadata (name, queue, timeout, etc.) +/// +/// In most cases, you should be deriving this trait instead of implementing it manually yourself. +/// +/// # Examples +/// +/// Using the provided defaults: +/// +/// ```rust +/// #[macro_use] +/// extern crate batch; +/// #[macro_use] +/// extern crate serde; +/// +/// #[derive(Deserialize, Serialize, Task)] +/// #[task_routing_key = "emails"] +/// struct SendConfirmationEmail; +/// +/// # +/// # fn main() {} +/// ``` +/// +/// Overriding the provided defaults: +/// +/// ```rust +/// #[macro_use] +/// extern crate batch; +/// #[macro_use] +/// extern crate serde; +/// +/// struct App; +/// +/// #[derive(Deserialize, Serialize, Task)] +/// #[task_name = "batch-rs:send-password-reset-email"] +/// #[task_routing_key = "emails"] +/// #[task_timeout = "120"] +/// #[task_retries = "0"] +/// struct SendPasswordResetEmail; +/// +/// # +/// # fn main() {} +/// ``` +pub trait Task: DeserializeOwned + Serialize { + /// A should-be-unique human-readable ID for this task. + fn name() -> &'static str; + + /// The exchange the task will be published to. + fn exchange() -> &'static str; + + /// The routing key associated to this task. + fn routing_key() -> &'static str; + + /// The number of times this task must be retried in case of error. + fn retries() -> u32; + + /// An optional duration representing the time allowed for this task's handler to complete. + fn timeout() -> Option; +} + +/// The `Perform` trait allow marking a `Task` as executable. +/// +/// # Example +/// +/// ``` +/// #[macro_use] +/// extern crate batch; +/// #[macro_use] +/// extern crate serde; +/// +/// use batch::Perform; +/// +/// #[derive(Serialize, Deserialize, Task)] +/// #[task_routing_key = "emails"] +/// struct SendPasswordResetEmail; +/// +/// impl Perform for SendPasswordResetEmail { +/// type Context = (); +/// +/// fn perform(&self, _ctx: Self::Context) { +/// println!("Sending password reset email..."); +/// } +/// } +/// +/// # fn main() {} +/// ``` +pub trait Perform { + /// The type of the context value that will be given to this task's handler. + type Context; + + /// Perform the task's duty. + fn perform(&self, Self::Context); +} diff --git a/batch/src/worker.rs b/batch/src/worker.rs new file mode 100644 index 0000000..1bb57bd --- /dev/null +++ b/batch/src/worker.rs @@ -0,0 +1,441 @@ +//! Batch Worker. +//! +//! The worker is responsible for polling the broker for tasks, deserializing them an execute +//! them. It should never ever crash and sould be resilient to panic-friendly task handlers. Its +//! `Broker` implementation is completely customizable by the user. +//! +//! # Trade-offs +//! +//! The most important thing to know about the worker is that it favours safety over performance. +//! For each incoming job, it will spawn a new process whose only goal is to perform the task. +//! Even if this is slower than just executing the function in a threadpool, it allows much more +//! control: timeouts wouldn't even be possible if we were running the tasks in-process. It also +//! protects against unpredictable crashes + +use std::collections::HashMap; +use std::env; +use std::fmt; +use std::io; +use std::process; +use std::result::Result as StdResult; +use std::sync::Arc; +use std::time::Duration; + +use futures::{future, Future, IntoFuture, Stream}; +use tokio_core::reactor::{Core, Handle}; +use wait_timeout::ChildExt; + +use error::{self, Result}; +use de; +use ser; +use job::{Failure as JobFailure, Job, Status as JobStatus}; +use rabbitmq::{Exchange, ExchangeBuilder, Queue, QueueBuilder, RabbitmqBroker, RabbitmqStream}; +use task::{Perform, Task}; + +/// Type of task handlers stored in `Worker`. +type WorkerFn = Fn(&Job, Ctx) -> Result<()>; + +/// A builder to ease the construction of `Worker` instances. +pub struct WorkerBuilder { + connection_url: String, + context: Ctx, + exchanges: Vec, + handle: Option, + handlers: HashMap<&'static str, Box>>, + queues: Vec, +} + +impl fmt::Debug for WorkerBuilder +where + Ctx: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> StdResult<(), fmt::Error> { + write!( + f, + "WorkerBuilder {{ connection_url: {:?} context: {:?} exchanges: {:?} queues: {:?} }}", + self.connection_url, self.context, self.exchanges, self.queues + ) + } +} + +impl WorkerBuilder { + /// Create a new `WorkerBuilder` instance, using the mandatory context. + /// + /// The type of the given context is then used to typecheck the tasks registered on + /// this builder. + /// + /// # Example + /// + /// ``` + /// use batch::WorkerBuilder; + /// + /// let builder = WorkerBuilder::new(()); + /// ``` + pub fn new(context: Ctx) -> Self { + WorkerBuilder { + context: context, + connection_url: "amqp://localhost/%2f".into(), + exchanges: Vec::new(), + queues: Vec::new(), + handle: None, + handlers: HashMap::new(), + } + } + + /// Set the URL used to connect to `RabbitMQ`. + /// + /// The URL must be a valid AMQP connection URL (ex: `amqp://localhost/%2f`) using either the + /// `amqp` protocol or the `amqps` protocol. + /// + /// # Example + /// + /// ``` + /// use batch::WorkerBuilder; + /// + /// let builder = WorkerBuilder::new(()) + /// .connection_url("amqp://guest:guest@localhost:5672/%2f"); + /// ``` + pub fn connection_url(mut self, url: &str) -> Self { + self.connection_url = url.into(); + self + } + + /// Add exchanges to be declared when connecting to `RabbitMQ`. + /// + /// See `exchange` documentation. + /// + /// # Example + /// + /// ``` + /// use batch::{exchange, WorkerBuilder}; + /// + /// let exchanges = vec![ + /// exchange("batch.example"), + /// ]; + /// let builder = WorkerBuilder::new(()) + /// .exchanges(exchanges); + /// ``` + pub fn exchanges(mut self, exchanges: EIter) -> Self + where + EIter: IntoIterator, + { + self.exchanges + .extend(exchanges.into_iter().map(|e| e.build())); + self + } + + /// Add queues to be declared when connecting to `RabbitMQ`. + /// + /// See `queue` documentation. + /// + /// # Example + /// + /// ``` + /// use batch::{queue, WorkerBuilder}; + /// + /// let queues = vec![ + /// queue("hello-world").bind("batch.example", "hello-world"), + /// ]; + /// let builder = WorkerBuilder::new(()) + /// .queues(queues); + /// ``` + pub fn queues(mut self, queues: QIter) -> Self + where + QIter: IntoIterator, + { + self.queues.extend(queues.into_iter().map(|q| q.build())); + self + } + + /// Set the `Handle` to the Tokio reactor that should be used by the `Worker`. + /// + /// # Example + /// + /// ``` + /// # extern crate batch; + /// # extern crate tokio_core; + /// # + /// use batch::WorkerBuilder; + /// use tokio_core::reactor::Core; + /// + /// # fn main() { + /// let core = Core::new().unwrap(); + /// let handle = core.handle(); + /// let builder = WorkerBuilder::new(()) + /// .handle(handle); + /// # } + /// ``` + pub fn handle(mut self, handle: Handle) -> Self { + self.handle = Some(handle); + self + } + + /// Register a new `Task` to be handled by the `Worker`. + /// + /// The type of the `Task`'s `Context` must be the same as the `Worker`'s. + /// + /// # Example + /// + /// ``` + /// # #[macro_use] + /// # extern crate batch; + /// # #[macro_use] + /// # extern crate serde; + /// # + /// use batch::{Perform, WorkerBuilder}; + /// + /// #[derive(Serialize, Deserialize, Task)] + /// #[task_routing_key = "hello-world"] + /// struct SayHello { + /// to: String, + /// } + /// + /// impl Perform for SayHello { + /// type Context = (); + /// + /// fn perform(&self, _ctx: Self::Context) { + /// println!("Hello {}", self.to); + /// } + /// } + /// + /// # fn main() { + /// let builder = WorkerBuilder::new(()) + /// .task::(); + /// # } + /// ``` + pub fn task(mut self) -> Self + where + T: Task + Perform, + { + self.handlers.insert( + T::name(), + Box::new(|job, ctx| -> Result<()> { + let task: T = de::from_slice(job.task()).unwrap(); + Perform::perform(&task, ctx); + Ok(()) + }), + ); + self + } + + /// Create a new `Worker` instance from this builder data. + /// + /// # Example + /// + /// ``` + /// use batch::WorkerBuilder; + /// + /// let builder = WorkerBuilder::new(()) + /// .build(); + /// ``` + pub fn build(self) -> Result> { + if self.handle.is_none() { + Err(error::ErrorKind::NoHandle)?; + } + Ok(Worker { + connection_url: self.connection_url, + context: self.context, + handle: self.handle.unwrap(), + handlers: self.handlers, + exchanges: self.exchanges, + queues: self.queues, + }) + } +} + +/// Long-running worker polling tasks from the given `Broker`. +pub struct Worker { + connection_url: String, + context: Ctx, + handle: Handle, + handlers: HashMap<&'static str, Box>>, + exchanges: Vec, + queues: Vec, +} + +impl fmt::Debug for Worker +where + Ctx: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> StdResult<(), fmt::Error> { + write!( + f, + "Worker {{ connection_url: {:?} context: {:?} queues: {:?} }}", + self.connection_url, self.context, self.queues + ) + } +} + +impl Worker { + /// Runs the worker, polling tasks from the broker and executing them. + /// + /// # Example + /// + /// ```rust + /// extern crate batch; + /// extern crate tokio_core; + /// + /// use batch::WorkerBuilder; + /// use tokio_core::reactor::Core; + /// + /// fn main() { + /// let mut core = Core::new().unwrap(); + /// let handle = core.handle(); + /// let worker = WorkerBuilder::new(()) + /// .handle(handle) + /// .build() + /// .unwrap(); + /// let task = worker.run(); + /// + /// // In your code, un-comment the next line: + /// // core.run(task).unwrap(); + /// } + /// ``` + pub fn run(self) -> Box> { + match env::var("BATCHRS_WORKER_IS_EXECUTOR") { + Ok(_) => Box::new(self.execute().into_future()), + Err(_) => self.supervise(), + } + } + + fn supervise(self) -> Box> { + let handle = self.handle; + let connection_url = self.connection_url; + let queues = self.queues; + let exchanges = self.exchanges; + let ctor = |e: Vec, q: Vec, h: &Handle| { + RabbitmqBroker::new_with_handle(&connection_url, e, q, h.clone()) + }; + let task = ctor(exchanges.clone(), queues.clone(), &handle) + .join(ctor(exchanges, queues, &handle)) + .and_then(|(consume_broker, publish_broker)| { + let publish_broker = Arc::new(publish_broker); + consume_broker.recv().and_then(move |consumer| { + future::loop_fn(consumer.into_future(), move |f| { + let publish_broker = Arc::clone(&publish_broker); + let handle = handle.clone(); + f.and_then(move |(next, consumer)| { + let (uid, job) = match next { + Some((uid, job)) => (uid, job), + None => return Ok(future::Loop::Break(())), + }; + let task = match spawn(&job) { + Err(e) => { + error!("[{}] Couldn't spawn child process: {}", job.uuid(), e); + reject(&consumer, publish_broker, uid, job) + } + Ok(status) => match status { + JobStatus::Success => { + debug!("[{}] Child execution succeeded", job.uuid()); + consumer.ack(uid) + } + JobStatus::Failed(_) => { + debug!("[{}] Child execution failed", job.uuid()); + reject(&consumer, publish_broker, uid, job) + } + _ => unreachable!(), + }, + }; + let task = task.map_err(move |e| { + error!("An error occured: {}", e); + }); + handle.spawn(task); + Ok(future::Loop::Continue(consumer.into_future())) + }).or_else(|(e, consumer)| { + use failure::Fail; + + let cause = match e.kind().cause() { + Some(cause) => format!(" Cause: {}", cause), + None => "".into(), + }; + error!("Couldn't receive message from consumer: {}.{}", e, cause); + Ok(future::Loop::Continue(consumer.into_future())) + }) + }) + }) + }); + Box::new(task) + } + + fn execute(self) -> Result<()> { + let job: Job = de::from_reader(io::stdin()).map_err(error::ErrorKind::Deserialization)?; + if let Some(handler) = self.handlers.get(job.name()) { + if let Err(e) = (*handler)(&job, self.context) { + error!("Couldn't process job: {}", e); + } + } else { + warn!("No handler registered for job: `{}'", job.name()); + } + Ok(()) + } +} + +fn reject( + consumer: &RabbitmqStream, + broker: Arc, + uid: u64, + job: Job, +) -> Box> { + let task = consumer.reject(uid); + if let Some(job) = job.failed() { + debug!("[{}] Retry job after failure: {:?}", job.uuid(), job); + Box::new(task.and_then(move |_| broker.send(&job))) + } else { + task + } +} + +fn spawn(job: &Job) -> Result { + use std::io::Write; + + let current_exe = env::current_exe().map_err(error::ErrorKind::SubProcessManagement)?; + let mut child = process::Command::new(¤t_exe) + .env("BATCHRS_WORKER_IS_EXECUTOR", "1") + .stdin(process::Stdio::piped()) + .spawn() + .map_err(error::ErrorKind::SubProcessManagement)?; + let payload = ser::to_vec(&job).map_err(error::ErrorKind::Serialization)?; + { + let stdin = child.stdin.as_mut().expect("failed to get stdin"); + stdin + .write_all(&payload) + .map_err(error::ErrorKind::SubProcessManagement)?; + stdin + .flush() + .map_err(error::ErrorKind::SubProcessManagement)?; + } + if let Some(duration) = job.timeout() { + drop(child.stdin.take()); + if let Some(status) = child + .wait_timeout(duration) + .map_err(error::ErrorKind::SubProcessManagement)? + { + if status.success() { + Ok(JobStatus::Success) + } else if status.unix_signal().is_some() { + Ok(JobStatus::Failed(JobFailure::Crash)) + } else { + Ok(JobStatus::Failed(JobFailure::Error)) + } + } else { + child + .kill() + .map_err(error::ErrorKind::SubProcessManagement)?; + child + .wait() + .map_err(error::ErrorKind::SubProcessManagement)?; + Ok(JobStatus::Failed(JobFailure::Timeout)) + } + } else { + let status = child + .wait() + .map_err(error::ErrorKind::SubProcessManagement)?; + if status.success() { + Ok(JobStatus::Success) + } else if status.code().is_some() { + Ok(JobStatus::Failed(JobFailure::Error)) + } else { + Ok(JobStatus::Failed(JobFailure::Crash)) + } + } +}