Skip to content

Commit ef53748

Browse files
committed
🚧 fix(wip): field behavior is more performant
1 parent ab6caaf commit ef53748

File tree

2 files changed

+30
-28
lines changed

2 files changed

+30
-28
lines changed

magnet/ic/field.py

+2
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ async def on(self, category: str = 'no_category', stream: str = 'documents', ses
199199
name=self.session
200200
, deliver_group=self.session
201201
, durable_name=self.session
202+
, ack_wait=20
202203
)
203204
_f('wait',f'connecting to {self.server}')
204205
try:
@@ -209,6 +210,7 @@ async def on(self, category: str = 'no_category', stream: str = 'documents', ses
209210
stream=self.stream
210211
, config=self.config
211212
, deliver_subject=self.session
213+
, max_ack_pending=1
212214
)
213215
except:
214216
_f('warn', f'consumer {self.session} exists, skipping create')

magnet/ize/memory.py

+28-28
Original file line numberDiff line numberDiff line change
@@ -59,35 +59,35 @@ async def index(self, payload, msg, verbose=False, field=None, charge=False, ins
5959
payload.embedding = self.model.encode(
6060
f"{instruction} {payload.text}", normalize_embeddings=True)
6161
except Exception as e:
62+
return _f('fatal', e)
63+
await msg.in_progress()
64+
try:
65+
_f('info', f'indexing payload') if verbose else None
66+
if not self.is_dupe(q=payload.embedding):
67+
self.db.collection.insert([
68+
[payload.document], [payload.text], [payload.embedding]
69+
])
70+
self.db.collection.flush(collection_name_array=[
71+
self.config['INDEX']])
72+
if charge:
73+
payload = EmbeddingPayload(
74+
model=self.config['MODEL'],
75+
embedding=self.model.encode(
76+
f"{instruction} {payload.text}", normalize_embeddings=True).tolist(),
77+
text=payload.text,
78+
document=payload.document
79+
)
80+
if field:
81+
_f('info', f'sending payload\n{payload}') if verbose else None
82+
await self.field.pulse(payload)
83+
await msg.ack_sync(timeout=15)
84+
else:
85+
await msg.ack_sync(timeout=15)
86+
_f('warn', f'embedding exists already\n{payload}') if verbose else None
87+
88+
except Exception as e:
89+
await msg.term()
6290
_f('fatal', e)
63-
else:
64-
await msg.in_progress()
65-
try:
66-
_f('info', f'indexing payload') if verbose else None
67-
if not self.is_dupe(q=payload.embedding):
68-
self.db.collection.insert([
69-
[payload.document], [payload.text], [payload.embedding]
70-
])
71-
self.db.collection.flush(collection_name_array=[
72-
self.config['INDEX']])
73-
if charge:
74-
payload = EmbeddingPayload(
75-
model=self.config['MODEL'],
76-
embedding=self.model.encode(
77-
f"{instruction} {payload.text}", normalize_embeddings=True).tolist(),
78-
text=payload.text,
79-
document=payload.document
80-
)
81-
if field:
82-
_f('info', f'sending payload\n{payload}') if verbose else None
83-
await self.field.pulse(payload)
84-
await msg.ack_sync(timeout=15)
85-
else:
86-
_f('warn', f'embedding exists already\n{payload.text}') if verbose else None
87-
await msg.ack_sync(timeout=15)
88-
except Exception as e:
89-
await msg.term()
90-
_f('fatal', e)
9191

9292
def search(self, payload, limit=100, cb=None, instruction="Represent this sentence for searching relevant passages: "):
9393
"""

0 commit comments

Comments
 (0)