Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ rerouted update projects networks via dynamic-scheduler #6945

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,6 @@ async def director_v2_service_mock(
r"^http://[a-z\-_]*director-v2:[0-9]+/v2/computations/.*:stop$"
)
delete_computation_pattern = get_computation_pattern
projects_networks_pattern = re.compile(
r"^http://[a-z\-_]*director-v2:[0-9]+/v2/dynamic_services/projects/.*/-/networks$"
)

get_services_pattern = re.compile(
r"^http://[a-z\-_]*director-v2:[0-9]+/v2/dynamic_services.*$"
Expand Down Expand Up @@ -202,7 +199,6 @@ async def director_v2_service_mock(
repeat=True,
)
aioresponses_mocker.delete(delete_computation_pattern, status=204, repeat=True)
aioresponses_mocker.patch(projects_networks_pattern, status=204, repeat=True)

return aioresponses_mocker

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,16 @@ async def stop_dynamic_service(
timeout_s=timeout_s,
)
assert result is None # nosec


@log_decorator(_logger, level=logging.DEBUG)
async def update_projects_networks(
rabbitmq_rpc_client: RabbitMQRPCClient, *, project_id: ProjectID
) -> None:
result = await rabbitmq_rpc_client.request(
DYNAMIC_SCHEDULER_RPC_NAMESPACE,
_RPC_METHOD_NAME_ADAPTER.validate_python("update_projects_networks"),
project_id=project_id,
timeout_s=_RPC_DEFAULT_TIMEOUT_S,
)
assert result is None # nosec
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ async def stop_dynamic_service(
return await scheduler_interface.stop_dynamic_service(
app, dynamic_service_stop=dynamic_service_stop
)


@router.expose()
async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
await scheduler_interface.update_projects_networks(app, project_id=project_id)
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ async def list_tracked_dynamic_services(
)
return TypeAdapter(list[DynamicServiceGet]).validate_python(response.json())

async def update_projects_networks(self, *, project_id: ProjectID) -> None:
await self.thin_client.patch_projects_networks(project_id=project_id)


def setup_director_v2(app: FastAPI) -> None:
public_client = DirectorV2Client(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,10 @@ async def get_dynamic_services(
"/dynamic_services",
params=as_dict_exclude_unset(user_id=user_id, project_id=project_id),
)

@retry_on_errors()
@expect_status(status.HTTP_204_NO_CONTENT)
async def patch_projects_networks(self, *, project_id: ProjectID) -> Response:
return await self.client.patch(
f"/dynamic_services/projects/{project_id}/-/networks"
)
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,12 @@ async def stop_dynamic_service(
)

await set_request_as_stopped(app, dynamic_service_stop)


async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
settings: ApplicationSettings = app.state.settings
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
raise NotImplementedError

director_v2_client = DirectorV2Client.get_from_app_state(app)
await director_v2_client.update_projects_networks(project_id=project_id)
Original file line number Diff line number Diff line change
Expand Up @@ -490,3 +490,24 @@ async def test_stop_dynamic_service_serializes_generic_errors(
),
timeout_s=5,
)


@pytest.fixture
def mock_director_v2_update_projects_networks(project_id: ProjectID) -> Iterator[None]:
with respx.mock(
base_url="http://director-v2:8000/v2",
assert_all_called=False,
assert_all_mocked=True, # IMPORTANT: KEEP always True!
) as mock:
mock.patch(f"/dynamic_services/projects/{project_id}/-/networks").respond(
status.HTTP_204_NO_CONTENT
)
yield None


async def test_update_projects_networks(
mock_director_v2_update_projects_networks: None,
rpc_client: RabbitMQRPCClient,
project_id: ProjectID,
):
await services.update_projects_networks(rpc_client, project_id=project_id)
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,6 @@ async def restart_dynamic_service(app: web.Application, node_uuid: str) -> None:
)


@log_decorator(logger=_log)
async def update_dynamic_service_networks_in_project(
app: web.Application, project_id: ProjectID
) -> None:
settings: DirectorV2Settings = get_plugin_settings(app)
backend_url = (
URL(settings.base_url) / f"dynamic_services/projects/{project_id}/-/networks"
)
await request_director_v2(
app, "PATCH", backend_url, expected_status=web.HTTPNoContent
)


@log_decorator(logger=_log)
async def get_project_inactivity(
app: web.Application,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
request_retrieve_dyn_service,
restart_dynamic_service,
retrieve,
update_dynamic_service_networks_in_project,
)
from ._core_utils import is_healthy
from .exceptions import DirectorServiceError
Expand All @@ -43,6 +42,5 @@
"retrieve",
"set_project_run_policy",
"stop_pipeline",
"update_dynamic_service_networks_in_project",
)
# nopycln: file
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,11 @@ async def stop_dynamic_services_in_project(
]

