diff --git a/kube-runtime/src/reflector/dispatcher.rs b/kube-runtime/src/reflector/dispatcher.rs index 1060dab2b..eb939d077 100644 --- a/kube-runtime/src/reflector/dispatcher.rs +++ b/kube-runtime/src/reflector/dispatcher.rs @@ -70,6 +70,11 @@ where pub(crate) fn subscribe(&self, reader: Store) -> ReflectHandle { ReflectHandle::new(reader, self.dispatch_tx.new_receiver()) } + + // Return a number of active subscribers to this shared sender. + pub(crate) fn subscribers(&self) -> usize { + self.dispatch_tx.receiver_count() - 1 + } } /// A handle to a shared stream reader @@ -132,10 +137,12 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); match ready!(this.rx.as_mut().poll_next(cx)) { - Some(obj_ref) => this - .reader - .get(&obj_ref) - .map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))), + Some(obj_ref) => if obj_ref.extra.remaining_lookups.is_some() { + this.reader.remove(&obj_ref) + } else { + this.reader.get(&obj_ref) + } + .map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))), None => Poll::Ready(None), } } diff --git a/kube-runtime/src/reflector/object_ref.rs b/kube-runtime/src/reflector/object_ref.rs index 9cfc4e028..ef2c02093 100644 --- a/kube-runtime/src/reflector/object_ref.rs +++ b/kube-runtime/src/reflector/object_ref.rs @@ -57,6 +57,7 @@ pub trait Lookup { extra: Extra { resource_version: self.resource_version().map(Cow::into_owned), uid: self.uid().map(Cow::into_owned), + remaining_lookups: None, }, } } @@ -156,6 +157,8 @@ pub struct Extra { pub resource_version: Option, /// The uid of the object pub uid: Option, + /// Number of remaining cache lookups on this reference + pub remaining_lookups: Option, } impl ObjectRef @@ -225,6 +228,7 @@ impl ObjectRef { extra: Extra { resource_version: None, uid: Some(owner.uid.clone()), + remaining_lookups: None, }, }) } else { @@ -271,6 +275,7 @@ impl From> for ObjectReference { extra: Extra { resource_version, uid, + .. }, } = val; ObjectReference { @@ -351,6 +356,7 @@ mod tests { extra: Extra { resource_version: Some("123".to_string()), uid: Some("638ffacd-f666-4402-ba10-7848c66ef576".to_string()), + remaining_lookups: None, }, ..minimal.clone() }; diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index d6d264dea..f2e72b6c1 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -107,8 +107,14 @@ where self.store.write().insert(key, obj); } watcher::Event::Delete(obj) => { - let key = obj.to_object_ref(self.dyntype.clone()); - self.store.write().remove(&key); + let mut key = obj.to_object_ref(self.dyntype.clone()); + let mut store = self.store.write(); + store.remove(&key); + if self.dispatcher.is_some() { + // Re-insert the entry with updated key, as insert on its own doesnt modify the key + key.extra.remaining_lookups = self.dispatcher.as_ref().map(|d| d.subscribers()); + store.insert(key, Arc::new(obj.clone())); + } } watcher::Event::Init => { self.buffer = AHashMap::new(); @@ -159,6 +165,12 @@ where } } + watcher::Event::Delete(obj) => { + let mut obj_ref = obj.to_object_ref(self.dyntype.clone()); + obj_ref.extra.remaining_lookups = Some(dispatcher.subscribers()); + dispatcher.broadcast(obj_ref).await; + } + _ => {} } } @@ -236,6 +248,23 @@ where .cloned() } + #[must_use] + pub fn remove(&self, key: &ObjectRef) -> Option> { + let mut store = self.store.write(); + store.remove_entry(key).map(|(k, obj)| { + let mut k = k.clone(); + match k.extra.remaining_lookups { + Some(..=1) | None => (), + Some(lookups) => { + k.extra.remaining_lookups = Some(lookups - 1); + store.insert(k, obj.clone()); + } + }; + + obj + }) + } + /// Return a full snapshot of the current values #[must_use] pub fn state(&self) -> Vec> {