
improve i/o performances
riccardobl committed Apr 27, 2024
1 parent da69fbe commit 140742d
Showing 2 changed files with 185 additions and 56 deletions.
162 changes: 136 additions & 26 deletions src/OpenAgentsNode.py
@@ -7,6 +7,51 @@
import json
import asyncio
import pickle
import queue
import concurrent

class BlobWriter:
    # Queue-backed writer: queued chunks are streamed to the pool's disk-write RPC.
    def __init__(self, writeQueue, res):
        self.writeQueue = writeQueue
        self.res = res

    async def write(self, data):
        self.writeQueue.put_nowait(data)

    async def writeInt(self, data):
        # Integers are framed as 4-byte big-endian values.
        self.writeQueue.put_nowait(data.to_bytes(4, byteorder='big'))

    async def end(self):
        # A None sentinel marks the end of the stream.
        self.writeQueue.put_nowait(None)

    async def close(self):
        self.writeQueue.put_nowait(None)
        res = await self.res
        return res.success

class BlobReader:
    # Queue-backed reader: buffers incoming chunks and serves fixed-size reads.
    def __init__(self, chunksQueue, req):
        self.chunksQueue = chunksQueue
        self.buffer = bytearray()
        self.req = req

    async def read(self, n=1):
        # Accumulate chunks until n bytes are buffered (or the stream ends early).
        while len(self.buffer) < n:
            v = await self.chunksQueue.get()
            if v is None: break
            self.buffer.extend(v)
        result, self.buffer = self.buffer[:n], self.buffer[n:]
        return result

    async def readInt(self):
        # Counterpart of BlobWriter.writeInt: 4-byte big-endian integer.
        return int.from_bytes(await self.read(4), byteorder='big')

    async def close(self):
        self.chunksQueue.task_done()
        return await self.req


class BlobStorage:
def __init__(self, id, url, node):
@@ -27,7 +72,7 @@ async def delete(self, path):

async def writeBytes(self, path, dataBytes):
client = self.node.getClient()
CHUNK_SIZE = 1024*1024*100
CHUNK_SIZE = 1024*1024*15
def write_data():
for j in range(0, len(dataBytes), CHUNK_SIZE):
chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])
@@ -36,6 +81,39 @@ def write_data():
res=await client.diskWriteFile(write_data())
return res.success


    async def openWriteStream(self, path):
        # Returns a BlobWriter whose queued data is re-chunked and streamed to diskWriteFile.
        client = self.node.getClient()
        writeQueue = asyncio.Queue()
        CHUNK_SIZE = 1024*1024*15

        async def write_data():
            while True:
                dataBytes = await writeQueue.get()
                if dataBytes is None:  # End of stream
                    break
                # Split large writes so every gRPC message stays within the configured size limit.
                for j in range(0, len(dataBytes), CHUNK_SIZE):
                    chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])
                    request = rpc_pb2.RpcDiskWriteFileRequest(diskId=str(self.id), path=path, data=chunk)
                    yield request
            writeQueue.task_done()

        res = client.diskWriteFile(write_data())

        return BlobWriter(writeQueue, res)


    async def openReadStream(self, path):
        # Returns a BlobReader fed by a background task that drains diskReadFile.
        client = self.node.getClient()
        readQueue = asyncio.Queue()

        async def read_data():
            async for chunk in client.diskReadFile(rpc_pb2.RpcDiskReadFileRequest(diskId=self.id, path=path)):
                readQueue.put_nowait(chunk.data)
            readQueue.put_nowait(None)  # end-of-stream sentinel so BlobReader.read() cannot block forever
        r = asyncio.create_task(read_data())
        return BlobReader(readQueue, r)
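
A minimal round-trip with the streaming helpers above might look like the following sketch. It is illustrative only: it assumes a BlobStorage instance named blobDisk (for example obtained from JobRunner.createStorage()) and a made-up file name.

async def stream_example(blobDisk):
    # Hypothetical usage of the streaming API sketched above.
    writer = await blobDisk.openWriteStream("example.bin")
    payload = b"hello world"
    await writer.writeInt(len(payload))   # 4-byte big-endian length prefix
    await writer.write(payload)
    await writer.end()                    # enqueue the end-of-stream sentinel
    ok = await writer.close()             # await the diskWriteFile RPC result

    reader = await blobDisk.openReadStream("example.bin")
    length = await reader.readInt()
    data = await reader.read(length)
    await reader.close()
    return ok, bytes(data)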

async def readBytes(self, path):
client = self.node.getClient()
bytesOut = bytearray()
@@ -69,6 +147,7 @@ class JobRunner:
_meta = None
_sockets = None
_nextAnnouncementTimestamp = 0
cachePath = None
def __init__(self, filters, meta, template, sockets):
self._filters = filters

@@ -84,40 +163,64 @@ def __init__(self, filters, meta, template, sockets):
self._sockets = json.dumps(sockets)
else:
self._sockets = sockets

self.cachePath = os.getenv('CACHE_PATH', os.path.join(os.path.dirname(__file__), "cache"))
if not os.path.exists(self.cachePath):
os.makedirs(self.cachePath)


async def cacheSet(self, path, value, version=0, expireAt=0):
async def cacheSet(self, path, value, version=0, expireAt=0, local=False):
try:
dataBytes = pickle.dumps(value)
client = self._node.getClient()
CHUNK_SIZE = 1024*1024*100
def write_data():
for j in range(0, len(dataBytes), CHUNK_SIZE):
chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])
request = rpc_pb2.RpcCacheSetRequest(
key=path,
data=chunk,
expireAt=expireAt,
version=version
)
yield request
res=await client.cacheSet(write_data())
return res.success
if local:
fullPath = os.path.join(self.cachePath, path)
with open(fullPath, "wb") as f:
f.write(dataBytes)
with open(fullPath+".meta.json", "w") as f:
f.write(json.dumps({"version":version, "expireAt":expireAt}))
else:
client = self._node.getClient()
CHUNK_SIZE = 1024*1024*15
def write_data():
for j in range(0, len(dataBytes), CHUNK_SIZE):
chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])
request = rpc_pb2.RpcCacheSetRequest(
key=path,
data=chunk,
expireAt=expireAt,
version=version
)
yield request
res=await client.cacheSet(write_data())
return res.success
except Exception as e:
print("Error setting cache "+str(e))
return False


async def cacheGet(self, path, lastVersion = 0):
async def cacheGet(self, path, lastVersion = 0, local=False):
try:
client = self._node.getClient()
bytesOut = bytearray()
stream = client.cacheGet(rpc_pb2.RpcCacheGetRequest(key=path, lastVersion = lastVersion))
async for chunk in stream:
if not chunk.exists:
if local:
fullPath = os.path.join(self.cachePath, path)
if not os.path.exists(fullPath) or not os.path.exists(fullPath+".meta.json"):
return None
with open(fullPath+".meta.json", "r") as f:
meta = json.loads(f.read())
if lastVersion > 0 and meta["version"] != lastVersion:
return None
bytesOut.extend(chunk.data)
return pickle.loads(bytesOut)
if meta["expireAt"] > 0 and time.time()*1000 > meta["expireAt"]:
return None
with open(fullPath, "rb") as f:
return pickle.load(f)
else:
client = self._node.getClient()
bytesOut = bytearray()
stream = client.cacheGet(rpc_pb2.RpcCacheGetRequest(key=path, lastVersion = lastVersion))
async for chunk in stream:
if not chunk.exists:
return None
bytesOut.extend(chunk.data)
return pickle.loads(bytesOut)
except Exception as e:
print("Error getting cache "+str(e))
return None
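
The new local=True branch keeps the pickled value under CACHE_PATH with a .meta.json sidecar holding version and expireAt (milliseconds since epoch). A brief usage sketch with an illustrative key, called from inside a JobRunner coroutine:

async def cache_example(self, embeddings):
    # Hypothetical key; stored on the node's local disk instead of the pool cache.
    await self.cacheSet("embeddings-abc123", embeddings, version=1, local=True)
    # Returns None if the stored version differs from lastVersion or the entry has expired.
    return await self.cacheGet("embeddings-abc123", lastVersion=1, local=True)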
@@ -223,10 +326,17 @@ def getClient(self):
except Exception as e:
print("Error closing channel "+str(e))
print("Connect to "+self.poolAddress+":"+str(self.poolPort)+" with ssl "+str(self.poolSsl))

options=[
# 20 MB
('grpc.max_send_message_length', 1024*1024*20),
('grpc.max_receive_message_length', 1024*1024*20)
]

if self.poolSsl:
self.channel = grpc.aio.secure_channel(self.poolAddress+":"+str(self.poolPort), grpc.ssl_channel_credentials())
self.channel = grpc.aio.secure_channel(self.poolAddress+":"+str(self.poolPort), grpc.ssl_channel_credentials(),options)
else:
self.channel = grpc.aio.insecure_channel(self.poolAddress+":"+str(self.poolPort))
self.channel = grpc.aio.insecure_channel(self.poolAddress+":"+str(self.poolPort),options)
self.rpcClient = rpc_pb2_grpc.PoolConnectorStub(self.channel)
return self.rpcClient

79 changes: 49 additions & 30 deletions src/main.py
@@ -17,15 +17,15 @@
from openai import OpenAI
import nlpcloud
import numpy as np

import asyncio
import concurrent
class Runner (JobRunner):
openai = None
nlpcloud = None

def __init__(self, filters, meta, template, sockets):
super().__init__(filters, meta, template, sockets)
self.device = int(os.getenv('TRANSFORMERS_DEVICE', "-1"))
self.cachePath = os.getenv('CACHE_PATH', os.path.join(os.path.dirname(__file__), "cache"))
now = time.time()
self.modelName = os.getenv("EMBEDDING_MODEL") or os.getenv('MODEL', "intfloat/multilingual-e5-base")
self.maxTextLength = os.getenv('MAX_TEXT_LENGTH', 512)
@@ -40,8 +40,7 @@ def __init__(self, filters, meta, template, sockets):
self.pipe = SentenceTransformer( self.modelName, device=self.device if self.device >= 0 else "cpu")
self.log( "Model loaded in "+str(time.time()-now)+" seconds")
self.addMarkersToSentences = os.getenv('ADD_MARKERS_TO_SENTENCES', "true") == "true"
if not os.path.exists(self.cachePath):
os.makedirs(self.cachePath)




@@ -64,7 +63,7 @@ async def encode(self, sentences):

for s in sentences:
hash = hashlib.sha256((self.modelName+":"+s).encode()).hexdigest()
cached = await self.cacheGet(hash)
cached = await self.cacheGet(hash, local=True)
if cached is not None:
out.append(cached)
else:
@@ -80,7 +79,7 @@ def write_data():
embeddings = encodedRaw[i]
encoded.append([np.array(embeddings),to_encode[i][1],to_encode[i][2]])
elif self.openai:
CHUNK_SIZE = 32
CHUNK_SIZE = 1024
encoded = []

for i in range(0, len(to_encode), CHUNK_SIZE):
@@ -93,14 +92,7 @@
for j in range(len(chunk)):
embeddings = encodedRaw.data[j].embedding
encoded.append([np.array(embeddings), chunk[j][1], chunk[j][2]])
# encodedRaw=self.openai.embeddings.create(
# input=[x[0] for x in to_encode],
# model=self.openaiModelName
# )
# encoded = []
# for i in range(len(to_encode)):
# embeddings = encodedRaw.data[i].embedding
# encoded.append([np.array(embeddings),to_encode[i][1],to_encode[i][2]])

# TODO: more apis?
else: # Use local models
encodedRaw = self.pipe.encode([x[0] for x in to_encode], show_progress_bar=True)
@@ -109,13 +101,14 @@
embeddings = encodedRaw[i]
encoded.append([embeddings,to_encode[i][1],to_encode[i][2]])

waitList = []
for i in range(len(encoded)):
embeddings = encoded[i][0]
hash = encoded[i][1]
index = encoded[i][2]
out[index] = embeddings
await self.cacheSet(hash, embeddings)

waitList.append(self.cacheSet(hash, embeddings, local=True))
await asyncio.gather(*waitList)
return out

def quantize(self, embeddings):
@@ -172,9 +165,8 @@ def getParamValue(key,default=None):
(str( self.modelName) + str(outputFormat)
+ str(max_tokens) + str(overlap) + str(quantize)
+ "".join([sentences[i][0] + ":" + sentences[i][1] for i in range(len(sentences))])).encode("utf-8")).hexdigest()
cached = await self.cacheGet(cacheId)
if cached is not None:
self.log("Cache hit")
cached = await self.cacheGet(cacheId,local=True)
if cached is not None:
return cached

# Split long sentences
@@ -196,22 +188,49 @@ def getParamValue(key,default=None):
self.log("Embeddings ready. Serialize for output...")
output = ""
if outputFormat=="application/hyperdrive+bundle":
blobDisk = await self.createStorage()
blobDisk = await self.createStorage()

sentencesOut = await blobDisk.openWriteStream("sentences.bin")
await sentencesOut.writeInt(len(sentences))

for i in range(len(sentences)):
dtype = embeddings[i].dtype
print("Write sentence",str(i))
sentence = sentences[i][0].encode("utf-8")
await sentencesOut.writeInt(len(sentence))
await sentencesOut.write(sentence)

embeddingsOut = await blobDisk.openWriteStream("embeddings.bin")
await embeddingsOut.writeInt(len(embeddings))
for i in range(len(embeddings)):
print("Write embeddings",str(i))

shape = embeddings[i].shape
sentences_bytes = sentences[i][0].encode("utf-8")
marker = sentences[i][1]
embeddings_bytes = embeddings[i].tobytes()
await blobDisk.writeBytes(str(i)+".embeddings.dtype", str(dtype).encode("utf-8"))
await blobDisk.writeBytes(str(i)+".embeddings.shape", json.dumps(shape).encode("utf-8"))
await blobDisk.writeBytes(str(i)+".embeddings", sentences_bytes)
await blobDisk.writeBytes(str(i)+".embeddings.kind", marker.encode("utf-8"))
await blobDisk.writeBytes(str(i)+".embeddings.vectors", embeddings_bytes)
await embeddingsOut.writeInt(len(shape))
for s in shape:
await embeddingsOut.writeInt(s)

dtype = str(embeddings[i].dtype).encode()
await embeddingsOut.writeInt(len(dtype))
await embeddingsOut.write(dtype)

bs = embeddings[i].tobytes()
await embeddingsOut.writeInt(len(bs))
await embeddingsOut.write(bs)

await embeddingsOut.end()
await sentencesOut.end()

await embeddingsOut.close()
await sentencesOut.close()


output = blobDisk.getUrl()
await blobDisk.close()



else:

jsonOut = []
for i in range(len(sentences)):
dtype = embeddings[i].dtype
Expand All @@ -223,7 +242,7 @@ def getParamValue(key,default=None):
)
output=json.dumps(jsonOut)

await self.cacheSet(cacheId, output)
await self.cacheSet(cacheId, output,local=True)
return output

node = OpenAgentsNode(NodeConfig.meta)
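
For reference, the bundle written in the hyperdrive branch above uses 4-byte big-endian integers throughout: sentences.bin is a count followed by length-prefixed UTF-8 strings, and embeddings.bin is a count followed by, per entry, the shape (rank, then each dimension), a length-prefixed dtype string, and the length-prefixed raw array bytes. A sketch of a matching reader, not part of this commit, assuming a BlobStorage handle named blobDisk:

import numpy as np

async def read_bundle(blobDisk):
    # sentences.bin: [count][len][utf-8 bytes] ...
    sentencesIn = await blobDisk.openReadStream("sentences.bin")
    sentences = []
    for _ in range(await sentencesIn.readInt()):
        slen = await sentencesIn.readInt()
        sentences.append((await sentencesIn.read(slen)).decode("utf-8"))
    await sentencesIn.close()

    # embeddings.bin: [count] then per entry [rank][dims...][dtype len][dtype][byte len][raw bytes]
    embeddingsIn = await blobDisk.openReadStream("embeddings.bin")
    embeddings = []
    for _ in range(await embeddingsIn.readInt()):
        rank = await embeddingsIn.readInt()
        shape = tuple([await embeddingsIn.readInt() for _ in range(rank)])
        dlen = await embeddingsIn.readInt()
        dtype = np.dtype((await embeddingsIn.read(dlen)).decode("utf-8"))
        blen = await embeddingsIn.readInt()
        raw = bytes(await embeddingsIn.read(blen))
        embeddings.append(np.frombuffer(raw, dtype=dtype).reshape(shape))
    await embeddingsIn.close()

    return sentences, embeddings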
