Skip to content

Commit

Permalink
bug fixes and factoring out code
Browse files Browse the repository at this point in the history
  • Loading branch information
Cassandra Sziklai committed Jul 1, 2024
1 parent 887bd53 commit 8633672
Show file tree
Hide file tree
Showing 13 changed files with 3,909 additions and 9,860 deletions.
9 changes: 7 additions & 2 deletions calyx-py/calyx/gen_queue_data_expect.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#!/usr/bin/bash

num_cmds=20000

num_cmds_cass=1050
queue_size=16
numflows=2

# For SDN, we use piezo mode when making the data file and
# use pifotree_oracle to generate the expected output
Expand Down Expand Up @@ -29,5 +32,7 @@ cat ../test/correctness/queues/binheap/stable_binheap.data | python3 binheap_ora

# For the Round Robin queues, we drop piezo mode as well and use rrqueue_oracle to
# generate the expected output
python3 queue_data_gen.py 20 > ../test/correctness/queues/rr_queues/rr_queue.data
cat ../test/correctness/queues/rr_queues/rr_queue.data | python3 rrqueue_oracle.py 20 $queue_size --keepgoing > ../test/correctness/queues/rr_queues/rr_queue.expect

#numflows = 2..7
python3 queue_data_gen.py $num_cmds_cass > ../test/correctness/queues/rr_queues/rr_queue.data
cat ../test/correctness/queues/rr_queues/rr_queue.data | python3 rrqueue_oracle.py $num_cmds_cass $queue_size $numflows > ../test/correctness/queues/rr_queues/rr_queue.expect
18 changes: 14 additions & 4 deletions calyx-py/calyx/rrqueue_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,25 @@
from calyx import queue_util

if __name__ == "__main__":
max_cmds, len = int(sys.argv[1]), int(sys.argv[2])
keepgoing = "--keepgoing" in sys.argv
max_cmds, len, numflows = int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3])
#keepgoing = "--keepgoing" in sys.argv
commands, values, _ = queue_util.parse_json()

if numflows == 2:
boundaries = [200, 400]
elif numflows == 3:
boundaries = [133, 266, 400]
elif numflows == 4:
boundaries = [100, 200, 300, 400]
elif numflows == 7:
boundaries = [50, 100, 150, 200, 250, 300, 400]


# Our Round Robin Queue (formerly known as generalized pifo) is simple: it
# just orchestrates n FIFOs, in this case 3. It takes in a list of
# boundaries of length n (in this case 3).
pifo = queues.RRQueue(7, [50, 100, 150, 200, 250, 300, 400], len)
pifo = queues.RRQueue(numflows, boundaries, len)

ans = queues.operate_queue(pifo, max_cmds, commands, values, keepgoing=keepgoing)
ans = queues.operate_queue(pifo, max_cmds, commands, values, keepgoing=True)

queue_util.dump_json(ans, commands, values)
245 changes: 245 additions & 0 deletions calyx-py/test/correctness/queues/rr_queues/roundrobin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
# pylint: disable=import-error
"""Common code factored out, to be imported by the different flow implementations."""
import os
import sys
import inspect

currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0, parentdir)

import fifo
import calyx.builder as cb
import calyx.queue_call as qc

# This determines the maximum possible length of the queue:
# The max length of the queue will be 2^QUEUE_LEN_FACTOR.
QUEUE_LEN_FACTOR = 4


def invoke_subqueue(queue_cell, cmd, value, ans, err) -> cb.invoke:
"""Invokes the cell {queue_cell} with:
{cmd} passed by value
{value} passed by value
{ans} passed by reference
{err} passed by reference
"""
return cb.invoke(
queue_cell,
in_cmd=cmd,
in_value=value,
ref_ans=ans,
ref_err=err,
)


def insert_rr_pifo(
prog,
name,
fifos,
boundaries,
numflows, # the number of flows
queue_len_factor=QUEUE_LEN_FACTOR,
stats=None,
static=False,
):
"""Inserts the component `pifo` into the program."""

pifo: cb.ComponentBuilder = prog.component(name)
cmd = pifo.input("cmd", 2) # the size in bits is 2
# If this is 0, we pop. If it is 1, we peek.
# If it is 2, we push `value` to the queue.
value = pifo.input("value", 32) # The value to push to the queue

fifo_cells = [pifo.cell(f"queue_{i}", fifo_i) for i, fifo_i in enumerate(fifos)]

# If a stats component was provided, declare it as a cell of this component.
if stats:
stats = pifo.cell("stats", stats, is_ref=True)

flow = pifo.reg(32, "flow") # The flow to push to: 0 to n.
# We will infer this using a separate component;
# it is a function of the value being pushed.

ans = pifo.reg(32, "ans", is_ref=True)
# If the user wants to pop, we will write the popped value to `ans`.

err = pifo.reg(1, "err", is_ref=True)
# We'll raise this as a general error flag for overflow and underflow.

length = pifo.reg(32, "length") # The active length of the PIFO.

# A register that marks the next sub-queue to `pop` from.
hot = pifo.reg(32, "hot")
og_hot = pifo.reg(32, "og_hot")
copy_hot = pifo.reg_store(og_hot, hot.out) # og_hot := hot.out
restore_hot = pifo.reg_store(hot, og_hot.out) # hot := og_hot.out

