Skip to content

Commit

Permalink
Support push + force push over SSH (#1323)
Browse files Browse the repository at this point in the history
* Support receive-pack over SSH
* Improve tracing in push routines
* Improve error propagation between hooks and proxy
* Improve push option handling: use predefined struct
* Add `force` push option

commit-id:2dbf5e93
  • Loading branch information
vlad-ivanov-name authored Apr 5, 2024
1 parent cd9a4bb commit ade86d2
Show file tree
Hide file tree
Showing 34 changed files with 626 additions and 396 deletions.
243 changes: 144 additions & 99 deletions josh-proxy/src/bin/josh-proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ use josh::{josh_error, JoshError, JoshResult};
use josh_rpc::calls::RequestedCommand;
use serde::Serialize;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::io;
use std::net::IpAddr;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
Expand Down Expand Up @@ -729,14 +730,15 @@ async fn serve_namespace(
params: &josh_rpc::calls::ServeNamespace,
repo_path: std::path::PathBuf,
namespace: &str,
repo_update: RepoUpdate,
) -> josh::JoshResult<()> {
const SERVE_TIMEOUT: u64 = 60;

tracing::trace!(
"serve_namespace: command: {:?}, query: {}, namespace: {}",
params.command,
params.query,
namespace
command = ?params.command,
query = %params.query,
namespace = %namespace,
"serve_namespace",
);

enum ServeError {
Expand All @@ -746,25 +748,24 @@ async fn serve_namespace(
SubprocessExited(i32),
}

if params.command == RequestedCommand::GitReceivePack {
return Err(josh_error("Push over SSH is not supported"));
}

let command = match params.command {
RequestedCommand::GitUploadPack => "git-upload-pack",
RequestedCommand::GitUploadArchive => "git-upload-archive",
RequestedCommand::GitReceivePack => "git-receive-pack",
};

let overlay_path = repo_path.join("overlay");

let mut process = tokio::process::Command::new(command)
.arg(repo_path.join("overlay"))
.current_dir(repo_path.join("overlay"))
.arg(&overlay_path)
.current_dir(&overlay_path)
.env("GIT_DIR", &repo_path)
.env("GIT_NAMESPACE", namespace)
.env(
"GIT_ALTERNATE_OBJECT_DIRECTORIES",
repo_path.join("mirror").join("objects"),
)
.env("JOSH_REPO_UPDATE", serde_json::to_string(&repo_update)?)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
Expand Down Expand Up @@ -912,6 +913,31 @@ fn head_ref_or_default(head_ref: &str) -> HeadRef {
}
}

fn make_repo_update(
remote_url: &str,
serv: Arc<JoshProxyService>,
filter: josh::filter::Filter,
remote_auth: RemoteAuth,
meta: &MetaConfig,
repo_path: &Path,
ns: Arc<josh_proxy::TmpGitNamespace>,
) -> RepoUpdate {
let context_propagator = josh_proxy::trace::make_context_propagator();

RepoUpdate {
refs: HashMap::new(),
remote_url: remote_url.to_string(),
remote_auth,
port: serv.port.clone(),
filter_spec: josh::filter::spec(filter),
base_ns: josh::to_ns(&meta.config.repo),
git_ns: ns.name().to_string(),
git_dir: repo_path.display().to_string(),
mirror_git_dir: serv.repo_path.join("mirror").display().to_string(),
context_propagator,
}
}

async fn handle_serve_namespace_request(
serv: Arc<JoshProxyService>,
req: Request<hyper::Body>,
Expand Down Expand Up @@ -958,6 +984,9 @@ async fn handle_serve_namespace_request(
));
};

eprintln!("params: {:?}", params);
eprintln!("parsed_url.upstream_repo: {:?}", parsed_url.upstream_repo);

let auth_socket = params.ssh_socket.clone();
let remote_auth = RemoteAuth::Ssh {
auth_socket: auth_socket.clone(),
Expand Down Expand Up @@ -1000,24 +1029,34 @@ async fn handle_serve_namespace_request(
let remote_url = upstream + meta_config.config.repo.as_str();
let head_ref = head_ref_or_default(&parsed_url.headref);

let remote_refs = [head_ref.get()];
let remote_refs = match ssh_list_refs(&remote_url, auth_socket, Some(&remote_refs)).await {
Ok(remote_refs) => remote_refs,
Err(e) => {
return Ok(make_response(
hyper::Body::from(e.to_string()),
hyper::StatusCode::FORBIDDEN,
))
}
};
let resolved_ref = match params.command {
// When pushing over SSH, we need to fetch to get new references
// for searching for unapply base, so we don't bother with additional cache checks
RequestedCommand::GitReceivePack => None,
// Otherwise, list refs - it doesn't need locking and is faster -
// and use results to potentially skip fetching
_ => {
let remote_refs = [head_ref.get()];
let remote_refs =
match ssh_list_refs(&remote_url, auth_socket, Some(&remote_refs)).await {
Ok(remote_refs) => remote_refs,
Err(e) => {
return Ok(make_response(
hyper::Body::from(e.to_string()),
hyper::StatusCode::FORBIDDEN,
))
}
};

let resolved_ref = match remote_refs.get(head_ref.get()) {
Some(resolved_ref) => resolved_ref,
None => {
return Ok(make_response(
hyper::Body::from("Could not resolve remote ref"),
hyper::StatusCode::INTERNAL_SERVER_ERROR,
))
match remote_refs.get(head_ref.get()) {
Some(resolved_ref) => Some(resolved_ref.clone()),
None => {
return Ok(make_response(
hyper::Body::from("Could not resolve remote ref"),
hyper::StatusCode::INTERNAL_SERVER_ERROR,
))
}
}
}
};

Expand All @@ -1027,7 +1066,7 @@ async fn handle_serve_namespace_request(
&remote_auth,
remote_url.to_owned(),
Some(head_ref.get()),
Some(resolved_ref),
resolved_ref.as_deref(),
false,
)
.await
Expand Down Expand Up @@ -1095,7 +1134,19 @@ async fn handle_serve_namespace_request(
}
};

let serve_result = serve_namespace(&params, serv.repo_path.clone(), temp_ns.name()).await;
let overlay_path = serv.repo_path.join("overlay");
let repo_update = make_repo_update(
&remote_url,
serv.clone(),
filter,
remote_auth,
&meta_config,
&overlay_path,
temp_ns.clone(),
);

let serve_result =
serve_namespace(&params, serv.repo_path.clone(), temp_ns.name(), repo_update).await;
std::mem::drop(temp_ns);

match serve_result {
Expand Down Expand Up @@ -1295,68 +1346,39 @@ async fn call_service(
}

let temp_ns = prepare_namespace(serv.clone(), &meta, filter, &headref).await?;
let overlay_path = serv.repo_path.join("overlay");

let repo_path = serv
.repo_path
.join("overlay")
.to_str()
.ok_or(josh::josh_error("repo_path.to_str"))?
.to_string();

let mirror_repo_path = serv
.repo_path
.join("mirror")
.to_str()
.ok_or(josh::josh_error("repo_path.to_str"))?
.to_string();

let context_propagator = {
let span = tracing::Span::current();

let mut context_propagator = HashMap::<String, String>::default();
let context = span.context();
global::get_text_map_propagator(|propagator| {
propagator.inject_context(&context, &mut context_propagator);
});

tracing::debug!("context propagator: {:?}", context_propagator);
context_propagator
};

let repo_update = josh_proxy::RepoUpdate {
refs: HashMap::new(),
remote_url: remote_url.clone(),
let repo_update = make_repo_update(
&remote_url,
serv.clone(),
filter,
remote_auth,
port: serv.port.clone(),
filter_spec: josh::filter::spec(filter),
base_ns: josh::to_ns(&meta.config.repo),
git_ns: temp_ns.name().to_string(),
git_dir: repo_path.clone(),
mirror_git_dir: mirror_repo_path.clone(),
context_propagator,
};
&meta,
&overlay_path,
temp_ns.clone(),
);

let cgi_response = async {
let mut cmd = Command::new("git");
cmd.arg("http-backend");
cmd.current_dir(&serv.repo_path.join("overlay"));
cmd.env("GIT_DIR", &repo_path);
cmd.current_dir(&overlay_path);
cmd.env("GIT_DIR", &overlay_path);
cmd.env("GIT_HTTP_EXPORT_ALL", "");
cmd.env(
"GIT_ALTERNATE_OBJECT_DIRECTORIES",
serv.repo_path
.join("mirror")
.join("objects")
.to_str()
.ok_or(josh::josh_error("repo_path.to_str"))?,
.display()
.to_string(),
);
cmd.env("GIT_NAMESPACE", temp_ns.name());
cmd.env("GIT_PROJECT_ROOT", repo_path);
cmd.env("GIT_PROJECT_ROOT", &overlay_path);
cmd.env("JOSH_REPO_UPDATE", serde_json::to_string(&repo_update)?);
cmd.env("PATH_INFO", parsed_url.pathinfo.clone());

let (response, stderr) = hyper_cgi::do_cgi(req, cmd).await;
tracing::debug!("Git stderr: {}", String::from_utf8_lossy(&stderr));
tracing::debug!(stderr = %String::from_utf8_lossy(&stderr), "http-backend exited");

Ok::<_, JoshError>(response)
}
Expand Down Expand Up @@ -1655,35 +1677,41 @@ async fn run_housekeeping(local: std::path::PathBuf) -> josh::JoshResult<()> {
}
}

fn repo_update_from_env() -> josh::JoshResult<josh_proxy::RepoUpdate> {
let repo_update =
std::env::var("JOSH_REPO_UPDATE").map_err(|_| josh_error("JOSH_REPO_UPDATE not set"))?;

serde_json::from_str(&repo_update)
.map_err(|e| josh_error(&format!("Failed to parse JOSH_REPO_UPDATE: {}", e)))
}

fn pre_receive_hook() -> josh::JoshResult<i32> {
let repo_update: josh_proxy::RepoUpdate =
serde_json::from_str(&std::env::var("JOSH_REPO_UPDATE")?)?;
let repo_update = repo_update_from_env()?;

let p = std::path::PathBuf::from(repo_update.git_dir)
let push_options_path = std::path::PathBuf::from(repo_update.git_dir)
.join("refs/namespaces")
.join(repo_update.git_ns)
.join("push_options");

let n: usize = std::env::var("GIT_PUSH_OPTION_COUNT")?.parse()?;
let push_option_count: usize = std::env::var("GIT_PUSH_OPTION_COUNT")?.parse()?;

let mut push_options = std::collections::HashMap::<String, String>::new();
for i in 0..n {
let s = std::env::var(format!("GIT_PUSH_OPTION_{}", i))?;
if let [key, value] = s.as_str().split('=').collect::<Vec<_>>().as_slice() {
push_options.insert(key.to_string(), value.to_string());
let mut push_options = HashMap::<String, serde_json::Value>::new();
for i in 0..push_option_count {
let push_option = std::env::var(format!("GIT_PUSH_OPTION_{}", i))?;
if let Some((key, value)) = push_option.split_once("=") {
push_options.insert(key.into(), value.into());
} else {
push_options.insert(s, "".to_string());
push_options.insert(push_option, true.into());
}
}

std::fs::write(p, serde_json::to_string(&push_options)?)?;
std::fs::write(push_options_path, serde_json::to_string(&push_options)?)?;

Ok(0)
}

fn update_hook(refname: &str, old: &str, new: &str) -> josh::JoshResult<i32> {
let mut repo_update: josh_proxy::RepoUpdate =
serde_json::from_str(&std::env::var("JOSH_REPO_UPDATE")?)?;
let mut repo_update = repo_update_from_env()?;

repo_update
.refs
Expand All @@ -1696,24 +1724,33 @@ fn update_hook(refname: &str, old: &str, new: &str) -> josh::JoshResult<i32> {
.send();

match resp {
Ok(r) => {
let success = r.status().is_success();
if let Ok(body) = r.text() {
println!("response from upstream:\n{}\n\n", body);
} else {
println!("no upstream response");
Ok(resp) => {
let success = resp.status().is_success();
println!("upstream: response status: {}", resp.status());

match resp.text() {
Ok(text) if text.trim().is_empty() => {
println!("upstream: no response body");
}
Ok(text) => {
println!("upstream: response body:\n\n{}", text);
}
Err(err) => {
println!("upstream: warn: failed to read response body: {:?}", err);
}
}

if success {
return Ok(0);
Ok(0)
} else {
return Ok(1);
Ok(1)
}
}
Err(err) => {
tracing::warn!("/repo_update request failed {:?}", err);
Ok(1)
}
};
Ok(1)
}
}

async fn serve_graphql(
Expand Down Expand Up @@ -1949,8 +1986,16 @@ fn main() {

if let [a0, ..] = &std::env::args().collect::<Vec<_>>().as_slice() {
if a0.ends_with("/pre-receive") {
println!("josh-proxy");
std::process::exit(pre_receive_hook().unwrap_or(1));
eprintln!("josh-proxy: pre-receive hook");
let code = match pre_receive_hook() {
Ok(code) => code,
Err(e) => {
eprintln!("josh-proxy: pre-receive hook failed: {}", e);
std::process::exit(1);
}
};

std::process::exit(code);
}
}

Expand Down
Loading

0 comments on commit ade86d2

Please sign in to comment.