@@ -212,7 +212,7 @@ async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000,
212
212
, max_ack_pending = bandwidth
213
213
, ack_wait = 3600
214
214
)
215
- _f ('wait' , f'connecting to { self .magnet .config .host } ' )
215
+ _f ('wait' , f'connecting to { self .magnet .config .host . split ( "@" )[ 1 ] } ' )
216
216
try :
217
217
if obj :
218
218
self .sub = await self .magnet .os .watch (include_history = False )
@@ -222,7 +222,6 @@ async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000,
222
222
self .sub = await self .magnet .js .pull_subscribe (
223
223
durable = self .magnet .config .session
224
224
, subject = self .magnet .config .category
225
- , stream = self .magnet .config .stream_name
226
225
, config = self .consumer_config
227
226
)
228
227
_f ('info' ,
@@ -271,13 +270,13 @@ async def deliver_messages(msgs):
271
270
await self .download (msg )
272
271
await cb (self .magnet .os , msg )
273
272
else :
274
- _f ("info" , f'consuming { job_n } from [{ self .magnet .config .category } ] on\n 🛰️ stream: { self .magnet .config .stream_name } \n 🧲 session: "{ self .magnet .session } "' )
273
+ _f ("info" , f'consuming { job_n } from [{ self .magnet .config .category } ] on\n 🛰️ stream: { self .magnet .config .stream_name } \n 🧲 session: "{ self .magnet .config . session } "' )
275
274
msgs = await self .sub .fetch (batch = job_n , timeout = 60 )
276
275
await deliver_messages (msgs )
277
276
except ValueError as e :
278
277
_f ('warn' , f'{ self .magnet .config .session } reached the end of { self .magnet .config .category } , { self .magnet .config .name } ' )
279
278
except Exception as e :
280
- _f ('warn' , "no more data" )
279
+ _f ('warn' , f "no more data\n { e } " )
281
280
else :
282
281
if type (self .sub ).__name__ == "ObjectWatcher" :
283
282
_f ("info" , f'consuming objects from [{ self .magnet .config .host .split ("@" )[1 ]} ] from\n 🛰️ bucket: { self .magnet .config .stream_name } "' )
@@ -300,7 +299,7 @@ async def deliver_messages(msgs):
300
299
_f ('warn' , 'encountered a timeout, retrying in 1s' )
301
300
else :
302
301
_f ('fatal' , str (e ))
303
- _f ("warn" , f'retrying connection to { self .magnet .config .host } \n { e } ' )
302
+ _f ("warn" , f'retrying connection to { self .magnet .config .host . split ( "@" )[ 1 ] } \n { e } ' )
304
303
_f ("info" , "this can also be a problem with your callback" )
305
304
await asyncio .sleep (1 )
306
305
_f ("info" , f'consuming delta from [{ self .magnet .config .category } ] on\n 🛰️ stream: { self .magnet .config .stream_name } \n 🧲 session: "{ self .magnet .config .session } "' )
@@ -350,8 +349,8 @@ async def off(self):
350
349
351
350
:return: None
352
351
"""
353
- await self .magnet . js . sub .unsubscribe ()
352
+ await self .sub .unsubscribe ()
354
353
_f ('warn' , f'unsubscribed from { self .magnet .config .stream_name } ' )
355
354
await self .nc .drain ()
356
- _f ('warn' , f'safe to disconnect from { self .magnet .config .host } ' )
355
+ _f ('warn' , f'safe to disconnect from { self .magnet .config .host . split ( "@" )[ 1 ] } ' )
357
356
0 commit comments