-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changed and refactored the code #2
base: main
Are you sure you want to change the base?
Changes from 1 commit
e1a5fe0
f23005b
7f338c2
0f1602f
145b33d
1d341c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,13 @@ | |
# | ||
# pifo-fifo.py | ||
|
||
"""First in, first out (FIFO) | ||
|
||
The FIFO scheduling algorithm preserves the order of the scheduled packets. This | ||
implementation is here for completeness and uses a PIFO. It is here to help | ||
people understand how to add new scheduling algorithms to this framework. | ||
""" | ||
|
||
__copyright__ = """ | ||
Copyright (c) 2021, Toke Høiland-Jørgensen <[email protected]> | ||
Copyright (c) 2021, Frey Alfredsson <[email protected]> | ||
|
@@ -26,11 +33,28 @@ | |
""" | ||
|
||
from pifo_lib import Packet, Runner, Pifo | ||
from pifo_lib import SchedulingAlgorithm | ||
|
||
|
||
class Fifo(SchedulingAlgorithm): | ||
"""First in, first out (FIFO)""" | ||
|
||
def __init__(self): | ||
self._pifo = Pifo() | ||
|
||
def enqueue(self, item): | ||
rank = self.get_rank(item) | ||
self._pifo.enqueue(item, rank) | ||
|
||
class Fifo(Pifo): | ||
def get_rank(self, item): | ||
return self.qlen | ||
return self._pifo.qlen | ||
|
||
def dequeue(self): | ||
return self._pifo.dequeue() | ||
|
||
def dump(self): | ||
self._pifo.dump() | ||
|
||
|
||
if __name__ == "__main__": | ||
pkts = [ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,32 +27,74 @@ def __init__(self, flow, idn, length=1): | |
self.flow = flow | ||
self.idn = idn | ||
self.length = length | ||
self.rank = 0 | ||
|
||
def __repr__(self): | ||
return f"P(F:{self.flow}, I:{self.idn}, L:{self.length})" | ||
|
||
|
||
class Runner: | ||
def __init__(self, pkts, queue): | ||
"""This class is responsible for running a test on a packet scheduling | ||
algorithm. It is accountable for enquing and dequeing packets. For now, it | ||
does so by dequing as many packets as it enqued. In the next iteration, when | ||
we add pacing, it will need to handle virtual time cycling. | ||
""" | ||
|
||
def __init__(self, pkts, scheduler): | ||
self.input_pkts = pkts | ||
self.queue = queue | ||
self.scheduler = scheduler | ||
|
||
def run(self): | ||
print(f"Running with queue: {self.queue}") | ||
print(" Inserting packets into queue:") | ||
print(f"Running with scheduler: {self.scheduler}") | ||
print(" Inserting packets into scheduler:") | ||
pprint(self.input_pkts, indent=4) | ||
for p in self.input_pkts: | ||
self.queue.enqueue(p) | ||
print(" Queue state:") | ||
self.queue.dump() | ||
self.scheduler.enqueue(p) | ||
print(" Scheduler state:") | ||
self.scheduler.dump() | ||
output = [] | ||
for p in self.queue: | ||
|
||
for p in self.scheduler: | ||
output.append(p) | ||
print(" Got packets from queue:") | ||
pprint(output, indent=4) | ||
|
||
|
||
class SchedulingAlgorithm(): | ||
|
||
"""A queuing packet scheduling algorithm requires an abstraction that keeps | ||
the queuing data structure and the algorithm separate. To create a new | ||
Scheduling algorithm, inherit this class, add the scheduling data structures | ||
to the constructor, and implement the constructor, enqueue, dequeue, and the | ||
dump functions. | ||
|
||
Please look at the pifo_fifo.py to see how you implement a FIFO. | ||
""" | ||
|
||
def __init__(self): | ||
raise NotImplementedError(self.__class__.__name__ + ' missing implementation') | ||
|
||
def enqueue(self, item): | ||
raise NotImplementedError(self.__class__.__name__ + ' missing implementation') | ||
|
||
def dequeue(self): | ||
raise NotImplementedError(self.__class__.__name__ + ' missing implementation') | ||
|
||
def dump(self): | ||
raise NotImplementedError(self.__class__.__name__ + ' missing implementation') | ||
|
||
def __next__(self): | ||
item = self.dequeue() | ||
if item is None: | ||
raise StopIteration | ||
return item | ||
|
||
def __iter__(self): | ||
return self | ||
|
||
def __repr__(self): | ||
return f"{self.__class__.__name__} - {self.__class__.__doc__}" | ||
|
||
|
||
class Queue: | ||
def __init__(self, idx=None): | ||
self._list = [] | ||
|
@@ -97,11 +139,10 @@ def dump(self): | |
|
||
|
||
class Pifo(Queue): | ||
|
||
def enqueue(self, item, rank=None): | ||
def enqueue(self, item, rank): | ||
if rank is None: | ||
rank = self.get_rank(item) | ||
item.rank = rank | ||
raise ValueError("Rank can't be of value 'None'.") | ||
|
||
super().enqueue((rank, item)) | ||
self.sort() | ||
|
||
|
@@ -116,24 +157,42 @@ def peek(self): | |
itm = super().peek() | ||
return itm[1] if itm else None | ||
|
||
def get_rank(self, item): | ||
raise NotImplementedError | ||
|
||
|
||
class Flow(Queue): | ||
def __init__(self, idx): | ||
super().__init__() | ||
self.idx = idx | ||
self.rank = 0 | ||
|
||
def __repr__(self): | ||
return f"F({self.idx})" | ||
return f"F(I:{self.idx}, Q:{self.qlen}, L:{self.length})" | ||
|
||
# Return the length of the first packet in the queue as the "length" of the | ||
# flow. This is not the correct thing to do, but it works as a stopgap | ||
# solution for testing the hierarchical mode, and we're only using | ||
# unit-length for that anyway | ||
@property | ||
def length(self): | ||
itm = self.peek() | ||
return itm.length if itm else 0 | ||
result = 0 | ||
for itm in self._list: | ||
result += itm.length if itm else 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That would be too obvious, right? Ha ha |
||
return result | ||
|
||
|
||
class FlowTracker(): | ||
"""This class provides us with the typical operation of keeping track of | ||
flows. Use this class in your scheduling algorithms when your algorithm only | ||
has one type of flows. | ||
""" | ||
|
||
def __init__(self): | ||
self._flows = {} | ||
|
||
def enqueue(self, pkt, flow_id=None): | ||
if not isinstance(pkt, Packet): | ||
raise ValueError(f"Expected a packet, but got '{pkt}' instead.") | ||
if flow_id is None: | ||
flow_id = pkt.flow | ||
if not flow_id in self._flows: | ||
self._flows[flow_id] = Flow(flow_id) | ||
flow = self._flows[flow_id] | ||
flow.enqueue(pkt) | ||
return flow | ||
|
||
def get_flow(self, flow_id): | ||
return self._flows[flow_id] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
#!/usr/bin/env python3 | ||
# coding: utf-8 -*- | ||
# | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
# | ||
# pifo-srpt.py | ||
|
||
"""Shortest Remaining Processing Time (SRPT). | ||
|
||
This scheduling algorithm is referenced in companion C++ implementation for the | ||
paper "Programmable packet scheduling at line rate" by Sivaraman, Anirudh, et | ||
al. | ||
|
||
It schedules packets in the order of how much data the flow has left. It assumes | ||
complete knowledge of the flow length. In the real world, this would either need | ||
to be estimated or limited to predictable flows. | ||
""" | ||
|
||
__copyright__ = """ | ||
Copyright (c) 2021 Toke Høiland-Jørgensen <[email protected]> | ||
Copyright (c) 2021 Frey Alfredsson <[email protected]> | ||
""" | ||
|
||
__license__ = """ | ||
This program is free software: you can redistribute it and/or modify | ||
it under the terms of the GNU General Public License as published by | ||
the Free Software Foundation, either version 3 of the License, or | ||
(at your option) any later version. | ||
|
||
This program is distributed in the hope that it will be useful, | ||
but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
GNU General Public License for more details. | ||
|
||
You should have received a copy of the GNU General Public License | ||
along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
""" | ||
|
||
from pifo_lib import Packet, Runner, Pifo | ||
from pifo_lib import SchedulingAlgorithm | ||
from pifo_lib import FlowTracker | ||
|
||
|
||
class Srpt(SchedulingAlgorithm): | ||
"""Shortest Remaining Processing Time""" | ||
|
||
def __init__(self): | ||
self._pifo = Pifo() | ||
self._flow_tracker = FlowTracker() | ||
|
||
self._remains = {} | ||
|
||
# We cheat by accessing the global packet list directly | ||
for pkt in pkts: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought of this as the typical example given in operating systems courses where you are supposed to write a nonexistent and optimal scheduling algorithm. I was not too fond of these examples. After all, it is impossible to implement because it needs complete knowledge of each process's remaining time beforehand. Is it different here? Would we reschedule depending on the currently known flow size instead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, now I see what you're trying to do. I think the idea for an actual SRPT algorithm is that the sender will tag each packet with its remaining size in a custom header; that's how they explain it in the Pfabric paper (see the start of §4). This also fits with the "implementation" in the Pifo code, where it just extracts a ranking from the packet... |
||
if pkt.flow in self._remains.keys(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the 'in' operator works directly on dictionaries, no need to use .keys() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, of course. I knew that. I was overthinking that I need to do this when working with values() and forgot myself. |
||
self._remains[pkt.flow] += pkt.length | ||
else: | ||
self._remains[pkt.flow] = pkt.length | ||
|
||
def get_rank(self, pkt): | ||
rank = self._remains[pkt.flow] | ||
self._remains[pkt.flow] -= pkt.length | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I modeled the get_rank function of this algorithm after the Stfq. It is supposed to rank the packets on their remaining flow length given absolute knowledge of the flows. In Stfq, we similarly track the information and update the tracking information in the get_rank function, except we track it based on how many packets we have seen of the flow so far. Here is how the PIFO paper's companion repo refers to the ranking of the packets. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, right, I did realise later that this was similar to the other get_rank() functions; just forgot to go back and edit this comment; sorry about that! And now that I grok'ed what you are tying to do with the above "initialise from global state" thing I also see why it's decrementing. However, if you're going to have an implementation of SRPT here I think it makes more sense to just assume the length is in the packet. Kinda makes the algorithm itself trivial, but what can you do? :) |
||
return rank | ||
|
||
def enqueue(self, item): | ||
flow = self._flow_tracker.enqueue(item) | ||
rank = self.get_rank(item) | ||
self._pifo.enqueue(flow, rank) | ||
|
||
def dequeue(self): | ||
flow = self._pifo.dequeue() | ||
pkt = None | ||
if flow is not None: | ||
pkt = flow.dequeue() | ||
return pkt | ||
|
||
def dump(self): | ||
self._pifo.dump() | ||
|
||
if __name__ == "__main__": | ||
pkts = [ | ||
Packet(flow=1, idn=1, length=2), | ||
Packet(flow=1, idn=2, length=2), | ||
Packet(flow=2, idn=1, length=1), | ||
Packet(flow=2, idn=2, length=1), | ||
Packet(flow=2, idn=3, length=1), | ||
] | ||
Runner(pkts, Srpt()).run() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs more vowels! :) ('dequeueing' and 'enqueued')
See https://en.wikipedia.org/wiki/Queueing_theory#Spelling
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ha ha, yes. It also just feels uncomfortable to write with too many vowels as well :D