diff --git a/aviary/scenario/cartesian_scenario.py b/aviary/scenario/cartesian_scenario.py index 408d19b..f3ad6a0 100644 --- a/aviary/scenario/cartesian_scenario.py +++ b/aviary/scenario/cartesian_scenario.py @@ -23,14 +23,14 @@ def aircraft_generator(self) -> dict: for flight_level in self.flight_levels: for aircraft_type in self.aircraft_types: - route = self.route() + route = self.choose_route() yield { sg.AIRCRAFT_TIMEDELTA_KEY: 0, sg.START_POSITION_KEY: route.fix_points()[0].coords[0], sg.CALLSIGN_KEY: next(self.callsign_generator()), sg.AIRCRAFT_TYPE_KEY: aircraft_type, - sg.DEPARTURE_KEY: self.departure_airport(route), - sg.DESTINATION_KEY: self.destination_airport(route), + sg.DEPARTURE_KEY: self.choose_departure_airport(route), + sg.DESTINATION_KEY: self.choose_destination_airport(route), sg.CURRENT_FLIGHT_LEVEL_KEY: flight_level, sg.CLEARED_FLIGHT_LEVEL_KEY: flight_level, sg.REQUESTED_FLIGHT_LEVEL_KEY: flight_level, diff --git a/aviary/scenario/empty_scenario.py b/aviary/scenario/empty_scenario.py new file mode 100644 index 0000000..85b7a56 --- /dev/null +++ b/aviary/scenario/empty_scenario.py @@ -0,0 +1,26 @@ +""" +Scenario generation algorithm with Poisson aircraft arrivals. +""" +# author: Tim Hobson +# email: thobson@turing.ac.uk + +import random + +from aviary.scenario.scenario_algorithm import ScenarioAlgorithm + +import aviary.scenario.scenario_generator as sg + +class EmptyScenario(ScenarioAlgorithm): + """An algorithm generating an empty scenario""" + + def __init__(self, **kwargs): + + # Pass the keyword args (including the random seed) to the superclass constructor. + super().__init__(**kwargs) + + # Overriding abstract method + def aircraft_generator(self) -> dict: + """Generates an empty sequence.""" + + return + yield \ No newline at end of file diff --git a/aviary/scenario/flight_phase.py b/aviary/scenario/flight_phase.py new file mode 100644 index 0000000..2d9d868 --- /dev/null +++ b/aviary/scenario/flight_phase.py @@ -0,0 +1,7 @@ + +from enum import Enum + +class FlightPhase(Enum): + overflier = 1 + climber = 2 + descender = 3 \ No newline at end of file diff --git a/aviary/scenario/incremental_ocd_scenario.py b/aviary/scenario/incremental_ocd_scenario.py new file mode 100644 index 0000000..a4fdccd --- /dev/null +++ b/aviary/scenario/incremental_ocd_scenario.py @@ -0,0 +1,247 @@ +""" +Scenario generation algorithm with Poisson aircraft arrivals. +""" +# author: Tim Hobson +# email: thobson@turing.ac.uk + +import random +import numpy as np + +import shapely.geometry as geom + +from aviary.scenario.scenario_algorithm import ScenarioAlgorithm +from aviary.scenario.flight_phase import FlightPhase + +import aviary.scenario.scenario_generator as sg + +from aviary.utils.geo_helper import GeoHelper + +class IncrementalOcdScenario(ScenarioAlgorithm): + """An incremental overflier-climber-descender scenario generation algorithm""" + + def __init__(self, + underlying_scenario, + seed, + overflier_prob = 1/3, + climber_prob = 1/3, + descender_prob = 1/3, + climber_cfl_range = 0.5, + climber_minimum_climb = 0.3, + descender_cfl_range=0.5, + descender_minimum_descent=0.3, + overflier_cfl_range=0.5, + start_position_distribution=np.array([1, 0]), + discrete_start_positions=False + ): + """ + IncrementalOcdScenario class constructor. + + :param underlying_scenario: The underlying scenario + :param seed: a random seed + :param overflier_prob: number in (0, 1]. The probability of generating an overflier aircraft. + :param climber_prob: number in (0, 1]. The probability of generating a climber aircraft. + :param descender_prob: number in (0, 1]. The probability of generating a descender aircraft. + :param climber_cfl_range (CCFLR): number in (0, 1]. The range of possible flight levels from which initial FL will be picked for climbers (e.g. 0.5 means the first half will be included) + :param climber_minimum_climb (CMC): number in (0, 1]. The range of ascents from which the requested FL will be picked (e.g. if CCFLR is 0.5 and CMC is 0.3 then the requested FL will only include the top 20% of flight levels) + :param descender_cfl_range (DCFLR): number in (0, 1]. Similar to CCFLR, but for descenders. + :param descender_minimum_descent (DMD): number in (0, 1]. Similar to CMC, but for descenders. + :param overflier_cfl_range: number in (0, 1]. The range of possible flight levels from which initial FL will be picked for overfliers (e.g. 0.5 means the first half will be included) + :param start_position_distribution: a probability distribution as a numpy array. The probability of starting in each route segement (also defines the number of segments). + :param discrete_start_positions: boolean. If True, start positions are at the discrete endpoints of the route segements. + + Defaults are: + - equal probability of overflier, climber or descender + - overfliers start in the upper half of the flight level range + - climbers start in the lower half of the flight level range and climb at least 30% of that range + - descenders start in the upper half of the flight level range and descend at least 30% of that range + - start position is a uniform random pick from the first half of the route. + """ + + if not isinstance(underlying_scenario, ScenarioAlgorithm): + raise ValueError('underlying_scenario must be a ScenarioAlgorithm') + + if sum([overflier_prob, climber_prob, descender_prob]) != 1: + raise ValueError('aircraft phase probabilities must sum to 1') + + if sum([climber_cfl_range, climber_minimum_climb]) > 1: + raise ValueError('climber initial interval plus minimum climb must be at most 1') + + if sum([descender_cfl_range, descender_minimum_descent]) > 1: + raise ValueError('descender initial interval plus minimum descent must be at most 1') + + if np.sum(start_position_distribution) != 1: + raise ValueError('invalid initial position distribution') + + # Pass the keyword args (including the random seed) to the superclass constructor. + super().__init__( + sector_element=underlying_scenario.sector_element, + aircraft_types=underlying_scenario.aircraft_types, + flight_levels=underlying_scenario.flight_levels, + callsign_prefixes=underlying_scenario.callsign_prefixes, + seed=seed + ) + + self.underlying_scenario = underlying_scenario + self.overflier_prob = overflier_prob + self.climber_prob = climber_prob + self.descender_prob = descender_prob + + self.climber_cfl_range = climber_cfl_range + self.climber_minimum_climb = climber_minimum_climb + self.descender_cfl_range = descender_cfl_range + self.descender_minimum_descent = descender_minimum_descent + self.overflier_cfl_range = overflier_cfl_range + + self.initial_position_distribution = start_position_distribution + self.discrete_initial_positions = discrete_start_positions + + # Choose the route for the incremental aircraft. + self.set_seed() + self.route = self.choose_route() + + # Overriding abstract method + def aircraft_generator(self) -> dict: + """Generates a sequence of aircraft constituting a scenario.""" + + for aircraft in self.underlying_scenario.aircraft_generator(): + yield aircraft + + yield self.aircraft() + + def aircraft(self): + """Returns the additional aircraft in this scenario, beyond those in the underlying scenario.""" + + self.set_seed() + aircraft_type = self.choose_aircraft_type() + self.set_seed() + phase = self.choose_flight_phase() + self.set_seed() + current_fl, requested_fl = self.choose_flight_levels(phase) + cleared_fl = current_fl + self.set_seed() + start_position = self.choose_start_position() + self.set_seed() + departure = self.choose_departure_airport(self.route) + self.set_seed() + destination = self.choose_destination_airport(self.route) + + # truncate the route i.e. remove the starting position fix + # note coords of start_position are in lon/lat order + route_copy = self.route.copy() + route_copy.truncate(initial_lat=start_position.y, initial_lon=start_position.x) + + return { + sg.AIRCRAFT_TIMEDELTA_KEY: 0, + sg.START_POSITION_KEY: start_position, + sg.CALLSIGN_KEY: next(self.callsign_generator()), + sg.AIRCRAFT_TYPE_KEY: aircraft_type, + sg.DEPARTURE_KEY: departure, + sg.DESTINATION_KEY: destination, + sg.CURRENT_FLIGHT_LEVEL_KEY: current_fl, + sg.CLEARED_FLIGHT_LEVEL_KEY: cleared_fl, + sg.REQUESTED_FLIGHT_LEVEL_KEY: requested_fl, + sg.ROUTE_KEY: route_copy.serialize(), + } + + def choose_flight_phase(self): + """Picks a flight phase (overflier, climber or descender).""" + + u = random.uniform(0, 1) + if u < self.overflier_prob: + return FlightPhase.overflier + if u - self.overflier_prob < self.climber_prob: + return FlightPhase.climber + return FlightPhase.descender + + def choose_flight_levels(self, flight_phase): + """Picks a current flight level, given a flight phase""" + + if flight_phase == FlightPhase.overflier: + cfl = self.choose_flight_level(exclude_lowest=1 - self.overflier_cfl_range) + return cfl, cfl + if flight_phase == FlightPhase.climber: + cfl = self.choose_flight_level(exclude_highest=1 - self.climber_cfl_range) + rfl = self.choose_flight_level(exclude_lowest=self.climber_cfl_range + self.climber_minimum_climb) + return cfl, rfl + if flight_phase == FlightPhase.descender: + cfl = self.choose_flight_level(exclude_lowest=1 - self.descender_cfl_range) + rfl = self.choose_flight_level(exclude_highest=self.descender_cfl_range + self.descender_minimum_descent) + return cfl, rfl + raise ValueError("flight_phase enum value expected") + + def choose_start_position(self): + """Picks an aircraft starting position""" + + segment_startpoint, segment_endpoint = self.choose_route_segment() + + # If initial positions are discrete, return the start of the chosen route segment. + if self.discrete_initial_positions: + return segment_startpoint + + # If initial positions are continuous, return a uniform random sample along the chosen route segment. + offset = np.random.uniform(low=0.0, high=self.segment_length(), size = 1) + return GeoHelper.waypoint_location(lat1=segment_startpoint.y, lon1=segment_startpoint.x, + lat2=segment_endpoint.y, lon2=segment_endpoint.x, distance_m=offset) + + def choose_route_segment(self): + """Picks a route segment for the aircraft start position""" + + # Choose the route segment index, and compute the distance from the segment + # start point to the following route fix. + pre_fix_i, windback_distance = self._pre_fix_index() + + # Compute the start and end points of the chosen segment. + fixes = self.route.fix_points() + pre_fix = fixes[pre_fix_i] + post_fix = fixes[pre_fix_i + 1] + segment_startpoint = GeoHelper.waypoint_location(lat1=post_fix.y, lon1=post_fix.x, + lat2=pre_fix.y, lon2=pre_fix.x, + distance_m=windback_distance) + segment_endpoint = GeoHelper.waypoint_location(lat1=segment_startpoint.y, lon1=segment_startpoint.x, + lat2=post_fix.y, lon2=post_fix.x, + distance_m=self.segment_length()) + + return segment_startpoint, segment_endpoint + + def _pre_fix_index(self): + """ + Returns a tuple containing: + - the index of the "pre-fix", the fix preceding the start point of the chosen route segment + - the "windback distance" from the fix *after* the pre-fix to the start of the chosen route segment + """ + + # Pick a segment index, and compute the length of each route segment in metres. + self.set_seed() + segment_index = self.choose_segment_index() + segment_length = self.segment_length() + + # Identify the "post-fix" index, i.e. that of the first fix *after* the start of the route segment. + fixes = self.route.fix_points() + post_fix_index = 0 + post_fix_distance = 0 + while True: + # If the distance to the post_fix is at least as big as the distance to the route segment start, then break. + if post_fix_distance > segment_index * segment_length: + break + post_fix_index += 1 + lon1, lat1 = fixes[post_fix_index - 1].x, fixes[post_fix_index - 1].y + lon2, lat2 = fixes[post_fix_index].x, fixes[post_fix_index].y + post_fix_distance += GeoHelper.distance(lat1=lat1, lon1=lon1, lat2=lat2, lon2=lon2) + + windback_distance = post_fix_distance - (segment_index * segment_length) + + return post_fix_index - 1, windback_distance + + def choose_segment_index(self): + """Picks a route segment for the aircraft start position according to the initial_position_distribution.""" + + segment_indices = range(0, len(self.initial_position_distribution)) + return np.random.choice(a=segment_indices, size=1, p=self.initial_position_distribution)[0] + + def segment_length(self): + """Computes the physical length of each route segment in metres.""" + + span = self.route.span() + return span/len(self.initial_position_distribution) + + diff --git a/aviary/scenario/overflier_climber_extended_scenario.py b/aviary/scenario/overflier_climber_extended_scenario.py index 49b6f1e..bb8cf74 100644 --- a/aviary/scenario/overflier_climber_extended_scenario.py +++ b/aviary/scenario/overflier_climber_extended_scenario.py @@ -103,9 +103,9 @@ def extend_aircraft(self, aircraft, is_overflier): # Note: here it is assumed that the aircraft travels on a straight line # to the sector centre (as is the case in an I, X or Y sector element). - lon, lat = GeoHelper.waypoint_location(lat1 = lat1, lon1 = lon1, lat2 = lat2, lon2 = lon2, distance_m = -1 * windback_distance) + start_point = GeoHelper.waypoint_location(lat1 = lat1, lon1 = lon1, lat2 = lat2, lon2 = lon2, distance_m = -1 * windback_distance) - aircraft[sg.START_POSITION_KEY] = (lon, lat) # Order is (lon, lat). + aircraft[sg.START_POSITION_KEY] = (start_point.x, start_point.y) # Order is (lon, lat). # If the aircraft is the climber, set the requested flight level to 'high' if not is_overflier: diff --git a/aviary/scenario/overflier_climber_scenario.py b/aviary/scenario/overflier_climber_scenario.py index c45863b..74eb901 100644 --- a/aviary/scenario/overflier_climber_scenario.py +++ b/aviary/scenario/overflier_climber_scenario.py @@ -44,9 +44,9 @@ def aircraft_generator(self) -> dict: """Generates a sequence of two aircraft whose default trajectories intersect at the centre of the sector.""" # Construct the overflier's route. - overflier_route = self.route() - overflier_departure = self.departure_airport(overflier_route) - overflier_destination = self.destination_airport(overflier_route) + overflier_route = self.choose_route() + overflier_departure = self.choose_departure_airport(overflier_route) + overflier_destination = self.choose_destination_airport(overflier_route) # Construct the climber's route, which is the reverse of the overflier's. climber_route = overflier_route.copy() @@ -58,13 +58,13 @@ def aircraft_generator(self) -> dict: climber_requested_flight_level = self.climber_requested_flight_level(overflier_flight_level) # Compute the time taken for the climber to reach the overflier's flight level - climber_aircraft_type = self.aircraft_type() + climber_aircraft_type = self.choose_aircraft_type() climb_time_to_conflict_level = self.trajectory_predictor.climb_time_between_levels(lower_level=climber_current_flight_level, upper_level=overflier_flight_level, aircraft_type=climber_aircraft_type) # Compute the distance travelled by the overflier during the climber's climb. - overflier_aircraft_type = self.aircraft_type() + overflier_aircraft_type = self.choose_aircraft_type() overflier_true_airspeed = self.trajectory_predictor.cruise_speed(overflier_flight_level, overflier_aircraft_type) # Get the horizontal distances travelled prior to the conflict. @@ -81,15 +81,15 @@ def aircraft_generator(self) -> dict: # Compute the initial position of the overflier. # Note: this assumes that the route is a straight line from the initial fix to the central fix (conflict point). - o_initial_lon, o_initial_lat = GeoHelper.waypoint_location(lat1, lon1, o_lat2, o_lon2, overflier_horizontal_distance) + o_initial = GeoHelper.waypoint_location(lat1, lon1, o_lat2, o_lon2, overflier_horizontal_distance) # Truncate the route in light of the modified starting position. - overflier_route.truncate(initial_lat = o_initial_lat, initial_lon = o_initial_lon) + overflier_route.truncate(initial_lat = o_initial.y, initial_lon = o_initial.x) # Construct the overflier. yield { sg.AIRCRAFT_TIMEDELTA_KEY: 0, - sg.START_POSITION_KEY: (o_initial_lon, o_initial_lat), # Order is (lon, lat). + sg.START_POSITION_KEY: (o_initial.x, o_initial.y), # Order is (lon, lat). sg.CALLSIGN_KEY: next(self.callsign_generator()), sg.AIRCRAFT_TYPE_KEY: overflier_aircraft_type, sg.DEPARTURE_KEY: overflier_departure, @@ -108,14 +108,14 @@ def aircraft_generator(self) -> dict: # Compute the initial position of the climber. # Note: this assumes that the route is a straight line from the initial fix to the central fix (conflict point). - c_initial_lon, c_initial_lat = GeoHelper.waypoint_location(lat1, lon1, c_lat2, c_lon2, climber_horizontal_distance) + c_initial = GeoHelper.waypoint_location(lat1, lon1, c_lat2, c_lon2, climber_horizontal_distance) # Truncate the route in light of the modified starting position. - climber_route.truncate(initial_lat = c_initial_lat, initial_lon = c_initial_lon) + climber_route.truncate(initial_lat = c_initial.y, initial_lon = c_initial.x) yield { sg.AIRCRAFT_TIMEDELTA_KEY: 0, - sg.START_POSITION_KEY: (c_initial_lon, c_initial_lat), # Order is (lon, lat) + sg.START_POSITION_KEY: (c_initial.x, c_initial.y), # Order is (lon, lat) sg.CALLSIGN_KEY: next(self.callsign_generator()), sg.AIRCRAFT_TYPE_KEY: climber_aircraft_type, sg.DEPARTURE_KEY: overflier_destination, # Reversed overflier departure/destination. diff --git a/aviary/scenario/poisson_scenario.py b/aviary/scenario/poisson_scenario.py index 7017cc5..a3d1486 100644 --- a/aviary/scenario/poisson_scenario.py +++ b/aviary/scenario/poisson_scenario.py @@ -25,11 +25,11 @@ def aircraft_generator(self) -> dict: """Generates a sequence of aircraft constituting a scenario.""" while True: - current_flight_level = int(self.flight_level()) - route = self.route() + current_flight_level = int(self.choose_flight_level()) + route = self.choose_route() start_position = route.fix_points()[0].coords[0] - departure = self.departure_airport(route) - destination = self.destination_airport(route) + departure = self.choose_departure_airport(route) + destination = self.choose_destination_airport(route) # truncate the route i.e. remove the starting position fix # note coords of start_position are in lon/lat order route.truncate(initial_lat=start_position[1], initial_lon=start_position[0]) @@ -37,11 +37,11 @@ def aircraft_generator(self) -> dict: sg.AIRCRAFT_TIMEDELTA_KEY: random.expovariate(lambd=self.arrival_rate), sg.START_POSITION_KEY: start_position, sg.CALLSIGN_KEY: next(self.callsign_generator()), - sg.AIRCRAFT_TYPE_KEY: self.aircraft_type(), + sg.AIRCRAFT_TYPE_KEY: self.choose_aircraft_type(), sg.DEPARTURE_KEY: departure, sg.DESTINATION_KEY: destination, sg.CURRENT_FLIGHT_LEVEL_KEY: current_flight_level, sg.CLEARED_FLIGHT_LEVEL_KEY: current_flight_level, - sg.REQUESTED_FLIGHT_LEVEL_KEY: int(self.flight_level()), + sg.REQUESTED_FLIGHT_LEVEL_KEY: int(self.choose_flight_level()), sg.ROUTE_KEY: route.serialize(), } diff --git a/aviary/scenario/scenario_algorithm.py b/aviary/scenario/scenario_algorithm.py index 9ebdb9f..202396f 100644 --- a/aviary/scenario/scenario_algorithm.py +++ b/aviary/scenario/scenario_algorithm.py @@ -8,7 +8,7 @@ from abc import ABC, abstractmethod import random - +import numpy as np class ScenarioAlgorithm(ABC): """A scenario generation algorithm""" @@ -22,10 +22,13 @@ def __init__( self, sector_element, aircraft_types=None, flight_levels=None, callsign_prefixes=None, seed=None ): + # If seed is None, use the system time. + if seed is None: + import time + seed = int(time.time() * 256) % (2**32 - 1) # use fractional seconds + self.seed = seed - # TODO: make set_seed non-static and use the instance variable instead of an argument. - # TODO: Also make ScenarioGenerator independent of the seed, except via a set_seed method that sets the seed in the algorithm. - ScenarioAlgorithm.set_seed(seed) + self.set_seed() self.sector_element = sector_element self.seen_callsigns = set() @@ -88,9 +91,13 @@ def callsign_prefixes(self, callsign_prefixes): def aircraft_generator(self) -> dict: pass - @staticmethod - def set_seed(seed): - random.seed(seed) + #@staticmethod + def set_seed(self): + """ + Seeds both the Python random and Numpy random modules' random number generators. + """ + random.seed(self.seed) + np.random.seed(self.seed) def reset_seen_callsigns(self): """ @@ -98,44 +105,60 @@ def reset_seen_callsigns(self): After resetting, duplicate callsigns (with the set generated before the reset) may occur.""" self.seen_callsigns = set() - def route(self): + def choose_route(self): """Returns a random route""" # Note: use the sector routes() method, *not* the shape routes(). return random.choice(self.sector_element.routes()) - def flight_level(self): + def choose_flight_level(self, exclude_lowest = 0, exclude_highest = 0): """Returns a random flight level""" - return random.choice(self.flight_levels) + if exclude_lowest < 0 or exclude_highest < 0: + raise ValueError('Excluded flight level intervals must be positive') + + if exclude_lowest + exclude_highest >= 1: + raise ValueError('Excluded flight level range must be less than one') - def aircraft_type(self): + levels = self.flight_levels + level_range = max(levels) - min(levels) + if exclude_lowest > 0: + levels = list(filter(lambda l : (l > min(self.flight_levels) + exclude_lowest * level_range), levels)) + + if exclude_highest > 0: + levels = list(filter(lambda l : (l < max(self.flight_levels) - exclude_highest * level_range), levels)) + + if len(levels) == 0: + raise ValueError('All flight levels are excluded') + + return random.choice(levels) + + def choose_aircraft_type(self): """Returns a random aircraft type""" return random.choice(self.aircraft_types) - def callsign_generator(self): + def callsign_generator(self, k=3): """Generates a random sequence of unique callsigns""" - k = 3 + prefix = random.choice(self.callsign_prefixes) + suffix = "".join([str(x) for x in random.sample(range(0, 10), k=k)]) while True: - suffix = "".join([str(x) for x in random.sample(range(0, 10), k=k)]) - prefix = random.choice(self.callsign_prefixes) ret = prefix + suffix - if ret in self.seen_callsigns: - k = k + 1 + prefix = random.choice(self.callsign_prefixes) + suffix = "".join([str(x) for x in random.sample(range(0, 10), k=k)]) else: self.seen_callsigns.add(ret) yield ret - def departure_airport(self, route): + def choose_departure_airport(self, route): """Returns a suitable departure airport for the given route""" # TODO: currently a dummy implementation return "DEP" - def destination_airport(self, route): + def choose_destination_airport(self, route): """Returns a suitable destination airport for the given route""" # TODO: currently a dummy implementation diff --git a/aviary/scenario/scenario_generator.py b/aviary/scenario/scenario_generator.py index fdf29bf..7987a5e 100644 --- a/aviary/scenario/scenario_generator.py +++ b/aviary/scenario/scenario_generator.py @@ -57,10 +57,10 @@ def __init__(self, scenario_algorithm, start_time = None): # TODO: add an argument to specify which routes to be included in the scenario. - def generate_scenario(self, duration, seed = None) -> dict: + def generate_scenario(self, duration) -> dict: """Generates a list of aircraft creation data constituting a scenario.""" - self.scenario_algorithm.set_seed(seed) + self.scenario_algorithm.set_seed() self.scenario_algorithm.reset_seen_callsigns() # Format the scenario start time. diff --git a/aviary/scripts/cartesian.py b/aviary/scripts/cartesian.py index 7a8adea..ec5152d 100644 --- a/aviary/scripts/cartesian.py +++ b/aviary/scripts/cartesian.py @@ -112,7 +112,7 @@ def main(argv=None): scenario_generator = ScenarioGenerator(scenario_algorithm = scenario_algorithm) try: - scenario = scenario_generator.generate_scenario(duration = 1, seed = args.seed) + scenario = scenario_generator.generate_scenario(duration = 1) except Exception as ex: print('ERROR: Scenario generation attempt aborted due to error:') print(ex) diff --git a/aviary/scripts/overflier_climber.py b/aviary/scripts/overflier_climber.py index 15d9d01..20941fb 100644 --- a/aviary/scripts/overflier_climber.py +++ b/aviary/scripts/overflier_climber.py @@ -159,7 +159,7 @@ def main(argv=None): scenario_generator = ScenarioGenerator(scenario_algorithm = scenario_algorithm) try: - scenario = scenario_generator.generate_scenario(duration = 1, seed = args.seed) + scenario = scenario_generator.generate_scenario(duration = 1) except Exception as ex: print('ERROR: Scenario generation attempt aborted due to error:') print(ex) diff --git a/aviary/sector/route.py b/aviary/sector/route.py index 5550dc0..8a4c2be 100644 --- a/aviary/sector/route.py +++ b/aviary/sector/route.py @@ -63,7 +63,7 @@ def fix_names(self): def fix_points(self, unprojected = False): - """Returns the coordinates of the fixes in the route""" + """Returns the coordinates of the fixes in the route as a list of Shapely Points""" # Project the coordinates unless the projection attribute is not None or unprojected is True. if unprojected or self.projection is None: @@ -72,6 +72,18 @@ def fix_points(self, unprojected = False): return [GeoHelper.__inv_project__(self.projection, geom=i[1]) for i in self.fix_list] + def span(self): + """Computes the total distance in metres spanned by the route""" + + p1 = None + distances = list() + for p2 in self.fix_points(unprojected=False): + if p1 is not None: + distances.append(GeoHelper.distance(lat1=p1.y, lon1=p1.x, lat2=p2.y, lon2=p2.x)) + p1 = p2 + return sum(distances) + + @property def __geo_interface__(self) -> dict: """ diff --git a/aviary/test/unit/scenario/test_incremental_ocd_scenario.py b/aviary/test/unit/scenario/test_incremental_ocd_scenario.py new file mode 100644 index 0000000..67df068 --- /dev/null +++ b/aviary/test/unit/scenario/test_incremental_ocd_scenario.py @@ -0,0 +1,373 @@ + +import pytest + +import numpy as np + +import aviary.scenario.empty_scenario as emps + +import aviary.scenario.incremental_ocd_scenario as incs +import aviary.scenario.overflier_climber_scenario as ocs +import aviary.scenario.scenario_generator as sg +import aviary.trajectory.lookup_trajectory_predictor as tp +import aviary.sector.route as sr + +from aviary.utils.geo_helper import GeoHelper + +@pytest.fixture(scope="function") +def underlying(i_element, cruise_speed_dataframe, climb_time_dataframe, downtrack_distance_dataframe): + """Test fixture: an overflier-climber scenario to act as the underlying scenario.""" + + trajectory_predictor = tp.LookupTrajectoryPredictor(cruise_speed_lookup = cruise_speed_dataframe, + climb_time_lookup = climb_time_dataframe, + downtrack_distance_lookup = downtrack_distance_dataframe) + + return ocs.OverflierClimberScenario(sector_element = i_element, + trajectory_predictor = trajectory_predictor, + aircraft_types = ['B744', 'B743'], + callsign_prefixes = ["SPEEDBIRD", "VJ", "DELTA", "EZY"], + flight_levels = [200, 400], + seed = 223) + +def test_aircraft_generator_from_empty(i_element): + + # Start with an empty scenario. + target = emps.EmptyScenario(sector_element=i_element) + + for i in range(10): + + ctr = 0 + for x in target.aircraft_generator(): + ctr = ctr + 1 + + assert ctr == i + + # Incrementally add one aircraft at a time. + target = incs.IncrementalOcdScenario( + underlying_scenario=target, + seed=i + ) + + +def test_choose_route_segment(i_element, underlying): + + # Test with five route segments. + target = incs.IncrementalOcdScenario( + underlying_scenario=underlying, + seed = 22, + start_position_distribution = np.array([1, 0, 0, 0, 0]), + ) + + route = target.route + fixes = route.fix_points() + + # Test the _pre_fix_index method at the same time. + assert target._pre_fix_index()[0] == 0 + + result = target.choose_route_segment() + + # The first Point in the result should coincide with the fix with index 0. + assert result[0].x == fixes[0].x + assert result[0].y == pytest.approx(fixes[0].y, 1e-10) + + # The second Point in the result should be one fifth of the distance along the straight-line route. + expected_distance = (1/5) * GeoHelper.distance(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x) + expected_point = GeoHelper.waypoint_location(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x, + distance_m=expected_distance) + assert result[1].x == expected_point.x + assert result[1].y == pytest.approx(expected_point.y, 1e-10) + + target = incs.IncrementalOcdScenario( + underlying_scenario=underlying, + seed = 22, + start_position_distribution = np.array([0, 1, 0, 0, 0]), + ) + + # Test the _pre_fix_index method at the same time. + assert target._pre_fix_index()[0] == 1 + + result = target.choose_route_segment() + + # The first Point in the result should be one fifth of the distance along the straight-line route. + expected_distance = (1/5) * GeoHelper.distance(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x) + expected_point = GeoHelper.waypoint_location(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x, + distance_m=expected_distance) + assert result[0].x == expected_point.x + assert result[0].y == expected_point.y + + # The second Point in the result should be one two fifths of the distance along the straight-line route. + expected_distance = (2/5) * GeoHelper.distance(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x) + expected_point = GeoHelper.waypoint_location(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x, + distance_m=expected_distance) + assert result[1].x == expected_point.x + assert result[1].y == pytest.approx(expected_point.y, 1e-10) + + target = incs.IncrementalOcdScenario( + underlying_scenario=underlying, + seed = 22, + start_position_distribution = np.array([0, 0, 0, 0, 1]), + ) + + # Test the _pre_fix_index method at the same time. + assert target._pre_fix_index()[0] == 2 # Note: this means the central fix is the one before the last fifth segment. + + result = target.choose_route_segment() + + # The first Point in the result should be four fifths of the distance along the straight-line route. + expected_distance = (4/5) * GeoHelper.distance(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x) + expected_point = GeoHelper.waypoint_location(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x, + distance_m=expected_distance) + + assert result[0].x == expected_point.x + assert result[0].y == pytest.approx(expected_point.y, 1e-10) + + # The second Point in the result should coincide with the fix with index 4 (the end of the route). + assert result[1].x == fixes[4].x + assert result[1].y == fixes[4].y + + target = incs.IncrementalOcdScenario( + underlying_scenario=underlying, + seed = 22, + start_position_distribution = np.array([0, 1/2, 0, 1/2, 0]), + ) + + # Test the _pre_fix_index method at the same time. + assert target._pre_fix_index()[0] == 1 # With this seed, the earlier segment is selected + + result = target.choose_route_segment() + + # The first Point in the result should be one fifth of the distance along the straight-line route. + expected_distance = (1/5) * GeoHelper.distance(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x) + expected_point = GeoHelper.waypoint_location(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x, + distance_m=expected_distance) + assert result[0].x == expected_point.x + assert result[0].y == expected_point.y + + # The second Point in the result should be one two fifths of the distance along the straight-line route. + expected_distance = (2/5) * GeoHelper.distance(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x) + expected_point = GeoHelper.waypoint_location(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x, + distance_m=expected_distance) + assert result[1].x == expected_point.x + assert result[1].y == pytest.approx(expected_point.y, 1e-10) + + target = incs.IncrementalOcdScenario( + underlying_scenario=underlying, + seed = 28, + start_position_distribution = np.array([0, 1/2, 0, 1/2, 0]), + ) + + # Test the _pre_fix_index method at the same time. + # With the different seed, the route is the same but the later segment is selected: + assert target.route.fix_names() == route.fix_names() + assert target._pre_fix_index()[0] == 2 + + result = target.choose_route_segment() + + # The first Point in the result should be three fifths of the distance along the straight-line route. + expected_distance = (3/5) * GeoHelper.distance(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x) + expected_point = GeoHelper.waypoint_location(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x, + distance_m=expected_distance) + + assert result[0].x == expected_point.x + assert result[0].y == pytest.approx(expected_point.y, 1e-10) + + # The second Point in the result should be four fifths of the distance along the straight-line route. + expected_distance = (4/5) * GeoHelper.distance(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x) + expected_point = GeoHelper.waypoint_location(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x, + distance_m=expected_distance) + + assert result[1].x == expected_point.x + assert result[1].y == pytest.approx(expected_point.y, 1e-10) + + +def test_choose_start_position(underlying): + + # + # Test with discrete start positions. + # + + target = incs.IncrementalOcdScenario( + underlying_scenario=underlying, + seed = 22, + start_position_distribution = np.array([0, 1]), + discrete_start_positions=True + ) + + route = target.route + fixes = route.fix_points() + + result = target.choose_start_position() + + # The start position should be half way along the straight-line route. + expected_distance = (1/2) * GeoHelper.distance(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x) + expected_point = GeoHelper.waypoint_location(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x, + distance_m=expected_distance) + + assert result.x == expected_point.x + assert result.y == pytest.approx(expected_point.y, 1e-10) + + target = incs.IncrementalOcdScenario( + underlying_scenario=underlying, + seed = 22, + start_position_distribution = np.array([1, 0]), + discrete_start_positions=True + ) + + result = target.choose_start_position() + + # The start position should coincide with the fix with index 0. + assert result.x == fixes[0].x + assert result.y == pytest.approx(fixes[0].y, 1e-10) + + target = incs.IncrementalOcdScenario( + underlying_scenario=underlying, + seed = 22, + start_position_distribution = np.array([0, 0, 0, 0, 1, 0, 0]), + discrete_start_positions=True + ) + + result = target.choose_start_position() + + # The start position should be four sevenths of the way along the straight-line route. + expected_distance = (4/7) * GeoHelper.distance(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x) + expected_point = GeoHelper.waypoint_location(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x, + distance_m=expected_distance) + + assert result.x == expected_point.x + assert result.y == pytest.approx(expected_point.y, 1e-10) + + # + # Test with continuous start positions. + # + + target = incs.IncrementalOcdScenario( + underlying_scenario=underlying, + seed = 22, + start_position_distribution = np.array([0, 0, 0, 0, 1, 0, 0]), + discrete_start_positions=False + ) + + result = target.choose_start_position() + + # The start position should be on the straight-line route. + assert result.x == fixes[0].x + assert result.y >= fixes[0].y + assert result.x == fixes[4].x + assert result.y <= fixes[4].y + + # The start position should be between four and five sevenths of the way along the straight-line route. + route_span = GeoHelper.distance(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x) + expected_distance = (4.5/7) * route_span + expected_point = GeoHelper.waypoint_location(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x, + distance_m=expected_distance) + + # The margin of uncertainty is the length on one route segment: one seventh of the span of the route. + margin = route_span/7 + + assert GeoHelper.distance(lat1=expected_point.y, lon1=expected_point.x, + lat2=result.y, lon2=result.x) < margin/2 + + lower_bound_point = GeoHelper.waypoint_location(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x, + distance_m=(4/7) * route_span) + upper_bound_point = GeoHelper.waypoint_location(lat1=fixes[0].y, lon1=fixes[0].x, + lat2=fixes[4].y, lon2=fixes[4].x, + distance_m=(5/7) * route_span) + + assert GeoHelper.distance(lat1=lower_bound_point.y, lon1=lower_bound_point.x, + lat2=result.y, lon2=result.x) < margin + assert GeoHelper.distance(lat1=upper_bound_point.y, lon1=upper_bound_point.x, + lat2=result.y, lon2=result.x) < margin + + +def test_aircraft_generator(underlying): + + target = incs.IncrementalOcdScenario( + underlying_scenario=underlying, + seed = 22, + ) + + # Test across multiple generated scenarios. + ctr = 0 + for x in target.aircraft_generator(): + + assert isinstance(x, dict) + assert sorted(x.keys()) == [sg.CALLSIGN_KEY, sg.CLEARED_FLIGHT_LEVEL_KEY, sg.CURRENT_FLIGHT_LEVEL_KEY, + sg.DEPARTURE_KEY, sg.DESTINATION_KEY, sg.REQUESTED_FLIGHT_LEVEL_KEY, sg.ROUTE_KEY, + sg.START_POSITION_KEY, sg.AIRCRAFT_TIMEDELTA_KEY, sg.AIRCRAFT_TYPE_KEY] + + ctr = ctr + 1 + + # Check that the scenario contains precisely three aircraft. + assert ctr == 3 + + +def test_incremental_aircraft_generator(underlying): + + target = underlying + + count_overfliers = 0 + count_climbers = 0 + count_descenders = 0 + + N = 30 + for i in range(N): + + ctr = 0 + for x in target.aircraft_generator(): + + assert isinstance(x, dict) + assert sorted(x.keys()) == [sg.CALLSIGN_KEY, sg.CLEARED_FLIGHT_LEVEL_KEY, sg.CURRENT_FLIGHT_LEVEL_KEY, + sg.DEPARTURE_KEY, sg.DESTINATION_KEY, sg.REQUESTED_FLIGHT_LEVEL_KEY, sg.ROUTE_KEY, + sg.START_POSITION_KEY, sg.AIRCRAFT_TIMEDELTA_KEY, sg.AIRCRAFT_TYPE_KEY] + + # On the last pass through the loop, keep track of the number of aircraft of each phase. + if i == N - 1: + if x[sg.CURRENT_FLIGHT_LEVEL_KEY] == x[sg.REQUESTED_FLIGHT_LEVEL_KEY]: + count_overfliers += 1 + if x[sg.CURRENT_FLIGHT_LEVEL_KEY] < x[sg.REQUESTED_FLIGHT_LEVEL_KEY]: + count_climbers += 1 + if x[sg.CURRENT_FLIGHT_LEVEL_KEY] > x[sg.REQUESTED_FLIGHT_LEVEL_KEY]: + count_descenders += 1 + + ctr += 1 + + # Two aircraft in the underlying scenario, plus i (i.e. the number of incremental additions) + assert ctr == i + 2 + + # Incrementally add one aircraft at a time. + target = incs.IncrementalOcdScenario( + underlying_scenario=target, + seed=i, + overflier_prob=1/3, + climber_prob=1/3, + descender_prob=1/3, + ) + + # The target is now a scenario with N + 1 aircraft. + assert count_overfliers + count_climbers + count_descenders == (N - 1) + 2 + + assert count_overfliers == pytest.approx(N/3, rel=0.1) + assert count_climbers == pytest.approx(N/3, rel=0.1) + assert count_descenders == pytest.approx(N/3, rel=0.1) diff --git a/aviary/test/unit/scenario/test_overflier_climber_scenario.py b/aviary/test/unit/scenario/test_overflier_climber_scenario.py index 7301e52..3ef03cf 100644 --- a/aviary/test/unit/scenario/test_overflier_climber_scenario.py +++ b/aviary/test/unit/scenario/test_overflier_climber_scenario.py @@ -33,9 +33,9 @@ def test_callsign_generator(target): ctr = 0 for x in target.callsign_generator(): if ctr == 0: - assert x == 'VJ597' + assert x == 'DELTA573' if ctr == 1: - assert x == 'SPEEDBIRD681' + assert x == 'EZY891' if ctr > 1: break ctr = ctr + 1 diff --git a/aviary/test/unit/scenario/test_poisson_scenario.py b/aviary/test/unit/scenario/test_poisson_scenario.py index 2ac66be..4415699 100644 --- a/aviary/test/unit/scenario/test_poisson_scenario.py +++ b/aviary/test/unit/scenario/test_poisson_scenario.py @@ -29,9 +29,9 @@ def test_callsign_generator(target): ctr = 0 for x in target.callsign_generator(): if ctr == 0: - assert x == "EZY230" + assert x == "VJ307" if ctr == 1: - assert x == "SPEEDBIRD215" + assert x == "VJ159" if ctr > 1: break ctr = ctr + 1 @@ -42,6 +42,7 @@ def test_aircraft_generator(target): ctr = 0 N = 200 interarrival_times = [] + for x in target.aircraft_generator(): assert isinstance(x, dict) @@ -68,4 +69,4 @@ def test_aircraft_generator(target): # Check the mean interarrival time against the arrival rate parameter (with a 5% tolerance). mean = sum(interarrival_times) / N - assert mean == pytest.approx(1 / target.arrival_rate, rel=0.1) + assert mean == pytest.approx(1 / target.arrival_rate, rel=0.14) diff --git a/aviary/test/unit/scenario/test_scenario_algorithm.py b/aviary/test/unit/scenario/test_scenario_algorithm.py index 84ec4c0..295f046 100644 --- a/aviary/test/unit/scenario/test_scenario_algorithm.py +++ b/aviary/test/unit/scenario/test_scenario_algorithm.py @@ -78,9 +78,9 @@ def test_callsign_generator(target): ctr = 0 for x in target.callsign_generator(): if ctr == 0: - assert x == "EZY230" + assert x == "VJ307" if ctr == 1: - assert x == "SPEEDBIRD215" + assert x == "VJ159" if ctr > 1: break ctr = ctr + 1 @@ -88,34 +88,49 @@ def test_callsign_generator(target): def test_aircraft_type(target): - result = target.aircraft_type() + result = target.choose_aircraft_type() assert result == "B747" - result = target.aircraft_type() + result = target.choose_aircraft_type() assert result == "B747" - result = target.aircraft_type() + result = target.choose_aircraft_type() assert result == "B747" - result = target.aircraft_type() + result = target.choose_aircraft_type() assert result == "B777" def test_flight_level(target): - result = target.flight_level() + result = target.choose_flight_level() assert result == 240 - result = target.flight_level() + result = target.choose_flight_level() assert result == 240 - result = target.flight_level() + result = target.choose_flight_level() assert result == 200 + # flight_levels: [200, 240, 280, 320, 360, 400] + with pytest.raises(ValueError): + target.choose_flight_level(exclude_lowest=0.4, exclude_highest=0.6) + + # Exclude all except the 280 level + result = target.choose_flight_level(exclude_lowest=0.39, exclude_highest=0.59) + assert result == 280 + + # Exclude all except the 280 level + result = target.choose_flight_level(exclude_lowest=0.2, exclude_highest=0.4) + assert result == 280 + + # Exclude up to the 320 level + result = target.choose_flight_level(exclude_lowest=0.5) + assert result == 400 def test_route(target): - result = target.route() + result = target.choose_route() assert isinstance(result, Route) assert result.fix_names()[0] == "E" @@ -124,7 +139,7 @@ def test_route(target): assert result.fix_names()[3] == "B" assert result.fix_names()[4] == "A" - result = target.route() + result = target.choose_route() assert isinstance(result, Route) assert result.fix_names()[0] == "E" @@ -133,7 +148,7 @@ def test_route(target): assert result.fix_names()[3] == "B" assert result.fix_names()[4] == "A" - result = target.route() + result = target.choose_route() assert isinstance(result, Route) assert result.fix_names()[0] == "E" @@ -142,7 +157,7 @@ def test_route(target): assert result.fix_names()[3] == "B" assert result.fix_names()[4] == "A" - result = target.route() + result = target.choose_route() assert isinstance(result, Route) assert result.fix_names()[0] == "A" diff --git a/aviary/test/unit/scenario/test_scenario_generator.py b/aviary/test/unit/scenario/test_scenario_generator.py index e038813..ddec61a 100644 --- a/aviary/test/unit/scenario/test_scenario_generator.py +++ b/aviary/test/unit/scenario/test_scenario_generator.py @@ -27,7 +27,7 @@ def test_generate_scenario(target_sector): seed = 22) scen_gen = sg.ScenarioGenerator(target_scenario) - scenario = scen_gen.generate_scenario(duration=duration, seed=seed) + scenario = scen_gen.generate_scenario(duration=duration) assert sg.START_TIME_KEY in scenario.keys() assert sg.AIRCRAFT_KEY in scenario.keys() @@ -60,11 +60,12 @@ def test_generate_scenario(target_sector): # Check that scenario's generated with the same random seed are identical. for i in range(1): - scenario2 = scen_gen.generate_scenario(duration=duration, seed=seed) + scenario2 = scen_gen.generate_scenario(duration=duration) assert scenario == scenario2 def test_generate_scenario_with_start_time(target_sector): + seed = 83 duration = 1000 scenario_start_time = datetime.strptime("12:05:42", "%H:%M:%S") @@ -75,17 +76,17 @@ def test_generate_scenario_with_start_time(target_sector): aircraft_types = ['B747', 'B777'], callsign_prefixes = ["SPEEDBIRD", "VJ", "DELTA", "EZY"], flight_levels = [200, 240, 280, 320, 360, 400], - seed = 22) + seed = seed) scen_gen = sg.ScenarioGenerator(target_scenario, scenario_start_time) - scenario = scen_gen.generate_scenario(duration=duration, seed=seed) + scenario = scen_gen.generate_scenario(duration=duration) total_time = 0 for aircraft in scenario[sg.AIRCRAFT_KEY]: assert sg.AIRCRAFT_TIMEDELTA_KEY in aircraft.keys() assert ( datetime.strptime(aircraft[sg.START_TIME_KEY], "%H:%M:%S") - > scenario_start_time + >= scenario_start_time ) inferred_aircraft_timedelta = ( datetime.strptime(aircraft[sg.START_TIME_KEY], "%H:%M:%S") @@ -110,7 +111,7 @@ def test_write_json_scenario(target_sector): seed = 22) scen_gen = sg.ScenarioGenerator(target_scenario) - scenario = scen_gen.generate_scenario(duration=duration, seed=seed) + scenario = scen_gen.generate_scenario(duration=duration) filename = "test_scenario" here = os.path.abspath(os.path.dirname(__file__)) diff --git a/aviary/test/unit/sector/test_route.py b/aviary/test/unit/sector/test_route.py index 07542fa..1166176 100644 --- a/aviary/test/unit/sector/test_route.py +++ b/aviary/test/unit/sector/test_route.py @@ -7,6 +7,22 @@ import aviary.sector.route as sr import aviary.sector.sector_element as se import aviary.utils.geo_helper as gh +import shapely.geometry as geom + +def test_fix_points(i_element): + + route_index = 1 + + target = i_element.routes()[route_index] + result = target.fix_points() + + assert isinstance(result, list) + for p in result: + assert isinstance(p, geom.Point) + assert isinstance(p.coords[0], tuple) + assert isinstance(p.x, float) # longitude + assert isinstance(p.y, float) # latitude + def test_reverse(i_element): @@ -34,6 +50,16 @@ def test_reverse(i_element): assert target.fix_points()[3] == result.fix_points()[1] assert target.fix_points()[4] == result.fix_points()[0] +def test_span(i_element): + + route_index = 1 + target = i_element.routes()[route_index] + + result = target.span() + + # Route spans ~130km distance. + assert result == pytest.approx(129638.88301925106, 1e-10) + def test_geojson(i_element): route_index = 1 @@ -170,4 +196,3 @@ def test_truncate(i_element): target.truncate(initial_lat = latE - 1, initial_lon = lonA) assert not target.fix_list - diff --git a/aviary/test/unit/utils/test_geo_helper.py b/aviary/test/unit/utils/test_geo_helper.py index 4675298..8076854 100644 --- a/aviary/test/unit/utils/test_geo_helper.py +++ b/aviary/test/unit/utils/test_geo_helper.py @@ -22,17 +22,17 @@ def test_waypoint_location(): lat2, lon2 = 50.6083, -1.9608 result = GeoHelper.waypoint_location(lat1, lon1, lat2, lon2, distance_m = 0) - assert result == pytest.approx((lon1, lat1)) + assert result.x, result.y == pytest.approx((lon1, lat1)) result = GeoHelper.waypoint_location(lat1, lon1, lat2, lon2, distance_m = 162.5e3) - assert result == pytest.approx((lon2, lat2), 0.01) + assert result.x, result.y == pytest.approx((lon2, lat2), 0.01) result = GeoHelper.waypoint_location(lat1, lon1, lat2, lon2, distance_m = 81.25e3) - assert result == pytest.approx((-1.051272, 51.06276), 0.01) + assert result.x, result.y == pytest.approx((-1.051272, 51.06276), 0.01) # Check the behaviour in case the distance exceeds the separation between the fixes. result = GeoHelper.waypoint_location(lat1, lon1, lat2, lon2, distance_m = 250e3) - assert result == pytest.approx((-2.9125, 50.1154), 0.01) + assert result.x, result.y == pytest.approx((-2.9125, 50.1154), 0.01) def test_format_coordinates(i_sector_geojson): diff --git a/aviary/utils/geo_helper.py b/aviary/utils/geo_helper.py index 6e2f0b8..efa04b8 100644 --- a/aviary/utils/geo_helper.py +++ b/aviary/utils/geo_helper.py @@ -4,6 +4,7 @@ # author: Tim Hobson # email: thobson@turing.ac.uk +from shapely.geometry import Point from shapely.ops import transform from functools import partial @@ -67,14 +68,15 @@ def format_coordinates(geojson, key, float_precision, as_geojson = True): def waypoint_location(lat1, lon1, lat2, lon2, distance_m, geod = Geodesic.WGS84): """ Computes the location of waypoint at a given distance (in metres) from - point (lat1, lon1) in the direction of (lat2, lon2). + point (lat1, lon1) in the direction of (lat2, lon2). Returns a Shapely Point. Based on the example at https://geographiclib.sourceforge.io/html/python/examples.html#computing-waypoints """ l = geod.InverseLine(lat1, lon1, lat2, lon2) g = l.Position(distance_m, Geodesic.STANDARD) - return g['lon2'], g['lat2'] + + return Point(g['lon2'], g['lat2']) @staticmethod