-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontext.py
125 lines (96 loc) · 4.66 KB
/
context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import json
from typing import TYPE_CHECKING, Any, AsyncIterable, Literal
from aio_pika import IncomingMessage, Message, Exchange
from pydantic import BaseModel
if TYPE_CHECKING:
from plugins.microservices.handler import Channel
class CommonContext:
def __init__(self, body: Any | BaseModel, body_encoded: bytes,
delivery_frame: Literal["Basic.Ack", "Basic.Nack", "Basic.Reject"],
routing_key: str, exchanger: Exchange, channel: "Channel", **publication_params) -> None:
self.body = body
self.body_encoded = body_encoded
self.delivery_frame = delivery_frame
self.routing_key = routing_key
self.exchanger = exchanger
self.exchange = self.exchanger.name
self.channel = channel
async def retry(self, mandatory: bool = False, immediate: bool = False, timeout: float | int | None = None):
if isinstance(body, BaseModel):
body = body.model_dump_json()
if not isinstance(body, (str, bytes)):
body = json.dumps(
body,
ensure_ascii=False,
allow_nan=False,
indent=None,
separators=(",", ":"),
).encode("utf-8")
if not isinstance(body, bytes):
body = body.encode("utf-8")
return await self.exchanger.publish(Message(self.body_encoded), self.routing_key, mandatory=mandatory, immediate=immediate, timeout=timeout)
class MessageContext:
def __init__(self, message: IncomingMessage, exchanger: Exchange, channel: "Channel") -> None:
self.message = message
self.exchanger = exchanger
self.__channel = channel
@property
def ack(self):
return self.message.ack
@property
def nack(self):
return self.message.nack
@property
def reject(self):
return self.message.reject
async def raw_publish(self, body: Any | BaseModel, *, exchange: str = "", routing_key: str = "", properties: Any | None = None, mandatory: bool = False, immediate: bool = False, timeout: float | int | None = None):
if isinstance(body, BaseModel):
body = body.model_dump_json()
if not isinstance(body, (str, bytes)):
body = json.dumps(
body,
ensure_ascii=False,
allow_nan=False,
indent=None,
separators=(",", ":"),
).encode("utf-8")
if not isinstance(body, bytes):
body = body.encode("utf-8")
delivery = await self.message.channel.basic_publish(body, exchange=exchange, routing_key=routing_key, properties=properties, mandatory=mandatory, immediate=immediate, timeout=timeout)
match delivery.name:
case "Basic.Ack":
await self.__channel.success(self)
case "Basic.Nack":
await self.__channel.failure(self, "Basic.Nack")
case "Basic.Reject":
await self.__channel.failure(self, "Basic.Reject")
return delivery
async def respond_direct(self, body: Any | BaseModel, routing_key: str = "", immediate: bool = False, timeout: float | int | None = None):
if isinstance(body, BaseModel):
body = body.model_dump_json()
if not isinstance(body, (str, bytes)):
body = json.dumps(
body,
ensure_ascii=False,
allow_nan=False,
indent=None,
separators=(",", ":"),
).encode("utf-8")
if not isinstance(body, bytes):
body = body.encode("utf-8")
delivery = await self.message.channel.basic_publish(body, exchange=self.message.exchange, routing_key=routing_key, immediate=immediate, timeout=timeout)
match delivery.name:
case "Basic.Ack":
await self.__channel.success(self)
case "Basic.Nack":
await self.__channel.failure(self, "Basic.Nack")
case "Basic.Reject":
await self.__channel.failure(self, "Basic.Reject")
async def astream_publication_back(self, messages: AsyncIterable[bytes], routing_key: str = "", is_presistant: bool = True):
exchange = self.exchanger
if not is_presistant:
async for message in messages:
await self.raw_publish(message, exchange=exchange.name, routing_key=routing_key, immediate=True)
else:
async for message in messages:
await self.exchanger.publish(Message(message, delivery_mode=2), routing_key=routing_key, immediate=True)