Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithread open source tests #882

Draft
wants to merge 18 commits into
base: mainline
Choose a base branch
from
Draft
4 changes: 3 additions & 1 deletion .github/workflows/unit_test_200gb_CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ jobs:
export VESPA_CONFIG_URL=http://localhost:19071
export VESPA_DOCUMENT_URL=http://localhost:8080
export VESPA_QUERY_URL=http://localhost:8080
export ZOOKEEPER_HOSTS=http://localhost:2181

cd marqo/scripts/vespa_local
set -x
Expand Down Expand Up @@ -116,9 +117,10 @@ jobs:
export VESPA_CONFIG_URL=http://localhost:19071
export VESPA_DOCUMENT_URL=http://localhost:8080
export VESPA_QUERY_URL=http://localhost:8080
export ZOOKEEPER_HOSTS=http://localhost:2181

export PYTHONPATH="./marqo/tests:./marqo/src:./marqo"
pytest marqo/tests
pytest -n auto --dist loadfile marqo/tests

Stop-Runner:
name: Stop self-hosted EC2 runner
Expand Down
1 change: 1 addition & 0 deletions requirements.dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,4 @@ httpx==0.25.0
# test requirements
pyvespa==0.37.1
pytest==7.4.3
pytest-xdist==3.6.1
56 changes: 51 additions & 5 deletions tests/marqo_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import unittest
import uuid
from unittest.mock import patch, Mock
import multiprocessing

import vespa.application as pyvespa

Expand All @@ -18,6 +19,8 @@

class MarqoTestCase(unittest.TestCase):
indexes = []
create_lock = multiprocessing.Lock()
delete_lock = multiprocessing.Lock()

@classmethod
def configure_request_metrics(cls):
Expand All @@ -32,8 +35,28 @@ def configure_request_metrics(cls):
@classmethod
def tearDownClass(cls):
cls.patcher.stop()

"""if cls.indexes:
with cls.delete_lock:
try:
cls.index_management.batch_delete_indexes(cls.indexes)
except Exception as e:
print(f"Error deleting indexes: {e}")"""
max_retries = 25
initial_wait_time = 120 # seconds
max_wait_time = 1000 # seconds
if cls.indexes:
cls.index_management.batch_delete_indexes(cls.indexes)
for attempt in range(max_retries):
try:
with cls.delete_lock:
cls.index_management.batch_delete_indexes(cls.indexes)
break
except Exception as e: # TODO: Change this exception to something more specific
if attempt < max_retries - 1:
wait_time = min(initial_wait_time * (2 ** attempt), max_wait_time)
time.sleep(wait_time)
else:
raise e

@classmethod
def setUpClass(cls) -> None:
Expand All @@ -54,13 +77,36 @@ def setUpClass(cls) -> None:

cls.pyvespa_client = pyvespa.Vespa(url="http://localhost", port=8080)
cls.CONTENT_CLUSTER = 'content_default'
#cls.indexes = []

@classmethod
def create_indexes(cls, index_requests: List[MarqoIndexRequest]) -> List[MarqoIndex]:
indexes = cls.index_management.batch_create_indexes(index_requests)
cls.indexes = indexes

return indexes
"""with cls.create_lock:
try:
indexes = cls.index_management.batch_create_indexes(index_requests)
cls.indexes = indexes
return indexes
except Exception as e:
print(f"Error creating indexes: {e}")
return []"""
max_retries = 25
initial_wait_time = 120 # seconds
max_wait_time = 1000 # seconds
for attempt in range(max_retries):
try:
with cls.create_lock:
indexes = cls.index_management.batch_create_indexes(index_requests)
cls.indexes = indexes
break
except Exception as e: # TODO: Change this exception to something more specific
if attempt < max_retries - 1:
wait_time = min(initial_wait_time * (2 ** attempt), max_wait_time)
time.sleep(wait_time)
else:
raise e
#indexes = cls.index_management.batch_create_indexes(index_requests)



def setUp(self) -> None:
self.clear_indexes(self.indexes)
Expand Down
Loading