max_queue_len = 2**queue_len_factor

# Some equality checks.
len_eq_0 = pifo.eq_use(length.out, 0)
len_eq_max_queue_len = pifo.eq_use(length.out, max_queue_len)
err_eq_0 = pifo.eq_use(err.out, 0)
err_neq_0 = pifo.neq_use(err.out, 0)

raise_err = pifo.reg_store(err, 1, "raise_err") # err := 1
lower_err = pifo.reg_store(err, 0, "lower_err") # err := 0

len_incr = pifo.incr(length) # len++
len_decr = pifo.decr(length) # len--

# We first create a list of invoke-statement handles.
# Each invoke is guarded by an equality check on the hot register,
# and each guard is unique to the subqueue it is associated with.
# This means we can eventually execute all of these invokes in parallel.
invoke_subqueues_hot_guard_seq = [
cb.if_with(
pifo.eq_use(hot.out, n),
invoke_subqueue(fifo_cells[n], cmd, value, ans, err),
)
for n in range(numflows)
]
invoke_subqueues_hot_guard = cb.par(
invoke_subqueues_hot_guard_seq
) # Execute in parallel.

# We create a list of invoke-statement handles.
# Each invoke is guarded by a pair of inequality checks on the value register,
# and each pair of guards is unique to the subqueue it is associated with.
# This means we can eventually execute all of these invokes in parallel.
invoke_subqueues_value_guard_seq = [
cb.if_with(
pifo.le_use(value, boundaries[b + 1]),
cb.if_with(
pifo.gt_use(value, boundaries[b]),
invoke_subqueue(fifo_cells[b], cmd, value, ans, err),
),
)
for b in range(numflows)
]
# invoke_subqueues_value_guard_seq = [
# cb.if_with(
# pifo.le_use(value, boundaries[b + 1]),
# cb.if_with(
# pifo.gt_use(value, boundaries[b]),
# invoke_subqueue(fifo_cells[b], cmd, value, ans, err),
# ),
# )
# for b in range(numflows)
# ]

invoke_zero_edge_case = [
cb.if_with( #edge case of pushing the value 0
pifo.eq_use(value, 0),
cb.if_with(
pifo.eq_use(cmd, 2),
invoke_subqueue(fifo_cells[0], cmd, value, ans, err)),
)
]

invoke_subqueues_value_guard = cb.par(
invoke_subqueues_value_guard_seq
) # Execute in parallel.

incr_hot_wraparound = cb.if_with(
# If hot = numflows - 1, we need to wrap around to 0. Otherwise, we increment.
pifo.eq_use(hot.out, numflows - 1),
pifo.reg_store(hot, 0, "reset_hot"),
pifo.incr(hot),
)

pop_logic = cb.if_with(
len_eq_0,
raise_err, # The queue is empty: underflow.
[ # The queue is not empty. Proceed.
lower_err,
[
invoke_subqueues_hot_guard,
# Our next step depends on whether `fifos[hot]` raised the error flag.
cb.while_with(
err_neq_0,
[ # `fifo_cells[hot]` raised an error.
# We'll try to pop from `fifo_cells[hot+1]`.
# We'll pass it a lowered err
lower_err,
incr_hot_wraparound,
invoke_subqueues_hot_guard,
], # `queue[hot+n]` succeeded. Its answer is our answer.
),
],
incr_hot_wraparound,
len_decr,
],
)

peek_logic = cb.if_with(
len_eq_0,
raise_err, # The queue is empty: underflow.
[ # The queue is not empty. Proceed.
lower_err,
copy_hot,
[
invoke_subqueues_hot_guard,
cb.while_with(
err_neq_0,
[ # `fifo_cells[hot]` raised an error.
# We'll try to peek from `fifo_cells[hot+1]`.
# We'll pass it a lowered `err`.
lower_err,
incr_hot_wraparound,
# increment hot and invoke_subqueue on the next one
invoke_subqueues_hot_guard,
],
),
# Peeking does not affect `hot`.
# Peeking does not affect the length.
],
restore_hot,
],
)

push_logic = cb.if_with(
len_eq_max_queue_len,
raise_err, # The queue is full: overflow.
[ # The queue is not full. Proceed.
lower_err,
# We need to check which flow this value should be pushed to.
invoke_subqueues_value_guard,
invoke_zero_edge_case,
cb.if_with(
err_eq_0,
# If no stats component is provided,
# just increment the active length.
(
len_incr
if not stats
else cb.par(
# If a stats component is provided,
# Increment the active length and also
# tell the stats component what flow we pushed.
len_incr,
(
cb.static_invoke(stats, in_flow=flow.out)
if static
else cb.invoke(stats, in_flow=flow.out)
),
)
),
),
],
)

# Was it a pop, peek, push, or an invalid command?
# We can do those four cases in parallel.
pifo.control += pifo.case(
cmd,
{
0: pop_logic,
1: peek_logic,
2: push_logic,
3: raise_err,
},
)

return pifo
Loading

0 comments on commit 8633672

Please sign in to comment.