Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #25759: Improve APT/dpkg output collecter #5977

Draft
wants to merge 1 commit into
base: branches/rudder/8.2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions policies/module-types/system-updates/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ rudder_module_type = { path = "../../rudder-module-type" }
#rust-apt = { version = "0.8.0", optional = true }
# while we don't have a fixed release
rust-apt = { git = "https://gitlab.com/amousset/rust-apt.git", branch = "rudder", optional = true }
stdio-override = { version = "0.1.3", optional = true }
memfile = { version = "0.3.2", optional = true }
uuid = { version = "1", features = ["v4"] }
#librpm = "0.1.1"
Expand All @@ -30,7 +29,7 @@ log = "0.4.21"

[features]
# Enable APT package management
apt = ["dep:rust-apt", "dep:stdio-override", "dep:memfile"]
apt = ["dep:rust-apt", "dep:memfile"]

[dev-dependencies]
rudder_commons_test = { path = "../../rudder-commons-test" }
Expand Down
25 changes: 21 additions & 4 deletions policies/module-types/system-updates/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use crate::{output::Report, state::UpdateStatus};
use chrono::{DateTime, Duration, SecondsFormat, Utc};
use rudder_module_type::rudder_debug;
use rusqlite::{self, Connection, Row};
use std::fs::Permissions;
use std::os::unix::prelude::PermissionsExt;
use std::{
fmt::{Display, Formatter},
fs,
fs::Permissions,
os::unix::prelude::PermissionsExt,
path::{Path, PathBuf},
};

