From 5b563a4530b571ba8543ea2508806f0f17cf2356 Mon Sep 17 00:00:00 2001 From: Lean Mendoza <8042536+leanmendoza@users.noreply.github.com> Date: Thu, 9 Jan 2025 07:57:45 -0300 Subject: [PATCH] wip --- lib/src/content/web_resource_provider.rs | 680 +++++++++++++++++++++ lib/src/http_request/web_http_requester.rs | 320 ++++++++++ lib/src/utils/log_writer.rs | 178 ++++++ 3 files changed, 1178 insertions(+) create mode 100644 lib/src/content/web_resource_provider.rs create mode 100644 lib/src/http_request/web_http_requester.rs create mode 100644 lib/src/utils/log_writer.rs diff --git a/lib/src/content/web_resource_provider.rs b/lib/src/content/web_resource_provider.rs new file mode 100644 index 00000000..27c62ca2 --- /dev/null +++ b/lib/src/content/web_resource_provider.rs @@ -0,0 +1,680 @@ +use futures_util::StreamExt; +use godot::engine::file_access::ModeFlags; +use godot::engine::{DirAccess, FileAccess}; +use reqwest::Client; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::sync::Arc; +#[cfg(not(target_arch = "wasm32"))] +use tokio::fs; +use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; +use tokio::sync::{Notify, OnceCell, RwLock, Semaphore}; +use tokio::time::Instant; + +#[cfg(feature = "use_resource_tracking")] +use super::resource_download_tracking::ResourceDownloadTracking; +use crate::content::semaphore_ext::SemaphoreExt; + +pub struct FileMetadata { + file_size: i64, + last_accessed: Instant, +} + +pub struct ResourceProvider { + cache_folder: PathBuf, + existing_files: RwLock>, + max_cache_size: AtomicI64, + downloaded_size: AtomicU64, + pending_downloads: RwLock>>, + client: Client, + initialized: OnceCell<()>, + semaphore: Arc, + #[cfg(feature = "use_resource_tracking")] + download_tracking: Arc, +} + +const UPDATE_THRESHOLD: u64 = 1_024 * 1_024; // 1 MB threshold + +pub struct GodotFileSystem; + +impl GodotFileSystem { + pub fn read_file(path: &str) -> Result, String> { + // Check if file exists first + if !FileAccess::file_exists(path.into()) { + return Err(format!("File does not exist: {}", path)); + } + + // Open file for reading + let file = FileAccess::open(path.into(), ModeFlags::READ); + if file.is_none() { + return Err(format!("Failed to open file: {}", path)); + } + let mut file = file.unwrap(); + + // Get file length and read all bytes + let length = file.get_length() as usize; + let buffer = file.get_buffer(length as i64); + + Ok(buffer.to_vec()) + } + + pub fn write_file(path: &str, data: &[u8]) -> Result<(), String> { + // Open file for writing + let file = FileAccess::open(path.into(), ModeFlags::WRITE); + if file.is_none() { + return Err(format!("Failed to create file: {}", path)); + } + let mut file = file.unwrap(); + + // Write buffer to file + file.store_buffer(data.to_vec().to_packed_byte_array()); + + Ok(()) + } + + pub fn ensure_dir_exists(path: &str) -> Result<(), String> { + if !DirAccess::dir_exists_absolute(path.into()) { + // Create directory recursively + if let Err(err) = DirAccess::make_dir_recursive_absolute(path.into()) { + return Err(format!("Failed to create directory {}: {:?}", path, err)); + } + } + Ok(()) + } + + pub fn exists(path: &str) -> bool { + FileAccess::file_exists(path.into()) + } + + pub fn remove_file(path: &str) -> Result<(), String> { + if !Self::exists(path) { + return Ok(()); + } + + if let Err(err) = DirAccess::remove_absolute(path.into()) { + return Err(format!("Failed to remove file {}: {:?}", path, err)); + } + Ok(()) + } +} + +impl ResourceProvider { + // Synchronous constructor that sets up the ResourceProvider + pub fn new( + cache_folder: &str, + max_cache_size: i64, + max_concurrent_downloads: usize, + #[cfg(feature = "use_resource_tracking")] download_tracking: Arc, + ) -> Self { + ResourceProvider { + cache_folder: PathBuf::from(cache_folder), + existing_files: RwLock::new(HashMap::new()), + max_cache_size: AtomicI64::new(max_cache_size), + pending_downloads: RwLock::new(HashMap::new()), + client: Client::new(), + initialized: OnceCell::new(), + semaphore: Arc::new(Semaphore::new(max_concurrent_downloads)), + downloaded_size: AtomicU64::new(0), + #[cfg(feature = "use_resource_tracking")] + download_tracking, + } + } + + // Private asynchronous function to initialize the cache + async fn initialize(&self) -> Result<(), io::Error> { + let mut existing_files = self.existing_files.write().await; + + // Use GodotFileSystem to check if directory exists and create if needed + if let Err(err) = GodotFileSystem::ensure_dir_exists(&self.cache_folder.to_string_lossy()) { + return Err(io::Error::new(io::ErrorKind::Other, err)); + } + + // List files in directory using DirAccess + let dir = DirAccess::open(self.cache_folder.to_string_lossy().as_ref().into()); + if dir.is_none() { + return Err(io::Error::new( + io::ErrorKind::NotFound, + "Failed to open directory", + )); + } + let mut dir = dir.unwrap(); + + dir.list_dir_begin(); + while let Some(file) = dir.get_next() { + if file.is_empty() { + break; + } + + let file_path = self.cache_folder.join(&*file); + let file_path_str = file_path.to_string_lossy().to_string(); + + // Skip directories + if DirAccess::dir_exists_absolute(file_path_str.clone().into()) { + continue; + } + + // Handle temporary files + if file_path.extension().and_then(|ext| ext.to_str()) == Some("tmp") { + GodotFileSystem::remove_file(&file_path_str) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + continue; + } + + // Get file size using FileAccess + if let Ok(mut file_handle) = + FileAccess::open(file_path_str.clone().into(), ModeFlags::READ) + { + let file_size = file_handle.get_length() as i64; + self.add_file(&mut existing_files, file_path_str, file_size) + .await; + } + } + + self.ensure_space_for(&mut existing_files, 0).await; + Ok(()) + } + + async fn ensure_space_for( + &self, + existing_files: &mut HashMap, + file_size: i64, + ) { + // If adding the new file exceeds the cache size, remove less used files + let max_cache_size = self.max_cache_size.load(Ordering::SeqCst); + while self.total_size(existing_files) + file_size > max_cache_size { + if !self.remove_less_used(existing_files).await { + break; + } + } + } + + async fn add_file( + &self, + existing_files: &mut HashMap, + file_path: String, + file_size: i64, + ) { + let metadata = FileMetadata { + file_size, + last_accessed: Instant::now(), + }; + existing_files.insert(file_path, metadata); + } + + async fn remove_file( + &self, + existing_files: &mut HashMap, + file_path: &str, + ) -> Option { + if let Some(metadata) = existing_files.remove(file_path) { + let _ = GodotFileSystem::remove_file(file_path); + Some(metadata) + } else { + None + } + } + + fn total_size(&self, existing_files: &HashMap) -> i64 { + existing_files + .values() + .map(|metadata| metadata.file_size) + .sum() + } + + async fn remove_less_used(&self, existing_files: &mut HashMap) -> bool { + if let Some((file_path, _)) = existing_files + .iter() + .min_by_key(|(_, metadata)| metadata.last_accessed) + .map(|(path, metadata)| (path.clone(), metadata)) + { + self.remove_file(existing_files, &file_path).await; + true + } else { + false + } + } + + fn touch_file(&self, existing_files: &mut HashMap, file_path: &str) { + if let Some(metadata) = existing_files.get_mut(file_path) { + metadata.last_accessed = Instant::now(); + } + } + + async fn download_file( + &self, + url: &str, + dest: &Path, + #[cfg(feature = "use_resource_tracking")] file_hash: &str, + ) -> Result<(), String> { + let tmp_dest = dest.with_extension("tmp"); + let response = self + .client + .get(url) + .send() + .await + .map_err(|e| format!("Request error: {:?}", e))?; + + #[cfg(feature = "use_resource_tracking")] + self.download_tracking.start(file_hash.to_string()).await; + + #[cfg(feature = "use_resource_tracking")] + let mut current_size = 0; + + if !response.status().is_success() { + return Err(format!("Failed to download file: {:?}", response.status())); + } + + let mut stream = response.bytes_stream(); + let mut accumulated_data = Vec::new(); + + while let Some(chunk) = stream.next().await { + let chunk = chunk.map_err(|e| format!("Stream error: {:?}", e))?; + accumulated_data.extend_from_slice(&chunk); + + let accumulated_size = chunk.len() as u64; + if accumulated_size > UPDATE_THRESHOLD { + self.downloaded_size + .fetch_add(accumulated_size, Ordering::Relaxed); + #[cfg(feature = "use_resource_tracking")] + { + current_size += accumulated_size; + self.download_tracking + .report_progress(file_hash, current_size) + .await; + } + } + } + + // Write the accumulated data to the temporary file + GodotFileSystem::write_file(tmp_dest.to_str().unwrap(), &accumulated_data) + .map_err(|e| format!("Failed to write file: {}", e))?; + + // Rename the temporary file to the final destination + if GodotFileSystem::exists(dest.to_str().unwrap()) { + GodotFileSystem::remove_file(dest.to_str().unwrap()) + .map_err(|e| format!("Failed to remove existing file: {}", e))?; + } + + GodotFileSystem::write_file(dest.to_str().unwrap(), &accumulated_data) + .map_err(|e| format!("Failed to write final file: {}", e))?; + + GodotFileSystem::remove_file(tmp_dest.to_str().unwrap()) + .map_err(|e| format!("Failed to remove temporary file: {}", e))?; + + #[cfg(feature = "use_resource_tracking")] + self.download_tracking.end(file_hash).await; + + Ok(()) + } + + async fn download_file_with_buffer( + &self, + url: &str, + dest: &Path, + #[cfg(feature = "use_resource_tracking")] file_hash: &str, + ) -> Result, String> { + let tmp_dest = dest.with_extension("tmp"); + let response = self + .client + .get(url) + .send() + .await + .map_err(|e| format!("Request error: {:?}", e))?; + + #[cfg(feature = "use_resource_tracking")] + self.download_tracking.start(file_hash.to_string()).await; + #[cfg(feature = "use_resource_tracking")] + let mut current_size = 0; + + let mut file = FileAccess::open(tmp_dest.to_str().unwrap(), ModeFlags::WRITE); + let mut stream = response.bytes_stream(); + let mut buffer = Vec::new(); + + let mut accumulated_size = 0; + + while let Some(chunk) = stream.next().await { + let chunk = chunk.map_err(|e| format!("Stream error: {:?}", e))?; + file.write_all(&chunk) + .await + .map_err(|e| format!("File write error: {:?}", e))?; + buffer.extend_from_slice(&chunk); + + accumulated_size += chunk.len() as u64; + if accumulated_size > UPDATE_THRESHOLD { + self.downloaded_size + .fetch_add(accumulated_size, Ordering::Relaxed); + #[cfg(feature = "use_resource_tracking")] + { + current_size += accumulated_size; + self.download_tracking + .report_progress(file_hash, current_size) + .await; + } + accumulated_size = 0; + } + } + + if accumulated_size > 0 { + self.downloaded_size + .fetch_add(accumulated_size, Ordering::Relaxed); + #[cfg(feature = "use_resource_tracking")] + { + current_size += accumulated_size; + self.download_tracking + .report_progress(file_hash, current_size) + .await; + } + } + + fs::rename(&tmp_dest, dest).await.map_err(|e| { + format!( + "Failed to rename file: {:?} from: {:?} to: {:?}", + e, tmp_dest, dest + ) + })?; + + #[cfg(feature = "use_resource_tracking")] + self.download_tracking.end(file_hash).await; + + Ok(buffer) + } + + async fn ensure_initialized(&self) -> Result<(), String> { + self.initialized + .get_or_try_init(|| async { self.initialize().await.map_err(|e| e.to_string()) }) + .await + .map(|_| ()) + } + + async fn handle_existing_file(&self, absolute_file_path: &String) -> Result, String> { + let mut existing_files = self.existing_files.write().await; + self.touch_file(&mut existing_files, absolute_file_path); + + GodotFileSystem::read_file(absolute_file_path) + } + + async fn handle_pending_download( + &self, + file_hash: &String, + absolute_file_path: &String, + ) -> Result<(), String> { + let notify = { + let mut pending_downloads = self.pending_downloads.write().await; + if let Some(notify) = pending_downloads.get(file_hash) { + Some(notify.clone()) + } else { + let notify = Arc::new(Notify::new()); + pending_downloads.insert(file_hash.clone(), notify.clone()); + None + } + }; + + if let Some(notify) = notify { + notify.notified().await; + let existing_files = self.existing_files.read().await; + if existing_files.contains_key(absolute_file_path) { + return Ok(()); + } else { + return Err("File not found after waiting".to_string()); + } + } + + Ok(()) + } + + pub async fn store_file(&self, file_hash: &str, bytes: &[u8]) -> Result<(), String> { + self.ensure_initialized().await?; + let absolute_file_path = self.cache_folder.join(file_hash); + let absolute_file_path_str = absolute_file_path.to_str().unwrap(); + + // Write the bytes to a temporary file first + let tmp_dest = absolute_file_path.with_extension("tmp"); + let tmp_dest_str = tmp_dest.to_str().unwrap(); + + GodotFileSystem::write_file(tmp_dest_str, bytes)?; + + // Rename temporary file to final destination + if GodotFileSystem::exists(absolute_file_path_str) { + GodotFileSystem::remove_file(absolute_file_path_str)?; + } + + GodotFileSystem::write_file(absolute_file_path_str, bytes)?; + GodotFileSystem::remove_file(tmp_dest_str)?; + + // Update the cache map + let file_size = bytes.len() as i64; + let mut existing_files = self.existing_files.write().await; + self.ensure_space_for(&mut existing_files, file_size).await; + self.add_file( + &mut existing_files, + absolute_file_path_str.to_string(), + file_size, + ) + .await; + + Ok(()) + } + + pub async fn fetch_resource( + &self, + url: &str, + file_hash: &String, + absolute_file_path: &String, + ) -> Result<(), String> { + self.ensure_initialized().await?; + + self.handle_pending_download(file_hash, absolute_file_path) + .await?; + + let permit = self.semaphore.acquire().await.unwrap(); + + if !DirAccess::dir_exists_absolute(absolute_file_path.into()) { + self.download_file( + url, + Path::new(absolute_file_path), + #[cfg(feature = "use_resource_tracking")] + file_hash, + ) + .await?; + + let metadata = FileAccess::open(absolute_file_path.into(), ModeFlags::READ) + .await + .map_err(|e| format!("Failed to get metadata: {:?}", e))?; + let file_size = metadata.len() as i64; + + let mut existing_files = self.existing_files.write().await; + self.ensure_space_for(&mut existing_files, file_size).await; + self.add_file(&mut existing_files, absolute_file_path.clone(), file_size) + .await; + } else { + self.handle_existing_file(absolute_file_path).await?; + } + + let mut pending_downloads = self.pending_downloads.write().await; + if let Some(notify) = pending_downloads.remove(file_hash) { + notify.notify_waiters(); + } + + drop(permit); + + Ok(()) + } + + // Method to fetch resource and wait for the data + pub async fn fetch_resource_with_data( + &self, + url: &str, + file_hash: &String, + absolute_file_path: &String, + ) -> Result, String> { + self.ensure_initialized().await?; + + self.handle_pending_download(file_hash, absolute_file_path) + .await?; + + let permit = self.semaphore.acquire().await.unwrap(); + let data = if !FileAccess::file_exists(absolute_file_path.into()) { + let data = self + .download_file_with_buffer( + url, + Path::new(absolute_file_path), + #[cfg(feature = "use_resource_tracking")] + file_hash, + ) + .await?; + let metadata = FileAccess::open(absolute_file_path.into(), ModeFlags::READ) + .await + .map_err(|e| format!("Failed to get metadata: {:?}", e))?; + let file_size = metadata.len() as i64; + let mut existing_files = self.existing_files.write().await; + self.ensure_space_for(&mut existing_files, file_size).await; + self.add_file(&mut existing_files, absolute_file_path.clone(), file_size) + .await; + data + } else { + self.handle_existing_file(absolute_file_path).await? + }; + + let mut pending_downloads = self.pending_downloads.write().await; + if let Some(notify) = pending_downloads.remove(file_hash) { + notify.notify_waiters(); + } + + drop(permit); + + Ok(data) + } + + // Method to clear the cache and delete all files from the file system + pub async fn clear(&self) { + if self.ensure_initialized().await.is_err() { + tracing::error!("ResourceLoader failed to load!"); + return; + } + + let mut existing_files = self.existing_files.write().await; + let file_paths: Vec = existing_files.keys().cloned().collect(); + for file_path in file_paths { + self.remove_file(&mut existing_files, &file_path).await; + } + } + + pub fn consume_download_size(&self) -> u64 { + self.downloaded_size.swap(0, Ordering::AcqRel) + } + + // Method to change the number of concurrent downloads + pub fn set_max_concurrent_downloads(&self, max: usize) { + self.semaphore.set_permits(max) + } + + // Method to change the max cache size + pub fn set_max_cache_size(&self, size: i64) { + self.max_cache_size.store(size, Ordering::SeqCst); + } + + pub fn get_cache_total_size(&self) -> i64 { + let existing_files = self.existing_files.blocking_read(); + self.total_size(&existing_files) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_util::future::join_all; + use tokio::io::Result; + + async fn setup_cache_folder(path: &str) -> Result<()> { + if !DirAccess::dir_exists_absolute(path.into()) { + DirAccess::make_dir_recursive_absolute(path.into()); + } + Ok(()) + } + + #[tokio::test] + async fn test_fetch_resource_or_wait() { + let path = "./cache"; + let max_cache_size = 1024 * 1024 * 1024; // Set the cache size to 1 GB + + setup_cache_folder(path) + .await + .expect("Failed to create cache folder"); + + #[cfg(feature = "use_resource_tracking")] + let resource_download_tracking = Arc::new(ResourceDownloadTracking::new()); + + let provider = Arc::new(ResourceProvider::new( + path, + max_cache_size, + 2, + #[cfg(feature = "use_resource_tracking")] + resource_download_tracking.clone(), + )); + provider.clear().await; + + let files_to_download = vec![ + ( + "https://link.testfile.org/15MB", + "bafkreibmrvrdgqthfrvehyell552sk7ivuas2ozzjdmlojbzttqlcrxiya", + ), + ( + "https://link.testfile.org/15MB", + "bafkreic4osvzsjzyqutwjxt2xmyd4hjrwukrxzclvixke3putyrihggmam", + ), + ( + "https://link.testfile.org/15MB", + "bafkreibhjuitdcu3jwu7khjcg2fo6xf2h3hilnfv4liy4p5h2olxj6tcce", + ), + ]; + + // Create a vector to hold the handles of the spawned tasks + let handles: Vec<_> = files_to_download + .clone() + .into_iter() + .map(|(url, file_hash)| { + let url = url.to_string(); + let file_hash = file_hash.to_string(); + let absolute_file_path = format!("{}/{}", path, file_hash); + + let provider_clone = provider.clone(); + tokio::spawn(async move { + provider_clone + .fetch_resource(&url, &file_hash, &absolute_file_path) + .await + .expect("Failed to fetch resource"); + }) + }) + .collect(); + + // Await all the handles + join_all(handles).await; + + // Extract file hashes from the files_to_download vector + let file_hashes: Vec<_> = files_to_download + .iter() + .map(|(_, file_hash)| *file_hash) + .collect(); + + // Check if all files have been downloaded + for file_hash in file_hashes { + let absolute_file_path = format!("{}/{}", path, file_hash); + let existing_files = provider.existing_files.read().await; + assert!(existing_files.contains_key(&absolute_file_path)); + } + + { + let mut existing_files = provider.existing_files.write().await; + provider.remove_less_used(&mut existing_files).await; + assert!(existing_files.len() == 2); + } + + { + provider.clear().await; + let existing_files = provider.existing_files.read().await; + assert!(provider.total_size(&existing_files) == 0); + assert!(existing_files.is_empty()); + } + } +} diff --git a/lib/src/http_request/web_http_requester.rs b/lib/src/http_request/web_http_requester.rs new file mode 100644 index 00000000..0b0514f6 --- /dev/null +++ b/lib/src/http_request/web_http_requester.rs @@ -0,0 +1,320 @@ +use std::collections::{HashMap, VecDeque}; + +use godot::{engine::file_access::ModeFlags, prelude::*}; +use serde_json::json; + +use crate::content::packed_array::PackedByteArrayFromVec; + +use super::{ + http_queue_requester::QueueRequest, + request_response::{RequestResponse, RequestResponseError, ResponseEnum, ResponseType}, +}; + +const MAX_PARALLEL_REQUESTS: usize = 8; + +#[derive(Debug)] +struct OngoingRequest { + client: Gd, + queue_request: QueueRequest, + body_ready: bool, + file_handle: Option>, + accumulated_bytes: Vec, +} + +#[derive(GodotClass)] +#[class(base=Node, init)] +pub struct DclWebHttpRequester { + _base: Base, + + pub web_pending_queue_requests: VecDeque, + pub web_ongoing_requests: HashMap, + + pub tls_options: Option>, +} + +#[godot_api] +impl INode for DclWebHttpRequester { + fn process(&mut self, _delta: f64) { + self.poll_web_fetch(); + } +} + +#[godot_api] +impl DclWebHttpRequester { + #[func] + fn set_tls_options(&mut self, tls_options: Gd) { + self.tls_options = Some(tls_options); + } +} + +impl DclWebHttpRequester { + pub fn add_requests(&mut self, http_request: QueueRequest) { + self.web_pending_queue_requests.push_back(http_request); + } + + pub fn poll_web_fetch(&mut self) { + // Process ongoing requests + let mut completed_requests = Vec::new(); + + for (id, ongoing) in self.web_ongoing_requests.iter_mut() { + let mut client = ongoing.client.clone(); + let poll_err = client.poll(); + + if poll_err != godot::engine::global::Error::OK { + completed_requests.push((*id, Err(RequestResponseError { + id: ongoing.queue_request.id, + error_message: format!("Poll error: {:?} - {:?}", poll_err, client.get_status()), + }))); + continue; + } + + match client.get_status() { + godot::engine::http_client::Status::BODY => { + ongoing.body_ready = true; + let chunk = client.read_response_body_chunk(); + + if !chunk.is_empty() { + // Handle chunk based on response type + if let Some(file_handle) = &mut ongoing.file_handle { + // Write directly to file + file_handle.store_buffer(chunk); + } else { + // Accumulate in buffer + ongoing.accumulated_bytes.extend_from_slice(&chunk.as_slice()); + } + } + + if client.get_status() == godot::engine::http_client::Status::DISCONNECTED { + completed_requests.push((*id, Ok(()))); + } + } + godot::engine::http_client::Status::CONNECTED + | godot::engine::http_client::Status::REQUESTING => { + continue; + } + godot::engine::http_client::Status::DISCONNECTED + | godot::engine::http_client::Status::CONNECTION_ERROR + | godot::engine::http_client::Status::CANT_CONNECT + | godot::engine::http_client::Status::CANT_RESOLVE => { + if ongoing.body_ready { + completed_requests.push((*id, Ok(()))); + } else { + completed_requests.push((*id, Err(RequestResponseError { + id: ongoing.queue_request.id, + error_message: format!("Connection error: {:?}", client.get_status()), + }))); + } + } + _ => { + completed_requests.push((*id, Err(RequestResponseError { + id: ongoing.queue_request.id, + error_message: format!("Connection error: {:?}", client.get_status()), + }))); + } + } + } + + // Remove completed requests + for (id, result) in completed_requests { + let Some(mut item) = self.web_ongoing_requests.remove(&id) else { + continue; + }; + + match result { + Ok(()) => { + let accumulated_bytes = std::mem::take(&mut item.accumulated_bytes); + let response = self.read_response(item.client, &mut item.queue_request, accumulated_bytes); + let _ = item.queue_request.response_sender.send(response); + } + Err(err) => { + let _ = item.queue_request.response_sender.send(Err(err)); + } + } + } + + // Start new requests if slots are available + while self.web_ongoing_requests.len() < MAX_PARALLEL_REQUESTS { + if let Some(queue_request) = self.web_pending_queue_requests.pop_front() { + if !queue_request.request_option.is_some() { + continue; + } + + match self.start_request(queue_request) { + Ok(()) => {} + Err(err) => { + godot_warn!("Failed to start request: {:?}", err); + } + } + } else { + break; // No more pending requests + } + } + } + + fn start_request( + &mut self, + mut queue_request: QueueRequest, + ) -> Result<(), RequestResponseError> { + let mut client = godot::engine::HttpClient::new_gd(); + let mut request_option = queue_request.request_option.take().unwrap(); + // Parse URL to get host and path + let url = url::Url::parse(&request_option.url).map_err(|e| RequestResponseError { + id: queue_request.id, + error_message: format!("Invalid URL: {}", e), + })?; + + // Connect to host + let port = url + .port() + .unwrap_or(if url.scheme() == "https" { 443 } else { 80 }); + let host = url.host_str().ok_or_else(|| RequestResponseError { + id: queue_request.id, + error_message: "No host in URL".to_string(), + })?; + + let connect_err = { + let result = client.call( + "connect_to_host".into(), + &[ + host.to_variant(), + port.to_variant(), + self.tls_options.clone().unwrap().to_variant(), + ], + ); + result.to::() + }; + if connect_err != godot::engine::global::Error::OK { + return Err(RequestResponseError { + id: queue_request.id, + error_message: format!("Connection error: {:?}", connect_err), + }); + } + + if client.get_status() == godot::engine::http_client::Status::RESOLVING { + client.poll(); + } + + if client.get_status() == godot::engine::http_client::Status::CONNECTING { + client.poll(); + } + + // Convert headers to PackedStringArray + let headers = request_option + .headers + .as_ref() + .and_then(|headers| { + Some( + headers + .iter() + .map(|(k, v)| format!("{}: {}", k, v)) + .collect::>(), + ) + }) + .unwrap_or_default(); + let headers = headers + .iter() + .map(|s| s.into()) + .collect::(); + + // Start request + let method = match request_option.method.as_str() { + "GET" => godot::engine::http_client::Method::GET, + "POST" => godot::engine::http_client::Method::POST, + "PUT" => godot::engine::http_client::Method::PUT, + "DELETE" => godot::engine::http_client::Method::DELETE, + _ => godot::engine::http_client::Method::GET, + }; + + // TODO: implement send body + let path = url.path(); + let request_err = if let Some(body) = request_option.body.take() { + let bytes = PackedByteArray::from_vec(body.as_slice()); + client.request_raw(method, path.into(), headers, bytes) + } else { + client.request(method, path.into(), headers) + }; + + if request_err != godot::engine::global::Error::OK { + return Err(RequestResponseError { + id: queue_request.id, + error_message: format!("Request error: {:?}", request_err), + }); + } + + queue_request.request_option = Some(request_option); + // Initialize file handle if needed + let file_handle = if let Some(request_option) = &queue_request.request_option { + if let ResponseType::ToFile(path) = &request_option.response_type { + let fs = godot::engine::FileAccess::open(path.as_str().into(), ModeFlags::WRITE); + if fs.is_none() { + return Err(RequestResponseError { + id: queue_request.id, + error_message: "Failed to open file for writing".to_string(), + }); + } + fs + } else { + None + } + } else { + None + }; + + // Store ongoing request + self.web_ongoing_requests.insert( + queue_request.id, + OngoingRequest { + client, + queue_request, + body_ready: false, + file_handle, + accumulated_bytes: Vec::new(), + }, + ); + + Ok(()) + } + + fn read_response( + &self, + mut client: Gd, + queue_request: &mut QueueRequest, + accumulated_bytes: Vec, + ) -> Result { + let response_code = client.get_response_code(); + let headers = client.get_response_headers_as_dictionary(); + let headers = headers + .iter_shared() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect::>(); + let status_code = http::StatusCode::from_u16(response_code as u16) + .unwrap_or(http::StatusCode::INTERNAL_SERVER_ERROR); + let request_option = queue_request.request_option.take().unwrap(); + + tracing::info!("Finished request - response: {:?} bytes", accumulated_bytes.len()); + + let response_data = match &request_option.response_type { + ResponseType::AsString => { + ResponseEnum::String(String::from_utf8(accumulated_bytes.clone()).unwrap()) + } + ResponseType::AsBytes => { + ResponseEnum::Bytes(accumulated_bytes.clone()) + } + ResponseType::AsJson => { + let text = String::from_utf8_lossy(&accumulated_bytes).to_string(); + let json_result = serde_json::from_str(&text); + ResponseEnum::Json(json_result) + } + ResponseType::ToFile(path) => { + ResponseEnum::ToFile(Ok(path.clone())) + } + }; + + Ok(RequestResponse { + headers: Some(headers), + status_code, + request_option, + response_data: Ok(response_data), + }) + } +} diff --git a/lib/src/utils/log_writer.rs b/lib/src/utils/log_writer.rs new file mode 100644 index 00000000..528b36e3 --- /dev/null +++ b/lib/src/utils/log_writer.rs @@ -0,0 +1,178 @@ +use core::slice; +use std::{ + ffi::{CStr, CString}, io::{self, Write}, os::raw::c_char +}; + +use lazy_static::lazy_static; +use sharded_slab::{pool::RefMut, Pool}; +// use smallvec::SmallVec; +// use tracing_core::Metadata; +use tracing_subscriber::fmt::MakeWriter; +use godot::prelude::*; + +// use crate::logging::{Buffer, Priority}; + +/// The writer produced by [`GodotWasmLogMakeWriter`]. +#[derive(Debug)] +pub struct GodotWasmLogWriter<'a> { + tag: &'a CStr, + message: PooledCString, + location: Option, +} + +/// A [`MakeWriter`] suitable for writing Android logs. +#[derive(Debug)] +pub struct GodotWasmLogMakeWriter { + tag: CString, +} + +#[derive(Debug)] +struct Location { + file: PooledCString, + line: u32, +} + +// logd truncates logs at 4096 bytes, so we chunk at 4000 to be conservative +const MAX_LOG_LEN: usize = 4000; + +impl Write for GodotWasmLogWriter<'_> { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.message.write(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + // Convert the message to a string, fallback to empty string if invalid UTF-8 + let message = String::from_utf8_lossy(self.message.as_bytes()); + + // Create location string if available + let location_str = self.location.as_ref().map(|loc| { + let file = String::from_utf8_lossy(loc.file.as_bytes()); + format!(" ({}:{})", file, loc.line) + }).unwrap_or_default(); + + // Combine message and location + let full_message = format!("{}{}", message, location_str); + + // Use godot::log::print + godot::log::print(&[full_message.to_variant()]); + + Ok(()) + } +} + +impl Drop for GodotWasmLogWriter<'_> { + fn drop(&mut self) { + self.flush().unwrap(); + } +} + +impl<'a> MakeWriter<'a> for GodotWasmLogMakeWriter { + type Writer = GodotWasmLogWriter<'a>; + + fn make_writer(&'a self) -> Self::Writer { + GodotWasmLogWriter { + tag: self.tag.as_c_str(), + message: PooledCString::empty(), + + location: None, + } + } + + fn make_writer_for(&'a self, meta: &tracing::Metadata<'_>) -> Self::Writer { + + let location = match (meta.file(), meta.line()) { + (Some(file), Some(line)) => { + let file = PooledCString::new(file.as_bytes()); + Some(Location { file, line }) + } + _ => None, + }; + + GodotWasmLogWriter { + tag: self.tag.as_c_str(), + message: PooledCString::empty(), + location, + } + } +} + +impl GodotWasmLogMakeWriter { + /// Returns a new [`GodotWasmLogWriter`] with the given tag. + pub fn new(tag: String) -> Self { + Self { + tag: CString::new(tag).unwrap(), + } + } +} + +#[derive(Debug)] +struct PooledCString { + buf: RefMut<'static, Vec>, +} + +enum MessageIter<'a> { + Single(Option<&'a mut PooledCString>), + Multi(slice::IterMut<'a, PooledCString>), +} + +lazy_static! { + static ref BUFFER_POOL: Pool> = Pool::new(); +} + +impl PooledCString { + fn empty() -> Self { + Self { + buf: BUFFER_POOL.create().unwrap(), + } + } + + fn new(data: &[u8]) -> Self { + let mut this = PooledCString::empty(); + this.write(data); + this + } + + fn write(&mut self, data: &[u8]) { + self.buf.extend_from_slice(data); + } + + fn as_ptr(&mut self) -> Option<*const c_char> { + if self.buf.last().copied() != Some(0) { + self.buf.push(0); + } + + CStr::from_bytes_with_nul(self.buf.as_ref()) + .ok() + .map(CStr::as_ptr) + } + + fn as_bytes(&self) -> &[u8] { + self.buf.as_ref() + } +} + +impl Drop for PooledCString { + fn drop(&mut self) { + BUFFER_POOL.clear(self.buf.key()); + } +} + +impl<'a> Iterator for MessageIter<'a> { + type Item = &'a mut PooledCString; + + fn next(&mut self) -> Option { + match self { + MessageIter::Single(message) => message.take(), + MessageIter::Multi(iter) => iter.next(), + } + } + + fn size_hint(&self) -> (usize, Option) { + match self { + MessageIter::Single(Some(_)) => (1, Some(1)), + MessageIter::Single(None) => (0, Some(0)), + MessageIter::Multi(iter) => iter.size_hint(), + } + } +} \ No newline at end of file