diff --git a/modules/importer/src/runner/common/walker/git.rs b/modules/importer/src/runner/common/walker/git.rs index 1a68ec02a..22afb2bcc 100644 --- a/modules/importer/src/runner/common/walker/git.rs +++ b/modules/importer/src/runner/common/walker/git.rs @@ -10,6 +10,7 @@ use std::{ convert::Infallible, env, fmt::{Debug, Display}, + fs::remove_dir_all, path::{Path, PathBuf}, }; use tracing::{info_span, instrument}; @@ -191,51 +192,96 @@ where // clone or open repository - let result = info_span!("clone repository").in_scope(|| { - self.progress - .message_sync(format!("Cloning repository: {}", self.source)); + let repo = self.clone_or_update_repo(path)?; - let mut builder = RepoBuilder::new(); + log::info!("Repository cloned or updated"); - if let Some(branch) = &self.branch { - builder.branch(branch); - } + // discover files between "then" and now - let mut fo = Self::create_fetch_options(); - if self.continuation.0.is_none() { - fo.depth(self.depth); - } - builder.fetch_options(fo).clone(&self.source, path) - }); + let changes = self.find_changes(&repo)?; + + // discover and process files + + let mut path = Cow::Borrowed(path); + if let Some(base) = &self.path { + let new_path = path.join(base); - let repo = match result { - Ok(repo) => repo, + log::debug!(" Base: {}", path.display()); + log::debug!("Target: {}", new_path.display()); + + // ensure that self.path was a relative sub-directory of the repository + let _ = new_path + .strip_prefix(path) + .map_err(|_| Error::Path(base.into()))?; + + path = new_path.into(); + } + + self.walk(&path, &changes)?; + + let head = repo.head()?; + let commit = head.peel_to_commit()?.id(); + log::info!("Most recent commit: {commit}"); + + // only drop when we are done, as this might delete the working directory + + drop(working_dir); + + // return result + + Ok(Continuation(Some(commit.to_string()))) + } + + fn clone_or_update_repo(&self, path: &Path) -> Result { + match self.clone_repo(path) { + Ok(repo) => Ok(repo), Err(err) if err.code() == ErrorCode::Exists && err.class() == ErrorClass::Invalid => { log::info!("Already exists, opening ..."); let repo = info_span!("open repository").in_scope(|| Repository::open(path))?; - info_span!("fetching updates").in_scope(|| { + let repo = info_span!("fetching updates").in_scope(move || { log::info!("Fetching updates"); self.progress .message_sync(format!("Fetching updates: {}", self.source)); - let mut remote = repo.find_remote("origin")?; + { + let mut remote = repo.find_remote("origin")?; + let mut fo = Self::create_fetch_options(); - let mut fo = Self::create_fetch_options(); - remote.fetch(&[] as &[&str], Some(&mut fo), None)?; - remote.disconnect()?; + match remote.fetch(&[] as &[&str], Some(&mut fo), None) { + Ok(()) => {} + Err(err) + if err.code() == ErrorCode::NotFound + && err.class() == ErrorClass::Odb => + { + // delete repo - let head = repo.find_reference("FETCH_HEAD")?; - let head = head.peel_to_commit()?; + remove_dir_all(path)?; - // reset to the most recent commit - repo.reset(head.as_object(), ResetType::Hard, None)?; + // clone repo + + return Ok(self.clone_repo(path)?); + } + err => err?, + } + remote.disconnect()?; + } + + log::info!("Fetched, resetting"); + + { + let head = repo.find_reference("FETCH_HEAD")?; + let head = head.peel_to_commit()?; + + // reset to the most recent commit + repo.reset(head.as_object(), ResetType::Hard, None)?; + } - Ok::<_, Error>(()) + Ok::<_, Error>(repo) })?; - repo + Ok(repo) } Err(err) => { log::info!( @@ -243,20 +289,46 @@ where err.code(), err.class() ); - return Err(err.into()); + Err(err.into()) } - }; + } + } - log::info!("Repository cloned or updated"); + #[instrument(skip(self), err)] + fn clone_repo(&self, path: &Path) -> Result { + self.progress + .message_sync(format!("Cloning repository: {}", self.source)); - // discover files between "then" and now + let mut builder = RepoBuilder::new(); + + if let Some(branch) = &self.branch { + builder.branch(branch); + } + + let mut fo = Self::create_fetch_options(); + if self.continuation.0.is_none() { + fo.depth(self.depth); + } + + builder.fetch_options(fo).clone(&self.source, path) + } - let changes = match &self.continuation.0 { + fn find_changes(&self, repo: &Repository) -> Result>, Error> { + let result = match &self.continuation.0 { Some(commit) => { log::info!("Continuing from: {commit}"); let files = info_span!("continue from", commit).in_scope(|| { - let start = repo.find_commit(repo.revparse_single(commit)?.id())?; + let start = match repo.find_commit(repo.revparse_single(commit)?.id()) { + Ok(start) => start, + Err(err) + if err.code() == ErrorCode::NotFound + && err.class() == ErrorClass::Odb => + { + return Ok::<_, Error>(None); + } + err => err?, + }; let end = repo.head()?.peel_to_commit()?; let start = start.tree()?; @@ -285,12 +357,14 @@ where } } - Ok::<_, Error>(files) + Ok(Some(files)) })?; - log::info!("Detected {} changed files", files.len()); + if let Some(files) = &files { + log::info!("Detected {} changed files", files.len()); + } - Some(files) + files } _ => { log::debug!("Ingesting all files"); @@ -298,36 +372,16 @@ where } }; - // discover and process files - - let mut path = Cow::Borrowed(path); - if let Some(base) = &self.path { - let new_path = path.join(base); - - log::debug!(" Base: {}", path.display()); - log::debug!("Target: {}", new_path.display()); - - // ensure that self.path was a relative sub-directory of the repository - let _ = new_path - .strip_prefix(path) - .map_err(|_| Error::Path(base.into()))?; - - path = new_path.into(); + match &result { + Some(result) => { + log::info!("Detected {} changed files", result.len()); + } + None => { + log::debug!("Ingesting all files"); + } } - self.walk(&path, &changes)?; - - let head = repo.head()?; - let commit = head.peel_to_commit()?.id(); - log::info!("Most recent commit: {commit}"); - - // only drop when we are done, as this might delete the working directory - - drop(working_dir); - - // return result - - Ok(Continuation(Some(commit.to_string()))) + Ok(result) } fn create_fetch_options<'cb>() -> FetchOptions<'cb> {