Skip to content

Commit

Permalink
fix: Ensure that the container info of each process is up to date
Browse files Browse the repository at this point in the history
Before this change, we were storing a map of namespaces and
corresponding container info, but that didn't work - namespaces
used by one container might be reused by a next container later.

This change drops that solution and fetches the container info
always.
  • Loading branch information
vadorovsky authored and banditopazzo committed Jan 10, 2024
1 parent a1822d6 commit c2fc521
Showing 1 changed file with 29 additions and 19 deletions.
48 changes: 29 additions & 19 deletions crates/pulsar-core/src/pdk/process_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ struct ProcessTracker {
rx: mpsc::UnboundedReceiver<TrackerRequest>,
/// current processes
processes: HashMap<Pid, ProcessData>,
/// current containers
containers: HashMap<Namespaces, ContainerInfo>,
/// scheduled removal of exited processes
next_cleanup: Timestamp,
/// pending info requests arrived before the process was created
Expand All @@ -162,6 +160,7 @@ struct ProcessData {
>,
argv: Vec<String>,
namespaces: Namespaces,
container: Option<ContainerInfo>,
}

/// Cleanup timeout in nanoseconds. This is how long an exited process
Expand All @@ -186,12 +185,12 @@ impl ProcessTracker {
exec_changes: BTreeMap::new(),
argv: Vec::new(),
namespaces: Namespaces::default(),
container: None,
},
);
Self {
rx,
processes,
containers: HashMap::new(),
next_cleanup: Timestamp::now() + CLEANUP_TIMEOUT,
pending_requests: Vec::new(),
pending_updates: HashMap::new(),
Expand Down Expand Up @@ -263,25 +262,24 @@ impl ProcessTracker {
}
}

fn handle_container_info(
fn get_container_info(
&mut self,
pid: Pid,
namespaces: Namespaces,
is_new_container: bool,
) -> Result<(), ContainerError> {
) -> Result<Option<ContainerInfo>, ContainerError> {
let container_id = procfs::get_process_container_id(pid)?;
if let Some(id) = container_id {
let container_info = ContainerInfo::from_container_id(id.clone())?;
self.containers.entry(namespaces).or_insert_with(|| {
match container_id {
Some(id) => {
let container_info = ContainerInfo::from_container_id(id.clone())?;
if is_new_container {
log::debug!("Detected a new container {id}");
} else {
log::debug!("Detected an already existing container {id}");
}
container_info
});
Ok(Some(container_info))
}
None => Ok(None),
}
Ok(())
}

fn handle_update(&mut self, mut update: TrackerUpdate) {
Expand All @@ -293,8 +291,13 @@ impl ProcessTracker {
namespaces,
is_new_container,
} => {
self.handle_container_info(pid, namespaces, is_new_container)
.unwrap_or_else(|err| log::error!("{err}"));
let container = match self.get_container_info(pid, is_new_container) {
Ok(container) => container,
Err(err) => {
log::error!("{err}");
None
}
};
self.processes.insert(
pid,
ProcessData {
Expand All @@ -309,6 +312,7 @@ impl ProcessTracker {
.map(|parent| parent.argv.clone())
.unwrap_or_default(),
namespaces,
container,
},
);
if let Some(pending_updates) = self.pending_updates.remove(&pid) {
Expand All @@ -325,11 +329,18 @@ impl ProcessTracker {
namespaces,
is_new_container,
} => {
self.handle_container_info(pid, namespaces, is_new_container)
.unwrap_or_else(|err| log::error!("{err}"));
let container = match self.get_container_info(pid, is_new_container) {
Ok(container) => container,
Err(err) => {
log::error!("{err}");
None
}
};
if let Some(p) = self.processes.get_mut(&pid) {
p.exec_changes.insert(timestamp, std::mem::take(image));
p.argv = std::mem::take(argv)
p.argv = std::mem::take(argv);
p.namespaces = namespaces;
p.container = container;
} else {
// if exec arrived before the fork, we save the event as pending
log::debug!("(exec) Process {pid} not found in process tree, saving for later");
Expand Down Expand Up @@ -360,7 +371,6 @@ impl ProcessTracker {
.processes
.get(&pid)
.ok_or(TrackerError::ProcessNotFound)?;
let container: Option<ContainerInfo> = self.containers.get(&process.namespaces).cloned();
if ts < process.fork_time {
log::warn!(
"{} not forked yet {} < {} ({}ms)",
Expand All @@ -383,7 +393,7 @@ impl ProcessTracker {
fork_time: process.fork_time,
argv: process.argv.clone(),
namespaces: process.namespaces,
container,
container: process.container.clone(),
})
}

Expand Down

0 comments on commit c2fc521

Please sign in to comment.