@@ -176,7 +176,7 @@ def __init__(self, server: str):
176
176
"""
177
177
self .server = server
178
178
179
- async def on (self , category : str = 'no_category' , stream : str = 'documents' , session = 'magnet' ):
179
+ async def on (self , category : str = 'no_category' , stream : str = 'documents' , session = 'magnet' , job : bool = None ):
180
180
"""
181
181
Connects to the NATS server, subscribes to a specific category in a stream, and consumes messages from that category.
182
182
@@ -213,11 +213,18 @@ async def on(self, category: str = 'no_category', stream: str = 'documents', ses
213
213
except :
214
214
_f ('warn' , f'consumer { self .session } exists, skipping create' )
215
215
try :
216
- self .sub = await self .js .subscribe (
217
- self .category
218
- , queue = self .session
219
- , config = self .config
220
- )
216
+ if job :
217
+ self .sub = await self .js .pull_subscribe (
218
+ self .category
219
+ , durable = self .session
220
+ , config = self .config
221
+ )
222
+ else :
223
+ self .sub = await self .js .subscribe (
224
+ self .category
225
+ , queue = self .session
226
+ , config = self .config
227
+ )
221
228
_f ('info' , 'joined worker queue' )
222
229
except Exception as e :
223
230
return _f ('fatal' , f'{ e } ' )
@@ -246,6 +253,8 @@ async def listen(self, cb=print, job_n: int = None):
246
253
try :
247
254
for payload , msg in payloads , msgs :
248
255
await cb (payload , msg )
256
+ except ValueError as e :
257
+ _f ('success' , f"job of { job_n } fulfilled" )
249
258
except Exception as e :
250
259
_f ('fatal' , e )
251
260
except Exception as e :
0 commit comments