diff --git a/docs/api-docs/slack_sdk/socket_mode/aiohttp/index.html b/docs/api-docs/slack_sdk/socket_mode/aiohttp/index.html index c8a87697..bc814fc8 100644 --- a/docs/api-docs/slack_sdk/socket_mode/aiohttp/index.html +++ b/docs/api-docs/slack_sdk/socket_mode/aiohttp/index.html @@ -376,48 +376,68 @@

Module slack_sdk.socket_mode.aiohttp

return self.build_session_id(self.current_session) async def connect(self): - old_session: Optional[ClientWebSocketResponse] = None if self.current_session is None else self.current_session - if self.wss_uri is None: - # If the underlying WSS URL does not exist, - # acquiring a new active WSS URL from the server-side first - self.wss_uri = await self.issue_new_wss_url() - - self.current_session = await self.aiohttp_client_session.ws_connect( - self.wss_uri, - autoping=False, - heartbeat=self.ping_interval, - proxy=self.proxy, - ssl=self.web_client.ssl, - ) - session_id: str = await self.session_id() - self.auto_reconnect_enabled = self.default_auto_reconnect_enabled - self.stale = False - self.logger.info(f"A new session ({session_id}) has been established") - - # The first ping from the new connection - if self.logger.level <= logging.DEBUG: - self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...") - t = time.time() - await self.current_session.ping(f"sdk-ping-pong:{t}") - - if self.current_session_monitor is not None: - self.current_session_monitor.cancel() + # This loop is used to ensure when a new session is created, + # a new monitor and a new message receiver are also created. + # If a new session is created but we failed to create the new + # monitor or the new message, we should try it. + while True: + try: + old_session: Optional[ClientWebSocketResponse] = ( + None if self.current_session is None else self.current_session + ) - self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session()) - if self.logger.level <= logging.DEBUG: - self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}") + # If the old session is broken (e.g. reset by peer), it might fail to close it. + # We don't want to retry when this kind of cases happen. + try: + # We should close old session before create a new one. Because when disconnect + # reason is `too_many_websockets`, we need to close the old one first to + # to decrease the number of connections. + self.auto_reconnect_enabled = False + if old_session is not None: + await old_session.close() + old_session_id = self.build_session_id(old_session) + self.logger.info(f"The old session ({old_session_id}) has been abandoned") + except Exception as e: + self.logger.exception(f"Failed to close the old session : {e}") + + if self.wss_uri is None: + # If the underlying WSS URL does not exist, + # acquiring a new active WSS URL from the server-side first + self.wss_uri = await self.issue_new_wss_url() + + self.current_session = await self.aiohttp_client_session.ws_connect( + self.wss_uri, + autoping=False, + heartbeat=self.ping_interval, + proxy=self.proxy, + ssl=self.web_client.ssl, + ) + session_id: str = await self.session_id() + self.auto_reconnect_enabled = self.default_auto_reconnect_enabled + self.stale = False + self.logger.info(f"A new session ({session_id}) has been established") - if self.message_receiver is not None: - self.message_receiver.cancel() + # The first ping from the new connection + if self.logger.level <= logging.DEBUG: + self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...") + t = time.time() + await self.current_session.ping(f"sdk-ping-pong:{t}") - self.message_receiver = asyncio.ensure_future(self.receive_messages()) - if self.logger.level <= logging.DEBUG: - self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}") + if self.current_session_monitor is not None: + self.current_session_monitor.cancel() + self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session()) + if self.logger.level <= logging.DEBUG: + self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}") - if old_session is not None: - await old_session.close() - old_session_id = self.build_session_id(old_session) - self.logger.info(f"The old session ({old_session_id}) has been abandoned") + if self.message_receiver is not None: + self.message_receiver.cancel() + self.message_receiver = asyncio.ensure_future(self.receive_messages()) + if self.logger.level <= logging.DEBUG: + self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}") + break + except Exception as e: + self.logger.exception(f"Failed to connect (error: {e}); Retrying...") + await asyncio.sleep(self.ping_interval) async def disconnect(self): if self.current_session is not None: @@ -832,48 +852,68 @@

Args

