add experiment environments and component synchronization
gmega committed Dec 3, 2024
1 parent a8f6ff9 commit 7280e87
Showing 13 changed files with 261 additions and 43 deletions.
61 changes: 58 additions & 3 deletions benchmarks/core/experiments/experiments.py
@@ -1,16 +1,20 @@
"""Basic definitions for structuring experiments."""

import logging
from abc import ABC, abstractmethod
from collections.abc import Iterable

import logging
from typing import Optional

from typing_extensions import Generic, TypeVar

from benchmarks.core.utils import await_predicate

logger = logging.getLogger(__name__)


class Experiment(ABC):
"""Base interface for an executable :class:`Experiment`."""

@abstractmethod
def run(self):
"""Synchronously runs the experiment, blocking the current thread until it's done."""
@@ -20,8 +24,59 @@ def run(self):
TExperiment = TypeVar('TExperiment', bound=Experiment)


class ExperimentComponent(ABC):
"""An :class:`ExperimentComponent` is a part of the environment for an experiment. These could be databases,
network nodes, etc."""

@abstractmethod
def is_ready(self) -> bool:
"""Returns whether this component is ready or not."""
pass


class ExperimentEnvironment:
"""An :class:`ExperimentEnvironment` is a collection of :class:`ExperimentComponent`s that must be ready before
an :class:`Experiment` can execute."""

def __init__(self, components: Iterable[ExperimentComponent], polling_interval: float = 0):
self.components = components
self.polling_interval = polling_interval

def await_ready(self, timeout: float = 0) -> bool:
"""Awaits for all components to be ready, or until a timeout is reached."""
# TODO we should probably have per-component timeouts, or at least provide feedback
# as to what was the completion state of each component.
if not await_predicate(
lambda: all(component.is_ready() for component in self.components),
timeout=timeout,
polling_interval=self.polling_interval,
):
return False

return True

def run(self, experiment: Experiment):
"""Runs the :class:`Experiment` within this :class:`ExperimentEnvironment`."""
if not self.await_ready():
raise RuntimeError('One or more environment components did not become ready in time')

experiment.run()

def bind(self, experiment: TExperiment) -> Experiment:
return _BoundExperiment(experiment, self)


class _BoundExperiment(Experiment, ABC):
def __init__(self, experiment: Experiment, env: ExperimentEnvironment):
self.experiment = experiment
self.env = env

def run(self):
self.env.run(self.experiment)


class IteratedExperiment(Experiment, Generic[TExperiment]):
"""An :class:`IteratedExperiment` will a sequence of :class:`Experiment`s."""
"""An :class:`IteratedExperiment` will run a sequence of :class:`Experiment`s."""

def __init__(self, experiments: Iterable[TExperiment]):
self.successful_runs = 0
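
For orientation, here is a minimal sketch of how these pieces compose, using only the interfaces introduced above. The PortComponent and NoopExperiment names are hypothetical, invented for illustration; the concrete component this commit actually wires in is DelugeNode (further down in the diff).

import socket

from benchmarks.core.experiments.experiments import (
    Experiment,
    ExperimentComponent,
    ExperimentEnvironment,
)


class PortComponent(ExperimentComponent):
    """Hypothetical component: ready once a TCP port accepts connections."""

    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port

    def is_ready(self) -> bool:
        try:
            with socket.create_connection((self.host, self.port), timeout=1):
                return True
        except OSError:
            return False


class NoopExperiment(Experiment):
    """Hypothetical experiment that just reports that it ran."""

    def run(self):
        print('all components ready, running experiment')


env = ExperimentEnvironment([PortComponent('localhost', 58846)], polling_interval=0.5)

# Run directly within the environment (blocks until the components report ready)...
env.run(NoopExperiment())

# ...or bind it, yielding an Experiment that performs the readiness wait inside run().
env.bind(NoopExperiment()).run()
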
27 changes: 15 additions & 12 deletions benchmarks/core/experiments/static_experiment.py
@@ -1,4 +1,4 @@
from typing_extensions import Generic, List
from typing_extensions import Generic, List, Tuple

from benchmarks.core.experiments.experiments import Experiment
from benchmarks.core.network import TInitialMetadata, TNetworkHandle, Node
@@ -21,17 +21,7 @@ def __init__(
self.data = data

def run(self, run: int = 0):
seeders, leechers = (
[
self.nodes[i]
for i in self.seeders
],
[
self.nodes[i]
for i in range(0, len(self.nodes))
if i not in self.seeders
]
)
seeders, leechers = self._split_nodes()

logger.info('Running experiment with %d seeders and %d leechers',
len(seeders), len(leechers))
@@ -51,3 +41,16 @@ def run(self, run: int = 0):
for i, download in enumerate(downloads):
download.await_for_completion()
logger.info('Download %d / %d completed', i + 1, len(downloads))

def _split_nodes(self) -> Tuple[
List[Node[TNetworkHandle, TInitialMetadata]],
List[Node[TNetworkHandle, TInitialMetadata]]
]:
return [
self.nodes[i]
for i in self.seeders
], [
self.nodes[i]
for i in range(0, len(self.nodes))
if i not in self.seeders
]
74 changes: 74 additions & 0 deletions benchmarks/core/experiments/tests/test_experiments.py
@@ -0,0 +1,74 @@
from time import sleep
from typing import List

from benchmarks.core.experiments.experiments import ExperimentComponent, ExperimentEnvironment, Experiment


class ExternalComponent(ExperimentComponent):

@property
def readiness_timeout(self) -> float:
return 0.1

def __init__(self, loops: int, wait_time: float = 0.0):
self.loops = loops
self.iteration = 0
self.wait_time = wait_time

def is_ready(self) -> bool:
sleep(self.wait_time)
if self.iteration < self.loops:
self.iteration += 1
return False

return True


def test_should_await_until_components_are_ready():
components = [
ExternalComponent(5),
ExternalComponent(3),
]

environment = ExperimentEnvironment(components, polling_interval=0)
assert environment.await_ready()

assert components[0].iteration == 5
assert components[1].iteration == 3


def test_should_timeout_if_component_takes_too_long():
components = [
ExternalComponent(5),
ExternalComponent(3, wait_time=0.1),
]

environment = ExperimentEnvironment(components, polling_interval=0)
assert not environment.await_ready(0.1)

assert components[0].iteration == 5
assert components[1].iteration < 3


class ExperimentThatReliesOnComponents(Experiment):
def __init__(self, components: List[ExperimentComponent]):
self.components = components

def run(self):
assert all(component.is_ready() for component in self.components)


def test_should_bind_experiment_to_environment():
components = [
ExternalComponent(5),
ExternalComponent(3),
]

env = ExperimentEnvironment(components, polling_interval=0)
experiment = ExperimentThatReliesOnComponents(components)
bound = env.bind(experiment)

bound.run()

assert components[0].is_ready()
assert components[1].is_ready()
4 changes: 3 additions & 1 deletion benchmarks/core/experiments/tests/test_static_experiment.py
@@ -2,9 +2,9 @@
from pathlib import Path
from typing import Optional, List, Tuple, Union

from benchmarks.core.network import Node, DownloadHandle
from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
from benchmarks.core.experiments.tests.utils import MockExperimentData
from benchmarks.core.network import Node, DownloadHandle


@dataclass
@@ -29,6 +29,7 @@ def seed(
file: Path,
handle: Union[str, MockHandle]
) -> MockHandle:

if isinstance(handle, MockHandle):
self.seeding = (handle, file)
else:
@@ -37,6 +38,7 @@
return self.seeding[0]

def leech(self, handle: MockHandle):

self.leeching = handle
return MockDownloadHandle(self)

13 changes: 12 additions & 1 deletion benchmarks/core/utils.py
@@ -5,7 +5,8 @@
from contextlib import contextmanager, AbstractContextManager
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator, Tuple, ContextManager, Optional
from time import time, sleep
from typing import Iterator, Tuple, ContextManager, Optional, Callable

from typing_extensions import Generic

@@ -59,6 +60,16 @@ def temp_random_file(size: int, name: str = 'data.bin'):
yield random_file


def await_predicate(predicate: Callable[[], bool], timeout: float = 0, polling_interval: float = 0) -> bool:
current = time()
while (timeout == 0) or ((time() - current) <= timeout):
if predicate():
return True
sleep(polling_interval)

return False


def sample(n: int) -> Iterator[int]:
"""Samples without replacement using a basic Fisher-Yates shuffle."""
p = list(range(0, n))
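
One point worth keeping in mind when reading the callers below: with the loop condition above, timeout=0 means "no timeout" and polling_interval=0 means busy-polling. A small usage sketch follows; the marker path is purely illustrative.

from pathlib import Path

from benchmarks.core.utils import await_predicate

marker = Path('/tmp/experiment.ready')  # hypothetical readiness marker

# Poll every 100ms for at most 5 seconds; returns False if the file never shows up.
if await_predicate(marker.exists, timeout=5, polling_interval=0.1):
    print('marker found')
else:
    print('timed out waiting for marker')
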
23 changes: 14 additions & 9 deletions benchmarks/deluge/deluge_node.py
@@ -4,15 +4,16 @@
from dataclasses import dataclass
from io import BytesIO
from pathlib import Path
from time import time, sleep
from typing import List, Union, Optional, Self, Dict, Any

import pathvalidate
from deluge_client import DelugeRPCClient
from torrentool.torrent import Torrent
from urllib3.util import Url

from benchmarks.core.experiments.experiments import ExperimentComponent
from benchmarks.core.network import SharedFSNode, DownloadHandle
from benchmarks.core.utils import await_predicate

logger = logging.getLogger(__name__)

@@ -25,7 +26,7 @@ class DelugeMeta:
announce_url: Url


class DelugeNode(SharedFSNode[Torrent, DelugeMeta]):
class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):

def __init__(
self,
@@ -123,6 +124,13 @@ def connect(self) -> Self:
self._rpc = client
return self

def is_ready(self) -> bool:
try:
self.connect()
return True
except ConnectionRefusedError:
return False

def _init_folders(self):
self.downloads_root.mkdir(parents=True, exist_ok=True)

@@ -141,16 +149,13 @@ def __init__(self, torrent: Torrent, node: DelugeNode) -> None:

def await_for_completion(self, timeout: float = 0) -> bool:
name = self.torrent.name
current = time()
while (timeout == 0) or ((time() - current) <= timeout):

def _predicate():
response = self.node.rpc.core.get_torrents_status({'name': name}, [])
if len(response) > 1:
logger.warning(f'Client has multiple torrents matching name {name}. Returning the first one.')

status = list(response.values())[0]
if status[b'is_seed']:
return True

sleep(0.5)
return status[b'is_seed']

return False
return await_predicate(_predicate, timeout=timeout)
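
Since DelugeNode now implements ExperimentComponent (its is_ready simply attempts an RPC connection), it can take part in the environment handshake from experiments.py directly. A hedged sketch of what that wiring might look like; the volume paths, node names, and port numbers are placeholders, with the constructor arguments mirrored from the fixtures below.

from pathlib import Path

from benchmarks.core.experiments.experiments import ExperimentEnvironment
from benchmarks.deluge.deluge_node import DelugeNode

nodes = [
    DelugeNode('deluge-1', volume=Path('/var/lib/deluge-1'), daemon_port=6890),
    DelugeNode('deluge-2', volume=Path('/var/lib/deluge-2'), daemon_port=6893),
]

# The environment keeps calling is_ready() (i.e. retrying the RPC connection)
# until every daemon accepts connections or the timeout elapses.
env = ExperimentEnvironment(nodes, polling_interval=0.5)
assert env.await_ready(timeout=30)
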
8 changes: 5 additions & 3 deletions benchmarks/deluge/tests/fixtures.py
@@ -5,13 +5,15 @@
from urllib3.util import Url, parse_url

from benchmarks.core import utils
from benchmarks.core.utils import megabytes
from benchmarks.core.utils import megabytes, await_predicate
from benchmarks.deluge.deluge_node import DelugeNode
from benchmarks.deluge.tracker import Tracker
from benchmarks.tests.utils import shared_volume


def deluge_node(name: str, port: int) -> Generator[DelugeNode, None, None]:
node = DelugeNode(name, volume=shared_volume(), daemon_port=port)
await_predicate(node.is_ready, timeout=10, polling_interval=0.5)
node.wipe_all_torrents()
try:
yield node
@@ -41,5 +43,5 @@ def temp_random_file() -> Generator[Path, None, None]:


@pytest.fixture
def tracker() -> Url:
return parse_url('http://127.0.0.1:8000/announce')
def tracker() -> Tracker:
return Tracker(parse_url('http://127.0.0.1:8000/announce'))
15 changes: 9 additions & 6 deletions benchmarks/deluge/tests/test_deluge_node.py
@@ -1,36 +1,39 @@
from pathlib import Path

import pytest
from urllib3.util import Url

from benchmarks.core.utils import megabytes
from benchmarks.deluge.deluge_node import DelugeNode, DelugeMeta
from benchmarks.deluge.tracker import Tracker


@pytest.mark.integration
def assert_is_seed(node: DelugeNode, name: str, size: int):
response = node.torrent_info(name=name)
assert len(response) == 1
info = response[0]

assert info[b'name'] == name.encode('utf-8') # not sure that this works for ANY name...
assert info[b'name'] == name.encode('utf-8') # not sure that this works for ANY name...
assert info[b'total_size'] == size
assert info[b'is_seed'] == True


@pytest.mark.integration
def test_should_seed_files(deluge_node1: DelugeNode, temp_random_file: Path, tracker: Url):
def test_should_seed_files(deluge_node1: DelugeNode, temp_random_file: Path, tracker: Tracker):
assert not deluge_node1.torrent_info(name='dataset1')

deluge_node1.seed(temp_random_file, DelugeMeta(name='dataset1', announce_url=tracker))
deluge_node1.seed(temp_random_file, DelugeMeta(name='dataset1', announce_url=tracker.announce_url))
assert_is_seed(deluge_node1, name='dataset1', size=megabytes(1))


@pytest.mark.integration
def test_should_download_files(
deluge_node1: DelugeNode, deluge_node2: DelugeNode,
temp_random_file: Path, tracker: Url):
temp_random_file: Path, tracker: Tracker):
assert not deluge_node1.torrent_info(name='dataset1')
assert not deluge_node2.torrent_info(name='dataset1')

torrent = deluge_node1.seed(temp_random_file, DelugeMeta(name='dataset1', announce_url=tracker))
torrent = deluge_node1.seed(temp_random_file, DelugeMeta(name='dataset1', announce_url=tracker.announce_url))
handle = deluge_node2.leech(torrent)

assert handle.await_for_completion(5)