From e87f72bbcf3c68008cee9fd630fb4d48b3a79e15 Mon Sep 17 00:00:00 2001 From: Lean Mendoza <8042536+leanmendoza@users.noreply.github.com> Date: Fri, 10 Jan 2025 15:28:42 -0300 Subject: [PATCH] it compiles --- lib/src/content/web_resource_provider.rs | 470 ++++------------------- 1 file changed, 81 insertions(+), 389 deletions(-) diff --git a/lib/src/content/web_resource_provider.rs b/lib/src/content/web_resource_provider.rs index 27c62ca2..7b836cc0 100644 --- a/lib/src/content/web_resource_provider.rs +++ b/lib/src/content/web_resource_provider.rs @@ -1,20 +1,22 @@ -use futures_util::StreamExt; +use godot::builtin::{GString, PackedByteArray, PackedStringArray}; use godot::engine::file_access::ModeFlags; -use godot::engine::{DirAccess, FileAccess}; -use reqwest::Client; +use godot::engine::{DirAccess, FileAccess, TlsOptions}; +use godot::obj::{Gd, NewGd}; +use godot::prelude::ToGodot; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::Arc; -#[cfg(not(target_arch = "wasm32"))] -use tokio::fs; -use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; +use tokio::io; use tokio::sync::{Notify, OnceCell, RwLock, Semaphore}; use tokio::time::Instant; -#[cfg(feature = "use_resource_tracking")] -use super::resource_download_tracking::ResourceDownloadTracking; use crate::content::semaphore_ext::SemaphoreExt; +use crate::godot_classes::dcl_global::DclGlobal; +use crate::http_request::http_queue_requester::HttpQueueRequester; +use crate::http_request::request_response::{RequestOption, ResponseType}; + +use super::packed_array::PackedByteArrayFromVec; pub struct FileMetadata { file_size: i64, @@ -22,116 +24,48 @@ pub struct FileMetadata { } pub struct ResourceProvider { - cache_folder: PathBuf, + cache_folder: String, existing_files: RwLock>, max_cache_size: AtomicI64, downloaded_size: AtomicU64, pending_downloads: RwLock>>, - client: Client, initialized: OnceCell<()>, semaphore: Arc, - #[cfg(feature = "use_resource_tracking")] - download_tracking: Arc, + http_queue_requester: Arc, } const UPDATE_THRESHOLD: u64 = 1_024 * 1_024; // 1 MB threshold -pub struct GodotFileSystem; - -impl GodotFileSystem { - pub fn read_file(path: &str) -> Result, String> { - // Check if file exists first - if !FileAccess::file_exists(path.into()) { - return Err(format!("File does not exist: {}", path)); - } - - // Open file for reading - let file = FileAccess::open(path.into(), ModeFlags::READ); - if file.is_none() { - return Err(format!("Failed to open file: {}", path)); - } - let mut file = file.unwrap(); - - // Get file length and read all bytes - let length = file.get_length() as usize; - let buffer = file.get_buffer(length as i64); - - Ok(buffer.to_vec()) - } - - pub fn write_file(path: &str, data: &[u8]) -> Result<(), String> { - // Open file for writing - let file = FileAccess::open(path.into(), ModeFlags::WRITE); - if file.is_none() { - return Err(format!("Failed to create file: {}", path)); - } - let mut file = file.unwrap(); - - // Write buffer to file - file.store_buffer(data.to_vec().to_packed_byte_array()); - - Ok(()) - } - - pub fn ensure_dir_exists(path: &str) -> Result<(), String> { - if !DirAccess::dir_exists_absolute(path.into()) { - // Create directory recursively - if let Err(err) = DirAccess::make_dir_recursive_absolute(path.into()) { - return Err(format!("Failed to create directory {}: {:?}", path, err)); - } - } - Ok(()) - } - - pub fn exists(path: &str) -> bool { - FileAccess::file_exists(path.into()) - } - - pub fn remove_file(path: &str) -> Result<(), String> { - if !Self::exists(path) { - return Ok(()); - } - - if let Err(err) = DirAccess::remove_absolute(path.into()) { - return Err(format!("Failed to remove file {}: {:?}", path, err)); - } - Ok(()) - } -} - impl ResourceProvider { // Synchronous constructor that sets up the ResourceProvider - pub fn new( - cache_folder: &str, - max_cache_size: i64, - max_concurrent_downloads: usize, - #[cfg(feature = "use_resource_tracking")] download_tracking: Arc, - ) -> Self { + pub fn new(cache_folder: &str, max_cache_size: i64, max_concurrent_downloads: usize) -> Self { ResourceProvider { - cache_folder: PathBuf::from(cache_folder), + cache_folder: cache_folder.to_string(), existing_files: RwLock::new(HashMap::new()), max_cache_size: AtomicI64::new(max_cache_size), pending_downloads: RwLock::new(HashMap::new()), - client: Client::new(), initialized: OnceCell::new(), semaphore: Arc::new(Semaphore::new(max_concurrent_downloads)), downloaded_size: AtomicU64::new(0), - #[cfg(feature = "use_resource_tracking")] - download_tracking, + http_queue_requester: Arc::new(HttpQueueRequester::new( + 6, + DclGlobal::get_network_inspector_sender(), + )), } } // Private asynchronous function to initialize the cache async fn initialize(&self) -> Result<(), io::Error> { + let cache_folder: GString = self.cache_folder.clone().into(); let mut existing_files = self.existing_files.write().await; // Use GodotFileSystem to check if directory exists and create if needed - if let Err(err) = GodotFileSystem::ensure_dir_exists(&self.cache_folder.to_string_lossy()) { - return Err(io::Error::new(io::ErrorKind::Other, err)); + if !DirAccess::dir_exists_absolute(cache_folder.clone()) { + DirAccess::make_dir_recursive_absolute(cache_folder.clone()); } // List files in directory using DirAccess - let dir = DirAccess::open(self.cache_folder.to_string_lossy().as_ref().into()); + let dir = DirAccess::open(cache_folder.clone()); if dir.is_none() { return Err(io::Error::new( io::ErrorKind::NotFound, @@ -141,33 +75,30 @@ impl ResourceProvider { let mut dir = dir.unwrap(); dir.list_dir_begin(); - while let Some(file) = dir.get_next() { + loop { + let file = dir.get_next(); if file.is_empty() { break; } - let file_path = self.cache_folder.join(&*file); - let file_path_str = file_path.to_string_lossy().to_string(); + let file_path: String = format!("{}/{}", self.cache_folder, file); + let file_path_gstr: GString = file_path.clone().into(); // Skip directories - if DirAccess::dir_exists_absolute(file_path_str.clone().into()) { + if DirAccess::dir_exists_absolute(file_path_gstr.clone()) { continue; } // Handle temporary files - if file_path.extension().and_then(|ext| ext.to_str()) == Some("tmp") { - GodotFileSystem::remove_file(&file_path_str) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + if file_path.as_str().ends_with(".tmp") { + DirAccess::remove_absolute(file_path_gstr.clone()); continue; } // Get file size using FileAccess - if let Ok(mut file_handle) = - FileAccess::open(file_path_str.clone().into(), ModeFlags::READ) - { + if let Some(file_handle) = FileAccess::open(file_path_gstr.clone(), ModeFlags::READ) { let file_size = file_handle.get_length() as i64; - self.add_file(&mut existing_files, file_path_str, file_size) - .await; + self.add_file(&mut existing_files, file_path, file_size); } } @@ -189,7 +120,7 @@ impl ResourceProvider { } } - async fn add_file( + fn add_file( &self, existing_files: &mut HashMap, file_path: String, @@ -208,7 +139,7 @@ impl ResourceProvider { file_path: &str, ) -> Option { if let Some(metadata) = existing_files.remove(file_path) { - let _ = GodotFileSystem::remove_file(file_path); + DirAccess::remove_absolute(file_path.into()); Some(metadata) } else { None @@ -241,143 +172,43 @@ impl ResourceProvider { } } - async fn download_file( - &self, - url: &str, - dest: &Path, - #[cfg(feature = "use_resource_tracking")] file_hash: &str, - ) -> Result<(), String> { - let tmp_dest = dest.with_extension("tmp"); - let response = self - .client - .get(url) - .send() - .await - .map_err(|e| format!("Request error: {:?}", e))?; - - #[cfg(feature = "use_resource_tracking")] - self.download_tracking.start(file_hash.to_string()).await; - - #[cfg(feature = "use_resource_tracking")] - let mut current_size = 0; + async fn download_file(&self, url: &str, dest: &Path) -> Result<(), String> { + let data = self.download_file_with_buffer(url, dest).await?; - if !response.status().is_success() { - return Err(format!("Failed to download file: {:?}", response.status())); - } - - let mut stream = response.bytes_stream(); - let mut accumulated_data = Vec::new(); - - while let Some(chunk) = stream.next().await { - let chunk = chunk.map_err(|e| format!("Stream error: {:?}", e))?; - accumulated_data.extend_from_slice(&chunk); - - let accumulated_size = chunk.len() as u64; - if accumulated_size > UPDATE_THRESHOLD { - self.downloaded_size - .fetch_add(accumulated_size, Ordering::Relaxed); - #[cfg(feature = "use_resource_tracking")] - { - current_size += accumulated_size; - self.download_tracking - .report_progress(file_hash, current_size) - .await; - } - } - } - - // Write the accumulated data to the temporary file - GodotFileSystem::write_file(tmp_dest.to_str().unwrap(), &accumulated_data) - .map_err(|e| format!("Failed to write file: {}", e))?; - - // Rename the temporary file to the final destination - if GodotFileSystem::exists(dest.to_str().unwrap()) { - GodotFileSystem::remove_file(dest.to_str().unwrap()) - .map_err(|e| format!("Failed to remove existing file: {}", e))?; + // Write the downloaded data to the file + if let Some(mut file) = + FileAccess::open(dest.to_string_lossy().as_ref().into(), ModeFlags::WRITE) + { + file.store_buffer(PackedByteArray::from_vec(&data)); + Ok(()) + } else { + Err(format!( + "Failed to open file for writing: {}", + dest.display() + )) } - - GodotFileSystem::write_file(dest.to_str().unwrap(), &accumulated_data) - .map_err(|e| format!("Failed to write final file: {}", e))?; - - GodotFileSystem::remove_file(tmp_dest.to_str().unwrap()) - .map_err(|e| format!("Failed to remove temporary file: {}", e))?; - - #[cfg(feature = "use_resource_tracking")] - self.download_tracking.end(file_hash).await; - - Ok(()) } - async fn download_file_with_buffer( - &self, - url: &str, - dest: &Path, - #[cfg(feature = "use_resource_tracking")] file_hash: &str, - ) -> Result, String> { - let tmp_dest = dest.with_extension("tmp"); - let response = self - .client - .get(url) - .send() - .await - .map_err(|e| format!("Request error: {:?}", e))?; - - #[cfg(feature = "use_resource_tracking")] - self.download_tracking.start(file_hash.to_string()).await; - #[cfg(feature = "use_resource_tracking")] - let mut current_size = 0; - - let mut file = FileAccess::open(tmp_dest.to_str().unwrap(), ModeFlags::WRITE); - let mut stream = response.bytes_stream(); - let mut buffer = Vec::new(); - - let mut accumulated_size = 0; - - while let Some(chunk) = stream.next().await { - let chunk = chunk.map_err(|e| format!("Stream error: {:?}", e))?; - file.write_all(&chunk) - .await - .map_err(|e| format!("File write error: {:?}", e))?; - buffer.extend_from_slice(&chunk); - - accumulated_size += chunk.len() as u64; - if accumulated_size > UPDATE_THRESHOLD { - self.downloaded_size - .fetch_add(accumulated_size, Ordering::Relaxed); - #[cfg(feature = "use_resource_tracking")] - { - current_size += accumulated_size; - self.download_tracking - .report_progress(file_hash, current_size) - .await; - } - accumulated_size = 0; - } - } - - if accumulated_size > 0 { - self.downloaded_size - .fetch_add(accumulated_size, Ordering::Relaxed); - #[cfg(feature = "use_resource_tracking")] - { - current_size += accumulated_size; - self.download_tracking - .report_progress(file_hash, current_size) - .await; + async fn download_file_with_buffer(&self, url: &str, dest: &Path) -> Result, String> { + let request = RequestOption::new( + 0, + url.to_string(), + http::Method::GET, + ResponseType::ToFile(dest.to_string_lossy().as_ref().into()), + None, + None, + None, + ); + let request_response = self.http_queue_requester.request(request, 0).await; + match request_response { + Ok(request_response) => { + let file_path = FileAccess::open(dest.to_string_lossy().as_ref().into(), ModeFlags::READ) + .ok_or(format!("Failed open file: {}", dest.display()))?; + let data = file_path.get_buffer(file_path.get_length() as i64); + Ok(data.to_vec()) } + Err(e) => Err(e.error_message), } - - fs::rename(&tmp_dest, dest).await.map_err(|e| { - format!( - "Failed to rename file: {:?} from: {:?} to: {:?}", - e, tmp_dest, dest - ) - })?; - - #[cfg(feature = "use_resource_tracking")] - self.download_tracking.end(file_hash).await; - - Ok(buffer) } async fn ensure_initialized(&self) -> Result<(), String> { @@ -391,7 +222,13 @@ impl ResourceProvider { let mut existing_files = self.existing_files.write().await; self.touch_file(&mut existing_files, absolute_file_path); - GodotFileSystem::read_file(absolute_file_path) + let file_handle = FileAccess::open(absolute_file_path.into(), ModeFlags::READ); + if let Some(file) = file_handle { + let data = file.get_buffer(file.get_length() as i64); + Ok(data.to_vec()) + } else { + Err(format!("Failed to open file: {}", absolute_file_path)) + } } async fn handle_pending_download( @@ -423,39 +260,6 @@ impl ResourceProvider { Ok(()) } - pub async fn store_file(&self, file_hash: &str, bytes: &[u8]) -> Result<(), String> { - self.ensure_initialized().await?; - let absolute_file_path = self.cache_folder.join(file_hash); - let absolute_file_path_str = absolute_file_path.to_str().unwrap(); - - // Write the bytes to a temporary file first - let tmp_dest = absolute_file_path.with_extension("tmp"); - let tmp_dest_str = tmp_dest.to_str().unwrap(); - - GodotFileSystem::write_file(tmp_dest_str, bytes)?; - - // Rename temporary file to final destination - if GodotFileSystem::exists(absolute_file_path_str) { - GodotFileSystem::remove_file(absolute_file_path_str)?; - } - - GodotFileSystem::write_file(absolute_file_path_str, bytes)?; - GodotFileSystem::remove_file(tmp_dest_str)?; - - // Update the cache map - let file_size = bytes.len() as i64; - let mut existing_files = self.existing_files.write().await; - self.ensure_space_for(&mut existing_files, file_size).await; - self.add_file( - &mut existing_files, - absolute_file_path_str.to_string(), - file_size, - ) - .await; - - Ok(()) - } - pub async fn fetch_resource( &self, url: &str, @@ -470,23 +274,16 @@ impl ResourceProvider { let permit = self.semaphore.acquire().await.unwrap(); if !DirAccess::dir_exists_absolute(absolute_file_path.into()) { - self.download_file( - url, - Path::new(absolute_file_path), - #[cfg(feature = "use_resource_tracking")] - file_hash, - ) - .await?; + self.download_file(url, Path::new(absolute_file_path)) + .await?; let metadata = FileAccess::open(absolute_file_path.into(), ModeFlags::READ) - .await - .map_err(|e| format!("Failed to get metadata: {:?}", e))?; - let file_size = metadata.len() as i64; + .ok_or(format!("Failed open file: {}", absolute_file_path))?; + let file_size = metadata.get_length() as i64; let mut existing_files = self.existing_files.write().await; self.ensure_space_for(&mut existing_files, file_size).await; - self.add_file(&mut existing_files, absolute_file_path.clone(), file_size) - .await; + self.add_file(&mut existing_files, absolute_file_path.clone(), file_size); } else { self.handle_existing_file(absolute_file_path).await?; } @@ -516,21 +313,15 @@ impl ResourceProvider { let permit = self.semaphore.acquire().await.unwrap(); let data = if !FileAccess::file_exists(absolute_file_path.into()) { let data = self - .download_file_with_buffer( - url, - Path::new(absolute_file_path), - #[cfg(feature = "use_resource_tracking")] - file_hash, - ) + .download_file_with_buffer(url, Path::new(absolute_file_path)) .await?; + let metadata = FileAccess::open(absolute_file_path.into(), ModeFlags::READ) - .await - .map_err(|e| format!("Failed to get metadata: {:?}", e))?; - let file_size = metadata.len() as i64; + .ok_or(format!("Failed open file: {}", absolute_file_path))?; + let file_size = metadata.get_length() as i64; let mut existing_files = self.existing_files.write().await; self.ensure_space_for(&mut existing_files, file_size).await; - self.add_file(&mut existing_files, absolute_file_path.clone(), file_size) - .await; + self.add_file(&mut existing_files, absolute_file_path.clone(), file_size); data } else { self.handle_existing_file(absolute_file_path).await? @@ -579,102 +370,3 @@ impl ResourceProvider { self.total_size(&existing_files) } } - -#[cfg(test)] -mod tests { - use super::*; - use futures_util::future::join_all; - use tokio::io::Result; - - async fn setup_cache_folder(path: &str) -> Result<()> { - if !DirAccess::dir_exists_absolute(path.into()) { - DirAccess::make_dir_recursive_absolute(path.into()); - } - Ok(()) - } - - #[tokio::test] - async fn test_fetch_resource_or_wait() { - let path = "./cache"; - let max_cache_size = 1024 * 1024 * 1024; // Set the cache size to 1 GB - - setup_cache_folder(path) - .await - .expect("Failed to create cache folder"); - - #[cfg(feature = "use_resource_tracking")] - let resource_download_tracking = Arc::new(ResourceDownloadTracking::new()); - - let provider = Arc::new(ResourceProvider::new( - path, - max_cache_size, - 2, - #[cfg(feature = "use_resource_tracking")] - resource_download_tracking.clone(), - )); - provider.clear().await; - - let files_to_download = vec![ - ( - "https://link.testfile.org/15MB", - "bafkreibmrvrdgqthfrvehyell552sk7ivuas2ozzjdmlojbzttqlcrxiya", - ), - ( - "https://link.testfile.org/15MB", - "bafkreic4osvzsjzyqutwjxt2xmyd4hjrwukrxzclvixke3putyrihggmam", - ), - ( - "https://link.testfile.org/15MB", - "bafkreibhjuitdcu3jwu7khjcg2fo6xf2h3hilnfv4liy4p5h2olxj6tcce", - ), - ]; - - // Create a vector to hold the handles of the spawned tasks - let handles: Vec<_> = files_to_download - .clone() - .into_iter() - .map(|(url, file_hash)| { - let url = url.to_string(); - let file_hash = file_hash.to_string(); - let absolute_file_path = format!("{}/{}", path, file_hash); - - let provider_clone = provider.clone(); - tokio::spawn(async move { - provider_clone - .fetch_resource(&url, &file_hash, &absolute_file_path) - .await - .expect("Failed to fetch resource"); - }) - }) - .collect(); - - // Await all the handles - join_all(handles).await; - - // Extract file hashes from the files_to_download vector - let file_hashes: Vec<_> = files_to_download - .iter() - .map(|(_, file_hash)| *file_hash) - .collect(); - - // Check if all files have been downloaded - for file_hash in file_hashes { - let absolute_file_path = format!("{}/{}", path, file_hash); - let existing_files = provider.existing_files.read().await; - assert!(existing_files.contains_key(&absolute_file_path)); - } - - { - let mut existing_files = provider.existing_files.write().await; - provider.remove_less_used(&mut existing_files).await; - assert!(existing_files.len() == 2); - } - - { - provider.clear().await; - let existing_files = provider.existing_files.read().await; - assert!(provider.total_size(&existing_files) == 0); - assert!(existing_files.is_empty()); - } - } -}