@@ -88,7 +88,7 @@ async def off(self):
88
88
await self .magnet .nc .close ()
89
89
_f ('warn' , f'disconnected from { self .magnet .config .host } ' )
90
90
91
- async def pulse (self , payload : Payload | FilePayload | GeneratedPayload | EmbeddingPayload | JobParams = None , v = False ):
91
+ async def pulse (self , payload : Payload | FilePayload | GeneratedPayload | EmbeddingPayload | JobParams = None , create_job = False , v = False ):
92
92
"""
93
93
Publishes data to the NATS server using the specified category and payload.
94
94
@@ -107,6 +107,11 @@ async def pulse(self, payload: Payload | FilePayload | GeneratedPayload | Embedd
107
107
bucket = await self .magnet .js .object_store (bucket_name )
108
108
await bucket .put (object_name , payload_data_bytes , meta = meta )
109
109
_f ('success' , f'uploaded to NATS object store in bucket { bucket_name } as { object_name } ' ) if v else None
110
+
111
+ if create_job :
112
+ job = Job ("process_document" , _hash )
113
+ await self .magnet .kv .put (key = job ._id , value = json .dumps (asdict (job )).encode ('utf-8' ))
114
+ _f ('info' , f'created job { job ._id } ' )
110
115
else :
111
116
try :
112
117
bytes_ = json .dumps (asdict (payload ), separators = (
@@ -210,9 +215,7 @@ async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000,
210
215
_f ('wait' , f'connecting to { self .magnet .config .host } ' )
211
216
try :
212
217
if obj :
213
- object_store = await self .magnet .js .object_store (self .magnet .config .os_name )
214
- self .object_store = object_store
215
- self .sub = await object_store .watch (include_history = False )
218
+ self .sub = await self .magnet .os .watch (include_history = False )
216
219
_f ('info' ,
217
220
f'subscribed to object store: { self .magnet .config .os_name } as { self .node } ' )
218
221
else :
@@ -228,9 +231,9 @@ async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000,
228
231
return _f ('fatal' , e )
229
232
230
233
async def download (self , obj : object = None ):
231
- if obj and self .object_store :
234
+ if obj and self .magnet . os :
232
235
buffer = io .BytesIO ()
233
- file = await self .object_store .get (obj .name , buffer )
236
+ file = await self .magnet . os .get (obj .name , buffer )
234
237
buffer .seek (0 )
235
238
chunk_size = 128 * 1024
236
239
with open (f"{ file .info .name } .{ file .info .headers ['ext' ]} " , 'wb' ) as fh :
@@ -263,10 +266,10 @@ async def deliver_messages(msgs):
263
266
try :
264
267
if type (self .sub ).__name__ == "ObjectWatcher" :
265
268
_f ("info" , f'consuming objects from [{ self .magnet .config .host .split ("@" )[1 ]} ] from\n 🛰️ bucket: { self .magnet .config .os_name } "' )
266
- msgs = await self .object_store .list ()
269
+ msgs = await self .magnet . os .list ()
267
270
for msg in msgs :
268
271
await self .download (msg )
269
- await cb (self .object_store , msg )
272
+ await cb (self .magnet . os , msg )
270
273
else :
271
274
_f ("info" , f'consuming { job_n } from [{ self .magnet .config .category } ] on\n 🛰️ stream: { self .magnet .config .stream_name } \n 🧲 session: "{ self .magnet .session } "' )
272
275
msgs = await self .sub .fetch (batch = job_n , timeout = 60 )
@@ -280,7 +283,7 @@ async def deliver_messages(msgs):
280
283
_f ("info" , f'consuming objects from [{ self .magnet .config .host .split ("@" )[1 ]} ] from\n 🛰️ bucket: { self .magnet .config .stream_name } "' )
281
284
e = await self .sub .updates ()
282
285
loop = asyncio .get_event_loop ()
283
- loop .create_task (cb (self .object_store , e ))
286
+ loop .create_task (cb (self .magnet . os , e ))
284
287
await asyncio .sleep (1 )
285
288
else :
286
289
while True :
@@ -317,16 +320,16 @@ async def worker(self, cb=print):
317
320
Exception: If there is an error in consuming the message or processing the callback function.
318
321
"""
319
322
_f ("info" ,
320
- f'processing jobs from [{ self .magnet .config .category } ] on\n 🛰️ stream : { self .magnet .config .stream_name } \n 🧲 session: " { self . magnet . session } " ' )
323
+ f'processing jobs from [{ self .magnet .config .kv_name } ] on\n 🛰️ object store : { self .magnet .config .os_name } ' )
321
324
try :
322
- msg = await self .sub . next_msg ( timeout = 60 )
323
- payload = JobParams ( ** json . loads ( msg . data ))
324
- try :
325
- await cb ( payload , msg )
326
- except Exception as e :
327
- _f ( "warn" , f'something wrong in your callback function! \n { e } ' )
325
+ keys = await self .magnet . kv . keys ( )
326
+ for key in keys :
327
+ _job = await self . magnet . kv . get ( key )
328
+ job = json . loads ( _job . value . decode ( 'utf-8' ) )
329
+ if not job [ '_isClaimed' ]:
330
+ await cb ( _job , job )
328
331
except Exception as e :
329
- _f ('fatal' , 'invalid JSON' )
332
+ _f ('fatal' , f 'invalid JSON\n { e } ' )
330
333
331
334
async def conduct (self , cb = print ):
332
335
pass
0 commit comments