@@ -244,57 +244,64 @@ async def download(self, obj: object = None):
244
244
_f ('fatal' , 'no object store initialized' )
245
245
246
246
async def listen (self , cb = print , job_n : int = None , generic : bool = False , v = False ):
247
- try : self .sub
248
- except : return _f ('fatal' , 'no subscriber initialized' )
249
- if job_n :
250
- _f ("info" ,
251
- f'consuming { job_n } from [{ self .magnet .config .category } ] on\n 🛰️ stream: { self .magnet .config .stream_name } \n 🧲 session: "{ self .magnet .session } "' )
252
- try :
253
- msgs = await self .sub .fetch (batch = job_n , timeout = 60 )
254
- payloads = [msg .data if generic else Payload (
255
- ** json .loads (msg .data )) for msg in msgs ]
247
+ try :
248
+ self .sub
249
+ except AttributeError :
250
+ return _f ('fatal' , 'no subscriber initialized' )
251
+
252
+ async def deliver_messages (msgs ):
253
+ payloads = [msg .data if generic else Payload (** json .loads (msg .data )) for msg in msgs ]
254
+ for payload , msg in zip (payloads , msgs ):
256
255
try :
257
- for payload , msg in zip (payloads , msgs ):
258
- await cb (payload , msg )
256
+ await cb (payload , msg )
259
257
except ValueError as e :
260
258
_f ('success' , f"job of { job_n } fulfilled\n { e } " )
261
259
except Exception as e :
262
260
_f ('fatal' , e )
261
+
262
+ if job_n :
263
+ try :
264
+ if type (self .sub ).__name__ == "ObjectWatcher" :
265
+ _f ("info" , f'consuming objects from [{ self .magnet .config .host .split ("@" )[1 ]} ] from\n 🛰️ bucket: { self .magnet .config .os_name } "' )
266
+ msgs = await self .object_store .list ()
267
+ for msg in msgs :
268
+ await self .download (msg )
269
+ await cb (self .object_store , msg )
270
+ else :
271
+ _f ("info" , f'consuming { job_n } from [{ self .magnet .config .category } ] on\n 🛰️ stream: { self .magnet .config .stream_name } \n 🧲 session: "{ self .magnet .session } "' )
272
+ msgs = await self .sub .fetch (batch = job_n , timeout = 60 )
273
+ await deliver_messages (msgs )
263
274
except ValueError as e :
264
- _f ('warn' ,
265
- f'{ self .magnet .config .session } reached the end of { self .magnet .config .category } , { self .magnet .config .name } ' )
275
+ _f ('warn' , f'{ self .magnet .config .session } reached the end of { self .magnet .config .category } , { self .magnet .config .name } ' )
266
276
except Exception as e :
267
277
_f ('warn' , "no more data" )
268
278
else :
269
279
if type (self .sub ).__name__ == "ObjectWatcher" :
280
+ _f ("info" , f'consuming objects from [{ self .magnet .config .host .split ("@" )[1 ]} ] from\n 🛰️ bucket: { self .magnet .config .stream_name } "' )
270
281
e = await self .sub .updates ()
271
282
loop = asyncio .get_event_loop ()
272
283
loop .create_task (cb (self .object_store , e ))
273
- _f ("info" ,
274
- f'consuming objects from [{ self .magnet .config .os_name } ] on\n 🛰️ stream: { self .magnet .config .stream_name } \n 🧲 session: "{ self .magnet .config .session } "' )
275
284
await asyncio .sleep (1 )
276
-
277
285
else :
278
286
while True :
279
287
try :
280
288
msgs = await self .sub .fetch (batch = 1 , timeout = 60 )
281
- _f ('info' , f"{ msgs } " ) if v else None
282
- payload = msgs [0 ].data if generic else Payload (
283
- ** json .loads (msgs [0 ].data ))
284
- _f ('info' , f"{ payload } " ) if v else None
285
- try :
286
- await cb (payload , msgs [0 ])
287
- except Exception as e :
288
- _f ("warn" , f'retrying connection to { self .magnet .config .host } \n { e } ' )
289
- _f ("info" , "this can also be a problem with your callback" )
289
+ if v :
290
+ _f ('info' , f"{ msgs } " )
291
+ payload = msgs [0 ].data if generic else Payload (** json .loads (msgs [0 ].data ))
292
+ if v :
293
+ _f ('info' , f"{ payload } " )
294
+ await cb (payload , msgs [0 ])
290
295
except Exception as e :
291
296
if "nats: timeout" in str (e ):
292
297
_f ('warn' , 'encountered a timeout, retrying in 1s' )
293
298
else :
294
299
_f ('fatal' , str (e ))
300
+ _f ("warn" , f'retrying connection to { self .magnet .config .host } \n { e } ' )
301
+ _f ("info" , "this can also be a problem with your callback" )
295
302
await asyncio .sleep (1 )
296
- _f ("info" ,
297
- f'consuming delta from [ { self . magnet . config . category } ] on \n 🛰️ stream: { self . magnet . config . stream_name } \n 🧲 session: " { self . magnet . config . session } "' )
303
+ _f ("info" , f'consuming delta from [ { self . magnet . config . category } ] on \n 🛰️ stream: { self . magnet . config . stream_name } \n 🧲 session: " { self . magnet . config . session } "' )
304
+
298
305
299
306
async def worker (self , cb = print ):
300
307
"""
0 commit comments