Skip to content

Commit

Permalink
Propagate delete events in shared streams
Browse files Browse the repository at this point in the history
Signed-off-by: Danil-Grigorev <[email protected]>
  • Loading branch information
Danil-Grigorev committed Dec 28, 2024
1 parent 6a980c6 commit 5343bdc
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 6 deletions.
15 changes: 11 additions & 4 deletions kube-runtime/src/reflector/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ where
pub(crate) fn subscribe(&self, reader: Store<K>) -> ReflectHandle<K> {
ReflectHandle::new(reader, self.dispatch_tx.new_receiver())
}

// Return a number of active subscribers to this shared sender.
pub(crate) fn subscribers(&self) -> usize {
self.dispatch_tx.receiver_count() - 1
}
}

/// A handle to a shared stream reader
Expand Down Expand Up @@ -132,10 +137,12 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
match ready!(this.rx.as_mut().poll_next(cx)) {
Some(obj_ref) => this
.reader
.get(&obj_ref)
.map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))),
Some(obj_ref) => if obj_ref.extra.remaining_lookups.is_some() {
this.reader.remove(&obj_ref)
} else {
this.reader.get(&obj_ref)
}
.map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))),
None => Poll::Ready(None),
}
}
Expand Down
6 changes: 6 additions & 0 deletions kube-runtime/src/reflector/object_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub trait Lookup {
extra: Extra {
resource_version: self.resource_version().map(Cow::into_owned),
uid: self.uid().map(Cow::into_owned),
remaining_lookups: None,
},
}
}
Expand Down Expand Up @@ -156,6 +157,8 @@ pub struct Extra {
pub resource_version: Option<String>,
/// The uid of the object
pub uid: Option<String>,
/// Number of remaining cache lookups on this reference
pub remaining_lookups: Option<usize>,
}

impl<K: Lookup> ObjectRef<K>
Expand Down Expand Up @@ -225,6 +228,7 @@ impl<K: Lookup> ObjectRef<K> {
extra: Extra {
resource_version: None,
uid: Some(owner.uid.clone()),
remaining_lookups: None,
},
})
} else {
Expand Down Expand Up @@ -271,6 +275,7 @@ impl<K: Lookup> From<ObjectRef<K>> for ObjectReference {
extra: Extra {
resource_version,
uid,
..
},
} = val;
ObjectReference {
Expand Down Expand Up @@ -351,6 +356,7 @@ mod tests {
extra: Extra {
resource_version: Some("123".to_string()),
uid: Some("638ffacd-f666-4402-ba10-7848c66ef576".to_string()),
remaining_lookups: None,
},
..minimal.clone()
};
Expand Down
33 changes: 31 additions & 2 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,14 @@ where
self.store.write().insert(key, obj);
}
watcher::Event::Delete(obj) => {
let key = obj.to_object_ref(self.dyntype.clone());
self.store.write().remove(&key);
let mut key = obj.to_object_ref(self.dyntype.clone());
let mut store = self.store.write();
store.remove(&key);
if self.dispatcher.is_some() {
// Re-insert the entry with updated key, as insert on its own doesnt modify the key
key.extra.remaining_lookups = self.dispatcher.as_ref().map(|d| d.subscribers());

Check failure on line 115 in kube-runtime/src/reflector/store.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

redundant closure

error: redundant closure --> kube-runtime/src/reflector/store.rs:115:80 | 115 | key.extra.remaining_lookups = self.dispatcher.as_ref().map(|d| d.subscribers()); | ^^^^^^^^^^^^^^^^^^^ help: replace the closure with the method itself: `super::dispatcher::Dispatcher::subscribers` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_closure_for_method_calls note: the lint level is defined here --> kube-runtime/src/lib.rs:12:9 | 12 | #![deny(clippy::pedantic)] | ^^^^^^^^^^^^^^^^ = note: `#[deny(clippy::redundant_closure_for_method_calls)]` implied by `#[deny(clippy::pedantic)]`
store.insert(key, Arc::new(obj.clone()));
}
}
watcher::Event::Init => {
self.buffer = AHashMap::new();
Expand Down Expand Up @@ -159,6 +165,12 @@ where
}
}

watcher::Event::Delete(obj) => {
let mut obj_ref = obj.to_object_ref(self.dyntype.clone());
obj_ref.extra.remaining_lookups = Some(dispatcher.subscribers());
dispatcher.broadcast(obj_ref).await;
}

_ => {}
}
}
Expand Down Expand Up @@ -236,6 +248,23 @@ where
.cloned()
}

#[must_use]
pub fn remove(&self, key: &ObjectRef<K>) -> Option<Arc<K>> {
let mut store = self.store.write();
store.remove_entry(key).map(|(k, obj)| {
let mut k = k.clone();
match k.extra.remaining_lookups {
Some(..=1) | None => (),
Some(lookups) => {
k.extra.remaining_lookups = Some(lookups - 1);
store.insert(k, obj.clone());
}
};

obj
})
}

/// Return a full snapshot of the current values
#[must_use]
pub fn state(&self) -> Vec<Arc<K>> {
Expand Down

0 comments on commit 5343bdc

Please sign in to comment.