Invalid statistics #665

Open
alex60217101990 opened this issue Sep 21, 2024 · 2 comments
Labels
bug Something isn't working hacktoberfest

Comments

@alex60217101990

Describe the bug
We ran load tests against the OPAL server and found that, during scaling, ghost clients persist in the server's statistics. Could you add a mechanism that automatically purges invalid connections from the statistics based on a TTL?

To Reproduce
To reproduce, deploy the OPAL server in Kubernetes with several replicas (we use Kafka for synchronization), then scale the ReplicaSet of OPAL clients to some value (100 or 200; the exact number doesn't matter). Statistics are enabled on both the OPAL server and the OPAL clients. As soon as some pods become ready, reduce the replica count and then increase it again. While doing so, keep querying the statistics endpoint through the OPAL server's service.
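To make the last step repeatable, here is a minimal polling sketch. It assumes the statistics response is a JSON object with a top-level "clients" mapping keyed by client id; that shape, the URL in the usage note, and all function names are assumptions for illustration, not something confirmed in this thread.

```python
import json
import time
import urllib.request
from typing import Callable, List


def fetch_stats(url: str, timeout: float = 5.0) -> dict:
    """Fetch and decode the JSON statistics document from the given URL."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def count_clients(stats: dict) -> int:
    """Count entries under the (assumed) top-level "clients" mapping."""
    return len(stats.get("clients", {}))


def poll_client_counts(
    fetch: Callable[[], dict],
    rounds: int,
    interval: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[int]:
    """Call fetch() `rounds` times and record the reported client count each time."""
    counts = []
    for i in range(rounds):
        counts.append(count_clients(fetch()))
        if i < rounds - 1:
            sleep(interval)  # pause between samples, but not after the last one
    return counts
```

Usage would look something like `poll_client_counts(lambda: fetch_stats("http://<opal-server-service>/statistics"), rounds=60)`, where the host and path are placeholders to adjust for your deployment. If the counts stay above the number of running client pods after scaling down, that is the ghost-client behavior described above.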

OPAL version

  • Version: [e.g. 0.7.12]
@alex60217101990 alex60217101990 added the bug Something isn't working label Sep 21, 2024
@orweis
Contributor

orweis commented Sep 21, 2024

Hi @alex60217101990 thanks for reaching out and submitting this issue ☺️.

We'll open a ticket for this.
Could you share your OPAL server logs? If clients aren't being removed, I'd expect to see errors there, which would be useful for debugging this.

Re: TTL-based removal: since the server-client relationship is realtime and relies on constant connectivity, there is no time tracking at the moment. Ideally, a fix here would keep the realtime status accurate rather than expire entries after the fact. That being said, time tracking might be useful regardless.
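For reference, the kind of time tracking discussed here could be sketched roughly as below. This is not OPAL code: `TtlClientRegistry` and its methods are hypothetical names, and it assumes something (a keepalive, a ping, or any message from the client) calls `touch()` periodically.

```python
import time
from typing import Callable, Dict, List


class TtlClientRegistry:
    """Hypothetical registry that forgets clients not seen within a TTL."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so the behavior can be tested without sleeping
        self._last_seen: Dict[str, float] = {}

    def touch(self, client_id: str) -> None:
        """Record that the client was seen just now (connect, ping, message)."""
        self._last_seen[client_id] = self._clock()

    def remove(self, client_id: str) -> None:
        """Realtime path: drop the client as soon as a disconnect is observed."""
        self._last_seen.pop(client_id, None)

    def purge_expired(self) -> List[str]:
        """Fallback path: drop clients whose last sighting is older than the TTL."""
        now = self._clock()
        expired = [cid for cid, seen in self._last_seen.items() if now - seen > self._ttl]
        for cid in expired:
            del self._last_seen[cid]
        return expired

    def active_clients(self) -> List[str]:
        """Return the ids currently tracked, sorted for stable output."""
        return sorted(self._last_seen)
```

The idea is that `remove()` stays the primary, realtime mechanism, and `purge_expired()` only catches entries whose disconnect was never observed, for example when a pod is killed during a scale-down.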

@kreyyser

Hi @orweis, I'll add more details.
The server doesn't log any error as long as you don't send updates (policy/data) to the OPAL server, either via a REST API call or by publishing a message directly to the broker.
But if you do, the server tries to notify the ghost client and this error appears:

2024-09-24T14:13:52.702117+0000 | 12 | fastapi_websocket_pubsub.event_notifier |ERROR  | Failed to notify subscriber sub_id=0cd312c6f95f4844852d183efca7af77 with topic=ad-users-test
Traceback (most recent call last):
  File "/usr/local/bin/gunicorn", line 33, in <module>
    sys.exit(load_entry_point('gunicorn==22.0.0', 'console_scripts', 'gunicorn')())
    │   │    └ <function importlib_load_entry_point at 0x7f494e852ef0>
    │   └ <built-in function exit>
    └ <module 'sys' (built-in)>
  File "/usr/local/lib/python3.10/site-packages/gunicorn/app/wsgiapp.py", line 67, in run
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]", prog=prog).run()
    │                                                       └ None
    └ <class 'gunicorn.app.wsgiapp.WSGIApplication'>
  File "/usr/local/lib/python3.10/site-packages/gunicorn/app/base.py", line 236, in run
    super().run()
  File "/usr/local/lib/python3.10/site-packages/gunicorn/app/base.py", line 72, in run
    Arbiter(self).run()
    │       └ <gunicorn.app.wsgiapp.WSGIApplication object at 0x7f494e8bb610>
    └ <class 'gunicorn.arbiter.Arbiter'>
  File "/usr/local/lib/python3.10/site-packages/gunicorn/arbiter.py", line 202, in run
    self.manage_workers()
    │    └ <function Arbiter.manage_workers at 0x7f494e1fd870>
    └ <gunicorn.arbiter.Arbiter object at 0x7f494e6e4250>
  File "/usr/local/lib/python3.10/site-packages/gunicorn/arbiter.py", line 571, in manage_workers
    self.spawn_workers()
    │    └ <function Arbiter.spawn_workers at 0x7f494e1fd990>
    └ <gunicorn.arbiter.Arbiter object at 0x7f494e6e4250>
  File "/usr/local/lib/python3.10/site-packages/gunicorn/arbiter.py", line 642, in spawn_workers
    self.spawn_worker()
    │    └ <function Arbiter.spawn_worker at 0x7f494e1fd900>
    └ <gunicorn.arbiter.Arbiter object at 0x7f494e6e4250>
  File "/usr/local/lib/python3.10/site-packages/gunicorn/arbiter.py", line 609, in spawn_worker
    worker.init_process()
    │      └ <function UvicornWorker.init_process at 0x7f494a7e0af0>
    └ <uvicorn.workers.UvicornWorker object at 0x7f4948b27dc0>
  File "/usr/local/lib/python3.10/site-packages/uvicorn/workers.py", line 75, in init_process
    super().init_process()
  File "/usr/local/lib/python3.10/site-packages/gunicorn/workers/base.py", line 142, in init_process
    self.run()
    │    └ <function UvicornWorker.run at 0x7f494a7e0d30>
    └ <uvicorn.workers.UvicornWorker object at 0x7f4948b27dc0>
  File "/usr/local/lib/python3.10/site-packages/uvicorn/workers.py", line 107, in run
    return asyncio.run(self._serve())
           │       │   │    └ <function UvicornWorker._serve at 0x7f494a7e0ca0>
           │       │   └ <uvicorn.workers.UvicornWorker object at 0x7f4948b27dc0>
           │       └ <function run at 0x7f494db40310>
           └ <module 'asyncio' from '/usr/local/lib/python3.10/asyncio/__init__.py'>
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
           │    │                  └ <coroutine object UvicornWorker._serve at 0x7f494856ff40>
           │    └ <method 'run_until_complete' of 'uvloop.loop.Loop' objects>
           └ <uvloop.Loop running=True closed=False debug=False>
> File "/usr/local/lib/python3.10/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 220, in callback_subscribers
    await self.trigger_callback(data, topic, subscriber_id, event)
          │    │                │     │      │              └ Subscription(id='0103aca6d784423f9509bcfadc2d8bb5', subscriber_id='0cd312c6f95f4844852d183efca7af77', topic='ad-users-test', ...
          │    │                │     │      └ '0cd312c6f95f4844852d183efca7af77'
          │    │                │     └ 'ad-users-test'
          │    │                └ {'entries': [{'url': '', 'config': {}, 'topics': ['ad-users-test'], 'dst_path': '/ad-users-test/ad-users/user234', 'save_m...
          │    └ <function EventNotifier.trigger_callback at 0x7f494a363490>
          └ <fastapi_websocket_pubsub.websocket_rpc_event_notifier.WebSocketRpcEventNotifier object at 0x7f49485f05e0>
  File "/usr/local/lib/python3.10/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 178, in trigger_callback
    await subscription.callback(subscription, data)
          │            │        │             └ {'entries': [{'url': '', 'config': {}, 'topics': ['ad-users-test'], 'dst_path': '/ad-users-test/ad-users/user234', 'save_m...
          │            │        └ Subscription(id='0103aca6d784423f9509bcfadc2d8bb5', subscriber_id='0cd312c6f95f4844852d183efca7af77', topic='ad-users-test', ...
          │            └ <function RpcEventServerMethods.subscribe.<locals>.callback at 0x7f4940457e20>
          └ Subscription(id='0103aca6d784423f9509bcfadc2d8bb5', subscriber_id='0cd312c6f95f4844852d183efca7af77', topic='ad-users-test', ...
  File "/usr/local/lib/python3.10/site-packages/fastapi_websocket_pubsub/rpc_event_methods.py", line 26, in callback
    await self.channel.other.notify(subscription=sub, data=data)
          │    │                                 │         └ {'entries': [{'url': '', 'config': {}, 'topics': ['ad-users-test'], 'dst_path': '/ad-users-test/ad-users/user234', 'save_m...
          │    │                                 └ Subscription(id='0103aca6d784423f9509bcfadc2d8bb5', subscriber_id='0cd312c6f95f4844852d183efca7af77', topic='ad-users-test', ...
          │    └ <property object at 0x7f494a456160>
          └ <fastapi_websocket_pubsub.rpc_event_methods.RpcEventServerMethods object at 0x7f494043a8f0>
  File "/usr/local/lib/python3.10/site-packages/fastapi_websocket_rpc/rpc_channel.py", line 441, in call
    promise = await self.async_call(name, args)
                    │    │          │     └ {'subscription': Subscription(id='0103aca6d784423f9509bcfadc2d8bb5', subscriber_id='0cd312c6f95f4844852d183efca7af77', topic=...
                    │    │          └ 'notify'
                    │    └ <function RpcChannel.async_call at 0x7f494a30e200>
                    └ <fastapi_websocket_rpc.rpc_channel.RpcChannel object at 0x7f4940598520>
  File "/usr/local/lib/python3.10/site-packages/fastapi_websocket_rpc/rpc_channel.py", line 433, in async_call
    await self.send(msg)
          │    │    └ RpcMessage(request=RpcRequest(method='notify', arguments={'subscription': Subscription(id='0103aca6d784423f9509bcfadc2d8bb5',...
          │    └ <function RpcChannel.send at 0x7f494a30d6c0>
          └ <fastapi_websocket_rpc.rpc_channel.RpcChannel object at 0x7f4940598520>
  File "/usr/local/lib/python3.10/site-packages/fastapi_websocket_rpc/rpc_channel.py", line 211, in send
    await self.socket.send(data)
          │    │      │    └ RpcMessage(request=RpcRequest(method='notify', arguments={'subscription': Subscription(id='0103aca6d784423f9509bcfadc2d8bb5',...
          │    │      └ <function JsonSerializingWebSocket.send at 0x7f494a30e950>
          │    └ <fastapi_websocket_rpc.simplewebsocket.JsonSerializingWebSocket object at 0x7f4940657cd0>
          └ <fastapi_websocket_rpc.rpc_channel.RpcChannel object at 0x7f4940598520>
  File "/usr/local/lib/python3.10/site-packages/fastapi_websocket_rpc/simplewebsocket.py", line 44, in send
    await self._websocket.send(self._serialize(msg))
          │    │          │    │    │          └ RpcMessage(request=RpcRequest(method='notify', arguments={'subscription': Subscription(id='0103aca6d784423f9509bcfadc2d8bb5',...
          │    │          │    │    └ <function JsonSerializingWebSocket._serialize at 0x7f494a30e830>
          │    │          │    └ <fastapi_websocket_rpc.simplewebsocket.JsonSerializingWebSocket object at 0x7f4940657cd0>
          │    │          └ <property object at 0x7f494a35d850>
          │    └ <fastapi_websocket_rpc.websocket_rpc_endpoint.WebSocketSimplifier object at 0x7f494061f640>
          └ <fastapi_websocket_rpc.simplewebsocket.JsonSerializingWebSocket object at 0x7f4940657cd0>
  File "/usr/local/lib/python3.10/site-packages/starlette/websockets.py", line 165, in send_text
    await self.send({"type": "websocket.send", "text": data})
          │    │                                       └ '{"request": {"method": "notify", "arguments": {"subscription": {"id": "0103aca6d784423f9509bcfadc2d8bb5", "subscriber_id": "...
          │    └ <function WebSocket.send at 0x7f494a3e7760>
          └ <starlette.websockets.WebSocket object at 0x7f4940516a40>
  File "/usr/local/lib/python3.10/site-packages/starlette/websockets.py", line 97, in send
    raise RuntimeError('Cannot call "send" once a close message has been sent.')
RuntimeError: Cannot call "send" once a close message has been sent.

Additional info: 2 replicas of the OPAL server, with Kafka as the broker. The version is the latest available at the moment.
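The traceback suggests one possible place to hook cleanup: a failed send on an already-closed socket could be treated as a signal to drop that subscriber. The sketch below only illustrates the idea with plain asyncio; `notify_and_prune` and the `subscribers` mapping are hypothetical and are not the fastapi_websocket_pubsub API.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

# Hypothetical stand-in for "send this payload to one subscriber".
SendFn = Callable[[dict], Awaitable[None]]


async def notify_and_prune(subscribers: Dict[str, SendFn], data: dict) -> List[str]:
    """Notify every subscriber and remove those whose send raises RuntimeError."""
    dead: List[str] = []
    # Iterate over a snapshot so the mapping can be modified afterwards.
    for sub_id, send in list(subscribers.items()):
        try:
            await send(data)
        except RuntimeError:
            # Same error type Starlette raises in the log above for a closed socket.
            dead.append(sub_id)
    for sub_id in dead:
        subscribers.pop(sub_id, None)
    return dead
```

Whatever removes the subscriber would also need to update the statistics, otherwise the ghost entries would remain even though notifications stop failing.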

@gemanor gemanor assigned gemanor and unassigned gemanor Oct 9, 2024