return self.build_session_id(self.current_session) async def connect(self): - old_session: Optional[ClientWebSocketResponse] = None if self.current_session is None else self.current_session - if self.wss_uri is None: - # If the underlying WSS URL does not exist, - # acquiring a new active WSS URL from the server-side first - self.wss_uri = await self.issue_new_wss_url() - - self.current_session = await self.aiohttp_client_session.ws_connect( - self.wss_uri, - autoping=False, - heartbeat=self.ping_interval, - proxy=self.proxy, - ssl=self.web_client.ssl, - ) - session_id: str = await self.session_id() - self.auto_reconnect_enabled = self.default_auto_reconnect_enabled - self.stale = False - self.logger.info(f"A new session ({session_id}) has been established") - - # The first ping from the new connection - if self.logger.level <= logging.DEBUG: - self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...") - t = time.time() - await self.current_session.ping(f"sdk-ping-pong:{t}") - - if self.current_session_monitor is not None: - self.current_session_monitor.cancel() + # This loop is used to ensure when a new session is created, + # a new monitor and a new message receiver are also created. + # If a new session is created but we failed to create the new + # monitor or the new message, we should try it. + while True: + try: + old_session: Optional[ClientWebSocketResponse] = ( + None if self.current_session is None else self.current_session + ) - self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session()) - if self.logger.level <= logging.DEBUG: - self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}") + # If the old session is broken (e.g. reset by peer), it might fail to close it. + # We don't want to retry when this kind of cases happen. + try: + # We should close old session before create a new one. Because when disconnect + # reason is `too_many_websockets`, we need to close the old one first to + # to decrease the number of connections. + self.auto_reconnect_enabled = False + if old_session is not None: + await old_session.close() + old_session_id = self.build_session_id(old_session) + self.logger.info(f"The old session ({old_session_id}) has been abandoned") + except Exception as e: + self.logger.exception(f"Failed to close the old session : {e}") + + if self.wss_uri is None: + # If the underlying WSS URL does not exist, + # acquiring a new active WSS URL from the server-side first + self.wss_uri = await self.issue_new_wss_url() + + self.current_session = await self.aiohttp_client_session.ws_connect( + self.wss_uri, + autoping=False, + heartbeat=self.ping_interval, + proxy=self.proxy, + ssl=self.web_client.ssl, + ) + session_id: str = await self.session_id() + self.auto_reconnect_enabled = self.default_auto_reconnect_enabled + self.stale = False + self.logger.info(f"A new session ({session_id}) has been established") - if self.message_receiver is not None: - self.message_receiver.cancel() + # The first ping from the new connection + if self.logger.level <= logging.DEBUG: + self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...") + t = time.time() + await self.current_session.ping(f"sdk-ping-pong:{t}") - self.message_receiver = asyncio.ensure_future(self.receive_messages()) - if self.logger.level <= logging.DEBUG: - self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}") + if self.current_session_monitor is not None: + self.current_session_monitor.cancel() + self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session()) + if self.logger.level <= logging.DEBUG: + self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}") - if old_session is not None: - await old_session.close() - old_session_id = self.build_session_id(old_session) - self.logger.info(f"The old session ({old_session_id}) has been abandoned") + if self.message_receiver is not None: + self.message_receiver.cancel() + self.message_receiver = asyncio.ensure_future(self.receive_messages()) + if self.logger.level <= logging.DEBUG: + self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}") + break + except Exception as e: + self.logger.exception(f"Failed to connect (error: {e}); Retrying...") + await asyncio.sleep(self.ping_interval) async def disconnect(self): if self.current_session is not None: @@ -1082,48 +1122,68 @@

Methods

