Skip to content

Commit 2a0a8c9

Browse files
committed
Refactor locust-pinecone -> vsb
1 parent ed9e15c commit 2a0a8c9

File tree

9 files changed

+268
-0
lines changed

9 files changed

+268
-0
lines changed

vsb/databases/__init__.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from enum import Enum
2+
from .base import DB
3+
4+
5+
class Database(Enum):
    """Set of supported database backends; the value is the string used to
    select one via the --database= command-line argument."""

    Pinecone = "pinecone"
    PGVector = "pgvector"

    def build(self) -> DB:
        """Construct an instance of the DB subclass corresponding to this
        enum value.

        Backend imports are deferred to call time so that importing this
        package does not pull in every backend's dependencies.
        """
        match self:
            case Database.Pinecone:
                from .pinecone.pinecone import PineconeDB

                return PineconeDB()
            case Database.PGVector:
                from .pgvector.pgvector import PGVectorDB

                return PGVectorDB()

vsb/databases/base.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from abc import ABC, abstractmethod
2+
from enum import Enum, auto
3+
4+
5+
class Request(Enum):
    """The kinds of operations a workload can issue against an Index."""

    Upsert = auto()
    Search = auto()
8+
9+
10+
class Index(ABC):
    """Abstract base class representing an index of one or more vector
    records.

    Specific implementations subclass this and implement all abstract
    methods. Instances of a (derived) class are typically created via the
    corresponding (concrete) DB create_index() method rather than
    constructed directly.
    """

    @abstractmethod
    def upsert(self, ident, vector, metadata):
        raise NotImplementedError

    @abstractmethod
    def search(self, query_vector):
        raise NotImplementedError

    def do_request(self, request):
        """Dispatch a single workload request to the matching operation."""
        print(f"Got request: {request}")
        operation = request.operation
        if operation == Request.Upsert:
            self.upsert(request.id, request.vector, request.metadata)
            return
        if operation == Request.Search:
            response = self.search(request.q_vector)
            # Record timing, calculate Recall etc.
36+
37+
class DB(ABC):
    """Abstract class which represents a database which can store vector
    records in one or more Indexes. Specific Vector DB implementations should
    subclass this and implement all abstract methods.
    """

    @abstractmethod
    def create_index(self, tenant: str) -> Index:
        # Concrete backends return an Index bound to the given tenant.
        raise NotImplementedError

vsb/databases/pgvector/pgvector.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from ..base import DB, Index


class PGVectorIndex(Index):
    """pgvector implementation of Index (stub — operations not yet written)."""

    def __init__(self, tenant: str):
        # NOTE(review): tenant is currently unused by the stub — confirm
        # intended use (e.g. table/schema name) when implementing.
        pass

    def upsert(self, ident, vector, metadata):
        pass

    def search(self, query_vector):
        pass


class PGVectorDB(DB):
    """pgvector (PostgreSQL) implementation of DB.

    Fix: this module previously imported a non-existent ``..vectordb``
    module and subclassed ``VectorDB``, which does not exist after the
    refactor; it now mirrors pinecone.py by subclassing ``DB`` from
    ``..base`` and providing the ``create_index()`` factory required by
    ``Database.build()`` callers.
    """

    def __init__(self):
        print("PGVectorDB::__init__")

    def create_index(self, tenant: str) -> Index:
        return PGVectorIndex(tenant)

vsb/databases/pinecone/pinecone.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from ..base import DB, Index
2+
3+
4+
class PineconeIndex(Index):
    """Pinecone implementation of Index (stub — operations not yet written)."""

    def __init__(self, tenant: str):
        # Fix: parameter was misspelled "tenent".
        # NOTE(review): tenant is currently unused by the stub — confirm
        # intended use (e.g. namespace) when implementing.
        pass

    def upsert(self, ident, vector, metadata):
        pass

    def search(self, query_vector):
        pass
13+
14+
15+
class PineconeDB(DB):
    """Pinecone implementation of DB (stub)."""

    def __init__(self):
        print("PineconeDB::__init__")

    def create_index(self, tenant: str) -> Index:
        # Fix: PineconeIndex requires the tenant argument; calling
        # PineconeIndex() with no arguments raised TypeError.
        return PineconeIndex(tenant)

