Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed and refactored the code #2

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions queue-exp/pifo_fifo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
#
# pifo-fifo.py

"""First in, first out (FIFO)

The FIFO scheduling algorithm preserves the order of the scheduled packets. This
implementation is included for completeness and is built on a PIFO. It also
serves as an example of how to add new scheduling algorithms to this framework.
"""

__copyright__ = """
Copyright (c) 2021, Toke Høiland-Jørgensen <[email protected]>
Copyright (c) 2021, Frey Alfredsson <[email protected]>
Expand All @@ -26,11 +33,28 @@
"""

from pifo_lib import Packet, Runner, Pifo
from pifo_lib import SchedulingAlgorithm


class Fifo(SchedulingAlgorithm):
"""First in, first out (FIFO)"""

def __init__(self):
self._pifo = Pifo()

def enqueue(self, item):
rank = self.get_rank(item)
self._pifo.enqueue(item, rank)

class Fifo(Pifo):
def get_rank(self, item):
return self.qlen
return self._pifo.qlen

def dequeue(self):
return self._pifo.dequeue()

def dump(self):
self._pifo.dump()


if __name__ == "__main__":
pkts = [
Expand Down
107 changes: 83 additions & 24 deletions queue-exp/pifo_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,74 @@ def __init__(self, flow, idn, length=1):
self.flow = flow
self.idn = idn
self.length = length
self.rank = 0

def __repr__(self):
return f"P(F:{self.flow}, I:{self.idn}, L:{self.length})"


class Runner:
def __init__(self, pkts, queue):
"""This class is responsible for running a test on a packet scheduling
algorithm. It is accountable for enqueueing and dequeueing packets. For now, it
does so by dequeueing as many packets as it enqueued. In the next iteration, when
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs more vowels! :) ('dequeueing' and 'enqueued')

See https://en.wikipedia.org/wiki/Queueing_theory#Spelling

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha ha, yes. It also just feels uncomfortable to write with too many vowels as well :D

we add pacing, it will need to handle virtual time cycling.
"""

def __init__(self, pkts, scheduler):
self.input_pkts = pkts
self.queue = queue
self.scheduler = scheduler

def run(self):
print(f"Running with queue: {self.queue}")
print(" Inserting packets into queue:")
print(f"Running with scheduler: {self.scheduler}")
print(" Inserting packets into scheduler:")
pprint(self.input_pkts, indent=4)
for p in self.input_pkts:
self.queue.enqueue(p)
print(" Queue state:")
self.queue.dump()
self.scheduler.enqueue(p)
print(" Scheduler state:")
self.scheduler.dump()
output = []
for p in self.queue:

for p in self.scheduler:
output.append(p)
print(" Got packets from queue:")
pprint(output, indent=4)


class SchedulingAlgorithm:
    """Base class that keeps a scheduling algorithm apart from its queues.

    A queueing packet scheduling algorithm requires an abstraction that keeps
    the queueing data structure and the algorithm separate. To create a new
    scheduling algorithm, inherit from this class, add the scheduling data
    structures in the constructor, and implement the constructor, ``enqueue``,
    ``dequeue`` and ``dump``.

    Instances are their own iterators: each step calls ``dequeue`` and the
    iteration ends at the first ``None`` it returns.

    Please look at pifo_fifo.py to see how a FIFO is implemented.
    """

    def _missing(self):
        """Build the error raised by every method a subclass must override."""
        return NotImplementedError(
            f"{type(self).__name__} missing implementation"
        )

    def __init__(self):
        """Set up the scheduler's data structures (must be overridden)."""
        raise self._missing()

    def enqueue(self, item):
        """Add ``item`` to the scheduler (must be overridden)."""
        raise self._missing()

    def dequeue(self):
        """Return the next scheduled item (must be overridden).

        Iteration treats a ``None`` return value as "nothing left".
        """
        raise self._missing()

    def dump(self):
        """Print the scheduler's current state (must be overridden)."""
        raise self._missing()

    def __next__(self):
        """Return the next dequeued item, ending iteration on ``None``."""
        item = self.dequeue()
        if item is None:
            raise StopIteration
        return item

    def __iter__(self):
        """Return ``self``; the scheduler is consumed as it is iterated."""
        return self

    def __repr__(self):
        """Return ``"<class name> - <class docstring>"``.

        A class docstring is not inherited by subclasses, so a subclass
        without one is shown by name only instead of as ``"Name - None"``.
        """
        cls = type(self)
        if not cls.__doc__:
            return cls.__name__
        return f"{cls.__name__} - {cls.__doc__}"


class Queue:
def __init__(self, idx=None):
self._list = []
Expand Down Expand Up @@ -97,11 +139,10 @@ def dump(self):


class Pifo(Queue):

def enqueue(self, item, rank=None):
def enqueue(self, item, rank):
if rank is None:
rank = self.get_rank(item)
item.rank = rank
raise ValueError("Rank can't be of value 'None'.")

super().enqueue((rank, item))
self.sort()

Expand All @@ -116,24 +157,42 @@ def peek(self):
itm = super().peek()
return itm[1] if itm else None

def get_rank(self, item):
raise NotImplementedError


class Flow(Queue):
def __init__(self, idx):
super().__init__()
self.idx = idx
self.rank = 0

def __repr__(self):
return f"F({self.idx})"
return f"F(I:{self.idx}, Q:{self.qlen}, L:{self.length})"

# Return the length of the first packet in the queue as the "length" of the
# flow. This is not the correct thing to do, but it works as a stopgap
# solution for testing the hierarchical mode, and we're only using
# unit-length for that anyway
@property
def length(self):
itm = self.peek()
return itm.length if itm else 0
result = 0
for itm in self._list:
result += itm.length if itm else 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sum()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be too obvious, right? Ha ha

return result


class FlowTracker:
    """Keep track of per-flow packet queues, keyed by flow id.

    Use this class in your scheduling algorithms when your algorithm only has
    one type of flow.
    """

    def __init__(self):
        # Maps a flow id to the Flow object holding that flow's packets.
        self._flows = {}

    def enqueue(self, pkt, flow_id=None):
        """Append ``pkt`` to its flow's queue, creating the flow on first use.

        ``flow_id`` selects the flow the packet is filed under; when it is
        ``None`` the packet's own ``flow`` attribute is used instead.

        Returns the ``Flow`` the packet was added to. Raises ``ValueError``
        if ``pkt`` is not a ``Packet``.
        """
        if not isinstance(pkt, Packet):
            raise ValueError(f"Expected a packet, but got '{pkt}' instead.")
        if flow_id is None:
            flow_id = pkt.flow
        if flow_id not in self._flows:
            self._flows[flow_id] = Flow(flow_id)
        flow = self._flows[flow_id]
        flow.enqueue(pkt)
        return flow

    def get_flow(self, flow_id):
        """Return the flow stored under ``flow_id``.

        Raises ``KeyError`` when no flow is stored under that id.
        """
        return self._flows[flow_id]
88 changes: 88 additions & 0 deletions queue-exp/pifo_srpt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# pifo-srpt.py

"""Shortest Remaining Processing Time (SRPT).

This scheduling algorithm is referenced in the companion C++ implementation for
the paper "Programmable packet scheduling at line rate" by Sivaraman, Anirudh,
et al.

It schedules packets in the order of how much data the flow has left. It assumes
complete knowledge of the flow length. In the real world, this would either need
to be estimated or limited to predictable flows.
"""

__copyright__ = """
Copyright (c) 2021 Toke Høiland-Jørgensen <[email protected]>
Copyright (c) 2021 Frey Alfredsson <[email protected]>
"""

__license__ = """
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from pifo_lib import Packet, Runner, Pifo
from pifo_lib import SchedulingAlgorithm
from pifo_lib import FlowTracker


class Srpt(SchedulingAlgorithm):
"""Shortest Remaining Processing Time"""

def __init__(self):
self._pifo = Pifo()
self._flow_tracker = FlowTracker()

self._remains = {}

# We cheat by accessing the global packet list directly
for pkt in pkts:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is pkts coming from? this looks like it'll crash?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of this as the typical example given in operating systems courses where you are supposed to write a nonexistent and optimal scheduling algorithm. I was not too fond of these examples. After all, it is impossible to implement because it needs complete knowledge of each process's remaining time beforehand. Is it different here? Would we reschedule depending on the currently known flow size instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, now I see what you're trying to do. I think the idea for an actual SRPT algorithm is that the sender will tag each packet with its remaining size in a custom header; that's how they explain it in the Pfabric paper (see the start of §4). This also fits with the "implementation" in the Pifo code, where it just extracts a ranking from the packet...

if pkt.flow in self._remains.keys():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the 'in' operator works directly on dictionaries, no need to use .keys()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, of course. I knew that. I was overthinking that I need to do this when working with values() and forgot myself.

self._remains[pkt.flow] += pkt.length
else:
self._remains[pkt.flow] = pkt.length

def get_rank(self, pkt):
rank = self._remains[pkt.flow]
self._remains[pkt.flow] -= pkt.length
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is a get_rank() function modifying the state? That seems like a surprising side effect.
Also, why is it subtracting? :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I modeled the get_rank function of this algorithm after the Stfq. It is supposed to rank the packets on their remaining flow length given absolute knowledge of the flows. In Stfq, we similarly track the information and update the tracking information in the get_rank function, except we track it based on how many packets we have seen of the flow so far.

Here is how the PIFO paper's companion repo refers to the ranking of the packets.
https://github.com/programmable-scheduling/pifo-machine/blob/master/examples/srpt.dot

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, right, I did realise later that this was similar to the other get_rank() functions; just forgot to go back and edit this comment; sorry about that!

And now that I grok'ed what you are trying to do with the above "initialise from global state" thing I also see why it's decrementing. However, if you're going to have an implementation of SRPT here I think it makes more sense to just assume the length is in the packet. Kinda makes the algorithm itself trivial, but what can you do? :)

return rank

def enqueue(self, item):
flow = self._flow_tracker.enqueue(item)
rank = self.get_rank(item)
self._pifo.enqueue(flow, rank)

def dequeue(self):
flow = self._pifo.dequeue()
pkt = None
if flow is not None:
pkt = flow.dequeue()
return pkt

def dump(self):
self._pifo.dump()

if __name__ == "__main__":
    # Srpt.__init__ reads this module-level list directly, so it has to be
    # named `pkts` and be populated before the scheduler is constructed.
    pkts = [
        Packet(flow=flow, idn=idn, length=length)
        for flow, idn, length in (
            (1, 1, 2),
            (1, 2, 2),
            (2, 1, 1),
            (2, 2, 1),
            (2, 3, 1),
        )
    ]
    Runner(pkts, Srpt()).run()