diff --git a/spinner/board.py b/spinner/board.py index 3436bc8..733e3f1 100644 --- a/spinner/board.py +++ b/spinner/board.py @@ -8,122 +8,122 @@ from six import iteritems from spinner.topology import Direction, \ - threeboards, wrap_around, add_direction + threeboards, wrap_around, add_direction from spinner import coordinates class Board(object): - """ - Represents a SpiNNaker board in a complete system. - - A board is an entity with links to its six neighbouring boards. - """ - - # Counter used to label boards - NEXT_BOARD_ID = 0 - - def __init__(self): - - # References to other boards in the system which lie at the end of a wire - # connected to a particular port. - self.connection = { - Direction.north : None, - Direction.north_east : None, - Direction.east : None, - Direction.south : None, - Direction.south_west : None, - Direction.west : None, - } - - # Set the board's ID - self.id = Board.NEXT_BOARD_ID - Board.NEXT_BOARD_ID += 1 - - - def connect_wire(self, other, direction): - """ - Connect a wire between this board and another for the given direction. - """ - # Ensure it isn't already connected - assert(self.follow_wire(direction) is None) - assert(other.follow_wire(direction.opposite) is None) - - self.connection[direction] = other - other.connection[direction.opposite] = self - - - def follow_wire(self, direction): - """ - Follow the wire going in the given direction from this board. - """ - return self.connection[direction] - - - def follow_packet(self, in_wire_side, packet_direction): - """ - Follow the path of a packet which entered in to the board via the wire - in_wire_side following packet_direction through the chips in the board. - Returns a tuple (next_in_wire_side, next_board). - - We only need to know the side on which the incoming link is on (not the - exact chip) because for any incoming side there is a fixed outgoing side - when travelling in a fixed direction. - """ - - # Mapping of {(in_wire_side, packet_direction) : out_wire_side,...} - out_sides = { - (Direction.south_west, Direction.east) : Direction.east, - (Direction.west, Direction.east) : Direction.north_east, - - (Direction.south_west, Direction.north_east) : Direction.north, - (Direction.south, Direction.north_east) : Direction.north_east, - - (Direction.south, Direction.north) : Direction.west, - (Direction.east, Direction.north) : Direction.north, - } - # Opposite cases are simply inverted versions of the above... - for (iws, pd), ows in iteritems(out_sides.copy()): - out_sides[( iws.opposite - , pd.opposite - )] = ows.opposite - - out_wire_side = out_sides[(in_wire_side, packet_direction)] - - return (out_wire_side.opposite, self.follow_wire(out_wire_side)) - - - def __repr__(self): - return "".format(self.id) + """ + Represents a SpiNNaker board in a complete system. + + A board is an entity with links to its six neighbouring boards. + """ + + # Counter used to label boards + NEXT_BOARD_ID = 0 + + def __init__(self): + + # References to other boards in the system which lie at the end of a wire + # connected to a particular port. + self.connection = { + Direction.north : None, + Direction.north_east : None, + Direction.east : None, + Direction.south : None, + Direction.south_west : None, + Direction.west : None, + } + + # Set the board's ID + self.id = Board.NEXT_BOARD_ID + Board.NEXT_BOARD_ID += 1 + + + def connect_wire(self, other, direction): + """ + Connect a wire between this board and another for the given direction. + """ + # Ensure it isn't already connected + assert(self.follow_wire(direction) is None) + assert(other.follow_wire(direction.opposite) is None) + + self.connection[direction] = other + other.connection[direction.opposite] = self + + + def follow_wire(self, direction): + """ + Follow the wire going in the given direction from this board. + """ + return self.connection[direction] + + + def follow_packet(self, in_wire_side, packet_direction): + """ + Follow the path of a packet which entered in to the board via the wire + in_wire_side following packet_direction through the chips in the board. + Returns a tuple (next_in_wire_side, next_board). + + We only need to know the side on which the incoming link is on (not the + exact chip) because for any incoming side there is a fixed outgoing side + when travelling in a fixed direction. + """ + + # Mapping of {(in_wire_side, packet_direction) : out_wire_side,...} + out_sides = { + (Direction.south_west, Direction.east) : Direction.east, + (Direction.west, Direction.east) : Direction.north_east, + + (Direction.south_west, Direction.north_east) : Direction.north, + (Direction.south, Direction.north_east) : Direction.north_east, + + (Direction.south, Direction.north) : Direction.west, + (Direction.east, Direction.north) : Direction.north, + } + # Opposite cases are simply inverted versions of the above... + for (iws, pd), ows in iteritems(out_sides.copy()): + out_sides[( iws.opposite + , pd.opposite + )] = ows.opposite + + out_wire_side = out_sides[(in_wire_side, packet_direction)] + + return (out_wire_side.opposite, self.follow_wire(out_wire_side)) + + + def __repr__(self): + return "".format(self.id) def create_torus(width = 1, height = None): - """ - Returns a mapping of boards containing width * height threeboards connected in - a torus with corresponding hexagonal coordinates. If height is not specified, - height = width. - """ - - height = width if height is None else height - - boards = {} - - # Create the boards - for coord in threeboards(width, height): - boards[coordinates.Hexagonal(*coord)] = Board() - - # Link the boards together - for coord in boards: - for direction in [ Direction.east - , Direction.north_east - , Direction.north - ]: - # Get the coordinate of the neighbour in each direction - n_coord = wrap_around( - add_direction(list(coord)+[0], direction), (width, height)) - - # Connect the boards together - boards[coord].connect_wire(boards[n_coord], direction) - - return [(b, c) for (c, b) in iteritems(boards)] + """ + Returns a mapping of boards containing width * height threeboards connected in + a torus with corresponding hexagonal coordinates. If height is not specified, + height = width. + """ + + height = width if height is None else height + + boards = {} + + # Create the boards + for coord in threeboards(width, height): + boards[coordinates.Hexagonal(*coord)] = Board() + + # Link the boards together + for coord in boards: + for direction in [ Direction.east + , Direction.north_east + , Direction.north + ]: + # Get the coordinate of the neighbour in each direction + n_coord = wrap_around( + add_direction(list(coord)+[0], direction), (width, height)) + + # Connect the boards together + boards[coord].connect_wire(boards[n_coord], direction) + + return [(b, c) for (c, b) in iteritems(boards)] diff --git a/spinner/diagrams/interactive_wiring_guide.py b/spinner/diagrams/interactive_wiring_guide.py index d4234ea..0fabe38 100644 --- a/spinner/diagrams/interactive_wiring_guide.py +++ b/spinner/diagrams/interactive_wiring_guide.py @@ -16,11 +16,11 @@ from math import pi try: # pragma: no cover - # Python 3 - from tkinter import Tk, Label + # Python 3 + from tkinter import Tk, Label except ImportError: # pragma: no cover - # Python 2 - from Tkinter import Tk, Label + # Python 2 + from Tkinter import Tk, Label from PIL import Image, ImageTk @@ -30,706 +30,706 @@ class InteractiveWiringGuide(object): - """ - An interactive, graphical tool which can guide a user through a predefined - list of wiring instructions. - - After initialisation, calling "main()" will block while the GUI runs. - - Features: - * Cycle through a list of wiring instructions - * Display a full view of the system being wired up - * Display close-up views of pairs of boards being connected - * Illuminate an LED on boards to be connected - * Read instructions using text-to-speech - * Colour code diagrams by wire-length - """ - - # Colour of highlights for top-left and bottom-right ends of the current - # cable. - TOP_LEFT_COLOUR = (1.0, 0.0, 0.0, 1.0) - BOTTOM_RIGHT_COLOUR = (0.0, 0.0, 1.0, 1.0) - - # Width of the zoomed-in areas as a fraction of the display width - ZOOMED_VIEW_WIDTH = 0.25 - - # Height of each row of text under the drawings. - TEXT_ROW_HEIGHT = 0.07 - - # Zoom-out from zoomed in areas by this ratio - ZOOMED_MARGINS = 0.8 - - # Poll interval in ms between checking the if the current wire has been - # inserted or not. - POLL_INTERVAL_MS = 500 - - def __init__( self - , cabinet - , wire_lengths - , wires - , starting_wire=0 - , focus=[] - , bmp_controller=None - , bmp_led=7 - , wiring_probe=None - , auto_advance=True - , use_tts=True - , show_installed_wires=True - , show_future_wires=False - , timing_logger=None - ): - """ - cabinet defines the size of cabinets in the system. - - wire_lengths is a list of all valid wire lengths. - - wires is a list [(src, dst, length), ...] where src and dst are tuples - (cabinet, rack, slot, socket) and length is a length included in - wire_lengths or is None indicating that the wire should be disconnected. - - starting_wire is the index of the first wire to be inserted. This could be - used, e.g. to resume installation at a specified point. - - focus is a set of arguments (cabinet, frame, board) to use to select the - area of interest for the central system diagram. - - bmp_ips is a dictionary {board_position: ip} where board_position is either - a tuple (cabinet, rack, slot) or (cabinet, rack) where the former will be - used if both are available. The IP should be given as a string. - - bmp_led specifies which LED will be illuminated for boards where an IP is - known. - - wiring_probe is an optional WiringProbe object which will be used to - auto-advance through the wiring when wires are inserted correctly. - - auto_advance specifies whether auto-advance will be enabled initially. - - use_tts specifies whether text-to-spech will be used to announce - instructions. - - show_installed_wires selects whether already-installed wires should be shown - (feintly) at all times. - - show_future_wires selects whether to-be-installed wires should be shown - (feintly) at all times. - - timing_logger is a TimingLogger into which the cabling connections made - will be logged. If None, no timings are logged. - """ - - self.cabinet = cabinet - self.wire_lengths = wire_lengths - self.wires = wires - - self.cur_wire = starting_wire - assert 0 <= self.cur_wire < len(self.wires), "Starting wire out of range." - - self.focus = focus - - self.bmp_controller = bmp_controller - - self.bmp_led = bmp_led - - self.wiring_probe = wiring_probe - - self.auto_advance = auto_advance - - self.use_tts = use_tts - - self.show_installed_wires = show_installed_wires - self.show_future_wires = show_future_wires - - self.timing_logger = timing_logger - - # Human readable names for each socket - self.socket_names = {d: d.name.replace("_", " ") for d in Direction} - - # An infinately cycling iterator over all the boards in the machine. - self.board_iter = iter(cycle(set((c, f, b) - for ((c, f, b, _1), _2, _3) - in self.wires))) - - # A reference to any running TTS job - self.tts_process = None - - # A dict {fn: ([key, ...], description), ...} giving the keybindings and - # homan-readable help string for all bound keyboard shortcuts. - self.bindings = {} - - # Set up the Tk UI - self.tk = Tk() - self._init_ui() - - # Get started - if self.timing_logger is not None: - self.timing_logger.logging_started() - self.go_to_wire(starting_wire) - - - def _init_ui(self): - """Initialise the Tk interface.""" - self.tk.wm_title("SpiNNer Interactive Wiring Guide") - self.tk.geometry("1024x768") - - # Add a label widget into which the rendered UI is drawn - self.widget = Label() - self.widget.pack(expand=True, fill="both") - - # Used to avoid unecessary redraws on window resizes where nothing actually - # changes. - self._old_size = (None, None) - - # A flag which indicates whether, on the last poll of cable insertion, the - # cable was found to be inserted but inserted into the wrong socket. - self.connected_incorrectly = False - - # Set up a timer to poll for cable insertion - self.tk.after(InteractiveWiringGuide.POLL_INTERVAL_MS, self._poll_wiring_probe) - - # Handle window events - self.tk.bind("", self._on_resize) # Resize - self.tk.protocol("WM_DELETE_WINDOW", self._on_close) # Window closed - - # Setup key bindings - self.bindings = { - self._on_next: - (["", "", "", "", - "", ""], - "Next wire"), - self._on_prev: - (["", "", "", ""], - "Previous wire"), - self._on_skip_next: ([""], "Skip forward"), - self._on_skip_prev: ([""], "Skip backward"), - self._on_first: ([""], "First wire"), - self._on_last: ([""], "Last wire"), - self._on_tts_toggle: ([""], "Toggle speech"), - } - - if self.wiring_probe is not None: # pragma: no branch - self.bindings[self._on_auto_advance_toggle] = ( - [""], "Toggle auto-advance") - - if self.timing_logger is not None: # pragma: no branch - self.bindings[self._on_pause] = ([""], "Pause") - - for fn, (keys, help) in iteritems(self.bindings): - for key in keys: - self.tk.bind(key, fn) - - - def go_to_wire(self, wire): - """ - Advance to a specific wire. - """ - last_wire = self.cur_wire - self.cur_wire = wire - - # Reset the incorrect connection flag - self.connected_incorrectly = False - - # Update LEDs - self.set_leds(last_wire, False) - self.set_leds(self.cur_wire, True) - - # Log the start of *insertion* of a new wire - src, dst, length = self.wires[wire] - if self.timing_logger is not None: - self.timing_logger.unpause() - if length is not None: - sc, sf, sb, sd = self.wires[wire][0] - dc, df, db, dd = self.wires[wire][1] - self.timing_logger.connection_started(sc, sf, sb, sd, - dc, df, db, dd) - - # Announce via TTS the distance relative to the last position - if self.use_tts: - self.tts_delta(last_wire, self.cur_wire) - - - def set_leds(self, wire, state): - """ - Set the LEDs for the given wire index to the given state (assuming the - board's IP is known). - """ - try: - if self.bmp_controller is not None: - for c,f,b,p in self.wires[wire][:2]: - self.bmp_controller.set_led(self.bmp_led, state, c, f, b) - except: - # Quit if this goes wrong - self.tk.destroy() - raise - - - def tts_delta(self, last_wire, this_wire): - """ - Announce via TTS a brief instruction indicating what the next wire should be - in terms of the difference to the previous wire. - - Changes are announced relative to the last wire. - """ - message = "" - - # Announce wire-length changes - last_length = self.wires[last_wire][2] - this_length = self.wires[this_wire][2] - - if last_length != this_length: - if this_length is None: - message += "Disconnect cable. " - else: - message += "%s meter cable. "%(("%0.2f"%this_length).rstrip(".0")) - - # Announce which ports are being connected - this_tl = self._top_left_socket(this_wire) - this_br = self._bottom_right_socket(this_wire) - - message += self.socket_names[this_tl[3]] - message += " going " - message += self.socket_names[this_br[3]] - message += "." - - self._tts_speak(message) - - - def _tts_speak(self, text, wpm = 250): - """ - Speak the supplied string, interrupting whatever was already being said. - Non-blocking. - """ - # Kill previous instances - if self.tts_process is not None and self.tts_process.poll() is None: - self.tts_process.terminate() - - # Speak the required text. - self.tts_process = subprocess.Popen( ["espeak", "-s", str(wpm), text] - , stdout = subprocess.PIPE - , stderr = subprocess.PIPE - ) - - - def _get_wire_colour(self, length): - """ - Get the RGB colour (as a tuple) for wires of the specified length. - - Colours are allocated evenly across the spectrum. Wires to be removed are - shown in black. - """ - if length is None: - return (0.0, 0.0, 0.0) - - index = sorted(self.wire_lengths).index(length) - - hue = index / float(len(self.wire_lengths)) - - return colorsys.hsv_to_rgb(hue, 1.0, 0.5) - - - def _top_left_socket(self, wire): - """ - Return the (c,r,s,d) for the top-left socket for the current wire. - """ - - src, dst, length = self.wires[wire] - - return min([src, dst], key=(lambda v: (-v[0], # Right-to-left - +v[1], # Top-to-bottom - -v[2]))) # Right-to-left - - - def _bottom_right_socket(self, wire): - """ - Return the (c,r,s,d) for the bottom-right socket for the current wire. - """ - - src, dst, length = self.wires[wire] - - return max([src, dst], key=(lambda v: (-v[0], # Right-to-left - +v[1], # Top-to-bottom - -v[2]))) # Right-to-left - - - def _get_machine_diagram(self): - """ - Get the MachineDiagram ready to draw the system's current state. - """ - md = MachineDiagram(self.cabinet) - - bg_wire = self.cabinet.board_dimensions.x / 5.0 - fg_wire = self.cabinet.board_dimensions.x / 3.0 - - board_hl = self.cabinet.board_dimensions.x / 3.0 - wire_hl = self.cabinet.board_dimensions.x / 2.0 - - # Wires already installed - if self.show_installed_wires: - for src, dst, length in self.wires[:self.cur_wire]: - r,g,b = self._get_wire_colour(length) - md.add_wire(src, dst, rgba = (r,g,b,0.5), width = bg_wire) - - # Wires still to be installed - if self.show_future_wires: - for src, dst, length in self.wires[self.cur_wire+1:]: - r,g,b = self._get_wire_colour(length) - md.add_wire(src, dst, rgba = (r,g,b,0.5), width = bg_wire) - - # Current wire (with a white outline) - src, dst, length = self.wires[self.cur_wire] - r,g,b = self._get_wire_colour(length) - md.add_wire(src, dst, rgba = (1.0,1.0,1.0,1.0), width = fg_wire * 2) - md.add_wire(src, dst, rgba = (r,g,b,1.0), width = fg_wire) - - # Highlight source and destination - c,r,s,d = self._top_left_socket(self.cur_wire) - md.add_highlight(c,r,s,d, rgba = self.TOP_LEFT_COLOUR, width=wire_hl) - md.add_highlight(c,r,s, rgba = self.TOP_LEFT_COLOUR, width=board_hl) - c,r,s,d = self._bottom_right_socket(self.cur_wire) - md.add_highlight(c,r,s,d, rgba = self.BOTTOM_RIGHT_COLOUR, width=wire_hl) - md.add_highlight(c,r,s, rgba = self.BOTTOM_RIGHT_COLOUR, width=board_hl) - - return md - - - def _draw_text(self, ctx, text, size, rgba = (0.0,0.0,0.0, 1.0)): - """ - Draw the desired text centered below (0,0). - """ - ctx.save() - - ctx.select_font_face("Sans") - ctx.set_source_rgba(*rgba) - ctx.set_font_size(size*0.8) - x,y, w,h, _w,_h = ctx.text_extents(text) - ctx.move_to(-x - w/2, -y + size*0.1) - ctx.show_text(text) - - ctx.restore() - - - def _draw_help_text(self, ctx, width, height, rgba = (0.5,0.5,0.5, 1.0)): - """ - Draw the help text along the bottom of the screen, return the height of the - text in pixels. - """ - # Generate help string - help_text = " | ".join("{} {}".format(keys[0], help) - for keys, help in - sorted(itervalues(self.bindings), - key=(lambda kh: kh[0]))) - - ctx.save() - - # Determine font size which will fill the width of the screen - ctx.select_font_face("Sans") - ctx.set_source_rgba(*rgba) - ctx.set_font_size(1.0) - x,y, w,h, _w,_h = ctx.text_extents(help_text) - scale_factor = (width * 0.95) / w - - # Draw the text along the bottom of the screen - ctx.set_font_size(scale_factor) - x,y, w,h, _w,_h = ctx.text_extents(help_text) - ctx.move_to(x + ((width - w) / 2), height + y) - ctx.show_text(help_text) - - ctx.restore() - - return h - - - def _render_gui(self, ctx, width, height): - """ - Re-draw the whole GUI into the supplied Cairo context. - """ - # Clear the buffer background - ctx.set_source_rgba(1.0,1.0,1.0,1.0); - ctx.rectangle(0,0, width, height) - ctx.fill() - - # Draw help text along bottom of screen. - height -= self._draw_help_text(ctx, width, height) - - md = self._get_machine_diagram() - - # Draw the main overview image - ctx.save() - ctx.translate(width*self.ZOOMED_VIEW_WIDTH, 0.0) - ctx.rectangle(0,0, width * (1.0 - (2*self.ZOOMED_VIEW_WIDTH)), height * (1 - (2*self.TEXT_ROW_HEIGHT))) - ctx.clip() - md.draw( ctx - , width * (1.0 - (2*self.ZOOMED_VIEW_WIDTH)) - , height * (1 - (2*self.TEXT_ROW_HEIGHT)) - , *((list(self.focus) + [None]*3)[:3] * 2) - ) - ctx.restore() - - # Draw the left zoomed-in image - ctx.save() - ctx.rectangle(0,0, width*self.ZOOMED_VIEW_WIDTH, height*(1-(2*self.TEXT_ROW_HEIGHT))) - ctx.clip() - ctx.translate( width*self.ZOOMED_VIEW_WIDTH*(1-self.ZOOMED_MARGINS)/2 - , height*(1-(2*self.TEXT_ROW_HEIGHT))*(1-self.ZOOMED_MARGINS)/2 - ) - ctx.scale(self.ZOOMED_MARGINS, self.ZOOMED_MARGINS) - md.draw( ctx - , width*self.ZOOMED_VIEW_WIDTH - , height*(1 - (2*self.TEXT_ROW_HEIGHT)) - , *(list(self._top_left_socket(self.cur_wire)[:3]) + - list(self.focus)) - ) - ctx.restore() - - # Draw the right zoomed-in image - ctx.save() - ctx.translate(width*(1-self.ZOOMED_VIEW_WIDTH), 0.0) - ctx.rectangle(0,0, width*self.ZOOMED_VIEW_WIDTH, height*(1-(2*self.TEXT_ROW_HEIGHT))) - ctx.clip() - ctx.translate( width*self.ZOOMED_VIEW_WIDTH*(1-self.ZOOMED_MARGINS)/2 - , height*(1-(2*self.TEXT_ROW_HEIGHT))*(1-self.ZOOMED_MARGINS)/2 - ) - ctx.scale(self.ZOOMED_MARGINS, self.ZOOMED_MARGINS) - md.draw( ctx - , width*self.ZOOMED_VIEW_WIDTH - , height*(1 - (2*self.TEXT_ROW_HEIGHT)) - , *(list(self._bottom_right_socket(self.cur_wire)[:3]) + - list(self.focus)) - ) - ctx.restore() - - # Draw the wire length - ctx.save() - ctx.translate(width/2, height*(1 - (2*self.TEXT_ROW_HEIGHT))) - length = self.wires[self.cur_wire][2] - self._draw_text( ctx - , "%0.2f m"%(length) - if length is not None - else "Disconnect Wire" - , height*self.TEXT_ROW_HEIGHT - , rgba = self._get_wire_colour(length) - ) - - # Draw the progress - ctx.translate(0, height*self.TEXT_ROW_HEIGHT) - self._draw_text( ctx - , "%d of %d (%0.1f%%)"%( self.cur_wire + 1 - , len(self.wires) - , 100.0*((self.cur_wire+1)/float(len(self.wires))) - ) - , height*self.TEXT_ROW_HEIGHT - ) - ctx.restore() - - # Draw the endpoint specifications - for x_offset, (c,r,s,d) in [ (width * (self.ZOOMED_VIEW_WIDTH/2), self._top_left_socket(self.cur_wire)) - , (width * (1-(self.ZOOMED_VIEW_WIDTH/2)), self._bottom_right_socket(self.cur_wire)) - ]: - ctx.save() - ctx.translate(x_offset, 0.0) - - # Socket number - ctx.translate(0, height*(1 - (2*self.TEXT_ROW_HEIGHT))) - self._draw_text( ctx - , self.socket_names[d] - , height*self.TEXT_ROW_HEIGHT - ) - - # Draw the progress - ctx.translate(0, height*self.TEXT_ROW_HEIGHT) - self._draw_text( ctx - , "C%d F%d B%02d"%(c,r,s) - , height*self.TEXT_ROW_HEIGHT - ) - ctx.restore() - - # Draw a full-screen "paused" indicator, if paused - if self.timing_logger is not None and self.timing_logger.paused: - ctx.save() - - ctx.translate(width / 2.0, height / 2.0) - scale = min(width, height) * 0.4 - ctx.scale(scale, scale) - - ctx.move_to(0, 0) - ctx.new_sub_path() - ctx.arc(0, 0, 1.0, 0.0, 2.0 * pi) - ctx.set_source_rgba(0.0,0.0,0.0,0.8); - ctx.fill_preserve() - ctx.set_source_rgba(0.0,0.0,0.0,1.0); - ctx.set_line_width(0.1) - ctx.stroke() - - ctx.move_to(-0.25, -0.5); - ctx.rel_line_to(0.0, 1.0); - ctx.move_to(0.25, -0.5); - ctx.rel_line_to(0.0, 1.0); - ctx.set_source_rgba(0.7,0.7,0.7,1.0); - ctx.set_line_width(0.2) - ctx.stroke() - - ctx.restore() - - - def _redraw(self): - """Redraw the GUI and display it on the screen.""" - - # Get a new context to draw the GUI into - height = self.tk.winfo_height() - width = self.tk.winfo_width() - - surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, width, height) - ctx = cairo.Context(surface) - - # Render it - self._render_gui(ctx, width, height) - - # Draw onto the window (note: a reference to the image is kept to keep it - # safe from garbage collection) - self.widget.image = ImageTk.PhotoImage(Image.frombuffer( - "RGBA", (width, height), surface.get_data(), "raw", "BGRA", 0, 1)) - self.widget.configure(image=self.widget.image) - self.widget.pack(expand=True, fill="both") - - - def _poll_wiring_probe(self): - """Poll the machine's hardware to determine if the wiring is complete. - """ - try: - # Check wiring conncectivity - if self.wiring_probe is not None and self.auto_advance: - src, dst, length = self.wires[self.cur_wire] - - # Check both ends of the cable - actual_dst = self.wiring_probe.get_link_target(*src) - actual_src = self.wiring_probe.get_link_target(*dst) - - advance = False - - if length is None: - # We're waiting for the wire to be disconnected - if actual_src is None and actual_dst is None: - # Disconnected! Advance to the next wire! - advance = True - else: - # We're waiting for a wire to be connected - if actual_src == src and actual_dst == dst: - # Connected correctly! Advance to the next wire! - advance = True - if self.timing_logger is not None: - self.timing_logger.unpause() - self.timing_logger.connection_complete() - elif actual_dst is not None or actual_src is not None: - # The wire was connected, but was connected incorrectly! - if not self.connected_incorrectly: - self._tts_speak("Wire inserted incorrectly.") - if self.timing_logger is not None: - self.timing_logger.unpause() - self.timing_logger.connection_error() - self.connected_incorrectly = True - else: - # No wire is connected - self.connected_incorrectly = False - - # Actually advance, as required - if advance and self.cur_wire != len(self.wires) - 1: - self.go_to_wire(self.cur_wire + 1) - self._redraw() - except: - # Fail gracefully... - print(traceback.format_exc()) - - # Schedule next poll - self.tk.after(InteractiveWiringGuide.POLL_INTERVAL_MS, self._poll_wiring_probe) - - - def _on_next(self, event): - """Advance to the next wire.""" - self.go_to_wire((self.cur_wire + 1) % len(self.wires)) - self._redraw() - - - def _on_prev(self, event): - """Retreat to the previous wire.""" - self.go_to_wire((self.cur_wire - 1) % len(self.wires)) - self._redraw() - - - def _on_first(self, event): - """Go back to the first wire.""" - self.go_to_wire(0) - self._redraw() - - - def _on_last(self, event): - """Go to the last first wire.""" - self.go_to_wire(len(self.wires)-1) - self._redraw() - - - def _on_skip_next(self, event): - """Advance rapidly forward through the wires.""" - self.go_to_wire((self.cur_wire + 25) % len(self.wires)) - self._redraw() - - - def _on_skip_prev(self, event): - """Retreat rapidly backward through the wires.""" - self.go_to_wire((self.cur_wire - 25) % len(self.wires)) - self._redraw() - - - def _on_tts_toggle(self, event): - """Toggle whether Text-to-Speech is enabled.""" - self.use_tts = not self.use_tts - if self.use_tts: - self._tts_speak("Text to speech enabled.") - else: - self._tts_speak("Text to speech disabled.") - - - def _on_auto_advance_toggle(self, event): - """Toggle whether auto-advance is enabled.""" - if self.wiring_probe is not None: - self.auto_advance = not self.auto_advance - if self.auto_advance: - self._tts_speak("Auto advance enabled.") - else: - self._tts_speak("Auto advance disabled.") - else: - self._tts_speak("Auto advance not supported.") - - - def _on_pause(self, event): - """Toggle whether timings are being recorded.""" - if self.timing_logger is not None: - if self.timing_logger.paused: - self.timing_logger.unpause() - else: - self.timing_logger.pause() - self._redraw() - - - def _on_resize(self, event): - """Window has been resized, trigger a redraw.""" - new_size = (event.width, event.height) - - if self._old_size != new_size: - self._old_size = new_size - self._redraw() - - - def _on_close(self, event=None): - """The window has been closed.""" - if self.timing_logger is not None: - self.timing_logger.logging_stopped() - - # Turn off LEDs before leaving - self.set_leds(self.cur_wire, False) - self.tk.destroy() - - - def mainloop(self): # pragma: no cover - """Start the interactive wiring guide GUI. Returns when the window is - closed.""" - return self.tk.mainloop() + """ + An interactive, graphical tool which can guide a user through a predefined + list of wiring instructions. + + After initialisation, calling "main()" will block while the GUI runs. + + Features: + * Cycle through a list of wiring instructions + * Display a full view of the system being wired up + * Display close-up views of pairs of boards being connected + * Illuminate an LED on boards to be connected + * Read instructions using text-to-speech + * Colour code diagrams by wire-length + """ + + # Colour of highlights for top-left and bottom-right ends of the current + # cable. + TOP_LEFT_COLOUR = (1.0, 0.0, 0.0, 1.0) + BOTTOM_RIGHT_COLOUR = (0.0, 0.0, 1.0, 1.0) + + # Width of the zoomed-in areas as a fraction of the display width + ZOOMED_VIEW_WIDTH = 0.25 + + # Height of each row of text under the drawings. + TEXT_ROW_HEIGHT = 0.07 + + # Zoom-out from zoomed in areas by this ratio + ZOOMED_MARGINS = 0.8 + + # Poll interval in ms between checking the if the current wire has been + # inserted or not. + POLL_INTERVAL_MS = 500 + + def __init__( self + , cabinet + , wire_lengths + , wires + , starting_wire=0 + , focus=[] + , bmp_controller=None + , bmp_led=7 + , wiring_probe=None + , auto_advance=True + , use_tts=True + , show_installed_wires=True + , show_future_wires=False + , timing_logger=None + ): + """ + cabinet defines the size of cabinets in the system. + + wire_lengths is a list of all valid wire lengths. + + wires is a list [(src, dst, length), ...] where src and dst are tuples + (cabinet, rack, slot, socket) and length is a length included in + wire_lengths or is None indicating that the wire should be disconnected. + + starting_wire is the index of the first wire to be inserted. This could be + used, e.g. to resume installation at a specified point. + + focus is a set of arguments (cabinet, frame, board) to use to select the + area of interest for the central system diagram. + + bmp_ips is a dictionary {board_position: ip} where board_position is either + a tuple (cabinet, rack, slot) or (cabinet, rack) where the former will be + used if both are available. The IP should be given as a string. + + bmp_led specifies which LED will be illuminated for boards where an IP is + known. + + wiring_probe is an optional WiringProbe object which will be used to + auto-advance through the wiring when wires are inserted correctly. + + auto_advance specifies whether auto-advance will be enabled initially. + + use_tts specifies whether text-to-spech will be used to announce + instructions. + + show_installed_wires selects whether already-installed wires should be shown + (feintly) at all times. + + show_future_wires selects whether to-be-installed wires should be shown + (feintly) at all times. + + timing_logger is a TimingLogger into which the cabling connections made + will be logged. If None, no timings are logged. + """ + + self.cabinet = cabinet + self.wire_lengths = wire_lengths + self.wires = wires + + self.cur_wire = starting_wire + assert 0 <= self.cur_wire < len(self.wires), "Starting wire out of range." + + self.focus = focus + + self.bmp_controller = bmp_controller + + self.bmp_led = bmp_led + + self.wiring_probe = wiring_probe + + self.auto_advance = auto_advance + + self.use_tts = use_tts + + self.show_installed_wires = show_installed_wires + self.show_future_wires = show_future_wires + + self.timing_logger = timing_logger + + # Human readable names for each socket + self.socket_names = {d: d.name.replace("_", " ") for d in Direction} + + # An infinately cycling iterator over all the boards in the machine. + self.board_iter = iter(cycle(set((c, f, b) + for ((c, f, b, _1), _2, _3) + in self.wires))) + + # A reference to any running TTS job + self.tts_process = None + + # A dict {fn: ([key, ...], description), ...} giving the keybindings and + # homan-readable help string for all bound keyboard shortcuts. + self.bindings = {} + + # Set up the Tk UI + self.tk = Tk() + self._init_ui() + + # Get started + if self.timing_logger is not None: + self.timing_logger.logging_started() + self.go_to_wire(starting_wire) + + + def _init_ui(self): + """Initialise the Tk interface.""" + self.tk.wm_title("SpiNNer Interactive Wiring Guide") + self.tk.geometry("1024x768") + + # Add a label widget into which the rendered UI is drawn + self.widget = Label() + self.widget.pack(expand=True, fill="both") + + # Used to avoid unecessary redraws on window resizes where nothing actually + # changes. + self._old_size = (None, None) + + # A flag which indicates whether, on the last poll of cable insertion, the + # cable was found to be inserted but inserted into the wrong socket. + self.connected_incorrectly = False + + # Set up a timer to poll for cable insertion + self.tk.after(InteractiveWiringGuide.POLL_INTERVAL_MS, self._poll_wiring_probe) + + # Handle window events + self.tk.bind("", self._on_resize) # Resize + self.tk.protocol("WM_DELETE_WINDOW", self._on_close) # Window closed + + # Setup key bindings + self.bindings = { + self._on_next: + (["", "", "", "", + "", ""], + "Next wire"), + self._on_prev: + (["", "", "", ""], + "Previous wire"), + self._on_skip_next: ([""], "Skip forward"), + self._on_skip_prev: ([""], "Skip backward"), + self._on_first: ([""], "First wire"), + self._on_last: ([""], "Last wire"), + self._on_tts_toggle: ([""], "Toggle speech"), + } + + if self.wiring_probe is not None: # pragma: no branch + self.bindings[self._on_auto_advance_toggle] = ( + [""], "Toggle auto-advance") + + if self.timing_logger is not None: # pragma: no branch + self.bindings[self._on_pause] = ([""], "Pause") + + for fn, (keys, help) in iteritems(self.bindings): + for key in keys: + self.tk.bind(key, fn) + + + def go_to_wire(self, wire): + """ + Advance to a specific wire. + """ + last_wire = self.cur_wire + self.cur_wire = wire + + # Reset the incorrect connection flag + self.connected_incorrectly = False + + # Update LEDs + self.set_leds(last_wire, False) + self.set_leds(self.cur_wire, True) + + # Log the start of *insertion* of a new wire + src, dst, length = self.wires[wire] + if self.timing_logger is not None: + self.timing_logger.unpause() + if length is not None: + sc, sf, sb, sd = self.wires[wire][0] + dc, df, db, dd = self.wires[wire][1] + self.timing_logger.connection_started(sc, sf, sb, sd, + dc, df, db, dd) + + # Announce via TTS the distance relative to the last position + if self.use_tts: + self.tts_delta(last_wire, self.cur_wire) + + + def set_leds(self, wire, state): + """ + Set the LEDs for the given wire index to the given state (assuming the + board's IP is known). + """ + try: + if self.bmp_controller is not None: + for c,f,b,p in self.wires[wire][:2]: + self.bmp_controller.set_led(self.bmp_led, state, c, f, b) + except: + # Quit if this goes wrong + self.tk.destroy() + raise + + + def tts_delta(self, last_wire, this_wire): + """ + Announce via TTS a brief instruction indicating what the next wire should be + in terms of the difference to the previous wire. + + Changes are announced relative to the last wire. + """ + message = "" + + # Announce wire-length changes + last_length = self.wires[last_wire][2] + this_length = self.wires[this_wire][2] + + if last_length != this_length: + if this_length is None: + message += "Disconnect cable. " + else: + message += "%s meter cable. "%(("%0.2f"%this_length).rstrip(".0")) + + # Announce which ports are being connected + this_tl = self._top_left_socket(this_wire) + this_br = self._bottom_right_socket(this_wire) + + message += self.socket_names[this_tl[3]] + message += " going " + message += self.socket_names[this_br[3]] + message += "." + + self._tts_speak(message) + + + def _tts_speak(self, text, wpm = 250): + """ + Speak the supplied string, interrupting whatever was already being said. + Non-blocking. + """ + # Kill previous instances + if self.tts_process is not None and self.tts_process.poll() is None: + self.tts_process.terminate() + + # Speak the required text. + self.tts_process = subprocess.Popen( ["espeak", "-s", str(wpm), text] + , stdout = subprocess.PIPE + , stderr = subprocess.PIPE + ) + + + def _get_wire_colour(self, length): + """ + Get the RGB colour (as a tuple) for wires of the specified length. + + Colours are allocated evenly across the spectrum. Wires to be removed are + shown in black. + """ + if length is None: + return (0.0, 0.0, 0.0) + + index = sorted(self.wire_lengths).index(length) + + hue = index / float(len(self.wire_lengths)) + + return colorsys.hsv_to_rgb(hue, 1.0, 0.5) + + + def _top_left_socket(self, wire): + """ + Return the (c,r,s,d) for the top-left socket for the current wire. + """ + + src, dst, length = self.wires[wire] + + return min([src, dst], key=(lambda v: (-v[0], # Right-to-left + +v[1], # Top-to-bottom + -v[2]))) # Right-to-left + + + def _bottom_right_socket(self, wire): + """ + Return the (c,r,s,d) for the bottom-right socket for the current wire. + """ + + src, dst, length = self.wires[wire] + + return max([src, dst], key=(lambda v: (-v[0], # Right-to-left + +v[1], # Top-to-bottom + -v[2]))) # Right-to-left + + + def _get_machine_diagram(self): + """ + Get the MachineDiagram ready to draw the system's current state. + """ + md = MachineDiagram(self.cabinet) + + bg_wire = self.cabinet.board_dimensions.x / 5.0 + fg_wire = self.cabinet.board_dimensions.x / 3.0 + + board_hl = self.cabinet.board_dimensions.x / 3.0 + wire_hl = self.cabinet.board_dimensions.x / 2.0 + + # Wires already installed + if self.show_installed_wires: + for src, dst, length in self.wires[:self.cur_wire]: + r,g,b = self._get_wire_colour(length) + md.add_wire(src, dst, rgba = (r,g,b,0.5), width = bg_wire) + + # Wires still to be installed + if self.show_future_wires: + for src, dst, length in self.wires[self.cur_wire+1:]: + r,g,b = self._get_wire_colour(length) + md.add_wire(src, dst, rgba = (r,g,b,0.5), width = bg_wire) + + # Current wire (with a white outline) + src, dst, length = self.wires[self.cur_wire] + r,g,b = self._get_wire_colour(length) + md.add_wire(src, dst, rgba = (1.0,1.0,1.0,1.0), width = fg_wire * 2) + md.add_wire(src, dst, rgba = (r,g,b,1.0), width = fg_wire) + + # Highlight source and destination + c,r,s,d = self._top_left_socket(self.cur_wire) + md.add_highlight(c,r,s,d, rgba = self.TOP_LEFT_COLOUR, width=wire_hl) + md.add_highlight(c,r,s, rgba = self.TOP_LEFT_COLOUR, width=board_hl) + c,r,s,d = self._bottom_right_socket(self.cur_wire) + md.add_highlight(c,r,s,d, rgba = self.BOTTOM_RIGHT_COLOUR, width=wire_hl) + md.add_highlight(c,r,s, rgba = self.BOTTOM_RIGHT_COLOUR, width=board_hl) + + return md + + + def _draw_text(self, ctx, text, size, rgba = (0.0,0.0,0.0, 1.0)): + """ + Draw the desired text centered below (0,0). + """ + ctx.save() + + ctx.select_font_face("Sans") + ctx.set_source_rgba(*rgba) + ctx.set_font_size(size*0.8) + x,y, w,h, _w,_h = ctx.text_extents(text) + ctx.move_to(-x - w/2, -y + size*0.1) + ctx.show_text(text) + + ctx.restore() + + + def _draw_help_text(self, ctx, width, height, rgba = (0.5,0.5,0.5, 1.0)): + """ + Draw the help text along the bottom of the screen, return the height of the + text in pixels. + """ + # Generate help string + help_text = " | ".join("{} {}".format(keys[0], help) + for keys, help in + sorted(itervalues(self.bindings), + key=(lambda kh: kh[0]))) + + ctx.save() + + # Determine font size which will fill the width of the screen + ctx.select_font_face("Sans") + ctx.set_source_rgba(*rgba) + ctx.set_font_size(1.0) + x,y, w,h, _w,_h = ctx.text_extents(help_text) + scale_factor = (width * 0.95) / w + + # Draw the text along the bottom of the screen + ctx.set_font_size(scale_factor) + x,y, w,h, _w,_h = ctx.text_extents(help_text) + ctx.move_to(x + ((width - w) / 2), height + y) + ctx.show_text(help_text) + + ctx.restore() + + return h + + + def _render_gui(self, ctx, width, height): + """ + Re-draw the whole GUI into the supplied Cairo context. + """ + # Clear the buffer background + ctx.set_source_rgba(1.0,1.0,1.0,1.0); + ctx.rectangle(0,0, width, height) + ctx.fill() + + # Draw help text along bottom of screen. + height -= self._draw_help_text(ctx, width, height) + + md = self._get_machine_diagram() + + # Draw the main overview image + ctx.save() + ctx.translate(width*self.ZOOMED_VIEW_WIDTH, 0.0) + ctx.rectangle(0,0, width * (1.0 - (2*self.ZOOMED_VIEW_WIDTH)), height * (1 - (2*self.TEXT_ROW_HEIGHT))) + ctx.clip() + md.draw( ctx + , width * (1.0 - (2*self.ZOOMED_VIEW_WIDTH)) + , height * (1 - (2*self.TEXT_ROW_HEIGHT)) + , *((list(self.focus) + [None]*3)[:3] * 2) + ) + ctx.restore() + + # Draw the left zoomed-in image + ctx.save() + ctx.rectangle(0,0, width*self.ZOOMED_VIEW_WIDTH, height*(1-(2*self.TEXT_ROW_HEIGHT))) + ctx.clip() + ctx.translate( width*self.ZOOMED_VIEW_WIDTH*(1-self.ZOOMED_MARGINS)/2 + , height*(1-(2*self.TEXT_ROW_HEIGHT))*(1-self.ZOOMED_MARGINS)/2 + ) + ctx.scale(self.ZOOMED_MARGINS, self.ZOOMED_MARGINS) + md.draw( ctx + , width*self.ZOOMED_VIEW_WIDTH + , height*(1 - (2*self.TEXT_ROW_HEIGHT)) + , *(list(self._top_left_socket(self.cur_wire)[:3]) + + list(self.focus)) + ) + ctx.restore() + + # Draw the right zoomed-in image + ctx.save() + ctx.translate(width*(1-self.ZOOMED_VIEW_WIDTH), 0.0) + ctx.rectangle(0,0, width*self.ZOOMED_VIEW_WIDTH, height*(1-(2*self.TEXT_ROW_HEIGHT))) + ctx.clip() + ctx.translate( width*self.ZOOMED_VIEW_WIDTH*(1-self.ZOOMED_MARGINS)/2 + , height*(1-(2*self.TEXT_ROW_HEIGHT))*(1-self.ZOOMED_MARGINS)/2 + ) + ctx.scale(self.ZOOMED_MARGINS, self.ZOOMED_MARGINS) + md.draw( ctx + , width*self.ZOOMED_VIEW_WIDTH + , height*(1 - (2*self.TEXT_ROW_HEIGHT)) + , *(list(self._bottom_right_socket(self.cur_wire)[:3]) + + list(self.focus)) + ) + ctx.restore() + + # Draw the wire length + ctx.save() + ctx.translate(width/2, height*(1 - (2*self.TEXT_ROW_HEIGHT))) + length = self.wires[self.cur_wire][2] + self._draw_text( ctx + , "%0.2f m"%(length) + if length is not None + else "Disconnect Wire" + , height*self.TEXT_ROW_HEIGHT + , rgba = self._get_wire_colour(length) + ) + + # Draw the progress + ctx.translate(0, height*self.TEXT_ROW_HEIGHT) + self._draw_text( ctx + , "%d of %d (%0.1f%%)"%( self.cur_wire + 1 + , len(self.wires) + , 100.0*((self.cur_wire+1)/float(len(self.wires))) + ) + , height*self.TEXT_ROW_HEIGHT + ) + ctx.restore() + + # Draw the endpoint specifications + for x_offset, (c,r,s,d) in [ (width * (self.ZOOMED_VIEW_WIDTH/2), self._top_left_socket(self.cur_wire)) + , (width * (1-(self.ZOOMED_VIEW_WIDTH/2)), self._bottom_right_socket(self.cur_wire)) + ]: + ctx.save() + ctx.translate(x_offset, 0.0) + + # Socket number + ctx.translate(0, height*(1 - (2*self.TEXT_ROW_HEIGHT))) + self._draw_text( ctx + , self.socket_names[d] + , height*self.TEXT_ROW_HEIGHT + ) + + # Draw the progress + ctx.translate(0, height*self.TEXT_ROW_HEIGHT) + self._draw_text( ctx + , "C%d F%d B%02d"%(c,r,s) + , height*self.TEXT_ROW_HEIGHT + ) + ctx.restore() + + # Draw a full-screen "paused" indicator, if paused + if self.timing_logger is not None and self.timing_logger.paused: + ctx.save() + + ctx.translate(width / 2.0, height / 2.0) + scale = min(width, height) * 0.4 + ctx.scale(scale, scale) + + ctx.move_to(0, 0) + ctx.new_sub_path() + ctx.arc(0, 0, 1.0, 0.0, 2.0 * pi) + ctx.set_source_rgba(0.0,0.0,0.0,0.8); + ctx.fill_preserve() + ctx.set_source_rgba(0.0,0.0,0.0,1.0); + ctx.set_line_width(0.1) + ctx.stroke() + + ctx.move_to(-0.25, -0.5); + ctx.rel_line_to(0.0, 1.0); + ctx.move_to(0.25, -0.5); + ctx.rel_line_to(0.0, 1.0); + ctx.set_source_rgba(0.7,0.7,0.7,1.0); + ctx.set_line_width(0.2) + ctx.stroke() + + ctx.restore() + + + def _redraw(self): + """Redraw the GUI and display it on the screen.""" + + # Get a new context to draw the GUI into + height = self.tk.winfo_height() + width = self.tk.winfo_width() + + surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, width, height) + ctx = cairo.Context(surface) + + # Render it + self._render_gui(ctx, width, height) + + # Draw onto the window (note: a reference to the image is kept to keep it + # safe from garbage collection) + self.widget.image = ImageTk.PhotoImage(Image.frombuffer( + "RGBA", (width, height), surface.get_data(), "raw", "BGRA", 0, 1)) + self.widget.configure(image=self.widget.image) + self.widget.pack(expand=True, fill="both") + + + def _poll_wiring_probe(self): + """Poll the machine's hardware to determine if the wiring is complete. + """ + try: + # Check wiring conncectivity + if self.wiring_probe is not None and self.auto_advance: + src, dst, length = self.wires[self.cur_wire] + + # Check both ends of the cable + actual_dst = self.wiring_probe.get_link_target(*src) + actual_src = self.wiring_probe.get_link_target(*dst) + + advance = False + + if length is None: + # We're waiting for the wire to be disconnected + if actual_src is None and actual_dst is None: + # Disconnected! Advance to the next wire! + advance = True + else: + # We're waiting for a wire to be connected + if actual_src == src and actual_dst == dst: + # Connected correctly! Advance to the next wire! + advance = True + if self.timing_logger is not None: + self.timing_logger.unpause() + self.timing_logger.connection_complete() + elif actual_dst is not None or actual_src is not None: + # The wire was connected, but was connected incorrectly! + if not self.connected_incorrectly: + self._tts_speak("Wire inserted incorrectly.") + if self.timing_logger is not None: + self.timing_logger.unpause() + self.timing_logger.connection_error() + self.connected_incorrectly = True + else: + # No wire is connected + self.connected_incorrectly = False + + # Actually advance, as required + if advance and self.cur_wire != len(self.wires) - 1: + self.go_to_wire(self.cur_wire + 1) + self._redraw() + except: + # Fail gracefully... + print(traceback.format_exc()) + + # Schedule next poll + self.tk.after(InteractiveWiringGuide.POLL_INTERVAL_MS, self._poll_wiring_probe) + + + def _on_next(self, event): + """Advance to the next wire.""" + self.go_to_wire((self.cur_wire + 1) % len(self.wires)) + self._redraw() + + + def _on_prev(self, event): + """Retreat to the previous wire.""" + self.go_to_wire((self.cur_wire - 1) % len(self.wires)) + self._redraw() + + + def _on_first(self, event): + """Go back to the first wire.""" + self.go_to_wire(0) + self._redraw() + + + def _on_last(self, event): + """Go to the last first wire.""" + self.go_to_wire(len(self.wires)-1) + self._redraw() + + + def _on_skip_next(self, event): + """Advance rapidly forward through the wires.""" + self.go_to_wire((self.cur_wire + 25) % len(self.wires)) + self._redraw() + + + def _on_skip_prev(self, event): + """Retreat rapidly backward through the wires.""" + self.go_to_wire((self.cur_wire - 25) % len(self.wires)) + self._redraw() + + + def _on_tts_toggle(self, event): + """Toggle whether Text-to-Speech is enabled.""" + self.use_tts = not self.use_tts + if self.use_tts: + self._tts_speak("Text to speech enabled.") + else: + self._tts_speak("Text to speech disabled.") + + + def _on_auto_advance_toggle(self, event): + """Toggle whether auto-advance is enabled.""" + if self.wiring_probe is not None: + self.auto_advance = not self.auto_advance + if self.auto_advance: + self._tts_speak("Auto advance enabled.") + else: + self._tts_speak("Auto advance disabled.") + else: + self._tts_speak("Auto advance not supported.") + + + def _on_pause(self, event): + """Toggle whether timings are being recorded.""" + if self.timing_logger is not None: + if self.timing_logger.paused: + self.timing_logger.unpause() + else: + self.timing_logger.pause() + self._redraw() + + + def _on_resize(self, event): + """Window has been resized, trigger a redraw.""" + new_size = (event.width, event.height) + + if self._old_size != new_size: + self._old_size = new_size + self._redraw() + + + def _on_close(self, event=None): + """The window has been closed.""" + if self.timing_logger is not None: + self.timing_logger.logging_stopped() + + # Turn off LEDs before leaving + self.set_leds(self.cur_wire, False) + self.tk.destroy() + + + def mainloop(self): # pragma: no cover + """Start the interactive wiring guide GUI. Returns when the window is + closed.""" + return self.tk.mainloop() diff --git a/spinner/diagrams/machine_map.py b/spinner/diagrams/machine_map.py index c20c774..43efa5e 100644 --- a/spinner/diagrams/machine_map.py +++ b/spinner/diagrams/machine_map.py @@ -1,6 +1,6 @@ """ -Tools for producing diagrams which show the network topology overlaid with board -boundaries and physical locations. +Tools for producing diagrams which show the network topology overlaid with +board boundaries and physical locations. """ import cairocffi as cairo @@ -15,19 +15,19 @@ # Colours/thicknesses for drawing edges of chips INTERNAL_EDGE_COLOUR = (0.82, 0.84, 0.81, 1.0) -N_S_EDGE_COLOUR = (0.80, 0.00, 0.00, 1.0) -W_E_EDGE_COLOUR = (0.30, 0.60, 0.02, 1.0) -NE_SW_EDGE_COLOUR = (0.20, 0.40, 0.64, 1.0) +N_S_EDGE_COLOUR = (0.80, 0.00, 0.00, 1.0) +W_E_EDGE_COLOUR = (0.30, 0.60, 0.02, 1.0) +NE_SW_EDGE_COLOUR = (0.20, 0.40, 0.64, 1.0) INTERNAL_EDGE_WIDTH = 0.03 -N_S_EDGE_WIDTH = 0.1 -W_E_EDGE_WIDTH = 0.1 -NE_SW_EDGE_WIDTH = 0.1 +N_S_EDGE_WIDTH = 0.1 +W_E_EDGE_WIDTH = 0.1 +NE_SW_EDGE_WIDTH = 0.1 # Text label sizes/colours BTM_LEFT_CHIP_LABEL_COLOUR = (0.33, 0.34, 0.32, 1.0) -BOARD_LABEL_COLOUR = (0.33, 0.34, 0.32, 0.5) -COORD_LABEL_COLOUR = (0.53, 0.54, 0.52, 1.0) +BOARD_LABEL_COLOUR = (0.33, 0.34, 0.32, 0.5) +COORD_LABEL_COLOUR = (0.53, 0.54, 0.52, 1.0) BTM_LEFT_CHIP_LABEL_SIZE = 0.6 BOARD_LABEL_SIZE = 1.1 @@ -39,306 +39,331 @@ BTM_LEFT_CHIP_DOT_COLOUR = BTM_LEFT_CHIP_LABEL_COLOUR BTM_LEFT_CHIP_DOT_SIZE = 0.20 +EDGE_MAP = { + Direction.south: 0, + Direction.east: 1, + Direction.north_east: 2, + Direction.north: 3, + Direction.west: 4, + Direction.south_west: 5, +} + def D2R(d): - """ - Convert degrees to radians. - """ - return (float(d)/180.0)*pi - - -def _draw_edge(ctx, x,y, edge): - """ - Draw the specified single edge of a hexagon whose bottom-left corner is at - (x,y) using whatever style is currently set. - """ - - edge_num = { - Direction.south: 0, - Direction.east: 1, - Direction.north_east: 2, - Direction.north: 3, - Direction.west: 4, - Direction.south_west: 5, - }[edge] - - # The transformation to use when drawing hexagons such that they sit on an x,y - # grid. - ctx.save() - x_scale = 1.0/(2.0*sin(D2R(60))) - y_scale = 1.0/(1.0+cos(D2R(60))) - ctx.translate( (0.25*tan(D2R(60))) * x_scale - , 0.5*sin(D2R(30)) * y_scale - ) - ctx.move_to(x,y) - ctx.transform(cairo.Matrix( 1.0, 0.0 - , -cos(D2R(60)), 1.0 - )) - ctx.scale( x_scale - , y_scale - ) - - # Draw the specified edge - ctx.rotate(D2R(30)) - for _ in range(edge_num): - ctx.rotate(D2R(-60)) - ctx.rel_move_to(1.0,0.0) - ctx.rotate(D2R(-60)) - ctx.rel_line_to(1.0,0.0) - ctx.restore() - - ctx.stroke() - - -def _draw_text(ctx, text, align_point=0.0, size = 1.0, rgba = (0.0,0.0,0.0, 1.0)): - """ - Draw the desired text centered vertically around (0,0) horizontally - "align_point" along the text's width. - """ - ctx.save() - - ctx.select_font_face("Sans") - ctx.set_source_rgba(*rgba) - ctx.set_font_size(size) - x,y, w,h, _w,_h = ctx.text_extents(text) - ctx.move_to(-x + ((1.0-w)*align_point), -y - size/2) - ctx.show_text(text) - - ctx.restore() - - -def _draw_board(ctx, x,y, width_chips, height_chips, btm_left_chip, board_label): - """ - Draw a board at the given position onto the given context in a system of the - given width/height in chips. If specified a label will be added to the - bottom-left chip and the board as a whole. Edges of chips at the edge of the - board will be drawn differently to indicate which HSS link they will use. - """ - # Draw the chips - hexagon = set(hexagon_zero()) - max_x = max(x for (x,y) in hexagon) - max_y = max(y for (x,y) in hexagon) - for dx,dy in hexagon: - northempty = (dx+0, dy+1) not in hexagon - southempty = (dx+0, dy-1) not in hexagon - southwestempty = (dx-1, dy-1) not in hexagon - westempty = (dx-1, dy+0) not in hexagon - eastempty = (dx+1, dy+0) not in hexagon - northeastempty = (dx+1, dy+1) not in hexagon - - x_ = (x+dx)%width_chips - y_ = -((y+dy)%height_chips) - - if northempty and northeastempty: - ctx.set_line_width(N_S_EDGE_WIDTH) - ctx.set_source_rgba(*N_S_EDGE_COLOUR) - elif northempty and westempty: - ctx.set_line_width(W_E_EDGE_WIDTH) - ctx.set_source_rgba(*W_E_EDGE_COLOUR) - else: - ctx.set_line_width(INTERNAL_EDGE_WIDTH) - ctx.set_source_rgba(*INTERNAL_EDGE_COLOUR) - _draw_edge(ctx, x_, y_, Direction.north) - - if northempty and northeastempty: - ctx.set_line_width(N_S_EDGE_WIDTH) - ctx.set_source_rgba(*N_S_EDGE_COLOUR) - elif northeastempty and eastempty: - ctx.set_line_width(NE_SW_EDGE_WIDTH) - ctx.set_source_rgba(*NE_SW_EDGE_COLOUR) - else: - ctx.set_line_width(INTERNAL_EDGE_WIDTH) - ctx.set_source_rgba(*INTERNAL_EDGE_COLOUR) - _draw_edge(ctx, x_, y_, Direction.north_east) - - if eastempty and southempty: - ctx.set_line_width(W_E_EDGE_WIDTH) - ctx.set_source_rgba(*W_E_EDGE_COLOUR) - elif northeastempty and eastempty: - ctx.set_line_width(NE_SW_EDGE_WIDTH) - ctx.set_source_rgba(*NE_SW_EDGE_COLOUR) - else: - ctx.set_line_width(INTERNAL_EDGE_WIDTH) - ctx.set_source_rgba(*INTERNAL_EDGE_COLOUR) - _draw_edge(ctx, x_, y_, Direction.east) - - if eastempty and southempty: - ctx.set_line_width(W_E_EDGE_WIDTH) - ctx.set_source_rgba(*W_E_EDGE_COLOUR) - elif southempty and southwestempty: - ctx.set_line_width(N_S_EDGE_WIDTH) - ctx.set_source_rgba(*N_S_EDGE_COLOUR) - else: - ctx.set_line_width(INTERNAL_EDGE_WIDTH) - ctx.set_source_rgba(*INTERNAL_EDGE_COLOUR) - _draw_edge(ctx, x_, y_, Direction.south) - - if westempty and southwestempty: - ctx.set_line_width(NE_SW_EDGE_WIDTH) - ctx.set_source_rgba(*NE_SW_EDGE_COLOUR) - elif southempty and southwestempty: - ctx.set_line_width(N_S_EDGE_WIDTH) - ctx.set_source_rgba(*N_S_EDGE_COLOUR) - else: - ctx.set_line_width(INTERNAL_EDGE_WIDTH) - ctx.set_source_rgba(*INTERNAL_EDGE_COLOUR) - _draw_edge(ctx, x_, y_, Direction.south_west) - - if westempty and southwestempty: - ctx.set_line_width(NE_SW_EDGE_WIDTH) - ctx.set_source_rgba(*NE_SW_EDGE_COLOUR) - elif northempty and westempty: - ctx.set_line_width(W_E_EDGE_WIDTH) - ctx.set_source_rgba(*W_E_EDGE_COLOUR) - else: - ctx.set_line_width(INTERNAL_EDGE_WIDTH) - ctx.set_source_rgba(*INTERNAL_EDGE_COLOUR) - _draw_edge(ctx, x_, y_, Direction.west) - - - # Draw the board's (0,0) chip label and dot - ctx.save() - ctx.translate(x+0.5, -y-0.5) - - # Dot - ctx.set_source_rgba(*BTM_LEFT_CHIP_DOT_COLOUR) - ctx.arc( 0.0,0.0 - , BTM_LEFT_CHIP_DOT_SIZE/2.0 - , 0.0, 2.0*pi - ) - ctx.fill() - - # Label - ctx.translate(BTM_LEFT_CHIP_DOT_SIZE*1.5, 0.0) - _draw_text( ctx - , btm_left_chip - , align_point = 0.0 - , size = BTM_LEFT_CHIP_LABEL_SIZE - , rgba = BTM_LEFT_CHIP_LABEL_COLOUR - ) - ctx.restore() - - - # Draw the board label - positions = [] - - # Position (or duplicate) appropriately depending on splitting of the board - if x+max_x < width_chips and y+max_y < height_chips: - # Not split - positions.append((x+(0.5*max_x), -y-(0.5*max_y))) - elif x+max_x > width_chips: - # Split on the right edge - positions.append((x+(0.5*(width_chips-x)), -y-(0.33*max_y))) - # Split on the left edge - positions.append(((0.5*(max_x-(width_chips-x))), -y-(0.66*max_y))) - - board_label = board_label.replace(" ","\n") - elif y+max_y > height_chips: # pragma: no branch - # Split on top edge - positions.append((x+(0.33*max_x), -y-(0.5*(height_chips-y)))) - # Split on bottom edge - positions.append((x+(0.66*max_x), 0.5-(0.5*(max_y-(height_chips-y))))) - - for x_,y_ in positions: - ctx.save() - ctx.translate(x_,y_) - lines = board_label.split("\n") - ctx.translate(0.0, -(BOARD_LABEL_SIZE*(len(lines)/2.0))) - for num, line in enumerate(lines): - _draw_text( ctx - , line - , align_point = 0.5 - , size = BOARD_LABEL_SIZE - , rgba = BOARD_LABEL_COLOUR - ) - ctx.translate(0.0, BOARD_LABEL_SIZE) - ctx.restore() + """ + Convert degrees to radians. + """ + return (float(d)/180.0)*pi + + +def _draw_edge(ctx, x, y, edge): + """ + Draw the specified single edge of a hexagon whose bottom-left corner is at + (x,y) using whatever style is currently set. + """ + + edge_num = EDGE_MAP[edge] + + # The transformation to use when drawing hexagons such that they sit on an + # x,y grid. + ctx.save() + x_scale = 1.0/(2.0*sin(D2R(60))) + y_scale = 1.0/(1.0+cos(D2R(60))) + ctx.translate((0.25*tan(D2R(60))) * x_scale, 0.5*sin(D2R(30)) * y_scale) + ctx.move_to(x, y) + ctx.transform(cairo.Matrix(1.0, 0.0, -cos(D2R(60)), 1.0)) + ctx.scale(x_scale, y_scale) + + # Draw the specified edge + ctx.rotate(D2R(30)) + for _ in range(edge_num): + ctx.rotate(D2R(-60)) + ctx.rel_move_to(1.0, 0.0) + ctx.rotate(D2R(-60)) + ctx.rel_line_to(1.0, 0.0) + ctx.restore() + + ctx.stroke() + + +def _fill_hex(ctx, x, y): + """ + Fill a hexagon whose bottom-left corner is at + (x,y) using whatever style is currently set. + """ + + # The transformation to use when drawing hexagons such that they sit on an + # x,y grid. + ctx.save() + x_scale = 1.0/(2.0*sin(D2R(60))) + y_scale = 1.0/(1.0+cos(D2R(60))) + ctx.translate((0.25*tan(D2R(60))) * x_scale, 0.5*sin(D2R(30)) * y_scale) + ctx.move_to(x, y) + ctx.transform(cairo.Matrix(1.0, 0.0, -cos(D2R(60)), 1.0)) + ctx.scale(x_scale, y_scale) + + # Draw the specified hexagon + ctx.rotate(D2R(30)) + for _ in range(6): + ctx.rotate(D2R(-60)) + ctx.rel_line_to(1.0, 0.0) + ctx.close_path() + ctx.fill() + ctx.restore() + + +def _draw_text(ctx, text, align_point=0.0, size=1.0, + rgba=(0.0, 0.0, 0.0, 1.0)): + """ + Draw the desired text centered vertically around (0,0) horizontally + "align_point" along the text's width. + """ + ctx.save() + + ctx.select_font_face("Sans") + ctx.set_source_rgba(*rgba) + ctx.set_font_size(size) + x, y, w, _h, _w, _h = ctx.text_extents(text) + ctx.move_to(-x + ((1.0-w)*align_point), -y - size/2) + ctx.show_text(text) + + ctx.restore() + + +def _draw_board(ctx, x, y, width_chips, height_chips, btm_left_chip, + board_label, board_colours): + """ + Draw a board at the given position onto the given context in a system of + the given width/height in chips. If specified a label will be added to the + bottom-left chip and the board as a whole. Edges of chips at the edge of + the board will be drawn differently to indicate which HSS link they will + use. + """ + # Draw the chips + hexagon = set(hexagon_zero()) + max_x = max(x_ for (x_, _y) in hexagon) + max_y = max(y_ for (_x, y_) in hexagon) + for dx, dy in hexagon: + northempty = (dx+0, dy+1) not in hexagon + southempty = (dx+0, dy-1) not in hexagon + southwestempty = (dx-1, dy-1) not in hexagon + westempty = (dx-1, dy+0) not in hexagon + eastempty = (dx+1, dy+0) not in hexagon + northeastempty = (dx+1, dy+1) not in hexagon + + x_ = (x + dx) % width_chips + y_ = -((y + dy) % height_chips) + + colour = board_colours.get((x_, -y_)) + if colour is not None: + ctx.set_source_rgba(*colour) + _fill_hex(ctx, x_, y_) + + if northempty and northeastempty: + ctx.set_line_width(N_S_EDGE_WIDTH) + ctx.set_source_rgba(*N_S_EDGE_COLOUR) + elif northempty and westempty: + ctx.set_line_width(W_E_EDGE_WIDTH) + ctx.set_source_rgba(*W_E_EDGE_COLOUR) + else: + ctx.set_line_width(INTERNAL_EDGE_WIDTH) + ctx.set_source_rgba(*INTERNAL_EDGE_COLOUR) + _draw_edge(ctx, x_, y_, Direction.north) + + if northempty and northeastempty: + ctx.set_line_width(N_S_EDGE_WIDTH) + ctx.set_source_rgba(*N_S_EDGE_COLOUR) + elif northeastempty and eastempty: + ctx.set_line_width(NE_SW_EDGE_WIDTH) + ctx.set_source_rgba(*NE_SW_EDGE_COLOUR) + else: + ctx.set_line_width(INTERNAL_EDGE_WIDTH) + ctx.set_source_rgba(*INTERNAL_EDGE_COLOUR) + _draw_edge(ctx, x_, y_, Direction.north_east) + + if eastempty and southempty: + ctx.set_line_width(W_E_EDGE_WIDTH) + ctx.set_source_rgba(*W_E_EDGE_COLOUR) + elif northeastempty and eastempty: + ctx.set_line_width(NE_SW_EDGE_WIDTH) + ctx.set_source_rgba(*NE_SW_EDGE_COLOUR) + else: + ctx.set_line_width(INTERNAL_EDGE_WIDTH) + ctx.set_source_rgba(*INTERNAL_EDGE_COLOUR) + _draw_edge(ctx, x_, y_, Direction.east) + + if eastempty and southempty: + ctx.set_line_width(W_E_EDGE_WIDTH) + ctx.set_source_rgba(*W_E_EDGE_COLOUR) + elif southempty and southwestempty: + ctx.set_line_width(N_S_EDGE_WIDTH) + ctx.set_source_rgba(*N_S_EDGE_COLOUR) + else: + ctx.set_line_width(INTERNAL_EDGE_WIDTH) + ctx.set_source_rgba(*INTERNAL_EDGE_COLOUR) + _draw_edge(ctx, x_, y_, Direction.south) + + if westempty and southwestempty: + ctx.set_line_width(NE_SW_EDGE_WIDTH) + ctx.set_source_rgba(*NE_SW_EDGE_COLOUR) + elif southempty and southwestempty: + ctx.set_line_width(N_S_EDGE_WIDTH) + ctx.set_source_rgba(*N_S_EDGE_COLOUR) + else: + ctx.set_line_width(INTERNAL_EDGE_WIDTH) + ctx.set_source_rgba(*INTERNAL_EDGE_COLOUR) + _draw_edge(ctx, x_, y_, Direction.south_west) + + if westempty and southwestempty: + ctx.set_line_width(NE_SW_EDGE_WIDTH) + ctx.set_source_rgba(*NE_SW_EDGE_COLOUR) + elif northempty and westempty: + ctx.set_line_width(W_E_EDGE_WIDTH) + ctx.set_source_rgba(*W_E_EDGE_COLOUR) + else: + ctx.set_line_width(INTERNAL_EDGE_WIDTH) + ctx.set_source_rgba(*INTERNAL_EDGE_COLOUR) + _draw_edge(ctx, x_, y_, Direction.west) + + # Draw the board's (0,0) chip label and dot + ctx.save() + ctx.translate(x+0.5, -y-0.5) + + # Dot + ctx.set_source_rgba(*BTM_LEFT_CHIP_DOT_COLOUR) + ctx.arc(0.0, 0.0, BTM_LEFT_CHIP_DOT_SIZE / 2.0, 0.0, 2.0 * pi) + ctx.fill() + + # Label + ctx.translate(BTM_LEFT_CHIP_DOT_SIZE*1.5, 0.0) + _draw_text(ctx, btm_left_chip, align_point=0.0, + size=BTM_LEFT_CHIP_LABEL_SIZE, + rgba=BTM_LEFT_CHIP_LABEL_COLOUR) + ctx.restore() + + # Draw the board label + positions = [] + + # Position (or duplicate) appropriately depending on splitting of the board + if x+max_x < width_chips and y+max_y < height_chips: + # Not split + positions.append((x+(0.5*max_x), -y-(0.5*max_y))) + elif x+max_x > width_chips: + # Split on the right edge + positions.append((x+(0.5*(width_chips-x)), -y-(0.33*max_y))) + # Split on the left edge + positions.append(((0.5*(max_x-(width_chips-x))), -y-(0.66*max_y))) + + board_label = board_label.replace(" ", "\n") + elif y+max_y > height_chips: # pragma: no branch + # Split on top edge + positions.append((x+(0.33*max_x), -y-(0.5*(height_chips-y)))) + # Split on bottom edge + positions.append((x+(0.66*max_x), 0.5-(0.5*(max_y-(height_chips-y))))) + + for x_, y_ in positions: + ctx.save() + ctx.translate(x_, y_) + lines = board_label.split("\n") + ctx.translate(0.0, -(BOARD_LABEL_SIZE*(len(lines)/2.0))) + for line in lines: + _draw_text(ctx, line, align_point=0.5, size=BOARD_LABEL_SIZE, + rgba=BOARD_LABEL_COLOUR) + ctx.translate(0.0, BOARD_LABEL_SIZE) + ctx.restore() def _draw_coords(ctx, width, height): - """ - Given a context and a width and height in chips, draw numbered ticks along the - edges of the diagram. - """ - for num, rotation, offset, alignment in ( (height, 0.0, -1 - COORD_LABEL_OFFSET, 1.0) # Left - , (height, 0.0, width + COORD_LABEL_OFFSET, 0.0) # Right - , (width, 0.5*pi, COORD_LABEL_OFFSET, 0.0) # Bottom - , (width, 0.5*pi, -height - 1 - COORD_LABEL_OFFSET, 1.0) # Top - ): - ctx.save() - ctx.rotate(rotation) - ctx.translate(offset, -0.5) - for i in range(num): - _draw_text( ctx - , str(i) - , align_point = alignment - , size = COORD_LABEL_SIZE - , rgba = COORD_LABEL_COLOUR - ) - ctx.translate(0.0, -1.0) - ctx.restore() + """ + Given a context and a width and height in chips, draw numbered ticks along + tje edges of the diagram. + """ + for num, rotation, offset, alignment in ( + (height, 0.0, -1 - COORD_LABEL_OFFSET, 1.0), # Left + (height, 0.0, width + COORD_LABEL_OFFSET, 0.0), # Right + (width, 0.5 * pi, COORD_LABEL_OFFSET, 0.0), # Bottom + (width, 0.5 * pi, -height - 1 - COORD_LABEL_OFFSET, 1.0)): # Top + ctx.save() + ctx.rotate(rotation) + ctx.translate(offset, -0.5) + for i in range(num): + _draw_text(ctx, str(i), align_point=alignment, + size=COORD_LABEL_SIZE, rgba=COORD_LABEL_COLOUR) + ctx.translate(0.0, -1.0) + ctx.restore() + def draw_machine_map(ctx, image_width, image_height, width, height, - hex_boards, cabinetised_boards): - """ - Given a Cairo context (and the size of the rectangle to draw in), draws a map of - the machine. Takes the width and height of the system in triads, hex_boards - (mapping 3D hexagonal board coordinates to boards) and a cabinetised version - of the same. - """ - ctx.save() - - # Number of chips in the system - width_chips = width * 12 - height_chips = height * 12 - - # Get the extent of the cabinets used - max_cabinet = max(c for (b,(c,f,b)) in cabinetised_boards) - max_frame = max(f for (b,(c,f,b)) in cabinetised_boards) - max_board = max(b for (b,(c,f,b)) in cabinetised_boards) - - # Lookup from board to cabinet position - b2c = dict(cabinetised_boards) - - # Rescale the drawing such that the diagram is rescaled to nicely fit in the - # space given - diagram_width = width_chips + (2.0*MARGIN) - diagram_height = height_chips + (2.0*MARGIN) - scale = min(image_width/diagram_width, image_height/diagram_height) - ctx.translate( (image_width - (diagram_width*scale)) / 2 - , (image_height - (diagram_height*scale)) / 2 - ) - ctx.scale(scale, scale) - - ctx.set_line_cap(cairo.LINE_CAP_ROUND) - - # Move to bottom-left chip - ctx.translate(MARGIN, height_chips+MARGIN) - - # Draw coordinates around edge - _draw_coords(ctx, width_chips, height_chips) - - # Draw each board - for board, board_pos in hex_boards: - x,y = to_xy(board_to_chip(board_pos)) - c,f,b = b2c[board] - _draw_board( ctx - , x,y, width_chips, height_chips - , "(%d,%d)"%(x,y) - , ("C%d "%c if max_cabinet else "") - + ("F%d "%f if max_frame else "") - + ("B%d"%b) - ) - - ctx.restore() + hex_boards, cabinetised_boards, board_colours=None, + include_boards=None): + """ + Given a Cairo context (and the size of the rectangle to draw in), draws a + map of the machine. Takes the width and height of the system in triads, + hex_boards (mapping 3D hexagonal board coordinates to boards) and a + cabinetised version of the same. + """ + if board_colours is None: + board_colours = dict() + + ctx.save() + + # Number of chips in the system + width_chips = width * 12 + height_chips = height * 12 + if include_boards is not None: + width_chips = max(x for x, _y in include_boards) + 8 + height_chips = max(y for _x, y in include_boards) + 8 + + # Get the extent of the cabinets used + max_cabinet = None + max_frame = None + if cabinetised_boards: + max_cabinet = max(c for (_b, (c, _f, _b)) in cabinetised_boards) + max_frame = max(f for (_b, (_c, f, _b)) in cabinetised_boards) + + # Lookup from board to cabinet position + b2c = dict(cabinetised_boards) + + # Rescale the drawing such that the diagram is rescaled to nicely fit in + # the space given + diagram_width = width_chips + (2.0 * MARGIN) + diagram_height = height_chips + (2.0*MARGIN) + scale = min(image_width/diagram_width, image_height/diagram_height) + ctx.translate((image_width - (diagram_width*scale)) / 2, + (image_height - (diagram_height*scale)) / 2) + ctx.scale(scale, scale) + + ctx.set_line_cap(cairo.LINE_CAP_ROUND) + + # Move to bottom-left chip + ctx.translate(MARGIN, height_chips+MARGIN) + + # Draw coordinates around edge + _draw_coords(ctx, width_chips, height_chips) + + # Draw each board + for board, board_pos in hex_boards: + x, y = to_xy(board_to_chip(board_pos)) + if include_boards is not None and (x, y) not in include_boards: + continue + label = "" + if cabinetised_boards: + c, f, b = b2c[board] + label = (("C%d " % c if max_cabinet else "") + + ("F%d " % f if max_frame else "") + + ("B%d" % b)) + _draw_board(ctx, x, y, width_chips, height_chips, "(%d,%d)" % (x, y), + label, board_colours) + + ctx.restore() def get_machine_map_aspect_ratio(width, height): - """Given a system size (in triads), returns the aspect ratio of the diagram - generated by draw_machine_map. - """ - diagram_width = (width * 12) + (2.0*MARGIN) - diagram_height = (height * 12) + (2.0*MARGIN) - - return diagram_width / float(diagram_height) + """Given a system size (in triads), returns the aspect ratio of the diagram + generated by draw_machine_map. + """ + diagram_width = (width * 12) + (2.0*MARGIN) + diagram_height = (height * 12) + (2.0*MARGIN) + + return diagram_width / float(diagram_height) diff --git a/spinner/scripts/arguments.py b/spinner/scripts/arguments.py index e6b0de1..e75b7a0 100644 --- a/spinner/scripts/arguments.py +++ b/spinner/scripts/arguments.py @@ -18,235 +18,235 @@ def CabinetAction(num_levels=4, append=False): - """"An argparse Action which accepts cabinet/frame/board/link references.""" - assert 1 <= num_levels <= 4 - - class _CabinetAction(argparse.Action): - - """Names of directions for command line arguments.""" - DIRECTION_NAMES = {d.name.replace("_", "-"): d for d in Direction} - - def __init__(self, *args, **kwargs): - kwargs.setdefault("type", str) - kwargs.setdefault("nargs", "+") - - metavar = "" - if num_levels >= 4: - metavar = " [{{{}}}{}]".format(",".join(self.DIRECTION_NAMES), metavar) - if num_levels >= 3: - metavar = " [BOARD{}]".format(metavar) - if num_levels >= 2: - metavar = " [FRAME{}]".format(metavar) - metavar = "CABINET{}".format(metavar) - kwargs.setdefault("metavar", (metavar, "")) - - argparse.Action.__init__(self, *args, **kwargs) - - def __call__(self, parser, namespace, values, option_string=None): - # Fail with too many/few arguments - if not 1 <= len(values) <= num_levels: - parser.error("{} expects between 1 and {} values".format( - option_string, num_levels)) - - # Check cabinet/frame/board are integer types (and cast to int as - # appropriate) - for int_val, name in enumerate("CABINET FRAME BOARD".split()): - if len(values) > int_val: - if not values[int_val].isdigit(): - parser.error("{} value for {} must be a non-negative integer, not '{}'".format( - option_string, name, values[int_val])) - else: - values[int_val] = int(values[int_val]) - - # Convert direction into a Direction - if len(values) >= 4: - # Typecheck - if not values[3] in self.DIRECTION_NAMES: - parser.error("{} value for link must be one of {{{}}}, not {}".format( - option_string, ",".join(self.DIRECTION_NAMES), values[3])) - - values[3] = self.DIRECTION_NAMES[values[3]] - - values = tuple(values) - - if append: - if getattr(namespace, self.dest) is None: - setattr(namespace, self.dest, []) - getattr(namespace, self.dest).append(values) - else: - setattr(namespace, self.dest, values) - - return _CabinetAction + """"An argparse Action which accepts cabinet/frame/board/link references.""" + assert 1 <= num_levels <= 4 + + class _CabinetAction(argparse.Action): + + """Names of directions for command line arguments.""" + DIRECTION_NAMES = {d.name.replace("_", "-"): d for d in Direction} + + def __init__(self, *args, **kwargs): + kwargs.setdefault("type", str) + kwargs.setdefault("nargs", "+") + + metavar = "" + if num_levels >= 4: + metavar = " [{{{}}}{}]".format(",".join(self.DIRECTION_NAMES), metavar) + if num_levels >= 3: + metavar = " [BOARD{}]".format(metavar) + if num_levels >= 2: + metavar = " [FRAME{}]".format(metavar) + metavar = "CABINET{}".format(metavar) + kwargs.setdefault("metavar", (metavar, "")) + + argparse.Action.__init__(self, *args, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + # Fail with too many/few arguments + if not 1 <= len(values) <= num_levels: + parser.error("{} expects between 1 and {} values".format( + option_string, num_levels)) + + # Check cabinet/frame/board are integer types (and cast to int as + # appropriate) + for int_val, name in enumerate("CABINET FRAME BOARD".split()): + if len(values) > int_val: + if not values[int_val].isdigit(): + parser.error("{} value for {} must be a non-negative integer, not '{}'".format( + option_string, name, values[int_val])) + else: + values[int_val] = int(values[int_val]) + + # Convert direction into a Direction + if len(values) >= 4: + # Typecheck + if not values[3] in self.DIRECTION_NAMES: + parser.error("{} value for link must be one of {{{}}}, not {}".format( + option_string, ",".join(self.DIRECTION_NAMES), values[3])) + + values[3] = self.DIRECTION_NAMES[values[3]] + + values = tuple(values) + + if append: + if getattr(namespace, self.dest) is None: + setattr(namespace, self.dest, []) + getattr(namespace, self.dest).append(values) + else: + setattr(namespace, self.dest, values) + + return _CabinetAction def add_version_args(parser): - """Adds a standard --version/-V incantation which prints the version number - from spinner.__version__.""" - parser.add_argument("--version", "-V", action="version", - version="%(prog)s {}".format(spinner.__version__)) + """Adds a standard --version/-V incantation which prints the version number + from spinner.__version__.""" + parser.add_argument("--version", "-V", action="version", + version="%(prog)s {}".format(spinner.__version__)) def add_topology_args(parser): - """Add arguments for specifying SpiNNaker machine topologies and their - folding.""" - # Require the specification of the size of the system - topology_group = parser.add_argument_group("machine topology dimensions") - system_size_group = topology_group.add_mutually_exclusive_group(required=True) - system_size_group.add_argument("--num-boards", "-n", type=int, metavar="N", - help="build the 'squarest' system with this " - "many boards") - system_size_group.add_argument("--triads", "-t", type=int, nargs=2, - metavar=("W", "H"), - help="build a system with the specified " - "number of triads of boards in each " - "dimension (yielding 3*W*H boards)") - - # Arguments for specification of how the system should be folded. If left out, - # this is done automatically - folding_group = parser.add_argument_group("topology folding options") - folding_group.add_argument("--transformation", "-T", choices=["shear","slice"], - help="the transformation function to use from " - "hexagonal torus to rectangular Cartesian " - "grid (selected automatically if omitted)") - folding_group.add_argument("--uncrinkle-direction", choices=["columns", "rows"], - help="direction in which to uncrinkle the hexagonal " - "mesh to form a regular grid (default: rows)") - folding_group.add_argument("--folds", "-F", type=int, nargs=2, - default=None, metavar=("X", "Y"), - help="the number of pieces to fold into in each " - "dimension (default: (2, 2)) ignored if " - "--transformation is not given") + """Add arguments for specifying SpiNNaker machine topologies and their + folding.""" + # Require the specification of the size of the system + topology_group = parser.add_argument_group("machine topology dimensions") + system_size_group = topology_group.add_mutually_exclusive_group(required=True) + system_size_group.add_argument("--num-boards", "-n", type=int, metavar="N", + help="build the 'squarest' system with this " + "many boards") + system_size_group.add_argument("--triads", "-t", type=int, nargs=2, + metavar=("W", "H"), + help="build a system with the specified " + "number of triads of boards in each " + "dimension (yielding 3*W*H boards)") + + # Arguments for specification of how the system should be folded. If left out, + # this is done automatically + folding_group = parser.add_argument_group("topology folding options") + folding_group.add_argument("--transformation", "-T", choices=["shear","slice"], + help="the transformation function to use from " + "hexagonal torus to rectangular Cartesian " + "grid (selected automatically if omitted)") + folding_group.add_argument("--uncrinkle-direction", choices=["columns", "rows"], + help="direction in which to uncrinkle the hexagonal " + "mesh to form a regular grid (default: rows)") + folding_group.add_argument("--folds", "-F", type=int, nargs=2, + default=None, metavar=("X", "Y"), + help="the number of pieces to fold into in each " + "dimension (default: (2, 2)) ignored if " + "--transformation is not given") def get_topology_from_args(parser, args): - """To be used with add_topology_args. - - Check that the supplied arguments are valid and return corresponding - parameters for spinner.utils.folded_torus. - - Returns - ------- - ((w, h), transformation, uncrinkle_direction, (x, y)) - (w, h) is the dimensions of the system in triads - - transformation is "slice" or "shear" - - uncrinkle_direction is "rows" or "columns" - - (x, y) gives the number of pieces to fold each dimension into - """ - # Extract the system dimensions - if args.num_boards is not None: - try: - w, h = ideal_system_size(args.num_boards) - except TypeError: - parser.error("number of boards must be a multiple of three") - else: - w, h = args.triads - if w <= 0 or h <= 0: - parser.error("system dimensions must be positive and non-zero") - - # Fold accordingly - transformation = "shear" - uncrinkle_direction = "rows" - folds = (2, 2) - - if args.transformation is None: - # Work out the folding process to use by following the guidelines set out in - # "Bringing the Hexagonal Torus Topology into the Real-World" by Heathcote - # et. al. (unpublished at the time of writing...). - if h == 2 * w: - transformation = "slice" - else: - transformation = "shear" - uncrinkle_direction = "rows" - folds = (2, 2) - else: - transformation = args.transformation - - if args.folds is not None: - folds = tuple(args.folds) - - if folds[0] <= 0 or folds[1] <= 0: - parser.error("number of pieces to fold into must be at least 1") - - if args.uncrinkle_direction is not None: - uncrinkle_direction = args.uncrinkle_direction - - return ((w, h), transformation, uncrinkle_direction, folds) + """To be used with add_topology_args. + + Check that the supplied arguments are valid and return corresponding + parameters for spinner.utils.folded_torus. + + Returns + ------- + ((w, h), transformation, uncrinkle_direction, (x, y)) + (w, h) is the dimensions of the system in triads + + transformation is "slice" or "shear" + + uncrinkle_direction is "rows" or "columns" + + (x, y) gives the number of pieces to fold each dimension into + """ + # Extract the system dimensions + if args.num_boards is not None: + try: + w, h = ideal_system_size(args.num_boards) + except TypeError: + parser.error("number of boards must be a multiple of three") + else: + w, h = args.triads + if w <= 0 or h <= 0: + parser.error("system dimensions must be positive and non-zero") + + # Fold accordingly + transformation = "shear" + uncrinkle_direction = "rows" + folds = (2, 2) + + if args.transformation is None: + # Work out the folding process to use by following the guidelines set out in + # "Bringing the Hexagonal Torus Topology into the Real-World" by Heathcote + # et. al. (unpublished at the time of writing...). + if h == 2 * w: + transformation = "slice" + else: + transformation = "shear" + uncrinkle_direction = "rows" + folds = (2, 2) + else: + transformation = args.transformation + + if args.folds is not None: + folds = tuple(args.folds) + + if folds[0] <= 0 or folds[1] <= 0: + parser.error("number of pieces to fold into must be at least 1") + + if args.uncrinkle_direction is not None: + uncrinkle_direction = args.uncrinkle_direction + + return ((w, h), transformation, uncrinkle_direction, folds) def add_cabinet_args(parser): - """Add arguments for specifying SpiNNaker cabinet sizes.""" - # Physical dimensions (defaults to standard rack sizes) - board_group = parser.add_argument_group("board physical dimensions") - board_group.add_argument("--board-dimensions", type=float, nargs=3, - metavar=("W", "H", "D"), - default=(0.014, 0.233, 0.240), - help="physical board dimensions in meters (default: " - "%(default)s)") - for direction, default in [("south-west", (0.008, 0.013, 0.0)), - ("north-east", (0.008, 0.031, 0.0)), - ("east", (0.008, 0.049, 0.0)), - ("west", (0.008, 0.067, 0.0)), - ("north", (0.008, 0.085, 0.0)), - ("south", (0.008, 0.103, 0.0))]: - board_group.add_argument("--board-wire-offset-{}".format(direction), - type=float, nargs=3, default=default, - metavar=("X", "Y", "Z"), - help="physical offset of the {} connector from " - "board left-top-front corner in meters " - "(default: %(default)s)".format(direction)) - board_group.add_argument("--inter-board-spacing", type=float, - default=0.00124, metavar=("S"), - help="physical spacing between each board in a " - "frame in meters (default: %(default)s)") - - frame_group = parser.add_argument_group("frame physical dimensions") - frame_group.add_argument("--boards-per-frame", type=int, default=24, - help="number of boards per frame (default: " - "%(default)s)") - frame_group.add_argument("--frame-dimensions", type=float, nargs=3, - metavar=("W", "H", "D"), - default=(0.430, 0.266, 0.250), - help="frame physical dimensions in meters (default: " - "%(default)s)") - frame_group.add_argument("--frame-board-offset", type=float, nargs=3, - default=(0.06, 0.017, 0.0), - metavar=("X", "Y", "Z"), - help="physical offset of the left-top-front corner of " - "the left-most board from the left-top-front " - "corner of a frame in meters (default: " - "%(default)s)") - frame_group.add_argument("--inter-frame-spacing", type=float, - default=0.133, metavar=("S"), - help="physical spacing between frames in a " - "cabinet in meters (default: %(default)s)") - - cabinet_group = parser.add_argument_group("cabinet physical dimensions") - cabinet_group.add_argument("--frames-per-cabinet", type=int, default=5, - help="number of frames per cabinet (default: " - "%(default)s)") - cabinet_group.add_argument("--cabinet-dimensions", type=float, nargs=3, - metavar=("W", "H", "D"), - default=(0.600, 2.000, 0.250), - help="cabinet physical dimensions in meters (default: " - "%(default)s)") - cabinet_group.add_argument("--cabinet-frame-offset", type=float, nargs=3, - default=(0.085, 0.047, 0.0), - metavar=("X", "Y", "Z"), - help="physical offset of the left-top-front corner " - "of the top frame from the left-top-front " - "corner of a cabinet in meters (default: " - "%(default)s)") - cabinet_group.add_argument("--inter-cabinet-spacing", type=float, - default=0.0, metavar=("S"), - help="physical spacing between each cabinet in " - "meters (default: %(default)s)") - cabinet_group.add_argument("--num-cabinets", "-c", type=int, metavar="N", - help="specify how many cabinets to spread the " - "system over (default: the minimum possible)") - cabinet_group.add_argument("--num-frames", "-f", type=int, metavar="N", + """Add arguments for specifying SpiNNaker cabinet sizes.""" + # Physical dimensions (defaults to standard rack sizes) + board_group = parser.add_argument_group("board physical dimensions") + board_group.add_argument("--board-dimensions", type=float, nargs=3, + metavar=("W", "H", "D"), + default=(0.014, 0.233, 0.240), + help="physical board dimensions in meters (default: " + "%(default)s)") + for direction, default in [("south-west", (0.008, 0.013, 0.0)), + ("north-east", (0.008, 0.031, 0.0)), + ("east", (0.008, 0.049, 0.0)), + ("west", (0.008, 0.067, 0.0)), + ("north", (0.008, 0.085, 0.0)), + ("south", (0.008, 0.103, 0.0))]: + board_group.add_argument("--board-wire-offset-{}".format(direction), + type=float, nargs=3, default=default, + metavar=("X", "Y", "Z"), + help="physical offset of the {} connector from " + "board left-top-front corner in meters " + "(default: %(default)s)".format(direction)) + board_group.add_argument("--inter-board-spacing", type=float, + default=0.00124, metavar=("S"), + help="physical spacing between each board in a " + "frame in meters (default: %(default)s)") + + frame_group = parser.add_argument_group("frame physical dimensions") + frame_group.add_argument("--boards-per-frame", type=int, default=24, + help="number of boards per frame (default: " + "%(default)s)") + frame_group.add_argument("--frame-dimensions", type=float, nargs=3, + metavar=("W", "H", "D"), + default=(0.430, 0.266, 0.250), + help="frame physical dimensions in meters (default: " + "%(default)s)") + frame_group.add_argument("--frame-board-offset", type=float, nargs=3, + default=(0.06, 0.017, 0.0), + metavar=("X", "Y", "Z"), + help="physical offset of the left-top-front corner of " + "the left-most board from the left-top-front " + "corner of a frame in meters (default: " + "%(default)s)") + frame_group.add_argument("--inter-frame-spacing", type=float, + default=0.133, metavar=("S"), + help="physical spacing between frames in a " + "cabinet in meters (default: %(default)s)") + + cabinet_group = parser.add_argument_group("cabinet physical dimensions") + cabinet_group.add_argument("--frames-per-cabinet", type=int, default=5, + help="number of frames per cabinet (default: " + "%(default)s)") + cabinet_group.add_argument("--cabinet-dimensions", type=float, nargs=3, + metavar=("W", "H", "D"), + default=(0.600, 2.000, 0.250), + help="cabinet physical dimensions in meters (default: " + "%(default)s)") + cabinet_group.add_argument("--cabinet-frame-offset", type=float, nargs=3, + default=(0.085, 0.047, 0.0), + metavar=("X", "Y", "Z"), + help="physical offset of the left-top-front corner " + "of the top frame from the left-top-front " + "corner of a cabinet in meters (default: " + "%(default)s)") + cabinet_group.add_argument("--inter-cabinet-spacing", type=float, + default=0.0, metavar=("S"), + help="physical spacing between each cabinet in " + "meters (default: %(default)s)") + cabinet_group.add_argument("--num-cabinets", "-c", type=int, metavar="N", + help="specify how many cabinets to spread the " + "system over (default: the minimum possible)") + cabinet_group.add_argument("--num-frames", "-f", type=int, metavar="N", help="when only one cabinet is required, " "specifies how many frames within that " "cabinet the system should be spread " @@ -254,443 +254,443 @@ def add_cabinet_args(parser): def get_cabinets_from_args(parser, args): - """For use with add_cabinet_args (and optionally add_topology_args). - - Get information about the dimensions of the cabinets in the system from the - supplied arguments. - - Returns - ------- - (:py:class:`spinner.cabinet.Cabinet`, num_frames) - num_frames is the number of frames (per cabinet) to actually fill with - boards. - """ - kwargs = { - kw: tuple(getattr(args, kw)) - if type(getattr(args, kw)) is list - else getattr(args, kw) - for kw in [ - "board_dimensions", - "board_wire_offset_south_west", - "board_wire_offset_north_east", - "board_wire_offset_east", - "board_wire_offset_west", - "board_wire_offset_north", - "board_wire_offset_south", - "inter_board_spacing", - "boards_per_frame", - "frame_dimensions", - "frame_board_offset", - "inter_frame_spacing", - "frames_per_cabinet", - "cabinet_dimensions", - "cabinet_frame_offset", - "inter_cabinet_spacing", - ] - } - - # Work out number of boards to allow checking of num_cabinets and num_frames - # (only possible if topology args are present) - if hasattr(args, "num_boards") and args.num_boards is not None: - num_boards = args.num_boards - elif hasattr(args, "triads") and args.triads is not None: - num_boards = 3 * args.triads[0] * args.triads[1] - else: - num_boards = None # unknown! - - if args.num_cabinets is None and args.num_frames is None: - # Try to pick an sensible default value if number of boards is known, - # otherwise default to a single cabinet system. - if num_boards is not None: - num_cabinets, num_frames = min_num_cabinets(num_boards, - args.frames_per_cabinet, - args.boards_per_frame) - else: - num_cabinets = 1 - num_frames = args.frames_per_cabinet - else: - # Default to 1 cabinet - if args.num_cabinets is None: - num_cabinets = 1 - else: - num_cabinets = args.num_cabinets - - # Default to the full number of frames - if args.num_frames is None: - num_frames = args.frames_per_cabinet - else: - num_frames = args.num_frames - if num_frames > args.frames_per_cabinet: - parser.error("more frames specified than fit in a cabinet") - if num_cabinets > 1 and num_frames != args.frames_per_cabinet: - parser.error("--num-frames must equal --frames-per-cabinet " - "when there is more than one cabinet") - - # Check that the number of cabinets/frames is sufficient for the number of - # boards present (if known) - if (num_boards is not None and - num_cabinets * num_frames * args.boards_per_frame < num_boards): - parser.error("not enough cabinets/frames available for {} " - "boards".format(num_boards)) - - kwargs["num_cabinets"] = num_cabinets - - try: - return (Cabinet(**kwargs), num_frames) - except ValueError as e: - parser.error(e.args[0]) + """For use with add_cabinet_args (and optionally add_topology_args). + + Get information about the dimensions of the cabinets in the system from the + supplied arguments. + + Returns + ------- + (:py:class:`spinner.cabinet.Cabinet`, num_frames) + num_frames is the number of frames (per cabinet) to actually fill with + boards. + """ + kwargs = { + kw: tuple(getattr(args, kw)) + if type(getattr(args, kw)) is list + else getattr(args, kw) + for kw in [ + "board_dimensions", + "board_wire_offset_south_west", + "board_wire_offset_north_east", + "board_wire_offset_east", + "board_wire_offset_west", + "board_wire_offset_north", + "board_wire_offset_south", + "inter_board_spacing", + "boards_per_frame", + "frame_dimensions", + "frame_board_offset", + "inter_frame_spacing", + "frames_per_cabinet", + "cabinet_dimensions", + "cabinet_frame_offset", + "inter_cabinet_spacing", + ] + } + + # Work out number of boards to allow checking of num_cabinets and num_frames + # (only possible if topology args are present) + if hasattr(args, "num_boards") and args.num_boards is not None: + num_boards = args.num_boards + elif hasattr(args, "triads") and args.triads is not None: + num_boards = 3 * args.triads[0] * args.triads[1] + else: + num_boards = None # unknown! + + if args.num_cabinets is None and args.num_frames is None: + # Try to pick an sensible default value if number of boards is known, + # otherwise default to a single cabinet system. + if num_boards is not None: + num_cabinets, num_frames = min_num_cabinets(num_boards, + args.frames_per_cabinet, + args.boards_per_frame) + else: + num_cabinets = 1 + num_frames = args.frames_per_cabinet + else: + # Default to 1 cabinet + if args.num_cabinets is None: + num_cabinets = 1 + else: + num_cabinets = args.num_cabinets + + # Default to the full number of frames + if args.num_frames is None: + num_frames = args.frames_per_cabinet + else: + num_frames = args.num_frames + if num_frames > args.frames_per_cabinet: + parser.error("more frames specified than fit in a cabinet") + if num_cabinets > 1 and num_frames != args.frames_per_cabinet: + parser.error("--num-frames must equal --frames-per-cabinet " + "when there is more than one cabinet") + + # Check that the number of cabinets/frames is sufficient for the number of + # boards present (if known) + if (num_boards is not None and + num_cabinets * num_frames * args.boards_per_frame < num_boards): + parser.error("not enough cabinets/frames available for {} " + "boards".format(num_boards)) + + kwargs["num_cabinets"] = num_cabinets + + try: + return (Cabinet(**kwargs), num_frames) + except ValueError as e: + parser.error(e.args[0]) def add_histogram_args(parser): - """Add arguments for specifying the histogram of wire lengths (i.e. - cabinets/frames) available for a given system.""" - histogram_group = parser.add_argument_group("wire length histogram options") - histogram_mut_group = histogram_group.add_mutually_exclusive_group() - histogram_mut_group.add_argument("--histogram-bins", "-H", type=int, metavar="N", - default=5, - help="number of bins to pack wire lengths into " - "in the histogram of wire lengths (default: " - "%(default)s)") + """Add arguments for specifying the histogram of wire lengths (i.e. + cabinets/frames) available for a given system.""" + histogram_group = parser.add_argument_group("wire length histogram options") + histogram_mut_group = histogram_group.add_mutually_exclusive_group() + histogram_mut_group.add_argument("--histogram-bins", "-H", type=int, metavar="N", + default=5, + help="number of bins to pack wire lengths into " + "in the histogram of wire lengths (default: " + "%(default)s)") def get_histogram_from_args(parser, args): - """To be used with add_histogram_args. - - Check that the supplied arguments are valid and returns the number of bins - requested. - - Returns - ------- - int - The number of bins to use in the wire-length histogram. - """ - if args.histogram_bins < 1: - parser.error("--histogram-bins must be at least 1") - return args.histogram_bins + """To be used with add_histogram_args. + + Check that the supplied arguments are valid and returns the number of bins + requested. + + Returns + ------- + int + The number of bins to use in the wire-length histogram. + """ + if args.histogram_bins < 1: + parser.error("--histogram-bins must be at least 1") + return args.histogram_bins def add_wire_length_args(parser): - """Add arguments for specifying sets of wire lengths available for a given - system.""" - wire_length_group = parser.add_argument_group("available wire lengths") - wire_length_group.add_argument("--wire-length", "-l", type=float, metavar="L", - action="append", nargs="+", - help="specify one or more available wire " - "lengths in meters") - wire_length_group.add_argument("--minimum-slack", type=float, metavar="H", - default=0.05, - help="the minimum slack to allow in a " - "wire connecting two boards in meters") + """Add arguments for specifying sets of wire lengths available for a given + system.""" + wire_length_group = parser.add_argument_group("available wire lengths") + wire_length_group.add_argument("--wire-length", "-l", type=float, metavar="L", + action="append", nargs="+", + help="specify one or more available wire " + "lengths in meters") + wire_length_group.add_argument("--minimum-slack", type=float, metavar="H", + default=0.05, + help="the minimum slack to allow in a " + "wire connecting two boards in meters") def get_wire_lengths_from_args(parser, args, mandatory=False): - """To be used with add_wire_length_args. - - Also used internally by get_histogram_from_args. - - Check that the supplied arguments are valid and then return the list of wire - lengths supplied.. - - Returns - ------- - ([float, ...], min_slack) - Gives the wire lengths available (which may be empty) and the minimum slack - requested. - """ - if args.wire_length is None: - wire_lengths = [] - else: - # Flatten list of lists - args.wire_length = sum(args.wire_length, []) - - # Check signs and for duplicates - seen = set() - for wire_length in args.wire_length: - if wire_length <= 0.0: - parser.error("--wire-lengths must be positive and non-zero") - if wire_length in seen: - parser.error("wire length {} defined multiple times".format(wire_length)) - seen.add(wire_length) - - wire_lengths = sorted(args.wire_length) - - if args.minimum_slack < 0.0: - parser.error("--minimum-slack must be positive") - - if mandatory and not wire_lengths: - parser.error("At least one --wire-length argument must be provided.") - - return (wire_lengths, args.minimum_slack) + """To be used with add_wire_length_args. + + Also used internally by get_histogram_from_args. + + Check that the supplied arguments are valid and then return the list of wire + lengths supplied.. + + Returns + ------- + ([float, ...], min_slack) + Gives the wire lengths available (which may be empty) and the minimum slack + requested. + """ + if args.wire_length is None: + wire_lengths = [] + else: + # Flatten list of lists + args.wire_length = sum(args.wire_length, []) + + # Check signs and for duplicates + seen = set() + for wire_length in args.wire_length: + if wire_length <= 0.0: + parser.error("--wire-lengths must be positive and non-zero") + if wire_length in seen: + parser.error("wire length {} defined multiple times".format(wire_length)) + seen.add(wire_length) + + wire_lengths = sorted(args.wire_length) + + if args.minimum_slack < 0.0: + parser.error("--minimum-slack must be positive") + + if mandatory and not wire_lengths: + parser.error("At least one --wire-length argument must be provided.") + + return (wire_lengths, args.minimum_slack) def add_image_args(parser): - """Add arguments for specifying output image filenames and dimensions.""" - image_group = parser.add_argument_group("image file parameters") - image_group.add_argument("filename", type=str, - help="filename to write the output to (.pdf or .png)") - image_group.add_argument("width", type=float, nargs="?", - help="width of the image in mm for PDF and pixels for " - "PNG (defaults to 280 mm if PDF and 1000 px for PNG)") - image_group.add_argument("height", type=float, nargs="?", - help="height of the image in mm for PDF and pixels for " - "PNG (if only width is given, output will be at " - "most width wide and width tall)") - + """Add arguments for specifying output image filenames and dimensions.""" + image_group = parser.add_argument_group("image file parameters") + image_group.add_argument("filename", type=str, + help="filename to write the output to (.pdf or .png)") + image_group.add_argument("width", type=float, nargs="?", + help="width of the image in mm for PDF and pixels for " + "PNG (defaults to 280 mm if PDF and 1000 px for PNG)") + image_group.add_argument("height", type=float, nargs="?", + help="height of the image in mm for PDF and pixels for " + "PNG (if only width is given, output will be at " + "most width wide and width tall)") + def get_image_from_args(parser, args, aspect_ratio=1.0): - """To be used with add_image_args. - - Check that the supplied arguments are valid and then return the filename, type - and dimensions of the image to be created. - - Parameters - ---------- - aspect_ratio : float - If the user does not fully specify the image size, what aspect ratio - (width/height) should the image defined be? - - Returns - ------- - output_filename, file_type, image_width, image_height - """ - output_filename = args.filename - - # Detect file-type from file-name - if args.filename.lower().endswith(".png"): - file_type = "png" - elif args.filename.lower().endswith(".pdf"): - file_type = "pdf" - else: - parser.error("filename must end in .png or .pdf") - - # Determine the size of the output image - if args.width is None and args.height is None: - # Width and height not specified, use default sizes - if file_type == "png": - args.width = 1000 - else: # if file_type == "png": - args.width = 280.0 - - if args.width is not None and args.height is None: - # Height is not specified, make the longest side as long as width and make - # the whole image the same aspect ratio as the part of the system we're - # focusing on - if aspect_ratio < 1.0: - # Tall image - args.height = args.width - args.width = args.height * aspect_ratio - else: - # Wide image - args.height = args.width / aspect_ratio - - # Clamp to integers if PNG - if file_type == "png": - args.width = int(args.width) - args.height = int(args.height) - - image_width = args.width - image_height = args.height - - # Image dimensions must be non-zero and positive - if image_width <= 0 or image_height <= 0: - parser.error("image dimensions must be greater than 0") - - return (output_filename, file_type, image_width, image_height) + """To be used with add_image_args. + + Check that the supplied arguments are valid and then return the filename, type + and dimensions of the image to be created. + + Parameters + ---------- + aspect_ratio : float + If the user does not fully specify the image size, what aspect ratio + (width/height) should the image defined be? + + Returns + ------- + output_filename, file_type, image_width, image_height + """ + output_filename = args.filename + + # Detect file-type from file-name + if args.filename.lower().endswith(".png"): + file_type = "png" + elif args.filename.lower().endswith(".pdf"): + file_type = "pdf" + else: + parser.error("filename must end in .png or .pdf") + + # Determine the size of the output image + if args.width is None and args.height is None: + # Width and height not specified, use default sizes + if file_type == "png": + args.width = 1000 + else: # if file_type == "png": + args.width = 280.0 + + if args.width is not None and args.height is None: + # Height is not specified, make the longest side as long as width and make + # the whole image the same aspect ratio as the part of the system we're + # focusing on + if aspect_ratio < 1.0: + # Tall image + args.height = args.width + args.width = args.height * aspect_ratio + else: + # Wide image + args.height = args.width / aspect_ratio + + # Clamp to integers if PNG + if file_type == "png": + args.width = int(args.width) + args.height = int(args.height) + + image_width = args.width + image_height = args.height + + # Image dimensions must be non-zero and positive + if image_width <= 0 or image_height <= 0: + parser.error("image dimensions must be greater than 0") + + return (output_filename, file_type, image_width, image_height) def add_bmp_args(parser): - """Add arguments for specifying a set of BMPs to connect to.""" - bmp_group = parser.add_argument_group("SpiNNaker BMP connection details") - bmp_group.add_argument("--bmp", type=str, nargs=3, action="append", - metavar=("CABINET", "FRAME", "HOSTNAME"), - help="specify the hostname of a BMP to use to " - "communicate with SpiNNaker boards in the given " - "frame") + """Add arguments for specifying a set of BMPs to connect to.""" + bmp_group = parser.add_argument_group("SpiNNaker BMP connection details") + bmp_group.add_argument("--bmp", type=str, nargs=3, action="append", + metavar=("CABINET", "FRAME", "HOSTNAME"), + help="specify the hostname of a BMP to use to " + "communicate with SpiNNaker boards in the given " + "frame") def get_bmps_from_args(parser, args, num_cabinets, num_frames): - """To be used with add_bmp_args. - - Check that the supplied arguments are valid and then return either a - dictionary mapping (cabinet, frame) tuples to hostnames or a (hostname, port) - tuple indicating the . - - Either no arguments must be supplied or a proxy client or exactly one per - frame. - """ - # Special case when no arguments supplied. - if args.bmp is None: - return {} - else: - bmp_hostnames = {} - - for cabinet, frame, hostname in args.bmp: - # Check types - if not cabinet.isdigit(): - parser.error("--bmp cabinet number must be a number") - cabinet = int(cabinet) - - if not frame.isdigit(): - parser.error("--bmp frame number must be a number") - frame = int(frame) - - # Check for duplicates - if (cabinet, frame) in bmp_hostnames: - parser.error("bmp hostname for cabinet {}, frame {} " - "specified multiple times".format(cabinet, frame)) - if hostname in itervalues(bmp_hostnames): - parser.error("bmp hostname '{}' given for more than one frame".format( - hostname)) - - bmp_hostnames[(cabinet, frame)] = hostname - - # Check every frame is included - missing = set((c, f) - for c in range(num_cabinets) - for f in range(num_frames)) - set(bmp_hostnames) - extra = set(bmp_hostnames) - set((c, f) - for c in range(num_cabinets) - for f in range(num_frames)) - - if missing: - parser.error("BMP hostname missing for {}".format( # pragma: no branch - ", ".join("C:{} F:{}".format(c, f) for c, f in missing))) - elif extra: - parser.error( # pragma: no branch - "unexpected BMP for {} which are not part of the system".format( - ", ".join("C:{} F:{}".format(c, f) for c, f in extra))) - - return bmp_hostnames + """To be used with add_bmp_args. + + Check that the supplied arguments are valid and then return either a + dictionary mapping (cabinet, frame) tuples to hostnames or a (hostname, port) + tuple indicating the . + + Either no arguments must be supplied or a proxy client or exactly one per + frame. + """ + # Special case when no arguments supplied. + if args.bmp is None: + return {} + else: + bmp_hostnames = {} + + for cabinet, frame, hostname in args.bmp: + # Check types + if not cabinet.isdigit(): + parser.error("--bmp cabinet number must be a number") + cabinet = int(cabinet) + + if not frame.isdigit(): + parser.error("--bmp frame number must be a number") + frame = int(frame) + + # Check for duplicates + if (cabinet, frame) in bmp_hostnames: + parser.error("bmp hostname for cabinet {}, frame {} " + "specified multiple times".format(cabinet, frame)) + if hostname in itervalues(bmp_hostnames): + parser.error("bmp hostname '{}' given for more than one frame".format( + hostname)) + + bmp_hostnames[(cabinet, frame)] = hostname + + # Check every frame is included + missing = set((c, f) + for c in range(num_cabinets) + for f in range(num_frames)) - set(bmp_hostnames) + extra = set(bmp_hostnames) - set((c, f) + for c in range(num_cabinets) + for f in range(num_frames)) + + if missing: + parser.error("BMP hostname missing for {}".format( # pragma: no branch + ", ".join("C:{} F:{}".format(c, f) for c, f in missing))) + elif extra: + parser.error( # pragma: no branch + "unexpected BMP for {} which are not part of the system".format( + ", ".join("C:{} F:{}".format(c, f) for c, f in extra))) + + return bmp_hostnames def add_proxy_args(parser): - """Add arguments for specifying a proxy server connect to.""" - proxy_group = parser.add_argument_group("SpiNNaker proxy connection details") - proxy_group.add_argument("--proxy", type=str, metavar="HOSTNAME", - help="specify the hostname of a " - "spinner-proxy-server instance to use " - "to communicate with the SpiNNaker system") - proxy_group.add_argument("--proxy-port", type=int, default=DEFAULT_PORT, - metavar="PORT", - help="specify the port to connect to " - "spinner-proxy-server with (default: " - "%(default)d)") + """Add arguments for specifying a proxy server connect to.""" + proxy_group = parser.add_argument_group("SpiNNaker proxy connection details") + proxy_group.add_argument("--proxy", type=str, metavar="HOSTNAME", + help="specify the hostname of a " + "spinner-proxy-server instance to use " + "to communicate with the SpiNNaker system") + proxy_group.add_argument("--proxy-port", type=int, default=DEFAULT_PORT, + metavar="PORT", + help="specify the port to connect to " + "spinner-proxy-server with (default: " + "%(default)d)") def get_proxy_from_args(parser, args): - """To be used with add_proxy_args. - - Returns a (hostname, port) or None if no proxy server details provided. - - This call checks that no conflicting --bmp arguments are present. - """ - # Make sure only one of the --bmp or --proxy arguments is given - if hasattr(args, "bmp"): - if args.bmp is not None and args.proxy is not None: - parser.error("--bmp and --proxy are mutually exclusive") - - # If proxy specified, return one - if args.proxy is not None: - return (args.proxy, args.proxy_port) - else: - return None + """To be used with add_proxy_args. + + Returns a (hostname, port) or None if no proxy server details provided. + + This call checks that no conflicting --bmp arguments are present. + """ + # Make sure only one of the --bmp or --proxy arguments is given + if hasattr(args, "bmp"): + if args.bmp is not None and args.proxy is not None: + parser.error("--bmp and --proxy are mutually exclusive") + + # If proxy specified, return one + if args.proxy is not None: + return (args.proxy, args.proxy_port) + else: + return None def add_subset_args(parser): - """Add arguments for specifying a subset of wires to connect.""" - subset_group = parser.add_argument_group( - "wire subset selection", - description=""" - These arguments allow the specificiation of subsets of wires - to install, for example, selecting only particular wires - within a particular cabinet or frame. If no subsets are - specified, all wires will be included, otherwise the union - of all specified subsets are included. Use '1.2.*' to select all wires - between boards in cabinet 1, frame 2. Use '1.*.*' to select all wires - between boards in cabinet 1. Use '1-2.*.*' to select all - wires which cross between cabinets 1 and 2. - """ - ) - subset_group.add_argument("--subset", nargs="+", type=str, metavar="SUBSET", - help="specify the subset of wires to include") + """Add arguments for specifying a subset of wires to connect.""" + subset_group = parser.add_argument_group( + "wire subset selection", + description=""" + These arguments allow the specificiation of subsets of wires + to install, for example, selecting only particular wires + within a particular cabinet or frame. If no subsets are + specified, all wires will be included, otherwise the union + of all specified subsets are included. Use '1.2.*' to select all wires + between boards in cabinet 1, frame 2. Use '1.*.*' to select all wires + between boards in cabinet 1. Use '1-2.*.*' to select all + wires which cross between cabinets 1 and 2. + """ + ) + subset_group.add_argument("--subset", nargs="+", type=str, metavar="SUBSET", + help="specify the subset of wires to include") def get_subset_from_args(parser, args): - """To be used with add_subset_args. - - Check that the supplied arguments are valid and then return a function with - the prototype fn(wire) -> bool where wire is a tuple ((c, f, b, d), (c, f, b, - d), length). - """ - if args.subset is None: - # Special case when no arguments supplied: accept everything - return (lambda wire: True) - else: - rules = [] - for subset in args.subset: - # Split into cabinet, frame and board rules - parts = re.split(r"\s*[,.]\s*", subset) - if len(parts) != 3: - parser.error( - "--subset arguments must be of the form '???.???.???'" - ", not {}".format(subset)) - - rule_fragments = [] - for i, part in enumerate(parts): - # For each part, work out the rule - match = re.match(r"^(" - r"(?P[0-9]+)|" # Specific c/f/b - r"((?P[0-9]+)-(?P[0-9]+))|" # Between two c/f/b - r"(?P[*])" # Wildcard - r")$", part) - if not match: - parser.error( - "--subset components must be either 'N', 'N-M' or '*', " - "not {}".format(part)) - elif match.group("single"): - value = int(match.group("single")) - rule_fragments.append( - lambda wire, i=i, value=value: (wire[0][i] == value and - wire[1][i] == value)) - elif match.group("from") and match.group("to"): - frm = int(match.group("from")) - to = int(match.group("to")) - rule_fragments.append( - lambda wire, i=i, frm=frm, to=to: (wire[0][i] == frm and - wire[1][i] == to) - or (wire[1][i] == frm and - wire[0][i] == to)) - elif match.group("wildcard"): - # Don't append any rules: the wildcard always matches - pass - else: # pragma: no cover - assert False - - rules.append(lambda wire, rule_fragments=rule_fragments: - all(rf(wire) for rf in rule_fragments)) - - return (lambda wire: any(r(wire) for r in rules)) + """To be used with add_subset_args. + + Check that the supplied arguments are valid and then return a function with + the prototype fn(wire) -> bool where wire is a tuple ((c, f, b, d), (c, f, b, + d), length). + """ + if args.subset is None: + # Special case when no arguments supplied: accept everything + return (lambda wire: True) + else: + rules = [] + for subset in args.subset: + # Split into cabinet, frame and board rules + parts = re.split(r"\s*[,.]\s*", subset) + if len(parts) != 3: + parser.error( + "--subset arguments must be of the form '???.???.???'" + ", not {}".format(subset)) + + rule_fragments = [] + for i, part in enumerate(parts): + # For each part, work out the rule + match = re.match(r"^(" + r"(?P[0-9]+)|" # Specific c/f/b + r"((?P[0-9]+)-(?P[0-9]+))|" # Between two c/f/b + r"(?P[*])" # Wildcard + r")$", part) + if not match: + parser.error( + "--subset components must be either 'N', 'N-M' or '*', " + "not {}".format(part)) + elif match.group("single"): + value = int(match.group("single")) + rule_fragments.append( + lambda wire, i=i, value=value: (wire[0][i] == value and + wire[1][i] == value)) + elif match.group("from") and match.group("to"): + frm = int(match.group("from")) + to = int(match.group("to")) + rule_fragments.append( + lambda wire, i=i, frm=frm, to=to: (wire[0][i] == frm and + wire[1][i] == to) + or (wire[1][i] == frm and + wire[0][i] == to)) + elif match.group("wildcard"): + # Don't append any rules: the wildcard always matches + pass + else: # pragma: no cover + assert False + + rules.append(lambda wire, rule_fragments=rule_fragments: + all(rf(wire) for rf in rule_fragments)) + + return (lambda wire: any(r(wire) for r in rules)) if __name__=="__main__": # pragma: no cover - # This file when run as a script acts as a quick proof-of-concept of all - # argument parsing capabilities - import argparse - parser = argparse.ArgumentParser() - add_image_args(parser) - add_topology_args(parser) - add_cabinet_args(parser) - add_histogram_args(parser) - add_bmp_args(parser) - add_subset_args(parser) - - args = parser.parse_args() - print(get_image_from_args(parser, args, 0.5)) - print(get_topology_from_args(parser, args)) - print(get_cabinets_from_args(parser, args)) - print(get_histogram_from_args(parser, args)) - print(get_bmps_from_args(parser, args, 2, 2)) - print(get_subset_from_args(parser, args)) + # This file when run as a script acts as a quick proof-of-concept of all + # argument parsing capabilities + import argparse + parser = argparse.ArgumentParser() + add_image_args(parser) + add_topology_args(parser) + add_cabinet_args(parser) + add_histogram_args(parser) + add_bmp_args(parser) + add_subset_args(parser) + + args = parser.parse_args() + print(get_image_from_args(parser, args, 0.5)) + print(get_topology_from_args(parser, args)) + print(get_cabinets_from_args(parser, args)) + print(get_histogram_from_args(parser, args)) + print(get_bmps_from_args(parser, args, 2, 2)) + print(get_subset_from_args(parser, args)) diff --git a/spinner/scripts/machine_map.py b/spinner/scripts/machine_map.py index 044fd69..fcea1e2 100644 --- a/spinner/scripts/machine_map.py +++ b/spinner/scripts/machine_map.py @@ -5,7 +5,7 @@ import argparse from spinner.diagrams.machine_map import draw_machine_map, \ - get_machine_map_aspect_ratio + get_machine_map_aspect_ratio from spinner.utils import folded_torus @@ -16,49 +16,49 @@ def main(args=None): - parser = argparse.ArgumentParser( - description="Generate visual maps from the SpiNNaker network topology to " - "board locations.") - arguments.add_version_args(parser) - arguments.add_image_args(parser) - arguments.add_topology_args(parser) - arguments.add_cabinet_args(parser) - - # Process command-line arguments - args = parser.parse_args(args) - (w, h), transformation, uncrinkle_direction, folds =\ - arguments.get_topology_from_args(parser, args) - - cabinet, num_frames = arguments.get_cabinets_from_args(parser, args) - - aspect_ratio = get_machine_map_aspect_ratio(w, h) - - output_filename, file_type, image_width, image_height =\ - arguments.get_image_from_args(parser, args, aspect_ratio) - - # Generate folded system - hex_boards, folded_boards = folded_torus(w, h, - transformation, - uncrinkle_direction, - folds) - - # Divide into cabinets - cabinetised_boards = transforms.cabinetise(folded_boards, - cabinet.num_cabinets, - num_frames, - cabinet.boards_per_frame) - cabinetised_boards = transforms.remove_gaps(cabinetised_boards) - - # Render the image - Context = {"png": PNGContextManager, "pdf": PDFContextManager}[file_type] - with Context(output_filename, image_width, image_height) as ctx: - draw_machine_map(ctx, image_width, image_height, - w, h, hex_boards, cabinetised_boards) - - return 0 + parser = argparse.ArgumentParser( + description="Generate visual maps from the SpiNNaker network topology to " + "board locations.") + arguments.add_version_args(parser) + arguments.add_image_args(parser) + arguments.add_topology_args(parser) + arguments.add_cabinet_args(parser) + + # Process command-line arguments + args = parser.parse_args(args) + (w, h), transformation, uncrinkle_direction, folds =\ + arguments.get_topology_from_args(parser, args) + + cabinet, num_frames = arguments.get_cabinets_from_args(parser, args) + + aspect_ratio = get_machine_map_aspect_ratio(w, h) + + output_filename, file_type, image_width, image_height =\ + arguments.get_image_from_args(parser, args, aspect_ratio) + + # Generate folded system + hex_boards, folded_boards = folded_torus(w, h, + transformation, + uncrinkle_direction, + folds) + + # Divide into cabinets + cabinetised_boards = transforms.cabinetise(folded_boards, + cabinet.num_cabinets, + num_frames, + cabinet.boards_per_frame) + cabinetised_boards = transforms.remove_gaps(cabinetised_boards) + + # Render the image + Context = {"png": PNGContextManager, "pdf": PDFContextManager}[file_type] + with Context(output_filename, image_width, image_height) as ctx: + draw_machine_map(ctx, image_width, image_height, + w, h, hex_boards, cabinetised_boards) + + return 0 if __name__=="__main__": # pragma: no cover - import sys - sys.exit(main()) + import sys + sys.exit(main()) diff --git a/spinner/topology.py b/spinner/topology.py index c2bb876..cf9be9d 100644 --- a/spinner/topology.py +++ b/spinner/topology.py @@ -17,8 +17,8 @@ This uses the hexagonal addressing scheme suggested in - Addressing and Routing in Hexagonal Networks with Applications for Tracking - Mobile Users and Connection Rerouting in Cellular Networks by Nocetti et. al. + Addressing and Routing in Hexagonal Networks with Applications for Tracking + Mobile Users and Connection Rerouting in Cellular Networks by Nocetti et. al. """ from enum import IntEnum @@ -30,48 +30,48 @@ ################################################################################ class Direction(IntEnum): - east = 0 - north_east = 1 - north = 2 - west = 3 - south_west = 4 - south = 5 - - @property - def next_ccw(self): - """ - Returns the next direction counter-clockwise from the given direction. - """ - return Direction((self + 1) % 6) - - @property - def next_cw(self): - """ - Returns the next direction clockwise from the given direction. - """ - return Direction((self - 1) % 6) - - @property - def opposite(self): - """ - Returns the opposite direction. - """ - return Direction((self + 3) % 6) - - @property - def vector(self): - """ - Returns the vector which moves one unit in the given direction. - """ - return _DIRECTION_VECTORS[self] + east = 0 + north_east = 1 + north = 2 + west = 3 + south_west = 4 + south = 5 + + @property + def next_ccw(self): + """ + Returns the next direction counter-clockwise from the given direction. + """ + return Direction((self + 1) % 6) + + @property + def next_cw(self): + """ + Returns the next direction clockwise from the given direction. + """ + return Direction((self - 1) % 6) + + @property + def opposite(self): + """ + Returns the opposite direction. + """ + return Direction((self + 3) % 6) + + @property + def vector(self): + """ + Returns the vector which moves one unit in the given direction. + """ + return _DIRECTION_VECTORS[self] _DIRECTION_VECTORS = { - Direction.east: ( 1, 0, 0), - Direction.west: (-1, 0, 0), - Direction.north: ( 0, 1, 0), - Direction.south: ( 0,-1, 0), - Direction.north_east: ( 0, 0,-1), - Direction.south_west: ( 0, 0, 1), + Direction.east: ( 1, 0, 0), + Direction.west: (-1, 0, 0), + Direction.north: ( 0, 1, 0), + Direction.south: ( 0,-1, 0), + Direction.north_east: ( 0, 0,-1), + Direction.south_west: ( 0, 0, 1), } @@ -80,144 +80,144 @@ def vector(self): ################################################################################ def add_direction(vector, direction): - """ - Returns the vector moved one unit in the given direction. - """ - return coordinates.Hexagonal(*(v + a for (v,a) in zip(vector, direction.vector))) + """ + Returns the vector moved one unit in the given direction. + """ + return coordinates.Hexagonal(*(v + a for (v,a) in zip(vector, direction.vector))) def manhattan(vector): - """ - Calculate the Manhattan distance required to traverse the given vector. - """ - return sum(map(abs, vector)) + """ + Calculate the Manhattan distance required to traverse the given vector. + """ + return sum(map(abs, vector)) def median_element(values): - """ - Returns the value of the median element of the set. - """ - return sorted(values)[len(values)//2] + """ + Returns the value of the median element of the set. + """ + return sorted(values)[len(values)//2] def to_shortest_path(vector): - """ - Converts a vector into the shortest-path variation. - - A shortest path has at least one dimension equal to zero and the remaining two - dimensions have opposite signs (or are zero). - """ - assert(len(vector) == 3) - - # The vector (1,1,1) has distance zero so this can be added or subtracted - # freely without effect on the destination reached. As a result, simply - # subtract the median value from all dimensions to yield the shortest path. - median = median_element(vector) - return coordinates.Hexagonal(*(v - median for v in vector)) + """ + Converts a vector into the shortest-path variation. + + A shortest path has at least one dimension equal to zero and the remaining two + dimensions have opposite signs (or are zero). + """ + assert(len(vector) == 3) + + # The vector (1,1,1) has distance zero so this can be added or subtracted + # freely without effect on the destination reached. As a result, simply + # subtract the median value from all dimensions to yield the shortest path. + median = median_element(vector) + return coordinates.Hexagonal(*(v - median for v in vector)) def to_xy(vector): - """ - Takes a 3D vector and returns the equivalent 2D version. - """ - return coordinates.Hexagonal2D(vector[0] - vector[2], vector[1] - vector[2]) + """ + Takes a 3D vector and returns the equivalent 2D version. + """ + return coordinates.Hexagonal2D(vector[0] - vector[2], vector[1] - vector[2]) def hex_to_cartesian(coords): - """ - Convert a set of hexagonal coordinates into equivalent (for presentation - purposes) Cartesian values. - """ - - old_x, old_y = to_xy(coords) - - new_x = old_x - new_y = (old_y * 2) - old_x - - return coordinates.Cartesian2D(new_x, new_y) + """ + Convert a set of hexagonal coordinates into equivalent (for presentation + purposes) Cartesian values. + """ + + old_x, old_y = to_xy(coords) + + new_x = old_x + new_y = (old_y * 2) - old_x + + return coordinates.Cartesian2D(new_x, new_y) def board_to_chip(coords, layers = 4): - """ - Convert a hexagonal board coordinate into a hexagonal chip coordinate for the - chip at the bottom-left of a board that size. - """ - x,y = hex_to_skewed_cartesian(coords) - x *= layers - y *= layers - return coordinates.Hexagonal(x,y,0) + """ + Convert a hexagonal board coordinate into a hexagonal chip coordinate for the + chip at the bottom-left of a board that size. + """ + x,y = hex_to_skewed_cartesian(coords) + x *= layers + y *= layers + return coordinates.Hexagonal(x,y,0) def hex_to_skewed_cartesian(coords): - """ - Convert a set of hexagonal coordinates into equivalent Cartesian values - skewed to make x and y in the coordinate space match x and y in Cartesian - space. - """ - - old_x, old_y = to_xy(coords) - - new_x = old_x + old_y - new_y = (old_y * 2) - old_x - - return coordinates.Cartesian2D(new_x, new_y) + """ + Convert a set of hexagonal coordinates into equivalent Cartesian values + skewed to make x and y in the coordinate space match x and y in Cartesian + space. + """ + + old_x, old_y = to_xy(coords) + + new_x = old_x + old_y + new_y = (old_y * 2) - old_x + + return coordinates.Cartesian2D(new_x, new_y) def wrap_around(coord, bounds): - """ - Wrap the coordinate given around the edges of a torus made of hexagonal - pieces. Assumes that the world is a NxM arrangement of threeboards (see - threeboards function) with bounds = (N, M). - - XXX: Implementation could be nicer (and non iterative)... - """ - - w,h = bounds - x,y = to_xy(coord) - - assert(w > 0) - assert(h > 0) - - while True: - # Where is the coordinate relative to the world - left = x + y < 0 - right = x + y >= w*3 - below = (2*y) - x < 0 - above = (2*y) - x >= h*3 - - if below and left: - x += 1 + (w-1)*2 - (h-1) - y += 2 + (w-1) + (h-1) - continue - - if above and right: - x -= 1 + (w-1)*2 - (h-1) - y -= 2 + (w-1) + (h-1) - continue - - if left: - x += w*2 - y += w - continue - - if right: - x -= w*2 - y -= w - continue - - if below: - x -= h - y += h - continue - - if above: - x += h - y -= h - continue - - break - - return coordinates.Hexagonal(x,y,0) + """ + Wrap the coordinate given around the edges of a torus made of hexagonal + pieces. Assumes that the world is a NxM arrangement of threeboards (see + threeboards function) with bounds = (N, M). + + XXX: Implementation could be nicer (and non iterative)... + """ + + w,h = bounds + x,y = to_xy(coord) + + assert(w > 0) + assert(h > 0) + + while True: + # Where is the coordinate relative to the world + left = x + y < 0 + right = x + y >= w*3 + below = (2*y) - x < 0 + above = (2*y) - x >= h*3 + + if below and left: + x += 1 + (w-1)*2 - (h-1) + y += 2 + (w-1) + (h-1) + continue + + if above and right: + x -= 1 + (w-1)*2 - (h-1) + y -= 2 + (w-1) + (h-1) + continue + + if left: + x += w*2 + y += w + continue + + if right: + x -= w*2 + y -= w + continue + + if below: + x -= h + y += h + continue + + if above: + x += h + y -= h + continue + + break + + return coordinates.Hexagonal(x,y,0) @@ -228,70 +228,70 @@ def wrap_around(coord, bounds): def euclidean(v): - """ - The Euclidean distance represented by the given vector. - """ - return sum(x**2 for x in v) ** 0.5 + """ + The Euclidean distance represented by the given vector. + """ + return sum(x**2 for x in v) ** 0.5 def fold_dimension(x, w, f): - r""" - Takes a coordinate, x, on a single dimension of length w. Returns a tuple - (new_x, fold) where new_x is the coordinate after the dimension has been - folded into f pieces and fold is the fold number it is on. - - Input:: - - ______ w _____ - | | - -----+---------- - | - x - - | | - \| |/ - \./ - - new_x ___\__.\ f = 3 - \/| \ - | - fold - """ - # Width of the folded sections (round up so sections are bigger than - # neccessary if not evenly divisible) - fold_width = (w+(f-1)) // f - - new_x = x % fold_width - fold = x // fold_width - - # If on a reverse-facing fold, flip the coordinate - if fold%2: - # If this is the last fold, it may be smaller if not evenly divisible - if fold == f - 1: - new_x = (fold_width - ((fold_width*f) - w)) - new_x - 1 - else: - new_x = fold_width - new_x - 1 - - return (new_x, fold) + r""" + Takes a coordinate, x, on a single dimension of length w. Returns a tuple + (new_x, fold) where new_x is the coordinate after the dimension has been + folded into f pieces and fold is the fold number it is on. + + Input:: + + ______ w _____ + | | + -----+---------- + | + x + + | | + \| |/ + \./ + + new_x ___\__.\ f = 3 + \/| \ + | + fold + """ + # Width of the folded sections (round up so sections are bigger than + # neccessary if not evenly divisible) + fold_width = (w+(f-1)) // f + + new_x = x % fold_width + fold = x // fold_width + + # If on a reverse-facing fold, flip the coordinate + if fold%2: + # If this is the last fold, it may be smaller if not evenly divisible + if fold == f - 1: + new_x = (fold_width - ((fold_width*f) - w)) - new_x - 1 + else: + new_x = fold_width - new_x - 1 + + return (new_x, fold) def fold_interleave_dimension(x, w, f): - r""" - As fold_dimension but returns a new x such that if the following points were - folded, they would be mapped like so:: - - _______ ---\ (0,0) \ / (0,1) ---\ _______ - 0 1 2 3 ---/ (1,0) \/ (1,1) ---/ 0 3 1 2 - - That is, it interleaves points which would be mapped to the same position by - fold_dimension. - """ - new_x, fold = fold_dimension(x,w,f) - - new_x *= f - new_x += fold - - return new_x + r""" + As fold_dimension but returns a new x such that if the following points were + folded, they would be mapped like so:: + + _______ ---\ (0,0) \ / (0,1) ---\ _______ + 0 1 2 3 ---/ (1,0) \/ (1,1) ---/ 0 3 1 2 + + That is, it interleaves points which would be mapped to the same position by + fold_dimension. + """ + new_x, fold = fold_dimension(x,w,f) + + new_x *= f + new_x += fold + + return new_x ################################################################################ @@ -300,58 +300,58 @@ def fold_interleave_dimension(x, w, f): def cabinetise(coord, bounds, num_cabinets, frames_per_cabinet, boards_per_frame = None): - r""" - Takes a set of Cartesian coordinates and maps them into a series of cabinets. - Splits the system into columns, one per cabinet. Splits each column into rows, - one per frame. These rows likely consist of several columns and rows in - Cartesian space and so values are interleaved to yield a board allocation. - - If the width of the set of boards doesn't divide into the num_cabinets - or the height doesn't divide into the number of frames_per_cabinet, the axes - are flipped and tried again. If this doesn't solve the problem, a ValueError - is raised. - - coord is an (x,y) tuple containing the coordinate to map - - bounds is a (w,h) tuple containing the width and height of the Cartesian space - - If boards_per_frame is given then an assertion checks that the number of boards - is adequate. - - Returns a tuple (cabinet, frame, board). - """ - - x, y = coord - w, h = bounds - - # If not divisible, try flipping the axes - if w % num_cabinets != 0 or h % frames_per_cabinet != 0: - y, x = x, y - h, w = w, h - - # If still not divisible into cabinets and frames, fail. - if w % num_cabinets != 0 or h % frames_per_cabinet != 0: - raise ValueError("Cannot directly map boards into cabinets.") - - cols_per_cabinet = w // num_cabinets - rows_per_frame = h // frames_per_cabinet - - cabinet = x // cols_per_cabinet - frame = y // rows_per_frame - - # Sub coordinate within the frame - x %= cols_per_cabinet - y %= rows_per_frame - - # Interleave into board number - board = y + (rows_per_frame * x) - - # Sanity check the position is actually within the system. - assert(board < boards_per_frame) - assert(frame < frames_per_cabinet) - assert(cabinet < num_cabinets) - - return coordinates.Cabinet(cabinet, frame, board) + r""" + Takes a set of Cartesian coordinates and maps them into a series of cabinets. + Splits the system into columns, one per cabinet. Splits each column into rows, + one per frame. These rows likely consist of several columns and rows in + Cartesian space and so values are interleaved to yield a board allocation. + + If the width of the set of boards doesn't divide into the num_cabinets + or the height doesn't divide into the number of frames_per_cabinet, the axes + are flipped and tried again. If this doesn't solve the problem, a ValueError + is raised. + + coord is an (x,y) tuple containing the coordinate to map + + bounds is a (w,h) tuple containing the width and height of the Cartesian space + + If boards_per_frame is given then an assertion checks that the number of boards + is adequate. + + Returns a tuple (cabinet, frame, board). + """ + + x, y = coord + w, h = bounds + + # If not divisible, try flipping the axes + if w % num_cabinets != 0 or h % frames_per_cabinet != 0: + y, x = x, y + h, w = w, h + + # If still not divisible into cabinets and frames, fail. + if w % num_cabinets != 0 or h % frames_per_cabinet != 0: + raise ValueError("Cannot directly map boards into cabinets.") + + cols_per_cabinet = w // num_cabinets + rows_per_frame = h // frames_per_cabinet + + cabinet = x // cols_per_cabinet + frame = y // rows_per_frame + + # Sub coordinate within the frame + x %= cols_per_cabinet + y %= rows_per_frame + + # Interleave into board number + board = y + (rows_per_frame * x) + + # Sanity check the position is actually within the system. + assert(board < boards_per_frame) + assert(frame < frames_per_cabinet) + assert(cabinet < num_cabinets) + + return coordinates.Cabinet(cabinet, frame, board) @@ -360,61 +360,61 @@ def cabinetise(coord, bounds, num_cabinets, frames_per_cabinet, boards_per_frame ################################################################################ def hexagon(layers = 4): - """ - Generator which produces a list of (x,y) tuples which produce a hexagon of the - given number of layers. - - Try me:: - - points = set(hexagon(4)) - for y in range(min(y for (x,y) in points), max(y for (x,y) in points) + 1)[::-1]: - for x in range(min(x for (x,y) in points), max(x for (x,y) in points) + 1): - if (x,y) in points: - print "#", - else: - print " ", - print - """ - - X,Y,Z = 0,1,2 - - next_position = [0,0,0] - - for n in range(layers): - for _ in range(n): - yield to_xy(next_position) - next_position[Y] -= 1 - - for _ in range(n): - yield to_xy(next_position) - next_position[Z] += 1 - - for _ in range(n+1): - yield to_xy(next_position) - next_position[X] -= 1 - - for _ in range(n): - yield to_xy(next_position) - next_position[Y] += 1 - - for _ in range(n+1): - yield to_xy(next_position) - next_position[Z] -= 1 - - for _ in range(n+1): - yield to_xy(next_position) - next_position[X] += 1 + """ + Generator which produces a list of (x,y) tuples which produce a hexagon of the + given number of layers. + + Try me:: + + points = set(hexagon(4)) + for y in range(min(y for (x,y) in points), max(y for (x,y) in points) + 1)[::-1]: + for x in range(min(x for (x,y) in points), max(x for (x,y) in points) + 1): + if (x,y) in points: + print "#", + else: + print " ", + print + """ + + X,Y,Z = 0,1,2 + + next_position = [0,0,0] + + for n in range(layers): + for _ in range(n): + yield to_xy(next_position) + next_position[Y] -= 1 + + for _ in range(n): + yield to_xy(next_position) + next_position[Z] += 1 + + for _ in range(n+1): + yield to_xy(next_position) + next_position[X] -= 1 + + for _ in range(n): + yield to_xy(next_position) + next_position[Y] += 1 + + for _ in range(n+1): + yield to_xy(next_position) + next_position[Z] -= 1 + + for _ in range(n+1): + yield to_xy(next_position) + next_position[X] += 1 def hexagon_zero(layers = 4): - """ - As with hexagon except coordinates are given relative to the bottom-left - coordinate of the hexagon. - """ - - return ( coordinates.Hexagonal2D(x+layers, y+layers-1) - for (x,y) in hexagon(layers) - ) + """ + As with hexagon except coordinates are given relative to the bottom-left + coordinate of the hexagon. + """ + + return ( coordinates.Hexagonal2D(x+layers, y+layers-1) + for (x,y) in hexagon(layers) + ) @@ -425,60 +425,60 @@ def hexagon_zero(layers = 4): def threeboards(width = 1, height = None): - r""" - Generates a list of width x height threeboards. If height is not specified, - height = width. Width defaults to 1. Coordinates are given as (x,y,0) tuples - on a hexagonal coordinate system like so:: - - - | y - | - / \ - z / \ x - - A threeboard looks like so:: - - ___ - / 1 \___ - \___/ 2 \ - / 0 \___/ - \___/ - - With the bottom-left hexagon being at (0,0). - - And is tiled in to long rows like so:: - - ___ ___ ___ ___ - / 0 \___/ 1 \___/ 2 \___/ 3 \___ - \___/ 0 \___/ 1 \___/ 2 \___/ 3 \ - / 0 \___/ 1 \___/ 2 \___/ 3 \___/ - \___/ \___/ \___/ \___/ - - And into a mesh like so:: - - ___ ___ ___ ___ - / 4 \___/ 5 \___/ 6 \___/ 7 \___ - \___/ 4 \___/ 5 \___/ 6 \___/ 7 \ - / 4 \___/ 5 \___/ 6 \___/ 7 \___/ - \___/ 0 \___/ 1 \___/ 2 \___/ 3 \___ - \___/ 0 \___/ 1 \___/ 2 \___/ 3 \ - / 0 \___/ 1 \___/ 2 \___/ 3 \___/ - \___/ \___/ \___/ \___/ - - """ - - height = width if height is None else height - - # Create the boards - for y in range(height): - for x in range(width): - # z is the index of the board within the set. 0 is the bottom left, 1 is - # the top, 2 is the right - for z in range(3): - # Offset within threeboard --+ - # Y offset ---------+ | - # X offset -+ | | - # | | | - x_coord = (x*2) + (-y) + (z >= 2) - y_coord = (x ) + ( y) + (z >= 1) - yield coordinates.Hexagonal(x_coord,y_coord,0) + r""" + Generates a list of width x height threeboards. If height is not specified, + height = width. Width defaults to 1. Coordinates are given as (x,y,0) tuples + on a hexagonal coordinate system like so:: + + + | y + | + / \ + z / \ x + + A threeboard looks like so:: + + ___ + / 1 \___ + \___/ 2 \ + / 0 \___/ + \___/ + + With the bottom-left hexagon being at (0,0). + + And is tiled in to long rows like so:: + + ___ ___ ___ ___ + / 0 \___/ 1 \___/ 2 \___/ 3 \___ + \___/ 0 \___/ 1 \___/ 2 \___/ 3 \ + / 0 \___/ 1 \___/ 2 \___/ 3 \___/ + \___/ \___/ \___/ \___/ + + And into a mesh like so:: + + ___ ___ ___ ___ + / 4 \___/ 5 \___/ 6 \___/ 7 \___ + \___/ 4 \___/ 5 \___/ 6 \___/ 7 \ + / 4 \___/ 5 \___/ 6 \___/ 7 \___/ + \___/ 0 \___/ 1 \___/ 2 \___/ 3 \___ + \___/ 0 \___/ 1 \___/ 2 \___/ 3 \ + / 0 \___/ 1 \___/ 2 \___/ 3 \___/ + \___/ \___/ \___/ \___/ + + """ + + height = width if height is None else height + + # Create the boards + for y in range(height): + for x in range(width): + # z is the index of the board within the set. 0 is the bottom left, 1 is + # the top, 2 is the right + for z in range(3): + # Offset within threeboard --+ + # Y offset ---------+ | + # X offset -+ | | + # | | | + x_coord = (x*2) + (-y) + (z >= 2) + y_coord = (x ) + ( y) + (z >= 1) + yield coordinates.Hexagonal(x_coord,y_coord,0)