From 140742de92fc5c1de6cc940273404eb3330dda08 Mon Sep 17 00:00:00 2001
From: Riccardo Balbo
Date: Sat, 27 Apr 2024 19:55:04 +0200
Subject: [PATCH] improve I/O performance

---
 src/OpenAgentsNode.py | 162 +++++++++++++++++++++++++++++++++++-------
 src/main.py           |  79 ++++++++++++--------
 2 files changed, 185 insertions(+), 56 deletions(-)

diff --git a/src/OpenAgentsNode.py b/src/OpenAgentsNode.py
index f5ed69e..a92215d 100644
--- a/src/OpenAgentsNode.py
+++ b/src/OpenAgentsNode.py
@@ -7,6 +7,51 @@
 import json
 import asyncio
 import pickle
+import queue
+import concurrent
+
+class BlobWriter:
+    def __init__(self, writeQueue, res):
+        self.writeQueue = writeQueue
+        self.res = res
+
+    async def write(self, data):
+        self.writeQueue.put_nowait(data)
+
+    async def writeInt(self, data):
+        # 4-byte big-endian integer prefix
+        self.writeQueue.put_nowait(data.to_bytes(4, byteorder='big'))
+
+    async def end(self):
+        self.writeQueue.put_nowait(None)
+
+    async def close(self):
+        self.writeQueue.put_nowait(None)
+        res = await self.res
+        return res.success
+
+class BlobReader:
+    def __init__(self, chunksQueue, req):
+        self.chunksQueue = chunksQueue
+        self.buffer = bytearray()
+        self.req = req
+
+    async def read(self, n=1):
+        while len(self.buffer) < n:
+            v = await self.chunksQueue.get()
+            if v is None: break  # stream ended: may return fewer than n bytes
+            self.buffer.extend(v)
+        result, self.buffer = self.buffer[:n], self.buffer[n:]
+        return result
+
+    async def readInt(self):
+        return int.from_bytes(await self.read(4), byteorder='big')
+
+    async def close(self):
+        self.chunksQueue.task_done()
+        return await self.req
+

 class BlobStorage:
     def __init__(self, id, url, node):
@@ -27,7 +72,7 @@ async def delete(self, path):

     async def writeBytes(self, path, dataBytes):
         client = self.node.getClient()
-        CHUNK_SIZE = 1024*1024*100
+        CHUNK_SIZE = 1024*1024*15
         def write_data():
             for j in range(0, len(dataBytes), CHUNK_SIZE):
                 chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])
@@ -36,6 +81,39 @@ def write_data():
         res=await client.diskWriteFile(write_data())
         return res.success

+    async def openWriteStream(self, path):
+        client = self.node.getClient()
+        writeQueue = asyncio.Queue()
+        CHUNK_SIZE = 1024*1024*15
+
+        async def write_data():
+            while True:
+                dataBytes = await writeQueue.get()
+                if dataBytes is None:  # End of stream
+                    break
+                for j in range(0, len(dataBytes), CHUNK_SIZE):
+                    chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])
+                    request = rpc_pb2.RpcDiskWriteFileRequest(diskId=str(self.id), path=path, data=chunk)
+                    yield request
+                writeQueue.task_done()
+
+        # deliberately not awaited here: BlobWriter.close() awaits the RPC result
+        res = client.diskWriteFile(write_data())
+        return BlobWriter(writeQueue, res)
+
+    async def openReadStream(self, path):
+        client = self.node.getClient()
+        readQueue = asyncio.Queue()
+
+        async def read_data():
+            async for chunk in client.diskReadFile(rpc_pb2.RpcDiskReadFileRequest(diskId=self.id, path=path)):
+                readQueue.put_nowait(chunk.data)
+        r = asyncio.create_task(read_data())
+        return BlobReader(readQueue, r)
+
     async def readBytes(self, path):
         client = self.node.getClient()
         bytesOut = bytearray()
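For context, this is how the streaming API introduced above fits together (an illustrative sketch, not part of the patch; it assumes a `BlobStorage` instance `disk`, e.g. the one returned by `JobRunner.createStorage()`):

    async def roundtrip(disk):
        writer = await disk.openWriteStream("example.bin")
        payload = b"hello world"
        await writer.writeInt(len(payload))  # 4-byte big-endian length prefix
        await writer.write(payload)
        await writer.end()                   # None sentinel terminates write_data()
        ok = await writer.close()            # resolves the pending diskWriteFile RPC
        reader = await disk.openReadStream("example.bin")
        data = await reader.read(await reader.readInt())
        await reader.close()
        return ok, data

The gain over writeBytes/readBytes is that neither side has to hold a whole file in memory; data flows through the queues in chunks of at most 15 MB.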
@@ -69,6 +147,7 @@ class JobRunner:
     _meta = None
     _sockets = None
     _nextAnnouncementTimestamp = 0
+    cachePath = None

     def __init__(self, filters, meta, template, sockets):
         self._filters = filters
@@ -84,40 +163,64 @@ def __init__(self, filters, meta, template, sockets):
             self._sockets = json.dumps(sockets)
         else:
             self._sockets = sockets
+
+        self.cachePath = os.getenv('CACHE_PATH', os.path.join(os.path.dirname(__file__), "cache"))
+        if not os.path.exists(self.cachePath):
+            os.makedirs(self.cachePath)

-    async def cacheSet(self, path, value, version=0, expireAt=0):
+    async def cacheSet(self, path, value, version=0, expireAt=0, local=False):
         try:
             dataBytes = pickle.dumps(value)
-            client = self._node.getClient()
-            CHUNK_SIZE = 1024*1024*100
-            def write_data():
-                for j in range(0, len(dataBytes), CHUNK_SIZE):
-                    chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])
-                    request = rpc_pb2.RpcCacheSetRequest(
-                        key=path,
-                        data=chunk,
-                        expireAt=expireAt,
-                        version=version
-                    )
-                    yield request
-            res=await client.cacheSet(write_data())
-            return res.success
+            if local:
+                fullPath = os.path.join(self.cachePath, path)
+                with open(fullPath, "wb") as f:
+                    f.write(dataBytes)
+                with open(fullPath+".meta.json", "w") as f:
+                    f.write(json.dumps({"version": version, "expireAt": expireAt}))
+                return True  # mirror the success result of the RPC path below
+            else:
+                client = self._node.getClient()
+                CHUNK_SIZE = 1024*1024*15
+                def write_data():
+                    for j in range(0, len(dataBytes), CHUNK_SIZE):
+                        chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])
+                        request = rpc_pb2.RpcCacheSetRequest(
+                            key=path,
+                            data=chunk,
+                            expireAt=expireAt,
+                            version=version
+                        )
+                        yield request
+                res = await client.cacheSet(write_data())
+                return res.success
         except Exception as e:
             print("Error setting cache "+str(e))
             return False

-    async def cacheGet(self, path, lastVersion = 0):
+    async def cacheGet(self, path, lastVersion=0, local=False):
         try:
-            client = self._node.getClient()
-            bytesOut = bytearray()
-            stream = client.cacheGet(rpc_pb2.RpcCacheGetRequest(key=path, lastVersion = lastVersion))
-            async for chunk in stream:
-                if not chunk.exists:
-                    return None
-                bytesOut.extend(chunk.data)
-            return pickle.loads(bytesOut)
+            if local:
+                fullPath = os.path.join(self.cachePath, path)
+                if not os.path.exists(fullPath) or not os.path.exists(fullPath+".meta.json"):
+                    return None
+                with open(fullPath+".meta.json", "r") as f:
+                    meta = json.loads(f.read())
+                if lastVersion > 0 and meta["version"] != lastVersion:
+                    return None
+                if meta["expireAt"] > 0 and time.time()*1000 > meta["expireAt"]:
+                    return None
+                with open(fullPath, "rb") as f:
+                    return pickle.load(f)
+            else:
+                client = self._node.getClient()
+                bytesOut = bytearray()
+                stream = client.cacheGet(rpc_pb2.RpcCacheGetRequest(key=path, lastVersion=lastVersion))
+                async for chunk in stream:
+                    if not chunk.exists:
+                        return None
+                    bytesOut.extend(chunk.data)
+                return pickle.loads(bytesOut)
         except Exception as e:
             print("Error getting cache "+str(e))
             return None
@@ -223,10 +326,17 @@ def getClient(self):
             except Exception as e:
                 print("Error closing channel "+str(e))
             print("Connect to "+self.poolAddress+":"+str(self.poolPort)+" with ssl "+str(self.poolSsl))
+
+            options = [
+                # 20 MB: room for the 15 MB chunks used above
+                ('grpc.max_send_message_length', 1024*1024*20),
+                ('grpc.max_receive_message_length', 1024*1024*20)
+            ]
+
             if self.poolSsl:
-                self.channel = grpc.aio.secure_channel(self.poolAddress+":"+str(self.poolPort), grpc.ssl_channel_credentials())
+                self.channel = grpc.aio.secure_channel(self.poolAddress+":"+str(self.poolPort), grpc.ssl_channel_credentials(), options)
             else:
-                self.channel = grpc.aio.insecure_channel(self.poolAddress+":"+str(self.poolPort))
+                self.channel = grpc.aio.insecure_channel(self.poolAddress+":"+str(self.poolPort), options)
             self.rpcClient = rpc_pb2_grpc.PoolConnectorStub(self.channel)
             return self.rpcClient

diff --git a/src/main.py b/src/main.py
index a3a874c..fae9274 100644
--- a/src/main.py
+++ b/src/main.py
@@ -17,7 +17,8 @@
 from openai import OpenAI
 import nlpcloud
 import numpy as np
-
+import asyncio
+import concurrent
 class Runner (JobRunner):
     openai = None
     nlpcloud = None
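For reference, the local cache above stores two files per key under CACHE_PATH: the pickled value and a `<key>.meta.json` sidecar holding `version` and `expireAt` (milliseconds since the epoch, 0 meaning no expiry). A minimal sketch of inspecting an entry by hand; `peek_cache` is a hypothetical helper, not part of the codebase:

    import json, os, pickle, time

    def peek_cache(cache_dir, key):
        with open(os.path.join(cache_dir, key + ".meta.json")) as f:
            meta = json.load(f)
        if meta["expireAt"] > 0 and time.time() * 1000 > meta["expireAt"]:
            return None  # expired
        with open(os.path.join(cache_dir, key), "rb") as f:
            return pickle.load(f)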
@@ -25,7 +26,6 @@ class Runner (JobRunner):
     def __init__(self, filters, meta, template, sockets):
         super().__init__(filters, meta, template, sockets)
         self.device = int(os.getenv('TRANSFORMERS_DEVICE', "-1"))
-        self.cachePath = os.getenv('CACHE_PATH', os.path.join(os.path.dirname(__file__), "cache"))
         now = time.time()
         self.modelName = os.getenv("EMBEDDING_MODEL") or os.getenv('MODEL', "intfloat/multilingual-e5-base")
         self.maxTextLength = os.getenv('MAX_TEXT_LENGTH', 512)
@@ -40,8 +40,7 @@ def __init__(self, filters, meta, template, sockets):
         self.pipe = SentenceTransformer( self.modelName, device=self.device if self.device >= 0 else "cpu")
         self.log( "Model loaded in "+str(time.time()-now)+" seconds")
         self.addMarkersToSentences = os.getenv('ADD_MARKERS_TO_SENTENCES', "true") == "true"
-        if not os.path.exists(self.cachePath):
-            os.makedirs(self.cachePath)
+
@@ -64,7 +63,7 @@ async def encode(self, sentences):
         for s in sentences:
             hash = hashlib.sha256((self.modelName+":"+s).encode()).hexdigest()
-            cached = await self.cacheGet(hash)
+            cached = await self.cacheGet(hash, local=True)
             if cached is not None:
                 out.append(cached)
             else:
@@ -80,7 +79,7 @@ async def encode(self, sentences):
                 embeddings = encodedRaw[i]
                 encoded.append([np.array(embeddings),to_encode[i][1],to_encode[i][2]])
         elif self.openai:
-            CHUNK_SIZE = 32
+            CHUNK_SIZE = 1024
             encoded = []
             for i in range(0, len(to_encode), CHUNK_SIZE):
@@ -93,14 +92,7 @@ async def encode(self, sentences):
                 for j in range(len(chunk)):
                     embeddings = encodedRaw.data[j].embedding
                     encoded.append([np.array(embeddings), chunk[j][1], chunk[j][2]])
-            # encodedRaw=self.openai.embeddings.create(
-            #     input=[x[0] for x in to_encode],
-            #     model=self.openaiModelName
-            # )
-            # encoded = []
-            # for i in range(len(to_encode)):
-            #     embeddings = encodedRaw.data[i].embedding
-            #     encoded.append([np.array(embeddings),to_encode[i][1],to_encode[i][2]])
+            # TODO: more apis?
         else: # Use local models
             encodedRaw = self.pipe.encode([x[0] for x in to_encode], show_progress_bar=True)
             encoded = []
             for i in range(len(encodedRaw)):
                 embeddings = encodedRaw[i]
                 encoded.append([embeddings,to_encode[i][1],to_encode[i][2]])

+        waitList = []
         for i in range(len(encoded)):
             embeddings = encoded[i][0]
             hash = encoded[i][1]
             index = encoded[i][2]
             out[index] = embeddings
-            await self.cacheSet(hash, embeddings)
-
+            waitList.append(self.cacheSet(hash, embeddings, local=True))
+        await asyncio.gather(*waitList)
         return out

     def quantize(self, embeddings):
@@ -172,9 +165,8 @@ def getParamValue(key,default=None):
         cacheId = hashlib.sha256(
             (str( self.modelName) + str(outputFormat) + str(max_tokens) + str(overlap) + str(quantize) + "".join([sentences[i][0] + ":" + sentences[i][1] for i in range(len(sentences))])).encode("utf-8")).hexdigest()
-        cached = await self.cacheGet(cacheId)
-        if cached is not None:
-            self.log("Cache hit")
+        cached = await self.cacheGet(cacheId, local=True)
+        if cached is not None:
             return cached

         # Split long sentences
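Both lookups above key the cache with SHA-256 digests: one per sentence (model name plus sentence text) and one per request (model, output format, parameters, and all sentences), so changing the model or parameters cannot serve stale vectors. An illustrative derivation of the per-sentence key (the model name and sentence here are placeholder values):

    import hashlib

    modelName = "intfloat/multilingual-e5-base"  # placeholder
    sentence = "passage: some text"              # placeholder
    key = hashlib.sha256((modelName + ":" + sentence).encode()).hexdigest()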
@@ -196,22 +188,49 @@ def getParamValue(key,default=None):
         self.log("Embeddings ready. Serialize for output...")
         output = ""
         if outputFormat=="application/hyperdrive+bundle":
-            blobDisk = await self.createStorage()
+            blobDisk = await self.createStorage()
+
+            sentencesOut = await blobDisk.openWriteStream("sentences.bin")
+            await sentencesOut.writeInt(len(sentences))
+
             for i in range(len(sentences)):
-                dtype = embeddings[i].dtype
+                print("Write sentence", str(i))
+                sentence = sentences[i][0].encode("utf-8")
+                await sentencesOut.writeInt(len(sentence))
+                await sentencesOut.write(sentence)
+
+            embeddingsOut = await blobDisk.openWriteStream("embeddings.bin")
+            await embeddingsOut.writeInt(len(embeddings))
+            for i in range(len(embeddings)):
+                print("Write embeddings", str(i))
                 shape = embeddings[i].shape
-                sentences_bytes = sentences[i][0].encode("utf-8")
-                marker = sentences[i][1]
-                embeddings_bytes = embeddings[i].tobytes()
-                await blobDisk.writeBytes(str(i)+".embeddings.dtype", str(dtype).encode("utf-8"))
-                await blobDisk.writeBytes(str(i)+".embeddings.shape", json.dumps(shape).encode("utf-8"))
-                await blobDisk.writeBytes(str(i)+".embeddings", sentences_bytes)
-                await blobDisk.writeBytes(str(i)+".embeddings.kind", marker.encode("utf-8"))
-                await blobDisk.writeBytes(str(i)+".embeddings.vectors", embeddings_bytes)
+                await embeddingsOut.writeInt(len(shape))
+                for s in shape:
+                    await embeddingsOut.writeInt(s)
+
+                dtype = str(embeddings[i].dtype).encode()
+                await embeddingsOut.writeInt(len(dtype))
+                await embeddingsOut.write(dtype)
+
+                bs = embeddings[i].tobytes()
+                await embeddingsOut.writeInt(len(bs))
+                await embeddingsOut.write(bs)
+
+            await embeddingsOut.end()
+            await sentencesOut.end()
+
+            await embeddingsOut.close()
+            await sentencesOut.close()
+
             output = blobDisk.getUrl()
             await blobDisk.close()
+
         else:
             jsonOut = []
             for i in range(len(sentences)):
                 dtype = embeddings[i].dtype
@@ -223,7 +242,7 @@ def getParamValue(key,default=None):
             )
             output=json.dumps(jsonOut)

-        await self.cacheSet(cacheId, output)
+        await self.cacheSet(cacheId, output, local=True)
         return output

 node = OpenAgentsNode(NodeConfig.meta)
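The writer above lays out `embeddings.bin` as a record count followed, per record, by the shape rank and dimensions, the dtype string, and the raw vector bytes, each field prefixed with a 4-byte big-endian length. A minimal decoder sketch under that assumption (`reader` stands for any object with the `read`/`readInt` methods of `BlobReader`; `read_embeddings` is hypothetical):

    import numpy as np

    async def read_embeddings(reader):
        count = await reader.readInt()
        out = []
        for _ in range(count):
            rank = await reader.readInt()
            shape = tuple([await reader.readInt() for _ in range(rank)])
            dtype = (await reader.read(await reader.readInt())).decode("utf-8")
            raw = await reader.read(await reader.readInt())
            out.append(np.frombuffer(raw, dtype=dtype).reshape(shape))
        return out

`sentences.bin` follows the same pattern, with length-prefixed UTF-8 strings instead of arrays.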