From 4ebc3d35ac3bf2321bbe457d21b96a00b3c47b21 Mon Sep 17 00:00:00 2001 From: Bohdan Date: Sat, 24 Sep 2022 01:37:51 +0200 Subject: [PATCH 1/3] Adds tests for graph coloring goal --- .gitignore | 3 +- .../graph_coloring_problem/goals/goal.py | 2 + setup.py | 5 +- tests/README.md | 1 + tests/test_graph_coloring_goal.py | 73 +++++++++++++++++++ 5 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 tests/README.md create mode 100644 tests/test_graph_coloring_goal.py diff --git a/.gitignore b/.gitignore index e9c1c32..e808d58 100644 --- a/.gitignore +++ b/.gitignore @@ -140,4 +140,5 @@ cython_debug/ # Custom stuff local-search/ .idea/ -test-venv/ \ No newline at end of file +test-venv/ +student_solutions/ \ No newline at end of file diff --git a/local_search/problems/graph_coloring_problem/goals/goal.py b/local_search/problems/graph_coloring_problem/goals/goal.py index 6a27a0c..dc711f2 100644 --- a/local_search/problems/graph_coloring_problem/goals/goal.py +++ b/local_search/problems/graph_coloring_problem/goals/goal.py @@ -6,6 +6,8 @@ from local_search.problems.graph_coloring_problem.models.edge import Edge from local_search.problems.graph_coloring_problem.state import GraphColoringState +# Do we want to have TODOs on this class. It not teaches students nothing about algorithms, only makes them familiar with the problem? + class GraphColoringGoal(Goal, ABC): """ diff --git a/setup.py b/setup.py index 501c59e..4639a5f 100644 --- a/setup.py +++ b/setup.py @@ -1,15 +1,18 @@ from setuptools import setup + setup( name='Local search', version='1.0.0', + py_modules=['local_search'], install_requires=[ 'numpy', 'rich', 'click', 'pygame', 'Pillow', - "mpmath" + 'mpmath', + 'pytest' ], entry_points=''' [console_scripts] diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..7e192da --- /dev/null +++ b/tests/README.md @@ -0,0 +1 @@ +This folder contains scripts, that allows you to test TODOs \ No newline at end of file diff --git a/tests/test_graph_coloring_goal.py b/tests/test_graph_coloring_goal.py new file mode 100644 index 0000000..6ff73bf --- /dev/null +++ b/tests/test_graph_coloring_goal.py @@ -0,0 +1,73 @@ +from typing import List + +import pytest + +from local_search.problems.graph_coloring_problem.goals import MinColors +from local_search.problems.graph_coloring_problem.goals.goal import GraphColoringGoal +from local_search.problems.graph_coloring_problem.models.edge import Edge +from local_search.problems.graph_coloring_problem.models.vertex import Vertex +from local_search.problems.graph_coloring_problem.state import GraphColoringState + +GraphDict = dict[int, dict[int]] + + +def create_edges(graph: GraphDict) -> List[Edge]: + result = [] + for start, ends in graph.items(): + result += [Edge(start, end) for end in ends] + return result + + +@pytest.fixture() +def graph(): + return {0: {4, 5, 8}, 1: {4, 6}, 2: {4, 5}, 3: {6}, 4: {0, 1, 2}, 5: {0, 2}, 6: {1, 3}, 7: {8}, 8: {0, 7}} + + +def create_goal(graph: GraphDict) -> GraphColoringGoal: + n_vertices = len(graph) + edges = create_edges(graph) + return MinColors(edges, n_vertices) + + +def create_graph_coloring_state(graph: GraphDict, num_colors: int): + n_vertices = len(graph) + colors = list(range(num_colors)) + return GraphColoringState([ + Vertex(i, colors[i % num_colors]) for i in range(n_vertices) + ]) + + +@pytest.mark.parametrize('num_colors', [5, 3, 1]) +def test_num_colors(graph: GraphDict, num_colors: int): + goal = create_goal(graph) + state = create_graph_coloring_state(graph, num_colors) + expected_num_colors = num_colors + actual_num_colors = goal._num_colors(state) + assert expected_num_colors == goal._num_colors(state), f'expected {expected_num_colors} colors, got {actual_num_colors}\n' \ + f'\t- state {state},' + + +@pytest.mark.parametrize('num_colors, expected', [ + (5, [2, 2, 0, 0, 0, 0, 0, 0, 0]), + (3, [2, 2, 2, 0, 0, 0, 0, 0, 0]), + (1, [18, 0, 0, 0, 0, 0, 0, 0, 0]) +]) +def test_bad_edges(graph: GraphDict, num_colors: int, expected: list[int]): + goal = create_goal(graph) + state = create_graph_coloring_state(graph, num_colors) + actual_bad_edges = goal._bad_edges(state) + assert expected == actual_bad_edges, f'expected {expected} bad edges, got {actual_bad_edges}\n' \ + f'\t- state {state},' + + +@pytest.mark.parametrize('num_colors, expected', [ + (5, [2, 2, 2, 2, 1, 0, 0, 0, 0]), + (3, [3, 3, 3, 0, 0, 0, 0, 0, 0]), + (1, [9, 0, 0, 0, 0, 0, 0, 0, 0]) +]) +def test_color_classes(graph: GraphDict, num_colors: int, expected: list[int]): + goal = create_goal(graph) + state = create_graph_coloring_state(graph, num_colors) + actual_color_classes = goal._color_classes(state) + assert expected == actual_color_classes, f'expected {expected} color classes, got {actual_color_classes}\n' \ + f'\t- state {state},' From b1d753e6c3310a47510593d29e043943f6108a69 Mon Sep 17 00:00:00 2001 From: Bohdan Date: Sat, 24 Sep 2022 02:05:18 +0200 Subject: [PATCH 2/3] Adds kempe chain tests --- tests/test_graph_coloring_kempe_chain.py | 85 ++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 tests/test_graph_coloring_kempe_chain.py diff --git a/tests/test_graph_coloring_kempe_chain.py b/tests/test_graph_coloring_kempe_chain.py new file mode 100644 index 0000000..e468994 --- /dev/null +++ b/tests/test_graph_coloring_kempe_chain.py @@ -0,0 +1,85 @@ +from typing import List, Dict, Set + +import pytest + +from local_search.problems.graph_coloring_problem.goals import MinColors +from local_search.problems.graph_coloring_problem.models.edge import Edge +from local_search.problems.graph_coloring_problem.models.vertex import Vertex +from local_search.problems.graph_coloring_problem.moves.kempe_chain import KempeChainMove +from local_search.problems.graph_coloring_problem.state import GraphColoringState + + +@pytest.fixture() +def start_index(): + return 0 + + +@pytest.fixture() +def graph() -> dict[int, Set[int]]: + return {0: {4, 5, 8}, 1: {4, 6}, 2: {4, 5}, 3: {6}, 4: {0, 1, 2}, 5: {0, 2}, 6: {1, 3}, 7: {8}, 8: {0, 7}} + + +@pytest.fixture() +def old_state(graph) -> GraphColoringState: + def color(i): + return i // 4 + return GraphColoringState([Vertex(idx, color(idx)) for idx in range(len(graph))]) + + +@pytest.fixture() +def new_state(old_state, start_index, new_color) -> GraphColoringState: + old_state.coloring[start_index].color = new_color + return old_state + + +@pytest.fixture() +def student_move(graph, old_state, start_index, new_color): + return KempeChainMove(graph, old_state, start_index, new_color) + + +@pytest.fixture() +def edges(graph: Dict[int, Set[int]]) -> List[Edge]: + result = [] + for start, ends in graph.items(): + result += [Edge(start, end) for end in ends] + return result + + +@pytest.mark.parametrize('new_color', [3, 5]) +def test_kempe_chain_should_have_result_with_no_conflicts(student_move, new_state, edges): + student_move._kempe_chain(new_state.coloring) + goal = MinColors(edges, len(student_move.graph)) + bad_edges = goal._bad_edges(new_state) + n_bad_edges = sum(bad_edges) + assert n_bad_edges == 0, f"there are still {n_bad_edges} conflicts after kempe chain\n" \ + f"\t- bad edges: {bad_edges}\n" \ + f"\t- state: {new_state}\n" \ + f"\t- graph: {student_move.graph}\n" + + +@pytest.mark.parametrize('new_color', [3, 5]) +def test_kempe_chain_should_solve_direct_conflicts(student_move, new_state): + student_move._kempe_chain(new_state.coloring) + for n in student_move.graph[0]: + assert new_state.coloring[n].color != new_state.coloring[0].color, \ + f"kempe chain fails to correctly fix direct coloring conflict\n" \ + f"\t- state: {new_state}\n" \ + f"\t- graph: {student_move.graph}\n" + + +@pytest.mark.parametrize('new_color', [2]) +def test_kempe_chain_should_solve_indirect_conflicts(student_move, new_state): + student_move._kempe_chain(new_state.coloring) + assert new_state.coloring[1].color == 0 \ + and new_state.coloring[3].color == 0 \ + and new_state.coloring[6].color == 1, f"kempe chain fails to fix indirect coloring conflicts:\n" \ + f"\t- state: {new_state}\n" \ + f"\t- graph: {student_move.graph}\n" + + +@pytest.mark.parametrize('new_color', [2]) +def test_kempe_chain_should_handle_cycles(student_move, new_state): + student_move._kempe_chain(new_state.coloring) + assert new_state.coloring[2].color == 0, f"kempe chain doesn't handle correctly cycles in the graph\n" \ + f"\t- state: {new_state}\n" \ + f"\t- graph: {student_move.graph}\n" From acdebcb6e80628f2b34ca1c3cfc61d48e7e0165f Mon Sep 17 00:00:00 2001 From: Bohdan Date: Sun, 25 Sep 2022 01:43:01 +0200 Subject: [PATCH 3/3] Adds tests for simulated annealing and hill climbing --- tests/__init__.py | 0 tests/sum_problem.py | 109 +++++++++++++++++++++++++ tests/test_hill_climbing.py | 60 ++++++++++++++ tests/test_simulated_annealing.py | 131 ++++++++++++++++++++++++++++++ 4 files changed, 300 insertions(+) create mode 100644 tests/__init__.py create mode 100644 tests/sum_problem.py create mode 100644 tests/test_hill_climbing.py create mode 100644 tests/test_simulated_annealing.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/sum_problem.py b/tests/sum_problem.py new file mode 100644 index 0000000..dd41cc0 --- /dev/null +++ b/tests/sum_problem.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import random +from abc import ABC +from dataclasses import dataclass +from typing import Iterable, Type, Generator + +from local_search.problems import State, Problem +from local_search.problems.base import Move +from local_search.problems.base.goal import Goal, GoalType +from local_search.problems.base.move_generator import MoveGenerator + + +@dataclass +class SumProblemState(State): + a: int + b: int + + def __str__(self) -> str: + return f"{self.a} + {self.b}" + + def __eq__(self, other): + return isinstance(other, SumProblemState) and self.a == other.a and self.b == other.b + + @staticmethod + def suboptimal_state(sum: int = 100) -> SumProblemState: + a = int(0.25 * sum) + return SumProblemState(a, sum - a) + + @staticmethod + def optimal_state(goal_type: GoalType, sum: int = 100) -> SumProblemState: + a = int(0.5 * sum) if goal_type == GoalType.MIN else 0 + return SumProblemState(a, sum - a) + + +class SumProblemGoal(Goal, ABC): + + def objective_for(self, state: SumProblemState) -> int: + return state.a ** 2 + state.b ** 2 + + def human_readable_objective_for(self, state: SumProblemState) -> str: + return f"{self.objective_for(state)}" + + +class Maximize(SumProblemGoal): + + def type(self) -> GoalType: + return GoalType.MAX + + +class Minimize(SumProblemGoal): + + def type(self) -> GoalType: + return GoalType.MIN + + +@dataclass +class SumProblemMoveGenerator(MoveGenerator): + sum: int + + def available_moves(self, state: SumProblemState) -> Generator[Move[SumProblemState], None, None]: + for na in range(max(0, state.a - 2), min(self.sum, state.a + 2)): + yield SumProblemMove(na, self.sum) + + +@dataclass +class SumProblemMove(Move): + new_a: int + sum: int + + def make(self) -> SumProblemState: + return SumProblemState(self.new_a, self.sum - self.new_a) + + +class SumProblem(Problem): + """ + Demonstration of a simple problem, where the goal is to decompose :param sum: into components. + """ + def __init__(self, sum: int, goal: SumProblemGoal): + self.sum = sum + self.goal = goal + self.move_generator = SumProblemMoveGenerator(self.sum) + + def random_state(self) -> SumProblemState: + a = random.randrange(self.sum + 1) + return SumProblemState(a, self.sum - a) + + @staticmethod + def get_available_move_generation_strategies() -> Iterable[str]: + return ["MockMoveGenerator"] + + @staticmethod + def get_available_goals() -> Iterable[str]: + return ["MockGoalMin", "MockGoalMax"] + + @staticmethod + def from_benchmark(**kwargs) -> SumProblem: + raise NotImplementedError( + f"{SumProblem.__name__} cannot be created from benchmark") + + @classmethod + def from_dict(cls: Type[SumProblem], **kwargs) -> SumProblem: + raise NotImplementedError( + f"{SumProblem.__name__} cannot be created from dict") + + def next_states_from(self, state: SumProblemState) -> Iterable[SumProblemState]: + for move in self.move_generator.available_moves(state): + yield move.make() + diff --git a/tests/test_hill_climbing.py b/tests/test_hill_climbing.py new file mode 100644 index 0000000..408648a --- /dev/null +++ b/tests/test_hill_climbing.py @@ -0,0 +1,60 @@ +import pytest +from local_search.algorithms.hill_climbing.best_choice_hill_climbing import BestChoiceHillClimbing +from local_search.algorithms.hill_climbing.hill_climbing import HillClimbing, DEFAULT_CONFIG +from local_search.algorithms.hill_climbing.random_choice_hill_climbing import RandomChoiceHillClimbing +from local_search.algorithms.hill_climbing.worst_choice_hill_climbing import WorstChoiceHillClimbing +from tests.sum_problem import SumProblem, SumProblemGoal, Maximize, Minimize, SumProblemState + + +# TODO: should be here? +PROBLEM_SIZE = 100 + + +@pytest.fixture +def goals() -> list[SumProblemGoal]: + return [Minimize(), Maximize()] + + +def test_best_choice_hill_climbing_should_find_the_best_neighbour(goals: list[SumProblemGoal]): + solver = BestChoiceHillClimbing(DEFAULT_CONFIG) + for goal in goals: + state = SumProblemState.suboptimal_state(PROBLEM_SIZE) + next_state, problem = get_climbing_results_for_a_mock_problem( + solver, goal, state) + assert problem.improvement(next_state, + state) > 0, "algorithm returns a state that's not better than the previous " \ + f"one (goal type: {goal.type()})" + + next_states = problem.next_states_from(state) + improving_states = [ + s for s in next_states if problem.improvement(s, state) > 0] + expected_state = max( + improving_states, key=lambda next_state: problem.improvement(next_state, state)) + assert problem.objective_for(next_state) == problem.objective_for( + expected_state), "algorithm does return an improving state, but it's not the best " \ + f"(goal type: {goal.type()})" + + +def test_random_choice_hill_climbing_should_find_the_random_improving_neighbour(goals): + solver = RandomChoiceHillClimbing(DEFAULT_CONFIG) + for goal in goals: + state = SumProblemState.suboptimal_state(PROBLEM_SIZE) + + next_state, problem = get_climbing_results_for_a_mock_problem( + solver, goal, state) + assert problem.improvement(next_state, state) >= 0, f"algorithm returns a state that's worse than " \ + f"the previous " \ + "one (goal type: {goal.type()})" + + next_values = set([problem.objective_for( + solver._climb_the_hill(problem, state)) for _ in range(100)]) + assert len( + next_values) > 1, f"algorithm is deterministic, always returns the same state, while it should be random " \ + f"(goal type: {goal.type()})) " + + +def get_climbing_results_for_a_mock_problem(solver: HillClimbing, goal: SumProblemGoal, state: SumProblemState): + problem = SumProblem(PROBLEM_SIZE, goal) + next_state = solver._climb_the_hill(problem, state) + assert next_state is not None, "algorithm returns None instead of a state" + return next_state, problem diff --git a/tests/test_simulated_annealing.py b/tests/test_simulated_annealing.py new file mode 100644 index 0000000..2670ac0 --- /dev/null +++ b/tests/test_simulated_annealing.py @@ -0,0 +1,131 @@ +from http.cookies import SimpleCookie +import random +from unittest.mock import patch + +import pytest + +from local_search.algorithms.simulated_annealing import SimulatedAnnealing +from tests.sum_problem import Maximize, SumProblem, SumProblemState +from mpmath import mpf + +PROBLEM_SIZE = 100 + + +def test_reheat_should_restore_temp_and_reset_schedule(): + solver = SimulatedAnnealing() + solver.temperature = 0 + solver.steps_from_last_state_update = 100 + solver.cooling_time = 100 + state = SumProblemState.suboptimal_state() + new_state = solver._reheat(state) + assert state == new_state, "reheating modifies the state, it shouldn't!" + expected_temp = solver.config.escape_reheat_ratio * \ + solver.config.initial_temperature + assert solver.temperature == expected_temp, f"reheating should change temperature according to " \ + f"self.config, got {solver.temperature}, " \ + f"expected {expected_temp}" + assert solver.steps_from_last_state_update == 0, "reheating should reset the " \ + "'steps_from_last_state_update' " + assert solver.cooling_time == 0, "reheating should reset the 'cooling_time'" + + +def test_update_temperature_not_goes_below_min_temperature(): + random.seed(42) + solver = SimulatedAnnealing() + solver.config.min_temperature = 1 + solver.temperature = 0.9 * solver.config.min_temperature + solver.cooling_time = random.randint(1, 5) + solver.config.cooling_step = random.random() + solver._update_temperature() + assert solver.temperature == solver.config.min_temperature, 'update temperature should not go below min' \ + 'temperature during update' + + +def test_update_temperature_uses_correct_decrease_function(): + solver = SimulatedAnnealing() + initial_temperature = solver.config.initial_temperature + initial_cooling_time = solver.cooling_time + solver._update_temperature() + expected_temperature = 5 + assert solver.temperature == expected_temperature, 'update temperature, does not uses correct formula to update temperature:' \ + 'for params:' \ + f'temperature: {initial_temperature}' \ + f'cooling_time: {initial_cooling_time}' \ + f'cooling_step: {solver.config.cooling_step}' \ + f'expected new temperature to be: {expected_temperature}' \ + f'but received: {solver.temperature}' + + +def test_update_temperature_updates_cooling_time(): + INITIAL_COOLING_TIME = 1 + solver = SimulatedAnnealing() + solver.cooling_time = INITIAL_COOLING_TIME + solver._update_temperature() + assert solver.cooling_time == INITIAL_COOLING_TIME + 1, 'expected update temperature to update cooling time by 1,' \ + f'not by {solver.cooling_time - INITIAL_COOLING_TIME}' + + +@pytest.mark.parametrize('mocked_improvement, expected_probability', zip([ + 10 ** i for i in range(2) +], [mpf('2.7182818284590451'), mpf('22026.465794806718')])) +def test_calculate_transition_probability(mocked_improvement, expected_probability): + class TestProblem: + def improvement(*args): + return mocked_improvement + + problem = TestProblem() + solver = SimulatedAnnealing() + initial_temp = solver.temperature = 1 + actual_probability = solver._calculate_transition_probability( + problem, None, None) + assert expected_probability == actual_probability, f'expected to calculate {expected_probability}' \ + f'for delta model improvement {mocked_improvement} and temperature {initial_temp}' + + +def test_find_next_state_gets_random_neighbour(): + random.seed(42) + solver = SimulatedAnnealing() + model = SumProblem(PROBLEM_SIZE, Maximize()) + initial_state = SumProblemState.suboptimal_state(PROBLEM_SIZE) + with patch.object(solver, SimulatedAnnealing._get_random_neighbours.__name__): + solver._get_random_neighbours.return_value = ( + s for s in [initial_state]) + _ = solver._find_next_state(model, initial_state) + solver._get_random_neighbours.assert_called_once_with( + model, initial_state) + + +def test_find_next_state_returns_next_state_if_state_is_better(): + solver = SimulatedAnnealing() + model = SumProblem(PROBLEM_SIZE, Maximize()) + state = SumProblemState.suboptimal_state(PROBLEM_SIZE) + optimal_state = SumProblemState.optimal_state(model.goal.type(), model.sum) + with patch.object(solver, SimulatedAnnealing._get_random_neighbours.__name__): + solver._get_random_neighbours.return_value = ( + s for s in [optimal_state]) + next_state = solver._find_next_state(model, state) + assert next_state == optimal_state, 'expected algorithm to select improving state' + + +def test_find_next_state_calculates_transition_probability_if_state_is_not_better(): + solver = SimulatedAnnealing() + model = SumProblem(PROBLEM_SIZE, Maximize()) + state = SumProblemState.optimal_state(model.goal.type(), PROBLEM_SIZE) + next_state = SumProblemState.suboptimal_state(PROBLEM_SIZE) + with patch.object(solver, SimulatedAnnealing._calculate_transition_probability.__name__), \ + patch.object(solver, SimulatedAnnealing._get_random_neighbours.__name__): + solver._get_random_neighbours.return_value = ( + s for s in [next_state]) + solver._calculate_transition_probability.return_value = 1 + _ = solver._find_next_state(model, state) + solver._calculate_transition_probability.assert_called_once_with( + model, state, next_state) + + +def test_find_next_state_updates_temperatures(): + solver = SimulatedAnnealing() + model = SumProblem(PROBLEM_SIZE, Maximize()) + state = SumProblemState.suboptimal_state(PROBLEM_SIZE) + with patch.object(solver, SimulatedAnnealing._update_temperature.__name__): + _ = solver._find_next_state(model, state) + solver._update_temperature.assert_called_once()