From 27ea5942ee5c08205e072e70a215a318ff6224db Mon Sep 17 00:00:00 2001 From: Allison Karlitskaya Date: Thu, 2 Nov 2023 12:41:28 +0100 Subject: [PATCH 1/3] router: explicitly track endpoints The router currently keeps a mapping of open channels to the endpoints responsible for them. This presents two problems: - when we close down an endpoint, we need to iterate over all open channels in order to determine which channels belong to that endpoint - it's possible to have active endpoints associated with the router which the router has no idea about Move to a more explicit model where we add a second mapping: endpoints to their set of open channels. This makes endpoint shutdown easier and adds the advantage that an endpoint with no channels can still be tracked by the router (with an empty channel set). The second point will be useful in future commits when the router (and not the routing rules) becomes responsible for ensuring that all endpoints are correctly shut down. We need to adjust the beiboot code a bit to avoid a catch-22 whereby the routing rule needs to exist before the router is initialized, but the Peer itself can now only exist after the router is initialized. 
--- src/cockpit/beiboot.py | 22 +++++++++++++--------- src/cockpit/channel.py | 2 +- src/cockpit/router.py | 9 +++++++-- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/src/cockpit/beiboot.py b/src/cockpit/beiboot.py index 79f04a6ee169..68c153452bbf 100644 --- a/src/cockpit/beiboot.py +++ b/src/cockpit/beiboot.py @@ -113,17 +113,17 @@ def report_exists(files): class DefaultRoutingRule(RoutingRule): - peer: Peer + peer: 'Peer | None' - def __init__(self, router: Router, peer: Peer): + def __init__(self, router: Router): super().__init__(router) - self.peer = peer - def apply_rule(self, options: JsonObject) -> Peer: + def apply_rule(self, options: JsonObject) -> 'Peer | None': return self.peer def shutdown(self) -> None: - self.peer.close() + if self.peer is not None: + self.peer.close() class AuthorizeResponder(ferny.AskpassHandler): @@ -259,11 +259,15 @@ class SshBridge(Router): ssh_peer: SshPeer def __init__(self, args: argparse.Namespace): - self.ssh_peer = SshPeer(self, args.destination, args) + # By default, we route everything to the other host. We add an extra + # routing rule for the packages webserver only if we're running the + # beipack. + rule = DefaultRoutingRule(self) + super().__init__([rule]) - super().__init__([ - DefaultRoutingRule(self, self.ssh_peer), - ]) + # This needs to be created after Router.__init__ is called. 
+ self.ssh_peer = SshPeer(self, args.destination, args) + rule.peer = self.ssh_peer def do_send_init(self): pass # wait for the peer to do it first diff --git a/src/cockpit/channel.py b/src/cockpit/channel.py index 5753e0030bff..f4aaacbf9519 100644 --- a/src/cockpit/channel.py +++ b/src/cockpit/channel.py @@ -204,7 +204,7 @@ def is_closing(self) -> bool: return self._close_args is not None def _close_now(self): - self.send_control('close', **self._close_args) + self.shutdown_endpoint(**self._close_args) def _task_done(self, task): # Strictly speaking, we should read the result and check for exceptions but: diff --git a/src/cockpit/router.py b/src/cockpit/router.py index 944f4af7acad..794867dd6703 100644 --- a/src/cockpit/router.py +++ b/src/cockpit/router.py @@ -61,6 +61,7 @@ class Endpoint: __endpoint_frozen_queue: Optional[ExecutionQueue] = None def __init__(self, router: 'Router'): + router.endpoints[self] = set() self.router = router def freeze_endpoint(self): @@ -94,6 +95,7 @@ def send_channel_message(self, channel: str, **kwargs: JsonDocument) -> None: def send_channel_control(self, channel, command, **kwargs: JsonDocument) -> None: self.router.write_control(channel=channel, command=command, **kwargs) if command == 'close': + self.router.endpoints[self].remove(channel) self.router.drop_channel(channel) def shutdown_endpoint(self, **kwargs: JsonDocument) -> None: @@ -130,6 +132,7 @@ def shutdown(self): class Router(CockpitProtocolServer): routing_rules: List[RoutingRule] open_channels: Dict[str, Endpoint] + endpoints: 'dict[Endpoint, set[str]]' _eof: bool = False def __init__(self, routing_rules: List[RoutingRule]): @@ -137,6 +140,7 @@ def __init__(self, routing_rules: List[RoutingRule]): rule.router = self self.routing_rules = routing_rules self.open_channels = {} + self.endpoints = {} def check_rules(self, options: JsonObject) -> Endpoint: for rule in self.routing_rules: @@ -161,14 +165,14 @@ def drop_channel(self, channel: str) -> None: 
self.transport.close() def shutdown_endpoint(self, endpoint: Endpoint, **kwargs) -> None: - channels = {key for key, value in self.open_channels.items() if value == endpoint} + channels = self.endpoints.pop(endpoint) logger.debug('shutdown_endpoint(%s, %s) will close %s', endpoint, kwargs, channels) for channel in channels: self.write_control(command='close', channel=channel, **kwargs) self.drop_channel(channel) def do_kill(self, host: Optional[str], group: Optional[str]) -> None: - endpoints = set(self.open_channels.values()) + endpoints = set(self.endpoints) logger.debug('do_kill(%s, %s). Considering %d endpoints.', host, group, len(endpoints)) for endpoint in endpoints: endpoint.do_kill(host, group) @@ -189,6 +193,7 @@ def channel_control_received(self, channel: str, command: str, message: JsonObje return self.open_channels[channel] = endpoint + self.endpoints[endpoint].add(channel) else: try: endpoint = self.open_channels[channel] From e05a203141b8ad1e9c328689e5c7d9c5efdf2f0d Mon Sep 17 00:00:00 2001 From: Allison Karlitskaya Date: Thu, 2 Nov 2023 16:49:27 +0100 Subject: [PATCH 2/3] dbus channel: make 'close' message idempotent It's theoretically possible that the user might send multiple 'close' messages to a D-Bus channel, which would have a chance to be delivered because the channel doesn't close immediately (but rather waits for outstanding requests to complete). This is a pretty theoretical bug but it's going to get worse when we modify how the router shutdown works in the next commit. 
--- src/cockpit/channels/dbus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cockpit/channels/dbus.py b/src/cockpit/channels/dbus.py index f55ba8572c24..1be2ecb585be 100644 --- a/src/cockpit/channels/dbus.py +++ b/src/cockpit/channels/dbus.py @@ -511,5 +511,5 @@ def do_data(self, data): def do_close(self): for slot in self.matches: slot.cancel() - self.matches = None # error out + self.matches = [] self.close() From e7eb12dfdcf8c266011b23aa8db4dda72938b97d Mon Sep 17 00:00:00 2001 From: Allison Karlitskaya Date: Thu, 2 Nov 2023 13:29:16 +0100 Subject: [PATCH 3/3] router: shutdown via endpoints, not channels Add a .do_close() virtual method at the Endpoint level. This is already implemented by Channel, but let's also add an implementation of it to Peer which shuts down the Peer in the usual way. When the bridge gets EOF we can now call this method on all endpoints instead of creating synthetic close messages for each channel. We also stop tracking open channels for knowing when the shutdown is complete, and track endpoints instead (which is a strictly tighter constraint, as it includes endpoints with no open channels). There's the small matter of the increasingly redundant shutdown() method on routing rules, but we can clean that up soon. 
--- src/cockpit/peer.py | 3 +++ src/cockpit/router.py | 25 +++++++++++++++++-------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/cockpit/peer.py b/src/cockpit/peer.py index a20838c9ee86..2e3df6784775 100644 --- a/src/cockpit/peer.py +++ b/src/cockpit/peer.py @@ -229,6 +229,9 @@ def do_kill(self, host: Optional[str], group: Optional[str]) -> None: assert self.init_future is None self.write_control(command='kill', host=host, group=group) + def do_close(self) -> None: + self.close() + class ConfiguredPeer(Peer): config: BridgeConfig diff --git a/src/cockpit/router.py b/src/cockpit/router.py index 794867dd6703..3ac0a633ac04 100644 --- a/src/cockpit/router.py +++ b/src/cockpit/router.py @@ -76,6 +76,9 @@ def thaw_endpoint(self): self.__endpoint_frozen_queue = None # interface for receiving messages + def do_close(self): + raise NotImplementedError + def do_channel_control(self, channel: str, command: str, message: JsonObject) -> None: raise NotImplementedError @@ -160,10 +163,6 @@ def drop_channel(self, channel: str) -> None: except KeyError: logger.error('trying to drop non-existent channel %s from %s', channel, self.open_channels) - # were we waiting to exit? - if not self.open_channels and self._eof and self.transport: - self.transport.close() - def shutdown_endpoint(self, endpoint: Endpoint, **kwargs) -> None: channels = self.endpoints.pop(endpoint) logger.debug('shutdown_endpoint(%s, %s) will close %s', endpoint, kwargs, channels) @@ -171,6 +170,13 @@ def shutdown_endpoint(self, endpoint: Endpoint, **kwargs) -> None: self.write_control(command='close', channel=channel, **kwargs) self.drop_channel(channel) + # were we waiting to exit? 
+ if self._eof: + logger.debug(' %d endpoints remaining', len(self.endpoints)) + if not self.endpoints and self.transport: + logger.debug(' close transport') + self.transport.close() + def do_kill(self, host: Optional[str], group: Optional[str]) -> None: endpoints = set(self.endpoints) logger.debug('do_kill(%s, %s). Considering %d endpoints.', host, group, len(endpoints)) @@ -213,12 +219,15 @@ def channel_data_received(self, channel: str, data: bytes) -> None: endpoint.do_channel_data(channel, data) def eof_received(self) -> bool: - self._eof = True + logger.debug('eof_received(%r)', self) - for channel, endpoint in list(self.open_channels.items()): - endpoint.do_channel_control(channel, 'close', {'command': 'close', 'channel': channel}) + endpoints = set(self.endpoints) + for endpoint in endpoints: + endpoint.do_close() - return bool(self.open_channels) + self._eof = True + logger.debug(' endpoints remaining: %r', self.endpoints) + return bool(self.endpoints) def do_closed(self, exc: Optional[Exception]) -> None: for rule in self.routing_rules: