Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Перевезти подбор портов с базы на redis #26

Merged
merged 7 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions demo/scripts/example_script.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
echo 'Hello world!'
2 changes: 1 addition & 1 deletion overhave/api/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def get_test_run_storage() -> ITestRunStorage:

@cache
def get_emulation_storage() -> IEmulationStorage:
return EmulationStorage(OverhaveEmulationSettings())
return EmulationStorage(settings=OverhaveEmulationSettings(), redis=make_redis(get_redis_settings()))


@cache
Expand Down
1 change: 1 addition & 0 deletions overhave/entities/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class OverhaveEmulationSettings(BaseOverhavePrefix):
emulation_bind_ip: str = "0.0.0.0" # noqa: S104
# Ports for emulation binding. Expects as string with format `["port1", "port2", ...]`
emulation_ports: list[int] = [8080]
redis_ports_key: str = "allocated_ports"

# As a real service, should be used follow path: `http://my-service.domain/mount`
# where `emulation_service_url` = `http://my-service.domain` - URL for service,
Expand Down
3 changes: 2 additions & 1 deletion overhave/factory/base_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)
from overhave.test_execution import PytestRunner, StepCollector
from overhave.transport import S3Manager
from overhave.transport.redis.deps import get_redis_settings, make_redis


class IOverhaveFactory(Generic[TApplicationContext], abc.ABC):
Expand Down Expand Up @@ -120,7 +121,7 @@ def draft_storage(self) -> IDraftStorage:

@cached_property
def _emulation_storage(self) -> EmulationStorage:
return EmulationStorage(settings=self.context.emulation_settings)
return EmulationStorage(settings=self.context.emulation_settings, redis=make_redis(get_redis_settings()))

@property
def emulation_storage(self) -> IEmulationStorage:
Expand Down
43 changes: 22 additions & 21 deletions overhave/storage/emulation_storage.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import abc
import logging
import socket
from typing import cast
from typing import Any, List, cast

import orjson
import sqlalchemy as sa
import sqlalchemy.orm as so
from redis import Redis

from overhave import db
from overhave.entities.settings import OverhaveEmulationSettings
Expand Down Expand Up @@ -51,8 +52,10 @@ def get_emulation_runs_by_test_user_id(test_user_id: int) -> list[EmulationRunMo
class EmulationStorage(IEmulationStorage):
"""Class for emulation runs storage."""

def __init__(self, settings: OverhaveEmulationSettings):
def __init__(self, settings: OverhaveEmulationSettings, redis: "Redis[Any]"):
self._redis = redis
self._settings = settings
self._redis.set(self._settings.redis_ports_key, orjson.dumps([]))
self._emulation_ports_len = len(self._settings.emulation_ports)

@staticmethod
Expand All @@ -65,20 +68,9 @@ def create_emulation_run(emulation_id: int, initiated_by: str) -> int:
session.flush()
return emulation_run.id

def _get_next_port(self, session: so.Session) -> int:
runs_with_allocated_ports = ( # noqa: ECE001
session.query(db.EmulationRun)
.filter(db.EmulationRun.port.isnot(None))
.order_by(db.EmulationRun.id.desc())
.limit(self._emulation_ports_len)
.all()
)
allocated_sorted_runs = sorted(
runs_with_allocated_ports,
key=lambda t: t.changed_at,
)

allocated_ports = {run.port for run in allocated_sorted_runs}
def _get_next_port(self) -> int:
allocated_ports = self.get_allocated_ports()

logger.debug("Allocated ports: %s", allocated_ports)
not_allocated_ports = set(self._settings.emulation_ports).difference(allocated_ports)
logger.debug("Not allocated ports: %s", not_allocated_ports)
Expand All @@ -88,12 +80,20 @@ def _get_next_port(self, session: so.Session) -> int:
continue
return port
logger.debug("All not allocated ports are busy!")
for run in allocated_sorted_runs:
if self._is_port_in_use(cast(int, run.port)):
for port in allocated_ports:
if self._is_port_in_use(port):
continue
return cast(int, run.port)
return port
raise AllPortsAreBusyError("All ports are busy - could not find free port!")

def get_allocated_ports(self) -> List[int]:
return cast(List[int], orjson.loads(cast(bytes, self._redis.get(self._settings.redis_ports_key))))

def allocate_port(self, port: int) -> None:
new_allocated_ports = self.get_allocated_ports()
new_allocated_ports.append(port)
self._redis.set(self._settings.redis_ports_key, orjson.dumps(sorted(new_allocated_ports)))

def _is_port_in_use(self, port: int) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex((self._settings.emulation_bind_ip, port)) == 0
Expand All @@ -102,7 +102,8 @@ def get_requested_emulation_run(self, emulation_run_id: int) -> EmulationRunMode
with db.create_session() as session:
emulation_run = session.query(db.EmulationRun).filter(db.EmulationRun.id == emulation_run_id).one()
emulation_run.status = db.EmulationStatus.REQUESTED
emulation_run.port = self._get_next_port(session)
emulation_run.port = self._get_next_port()
self.allocate_port(emulation_run.port)
emulation_run.changed_at = get_current_time()
return EmulationRunModel.model_validate(emulation_run)

Expand Down
Loading
Loading