@@ -183,7 +183,7 @@ def __init__(self, server: str):
183
183
"""
184
184
self .server = server
185
185
186
- async def on (self , category : str = 'no_category' , stream : str = 'documents' , session = 'magnet' , job : bool = None , local : bool = False , domain : str = None , bandwidth : int = 3 ):
186
+ async def on (self , category : str = 'no_category' , stream : str = 'documents' , session = 'magnet' , job : bool = None , local : bool = False , domain : str = None , bandwidth : int = 3 , credentials : str = 'path_to_creds_file' , port : int = 4222 ):
187
187
"""
188
188
Connects to the NATS server, subscribes to a specific category in a stream, and consumes messages from that category.
189
189
@@ -211,19 +211,16 @@ async def on(self, category: str = 'no_category', stream: str = 'documents', ses
211
211
)
212
212
_f ('wait' , f'connecting to { self .server } ' )
213
213
try :
214
- self .nc = await nats .connect (f'nats://{ self .server } :4222' )
214
+ self .nc = await nats .connect (f'{ " nats://" if not credentials else "tls://" } { self .server } :{ port } ' , user_credentials = credentials )
215
215
self .js = self .nc .jetstream (
216
216
domain = domain
217
217
)
218
218
try :
219
219
self .sub = await self .js .pull_subscribe (
220
- stream = self .stream , subject = self .category , durable = self .durable , config = self .config
221
- ) if job else await self .js .subscribe (
222
- subject = self .category
220
+ durable = self .durable
221
+ , subject = self .category
223
222
, stream = self .stream
224
- , queue = self .session
225
223
, config = self .config
226
- , manual_ack = True
227
224
)
228
225
_f ('info' ,
229
226
f'joined worker queue: { self .session } as { self .node } ' )
@@ -233,9 +230,9 @@ async def on(self, category: str = 'no_category', stream: str = 'documents', ses
233
230
except TimeoutError :
234
231
_f ("fatal" , f'could not connect to { self .server } ' )
235
232
except Exception as e :
236
- _f ('wait' , 'server queuing you...' )
233
+ _f ('wait' , f 'server queuing you...\n { e } ' )
237
234
238
- async def listen (self , cb = print , job_n : int = None , generic : bool = False ):
235
+ async def listen (self , cb = print , job_n : int = None , generic : bool = False , verbose = False ):
239
236
"""
240
237
Consume messages from a specific category in a stream and process them.
241
238
@@ -275,13 +272,15 @@ async def listen(self, cb=print, job_n: int = None, generic: bool = False):
275
272
f'consuming delta from [{ self .category } ] on\n 🛰️ stream: { self .stream } \n 🧲 session: "{ self .session } "' )
276
273
while True :
277
274
try :
278
- msg = await self .sub .next_msg (timeout = 60 )
279
- payload = msg .data if generic else Payload (
280
- ** json .loads (msg .data ))
275
+ msgs = await self .sub .fetch (batch = 1 , timeout = 60 )
276
+ _f ('info' , f"{ msgs } " ) if verbose else None
277
+ payload = msgs [0 ].data if generic else Payload (
278
+ ** json .loads (msgs [0 ].data ))
279
+ _f ('info' , f"{ payload } " ) if verbose else None
281
280
try :
282
- await cb (payload , msg )
281
+ await cb (payload , msgs [ 0 ] )
283
282
except Exception as e :
284
- _f ("warn" , f'retrying connection to { self .server } ' )
283
+ _f ("warn" , f'retrying connection to { self .server } \n { e } ' )
285
284
except Exception as e :
286
285
_f ('fatal' , f'invalid JSON\n { e } ' )
287
286
break
0 commit comments