Skip to content

Commit

Permalink
refactor(jsrt): load main module through module loader, clean up erro…
Browse files Browse the repository at this point in the history
…r responses
  • Loading branch information
ozwaldorf committed May 29, 2024
1 parent 17c01af commit 28d2fc8
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 104 deletions.
127 changes: 35 additions & 92 deletions services/js-poc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use anyhow::{anyhow, bail, Context};
use arrayref::array_ref;
use cid::Cid;
use deno_core::url::Url;
use anyhow::{bail, Context};
use deno_core::v8::{Global, IsolateHandle, Value};
use deno_core::{serde_v8, v8, JsRuntime};
use deno_core::{serde_v8, v8, JsRuntime, ModuleSpecifier};
use fn_sdk::connection::Connection;
use fn_sdk::header::TransportDetail;
use fn_sdk::http_util::{respond, respond_with_error, respond_with_http_response};
Expand Down Expand Up @@ -77,6 +74,7 @@ async fn handle_connection(
.read_payload()
.await
.context("Could not read body.")?;

let TransportDetail::HttpRequest {
method,
ref url,
Expand All @@ -85,13 +83,21 @@ async fn handle_connection(
else {
unreachable!()
};

let request = http::request::extract(url, header, method, body.to_vec())
.context("failed to parse request")?;
handle_request(&mut connection, &tx, request).await?;

if let Err(e) = handle_request(&mut connection, &tx, request).await {
respond_with_error(&mut connection, format!("{e:?}").as_bytes(), 400).await?;
return Err(e);
}
} else {
while let Some(payload) = connection.read_payload().await {
let request: Request = serde_json::from_slice(&payload)?;
handle_request(&mut connection, &tx, request).await?;
if let Err(e) = handle_request(&mut connection, &tx, request).await {
respond_with_error(&mut connection, e.to_string().as_bytes(), 400).await?;
return Err(e);
};
}
}

Expand All @@ -109,105 +115,42 @@ async fn handle_request(
path,
param,
} = request;
if uri.is_empty() {
bail!("Empty origin uri");
}

// Fetch content from origin
let hash = match origin {
Origin::Blake3 => {
let hash = hex::decode(uri).context("failed to decode blake3 hash")?;

if hash.len() != 32 {
respond_with_error(connection, b"Invalid blake3 hash length", 400).await?;
return Err(anyhow!("invalid blake3 hash length"));
}

let hash = *array_ref![hash, 0, 32];

if fn_sdk::api::fetch_blake3(hash).await {
hash
} else {
respond_with_error(connection, b"Failed to fetch blake3 content", 400).await?;
return Err(anyhow!("failed to fetch file"));
}
},
Origin::Ipfs | Origin::Http => {
let uri = match origin {
Origin::Ipfs => Cid::try_from(uri).context("Invalid IPFS CID")?.to_bytes(),
Origin::Http => urlencoding::decode(&uri)
.context("Invalid URL encoding")?
.to_string()
.into(),
_ => unreachable!(),
};

match fn_sdk::api::fetch_from_origin(origin.into(), uri).await {
Some(hash) => hash,
None => {
respond_with_error(connection, b"Failed to fetch from origin", 400).await?;
return Err(anyhow!("failed to fetch from origin"));
},
}
},
o => {
let err = anyhow!("unknown origin: {o:?}");
respond_with_error(connection, err.to_string().as_bytes(), 400).await?;
return Err(err);
},
};
let module_url = match origin {
Origin::Blake3 => format!("blake3://{uri}"),
Origin::Ipfs => format!("ipfs://{uri}"),
Origin::Http => uri,
Origin::Unknown => todo!(),
}
.parse::<ModuleSpecifier>()
.context("Invalid origin URI")?;

let mut location = Url::parse(&format!("blake3://{}", hex::encode(hash)))
.context("failed to create base url")?;
let mut location = module_url.clone();
if let Some(path) = path {
location = location.join(&path).context("invalid path string")?;
location = location.join(&path).context("Invalid path string")?;
}

// Read and parse the source from the blockstore
let source_bytes = fn_sdk::blockstore::ContentHandle::load(&hash)
.await
.context("failed to get handle for source from blockstore")?
.read_to_end()
.await
.context("failed to read source from blockstore")?;
let source = String::from_utf8(source_bytes).context("failed to parse source as utf8")?;

// Create runtime and execute the source
let mut runtime = match Runtime::new(location.clone()) {
Ok(runtime) => runtime,
Err(e) => {
respond_with_error(connection, e.to_string().as_bytes(), 400).await?;
return Err(e).context("failed to initialize runtime");
},
};

let mut runtime = Runtime::new(location.clone()).context("Failed to initialize runtime")?;
tx.send(runtime.deno.v8_isolate().thread_safe_handle())
.context("Failed to send the IsolateHandle to main thread.")?;

let res = match runtime.exec(location, source, param).await {
Ok(Some(res)) => res,
Ok(None) => {
respond_with_error(connection, b"no response available", 400).await?;
bail!("no response available");
},
Err(e) => {
respond_with_error(connection, e.to_string().as_bytes(), 400).await?;
return Err(e).context("failed to run javascript");
let res = match runtime.exec(&module_url, param).await? {
Some(res) => res,
None => {
bail!("No response available");
},
};

// Resolve async if applicable
// TODO: figure out why `deno.resolve` doesn't drive async functions
#[allow(deprecated)]
let res = match tokio::time::timeout(params::REQ_TIMEOUT, runtime.deno.resolve_value(res)).await
{
Ok(Ok(res)) => res,
Ok(Err(e)) => {
respond_with_error(connection, e.to_string().as_bytes(), 400).await?;
return Err(e).context("failed to resolve output");
},
Err(e) => {
respond_with_error(connection, b"Request timeout", 400).await?;
return Err(e).context("execution timeout");
},
};
let res = tokio::time::timeout(params::REQ_TIMEOUT, runtime.deno.resolve_value(res))
.await
.context("Execution timeout")??;

parse_and_respond(connection, &mut runtime, res).await?;

Expand All @@ -230,7 +173,7 @@ async fn parse_and_respond(
// If the return type is a U8 array, send the raw data directly to the client
let bytes = match deno_core::_ops::to_v8_slice_any(local) {
Ok(slice) => slice.to_vec(),
Err(e) => return Err(anyhow!("failed to parse bytes: {e}")),
Err(e) => bail!("failed to parse bytes: {e}"),
};
respond(connection, &bytes).await?;
} else if local.is_string() {
Expand Down
12 changes: 3 additions & 9 deletions services/js-poc/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use deno_console::deno_console;
use deno_core::serde_v8::{self, Serializable};
use deno_core::url::Url;
use deno_core::v8::{self, CreateParams, Global, Value};
use deno_core::{JsRuntime, PollEventLoopOptions, RuntimeOptions};
use deno_core::{JsRuntime, ModuleSpecifier, PollEventLoopOptions, RuntimeOptions};
use deno_crypto::deno_crypto;
use deno_url::deno_url;
use deno_webgpu::deno_webgpu;
Expand Down Expand Up @@ -152,19 +152,13 @@ impl Runtime {
/// Execute javascript source on the runtime
pub async fn exec(
&mut self,
url: Url,
source: String,
specifier: &ModuleSpecifier,
param: Option<serde_json::Value>,
) -> anyhow::Result<Option<Global<Value>>> {
let id = self
.deno
.load_main_es_module_from_code(&url, source)
.await?;

let id = self.deno.load_main_es_module(specifier).await?;
self.deno
.run_event_loop(PollEventLoopOptions::default())
.await?;

self.deno.mod_evaluate(id).await?;

{
Expand Down
10 changes: 7 additions & 3 deletions services/js-poc/src/runtime/module_loader.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{anyhow, Context};
use anyhow::{anyhow, bail, Context};
use arrayref::array_ref;
use cid::Cid;
use deno_core::url::Host;
Expand Down Expand Up @@ -109,6 +109,10 @@ impl ModuleLoader for FleekModuleLoader {
let hash = *array_ref![bytes, 0, 32];
let specifier = module_specifier.clone();
ModuleLoadResponse::Async(Box::pin(async move {
if !fn_sdk::api::fetch_blake3(hash).await {
bail!("Failed to fetch {specifier}")
}

let handle = ContentHandle::load(&hash).await?;
let source = handle.read_to_end().await?.into_boxed_slice();

Expand All @@ -132,7 +136,7 @@ impl ModuleLoader for FleekModuleLoader {
ModuleLoadResponse::Async(Box::pin(async move {
let hash = fetch_from_origin(fn_sdk::api::Origin::IPFS, cid.to_bytes())
.await
.context("Failed to fetch ipfs module from origin")?;
.with_context(|| format!("Failed to fetch {specifier} from origin"))?;

let handle = ContentHandle::load(&hash).await?;
let bytes = handle.read_to_end().await?;
Expand Down Expand Up @@ -164,7 +168,7 @@ impl ModuleLoader for FleekModuleLoader {
specifier.to_string(),
)
.await
.context("failed to fetch http module from origin")?;
.with_context(|| format!("Failed to fetch {specifier} from origin"))?;

let handle = ContentHandle::load(&hash).await?;
let bytes = handle.read_to_end().await?;
Expand Down

0 comments on commit 28d2fc8

Please sign in to comment.