From 248ddfa56491cacaaf27e70558f029caa794364b Mon Sep 17 00:00:00 2001 From: Reuben Wong Date: Tue, 10 Oct 2023 00:26:57 +0800 Subject: [PATCH] first cut marketstack client --- src/marketstack.rs | 258 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 236 insertions(+), 22 deletions(-) diff --git a/src/marketstack.rs b/src/marketstack.rs index 5b1662a..9aa53c9 100644 --- a/src/marketstack.rs +++ b/src/marketstack.rs @@ -4,7 +4,7 @@ use std::fmt::{self, Debug}; use async_trait::async_trait; use bytes::Bytes; -use http::{HeaderMap, Response as HttpResponse}; +use http::{request, HeaderMap, Response as HttpResponse}; use itertools::Itertools; use log::{debug, error, info}; use reqwest::blocking::Client; @@ -17,6 +17,21 @@ use url::Url; use crate::api; use crate::auth::Auth; +#[derive(Debug, Error)] +#[non_exhaustive] +pub enum RestError { + #[error("communication with marketstack: {}", source)] + Communication { + #[from] + source: reqwest::Error, + }, + #[error("`http` error: {}", source)] + Http { + #[from] + source: http::Error, + }, +} + #[derive(Debug, Error)] #[non_exhaustive] pub enum MarketstackError { @@ -90,13 +105,24 @@ impl Marketstack { /// /// The `token` should be a valid [personal access token](https://marketstack.com/documentation). /// Errors out if `token` is invalid. - // pub fn new(host: T, token: T) -> MarketstackResult - // where - // H: AsRef, - // T: Into, - // { - // Self::new_impl() - // } + pub fn new(host: H, token: T) -> MarketstackResult + where + H: AsRef, + T: Into, + { + Self::new_impl("https", host.as_ref(), Auth::Token(token.into())) + } + + /// Create a new non-SSL Marketstack API representation. + /// + /// A `token` will still be required for insecure access. + pub fn new_insecure(host: H, token: T) -> MarketstackResult + where + H: AsRef, + T: Into, + { + Self::new_impl("http", host.as_ref(), Auth::Token(token.into())) + } /// Internal method to create a new Marketstack client. fn new_impl(protocol: &str, host: &str, auth: Auth) -> MarketstackResult { @@ -114,23 +140,211 @@ impl Marketstack { }; // Ensure the API is working. - // api.auth.check_connection(&api)?; + api.auth.check_connection(&api)?; Ok(api) } + + /// Create a new Marketstack API client builder. + pub fn builder(host: H, token: T) -> MarketstackBuilder + where + H: Into, + T: Into, + { + MarketstackBuilder::new(host, token) + } + + fn send(&self, req: reqwest::blocking::RequestBuilder) -> MarketstackResult + where + T: DeserializeOwned, + { + let rsp = req.headers(HeaderMap::default()).send()?; + let status = rsp.status(); + if status.is_server_error() { + return Err(MarketstackError::http(status)); + } + + serde_json::from_reader::<_, T>(rsp).map_err(MarketstackError::data_type::) + } + + fn rest_simple( + &self, + mut request: http::request::Builder, + body: Vec, + ) -> Result, api::ApiError<::Error>> { + let call = || -> Result<_, RestError> { + let http_request = request.body(body)?; + let request = http_request.try_into()?; + let rsp = self.client.execute(request)?; + + let mut http_rsp = HttpResponse::builder() + .status(rsp.status()) + .version(rsp.version()); + let headers = http_rsp.headers_mut().unwrap(); + for (key, value) in rsp.headers() { + headers.insert(key, value.clone()); + } + Ok(http_rsp.body(rsp.bytes()?)?) + }; + call().map_err(api::ApiError::client) + } +} +pub struct MarketstackBuilder { + protocol: &'static str, + host: String, + token: Auth, } -#[derive(Debug, Error)] -#[non_exhaustive] -pub enum RestError { - #[error("communication with marketstack: {}", source)] - Communication { - #[from] - source: reqwest::Error, - }, - #[error("`http` error: {}", source)] - Http { - #[source] - source: http::Error, - }, +impl MarketstackBuilder { + /// Create a new Marketstack API client builder. + pub fn new(host: H, token: T) -> Self + where + H: Into, + T: Into, + { + Self { + protocol: "https", + host: host.into(), + token: Auth::Token(token.into()), + } + } + + /// Switch to an insecure protocol (http instead of https). + pub fn insecure(&mut self) -> &mut Self { + self.protocol = "http"; + self + } + + pub fn build(&self) -> MarketstackResult { + Marketstack::new_impl(self.protocol, &self.host, self.token.clone()) + } + + pub async fn build_async(&self) -> MarketstackResult { + AsyncMarketstack::new_impl(self.protocol, &self.host, self.token.clone()).await + } +} + +impl api::RestClient for Marketstack { + type Error = RestError; + + fn rest_endpoint(&self, endpoint: &str) -> Result> { + debug!(target: "marketstack", "REST api call {}", endpoint); + Ok(self.rest_url.join(endpoint)?) + } + + fn get_auth(&self) -> Option { + Some(self.auth.clone()) + } +} + +impl api::Client for Marketstack { + fn rest( + &self, + request: request::Builder, + body: Vec, + ) -> Result, api::ApiError> { + self.rest_simple(request, body) + } +} + +/// A represenation of the asynchronous Marketstack API. +#[derive(Clone)] +pub struct AsyncMarketstack { + /// The client to use for API calls. + client: reqwest::Client, + /// The base URL to use for API calls. + rest_url: Url, + /// The authentication information to use when communicating with Marketstack. + auth: Auth, +} + +impl Debug for AsyncMarketstack { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AsyncMarketstack") + .field("rest_url", &self.rest_url) + .finish() + } +} + +#[async_trait] +impl api::RestClient for AsyncMarketstack { + type Error = RestError; + + fn rest_endpoint(&self, endpoint: &str) -> Result> { + debug!(target: "marketstack", "REST api call {}", endpoint); + Ok(self.rest_url.join(endpoint)?) + } + + fn get_auth(&self) -> Option { + Some(self.auth.clone()) + } +} + +#[async_trait] +impl api::AsyncClient for AsyncMarketstack { + async fn rest_async( + &self, + request: http::request::Builder, + body: Vec, + ) -> Result, api::ApiError<::Error>> { + self.rest_async_simple(request, body).await + } +} + +impl AsyncMarketstack { + /// Internal method to create a new Marketstack client. + async fn new_impl(protocol: &str, host: &str, auth: Auth) -> MarketstackResult { + let rest_url = Url::parse(&format!("{}://{}/v1/", protocol, host))?; + + let client = AsyncClient::builder() + .danger_accept_invalid_certs(true) + .build()?; + + let api = AsyncMarketstack { + client, + rest_url, + auth, + }; + + // Ensure the API is working. + api.auth.check_connection_async(&api).await?; + + Ok(api) + } + + async fn send(&self, req: reqwest::RequestBuilder) -> MarketstackResult + where + T: DeserializeOwned, + { + let rsp = req.headers(HeaderMap::default()).send().await?; + let status = rsp.status(); + if status.is_server_error() { + return Err(MarketstackError::http(status)); + } + + serde_json::from_slice::(&rsp.bytes().await?).map_err(MarketstackError::data_type::) + } + + async fn rest_async_simple( + &self, + mut request: http::request::Builder, + body: Vec, + ) -> Result, api::ApiError<::Error>> { + use futures_util::TryFutureExt; + let call = || async { + let http_request = request.body(body)?; + let request = http_request.try_into()?; + let rsp = self.client.execute(request).await?; + + let mut http_rsp = HttpResponse::builder() + .status(rsp.status()) + .version(rsp.version()); + let headers = http_rsp.headers_mut().unwrap(); + for (key, value) in rsp.headers() { + headers.insert(key, value.clone()); + } + Ok(http_rsp.body(rsp.bytes().await?)?) + }; + call().map_err(api::ApiError::client).await + } }