Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extension of overflier-climber scenario for incremental addition of aircraft #41

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
6 changes: 3 additions & 3 deletions aviary/scenario/cartesian_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ def aircraft_generator(self) -> dict:

for flight_level in self.flight_levels:
for aircraft_type in self.aircraft_types:
route = self.route()
route = self.choose_route()
yield {
sg.AIRCRAFT_TIMEDELTA_KEY: 0,
sg.START_POSITION_KEY: route.fix_points()[0].coords[0],
sg.CALLSIGN_KEY: next(self.callsign_generator()),
sg.AIRCRAFT_TYPE_KEY: aircraft_type,
sg.DEPARTURE_KEY: self.departure_airport(route),
sg.DESTINATION_KEY: self.destination_airport(route),
sg.DEPARTURE_KEY: self.choose_departure_airport(route),
sg.DESTINATION_KEY: self.choose_destination_airport(route),
sg.CURRENT_FLIGHT_LEVEL_KEY: flight_level,
sg.CLEARED_FLIGHT_LEVEL_KEY: flight_level,
sg.REQUESTED_FLIGHT_LEVEL_KEY: flight_level,
Expand Down
26 changes: 26 additions & 0 deletions aviary/scenario/empty_scenario.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""
Scenario generation algorithm with Poisson aircraft arrivals.
"""
# author: Tim Hobson
# email: [email protected]

import random

from aviary.scenario.scenario_algorithm import ScenarioAlgorithm

import aviary.scenario.scenario_generator as sg

class EmptyScenario(ScenarioAlgorithm):
"""An algorithm generating an empty scenario"""

def __init__(self, **kwargs):

# Pass the keyword args (including the random seed) to the superclass constructor.
super().__init__(**kwargs)

# Overriding abstract method
def aircraft_generator(self) -> dict:
"""Generates an empty sequence."""

return
yield
7 changes: 7 additions & 0 deletions aviary/scenario/flight_phase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

from enum import Enum

class FlightPhase(Enum):
overflier = 1
climber = 2
descender = 3
247 changes: 247 additions & 0 deletions aviary/scenario/incremental_ocd_scenario.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
"""
Scenario generation algorithm with Poisson aircraft arrivals.
"""
# author: Tim Hobson
# email: [email protected]

import random
import numpy as np

import shapely.geometry as geom

from aviary.scenario.scenario_algorithm import ScenarioAlgorithm
from aviary.scenario.flight_phase import FlightPhase

import aviary.scenario.scenario_generator as sg

from aviary.utils.geo_helper import GeoHelper

class IncrementalOcdScenario(ScenarioAlgorithm):
"""An incremental overflier-climber-descender scenario generation algorithm"""

def __init__(self,
underlying_scenario,
seed,
overflier_prob = 1/3,
climber_prob = 1/3,
descender_prob = 1/3,
climber_cfl_range = 0.5,
climber_minimum_climb = 0.3,
descender_cfl_range=0.5,
descender_minimum_descent=0.3,
overflier_cfl_range=0.5,
start_position_distribution=np.array([1, 0]),
discrete_start_positions=False
):
"""
IncrementalOcdScenario class constructor.

:param underlying_scenario: The underlying scenario
:param seed: a random seed
:param overflier_prob: number in (0, 1]. The probability of generating an overflier aircraft.
:param climber_prob: number in (0, 1]. The probability of generating a climber aircraft.
:param descender_prob: number in (0, 1]. The probability of generating a descender aircraft.
:param climber_cfl_range (CCFLR): number in (0, 1]. The range of possible flight levels from which initial FL will be picked for climbers (e.g. 0.5 means the first half will be included)
:param climber_minimum_climb (CMC): number in (0, 1]. The range of ascents from which the requested FL will be picked (e.g. if CCFLR is 0.5 and CMC is 0.3 then the requested FL will only include the top 20% of flight levels)
:param descender_cfl_range (DCFLR): number in (0, 1]. Similar to CCFLR, but for descenders.
:param descender_minimum_descent (DMD): number in (0, 1]. Similar to CMC, but for descenders.
:param overflier_cfl_range: number in (0, 1]. The range of possible flight levels from which initial FL will be picked for overfliers (e.g. 0.5 means the first half will be included)
:param start_position_distribution: a probability distribution as a numpy array. The probability of starting in each route segement (also defines the number of segments).
:param discrete_start_positions: boolean. If True, start positions are at the discrete endpoints of the route segements.

Defaults are:
- equal probability of overflier, climber or descender
- overfliers start in the upper half of the flight level range
- climbers start in the lower half of the flight level range and climb at least 30% of that range
- descenders start in the upper half of the flight level range and descend at least 30% of that range
- start position is a uniform random pick from the first half of the route.
"""

if not isinstance(underlying_scenario, ScenarioAlgorithm):
raise ValueError('underlying_scenario must be a ScenarioAlgorithm')

if sum([overflier_prob, climber_prob, descender_prob]) != 1:
raise ValueError('aircraft phase probabilities must sum to 1')

if sum([climber_cfl_range, climber_minimum_climb]) > 1:
raise ValueError('climber initial interval plus minimum climb must be at most 1')

if sum([descender_cfl_range, descender_minimum_descent]) > 1:
raise ValueError('descender initial interval plus minimum descent must be at most 1')

if np.sum(start_position_distribution) != 1:
raise ValueError('invalid initial position distribution')

# Pass the keyword args (including the random seed) to the superclass constructor.
super().__init__(
sector_element=underlying_scenario.sector_element,
aircraft_types=underlying_scenario.aircraft_types,
flight_levels=underlying_scenario.flight_levels,
callsign_prefixes=underlying_scenario.callsign_prefixes,
seed=seed
)

self.underlying_scenario = underlying_scenario
self.overflier_prob = overflier_prob
self.climber_prob = climber_prob
self.descender_prob = descender_prob

self.climber_cfl_range = climber_cfl_range
self.climber_minimum_climb = climber_minimum_climb
self.descender_cfl_range = descender_cfl_range
self.descender_minimum_descent = descender_minimum_descent
self.overflier_cfl_range = overflier_cfl_range

self.initial_position_distribution = start_position_distribution
self.discrete_initial_positions = discrete_start_positions

# Choose the route for the incremental aircraft.
self.set_seed()
self.route = self.choose_route()

# Overriding abstract method
def aircraft_generator(self) -> dict:
"""Generates a sequence of aircraft constituting a scenario."""

for aircraft in self.underlying_scenario.aircraft_generator():
yield aircraft

yield self.aircraft()

def aircraft(self):
"""Returns the additional aircraft in this scenario, beyond those in the underlying scenario."""

self.set_seed()
aircraft_type = self.choose_aircraft_type()
self.set_seed()
phase = self.choose_flight_phase()
self.set_seed()
current_fl, requested_fl = self.choose_flight_levels(phase)
cleared_fl = current_fl
self.set_seed()
start_position = self.choose_start_position()
self.set_seed()
departure = self.choose_departure_airport(self.route)
self.set_seed()
destination = self.choose_destination_airport(self.route)

# truncate the route i.e. remove the starting position fix
# note coords of start_position are in lon/lat order
route_copy = self.route.copy()
route_copy.truncate(initial_lat=start_position.y, initial_lon=start_position.x)

return {
sg.AIRCRAFT_TIMEDELTA_KEY: 0,
sg.START_POSITION_KEY: start_position,
sg.CALLSIGN_KEY: next(self.callsign_generator()),
sg.AIRCRAFT_TYPE_KEY: aircraft_type,
sg.DEPARTURE_KEY: departure,
sg.DESTINATION_KEY: destination,
sg.CURRENT_FLIGHT_LEVEL_KEY: current_fl,
sg.CLEARED_FLIGHT_LEVEL_KEY: cleared_fl,
sg.REQUESTED_FLIGHT_LEVEL_KEY: requested_fl,
sg.ROUTE_KEY: route_copy.serialize(),
}

def choose_flight_phase(self):
"""Picks a flight phase (overflier, climber or descender)."""

u = random.uniform(0, 1)
if u < self.overflier_prob:
return FlightPhase.overflier
if u - self.overflier_prob < self.climber_prob:
return FlightPhase.climber
return FlightPhase.descender

def choose_flight_levels(self, flight_phase):
"""Picks a current flight level, given a flight phase"""

if flight_phase == FlightPhase.overflier:
cfl = self.choose_flight_level(exclude_lowest=1 - self.overflier_cfl_range)
return cfl, cfl
if flight_phase == FlightPhase.climber:
cfl = self.choose_flight_level(exclude_highest=1 - self.climber_cfl_range)
rfl = self.choose_flight_level(exclude_lowest=self.climber_cfl_range + self.climber_minimum_climb)
return cfl, rfl
if flight_phase == FlightPhase.descender:
cfl = self.choose_flight_level(exclude_lowest=1 - self.descender_cfl_range)
rfl = self.choose_flight_level(exclude_highest=self.descender_cfl_range + self.descender_minimum_descent)
return cfl, rfl
raise ValueError("flight_phase enum value expected")

def choose_start_position(self):
"""Picks an aircraft starting position"""

segment_startpoint, segment_endpoint = self.choose_route_segment()

# If initial positions are discrete, return the start of the chosen route segment.
if self.discrete_initial_positions:
return segment_startpoint

# If initial positions are continuous, return a uniform random sample along the chosen route segment.
offset = np.random.uniform(low=0.0, high=self.segment_length(), size = 1)
return GeoHelper.waypoint_location(lat1=segment_startpoint.y, lon1=segment_startpoint.x,
lat2=segment_endpoint.y, lon2=segment_endpoint.x, distance_m=offset)

def choose_route_segment(self):
"""Picks a route segment for the aircraft start position"""

# Choose the route segment index, and compute the distance from the segment
# start point to the following route fix.
pre_fix_i, windback_distance = self._pre_fix_index()

# Compute the start and end points of the chosen segment.
fixes = self.route.fix_points()
pre_fix = fixes[pre_fix_i]
post_fix = fixes[pre_fix_i + 1]
segment_startpoint = GeoHelper.waypoint_location(lat1=post_fix.y, lon1=post_fix.x,
lat2=pre_fix.y, lon2=pre_fix.x,
distance_m=windback_distance)
segment_endpoint = GeoHelper.waypoint_location(lat1=segment_startpoint.y, lon1=segment_startpoint.x,
lat2=post_fix.y, lon2=post_fix.x,
distance_m=self.segment_length())

return segment_startpoint, segment_endpoint

def _pre_fix_index(self):
"""
Returns a tuple containing:
- the index of the "pre-fix", the fix preceding the start point of the chosen route segment
- the "windback distance" from the fix *after* the pre-fix to the start of the chosen route segment
"""

# Pick a segment index, and compute the length of each route segment in metres.
self.set_seed()
segment_index = self.choose_segment_index()
segment_length = self.segment_length()

# Identify the "post-fix" index, i.e. that of the first fix *after* the start of the route segment.
fixes = self.route.fix_points()
post_fix_index = 0
post_fix_distance = 0
while True:
# If the distance to the post_fix is at least as big as the distance to the route segment start, then break.
if post_fix_distance > segment_index * segment_length:
break
post_fix_index += 1
lon1, lat1 = fixes[post_fix_index - 1].x, fixes[post_fix_index - 1].y
lon2, lat2 = fixes[post_fix_index].x, fixes[post_fix_index].y
post_fix_distance += GeoHelper.distance(lat1=lat1, lon1=lon1, lat2=lat2, lon2=lon2)

windback_distance = post_fix_distance - (segment_index * segment_length)

return post_fix_index - 1, windback_distance

def choose_segment_index(self):
"""Picks a route segment for the aircraft start position according to the initial_position_distribution."""

segment_indices = range(0, len(self.initial_position_distribution))
return np.random.choice(a=segment_indices, size=1, p=self.initial_position_distribution)[0]

def segment_length(self):
"""Computes the physical length of each route segment in metres."""

span = self.route.span()
return span/len(self.initial_position_distribution)


4 changes: 2 additions & 2 deletions aviary/scenario/overflier_climber_extended_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ def extend_aircraft(self, aircraft, is_overflier):
# Note: here it is assumed that the aircraft travels on a straight line
# to the sector centre (as is the case in an I, X or Y sector element).

lon, lat = GeoHelper.waypoint_location(lat1 = lat1, lon1 = lon1, lat2 = lat2, lon2 = lon2, distance_m = -1 * windback_distance)
start_point = GeoHelper.waypoint_location(lat1 = lat1, lon1 = lon1, lat2 = lat2, lon2 = lon2, distance_m = -1 * windback_distance)

aircraft[sg.START_POSITION_KEY] = (lon, lat) # Order is (lon, lat).
aircraft[sg.START_POSITION_KEY] = (start_point.x, start_point.y) # Order is (lon, lat).

# If the aircraft is the climber, set the requested flight level to 'high'
if not is_overflier:
Expand Down
22 changes: 11 additions & 11 deletions aviary/scenario/overflier_climber_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ def aircraft_generator(self) -> dict:
"""Generates a sequence of two aircraft whose default trajectories intersect at the centre of the sector."""

# Construct the overflier's route.
overflier_route = self.route()
overflier_departure = self.departure_airport(overflier_route)
overflier_destination = self.destination_airport(overflier_route)
overflier_route = self.choose_route()
overflier_departure = self.choose_departure_airport(overflier_route)
overflier_destination = self.choose_destination_airport(overflier_route)

# Construct the climber's route, which is the reverse of the overflier's.
climber_route = overflier_route.copy()
Expand All @@ -58,13 +58,13 @@ def aircraft_generator(self) -> dict:
climber_requested_flight_level = self.climber_requested_flight_level(overflier_flight_level)

# Compute the time taken for the climber to reach the overflier's flight level
climber_aircraft_type = self.aircraft_type()
climber_aircraft_type = self.choose_aircraft_type()
climb_time_to_conflict_level = self.trajectory_predictor.climb_time_between_levels(lower_level=climber_current_flight_level,
upper_level=overflier_flight_level,
aircraft_type=climber_aircraft_type)

# Compute the distance travelled by the overflier during the climber's climb.
overflier_aircraft_type = self.aircraft_type()
overflier_aircraft_type = self.choose_aircraft_type()
overflier_true_airspeed = self.trajectory_predictor.cruise_speed(overflier_flight_level, overflier_aircraft_type)

# Get the horizontal distances travelled prior to the conflict.
Expand All @@ -81,15 +81,15 @@ def aircraft_generator(self) -> dict:

# Compute the initial position of the overflier.
# Note: this assumes that the route is a straight line from the initial fix to the central fix (conflict point).
o_initial_lon, o_initial_lat = GeoHelper.waypoint_location(lat1, lon1, o_lat2, o_lon2, overflier_horizontal_distance)
o_initial = GeoHelper.waypoint_location(lat1, lon1, o_lat2, o_lon2, overflier_horizontal_distance)

# Truncate the route in light of the modified starting position.
overflier_route.truncate(initial_lat = o_initial_lat, initial_lon = o_initial_lon)
overflier_route.truncate(initial_lat = o_initial.y, initial_lon = o_initial.x)

# Construct the overflier.
yield {
sg.AIRCRAFT_TIMEDELTA_KEY: 0,
sg.START_POSITION_KEY: (o_initial_lon, o_initial_lat), # Order is (lon, lat).
sg.START_POSITION_KEY: (o_initial.x, o_initial.y), # Order is (lon, lat).
sg.CALLSIGN_KEY: next(self.callsign_generator()),
sg.AIRCRAFT_TYPE_KEY: overflier_aircraft_type,
sg.DEPARTURE_KEY: overflier_departure,
Expand All @@ -108,14 +108,14 @@ def aircraft_generator(self) -> dict:

# Compute the initial position of the climber.
# Note: this assumes that the route is a straight line from the initial fix to the central fix (conflict point).
c_initial_lon, c_initial_lat = GeoHelper.waypoint_location(lat1, lon1, c_lat2, c_lon2, climber_horizontal_distance)
c_initial = GeoHelper.waypoint_location(lat1, lon1, c_lat2, c_lon2, climber_horizontal_distance)

# Truncate the route in light of the modified starting position.
climber_route.truncate(initial_lat = c_initial_lat, initial_lon = c_initial_lon)
climber_route.truncate(initial_lat = c_initial.y, initial_lon = c_initial.x)

yield {
sg.AIRCRAFT_TIMEDELTA_KEY: 0,
sg.START_POSITION_KEY: (c_initial_lon, c_initial_lat), # Order is (lon, lat)
sg.START_POSITION_KEY: (c_initial.x, c_initial.y), # Order is (lon, lat)
sg.CALLSIGN_KEY: next(self.callsign_generator()),
sg.AIRCRAFT_TYPE_KEY: climber_aircraft_type,
sg.DEPARTURE_KEY: overflier_destination, # Reversed overflier departure/destination.
Expand Down
Loading