Expand source code
async def connect(self):
-    old_session: Optional[ClientWebSocketResponse] = None if self.current_session is None else self.current_session
-    if self.wss_uri is None:
-        # If the underlying WSS URL does not exist,
-        # acquiring a new active WSS URL from the server-side first
-        self.wss_uri = await self.issue_new_wss_url()
-
-    self.current_session = await self.aiohttp_client_session.ws_connect(
-        self.wss_uri,
-        autoping=False,
-        heartbeat=self.ping_interval,
-        proxy=self.proxy,
-        ssl=self.web_client.ssl,
-    )
-    session_id: str = await self.session_id()
-    self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
-    self.stale = False
-    self.logger.info(f"A new session ({session_id}) has been established")
-
-    # The first ping from the new connection
-    if self.logger.level <= logging.DEBUG:
-        self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...")
-    t = time.time()
-    await self.current_session.ping(f"sdk-ping-pong:{t}")
-
-    if self.current_session_monitor is not None:
-        self.current_session_monitor.cancel()
+    # This loop is used to ensure when a new session is created,
+    # a new monitor and a new message receiver are also created.
+    # If a new session is created but we failed to create the new
+    # monitor or the new message, we should try it.
+    while True:
+        try:
+            old_session: Optional[ClientWebSocketResponse] = (
+                None if self.current_session is None else self.current_session
+            )
 
-    self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session())
-    if self.logger.level <= logging.DEBUG:
-        self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}")
+            # If the old session is broken (e.g. reset by peer), it might fail to close it.
+            # We don't want to retry when this kind of cases happen.
+            try:
+                # We should close old session before create a new one. Because when disconnect
+                # reason is `too_many_websockets`, we need to close the old one first to
+                # to decrease the number of connections.
+                self.auto_reconnect_enabled = False
+                if old_session is not None:
+                    await old_session.close()
+                    old_session_id = self.build_session_id(old_session)
+                    self.logger.info(f"The old session ({old_session_id}) has been abandoned")
+            except Exception as e:
+                self.logger.exception(f"Failed to close the old session : {e}")
+
+            if self.wss_uri is None:
+                # If the underlying WSS URL does not exist,
+                # acquiring a new active WSS URL from the server-side first
+                self.wss_uri = await self.issue_new_wss_url()
+
+            self.current_session = await self.aiohttp_client_session.ws_connect(
+                self.wss_uri,
+                autoping=False,
+                heartbeat=self.ping_interval,
+                proxy=self.proxy,
+                ssl=self.web_client.ssl,
+            )
+            session_id: str = await self.session_id()
+            self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
+            self.stale = False
+            self.logger.info(f"A new session ({session_id}) has been established")
 
-    if self.message_receiver is not None:
-        self.message_receiver.cancel()
+            # The first ping from the new connection
+            if self.logger.level <= logging.DEBUG:
+                self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...")
+            t = time.time()
+            await self.current_session.ping(f"sdk-ping-pong:{t}")
 
-    self.message_receiver = asyncio.ensure_future(self.receive_messages())
-    if self.logger.level <= logging.DEBUG:
-        self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}")
+            if self.current_session_monitor is not None:
+                self.current_session_monitor.cancel()
+            self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session())
+            if self.logger.level <= logging.DEBUG:
+                self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}")
 
-    if old_session is not None:
-        await old_session.close()
-        old_session_id = self.build_session_id(old_session)
-        self.logger.info(f"The old session ({old_session_id}) has been abandoned")
+ if self.message_receiver is not None: + self.message_receiver.cancel() + self.message_receiver = asyncio.ensure_future(self.receive_messages()) + if self.logger.level <= logging.DEBUG: + self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}") + break + except Exception as e: + self.logger.exception(f"Failed to connect (error: {e}); Retrying...") + await asyncio.sleep(self.ping_interval)
diff --git a/docs/api-docs/slack_sdk/version.html b/docs/api-docs/slack_sdk/version.html index b7381720..146286f6 100644 --- a/docs/api-docs/slack_sdk/version.html +++ b/docs/api-docs/slack_sdk/version.html @@ -28,7 +28,7 @@

Module slack_sdk.version

Expand source code
"""Check the latest version at https://pypi.org/project/slack-sdk/"""
-__version__ = "3.26.1"
+__version__ = "3.26.2"
diff --git a/slack_sdk/version.py b/slack_sdk/version.py index 114c441a..9cda6501 100644 --- a/slack_sdk/version.py +++ b/slack_sdk/version.py @@ -1,2 +1,2 @@ """Check the latest version at https://pypi.org/project/slack-sdk/""" -__version__ = "3.26.1" +__version__ = "3.26.2"