Skip to content

Commit

Permalink
Merge pull request #607 from nolar/irrelevant-handlers-backported
Browse files Browse the repository at this point in the history
Ignore irrelevant handlers in processing state calculations (backported for 0.28.x)
  • Loading branch information
nolar authored Dec 10, 2020
2 parents c5086f0 + a84617a commit c247b68
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 32 deletions.
2 changes: 1 addition & 1 deletion kopf/reactor/handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async def execute(
cause_handlers = subregistry.get_handlers(cause=cause)
storage = settings.persistence.progress_storage
state = states.State.from_storage(body=cause.body, storage=storage, handlers=owned_handlers)
state = state.with_handlers(cause_handlers)
state = state.with_purpose(cause.reason).with_handlers(cause_handlers)
outcomes = await execute_handlers_once(
lifecycle=lifecycle,
settings=settings,
Expand Down
38 changes: 30 additions & 8 deletions kopf/reactor/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,16 +337,38 @@ async def process_resource_changing_cause(
# Regular causes invoke the handlers.
if cause.reason in handlers_.HANDLER_REASONS:
title = handlers_.TITLES.get(cause.reason, repr(cause.reason))
logger.debug(f"{title.capitalize()} event: %r", body)
if cause.diff and cause.old is not None and cause.new is not None:
logger.debug(f"{title.capitalize()} diff: %r", cause.diff)

resource_registry = registry.resource_changing_handlers[cause.resource]
cause_handlers = resource_registry.get_handlers(cause=cause)
owned_handlers = resource_registry.get_all_handlers()
cause_handlers = resource_registry.get_handlers(cause=cause)
storage = settings.persistence.progress_storage
state = states.State.from_storage(body=cause.body, storage=storage, handlers=owned_handlers)
state = state.with_handlers(cause_handlers)
state = state.with_purpose(cause.reason).with_handlers(cause_handlers)

# Report the causes that have been superseded (intercepted, overridden) by the current one.
# The mix-in causes (i.e. resuming) is re-purposed if its handlers are still selected.
# To the next cycle, all extras are purged or re-purposed, so the message does not repeat.
for extra_reason, counters in state.extras.items(): # usually 0..1 items, rarely 2+.
extra_title = handlers_.TITLES.get(extra_reason, repr(extra_reason))
logger.info(f"{extra_title.capitalize()} event is superseded by {title.lower()}: "
f"{counters.success} succeeded; "
f"{counters.failure} failed; "
f"{counters.running} left to the moment.")
state = state.with_purpose(purpose=cause.reason, handlers=cause_handlers)

# Purge the now-irrelevant handlers if they were not re-purposed (extras are recalculated!).
# The current cause continues afterwards, and overrides its own pre-purged handler states.
# TODO: purge only the handlers that fell out of current purpose; but it is not critical
if state.extras:
state.purge(body=cause.body, patch=cause.patch,
storage=storage, handlers=owned_handlers)

# Inform on the current cause/event on every processing cycle. Even if there are
# no handlers -- to show what has happened and why the diff-base is patched.
logger.debug(f"{title.capitalize()} event: %r", body)
if cause.diff and cause.old is not None and cause.new is not None:
logger.debug(f"{title.capitalize()} diff: %r", cause.diff)

if cause_handlers:
outcomes = await handling.execute_handlers_once(
lifecycle=lifecycle,
Expand All @@ -360,10 +382,10 @@ async def process_resource_changing_cause(
states.deliver_results(outcomes=outcomes, patch=cause.patch)

if state.done:
success_count, failure_count = state.counts
counters = state.counts # calculate only once
logger.info(f"{title.capitalize()} event is processed: "
f"{success_count} succeeded; "
f"{failure_count} failed.")
f"{counters.success} succeeded; "
f"{counters.failure} failed.")
state.purge(body=cause.body, patch=cause.patch,
storage=storage, handlers=owned_handlers)

Expand Down
1 change: 1 addition & 0 deletions kopf/storage/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class ProgressRecord(TypedDict, total=True):
started: Optional[str]
stopped: Optional[str]
delayed: Optional[str]
purpose: Optional[str]
retries: Optional[int]
success: Optional[bool]
failure: Optional[bool]
Expand Down
85 changes: 72 additions & 13 deletions kopf/storage/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
import copy
import dataclasses
import datetime
from typing import Any, Collection, Dict, Iterable, Iterator, Mapping, Optional, Tuple, overload
from typing import Any, Collection, Dict, Iterable, Iterator, \
Mapping, NamedTuple, Optional, overload

from kopf.storage import progress
from kopf.structs import bodies, callbacks, handlers as handlers_, patches
Expand Down Expand Up @@ -52,6 +53,7 @@ class HandlerState:
started: Optional[datetime.datetime] = None # None means this information was lost.
stopped: Optional[datetime.datetime] = None # None means it is still running (e.g. delayed).
delayed: Optional[datetime.datetime] = None # None means it is finished (succeeded/failed).
purpose: Optional[handlers_.Reason] = None # None is a catch-all marker for upgrades/rollbacks.
retries: int = 0
success: bool = False
failure: bool = False
Expand All @@ -60,9 +62,10 @@ class HandlerState:
_origin: Optional[progress.ProgressRecord] = None # to check later if it has actually changed.

@classmethod
def from_scratch(cls) -> "HandlerState":
def from_scratch(cls, *, purpose: Optional[handlers_.Reason] = None) -> "HandlerState":
return cls(
started=datetime.datetime.utcnow(),
purpose=purpose,
)

@classmethod
Expand All @@ -71,6 +74,7 @@ def from_storage(cls, __d: progress.ProgressRecord) -> "HandlerState":
started=_datetime_fromisoformat(__d.get('started')) or datetime.datetime.utcnow(),
stopped=_datetime_fromisoformat(__d.get('stopped')),
delayed=_datetime_fromisoformat(__d.get('delayed')),
purpose=handlers_.Reason(__d.get('purpose')) if __d.get('purpose') else None,
retries=__d.get('retries') or 0,
success=__d.get('success') or False,
failure=__d.get('failure') or False,
Expand All @@ -84,6 +88,7 @@ def for_storage(self) -> progress.ProgressRecord:
started=None if self.started is None else _datetime_toisoformat(self.started),
stopped=None if self.stopped is None else _datetime_toisoformat(self.stopped),
delayed=None if self.delayed is None else _datetime_toisoformat(self.delayed),
purpose=None if self.purpose is None else str(self.purpose.value),
retries=None if self.retries is None else int(self.retries),
success=None if self.success is None else bool(self.success),
failure=None if self.failure is None else bool(self.failure),
Expand All @@ -95,13 +100,20 @@ def as_in_storage(self) -> Mapping[str, Any]:
# Nones are not stored by Kubernetes, so we filter them out for comparison.
return {key: val for key, val in self.for_storage().items() if val is not None}

def with_purpose(
self,
purpose: Optional[handlers_.Reason],
) -> "HandlerState":
return dataclasses.replace(self, purpose=purpose)

def with_outcome(
self,
outcome: HandlerOutcome,
) -> "HandlerState":
now = datetime.datetime.utcnow()
cls = type(self)
return cls(
purpose=self.purpose,
started=self.started if self.started else now,
stopped=self.stopped if self.stopped else now if outcome.final else None,
delayed=now + datetime.timedelta(seconds=outcome.delay) if outcome.delay is not None else None,
Expand Down Expand Up @@ -133,6 +145,12 @@ def runtime(self) -> datetime.timedelta:
return now - (self.started if self.started else now)


class StateCounters(NamedTuple):
success: int
failure: int
running: int


class State(Mapping[handlers_.HandlerId, HandlerState]):
"""
A state of selected handlers, as persisted in the object's status.
Expand All @@ -149,9 +167,12 @@ class State(Mapping[handlers_.HandlerId, HandlerState]):
def __init__(
self,
__src: Mapping[handlers_.HandlerId, HandlerState],
*,
purpose: Optional[handlers_.Reason] = None,
):
super().__init__()
self._states = dict(__src)
self.purpose = purpose

@classmethod
def from_scratch(cls) -> "State":
Expand All @@ -173,17 +194,27 @@ def from_storage(
handler_states[handler_id] = HandlerState.from_storage(content)
return cls(handler_states)

def with_purpose(
self,
purpose: Optional[handlers_.Reason],
handlers: Iterable[handlers_.BaseHandler] = (), # to be re-purposed
) -> "State":
handler_states: Dict[handlers_.HandlerId, HandlerState] = dict(self)
for handler in handlers:
handler_states[handler.id] = handler_states[handler.id].with_purpose(purpose)
cls = type(self)
return cls(handler_states, purpose=purpose)

def with_handlers(
self,
handlers: Iterable[handlers_.BaseHandler],
) -> "State":
handler_ids = {handler.id for handler in handlers}
handler_states: Dict[handlers_.HandlerId, HandlerState] = dict(self)
for handler_id in handler_ids:
if handler_id not in handler_states:
handler_states[handler_id] = HandlerState.from_scratch()
for handler in handlers:
if handler.id not in handler_states:
handler_states[handler.id] = HandlerState.from_scratch(purpose=self.purpose)
cls = type(self)
return cls(handler_states)
return cls(handler_states, purpose=self.purpose)

def with_outcomes(
self,
Expand All @@ -198,7 +229,7 @@ def with_outcomes(
handler_id: (handler_state if handler_id not in outcomes else
handler_state.with_outcome(outcomes[handler_id]))
for handler_id, handler_state in self.items()
})
}, purpose=self.purpose)

def store(
self,
Expand Down Expand Up @@ -245,13 +276,39 @@ def __getitem__(self, item: handlers_.HandlerId) -> HandlerState:
@property
def done(self) -> bool:
# In particular, no handlers means that it is "done" even before doing.
return all(handler_state.finished for handler_state in self._states.values())
return all(
handler_state.finished for handler_state in self._states.values()
if self.purpose is None or handler_state.purpose is None
or handler_state.purpose == self.purpose
)

@property
def extras(self) -> Mapping[handlers_.Reason, StateCounters]:
return {
reason: StateCounters(
success=len([1 for handler_state in self._states.values()
if handler_state.purpose == reason and handler_state.success]),
failure=len([1 for handler_state in self._states.values()
if handler_state.purpose == reason and handler_state.failure]),
running=len([1 for handler_state in self._states.values()
if handler_state.purpose == reason and not handler_state.finished]),
)
for reason in handlers_.HANDLER_REASONS
if self.purpose is not None and reason != self.purpose
if any(handler_state.purpose == reason for handler_state in self._states.values())
}

@property
def counts(self) -> Tuple[int, int]:
return (
len([1 for handler_state in self._states.values() if handler_state.success]),
len([1 for handler_state in self._states.values() if handler_state.failure]),
def counts(self) -> StateCounters:
purposeful_states = [
handler_state for handler_state in self._states.values()
if self.purpose is None or handler_state.purpose is None
or handler_state.purpose == self.purpose
]
return StateCounters(
success=len([1 for handler_state in purposeful_states if handler_state.success]),
failure=len([1 for handler_state in purposeful_states if handler_state.failure]),
running=len([1 for handler_state in purposeful_states if not handler_state.finished]),
)

@property
Expand All @@ -273,6 +330,8 @@ def delays(self) -> Collection[float]:
max(0, (handler_state.delayed - now).total_seconds()) if handler_state.delayed else 0
for handler_state in self._states.values()
if not handler_state.finished
if self.purpose is None or handler_state.purpose is None
or handler_state.purpose == self.purpose
]


Expand Down
46 changes: 46 additions & 0 deletions tests/handling/test_cause_logging.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import asyncio
import datetime
import logging

import freezegun
import pytest

import kopf
from kopf.reactor.processing import process_resource_event
from kopf.storage.progress import StatusProgressStorage
from kopf.structs.containers import ResourceMemories
from kopf.structs.handlers import ALL_REASONS, HANDLER_REASONS, Reason

Expand Down Expand Up @@ -91,3 +94,46 @@ async def test_diffs_not_logged_if_absent(registry, settings, resource, handlers
], prohibited=[
" diff: "
])



# Timestamps: time zero (0), before (B), after (A), and time zero+1s (1).
TS0 = datetime.datetime(2020, 12, 31, 23, 59, 59, 123456)
TS1_ISO = '2021-01-01T00:00:00.123456'


@pytest.mark.parametrize('cause_types', [
# All combinations except for same-to-same (it is not an "extra" then).
(a, b) for a in HANDLER_REASONS for b in HANDLER_REASONS if a != b
])
@freezegun.freeze_time(TS0)
async def test_supersession_is_logged(
registry, settings, resource, handlers, cause_types, cause_mock, caplog, assert_logs):
caplog.set_level(logging.DEBUG)

settings.persistence.progress_storage = StatusProgressStorage()
body = {'status': {'kopf': {'progress': {
'create_fn': {'purpose': cause_types[0]},
'update_fn': {'purpose': cause_types[0]},
'resume_fn': {'purpose': cause_types[0]},
'delete_fn': {'purpose': cause_types[0]},
}}}}

cause_mock.reason = cause_types[1]
event_type = None if cause_types[1] == Reason.RESUME else 'irrelevant'

await process_resource_event(
lifecycle=kopf.lifecycles.all_at_once,
registry=registry,
settings=settings,
resource=resource,
memories=ResourceMemories(),
raw_event={'type': event_type, 'object': body},
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
assert_logs([
"(Creation|Update|Resuming|Deletion) event is superseded by (creation|update|resuming|deletion): ",
"(Creation|Update|Resuming|Deletion) event: ",
"(Creation|Update|Resuming|Deletion) event is processed: ",
])
Loading

0 comments on commit c247b68

Please sign in to comment.