vsb/vsb.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
#!/usr/bin/env python3
2+
3+
from enum import Enum, auto
4+
from locust import User, events, task, TaskSet
5+
from locust.env import Environment
6+
from locust.exception import StopUser
7+
from locust.log import setup_logging
8+
from locust.runners import WorkerRunner
9+
from locust.stats import stats_history, stats_printer
10+
11+
from databases import Database
12+
from workloads import Workload
13+
import argparse
14+
import gevent
15+
import logging
16+
import sys
17+
18+
19+
# Configure locust's logging before any Environment/Runner is created.
setup_logging("INFO")
20+
21+
22+
class VectorSearchUser(User):
23+
class Phase(Enum):
24+
Load = auto()
25+
Run = auto()
26+
27+
"""Represents a single user (aka client) performing requests against
28+
a particular Backend."""
29+
def __init__(self, environment):
30+
super().__init__(environment)
31+
self.count = 0
32+
self.database = Database(environment.options.database).build()
33+
self.workload = Workload(environment.options.workload).build()
34+
self.phase = VectorSearchUser.Phase.Load
35+
36+
@task
37+
def request(self):
38+
match self.phase:
39+
case VectorSearchUser.Phase.Load:
40+
self.do_load()
41+
case VectorSearchUser.Phase.Run:
42+
self.do_run()
43+
44+
def do_load(self):
45+
if batch := self.workload.next_record_batch():
46+
print(f"Batch: {batch}")
47+
print(f"Loading batch of size:", len(batch))
48+
self.database.upsert(batch)
49+
else:
50+
# No more data to load, advance to Run phase.
51+
self.phase = VectorSearchUser.Phase.Run
52+
53+
def do_run(self):
54+
if self.workload.execute_next_request(self.workload):
55+
print(f"Issue request {self.count}: request")
56+
else:
57+
runner = self.environment.runner
58+
logging.info(f"User count: {runner.user_count}")
59+
if runner.user_count == 1:
60+
logging.info("Last user stopped, quitting runner")
61+
if isinstance(runner, WorkerRunner):
62+
runner._send_stats() # send a final report
63+
# need to trigger this in a separate greenlet, in case test_stop handlers do something async
64+
gevent.spawn_later(0.1, runner.quit)
65+
raise StopUser()
66+
67+
68+
69+
def main():
    """Entry point: parse command-line options, then drive a local locust
    Environment/Runner through a fixed 30-second benchmark run."""
    parser = argparse.ArgumentParser(
        # Fix: prog was "VCB" — the tool is vsb (Vector Search Bench).
        prog="VSB",
        description="Vector Search Bench",
    )
    # Valid choices are derived from the enums so new backends/workloads are
    # picked up automatically.
    parser.add_argument("--database", required=True,
                        choices=tuple(e.value for e in Database))
    parser.add_argument("--workload", required=True,
                        choices=tuple(e.value for e in Workload))

    options = parser.parse_args()

    # Setup Environment and Runner; options are attached so each
    # VectorSearchUser can read them in __init__.
    env = Environment(user_classes=[VectorSearchUser], events=events)
    env.options = options

    runner = env.create_local_runner()

    # start a WebUI instance
    # web_ui = env.create_web_ui("127.0.0.1", 8089)

    # execute init event handlers (only really needed if you have registered any)
    env.events.init.fire(environment=env, runner=runner)  # , web_ui=web_ui)

    # start a greenlet that periodically outputs the current stats
    gevent.spawn(stats_printer(env.stats))

    # start a greenlet that saves current stats to history
    gevent.spawn(stats_history, env.runner)

    # start the test
    runner.start(1, spawn_rate=10)

    # in 30 seconds stop the runner
    gevent.spawn_later(30, runner.quit)

    # wait for the greenlets
    runner.greenlet.join()

    # stop the web server for good measure
    # web_ui.stop()


if __name__ == "__main__":
    main()

vsb/workloads/__init__.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from enum import Enum
2+
from .base import VectorWorkload
3+
4+
5+
class Workload(Enum):
    """Set of supported workloads; the value is the string used to
    select one via the --workload= command-line argument.
    (The previous docstring said --benchmark=, but vsb.py registers
    the flag as --workload.)
    """

    MNIST = "mnist"

    def build(self) -> VectorWorkload:
        """Construct an instance of the VectorWorkload subclass corresponding
        to this enum value. The workload import is deferred to call time."""
        match self:
            case Workload.MNIST:
                from .mnist.mnist import MNIST

                return MNIST()

vsb/workloads/base.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from abc import ABC, abstractmethod
2+
3+
4+
class VectorWorkload(ABC):
    """Abstract base class for a benchmark workload: supplies the records for
    the initial ingest phase, then issues the requests making up the run
    phase."""

    @abstractmethod
    def next_record_batch(self):
        """
        For initial dataset ingest, returns the next batch of records to
        upsert.
        NOTE(review): the original docstring was truncated here; returning a
        falsy value to signal exhaustion is inferred from the walrus test in
        VectorSearchUser.do_load — confirm with the author.
        :return:
        """
        raise NotImplementedError

    @abstractmethod
    def execute_next_request(self, db: 'DB') -> bool:
        """Obtain the next request for this workload and execute against the given
        database. Returns true if execution should continue after this request,
        or false if the workload is complete.
        """
        raise NotImplementedError
File renamed without changes.

vsb/workloads/mnist/mnist.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from ..base import VectorWorkload
2+
from ..dataset import Dataset
3+
4+
5+
class MNIST(VectorWorkload):
    """MNIST workload: loads the 'mnist' dataset on construction and issues a
    fixed number of requests (currently a stub with debug prints)."""

    def __init__(self):
        print("MNIST::__init__")
        # NOTE(review): Dataset comes from ..dataset and is opaque here;
        # presumably load_documents() populates .documents — confirm.
        self.dataset = Dataset(name="mnist")
        self.dataset.load_documents()
        print(self.dataset.documents)
        # Split the loaded documents into batches of 100 records for ingest.
        self.records = Dataset.split_dataframe(self.dataset.documents, 100)
        print(self.records)
        # operation_count/_limit bound the run phase to 10 requests.
        self.operation_count = 0
        self.operation_limit = 10

    def next_record_batch(self):
        # NOTE(review): stub — always returns None, so the caller's load
        # phase ends immediately without upserting self.records; presumably
        # this should yield successive batches from self.records.
        print("MNIST::next_record_batch")

    def execute_next_request(self, db) -> bool:
        # Returns True while more requests remain (count < limit).
        print("MNIST::execute_next_request")
        self.operation_count += 1
        return self.operation_count < self.operation_limit
23+

0 commit comments

Comments
 (0)