-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontext.py
99 lines (80 loc) · 3.06 KB
/
context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from typing import Any, Iterable
from pydantic import RootModel
from core.optionals.base.dto import BaseDTO
from core.optionals.base.response import BaseResponse
from plugins.liveapi.engines.base import BaseEngine
class SIOContext:
def __init__(self, engine: BaseEngine, namespace: str,
event_name: str, sid: str) -> None:
self.engine = engine
self.namespace = namespace
self.event_name = event_name
self.session_id = sid
async def reply(self, event_name: str | None,
data: BaseDTO | BaseResponse | RootModel | Any,
to: str | None = None, **additional_arguments):
if to is None:
to = self.session_id
if event_name is None:
event_name = self.event_name
return await self.send_event(event_name,
data, to, self.namespace, **additional_arguments)
async def streaming_response(self, contents: Iterable[BaseDTO | BaseResponse | RootModel | Any],
to: str | None = None, **additional_arguments):
for content in contents:
await self.reply(content, to, **additional_arguments)
async def disconnect_client(self, sid: str, namespace: str | None = None):
if namespace is None:
namespace = self.namespace
await self.engine.disconnect(sid, namespace)
async def reject_client(self):
return await self.disconnect_client(self.session_id, self.namespace)
def session(
self,
sid: int | None = None,
namespace: str | None = None,
):
if sid is None:
sid = self.session_id
if namespace is None:
namespace = self.namespace
return self.engine._client._sio.session(sid, namespace)
@property
def send_event(self):
return self.engine.send_event
@property
def send_message(self):
return self.engine.send_message
@property
def broadcast(self):
return self.engine.broadcast
@property
def send_r2r(self):
return self.engine.send_r2r
async def subscribe(
self,
room_name: str,
sid: str | None = None,
namespace: str | None = None,
exclude_events: list[str] = []):
if sid is None:
sid = self.session_id
if namespace is None:
namespace = self.namespace
return await self.engine.subscribe(
room_name=room_name,
sid=sid, namespace=namespace,
exclude_events=exclude_events
)
async def unsubscribe(self,
room_name: str,
sid: str | None = None,
namespace: str | None = None,):
if sid is None:
sid = self.session_id
if namespace is None:
namespace = self.namespace
return await self.engine.unsubscribe(
room_name=room_name,
sid=sid, namespace=namespace
)