diff --git a/verification/block/common.mk b/verification/block/common.mk index 4d68b936694..99e53a58c64 100644 --- a/verification/block/common.mk +++ b/verification/block/common.mk @@ -10,6 +10,9 @@ CURDIR := $(abspath $(dir $(lastword $(MAKEFILE_LIST)))) CFGDIR := $(abspath $(CURDIR)/snapshots/default) CONFIG := $(abspath $(CURDIR)/../../configs) +# Set pythonpath so that tests can access common modules +export PYTHONPATH := $(CURDIR)/common + # Common sources COMMON_SOURCES = $(CFGDIR)/common_defines.vh COMMON_SOURCES += $(CFGDIR)/el2_pdef.vh diff --git a/verification/block/common/axi.py b/verification/block/common/axi.py new file mode 100644 index 00000000000..b2046d8f79b --- /dev/null +++ b/verification/block/common/axi.py @@ -0,0 +1,186 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 + +from cocotb.triggers import RisingEdge +from pyuvm import * +from utils import collect_bytes + +# ============================================================================== + + +class BusWriteItem(uvm_sequence_item): + """ + A generic data bus write request / response + """ + + def __init__(self, addr, data, resp=None): + super().__init__("BusWriteItem") + self.addr = addr + self.data = data + self.resp = resp + + +class BusReadItem(uvm_sequence_item): + """ + A generic data bus read request / response + """ + + def __init__(self, addr, data=None, resp=None): + super().__init__("BusReadItem") + self.addr = addr + self.data = data + self.resp = resp + + +# ============================================================================== + + +class Axi4LiteMonitor(uvm_component): + """ + A monitor for AXI4 lite bus + """ + + class Transfer: + def __init__(self, tid, addr=None): + self.tid = tid + self.addr = addr + self.data = bytearray() + + def __init__(self, *args, **kwargs): + self.bfm = kwargs["bfm"] + del kwargs["bfm"] + super().__init__(*args, **kwargs) + + def build_phase(self): + self.ap = uvm_analysis_port("ap", self) + + def _aw_active(self): + return self.bfm.axi_awready.value != 0 and self.bfm.axi_awvalid.value != 0 + + def _w_active(self): + return self.bfm.axi_wready.value != 0 and self.bfm.axi_wvalid.value != 0 + + def _ar_active(self): + return self.bfm.axi_arready.value != 0 and self.bfm.axi_arvalid.value != 0 + + def _r_active(self): + return self.bfm.axi_rready.value != 0 and self.bfm.axi_rvalid.value != 0 + + def _b_active(self): + return self.bfm.axi_bready.value != 0 and self.bfm.axi_bvalid.value != 0 + + def _sample_w(self): + return collect_bytes( + self.bfm.axi_wdata, + self.bfm.axi_wstrb, + ) + + def _sample_r(self): + return collect_bytes( + self.bfm.axi_rdata, + ) + + async def watch_write(self): + """ + Watches the bus for writes + """ + xfers = dict() + awid = None + + # Main loop + while True: + # Wait for clock + await RisingEdge(self.bfm.axi_clk) + + # A new write request + if self._aw_active(): + addr = int(self.bfm.axi_awaddr.value) + awid = int(self.bfm.axi_awid.value) + + if awid in xfers: + self.logger.error( + "Write request for a pending transaction, awid={}".format(awid) + ) + + else: + xfers[awid] = self.Transfer(awid, addr) + + # Data (for the last seen awid) + if self._w_active(): + if awid not in xfers: + self.logger.error("Data write but no transaction is pending") + + else: + xfer = xfers[awid] + xfer.data = self._sample_w() + + # Write completion + if self._b_active(): + bresp = int(self.bfm.axi_bresp.value) + bid = int(self.bfm.axi_bid.value) + + if bid not in xfers: + self.logger.error("Response for a non-pending transaction, bid={}".format(bid)) + + else: + xfer = xfers[bid] + del xfers[bid] + + self.ap.write(BusWriteItem(xfer.addr, xfer.data, bresp)) + + self.logger.debug( + "WR: 0x{:08X} {} 0b{:03b}".format( + xfer.addr, ["0x{:02X}".format(b) for b in xfer.data], bresp + ) + ) + + async def watch_read(self): + """ + Watches the bus for reads + """ + xfers = dict() + + # Main loop + while True: + # Wait for clock + await RisingEdge(self.bfm.axi_clk) + + # A new read request + if self._ar_active(): + addr = int(self.bfm.axi_araddr.value) + arid = int(self.bfm.axi_arid.value) + + if arid in xfers: + self.logger.error( + "Read request for a pending transaction, arid={}".format(awid) + ) + + else: + xfers[arid] = self.Transfer(arid, addr) + + # Read completion + if self._r_active(): + rresp = int(self.bfm.axi_rresp.value) + rid = int(self.bfm.axi_rid.value) + + if rid not in xfers: + self.logger.error("Data read but no transaction is pending") + + else: + xfer = xfers[rid] + xfer.data = self._sample_r() + + del xfers[rid] + + self.ap.write(BusReadItem(xfer.addr, xfer.data, rresp)) + + self.logger.debug( + "RD: 0x{:08X} {} 0b{:03b}".format( + xfer.addr, ["0x{:02X}".format(b) for b in xfer.data], rresp + ) + ) + + async def run_phase(self): + # Start read & write watchers + cocotb.start_soon(self.watch_write()) + cocotb.start_soon(self.watch_read()) diff --git a/verification/block/common/utils.py b/verification/block/common/utils.py new file mode 100644 index 00000000000..415a896b175 --- /dev/null +++ b/verification/block/common/utils.py @@ -0,0 +1,48 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 +import logging + +# ============================================================================== + + +def collect_signals(signals, uut, obj, uut_prefix="", obj_prefix="", signal_map=None): + """ + Collects signal objects from UUT and attaches them to the given object. + Optionally UUT signals can be prefixed with the uut_prefix and object + signals with the obj_prefix. If signal_map is given it should be a dict + mapping signal names to actual UUT signal names. + """ + + for sig in signals: + if signal_map is not None: + uut_sig = signal_map.get(sig, uut_prefix + sig) + else: + uut_sig = uut_prefix + sig + obj_sig = obj_prefix + sig + if hasattr(uut, uut_sig): + s = getattr(uut, uut_sig) + + else: + s = None + logging.error("Module {} does not have a signal '{}'".format(str(uut), sig)) + + setattr(obj, obj_sig, s) + + +def collect_bytes(data, strb=None): + """ + Collects data bytes asserted on a data bus. Uses the strb value to + determine which octets are valid. Both data and strb must be cocotb + signals. strb can be None. + """ + + if strb is not None: + assert len(data) == 8 * len(strb) + + res = [] + for i in range(len(data) // 8): + if strb is None or strb.value & (1 << i): + dat = (int(data.value) >> (8 * i)) & 0xFF + res.append(dat) + + return bytes(res) diff --git a/verification/block/dma/testbench.py b/verification/block/dma/testbench.py index d91fc74f99c..e36b9fa0e26 100644 --- a/verification/block/dma/testbench.py +++ b/verification/block/dma/testbench.py @@ -7,6 +7,7 @@ import struct import pyuvm +from axi import Axi4LiteMonitor, BusReadItem, BusWriteItem from cocotb.clock import Clock from cocotb.triggers import ( ClockCycles, @@ -18,34 +19,11 @@ Timer, ) from pyuvm import * +from utils import collect_bytes, collect_signals # ============================================================================== -class BusWriteItem(uvm_sequence_item): - """ - A generic data bus write request / response - """ - - def __init__(self, addr, data, resp=None): - super().__init__("BusWriteItem") - self.addr = addr - self.data = data - self.resp = resp - - -class BusReadItem(uvm_sequence_item): - """ - A generic data bus read request / response - """ - - def __init__(self, addr, data=None, resp=None): - super().__init__("BusReadItem") - self.addr = addr - self.data = data - self.resp = resp - - class MemWriteItem(uvm_sequence_item): """ A generic memory bus write item @@ -103,29 +81,6 @@ def __init__(self, addr, data=None, size=32, fail=False): # ============================================================================== -def collect_signals(signals, uut, obj, uut_prefix="", obj_prefix=""): - """ - Collects signal objects from UUT and attaches them to the given object. - Optionally UUT signals can be prefixed with the uut_prefix and object - signals with the obj_prefix - """ - - for sig in signals: - uut_sig = uut_prefix + sig - obj_sig = obj_prefix + sig - if hasattr(uut, uut_sig): - s = getattr(uut, uut_sig) - - else: - s = None - logging.error("Module {} does not have a signal '{}'".format(str(uut), sig)) - - setattr(obj, obj_sig, s) - - -# ============================================================================== - - class CoreMemoryBFM(uvm_component): """ A BFM for the memory side interface of the DMA module @@ -358,6 +313,8 @@ class Axi4LiteBFM(uvm_component): """ SIGNALS = [ + "clk", + "rst", "awvalid", "awready", "awid", @@ -396,13 +353,13 @@ def __init__(self, tid): self.pending = False - def __init__(self, name, parent, uut): + def __init__(self, name, parent, uut, signal_map): super().__init__(name, parent) # Collect signals - collect_signals(self.SIGNALS, uut, self, uut_prefix="dma_axi_", obj_prefix="axi_") - - collect_signals(["clk", "rst_l"], uut, self) + collect_signals( + self.SIGNALS, uut, self, uut_prefix="dma_axi_", obj_prefix="axi_", signal_map=signal_map + ) # Determine bus parameters self.awidth = len(self.axi_awaddr) @@ -420,31 +377,13 @@ def __init__(self, name, parent, uut): self.wr_xfers = {i: self.Transfer(i) for i in range(1 << len(self.axi_awid))} - @staticmethod - def collect_bytes(data, strb=None): - """ - Collects data bytes asserted on a data bus. Uses the strb value to - determine which octets are valid. - """ - - if strb is not None: - assert len(data) == 8 * len(strb) - - res = [] - for i in range(len(data) // 8): - if strb is None or strb.value & (1 << i): - dat = (int(data.value) >> (8 * i)) & 0xFF - res.append(dat) - - return bytes(res) - async def _wait(self, signal, max_cycles=200): """ Waits for a signal to be asserted for at most max_cycles. Raises an exception if it does not """ for i in range(max_cycles): - await RisingEdge(self.clk) + await RisingEdge(self.axi_clk) if signal.value != 0: break else: @@ -457,7 +396,7 @@ async def write(self, addr, data): # Wait for a free transfer id while True: - await RisingEdge(self.clk) + await RisingEdge(self.axi_clk) async with self.xfer_lock: awid = None @@ -480,7 +419,7 @@ async def write(self, addr, data): self.axi_awid.value = awid self.axi_awsize.value = int(math.ceil(math.log2(self.dwidth))) - await RisingEdge(self.clk) + await RisingEdge(self.axi_clk) self.axi_awvalid.value = 0 # Send data @@ -526,7 +465,7 @@ async def write_handler(self): while True: # Wait for response - await RisingEdge(self.clk) + await RisingEdge(self.axi_clk) if not self.axi_bvalid.value: continue @@ -561,7 +500,7 @@ async def read(self, addr, data): self.axi_arid.value = 1 self.axi_arsize.value = int(math.ceil(math.log2(self.dwidth))) - await RisingEdge(self.clk) + await RisingEdge(self.axi_clk) self.axi_arvalid.value = 0 # Receive data @@ -575,7 +514,7 @@ async def read(self, addr, data): await self._wait(self.axi_rvalid) # Get the data - data.extend(self.collect_bytes(self.axi_rdata)) + data.extend(collect_bytes(self.axi_rdata)) # Last, finish reception if self.axi_rlast.value: @@ -625,156 +564,6 @@ async def run_phase(self): self.seq_item_port.item_done() -class Axi4LiteMonitor(uvm_component): - """ - A monitor for AXI4 lite bus - """ - - class Transfer: - def __init__(self, tid, addr=None): - self.tid = tid - self.addr = addr - self.data = None - - def __init__(self, *args, **kwargs): - self.bfm = kwargs["bfm"] - del kwargs["bfm"] - super().__init__(*args, **kwargs) - - def build_phase(self): - self.ap = uvm_analysis_port("ap", self) - - def _aw_active(self): - return self.bfm.axi_awready.value != 0 and self.bfm.axi_awvalid.value != 0 - - def _w_active(self): - return self.bfm.axi_wready.value != 0 and self.bfm.axi_wvalid.value != 0 - - def _ar_active(self): - return self.bfm.axi_arready.value != 0 and self.bfm.axi_arvalid.value != 0 - - def _r_active(self): - return self.bfm.axi_rready.value != 0 and self.bfm.axi_rvalid.value != 0 - - def _b_active(self): - return self.bfm.axi_bready.value != 0 and self.bfm.axi_bvalid.value != 0 - - def _sample_w(self): - return self.bfm.collect_bytes( - self.bfm.axi_wdata, - self.bfm.axi_wstrb, - ) - - def _sample_r(self): - return self.bfm.collect_bytes( - self.bfm.axi_rdata, - ) - - async def watch_write(self): - """ - Watches the bus for writes - """ - xfers = dict() - - # Main loop - while True: - # Wait for clock - await RisingEdge(self.bfm.clk) - - # A new write request - if self._aw_active(): - addr = int(self.bfm.axi_awaddr.value) - awid = int(self.bfm.axi_awid.value) - - if awid in xfers: - self.logger.error( - "Write request for a pending transaction, awid={}".format(awid) - ) - - else: - xfers[awid] = self.Transfer(awid, addr) - - # Data (for the last seen awid) - if self._w_active(): - if awid not in xfers: - self.logger.error("Data write but no transaction is pending") - - else: - xfer = xfers[awid] - xfer.data = self._sample_w() - - # Write completion - if self._b_active(): - bresp = int(self.bfm.axi_bresp.value) - bid = int(self.bfm.axi_bid.value) - - if bid not in xfers: - self.logger.error("Response for a non-pending transaction, bid={}".format(bid)) - - else: - xfer = xfers[bid] - del xfers[bid] - - self.ap.write(BusWriteItem(xfer.addr, xfer.data, bresp)) - - self.logger.debug( - "WR: 0x{:08X} {} 0b{:03b}".format( - xfer.addr, ["0x{:02X}".format(b) for b in xfer.data], bresp - ) - ) - - async def watch_read(self): - """ - Watches the bus for reads - """ - xfers = dict() - - # Main loop - while True: - # Wait for clock - await RisingEdge(self.bfm.clk) - - # A new read request - if self._ar_active(): - addr = int(self.bfm.axi_araddr.value) - arid = int(self.bfm.axi_arid.value) - - if arid in xfers: - self.logger.error( - "Read request for a pending transaction, arid={}".format(awid) - ) - - else: - xfers[arid] = self.Transfer(arid, addr) - - # Read completion - if self._r_active(): - rresp = int(self.bfm.axi_rresp.value) - rid = int(self.bfm.axi_rid.value) - - if rid not in xfers: - self.logger.error("Data read but no transaction is pending") - - else: - xfer = xfers[rid] - xfer.data = self._sample_r() - - del xfers[rid] - - self.ap.write(BusReadItem(xfer.addr, xfer.data, rresp)) - - self.logger.debug( - "RD: 0x{:08X} {} 0b{:03b}".format( - xfer.addr, ["0x{:02X}".format(b) for b in xfer.data], rresp - ) - ) - - async def run_phase(self): - # Start read & write watchers - cocotb.start_soon(self.watch_write()) - cocotb.start_soon(self.watch_read()) - - # ============================================================================== @@ -1040,7 +829,16 @@ def build_phase(self): self.dbg_seqr = uvm_sequencer("dbg_seqr", self) # AXI interface - self.axi_bfm = Axi4LiteBFM("axi_bfm", self, cocotb.top) + self.axi_bfm = Axi4LiteBFM( + "axi_bfm", + self, + uut=cocotb.top, + signal_map={ + "clk": "clk", + "rst": "rst_l", + }, + ) + self.axi_drv = Axi4LiteSubordinateDriver("axi_drv", self, bfm=self.axi_bfm) self.axi_mon = Axi4LiteMonitor("axi_mon", self, bfm=self.axi_bfm) diff --git a/verification/block/lib_ahb_to_axi4/Makefile b/verification/block/lib_ahb_to_axi4/Makefile new file mode 100644 index 00000000000..f361874c995 --- /dev/null +++ b/verification/block/lib_ahb_to_axi4/Makefile @@ -0,0 +1,17 @@ + +null := +space := $(null) # +comma := , + +CURDIR := $(abspath $(dir $(lastword $(MAKEFILE_LIST)))) +SRCDIR := $(abspath $(CURDIR)../../../../design) + +TEST_FILES = $(sort $(wildcard test_*.py)) + +MODULE ?= $(subst $(space),$(comma),$(subst .py,,$(TEST_FILES))) +TOPLEVEL = ahb_to_axi4 + +VERILOG_SOURCES = \ + $(SRCDIR)/lib/ahb_to_axi4.sv + +include $(CURDIR)/../common.mk diff --git a/verification/block/lib_ahb_to_axi4/test_read.py b/verification/block/lib_ahb_to_axi4/test_read.py new file mode 100644 index 00000000000..ee6dbe9a773 --- /dev/null +++ b/verification/block/lib_ahb_to_axi4/test_read.py @@ -0,0 +1,202 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 + +import random + +import pyuvm +from cocotb.triggers import ClockCycles, Combine +from pyuvm import * +from testbench import ( + AXI4LiteReadyItem, + AXI4LiteResponseItem, + BaseEnv, + BaseTest, + BusReadItem, +) + +# ============================================================================= + + +class AHBReadSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + align = 8 + + addr = 0xF0040000 + random.randrange(0, 0x1000) + addr = (addr // align) * align + + item = BusReadItem(addr) + await self.start_item(item) + await self.finish_item(item) + + +class AXI4LiteReadResponseSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Respond to AR + item = AXI4LiteResponseItem(["ar"]) + await self.start_item(item) + await self.finish_item(item) + + # Emulate latency + await ClockCycles(cocotb.top.clk, 2) + + # Respond on R + item = AXI4LiteResponseItem(["r"]) + await self.start_item(item) + await self.finish_item(item) + + +class AXI4LiteNoReadDataResponseSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Respond to AR but not to R + item = AXI4LiteResponseItem(["ar"]) + await self.start_item(item) + await self.finish_item(item) + + +# ============================================================================= + + +class AXI4LiteReadReadySequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Become ready + item = AXI4LiteReadyItem(["ar"], True) + await self.start_item(item) + await self.finish_item(item) + + +# ============================================================================= + + +async def later(cr, cycles): + """ + A helper function to start a task after a number of clock cycles + """ + await ClockCycles(cocotb.top.clk, cycles) + await cr + + +class NoBackpressureReadSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + axi_rdy = AXI4LiteReadReadySequence("ready") + ahb_seq = AHBReadSequence("stimulus") + axi_seq = AXI4LiteReadResponseSequence("response") + + # Issue an AHB read and do a correct AXI response + await axi_rdy.start(axi_seqr) + + tasks = [ + cocotb.start_soon(ahb_seq.start(ahb_seqr)), + cocotb.start_soon(axi_seq.start(axi_seqr)), + ] + await Combine(*tasks) + + +class BackpressureReadSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + ahb_seq = AHBReadSequence("stimulus") + axi_seq = AXI4LiteReadResponseSequence("response") + + # Issue an AHB read and do a correct AXI response + tasks = [ + cocotb.start_soon(ahb_seq.start(ahb_seqr)), + cocotb.start_soon(later(axi_seq.start(axi_seqr), 5)), + ] + await Combine(*tasks) + + +class NoReadDataResponseSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + axi_rdy = AXI4LiteReadReadySequence("ready") + ahb_seq = AHBReadSequence("stimulus") + axi_seq = AXI4LiteNoReadDataResponseSequence("response") + + # Issue an AHB read and do a correct AXI response + await axi_rdy.start(axi_seqr) + + tasks = [ + cocotb.start_soon(ahb_seq.start(ahb_seqr)), + cocotb.start_soon(axi_seq.start(axi_seqr)), + ] + await Combine(*tasks) + + +# ============================================================================= + + +@pyuvm.test() +class TestReadNoBackpressure(BaseTest): + """ + Read test with no AXI backpressure + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = NoBackpressureReadSequence() + + async def run(self): + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) + + +@pyuvm.test() +class TestReadBackpressure(BaseTest): + """ + Read test with AXI backpressure + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = BackpressureReadSequence() + + async def run(self): + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) + + +@pyuvm.test(expect_error=TimeoutError) +class TestReadNoDataResponse(BaseTest): + """ + Read test with response on AR channel but not on R channel. A timeout should + occur due to lack of the response. + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = NoReadDataResponseSequence() + + async def run(self): + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) diff --git a/verification/block/lib_ahb_to_axi4/test_write.py b/verification/block/lib_ahb_to_axi4/test_write.py new file mode 100644 index 00000000000..3eafc97dd0a --- /dev/null +++ b/verification/block/lib_ahb_to_axi4/test_write.py @@ -0,0 +1,315 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 + +import random + +import pyuvm +from cocotb.triggers import ClockCycles +from pyuvm import * +from testbench import ( + AXI4LiteReadyItem, + AXI4LiteResponseItem, + BaseEnv, + BaseTest, + BusWriteItem, +) + +# ============================================================================= + + +class AHBWriteSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + dwidth = 64 + align = 8 + + addr = 0xF0040000 + random.randrange(0, 0x1000) + addr = (addr // align) * align + data = [random.randrange(0, (1 << dwidth) - 1)] + + item = BusWriteItem(addr, data) + await self.start_item(item) + await self.finish_item(item) + + +class AXI4LiteWriteResponseSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Respond to AW and W + item = AXI4LiteResponseItem(["aw", "w"]) + await self.start_item(item) + await self.finish_item(item) + + # Emulate latency + await ClockCycles(cocotb.top.clk, 2) + + # Respond on B + item = AXI4LiteResponseItem(["b"]) + await self.start_item(item) + await self.finish_item(item) + + +class AXI4LiteNoWriteDataResponseSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Respond to AW only + item = AXI4LiteResponseItem(["aw"]) + await self.start_item(item) + await self.finish_item(item) + + # Emulate latency + await ClockCycles(cocotb.top.clk, 2) + + # Respond on B + item = AXI4LiteResponseItem(["b"]) + await self.start_item(item) + await self.finish_item(item) + + +class AXI4LiteNoWriteAddrResponseSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Respond to W only + item = AXI4LiteResponseItem(["w"]) + await self.start_item(item) + await self.finish_item(item) + + # Emulate latency + await ClockCycles(cocotb.top.clk, 2) + + # Respond on B + item = AXI4LiteResponseItem(["b"]) + await self.start_item(item) + await self.finish_item(item) + + +class AXI4LiteNoWriteResponseSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Respond to AW and W, do NOT respond to B + item = AXI4LiteResponseItem(["aw", "w"]) + await self.start_item(item) + await self.finish_item(item) + + +# ============================================================================= + + +class AXI4LiteWriteReadySequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Become ready + item = AXI4LiteReadyItem(["aw", "w"], True) + await self.start_item(item) + await self.finish_item(item) + + +class AXI4LiteNoWriteDataReadySequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Become ready + item = AXI4LiteReadyItem(["aw"], True) + await self.start_item(item) + await self.finish_item(item) + + +class AXI4LiteNoWriteAddrReadySequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Become ready + item = AXI4LiteReadyItem(["w"], True) + await self.start_item(item) + await self.finish_item(item) + + +# ============================================================================= + + +class NoBackpressureWriteSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + axi_rdy = AXI4LiteWriteReadySequence("ready") + ahb_seq = AHBWriteSequence("stimulus") + axi_seq = AXI4LiteWriteResponseSequence("response") + + # Issue an AHB write and do a correct AXI response + await axi_rdy.start(axi_seqr) + await ahb_seq.start(ahb_seqr) + await axi_seq.start(axi_seqr) + + +class BackpressureWriteSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + ahb_seq = AHBWriteSequence("stimulus") + axi_seq = AXI4LiteWriteResponseSequence("response") + + # Issue an AHB write and do a correct AXI response + await ahb_seq.start(ahb_seqr) + await axi_seq.start(axi_seqr) + + +class NoWriteResponseSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + axi_rdy = AXI4LiteWriteReadySequence("ready") + ahb_seq = AHBWriteSequence("stimulus") + axi_seq = AXI4LiteNoWriteResponseSequence("response") + + await axi_rdy.start(axi_seqr) + await ahb_seq.start(ahb_seqr) + await axi_seq.start(axi_seqr) + + +class NoWriteDataResponseSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + axi_rdy = AXI4LiteNoWriteDataReadySequence("ready") + ahb_seq = AHBWriteSequence("stimulus") + axi_seq = AXI4LiteNoWriteDataResponseSequence("response") + + await axi_rdy.start(axi_seqr) + await ahb_seq.start(ahb_seqr) + await axi_seq.start(axi_seqr) + + +class NoWriteAddrResponseSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + axi_rdy = AXI4LiteNoWriteAddrReadySequence("ready") + ahb_seq = AHBWriteSequence("stimulus") + axi_seq = AXI4LiteNoWriteAddrResponseSequence("response") + + await axi_rdy.start(axi_seqr) + await ahb_seq.start(ahb_seqr) + await axi_seq.start(axi_seqr) + + +# ============================================================================= + + +@pyuvm.test() +class TestWriteNoBackpressure(BaseTest): + """ + Write test with no AXI backpressure + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = NoBackpressureWriteSequence() + + async def run(self): + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) + + +@pyuvm.test() +class TestWriteBackpressure(BaseTest): + """ + Write test with AXI backpressure + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = BackpressureWriteSequence() + + async def run(self): + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) + + +# FIXME: This test is expected to fail as the AHB to AXI bridge does not wait +# for response on B channel and completely ignores it +@pyuvm.test(expect_fail=True) # FIXME: should be expect_fail=False +class TestWriteNoResponse(BaseTest): + """ + Write test with no AXI backpressure but without a response on B channel + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = NoWriteResponseSequence() + + async def run(self): + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) + + +@pyuvm.test(expect_error=TimeoutError) +class TestWriteNoAddrResponse(BaseTest): + """ + Write test with no AXI backpressure and no response on AW. A timeout should + occur due to lack of the response. + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = NoWriteAddrResponseSequence() + + async def run(self): + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) + + +# FIXME: The module ignores wready and does not wait until data gets accepted +# by the subordinate. +@pyuvm.test(expect_fail=True) # FIXME: should be expect_error=Timeout +class TestWriteNoDataResponse(BaseTest): + """ + Write test with no AXI backpressure and no response on W. A timeout should + occur due to lack of the response. + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = NoWriteDataResponseSequence() + + async def run(self): + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) diff --git a/verification/block/lib_ahb_to_axi4/testbench.py b/verification/block/lib_ahb_to_axi4/testbench.py new file mode 100644 index 00000000000..9fa840b11ac --- /dev/null +++ b/verification/block/lib_ahb_to_axi4/testbench.py @@ -0,0 +1,653 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 + +import os +import random +import sys +from enum import Enum + +import pyuvm +from axi import Axi4LiteMonitor, BusReadItem, BusWriteItem +from cocotb.clock import Clock +from cocotb.triggers import ClockCycles, Combine, FallingEdge, RisingEdge +from pyuvm import * +from utils import collect_bytes, collect_signals + +# ============================================================================== + + +class AXI4LiteReadyItem(uvm_sequence_item): + """ + An item describing ready signal assertion / deassertion for an AXI4 lite + channel(s) + """ + + def __init__(self, channels, ready=True): + super().__init__("AXI4LiteReadyItem") + self.channels = channels + self.ready = ready + + +class AXI4LiteResponseItem(uvm_sequence_item): + """ + An item describing a response for an AXI4 lite channel(s) + """ + + def __init__(self, channels): + super().__init__("AXI4LiteResponseItem") + self.channels = channels + + +# ============================================================================== + + +class AHBLiteManagerBFM(uvm_component): + """ + AHB Lite bus BFM that operates as a manager. + """ + + SIGNALS = [ + "hclk", + "hreset", + "haddr", + "hburst", + "hmastlock", + "hprot", + "hsize", + "htrans", + "hwrite", + "hwdata", + "hsel", + "hrdata", + "hreadyout", + "hresp", + ] + + class HTRANS(Enum): + IDLE = 0b00 + BUSY = 0b01 + NONSEQ = 0b10 + SEQ = 0b11 + + class HBURST(Enum): + SINGLE = 0b000 + INCR = 0b001 + WRAP4 = 0b010 + INCR4 = 0b011 + WRAP8 = 0b100 + INCR8 = 0b101 + WRAP16 = 0b110 + INCR16 = 0b111 + + def __init__(self, name, parent, uut, signal_prefix="", signal_map=None): + super().__init__(name, parent) + + collect_signals( + self.SIGNALS, + uut, + self, + uut_prefix=signal_prefix, + obj_prefix="ahb_", + signal_map=signal_map, + ) + + # Determine bus parameters + self.awidth = len(self.ahb_haddr) + self.dwidth = len(self.ahb_hwdata) # Assuming hrdata is the same + + # Calculate HSIZE encoding + self.hsize = { + 64 // self.dwidth: 3, + 128 // self.dwidth: 4, + 256 // self.dwidth: 5, + 512 // self.dwidth: 6, + 1024 // self.dwidth: 7, + } + + self.logger.debug("AHB Lite manager BFM:") + self.logger.debug(" awidth = {}".format(self.awidth)) + self.logger.debug(" dwidth = {}".format(self.dwidth)) + + async def _wait(self, signal, max_cycles=200): + """ + Waits for a signal to be asserted for at most max_cycles. + Raises an exception if it does not + """ + + for i in range(max_cycles): + await RisingEdge(self.ahb_hclk) + if signal.value == 1: + break + else: + raise TimeoutError("{} timeout".format(str(signal))) + + async def write(self, addr, data): + """ + Issues a write transfer. Parameter data must be a list of integers + where each one represents a full bus data word. The word count must be + one of multiplies of data bus width supported by AHB Lite. + """ + + lnt = len(data) + assert lnt in self.hsize + + # Wait for reset deassertion if necessary + if self.ahb_hreset.value == 0: + await RisingEdge(self.ahb_hreset) + + # Address phase + await RisingEdge(self.ahb_hclk) + self.ahb_hsel.value = 1 + self.ahb_hprot.value = 1 # Indicates a data transfer + self.ahb_hsize.value = self.hsize[lnt] + self.ahb_haddr.value = addr + self.ahb_hwrite.value = 1 + self.ahb_htrans.value = self.HTRANS.NONSEQ.value + self.ahb_hburst.value = self.HBURST.SINGLE.value + await self._wait(self.ahb_hreadyout) + + # Data phase + for i, word in enumerate(data): + if i != lnt - 1: + addr += self.dwidth // 8 + self.ahb_haddr.value = addr + self.ahb_htrans.value = self.HTRANS.SEQ.value + else: + self.ahb_htrans.value = self.HTRANS.IDLE.value + + self.ahb_hwdata.value = word + await self._wait(self.ahb_hreadyout) + + async def read(self, addr, length): + """ + Issues an AHB read transfer for the given number of bus data words. The + word count must be one of multiplies of data bus width supported by AHB + Lite. + """ + + assert length in self.hsize + + # Wait for reset deassertion if necessary + if self.ahb_hreset.value == 0: + await RisingEdge(self.ahb_hreset) + + # Address phase + await RisingEdge(self.ahb_hclk) + self.ahb_hsel.value = 1 + self.ahb_hprot.value = 1 # Data + self.ahb_hsize.value = self.hsize[length] + self.ahb_haddr.value = addr + self.ahb_hwrite.value = 0 + self.ahb_htrans.value = self.HTRANS.NONSEQ.value + self.ahb_hburst.value = self.HBURST.SINGLE.value + await self._wait(self.ahb_hreadyout) + + # Data phase + for i in range(length): + if i != length - 1: + addr += self.dwidth // 8 + self.ahb_haddr.value = addr + self.ahb_htrans.value = self.HTRANS.SEQ.value + else: + self.ahb_htrans.value = self.HTRANS.IDLE.value + + await self._wait(self.ahb_hreadyout) + + +class AHBLiteManagerDriver(uvm_driver): + """ + A driver for AHB Lite BFM + """ + + def __init__(self, *args, **kwargs): + self.bfm = kwargs["bfm"] + del kwargs["bfm"] + super().__init__(*args, **kwargs) + + async def run_phase(self): + while True: + it = await self.seq_item_port.get_next_item() + + if isinstance(it, BusWriteItem): + await self.bfm.write(it.addr, it.data) + + elif isinstance(it, BusReadItem): + # TODO: Since the intended UUT does not support burst transfers + # here we are reading only a single data word. + await self.bfm.read(it.addr, 1) + + else: + raise RuntimeError("Unknown item '{}'".format(type(it))) + + self.seq_item_port.item_done() + + +class AHBLiteMonitor(uvm_component): + """ + AHB Lite bus monitor + """ + + def __init__(self, *args, **kwargs): + self.bfm = kwargs["bfm"] + del kwargs["bfm"] + super().__init__(*args, **kwargs) + + def build_phase(self): + self.ap = uvm_analysis_port("ap", self) + + async def watch(self): + """ + Watches the bus + """ + + data_stage = False + + hdata = bytearray() + haddr = None + htrans = None + hwrite = None + + while True: + # Wait for reset deassertion if necessary + if self.bfm.ahb_hreset.value == 0: + await RisingEdge(self.bfm.ahb_hreset) + + # Wait for clock edge and hready + await RisingEdge(self.bfm.ahb_hclk) + hready = int(self.bfm.ahb_hreadyout) + + if hready: + # Sample data + if data_stage: + if htrans in [AHBLiteManagerBFM.HTRANS.SEQ, AHBLiteManagerBFM.HTRANS.NONSEQ]: + if hwrite: + data = collect_bytes(self.bfm.ahb_hwdata) + else: + data = collect_bytes(self.bfm.ahb_hrdata) + hdata += data + + # Transfer end + elif htrans == AHBLiteManagerBFM.HTRANS.IDLE: + self.logger.debug( + "{}: 0x{:08X} {}".format( + "WR" if hwrite else "RD", + haddr, + ["0x{:02X}".format(b) for b in hdata], + ) + ) + + data_stage = False + + # Send response + if hwrite: + cls = BusWriteItem + else: + cls = BusReadItem + + self.ap.write(cls(haddr, hdata)) + + # Sample bus signals + htrans = AHBLiteManagerBFM.HTRANS(self.bfm.ahb_htrans.value) + if not data_stage: + if htrans in [AHBLiteManagerBFM.HTRANS.SEQ, AHBLiteManagerBFM.HTRANS.NONSEQ]: + hwrite = int(self.bfm.ahb_hwrite.value) + haddr = int(self.bfm.ahb_haddr.value) + hdata = bytearray() + data_stage = True + + async def run_phase(self): + cocotb.start_soon(self.watch()) + + +# ============================================================================== + + +class AXI4LiteSubordinateBFM(uvm_component): + """ + AXI 4 Lite subordinate BFM. Allows low-level per-channel acknowledgement + control. + """ + + SIGNALS = [ + "clk", + "rst", + "awvalid", + "awready", + "awid", + "awaddr", + "awsize", + "wvalid", + "wready", + "wdata", + "wstrb", + "bvalid", + "bready", + "bresp", + "bid", + "arvalid", + "arready", + "arid", + "araddr", + "arsize", + "rvalid", + "rready", + "rid", + "rdata", + "rresp", + ] + + def __init__(self, name, parent, uut, signal_prefix="", signal_map=None): + super().__init__(name, parent) + + # Collect signals + collect_signals( + self.SIGNALS, + uut, + self, + uut_prefix=signal_prefix, + obj_prefix="axi_", + signal_map=signal_map, + ) + + # Determine bus parameters + self.awidth = len(self.axi_awaddr) + self.dwidth = len(self.axi_wdata) + self.swidth = len(self.axi_wstrb) + + assert self.swidth == (self.dwidth // 8) + + self.logger.debug("AXI4 Lite BFM:") + self.logger.debug(" awidth = {}".format(self.awidth)) + self.logger.debug(" dwidth = {}".format(self.dwidth)) + self.logger.debug(" swidth = {}".format(self.swidth)) + + self.axi_awready.value = 0 + self.axi_wready.value = 0 + self.axi_arready.value = 0 + self.axi_rready.value = 0 + + async def _wait(self, signal, max_cycles=200): + """ + Waits for a signal to be asserted for at most max_cycles. + Raises an exception if it does not + """ + + for i in range(max_cycles): + await RisingEdge(self.axi_clk) + if signal.value != 0: + break + else: + raise TimeoutError("{} timeout".format(str(signal))) + + async def set_ready(self, channel, ready): + """ + Sets/clears ready signal for the given channel on next clock edge + """ + assert channel in ["aw", "w", "ar", "r"], channel + await RisingEdge(self.axi_clk) + + sig = "axi_{}ready".format(channel) + sig = getattr(self, sig) + sig.value = int(ready) + + async def respond_aw(self): + self.axi_awready.value = 1 + await self._wait(self.axi_awvalid) + + self.axi_awready.value = 0 + + async def respond_w(self): + self.axi_wready.value = 1 + for i in range(1): + await self._wait(self.axi_wvalid) + + self.axi_wready.value = 0 + + async def respond_b(self): + await self._wait(self.axi_bready) + + self.axi_bvalid.value = 1 + self.axi_bid.value = 0 # TODO: support providing different BID values + self.axi_bresp.value = 0 # TODO: support providing different BRESP values + + await RisingEdge(self.axi_clk) + self.axi_bvalid.value = 0 + + async def respond_ar(self): + self.axi_arready.value = 1 + await self._wait(self.axi_arvalid) + + self.axi_arready.value = 0 + + async def respond_r(self): + await self._wait(self.axi_rready) + + self.axi_rvalid.value = 1 + self.axi_rdata.value = random.randrange(0, (1 << self.dwidth) - 1) + + await RisingEdge(self.axi_clk) + self.axi_rvalid.value = 0 + + +class AXI4LiteSubordinateDriver(uvm_driver): + """ + PyUVM driver for AXI 4 Lite subordinate BFM + """ + + def __init__(self, *args, **kwargs): + self.bfm = kwargs["bfm"] + del kwargs["bfm"] + super().__init__(*args, **kwargs) + + async def run_phase(self): + func_map = { + "aw": self.bfm.respond_aw, + "w": self.bfm.respond_w, + "b": self.bfm.respond_b, + "ar": self.bfm.respond_ar, + "r": self.bfm.respond_r, + } + + while True: + it = await self.seq_item_port.get_next_item() + + if isinstance(it, AXI4LiteResponseItem): + tasks = [cocotb.start_soon(func_map[c]()) for c in it.channels] + await Combine(*tasks) + + elif isinstance(it, AXI4LiteReadyItem): + tasks = [cocotb.start_soon(self.bfm.set_ready(c, it.ready)) for c in it.channels] + await Combine(*tasks) + + else: + raise RuntimeError("Unknown item '{}'".format(type(it))) + + self.seq_item_port.item_done() + + +# ============================================================================== + + +class Scoreboard(uvm_component): + """ + A scoreboard that compares AHB and AXI transfers and checks if they + refer to the same address and contain the same data. + """ + + def __init__(self, name, parent): + super().__init__(name, parent) + + self.passed = None + + def build_phase(self): + self.ahb_fifo = uvm_tlm_analysis_fifo("ahb_fifo", self) + self.ahb_port = uvm_get_port("ahb_port", self) + self.axi_fifo = uvm_tlm_analysis_fifo("axi_fifo", self) + self.axi_port = uvm_get_port("axi_port", self) + + def connect_phase(self): + self.ahb_port.connect(self.ahb_fifo.get_export) + self.axi_port.connect(self.axi_fifo.get_export) + + def check_phase(self): + # Check transactions + while self.ahb_port.can_get() and self.axi_port.can_get(): + self.passed = True + + # Get items + _, ahb_item = self.ahb_port.try_get() + _, axi_item = self.axi_port.try_get() + + # Check + msg = "AHB: {} A:0x{:08X} D:[{}], ".format( + type(ahb_item).__name__, + ahb_item.addr, + ",".join(["0x{:02X}".format(d) for d in ahb_item.data]), + ) + + msg += "AXI: {} A:0x{:08X} D:[{}]".format( + type(ahb_item).__name__, + axi_item.addr, + ",".join(["0x{:02X}".format(d) for d in axi_item.data]), + ) + + if ahb_item.addr != axi_item.addr or ahb_item.data != axi_item.data: + self.logger.error(msg) + self.passed = False + else: + self.logger.debug(msg) + + # Indicate an error if there is any leftover transaction in any of the + # queues. + if self.ahb_port.can_get() or self.axi_port.can_get(): + self.logger.error("Spurious transaction(s) on one of the buses") + self.passed = False + + def final_phase(self): + if not self.passed: + self.logger.critical("{} reports a failure".format(type(self))) + assert False + + +# ============================================================================== + + +class BaseEnv(uvm_env): + """ + Base PyUVM test environment + """ + + def build_phase(self): + # Config + ConfigDB().set(None, "*", "TEST_CLK_PERIOD", 1) + ConfigDB().set(None, "*", "TEST_ITERATIONS", 50) + ConfigDB().set(None, "*", "TEST_BURST_LEN", 10) + ConfigDB().set(None, "*", "TEST_BURST_GAP", 10) + + # Sequencers + self.ahb_seqr = uvm_sequencer("ahb_seqr", self) + self.axi_seqr = uvm_sequencer("axi_seqr", self) + + ConfigDB().set(None, "*", "AHB_SEQR", self.ahb_seqr) + ConfigDB().set(None, "*", "AXI_SEQR", self.axi_seqr) + + # BFM + self.ahb_bfm = AHBLiteManagerBFM( + "ahb_bfm", + self, + uut=cocotb.top, + signal_prefix="ahb_", + signal_map={ + "hclk": "clk", + "hreset": "rst_l", + }, + ) + + self.axi_bfm = AXI4LiteSubordinateBFM( + "axi_bfm", + self, + uut=cocotb.top, + signal_prefix="axi_", + signal_map={ + "clk": "clk", + "rst": "rst_l", + }, + ) + + # Driver + self.ahb_drv = AHBLiteManagerDriver("ahb_drv", self, bfm=self.ahb_bfm) + self.axi_drv = AXI4LiteSubordinateDriver("axi_drv", self, bfm=self.axi_bfm) + + # Monitor + self.ahb_mon = AHBLiteMonitor("ahb_mon", self, bfm=self.ahb_bfm) + self.axi_mon = Axi4LiteMonitor("axi_mon", self, bfm=self.axi_bfm) + + # Scoreboard + self.scoreboard = Scoreboard("scoreboard", self) + + def connect_phase(self): + self.ahb_drv.seq_item_port.connect(self.ahb_seqr.seq_item_export) + self.axi_drv.seq_item_port.connect(self.axi_seqr.seq_item_export) + + self.ahb_mon.ap.connect(self.scoreboard.ahb_fifo.analysis_export) + self.axi_mon.ap.connect(self.scoreboard.axi_fifo.analysis_export) + + +# ============================================================================== + + +class BaseTest(uvm_test): + """ + Base test for the module + """ + + def __init__(self, name, parent, env_class=BaseEnv): + super().__init__(name, parent) + self.env_class = env_class + + # Synchronize pyuvm logging level with cocotb logging level. Unclear + # why it does not happen automatically. + level = logging.getLevelName(os.environ.get("COCOTB_LOG_LEVEL", "INFO")) + uvm_report_object.set_default_logging_level(level) + + def build_phase(self): + self.env = self.env_class("env", self) + + def start_clock(self, name): + period = ConfigDB().get(None, "", "TEST_CLK_PERIOD") + sig = getattr(cocotb.top, name) + clock = Clock(sig, period, units="ns") + cocotb.start_soon(clock.start(start_high=False)) + + async def do_reset(self): + cocotb.top.rst_l.value = 0 + await ClockCycles(cocotb.top.clk, 2) + await FallingEdge(cocotb.top.clk) + cocotb.top.rst_l.value = 1 + + async def run_phase(self): + self.raise_objection() + + # Start clocks + self.start_clock("clk") + + # Issue reset + await self.do_reset() + + # Set common DUT signals + cocotb.top.bus_clk_en.value = 1 + cocotb.top.ahb_hreadyin.value = 1 + + # Wait some cycles + await ClockCycles(cocotb.top.clk, 2) + + # Run the actual test + await self.run() + + # Wait some cycles + await ClockCycles(cocotb.top.clk, 10) + + self.drop_objection() + + async def run(self): + raise NotImplementedError() diff --git a/verification/block/noxfile.py b/verification/block/noxfile.py index 0ef06a08a7f..b6c5b6841bf 100644 --- a/verification/block/noxfile.py +++ b/verification/block/noxfile.py @@ -266,6 +266,20 @@ def lib_axi4_to_ahb_verify(session, blockName, testName, coverage): verify_block(session, blockName, testName, coverage) +@nox.session(tags=["tests"]) +@nox.parametrize("blockName", ["lib_ahb_to_axi4"]) +@nox.parametrize( + "testName", + [ + "test_write", + "test_read", + ], +) +@nox.parametrize("coverage", coverageTypes) +def lib_ahb_to_axi4_verify(session, blockName, testName, coverage): + verify_block(session, blockName, testName, coverage) + + @nox.session(tags=["tests"]) @nox.parametrize("blockName", ["pmp"]) @nox.parametrize(