Expand Down Expand Up @@ -355,8 +355,7 @@ mod tests {
use chrono::{Duration, Utc};
use pretty_assertions::assert_eq;
use rusqlite::Connection;
use std::ops::Add;
use std::os::unix::prelude::PermissionsExt;
use std::{ops::Add, os::unix::prelude::PermissionsExt};

#[test]
fn new_creates_new_database() {
Expand Down Expand Up @@ -417,6 +416,24 @@ mod tests {
assert_eq!(db.lock(0, event_id).unwrap(), None);
}

#[test]
fn it_gets_status_regardless_of_event_id_case() {
let mut db = PackageDatabase::new(None).unwrap();
let event_id = "TEST";
let campaign_id = "CAMPAIGN";
let now = Utc::now();
assert_eq!(db.get_status(event_id).unwrap(), None);
db.schedule_event(event_id, campaign_id, now).unwrap();
assert_eq!(
db.get_status(event_id).unwrap().unwrap(),
UpdateStatus::ScheduledUpdate
);
assert_eq!(
db.get_status(&event_id.to_lowercase()).unwrap().unwrap(),
UpdateStatus::ScheduledUpdate
);
}

#[test]
fn start_event_inserts_and_sets_running_update() {
let mut db = PackageDatabase::new(None).unwrap();
Expand Down
82 changes: 27 additions & 55 deletions policies/module-types/system-updates/src/package_manager/apt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
mod filter;
mod progress;

use crate::package_manager::apt::progress::AptAcquireProgress;
use crate::{
campaign::FullCampaignType,
output::ResultOutput,
package_manager::{
apt::filter::{Distribution, PackageFileFilter},
apt::{
filter::{Distribution, PackageFileFilter},
progress::RudderAptAcquireProgress,
},
LinuxPackageManager, PackageId, PackageInfo, PackageList, PackageManager, PackageSpec,
},
};
Expand All @@ -27,15 +29,7 @@ use rust_apt::{
progress::{AcquireProgress, InstallProgress},
Cache, PackageSort,
};
use std::{
collections::HashMap,
env,
fs::File,
io::{Read, Seek},
path::Path,
process::Command,
};
use stdio_override::{StdoutOverride, StdoutOverrideGuard};
use std::{collections::HashMap, env, os::fd::AsRawFd, path::Path, process::Command};

/// References:
/// * https://www.debian.org/doc/manuals/debian-faq/uptodate.en.html
Expand Down Expand Up @@ -211,41 +205,19 @@ impl AptPackageManager {
}
}

// Catch stdout/stderr from the library
pub struct OutputCatcher {
out_file: File,
out_guard: StdoutOverrideGuard,
}

impl OutputCatcher {
pub fn new() -> Self {
let out_file = MemFile::create_default("stdout").unwrap().into_file();
let out_guard = StdoutOverride::override_raw(out_file.try_clone().unwrap()).unwrap();
Self {
out_file,
out_guard,
}
}

pub fn read(mut self) -> String {
drop(self.out_guard);
let mut out = String::new();
self.out_file.rewind().unwrap();
self.out_file.read_to_string(&mut out).unwrap();
out
}
}

impl LinuxPackageManager for AptPackageManager {
fn update_cache(&mut self) -> ResultOutput<()> {
let cache = self.cache();

if let Ok(o) = cache.inner {
let mut progress = AcquireProgress::new(AptAcquireProgress::new());
let catch = OutputCatcher::new();
let mem_file_acquire = MemFile::create_default("update-acquire").unwrap();
let mut progress = AcquireProgress::new(RudderAptAcquireProgress::new(
mem_file_acquire.try_clone().unwrap(),
));
let mut r = Self::apt_errors_to_output(o.update(&mut progress));
let out = catch.read();
r.stdout(out);

let acquire_out = RudderAptAcquireProgress::read_mem_file(mem_file_acquire);
r.stdout(acquire_out);
r
} else {
cache.clear_ok()
Expand Down Expand Up @@ -312,13 +284,23 @@ impl LinuxPackageManager for AptPackageManager {
}

// Do the changes
let mut acquire_progress = AcquireProgress::new(AptAcquireProgress::new());
let mut install_progress = InstallProgress::default();
let catch = OutputCatcher::new();
let mem_file_acquire = MemFile::create_default("upgrade-acquire").unwrap();
let mut acquire_progress = AcquireProgress::new(RudderAptAcquireProgress::new(
mem_file_acquire.try_clone().unwrap(),
));

let mem_file_install = MemFile::create_default("upgrade-install").unwrap();
let mut install_progress = InstallProgress::fd(mem_file_install.as_raw_fd());

let mut res_commit =
Self::apt_errors_to_output(c.commit(&mut acquire_progress, &mut install_progress));
let out = catch.read();
res_commit.stdout(out);

let acquire_out = RudderAptAcquireProgress::read_mem_file(mem_file_acquire);
res_commit.stdout(acquire_out);

let install_out = RudderAptAcquireProgress::read_mem_file(mem_file_install);
res_commit.stdout(install_out);

res_resolve.step(res_commit)
} else {
cache.clear_ok()
Expand Down Expand Up @@ -389,14 +371,4 @@ NEEDRESTART-SESS: amousset @ user manager service";
expected2
);
}

#[test]
// Needs "-- --nocapture --ignored" to run in tests as cargo test also messes with stdout/stderr
#[ignore]
fn it_captures_stdout() {
let catch = OutputCatcher::new();
println!("plouf");
let out = catch.read();
assert_eq!(out, "plouf\n".to_string());
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,42 @@
// SPDX-License-Identifier: GPL-3.0-or-later
// SPDX-FileCopyrightText: 2024 Normation SAS

use rust_apt::error::pending_error;
use rust_apt::progress::DynAcquireProgress;
use rust_apt::raw::{AcqTextStatus, ItemDesc, ItemState, PkgAcquire};
use rust_apt::util::{time_str, unit_str, NumSys};
use memfile::MemFile;
use rust_apt::{
error::pending_error,
progress::DynAcquireProgress,
raw::{AcqTextStatus, ItemDesc, ItemState, PkgAcquire},
util::{time_str, unit_str, NumSys},
};
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};

/// Reimplement `rust_apt`'s progress without interactivity.
///
/// Try to stay consistent with APT's output, but allow some divergence.

/// This struct mimics the output of `apt update`.
#[derive(Default, Debug)]
pub struct AptAcquireProgress {}
#[derive(Debug)]
pub struct RudderAptAcquireProgress {
/// Enforce MemFile as we won't properly handle IO errors
writer: BufWriter<MemFile>,
}

impl AptAcquireProgress {
impl RudderAptAcquireProgress {
/// Returns a new default progress instance.
pub fn new() -> Self {
Self::default()
pub fn new(out_file: MemFile) -> Self {
let writer = BufWriter::new(out_file);
Self { writer }
}

pub fn read_mem_file(mut file: MemFile) -> String {
let mut acquire_out = String::new();
file.seek(SeekFrom::Start(0)).unwrap();
file.read_to_string(&mut acquire_out).unwrap();
acquire_out
}
}

impl DynAcquireProgress for AptAcquireProgress {
impl DynAcquireProgress for RudderAptAcquireProgress {
/// Used to send the pulse interval to the apt progress class.
///
/// Pulse Interval is in microseconds.
Expand All @@ -41,7 +56,13 @@ impl DynAcquireProgress for AptAcquireProgress {
///
/// Prints out the short description and the expected size.
fn hit(&mut self, item: &ItemDesc) {
println!("\rHit:{} {}", item.owner().id(), item.description());
write!(
self.writer,
"\rHit:{} {}",
item.owner().id(),
item.description()
)
.unwrap();
}

/// Called when an Item has started to download
Expand All @@ -55,7 +76,7 @@ impl DynAcquireProgress for AptAcquireProgress {
string.push_str(&format!(" [{}]", unit_str(file_size, NumSys::Decimal)));
}

println!("{string}");
write!(self.writer, "{string}").unwrap();
}

/// Called when an Item fails to download.
Expand All @@ -68,18 +89,18 @@ impl DynAcquireProgress for AptAcquireProgress {

match item.owner().status() {
ItemState::StatIdle | ItemState::StatDone => {
println!("\rIgn: {desc}");
write!(self.writer, "\rIgn: {desc}").unwrap();
if error_text.is_empty() {
show_error = false;
}
}
_ => {
println!("\rErr: {desc}");
write!(self.writer, "\rErr: {desc}").unwrap();
}
}

if show_error {
println!("\r{error_text}");
write!(self.writer, "\r{error_text}").unwrap();
}
}

Expand Down Expand Up @@ -117,14 +138,17 @@ impl DynAcquireProgress for AptAcquireProgress {
}

if owner.fetched_bytes() != 0 {
println!(
write!(
self.writer,
"Fetched {} in {} ({}/s)",
unit_str(owner.fetched_bytes(), NumSys::Decimal),
time_str(owner.elapsed_time()),
unit_str(owner.current_cps(), NumSys::Decimal)
);
)
.unwrap();
} else {
println!("Nothing to fetch.");
write!(self.writer, "Nothing to fetch.").unwrap();
}
self.writer.flush().unwrap()
}
}