await logged_gather(*services_to_stop)


async def update_projects_networks(
app: web.Application, *, project_id: ProjectID
) -> None:
await services.update_projects_networks(
get_rabbitmq_rpc_client(app), project_id=project_id
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@

from ..application_settings import get_application_settings
from ..catalog import client as catalog_client
from ..director_v2 import api
from ..director_v2 import api as director_v2_api
from ..dynamic_scheduler import api as dynamic_scheduler_api
from ..folders import _folders_db as folders_db
from ..storage.api import (
copy_data_folders_from_project,
Expand Down Expand Up @@ -376,13 +377,13 @@ async def create_project( # pylint: disable=too-many-arguments,too-many-branche
await db.set_hidden_flag(new_project["uuid"], hidden=False)

# update the network information in director-v2
await api.update_dynamic_service_networks_in_project(
request.app, ProjectID(new_project["uuid"])
await dynamic_scheduler_api.update_projects_networks(
request.app, project_id=ProjectID(new_project["uuid"])
)
task_progress.update()

# This is a new project and every new graph needs to be reflected in the pipeline tables
await api.create_or_update_pipeline(
await director_v2_api.create_or_update_pipeline(
request.app, user_id, new_project["uuid"], product_name
)
# get the latest state of the project (lastChangeDate for instance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -813,8 +813,8 @@ async def add_project_node(
await director_v2_api.create_or_update_pipeline(
request.app, user_id, project["uuid"], product_name
)
await director_v2_api.update_dynamic_service_networks_in_project(
request.app, project["uuid"]
await dynamic_scheduler_api.update_projects_networks(
request.app, project_id=ProjectID(project["uuid"])
)

if _is_node_dynamic(service_key):
Expand Down Expand Up @@ -936,8 +936,8 @@ async def delete_project_node(
await director_v2_api.create_or_update_pipeline(
request.app, user_id, project_uuid, product_name
)
await director_v2_api.update_dynamic_service_networks_in_project(
request.app, project_uuid
await dynamic_scheduler_api.update_projects_networks(
request.app, project_id=project_uuid
)


Expand Down Expand Up @@ -1045,9 +1045,7 @@ async def patch_project_node(
app, user_id, project_id, product_name=product_name
)
if _node_patch_exclude_unset.get("label"):
await director_v2_api.update_dynamic_service_networks_in_project(
app, project_id
)
await dynamic_scheduler_api.update_projects_networks(app, project_id=project_id)

# 5. Updates project states for user, if inputs have been changed
if "inputs" in _node_patch_exclude_unset:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from servicelib.aiohttp.typing_extension import Handler
from servicelib.logging_errors import create_troubleshotting_log_kwargs

from ..director_v2.api import update_dynamic_service_networks_in_project
from ..dynamic_scheduler import api as dynamic_scheduler_api
from ..products.api import get_product_name
from ..utils import compose_support_error_msg
from ..utils_aiohttp import create_redirect_to_page_response
Expand Down Expand Up @@ -252,7 +252,9 @@ async def get_redirection_to_viewer(request: web.Request):
file_params.download_link,
product_name=get_product_name(request),
)
await update_dynamic_service_networks_in_project(request.app, project_id)
await dynamic_scheduler_api.update_projects_networks(
request.app, project_id=project_id
)

response = _create_redirect_response_to_view_page(
request.app,
Expand Down Expand Up @@ -281,7 +283,9 @@ async def get_redirection_to_viewer(request: web.Request):
service_info=_create_service_info_from(valid_service),
product_name=get_product_name(request),
)
await update_dynamic_service_networks_in_project(request.app, project_id)
await dynamic_scheduler_api.update_projects_networks(
request.app, project_id=project_id
)

response = _create_redirect_response_to_view_page(
request.app,
Expand Down Expand Up @@ -317,7 +321,9 @@ async def get_redirection_to_viewer(request: web.Request):
).STUDIES_DEFAULT_FILE_THUMBNAIL,
product_name=get_product_name(request),
)
await update_dynamic_service_networks_in_project(request.app, project_id)
await dynamic_scheduler_api.update_projects_networks(
request.app, project_id=project_id
)

response = _create_redirect_response_to_view_page(
request.app,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@

from .._constants import INDEX_RESOURCE_NAME
from ..director_v2._core_computations import create_or_update_pipeline
from ..director_v2._core_dynamic_services import (
update_dynamic_service_networks_in_project,
)
from ..dynamic_scheduler import api as dynamic_scheduler_api
from ..products.api import get_current_product, get_product_name
from ..projects._groups_db import get_project_group
from ..projects.api import check_user_project_permission
Expand Down Expand Up @@ -214,7 +212,9 @@ async def copy_study_to_account(
await create_or_update_pipeline(
request.app, user["id"], project["uuid"], product_name
)
await update_dynamic_service_networks_in_project(request.app, project["uuid"])
await dynamic_scheduler_api.update_projects_networks(
request.app, project_id=ProjectID(project["uuid"])
)

return project_uuid

Expand Down
13 changes: 13 additions & 0 deletions services/web/server/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.projects_state import ProjectState
from pytest_mock import MockerFixture
from pytest_simcore.helpers.assert_checks import assert_status
from pytest_simcore.helpers.dict_tools import ConfigDict
from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
Expand Down Expand Up @@ -444,3 +445,15 @@ async def _creator(
for client, project_uuid in zip(used_clients, created_project_uuids, strict=True):
url = client.app.router["delete_project"].url_for(project_id=project_uuid)
await client.delete(url.path)


@pytest.fixture
def mock_dynamic_scheduler(mocker: MockerFixture) -> None:
mocker.patch(
"simcore_service_webserver.dynamic_scheduler.api.stop_dynamic_services_in_project",
autospec=True,
)
mocker.patch(
"simcore_service_webserver.dynamic_scheduler.api.update_projects_networks",
autospec=True,
)
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ async def director_v2_service_mock(
r"^http://[a-z\-_]*director-v2:[0-9]+/v2/computations/.*$"
)
delete_computation_pattern = get_computation_pattern
projects_networks_pattern = re.compile(
r"^http://[a-z\-_]*director-v2:[0-9]+/v2/dynamic_services/projects/.*/-/networks$"
)

mocker.patch(
"simcore_service_webserver.dynamic_scheduler.api.list_dynamic_services",
Expand All @@ -134,7 +131,6 @@ async def director_v2_service_mock(
repeat=True,
)
mock.delete(delete_computation_pattern, status=204, repeat=True)
mock.patch(projects_networks_pattern, status=204, repeat=True)
yield mock


Expand Down
6 changes: 2 additions & 4 deletions services/web/server/tests/unit/with_dbs/02/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,8 @@ async def project_db_cleaner(client: TestClient):


@pytest.fixture(autouse=True)
async def mocked_director_v2(
director_v2_service_mock: aioresponses,
) -> AsyncIterator[aioresponses]:
return director_v2_service_mock
async def mocked_director_v2(director_v2_service_mock: aioresponses) -> None:
pass


@pytest.fixture()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ async def test_copying_large_project_and_retrieving_copy_task(

@pytest.mark.parametrize(*_standard_user_role_response())
async def test_creating_new_project_from_template_without_copying_data_creates_skeleton(
mock_dynamic_scheduler: None,
client: TestClient,
logged_user: dict[str, Any],
primary_group: dict[str, str],
Expand Down Expand Up @@ -230,6 +231,7 @@ async def test_creating_new_project_from_template_without_copying_data_creates_s

@pytest.mark.parametrize(*_standard_user_role_response())
async def test_creating_new_project_as_template_without_copying_data_creates_skeleton(
mock_dynamic_scheduler: None,
client: TestClient,
logged_user: dict[str, Any],
primary_group: dict[str, str],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ async def test_get_project(

@pytest.mark.parametrize(*standard_role_response())
async def test_new_project(
mock_dynamic_scheduler: None,
client: TestClient,
logged_user: UserInfoDict,
primary_group,
Expand All @@ -427,6 +428,7 @@ async def test_new_project(

@pytest.mark.parametrize(*standard_user_role_response())
async def test_new_project_from_template(
mock_dynamic_scheduler: None,
client: TestClient,
logged_user: UserInfoDict,
primary_group: dict[str, str],
Expand All @@ -453,6 +455,7 @@ async def test_new_project_from_template(

@pytest.mark.parametrize(*standard_user_role_response())
async def test_new_project_from_other_study(
mock_dynamic_scheduler: None,
client: TestClient,
logged_user: UserInfoDict,
primary_group: dict[str, str],
Expand Down Expand Up @@ -482,6 +485,7 @@ async def test_new_project_from_other_study(

@pytest.mark.parametrize(*standard_user_role_response())
async def test_new_project_from_template_with_body(
mock_dynamic_scheduler: None,
client: TestClient,
logged_user: UserInfoDict,
primary_group: dict[str, str],
Expand Down Expand Up @@ -536,6 +540,7 @@ async def test_new_project_from_template_with_body(

@pytest.mark.parametrize(*standard_user_role_response())
async def test_new_template_from_project(
mock_dynamic_scheduler: None,
client: TestClient,
logged_user: dict[str, Any],
primary_group: dict[str, str],
Expand Down
Loading
Loading