From 00ac8241fe964d3b52d90b9143bfc8a6fd76a4ae Mon Sep 17 00:00:00 2001 From: Maciej Kurc Date: Wed, 29 Nov 2023 10:31:41 +0100 Subject: [PATCH 01/12] Isolate AXI 4 Lite monitor into a common file. Internal-tag: [#51803] Signed-off-by: Maciej Kurc --- verification/block/common/axi.py | 181 ++++++++++++++++++++++ verification/block/common/utils.py | 29 ++++ verification/block/dma/testbench.py | 230 +++------------------------- verification/block/noxfile.py | 1 + 4 files changed, 234 insertions(+), 207 deletions(-) create mode 100644 verification/block/common/axi.py create mode 100644 verification/block/common/utils.py diff --git a/verification/block/common/axi.py b/verification/block/common/axi.py new file mode 100644 index 00000000000..1b93f13902c --- /dev/null +++ b/verification/block/common/axi.py @@ -0,0 +1,181 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 + +from cocotb.triggers import RisingEdge +from pyuvm import * + +# ============================================================================== + +class BusWriteItem(uvm_sequence_item): + """ + A generic data bus write request / response + """ + + def __init__(self, addr, data, resp=None): + super().__init__("BusWriteItem") + self.addr = addr + self.data = data + self.resp = resp + + +class BusReadItem(uvm_sequence_item): + """ + A generic data bus read request / response + """ + + def __init__(self, addr, data=None, resp=None): + super().__init__("BusReadItem") + self.addr = addr + self.data = data + self.resp = resp + +# ============================================================================== + +class Axi4LiteMonitor(uvm_component): + """ + A monitor for AXI4 lite bus + """ + + class Transfer: + def __init__(self, tid, addr=None): + self.tid = tid + self.addr = addr + self.data = None + + def __init__(self, *args, **kwargs): + self.bfm = kwargs["bfm"] + del kwargs["bfm"] + super().__init__(*args, **kwargs) + + def build_phase(self): + self.ap = uvm_analysis_port("ap", self) + + def _aw_active(self): + return self.bfm.axi_awready.value != 0 and self.bfm.axi_awvalid.value != 0 + + def _w_active(self): + return self.bfm.axi_wready.value != 0 and self.bfm.axi_wvalid.value != 0 + + def _ar_active(self): + return self.bfm.axi_arready.value != 0 and self.bfm.axi_arvalid.value != 0 + + def _r_active(self): + return self.bfm.axi_rready.value != 0 and self.bfm.axi_rvalid.value != 0 + + def _b_active(self): + return self.bfm.axi_bready.value != 0 and self.bfm.axi_bvalid.value != 0 + + def _sample_w(self): + return self.bfm.collect_bytes( + self.bfm.axi_wdata, + self.bfm.axi_wstrb, + ) + + def _sample_r(self): + return self.bfm.collect_bytes( + self.bfm.axi_rdata, + ) + + async def watch_write(self): + """ + Watches the bus for writes + """ + xfers = dict() + + # Main loop + while True: + # Wait for clock + await RisingEdge(self.bfm.axi_clk) + + # A new write request + if self._aw_active(): + addr = int(self.bfm.axi_awaddr.value) + awid = int(self.bfm.axi_awid.value) + + if awid in xfers: + self.logger.error( + "Write request for a pending transaction, awid={}".format(awid) + ) + + else: + xfers[awid] = self.Transfer(awid, addr) + + # Data (for the last seen awid) + if self._w_active(): + if awid not in xfers: + self.logger.error("Data write but no transaction is pending") + + else: + xfer = xfers[awid] + xfer.data = self._sample_w() + + # Write completion + if self._b_active(): + bresp = int(self.bfm.axi_bresp.value) + bid = int(self.bfm.axi_bid.value) + + if bid not in xfers: + self.logger.error("Response for a non-pending transaction, bid={}".format(bid)) + + else: + xfer = xfers[bid] + del xfers[bid] + + self.ap.write(BusWriteItem(xfer.addr, xfer.data, bresp)) + + self.logger.debug( + "WR: 0x{:08X} {} 0b{:03b}".format( + xfer.addr, ["0x{:02X}".format(b) for b in xfer.data], bresp + ) + ) + + async def watch_read(self): + """ + Watches the bus for reads + """ + xfers = dict() + + # Main loop + while True: + # Wait for clock + await RisingEdge(self.bfm.axi_clk) + + # A new read request + if self._ar_active(): + addr = int(self.bfm.axi_araddr.value) + arid = int(self.bfm.axi_arid.value) + + if arid in xfers: + self.logger.error( + "Read request for a pending transaction, arid={}".format(awid) + ) + + else: + xfers[arid] = self.Transfer(arid, addr) + + # Read completion + if self._r_active(): + rresp = int(self.bfm.axi_rresp.value) + rid = int(self.bfm.axi_rid.value) + + if rid not in xfers: + self.logger.error("Data read but no transaction is pending") + + else: + xfer = xfers[rid] + xfer.data = self._sample_r() + + del xfers[rid] + + self.ap.write(BusReadItem(xfer.addr, xfer.data, rresp)) + + self.logger.debug( + "RD: 0x{:08X} {} 0b{:03b}".format( + xfer.addr, ["0x{:02X}".format(b) for b in xfer.data], rresp + ) + ) + + async def run_phase(self): + # Start read & write watchers + cocotb.start_soon(self.watch_write()) + cocotb.start_soon(self.watch_read()) diff --git a/verification/block/common/utils.py b/verification/block/common/utils.py new file mode 100644 index 00000000000..40ef2a92563 --- /dev/null +++ b/verification/block/common/utils.py @@ -0,0 +1,29 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 +import logging + +# ============================================================================== + + +def collect_signals(signals, uut, obj, uut_prefix="", obj_prefix="", signal_map=None): + """ + Collects signal objects from UUT and attaches them to the given object. + Optionally UUT signals can be prefixed with the uut_prefix and object + signals with the obj_prefix. If signal_map is given it should be a dict + mapping signal names to actual UUT signal names. + """ + + for sig in signals: + if signal_map is not None: + uut_sig = signal_map.get(sig, uut_prefix + sig) + else: + uut_sig = uut_prefix + sig + obj_sig = obj_prefix + sig + if hasattr(uut, uut_sig): + s = getattr(uut, uut_sig) + + else: + s = None + logging.error("Module {} does not have a signal '{}'".format(str(uut), sig)) + + setattr(obj, obj_sig, s) diff --git a/verification/block/dma/testbench.py b/verification/block/dma/testbench.py index d91fc74f99c..3a9930b2346 100644 --- a/verification/block/dma/testbench.py +++ b/verification/block/dma/testbench.py @@ -7,6 +7,7 @@ import struct import pyuvm +from axi import Axi4LiteMonitor, BusReadItem, BusWriteItem from cocotb.clock import Clock from cocotb.triggers import ( ClockCycles, @@ -18,34 +19,11 @@ Timer, ) from pyuvm import * +from utils import collect_signals # ============================================================================== -class BusWriteItem(uvm_sequence_item): - """ - A generic data bus write request / response - """ - - def __init__(self, addr, data, resp=None): - super().__init__("BusWriteItem") - self.addr = addr - self.data = data - self.resp = resp - - -class BusReadItem(uvm_sequence_item): - """ - A generic data bus read request / response - """ - - def __init__(self, addr, data=None, resp=None): - super().__init__("BusReadItem") - self.addr = addr - self.data = data - self.resp = resp - - class MemWriteItem(uvm_sequence_item): """ A generic memory bus write item @@ -103,29 +81,6 @@ def __init__(self, addr, data=None, size=32, fail=False): # ============================================================================== -def collect_signals(signals, uut, obj, uut_prefix="", obj_prefix=""): - """ - Collects signal objects from UUT and attaches them to the given object. - Optionally UUT signals can be prefixed with the uut_prefix and object - signals with the obj_prefix - """ - - for sig in signals: - uut_sig = uut_prefix + sig - obj_sig = obj_prefix + sig - if hasattr(uut, uut_sig): - s = getattr(uut, uut_sig) - - else: - s = None - logging.error("Module {} does not have a signal '{}'".format(str(uut), sig)) - - setattr(obj, obj_sig, s) - - -# ============================================================================== - - class CoreMemoryBFM(uvm_component): """ A BFM for the memory side interface of the DMA module @@ -358,6 +313,8 @@ class Axi4LiteBFM(uvm_component): """ SIGNALS = [ + "clk", + "rst", "awvalid", "awready", "awid", @@ -396,13 +353,13 @@ def __init__(self, tid): self.pending = False - def __init__(self, name, parent, uut): + def __init__(self, name, parent, uut, signal_map): super().__init__(name, parent) # Collect signals - collect_signals(self.SIGNALS, uut, self, uut_prefix="dma_axi_", obj_prefix="axi_") - - collect_signals(["clk", "rst_l"], uut, self) + collect_signals( + self.SIGNALS, uut, self, uut_prefix="dma_axi_", obj_prefix="axi_", signal_map=signal_map + ) # Determine bus parameters self.awidth = len(self.axi_awaddr) @@ -444,7 +401,7 @@ async def _wait(self, signal, max_cycles=200): Raises an exception if it does not """ for i in range(max_cycles): - await RisingEdge(self.clk) + await RisingEdge(self.axi_clk) if signal.value != 0: break else: @@ -457,7 +414,7 @@ async def write(self, addr, data): # Wait for a free transfer id while True: - await RisingEdge(self.clk) + await RisingEdge(self.axi_clk) async with self.xfer_lock: awid = None @@ -480,7 +437,7 @@ async def write(self, addr, data): self.axi_awid.value = awid self.axi_awsize.value = int(math.ceil(math.log2(self.dwidth))) - await RisingEdge(self.clk) + await RisingEdge(self.axi_clk) self.axi_awvalid.value = 0 # Send data @@ -526,7 +483,7 @@ async def write_handler(self): while True: # Wait for response - await RisingEdge(self.clk) + await RisingEdge(self.axi_clk) if not self.axi_bvalid.value: continue @@ -561,7 +518,7 @@ async def read(self, addr, data): self.axi_arid.value = 1 self.axi_arsize.value = int(math.ceil(math.log2(self.dwidth))) - await RisingEdge(self.clk) + await RisingEdge(self.axi_clk) self.axi_arvalid.value = 0 # Receive data @@ -625,156 +582,6 @@ async def run_phase(self): self.seq_item_port.item_done() -class Axi4LiteMonitor(uvm_component): - """ - A monitor for AXI4 lite bus - """ - - class Transfer: - def __init__(self, tid, addr=None): - self.tid = tid - self.addr = addr - self.data = None - - def __init__(self, *args, **kwargs): - self.bfm = kwargs["bfm"] - del kwargs["bfm"] - super().__init__(*args, **kwargs) - - def build_phase(self): - self.ap = uvm_analysis_port("ap", self) - - def _aw_active(self): - return self.bfm.axi_awready.value != 0 and self.bfm.axi_awvalid.value != 0 - - def _w_active(self): - return self.bfm.axi_wready.value != 0 and self.bfm.axi_wvalid.value != 0 - - def _ar_active(self): - return self.bfm.axi_arready.value != 0 and self.bfm.axi_arvalid.value != 0 - - def _r_active(self): - return self.bfm.axi_rready.value != 0 and self.bfm.axi_rvalid.value != 0 - - def _b_active(self): - return self.bfm.axi_bready.value != 0 and self.bfm.axi_bvalid.value != 0 - - def _sample_w(self): - return self.bfm.collect_bytes( - self.bfm.axi_wdata, - self.bfm.axi_wstrb, - ) - - def _sample_r(self): - return self.bfm.collect_bytes( - self.bfm.axi_rdata, - ) - - async def watch_write(self): - """ - Watches the bus for writes - """ - xfers = dict() - - # Main loop - while True: - # Wait for clock - await RisingEdge(self.bfm.clk) - - # A new write request - if self._aw_active(): - addr = int(self.bfm.axi_awaddr.value) - awid = int(self.bfm.axi_awid.value) - - if awid in xfers: - self.logger.error( - "Write request for a pending transaction, awid={}".format(awid) - ) - - else: - xfers[awid] = self.Transfer(awid, addr) - - # Data (for the last seen awid) - if self._w_active(): - if awid not in xfers: - self.logger.error("Data write but no transaction is pending") - - else: - xfer = xfers[awid] - xfer.data = self._sample_w() - - # Write completion - if self._b_active(): - bresp = int(self.bfm.axi_bresp.value) - bid = int(self.bfm.axi_bid.value) - - if bid not in xfers: - self.logger.error("Response for a non-pending transaction, bid={}".format(bid)) - - else: - xfer = xfers[bid] - del xfers[bid] - - self.ap.write(BusWriteItem(xfer.addr, xfer.data, bresp)) - - self.logger.debug( - "WR: 0x{:08X} {} 0b{:03b}".format( - xfer.addr, ["0x{:02X}".format(b) for b in xfer.data], bresp - ) - ) - - async def watch_read(self): - """ - Watches the bus for reads - """ - xfers = dict() - - # Main loop - while True: - # Wait for clock - await RisingEdge(self.bfm.clk) - - # A new read request - if self._ar_active(): - addr = int(self.bfm.axi_araddr.value) - arid = int(self.bfm.axi_arid.value) - - if arid in xfers: - self.logger.error( - "Read request for a pending transaction, arid={}".format(awid) - ) - - else: - xfers[arid] = self.Transfer(arid, addr) - - # Read completion - if self._r_active(): - rresp = int(self.bfm.axi_rresp.value) - rid = int(self.bfm.axi_rid.value) - - if rid not in xfers: - self.logger.error("Data read but no transaction is pending") - - else: - xfer = xfers[rid] - xfer.data = self._sample_r() - - del xfers[rid] - - self.ap.write(BusReadItem(xfer.addr, xfer.data, rresp)) - - self.logger.debug( - "RD: 0x{:08X} {} 0b{:03b}".format( - xfer.addr, ["0x{:02X}".format(b) for b in xfer.data], rresp - ) - ) - - async def run_phase(self): - # Start read & write watchers - cocotb.start_soon(self.watch_write()) - cocotb.start_soon(self.watch_read()) - - # ============================================================================== @@ -1040,7 +847,16 @@ def build_phase(self): self.dbg_seqr = uvm_sequencer("dbg_seqr", self) # AXI interface - self.axi_bfm = Axi4LiteBFM("axi_bfm", self, cocotb.top) + self.axi_bfm = Axi4LiteBFM( + "axi_bfm", + self, + uut=cocotb.top, + signal_map={ + "clk": "clk", + "rst": "rst_l", + }, + ) + self.axi_drv = Axi4LiteSubordinateDriver("axi_drv", self, bfm=self.axi_bfm) self.axi_mon = Axi4LiteMonitor("axi_mon", self, bfm=self.axi_bfm) diff --git a/verification/block/noxfile.py b/verification/block/noxfile.py index 0ef06a08a7f..4bbf051579e 100644 --- a/verification/block/noxfile.py +++ b/verification/block/noxfile.py @@ -89,6 +89,7 @@ def isSimFailure( def verify_block(session, blockName, testName, coverage=""): session.install("-r", pipRequirementsPath) + session.env["PYTHONPATH"] = os.path.abspath(os.path.join(os.path.dirname(__file__), "common")) testPath = os.path.join(blockPath, blockName) testNameXML = os.path.join(testName + ".xml") testNameXMLPath = os.path.join(testPath, testNameXML) From a19444735636f02f371839b00db6072861e1e4ef Mon Sep 17 00:00:00 2001 From: Maciej Kurc Date: Mon, 27 Nov 2023 16:34:53 +0100 Subject: [PATCH 02/12] [WIP] AHB BFM and testbench Internal-tag: [#51803] Signed-off-by: Maciej Kurc --- verification/block/lib_ahb_to_axi4/Makefile | 17 + .../block/lib_ahb_to_axi4/test_read.py | 60 ++ .../block/lib_ahb_to_axi4/test_write.py | 144 +++++ .../block/lib_ahb_to_axi4/testbench.py | 606 ++++++++++++++++++ verification/block/noxfile.py | 13 + 5 files changed, 840 insertions(+) create mode 100644 verification/block/lib_ahb_to_axi4/Makefile create mode 100644 verification/block/lib_ahb_to_axi4/test_read.py create mode 100644 verification/block/lib_ahb_to_axi4/test_write.py create mode 100644 verification/block/lib_ahb_to_axi4/testbench.py diff --git a/verification/block/lib_ahb_to_axi4/Makefile b/verification/block/lib_ahb_to_axi4/Makefile new file mode 100644 index 00000000000..f361874c995 --- /dev/null +++ b/verification/block/lib_ahb_to_axi4/Makefile @@ -0,0 +1,17 @@ + +null := +space := $(null) # +comma := , + +CURDIR := $(abspath $(dir $(lastword $(MAKEFILE_LIST)))) +SRCDIR := $(abspath $(CURDIR)../../../../design) + +TEST_FILES = $(sort $(wildcard test_*.py)) + +MODULE ?= $(subst $(space),$(comma),$(subst .py,,$(TEST_FILES))) +TOPLEVEL = ahb_to_axi4 + +VERILOG_SOURCES = \ + $(SRCDIR)/lib/ahb_to_axi4.sv + +include $(CURDIR)/../common.mk diff --git a/verification/block/lib_ahb_to_axi4/test_read.py b/verification/block/lib_ahb_to_axi4/test_read.py new file mode 100644 index 00000000000..d464794664f --- /dev/null +++ b/verification/block/lib_ahb_to_axi4/test_read.py @@ -0,0 +1,60 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 + +import random + +import pyuvm +from pyuvm import * + +from cocotb.triggers import ClockCycles + +from testbench import BaseEnv, BaseTest +from testbench import BusWriteItem, BusReadItem + +# ============================================================================= + +class AHBReadSequence(uvm_sequence): + """ + """ + + def __init__(self, name): + super().__init__(name) + + async def body(self): + + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + dwidth = 64 + align = 8 + + for i in range(count): + addr = 0xF0040000 + random.randrange(0, 0x1000) + addr = (addr // align) * align + + # Issue a single read, the converter module does not support + # bursts + item = BusReadItem(addr) + + await self.start_item(item) + await self.finish_item(item) + + await ClockCycles(cocotb.top.clk, gap) + +# ============================================================================= + +@pyuvm.test() +class TestRead(BaseTest): + """ + AHB Lite to AXI4 Lite read test + """ + + def __init__(self, name, parent): + super().__init__(name, parent) + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = AHBReadSequence("stimulus") + + async def run(self): + await self.seq.start(self.env.ahb_seqr) diff --git a/verification/block/lib_ahb_to_axi4/test_write.py b/verification/block/lib_ahb_to_axi4/test_write.py new file mode 100644 index 00000000000..a1aabb6ff08 --- /dev/null +++ b/verification/block/lib_ahb_to_axi4/test_write.py @@ -0,0 +1,144 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 + +import random + +import pyuvm +from pyuvm import * + +from cocotb.triggers import ClockCycles + +from testbench import BaseEnv, BaseTest +from testbench import BusWriteItem, AXI4LiteResponseItem, AXI4LiteReadyItem + +# ============================================================================= + +class AHBWriteSequence(uvm_sequence): + """ + """ + + def __init__(self, name): + super().__init__(name) + + async def body(self): + + dwidth = 64 + align = 8 + + addr = 0xF0040000 + random.randrange(0, 0x1000) + addr = (addr // align) * align + data = [random.randrange(0, (1 << dwidth) - 1)] + + item = BusWriteItem(addr, data) + await self.start_item(item) + await self.finish_item(item) + + +class AXI4LiteWriteResponseSequence(uvm_sequence): + """ + """ + + def __init__(self, name): + super().__init__(name) + + async def body(self): + + # Respond to AW and W + item = AXI4LiteResponseItem(["aw", "w"]) + await self.start_item(item) + await self.finish_item(item) + + # Emulate latency + await ClockCycles(cocotb.top.clk, 2) + + # Respond on B + item = AXI4LiteResponseItem(["b"]) + await self.start_item(item) + await self.finish_item(item) + +class AXI4LiteWriteReadySequence(uvm_sequence): + """ + """ + + def __init__(self, name): + super().__init__(name) + + async def body(self): + + # Become ready + item = AXI4LiteReadyItem(["aw", "w"], True) + await self.start_item(item) + await self.finish_item(item) + + +# ============================================================================= + +class NoBackpressureWriteSequence(uvm_sequence): + + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + axi_rdy = AXI4LiteWriteReadySequence("ready") + ahb_seq = AHBWriteSequence("stimulus") + axi_seq = AXI4LiteWriteResponseSequence("response") + + # Issue am AHB write and do a correct AXI response + await axi_rdy.start(axi_seqr) + await ahb_seq.start(ahb_seqr) + await axi_seq.start(axi_seqr) + + +class BackpressureWriteSequence(uvm_sequence): + + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + ahb_seq = AHBWriteSequence("stimulus") + axi_seq = AXI4LiteWriteResponseSequence("response") + + # Issue am AHB write and do a correct AXI response + await ahb_seq.start(ahb_seqr) + await axi_seq.start(axi_seqr) + +# ============================================================================= + +@pyuvm.test() +class TestWriteNoBackpressure(BaseTest): + """ + Write test with no AXI backpressure + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = NoBackpressureWriteSequence() + + async def run(self): + + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) + +@pyuvm.test() +class TestWriteBackpressure(BaseTest): + """ + Write test with AXI backpressure + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = BackpressureWriteSequence() + + async def run(self): + + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) + diff --git a/verification/block/lib_ahb_to_axi4/testbench.py b/verification/block/lib_ahb_to_axi4/testbench.py new file mode 100644 index 00000000000..a8076187881 --- /dev/null +++ b/verification/block/lib_ahb_to_axi4/testbench.py @@ -0,0 +1,606 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 + +import os +import random + +from enum import Enum + +import pyuvm +from cocotb.clock import Clock +from cocotb.triggers import ( + ClockCycles, + FallingEdge, + RisingEdge, + Combine, +) +from pyuvm import * + +# ============================================================================== + +class BusWriteItem(uvm_sequence_item): + """ + A generic data bus write request / response + """ + + def __init__(self, addr, data, resp=None): + super().__init__("BusWriteItem") + self.addr = addr + self.data = data + self.resp = resp + + +class BusReadItem(uvm_sequence_item): + """ + A generic data bus read request / response + """ + + def __init__(self, addr, data=None, resp=None): + super().__init__("BusReadItem") + self.addr = addr + self.data = data + self.resp = resp + +class AXI4LiteReadyItem(uvm_sequence_item): + """ + """ + + def __init__(self, channels, ready=True): + super().__init__("AXI4LiteReadyItem") + self.channels = channels + self.ready = ready + +class AXI4LiteResponseItem(uvm_sequence_item): + """ + An item describing a response for an AXI4 lite channel(s) + """ + + def __init__(self, channels): + super().__init__("AXI4LiteResponseItem") + self.channels = channels + +# ============================================================================== + +def collect_signals(signals, uut, obj, uut_prefix="", obj_prefix="", signal_map=None): + """ + Collects signal objects from UUT and attaches them to the given object. + Optionally UUT signals can be prefixed with the uut_prefix and object + signals with the obj_prefix. If signal_map is given it should be a dict + mapping signal names to actual UUT signal names. + """ + + for sig in signals: + if signal_map is not None: + uut_sig = signal_map.get(sig, uut_prefix + sig) + else: + uut_sig = uut_prefix + sig + obj_sig = obj_prefix + sig + if hasattr(uut, uut_sig): + s = getattr(uut, uut_sig) + + else: + s = None + logging.error("Module {} does not have a signal '{}'".format(str(uut), sig)) + + setattr(obj, obj_sig, s) + +# ============================================================================== + +class AHBLiteManagerBFM(uvm_component): + """ + AHB Lite bus BFM that operates as a manager. + """ + + SIGNALS = [ + "hclk", + "hreset", + + "haddr", + "hburst", + "hmastlock", + "hprot", + "hsize", + "htrans", + "hwrite", + "hwdata", + "hsel", + + "hrdata", + "hreadyout", + "hresp", + ] + + class HTRANS(Enum): + IDLE = 0b00 + BUSY = 0b01 + NONSEQ = 0b10 + SEQ = 0b11 + + class HBURST(Enum): + SINGLE = 0b000 + INCR = 0b001 + WRAP4 = 0b010 + INCR4 = 0b011 + WRAP8 = 0b100 + INCR8 = 0b101 + WRAP16 = 0b110 + INCR16 = 0b111 + + def __init__(self, name, parent, uut, signal_prefix="", signal_map=None): + super().__init__(name, parent) + + # Collect signals + collect_signals(self.SIGNALS, uut, self, uut_prefix=signal_prefix, + obj_prefix="ahb_", signal_map=signal_map) + + # Determine bus parameters + self.awidth = len(self.ahb_haddr) + self.dwidth = len(self.ahb_hwdata) # Assuming hrdata is the same + + self.logger.debug("AHB Lite manager BFM:") + self.logger.debug(" awidth = {}".format(self.awidth)) + self.logger.debug(" dwidth = {}".format(self.dwidth)) + + async def _wait(self, signal, max_cycles=200): + """ + Waits for a signal to be asserted for at most max_cycles. + Raises an exception if it does not + """ + + for i in range(max_cycles): + await RisingEdge(self.ahb_hclk) + if signal.value != 0: + break + else: + raise RuntimeError("{} timeout".format(str(signal))) + + async def write(self, addr, data): + """ + Issues a write transfer + """ + + # Wait for reset deassertion if necessary + if self.ahb_hreset.value == 0: + await RisingEdge(self.ahb_hreset) + + # Address phase + await RisingEdge(self.ahb_hclk) + self.ahb_hsel.value = 1 + self.ahb_hprot.value = 1 # Data + self.ahb_hsize.value = 3 # 64B + self.ahb_haddr.value = addr + self.ahb_hwrite.value = 1 + self.ahb_htrans.value = self.HTRANS.NONSEQ.value + self.ahb_hburst.value = self.HBURST.INCR.value # TODO: Others? + await self._wait(self.ahb_hreadyout) + + # Data phase + for i, word in enumerate(data): + + if i != len(data) - 1: + addr += self.dwidth // 8 + self.ahb_haddr.value = addr + self.ahb_htrans.value = self.HTRANS.SEQ.value + else: + self.ahb_htrans.value = self.HTRANS.IDLE.value + + self.ahb_hwdata.value = word + await self._wait(self.ahb_hreadyout) + + async def read(self, addr, length): + """ + Issues an AHB read transfer for the given number of words. The word + count must be one of miltiplies of data bus width supported by AHB + Lite. + """ + + hsize = { + 64 // self.dwidth: 3, + 128 // self.dwidth: 4, + 256 // self.dwidth: 5, + 512 // self.dwidth: 6, + 1024 // self.dwidth: 7, + } + assert length in hsize + + # Wait for reset deassertion if necessary + if self.ahb_hreset.value == 0: + await RisingEdge(self.ahb_hreset) + + # Address phase + await RisingEdge(self.ahb_hclk) + self.ahb_hsel.value = 1 + self.ahb_hprot.value = 1 # Data + self.ahb_hsize.value = hsize[length] + self.ahb_haddr.value = addr + self.ahb_hwrite.value = 0 + self.ahb_htrans.value = self.HTRANS.NONSEQ.value + self.ahb_hburst.value = self.HBURST.INCR.value # TODO: Others? + await self._wait(self.ahb_hreadyout) + + # Data phase + for i in range(length): + + if i != length - 1: + addr += (self.dwidth // 8) + self.ahb_haddr.value = addr + self.ahb_htrans.value = self.HTRANS.SEQ.value + else: + self.ahb_htrans.value = self.HTRANS.IDLE.value + + await self._wait(self.ahb_hreadyout) + + +class AHBLiteManagerDriver(uvm_driver): + """ + A driver for AHB Lite BFM + """ + + def __init__(self, *args, **kwargs): + self.bfm = kwargs["bfm"] + del kwargs["bfm"] + super().__init__(*args, **kwargs) + + async def run_phase(self): + while True: + it = await self.seq_item_port.get_next_item() + + if isinstance(it, BusWriteItem): + await self.bfm.write(it.addr, it.data) + + elif isinstance(it, BusReadItem): + await self.bfm.read(it.addr, 1) + + else: + raise RuntimeError("Unknown item '{}'".format(type(it))) + + self.seq_item_port.item_done() + +# ============================================================================== + +class AXI4LiteSubordinateBFM(uvm_component): + """ + """ + + SIGNALS = [ + "clk", + "rst", + + "awvalid", + "awready", + "awid", + "awaddr", + "awsize", + + "wvalid", + "wready", + "wdata", + "wstrb", + + "bvalid", + "bready", + "bresp", + "bid", + + "arvalid", + "arready", + "arid", + "araddr", + "arsize", + + "rvalid", + "rready", + "rid", + "rdata", + "rresp", + "rlast", + ] + + def __init__(self, name, parent, uut, signal_prefix="", signal_map=None): + super().__init__(name, parent) + + # Collect signals + collect_signals(self.SIGNALS, uut, self, uut_prefix=signal_prefix, + obj_prefix="axi_", signal_map=signal_map) + + # Determine bus parameters + self.awidth = len(self.axi_awaddr) + self.dwidth = len(self.axi_wdata) + self.swidth = len(self.axi_wstrb) + + assert self.swidth == (self.dwidth // 8) + + self.logger.debug("AXI4 Lite BFM:") + self.logger.debug(" awidth = {}".format(self.awidth)) + self.logger.debug(" dwidth = {}".format(self.dwidth)) + self.logger.debug(" swidth = {}".format(self.swidth)) + + self.axi_awready.value = 0 + self.axi_wready.value = 0 + self.axi_arready.value = 0 + self.axi_rready.value = 0 + + async def _wait(self, signal, max_cycles=200): + """ + Waits for a signal to be asserted for at most max_cycles. + Raises an exception if it does not + """ + + for i in range(max_cycles): + await RisingEdge(self.axi_clk) + if signal.value != 0: + break + else: + raise RuntimeError("{} timeout".format(str(signal))) + + async def set_ready(self, channel, ready): + """ + Sets/clears ready signal for the given channel on next clock edge + """ + assert channel in ["aw", "w", "ar", "r"], channel + await RisingEdge(self.axi_clk) + + sig = "axi_{}ready".format(channel) + sig = getattr(self, sig) + sig.value = int(ready) + + async def respond_aw(self): + + # Assert awready + self.axi_awready.value = 1 + # Wait for awvalid + await self._wait(self.axi_awvalid) + + # Sample request + # TODO: + + # Deassert awready + self.axi_awready.value = 0 + + async def respond_w(self): + + # Assert wready + self.axi_wready.value = 1 + + # Wait for valid, receive data + for i in range(1): + await self._wait(self.axi_wvalid) + # TODO: + + # Deassert wready + self.axi_wready.value = 0 + + async def respond_b(self): + + # Wait for bready + await self._wait(self.axi_bready) + + # Transmitt acknowledge + self.axi_bvalid.value = 1 + self.axi_bid.value = 0 # TODO + self.axi_bresp.value = 0 # TODO + + await RisingEdge(self.axi_clk) + self.axi_bvalid.value = 0 + +# """ +# Write responder task (AW, W and B channels). +# """ +# while True: +# +# # Wait for reset deassertion if necessary +# if self.axi_rst.value == 0: +# await RisingEdge(self.axi_rst) +# +# # Assert awready and wready +# self.axi_awready.value = 1 +# self.axi_wready.value = 1 +# +# # Wait for AW +# while True: +# await RisingEdge(self.axi_clk) +# if self.axi_awvalid.value and self.axi_awready.value: +# break +# +# # Deassert awready +# self.axi_awready.value = 0 +# +# # Receive data +# for i in range(1): +# await self._wait(self.axi_wvalid) +# +# # Wait for bready +# await self._wait(self.axi_bready) +# +# # Transmitt acknowledge +# self.axi_bvalid.value = 1 +# self.axi_bid.value = 0 # TODO +# self.axi_bresp.value = 0 # TODO +# +# await RisingEdge(self.axi_clk) +# self.axi_bvalid.value = 0 +# +# async def respond_to_reads(self): +# """ +# Read responder task (AR and R channels). Returns random data. +# """ +# while True: +# +# # Wait for reset deassertion if necessary +# if self.axi_rst.value == 0: +# await RisingEdge(self.axi_rst) +# +# # Assert arready +# self.axi_arready.value = 1 +# +# # Wait for AR +# while True: +# await RisingEdge(self.axi_clk) +# if self.axi_arvalid.value and self.axi_arready.value: +# break +# +# # Deassert arready +# self.axi_arready.value = 0 +# +# # Dummy wait to simulate subordinate latency +# await ClockCycles(self.axi_clk, 5) +# +# # Wait for rready +# await self._wait(self.axi_rready) +# +# # Transmitt data +# self.axi_rvalid.value = 1 +# self.axi_rid.value = 0 # TODO +# self.axi_rdata.value = random.randrange(0, (1 << self.dwidth) - 1) +# self.axi_rresp.value = 0 # TODO +# +# await RisingEdge(self.axi_clk) +# self.axi_rvalid.value = 0 + +class AXI4LiteSubordinateDriver(uvm_driver): + """ + """ + + def __init__(self, *args, **kwargs): + self.bfm = kwargs["bfm"] + del kwargs["bfm"] + super().__init__(*args, **kwargs) + + async def run_phase(self): + + func_map = { + "aw": self.bfm.respond_aw, + "w": self.bfm.respond_w, + "b": self.bfm.respond_b, + } + + while True: + it = await self.seq_item_port.get_next_item() + + if isinstance(it, AXI4LiteResponseItem): + tasks = [cocotb.start_soon(func_map[c]()) for c in it.channels] + await Combine(*tasks) + + elif isinstance(it, AXI4LiteReadyItem): + tasks = [cocotb.start_soon(self.bfm.set_ready(c, it.ready)) for c in it.channels] + await Combine(*tasks) + + else: + raise RuntimeError("Unknown item '{}'".format(type(it))) + + self.seq_item_port.item_done() + +# ============================================================================== + + +class BaseEnv(uvm_env): + """ + Base PyUVM test environment + """ + + def build_phase(self): + + # Config + ConfigDB().set(None, "*", "TEST_CLK_PERIOD", 1) + ConfigDB().set(None, "*", "TEST_ITERATIONS", 50) + ConfigDB().set(None, "*", "TEST_BURST_LEN", 10) + ConfigDB().set(None, "*", "TEST_BURST_GAP", 10) + + # Sequencers + self.ahb_seqr = uvm_sequencer("ahb_seqr", self) + self.axi_seqr = uvm_sequencer("axi_seqr", self) + + ConfigDB().set(None, "*", "AHB_SEQR", self.ahb_seqr) + ConfigDB().set(None, "*", "AXI_SEQR", self.axi_seqr) + + # BFM + self.ahb_bfm = AHBLiteManagerBFM( + "ahb_bfm", self, uut=cocotb.top, + signal_prefix="ahb_", signal_map = { + "hclk": "clk", + "hreset": "rst_l", + }) + + self.axi_bfm = AXI4LiteSubordinateBFM( + "axi_bfm", self, uut=cocotb.top, + signal_prefix="axi_", signal_map = { + "clk": "clk", + "rst": "rst_l", + }) + + # Driver + self.ahb_drv = AHBLiteManagerDriver("ahb_drv", self, bfm=self.ahb_bfm) + self.axi_drv = AXI4LiteSubordinateDriver("axi_drv", self, bfm=self.axi_bfm) + +# # Monitor +# self.mem_mon = MemMonitor("mem_mon", self, dut=cocotb.top) +# +# # Scoreboard +# self.scoreboard = Scoreboard("scoreboard", self) + + def connect_phase(self): + self.ahb_drv.seq_item_port.connect(self.ahb_seqr.seq_item_export) + self.axi_drv.seq_item_port.connect(self.axi_seqr.seq_item_export) +# self.mem_mon.ap.connect(self.scoreboard.fifo.analysis_export) + pass + + +# ============================================================================== + + +class BaseTest(uvm_test): + """ + Base test for the module + """ + + def __init__(self, name, parent, env_class=BaseEnv): + super().__init__(name, parent) + self.env_class = env_class + + # Synchronize pyuvm logging level with cocotb logging level. Unclear + # why it does not happen automatically. + level = logging.getLevelName(os.environ.get("COCOTB_LOG_LEVEL", "INFO")) + uvm_report_object.set_default_logging_level(level) + + def build_phase(self): + self.env = self.env_class("env", self) + + def start_clock(self, name): + period = ConfigDB().get(None, "", "TEST_CLK_PERIOD") + sig = getattr(cocotb.top, name) + clock = Clock(sig, period, units="ns") + cocotb.start_soon(clock.start(start_high=False)) + + async def do_reset(self): + cocotb.top.rst_l.value = 0 + await ClockCycles(cocotb.top.clk, 2) + await FallingEdge(cocotb.top.clk) + cocotb.top.rst_l.value = 1 + + async def run_phase(self): + self.raise_objection() + + # Start clocks + self.start_clock("clk") + + # Issue reset + await self.do_reset() + + # Set common DUT signals + cocotb.top.bus_clk_en.value = 1 + cocotb.top.ahb_hreadyin.value = 1 + + # Wait some cycles + await ClockCycles(cocotb.top.clk, 2) + + # Run the actual test + await self.run() + + # Wait some cycles + await ClockCycles(cocotb.top.clk, 10) + + self.drop_objection() + + async def run(self): + raise NotImplementedError() + diff --git a/verification/block/noxfile.py b/verification/block/noxfile.py index 4bbf051579e..924137e026e 100644 --- a/verification/block/noxfile.py +++ b/verification/block/noxfile.py @@ -266,6 +266,19 @@ def dccm_verify(session, blockName, testName, coverage): def lib_axi4_to_ahb_verify(session, blockName, testName, coverage): verify_block(session, blockName, testName, coverage) +@nox.session(tags=["tests"]) +@nox.parametrize("blockName", ["lib_ahb_to_axi4"]) +@nox.parametrize( + "testName", + [ + "test_axi", +# "test_axi_read_channel", +# "test_axi_write_channel", + ], +) +@nox.parametrize("coverage", coverageTypes) +def lib_ahb_to_axi4_verify(session, blockName, testName, coverage): + verify_block(session, blockName, testName, coverage) @nox.session(tags=["tests"]) @nox.parametrize("blockName", ["pmp"]) From 85dffe5100f6d0741bf63deeb0723cabfa7235d6 Mon Sep 17 00:00:00 2001 From: Maciej Kurc Date: Wed, 29 Nov 2023 11:20:05 +0100 Subject: [PATCH 03/12] Isolate collect_bytes() Internal-tag: [#51803] Signed-off-by: Maciej Kurc --- verification/block/common/axi.py | 8 ++++++-- verification/block/common/utils.py | 19 +++++++++++++++++++ verification/block/dma/testbench.py | 22 ++-------------------- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/verification/block/common/axi.py b/verification/block/common/axi.py index 1b93f13902c..d374a5ab90b 100644 --- a/verification/block/common/axi.py +++ b/verification/block/common/axi.py @@ -3,9 +3,11 @@ from cocotb.triggers import RisingEdge from pyuvm import * +from utils import collect_bytes # ============================================================================== + class BusWriteItem(uvm_sequence_item): """ A generic data bus write request / response @@ -29,8 +31,10 @@ def __init__(self, addr, data=None, resp=None): self.data = data self.resp = resp + # ============================================================================== + class Axi4LiteMonitor(uvm_component): """ A monitor for AXI4 lite bus @@ -66,13 +70,13 @@ def _b_active(self): return self.bfm.axi_bready.value != 0 and self.bfm.axi_bvalid.value != 0 def _sample_w(self): - return self.bfm.collect_bytes( + return collect_bytes( self.bfm.axi_wdata, self.bfm.axi_wstrb, ) def _sample_r(self): - return self.bfm.collect_bytes( + return collect_bytes( self.bfm.axi_rdata, ) diff --git a/verification/block/common/utils.py b/verification/block/common/utils.py index 40ef2a92563..415a896b175 100644 --- a/verification/block/common/utils.py +++ b/verification/block/common/utils.py @@ -27,3 +27,22 @@ def collect_signals(signals, uut, obj, uut_prefix="", obj_prefix="", signal_map= logging.error("Module {} does not have a signal '{}'".format(str(uut), sig)) setattr(obj, obj_sig, s) + + +def collect_bytes(data, strb=None): + """ + Collects data bytes asserted on a data bus. Uses the strb value to + determine which octets are valid. Both data and strb must be cocotb + signals. strb can be None. + """ + + if strb is not None: + assert len(data) == 8 * len(strb) + + res = [] + for i in range(len(data) // 8): + if strb is None or strb.value & (1 << i): + dat = (int(data.value) >> (8 * i)) & 0xFF + res.append(dat) + + return bytes(res) diff --git a/verification/block/dma/testbench.py b/verification/block/dma/testbench.py index 3a9930b2346..e36b9fa0e26 100644 --- a/verification/block/dma/testbench.py +++ b/verification/block/dma/testbench.py @@ -19,7 +19,7 @@ Timer, ) from pyuvm import * -from utils import collect_signals +from utils import collect_bytes, collect_signals # ============================================================================== @@ -377,24 +377,6 @@ def __init__(self, name, parent, uut, signal_map): self.wr_xfers = {i: self.Transfer(i) for i in range(1 << len(self.axi_awid))} - @staticmethod - def collect_bytes(data, strb=None): - """ - Collects data bytes asserted on a data bus. Uses the strb value to - determine which octets are valid. - """ - - if strb is not None: - assert len(data) == 8 * len(strb) - - res = [] - for i in range(len(data) // 8): - if strb is None or strb.value & (1 << i): - dat = (int(data.value) >> (8 * i)) & 0xFF - res.append(dat) - - return bytes(res) - async def _wait(self, signal, max_cycles=200): """ Waits for a signal to be asserted for at most max_cycles. @@ -532,7 +514,7 @@ async def read(self, addr, data): await self._wait(self.axi_rvalid) # Get the data - data.extend(self.collect_bytes(self.axi_rdata)) + data.extend(collect_bytes(self.axi_rdata)) # Last, finish reception if self.axi_rlast.value: From 3de5b450d6c903fb9cb8dc0d488c00fd1654af8c Mon Sep 17 00:00:00 2001 From: Maciej Kurc Date: Wed, 29 Nov 2023 12:05:00 +0100 Subject: [PATCH 04/12] Add AXI vs. AHB monitor, format code Internal-tag: [#51803] Signed-off-by: Maciej Kurc --- .../block/lib_ahb_to_axi4/testbench.py | 460 ++++++++++-------- 1 file changed, 244 insertions(+), 216 deletions(-) diff --git a/verification/block/lib_ahb_to_axi4/testbench.py b/verification/block/lib_ahb_to_axi4/testbench.py index a8076187881..c8f4056ac17 100644 --- a/verification/block/lib_ahb_to_axi4/testbench.py +++ b/verification/block/lib_ahb_to_axi4/testbench.py @@ -3,52 +3,27 @@ import os import random - +import sys from enum import Enum import pyuvm +from axi import Axi4LiteMonitor, BusReadItem, BusWriteItem from cocotb.clock import Clock -from cocotb.triggers import ( - ClockCycles, - FallingEdge, - RisingEdge, - Combine, -) +from cocotb.triggers import ClockCycles, Combine, FallingEdge, RisingEdge from pyuvm import * +from utils import collect_bytes, collect_signals # ============================================================================== -class BusWriteItem(uvm_sequence_item): - """ - A generic data bus write request / response - """ - - def __init__(self, addr, data, resp=None): - super().__init__("BusWriteItem") - self.addr = addr - self.data = data - self.resp = resp - - -class BusReadItem(uvm_sequence_item): - """ - A generic data bus read request / response - """ - - def __init__(self, addr, data=None, resp=None): - super().__init__("BusReadItem") - self.addr = addr - self.data = data - self.resp = resp class AXI4LiteReadyItem(uvm_sequence_item): - """ - """ + """ """ def __init__(self, channels, ready=True): super().__init__("AXI4LiteReadyItem") self.channels = channels - self.ready = ready + self.ready = ready + class AXI4LiteResponseItem(uvm_sequence_item): """ @@ -59,42 +34,18 @@ def __init__(self, channels): super().__init__("AXI4LiteResponseItem") self.channels = channels -# ============================================================================== - -def collect_signals(signals, uut, obj, uut_prefix="", obj_prefix="", signal_map=None): - """ - Collects signal objects from UUT and attaches them to the given object. - Optionally UUT signals can be prefixed with the uut_prefix and object - signals with the obj_prefix. If signal_map is given it should be a dict - mapping signal names to actual UUT signal names. - """ - - for sig in signals: - if signal_map is not None: - uut_sig = signal_map.get(sig, uut_prefix + sig) - else: - uut_sig = uut_prefix + sig - obj_sig = obj_prefix + sig - if hasattr(uut, uut_sig): - s = getattr(uut, uut_sig) - - else: - s = None - logging.error("Module {} does not have a signal '{}'".format(str(uut), sig)) - - setattr(obj, obj_sig, s) # ============================================================================== + class AHBLiteManagerBFM(uvm_component): """ AHB Lite bus BFM that operates as a manager. """ - + SIGNALS = [ "hclk", "hreset", - "haddr", "hburst", "hmastlock", @@ -104,38 +55,43 @@ class AHBLiteManagerBFM(uvm_component): "hwrite", "hwdata", "hsel", - "hrdata", "hreadyout", "hresp", ] class HTRANS(Enum): - IDLE = 0b00 - BUSY = 0b01 - NONSEQ = 0b10 - SEQ = 0b11 + IDLE = 0b00 + BUSY = 0b01 + NONSEQ = 0b10 + SEQ = 0b11 class HBURST(Enum): - SINGLE = 0b000 - INCR = 0b001 - WRAP4 = 0b010 - INCR4 = 0b011 - WRAP8 = 0b100 - INCR8 = 0b101 - WRAP16 = 0b110 - INCR16 = 0b111 + SINGLE = 0b000 + INCR = 0b001 + WRAP4 = 0b010 + INCR4 = 0b011 + WRAP8 = 0b100 + INCR8 = 0b101 + WRAP16 = 0b110 + INCR16 = 0b111 def __init__(self, name, parent, uut, signal_prefix="", signal_map=None): super().__init__(name, parent) # Collect signals - collect_signals(self.SIGNALS, uut, self, uut_prefix=signal_prefix, - obj_prefix="ahb_", signal_map=signal_map) + collect_signals( + self.SIGNALS, + uut, + self, + uut_prefix=signal_prefix, + obj_prefix="ahb_", + signal_map=signal_map, + ) # Determine bus parameters self.awidth = len(self.ahb_haddr) - self.dwidth = len(self.ahb_hwdata) # Assuming hrdata is the same + self.dwidth = len(self.ahb_hwdata) # Assuming hrdata is the same self.logger.debug("AHB Lite manager BFM:") self.logger.debug(" awidth = {}".format(self.awidth)) @@ -165,21 +121,20 @@ async def write(self, addr, data): # Address phase await RisingEdge(self.ahb_hclk) - self.ahb_hsel.value = 1 - self.ahb_hprot.value = 1 # Data - self.ahb_hsize.value = 3 # 64B - self.ahb_haddr.value = addr - self.ahb_hwrite.value = 1 - self.ahb_htrans.value = self.HTRANS.NONSEQ.value - self.ahb_hburst.value = self.HBURST.INCR.value # TODO: Others? + self.ahb_hsel.value = 1 + self.ahb_hprot.value = 1 # Data + self.ahb_hsize.value = 3 # 64B + self.ahb_haddr.value = addr + self.ahb_hwrite.value = 1 + self.ahb_htrans.value = self.HTRANS.NONSEQ.value + self.ahb_hburst.value = self.HBURST.INCR.value # TODO: Others? await self._wait(self.ahb_hreadyout) # Data phase for i, word in enumerate(data): - if i != len(data) - 1: addr += self.dwidth // 8 - self.ahb_haddr.value = addr + self.ahb_haddr.value = addr self.ahb_htrans.value = self.HTRANS.SEQ.value else: self.ahb_htrans.value = self.HTRANS.IDLE.value @@ -195,10 +150,10 @@ async def read(self, addr, length): """ hsize = { - 64 // self.dwidth: 3, - 128 // self.dwidth: 4, - 256 // self.dwidth: 5, - 512 // self.dwidth: 6, + 64 // self.dwidth: 3, + 128 // self.dwidth: 4, + 256 // self.dwidth: 5, + 512 // self.dwidth: 6, 1024 // self.dwidth: 7, } assert length in hsize @@ -209,21 +164,20 @@ async def read(self, addr, length): # Address phase await RisingEdge(self.ahb_hclk) - self.ahb_hsel.value = 1 - self.ahb_hprot.value = 1 # Data - self.ahb_hsize.value = hsize[length] - self.ahb_haddr.value = addr - self.ahb_hwrite.value = 0 - self.ahb_htrans.value = self.HTRANS.NONSEQ.value - self.ahb_hburst.value = self.HBURST.INCR.value # TODO: Others? + self.ahb_hsel.value = 1 + self.ahb_hprot.value = 1 # Data + self.ahb_hsize.value = hsize[length] + self.ahb_haddr.value = addr + self.ahb_hwrite.value = 0 + self.ahb_htrans.value = self.HTRANS.NONSEQ.value + self.ahb_hburst.value = self.HBURST.INCR.value # TODO: Others? await self._wait(self.ahb_hreadyout) # Data phase for i in range(length): - if i != length - 1: - addr += (self.dwidth // 8) - self.ahb_haddr.value = addr + addr += self.dwidth // 8 + self.ahb_haddr.value = addr self.ahb_htrans.value = self.HTRANS.SEQ.value else: self.ahb_htrans.value = self.HTRANS.IDLE.value @@ -256,38 +210,114 @@ async def run_phase(self): self.seq_item_port.item_done() + +class AHBLiteMonitor(uvm_component): + """ + AHB Lite bus monitor + """ + + def __init__(self, *args, **kwargs): + self.bfm = kwargs["bfm"] + del kwargs["bfm"] + super().__init__(*args, **kwargs) + + def build_phase(self): + self.ap = uvm_analysis_port("ap", self) + + async def watch(self): + """ + Watches the bus + """ + + data_stage = False + + hdata = bytearray() + haddr = None + htrans = None + hwrite = None + + while True: + # Wait for reset deassertion if necessary + if self.bfm.ahb_hreset.value == 0: + await RisingEdge(self.bfm.ahb_hreset) + + # Wait for clock edge and hready + await RisingEdge(self.bfm.ahb_hclk) + hready = int(self.bfm.ahb_hreadyout) + + if hready: + # Sample data + if data_stage: + if htrans in [AHBLiteManagerBFM.HTRANS.SEQ, AHBLiteManagerBFM.HTRANS.NONSEQ]: + if hwrite: + data = collect_bytes(self.bfm.ahb_hwdata) + else: + data = collect_bytes(self.bfm.ahb_hwdata) + hdata += data + + # Transfer end + elif htrans == AHBLiteManagerBFM.HTRANS.IDLE: + self.logger.debug( + "{}: 0x{:08X} {}".format( + "WR" if hwrite else "RD", + haddr, + ["0x{:02X}".format(b) for b in hdata], + ) + ) + + data_stage = False + + # Send response + if hwrite: + cls = BusWriteItem + else: + cls = BusReadItem + + self.ap.write(cls(haddr, hdata)) + + # Sample bus signals + htrans = AHBLiteManagerBFM.HTRANS(self.bfm.ahb_htrans.value) + if not data_stage: + if htrans in [AHBLiteManagerBFM.HTRANS.SEQ, AHBLiteManagerBFM.HTRANS.NONSEQ]: + hwrite = int(self.bfm.ahb_hwrite.value) + haddr = int(self.bfm.ahb_haddr.value) + hdata = bytearray() + data_stage = True + + async def run_phase(self): + cocotb.start_soon(self.watch()) + + # ============================================================================== + class AXI4LiteSubordinateBFM(uvm_component): """ + AXI 4 Lite subordinate BFM. Allows low-level per-channel acknowledgement + control. """ SIGNALS = [ "clk", "rst", - "awvalid", "awready", "awid", "awaddr", "awsize", - "wvalid", "wready", "wdata", "wstrb", - "bvalid", "bready", "bresp", "bid", - "arvalid", "arready", "arid", "araddr", "arsize", - "rvalid", "rready", "rid", @@ -300,8 +330,14 @@ def __init__(self, name, parent, uut, signal_prefix="", signal_map=None): super().__init__(name, parent) # Collect signals - collect_signals(self.SIGNALS, uut, self, uut_prefix=signal_prefix, - obj_prefix="axi_", signal_map=signal_map) + collect_signals( + self.SIGNALS, + uut, + self, + uut_prefix=signal_prefix, + obj_prefix="axi_", + signal_map=signal_map, + ) # Determine bus parameters self.awidth = len(self.axi_awaddr) @@ -316,9 +352,9 @@ def __init__(self, name, parent, uut, signal_prefix="", signal_map=None): self.logger.debug(" swidth = {}".format(self.swidth)) self.axi_awready.value = 0 - self.axi_wready.value = 0 + self.axi_wready.value = 0 self.axi_arready.value = 0 - self.axi_rready.value = 0 + self.axi_rready.value = 0 async def _wait(self, signal, max_cycles=200): """ @@ -345,120 +381,41 @@ async def set_ready(self, channel, ready): sig.value = int(ready) async def respond_aw(self): - # Assert awready self.axi_awready.value = 1 # Wait for awvalid await self._wait(self.axi_awvalid) - # Sample request - # TODO: - # Deassert awready self.axi_awready.value = 0 async def respond_w(self): - # Assert wready self.axi_wready.value = 1 # Wait for valid, receive data for i in range(1): await self._wait(self.axi_wvalid) - # TODO: # Deassert wready self.axi_wready.value = 0 async def respond_b(self): - # Wait for bready await self._wait(self.axi_bready) # Transmitt acknowledge - self.axi_bvalid.value = 1 - self.axi_bid.value = 0 # TODO - self.axi_bresp.value = 0 # TODO + self.axi_bvalid.value = 1 + self.axi_bid.value = 0 # TODO + self.axi_bresp.value = 0 # TODO await RisingEdge(self.axi_clk) - self.axi_bvalid.value = 0 - -# """ -# Write responder task (AW, W and B channels). -# """ -# while True: -# -# # Wait for reset deassertion if necessary -# if self.axi_rst.value == 0: -# await RisingEdge(self.axi_rst) -# -# # Assert awready and wready -# self.axi_awready.value = 1 -# self.axi_wready.value = 1 -# -# # Wait for AW -# while True: -# await RisingEdge(self.axi_clk) -# if self.axi_awvalid.value and self.axi_awready.value: -# break -# -# # Deassert awready -# self.axi_awready.value = 0 -# -# # Receive data -# for i in range(1): -# await self._wait(self.axi_wvalid) -# -# # Wait for bready -# await self._wait(self.axi_bready) -# -# # Transmitt acknowledge -# self.axi_bvalid.value = 1 -# self.axi_bid.value = 0 # TODO -# self.axi_bresp.value = 0 # TODO -# -# await RisingEdge(self.axi_clk) -# self.axi_bvalid.value = 0 -# -# async def respond_to_reads(self): -# """ -# Read responder task (AR and R channels). Returns random data. -# """ -# while True: -# -# # Wait for reset deassertion if necessary -# if self.axi_rst.value == 0: -# await RisingEdge(self.axi_rst) -# -# # Assert arready -# self.axi_arready.value = 1 -# -# # Wait for AR -# while True: -# await RisingEdge(self.axi_clk) -# if self.axi_arvalid.value and self.axi_arready.value: -# break -# -# # Deassert arready -# self.axi_arready.value = 0 -# -# # Dummy wait to simulate subordinate latency -# await ClockCycles(self.axi_clk, 5) -# -# # Wait for rready -# await self._wait(self.axi_rready) -# -# # Transmitt data -# self.axi_rvalid.value = 1 -# self.axi_rid.value = 0 # TODO -# self.axi_rdata.value = random.randrange(0, (1 << self.dwidth) - 1) -# self.axi_rresp.value = 0 # TODO -# -# await RisingEdge(self.axi_clk) -# self.axi_rvalid.value = 0 + self.axi_bvalid.value = 0 + class AXI4LiteSubordinateDriver(uvm_driver): """ + PyUVM driver for AXI 4 Lite subordinate BFM """ def __init__(self, *args, **kwargs): @@ -467,11 +424,10 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) async def run_phase(self): - func_map = { - "aw": self.bfm.respond_aw, - "w": self.bfm.respond_w, - "b": self.bfm.respond_b, + "aw": self.bfm.respond_aw, + "w": self.bfm.respond_w, + "b": self.bfm.respond_b, } while True: @@ -490,6 +446,70 @@ async def run_phase(self): self.seq_item_port.item_done() + +# ============================================================================== + +class Scoreboard(uvm_component): + """ + A scoreboard that compares AHB and AXI transfers and checks if they + refer to the same address and containd the same data. + """ + + def __init__(self, name, parent): + super().__init__(name, parent) + + self.passed = None + + def build_phase(self): + self.ahb_fifo = uvm_tlm_analysis_fifo("ahb_fifo", self) + self.ahb_port = uvm_get_port("ahb_port", self) + self.axi_fifo = uvm_tlm_analysis_fifo("axi_fifo", self) + self.axi_port = uvm_get_port("axi_port", self) + + def connect_phase(self): + self.ahb_port.connect(self.ahb_fifo.get_export) + self.axi_port.connect(self.axi_fifo.get_export) + + def check_phase(self): + + # Check transactions + while self.ahb_port.can_get() or self.axi_port.can_get(): + + # A transaction is missing + if not self.ahb_port.can_get() or not self.axi_port.can_get(): + self.logger.error("A transaction is missing on one of the buses") + self.passed = False + break + + self.passed = True + + # Get items + _, ahb_item = self.ahb_port.try_get() + _, axi_item = self.axi_port.try_get() + + # Check + msg = "AHB: {} A:0x{:08X} D:[{}], ".format( + type(ahb_item).__name__, + ahb_item.addr, + ",".join(["0x{:02X}".format(d) for d in ahb_item.data])) + + msg += "AXI: {} A:0x{:08X} D:[{}]".format( + type(ahb_item).__name__, + ahb_item.addr, + ",".join(["0x{:02X}".format(d) for d in ahb_item.data])) + + if ahb_item.addr != axi_item.addr or \ + ahb_item.data != axi_item.data: + self.logger.error(msg) + self.passed = False + else: + self.logger.debug(msg) + + def final_phase(self): + if not self.passed: + self.logger.critical("{} reports a failure".format(type(self))) + assert False + # ============================================================================== @@ -499,12 +519,11 @@ class BaseEnv(uvm_env): """ def build_phase(self): - # Config ConfigDB().set(None, "*", "TEST_CLK_PERIOD", 1) ConfigDB().set(None, "*", "TEST_ITERATIONS", 50) - ConfigDB().set(None, "*", "TEST_BURST_LEN", 10) - ConfigDB().set(None, "*", "TEST_BURST_GAP", 10) + ConfigDB().set(None, "*", "TEST_BURST_LEN", 10) + ConfigDB().set(None, "*", "TEST_BURST_GAP", 10) # Sequencers self.ahb_seqr = uvm_sequencer("ahb_seqr", self) @@ -515,34 +534,44 @@ def build_phase(self): # BFM self.ahb_bfm = AHBLiteManagerBFM( - "ahb_bfm", self, uut=cocotb.top, - signal_prefix="ahb_", signal_map = { - "hclk": "clk", + "ahb_bfm", + self, + uut=cocotb.top, + signal_prefix="ahb_", + signal_map={ + "hclk": "clk", "hreset": "rst_l", - }) + }, + ) self.axi_bfm = AXI4LiteSubordinateBFM( - "axi_bfm", self, uut=cocotb.top, - signal_prefix="axi_", signal_map = { - "clk": "clk", - "rst": "rst_l", - }) + "axi_bfm", + self, + uut=cocotb.top, + signal_prefix="axi_", + signal_map={ + "clk": "clk", + "rst": "rst_l", + }, + ) # Driver self.ahb_drv = AHBLiteManagerDriver("ahb_drv", self, bfm=self.ahb_bfm) self.axi_drv = AXI4LiteSubordinateDriver("axi_drv", self, bfm=self.axi_bfm) -# # Monitor -# self.mem_mon = MemMonitor("mem_mon", self, dut=cocotb.top) -# -# # Scoreboard -# self.scoreboard = Scoreboard("scoreboard", self) + # Monitor + self.ahb_mon = AHBLiteMonitor("ahb_mon", self, bfm=self.ahb_bfm) + self.axi_mon = Axi4LiteMonitor("axi_mon", self, bfm=self.axi_bfm) + + # Scoreboard + self.scoreboard = Scoreboard("scoreboard", self) def connect_phase(self): self.ahb_drv.seq_item_port.connect(self.ahb_seqr.seq_item_export) self.axi_drv.seq_item_port.connect(self.axi_seqr.seq_item_export) -# self.mem_mon.ap.connect(self.scoreboard.fifo.analysis_export) - pass + + self.ahb_mon.ap.connect(self.scoreboard.ahb_fifo.analysis_export) + self.axi_mon.ap.connect(self.scoreboard.axi_fifo.analysis_export) # ============================================================================== @@ -587,7 +616,7 @@ async def run_phase(self): await self.do_reset() # Set common DUT signals - cocotb.top.bus_clk_en.value = 1 + cocotb.top.bus_clk_en.value = 1 cocotb.top.ahb_hreadyin.value = 1 # Wait some cycles @@ -603,4 +632,3 @@ async def run_phase(self): async def run(self): raise NotImplementedError() - From 55b4523d9db1ab870b10c8e705452d7d4e4798f3 Mon Sep 17 00:00:00 2001 From: Maciej Kurc Date: Wed, 29 Nov 2023 13:02:15 +0100 Subject: [PATCH 05/12] Comprehensive AHB to AXI write tests Internal-tag: [#51803] Signed-off-by: Maciej Kurc --- .../block/lib_ahb_to_axi4/test_write.py | 213 ++++++++++++++++-- .../block/lib_ahb_to_axi4/testbench.py | 45 ++-- verification/block/noxfile.py | 6 +- 3 files changed, 225 insertions(+), 39 deletions(-) diff --git a/verification/block/lib_ahb_to_axi4/test_write.py b/verification/block/lib_ahb_to_axi4/test_write.py index a1aabb6ff08..2bd1c1b62c2 100644 --- a/verification/block/lib_ahb_to_axi4/test_write.py +++ b/verification/block/lib_ahb_to_axi4/test_write.py @@ -4,26 +4,26 @@ import random import pyuvm -from pyuvm import * - from cocotb.triggers import ClockCycles - -from testbench import BaseEnv, BaseTest -from testbench import BusWriteItem, AXI4LiteResponseItem, AXI4LiteReadyItem +from pyuvm import * +from testbench import ( + AXI4LiteReadyItem, + AXI4LiteResponseItem, + BaseEnv, + BaseTest, + BusWriteItem, +) # ============================================================================= -class AHBWriteSequence(uvm_sequence): - """ - """ +class AHBWriteSequence(uvm_sequence): def __init__(self, name): super().__init__(name) async def body(self): - dwidth = 64 - align = 8 + align = 8 addr = 0xF0040000 + random.randrange(0, 0x1000) addr = (addr // align) * align @@ -35,14 +35,10 @@ async def body(self): class AXI4LiteWriteResponseSequence(uvm_sequence): - """ - """ - def __init__(self, name): super().__init__(name) async def body(self): - # Respond to AW and W item = AXI4LiteResponseItem(["aw", "w"]) await self.start_item(item) @@ -56,25 +52,96 @@ async def body(self): await self.start_item(item) await self.finish_item(item) -class AXI4LiteWriteReadySequence(uvm_sequence): - """ - """ +class AXI4LiteNoWriteDataResponseSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Respond to AW only + item = AXI4LiteResponseItem(["aw"]) + await self.start_item(item) + await self.finish_item(item) + + # Emulate latency + await ClockCycles(cocotb.top.clk, 2) + + # Respond on B + item = AXI4LiteResponseItem(["b"]) + await self.start_item(item) + await self.finish_item(item) + + +class AXI4LiteNoWriteAddrResponseSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Respond to W only + item = AXI4LiteResponseItem(["w"]) + await self.start_item(item) + await self.finish_item(item) + + # Emulate latency + await ClockCycles(cocotb.top.clk, 2) + + # Respond on B + item = AXI4LiteResponseItem(["b"]) + await self.start_item(item) + await self.finish_item(item) + + +class AXI4LiteNoWriteResponseSequence(uvm_sequence): def __init__(self, name): super().__init__(name) async def body(self): + # Respond to AW and W, do NOT respond to B + item = AXI4LiteResponseItem(["aw", "w"]) + await self.start_item(item) + await self.finish_item(item) + + +# ============================================================================= + +class AXI4LiteWriteReadySequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): # Become ready item = AXI4LiteReadyItem(["aw", "w"], True) await self.start_item(item) await self.finish_item(item) +class AXI4LiteNoWriteDataReadySequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Become ready + item = AXI4LiteReadyItem(["aw"], True) + await self.start_item(item) + await self.finish_item(item) + + +class AXI4LiteNoWriteAddrReadySequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Become ready + item = AXI4LiteReadyItem(["w"], True) + await self.start_item(item) + await self.finish_item(item) + + # ============================================================================= -class NoBackpressureWriteSequence(uvm_sequence): +class NoBackpressureWriteSequence(uvm_sequence): async def body(self): ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") @@ -90,7 +157,6 @@ async def body(self): class BackpressureWriteSequence(uvm_sequence): - async def body(self): ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") @@ -102,8 +168,52 @@ async def body(self): await ahb_seq.start(ahb_seqr) await axi_seq.start(axi_seqr) + +class NoWriteResponseSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + axi_rdy = AXI4LiteWriteReadySequence("ready") + ahb_seq = AHBWriteSequence("stimulus") + axi_seq = AXI4LiteNoWriteResponseSequence("response") + + await axi_rdy.start(axi_seqr) + await ahb_seq.start(ahb_seqr) + await axi_seq.start(axi_seqr) + + +class NoWriteDataResponseSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + axi_rdy = AXI4LiteNoWriteDataReadySequence("ready") + ahb_seq = AHBWriteSequence("stimulus") + axi_seq = AXI4LiteNoWriteDataResponseSequence("response") + + await axi_rdy.start(axi_seqr) + await ahb_seq.start(ahb_seqr) + await axi_seq.start(axi_seqr) + + +class NoWriteAddrResponseSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + axi_rdy = AXI4LiteNoWriteAddrReadySequence("ready") + ahb_seq = AHBWriteSequence("stimulus") + axi_seq = AXI4LiteNoWriteAddrResponseSequence("response") + + await axi_rdy.start(axi_seqr) + await ahb_seq.start(ahb_seqr) + await axi_seq.start(axi_seqr) + + # ============================================================================= + @pyuvm.test() class TestWriteNoBackpressure(BaseTest): """ @@ -115,14 +225,14 @@ def end_of_elaboration_phase(self): self.seq = NoBackpressureWriteSequence() async def run(self): - count = ConfigDB().get(None, "", "TEST_ITERATIONS") - gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") for i in range(count): await self.seq.start() await ClockCycles(cocotb.top.clk, gap) + @pyuvm.test() class TestWriteBackpressure(BaseTest): """ @@ -134,11 +244,70 @@ def end_of_elaboration_phase(self): self.seq = BackpressureWriteSequence() async def run(self): + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) + + +# FIXME: This test is expected to fail as the AHB to AXI bridge does not wait +# for response on B channel and completely ignores it +@pyuvm.test(expect_fail=True) # FIXME: should be expect_fail=False +class TestWriteNoResponse(BaseTest): + """ + Write test with no AXI backpressure but without a response on B channel + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = NoWriteResponseSequence() + + async def run(self): count = ConfigDB().get(None, "", "TEST_ITERATIONS") - gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") for i in range(count): await self.seq.start() await ClockCycles(cocotb.top.clk, gap) + +@pyuvm.test(expect_error=TimeoutError) +class TestWriteNoAddrResponse(BaseTest): + """ + Write test with no AXI backpressure and no response on AW + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = NoWriteAddrResponseSequence() + + async def run(self): + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) + + +# FIXME: The module ignores wready and does not wait until data gets accepted +# by the subordinate. +@pyuvm.test(expect_fail=True) # FIXME: should be expect_error=Timeout +class TestWriteNoDataResponse(BaseTest): + """ + Write test with no AXI backpressure and no response on W + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = NoWriteDataResponseSequence() + + async def run(self): + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) diff --git a/verification/block/lib_ahb_to_axi4/testbench.py b/verification/block/lib_ahb_to_axi4/testbench.py index c8f4056ac17..f7a768d80c0 100644 --- a/verification/block/lib_ahb_to_axi4/testbench.py +++ b/verification/block/lib_ahb_to_axi4/testbench.py @@ -17,7 +17,10 @@ class AXI4LiteReadyItem(uvm_sequence_item): - """ """ + """ + An item describing ready signal assertion / deassertion for an AXI4 lite + channel(s) + """ def __init__(self, channels, ready=True): super().__init__("AXI4LiteReadyItem") @@ -108,13 +111,24 @@ async def _wait(self, signal, max_cycles=200): if signal.value != 0: break else: - raise RuntimeError("{} timeout".format(str(signal))) + raise TimeoutError("{} timeout".format(str(signal))) async def write(self, addr, data): """ - Issues a write transfer + Issues a write transfer. Parameter data must be a list of integers + where each one represents a full bus data word. The word count must be + one of multiplies of data bus width supported by AHB Lite. """ + hsize = { + 64 // self.dwidth: 3, + 128 // self.dwidth: 4, + 256 // self.dwidth: 5, + 512 // self.dwidth: 6, + 1024 // self.dwidth: 7, + } + assert len(data) in hsize + # Wait for reset deassertion if necessary if self.ahb_hreset.value == 0: await RisingEdge(self.ahb_hreset) @@ -123,7 +137,7 @@ async def write(self, addr, data): await RisingEdge(self.ahb_hclk) self.ahb_hsel.value = 1 self.ahb_hprot.value = 1 # Data - self.ahb_hsize.value = 3 # 64B + self.ahb_hsize.value = hsize[len(data)] self.ahb_haddr.value = addr self.ahb_hwrite.value = 1 self.ahb_htrans.value = self.HTRANS.NONSEQ.value @@ -144,8 +158,8 @@ async def write(self, addr, data): async def read(self, addr, length): """ - Issues an AHB read transfer for the given number of words. The word - count must be one of miltiplies of data bus width supported by AHB + Issues an AHB read transfer for the given number of bus data words. The + word count must be one of multiplies of data bus width supported by AHB Lite. """ @@ -203,6 +217,8 @@ async def run_phase(self): await self.bfm.write(it.addr, it.data) elif isinstance(it, BusReadItem): + # TODO: Since the intended UUT does not support burst transfers + # here we are reading only a single data word. await self.bfm.read(it.addr, 1) else: @@ -367,7 +383,7 @@ async def _wait(self, signal, max_cycles=200): if signal.value != 0: break else: - raise RuntimeError("{} timeout".format(str(signal))) + raise TimeoutError("{} timeout".format(str(signal))) async def set_ready(self, channel, ready): """ @@ -449,6 +465,7 @@ async def run_phase(self): # ============================================================================== + class Scoreboard(uvm_component): """ A scoreboard that compares AHB and AXI transfers and checks if they @@ -471,10 +488,8 @@ def connect_phase(self): self.axi_port.connect(self.axi_fifo.get_export) def check_phase(self): - # Check transactions while self.ahb_port.can_get() or self.axi_port.can_get(): - # A transaction is missing if not self.ahb_port.can_get() or not self.axi_port.can_get(): self.logger.error("A transaction is missing on one of the buses") @@ -488,18 +503,19 @@ def check_phase(self): _, axi_item = self.axi_port.try_get() # Check - msg = "AHB: {} A:0x{:08X} D:[{}], ".format( + msg = "AHB: {} A:0x{:08X} D:[{}], ".format( type(ahb_item).__name__, ahb_item.addr, - ",".join(["0x{:02X}".format(d) for d in ahb_item.data])) + ",".join(["0x{:02X}".format(d) for d in ahb_item.data]), + ) msg += "AXI: {} A:0x{:08X} D:[{}]".format( type(ahb_item).__name__, ahb_item.addr, - ",".join(["0x{:02X}".format(d) for d in ahb_item.data])) + ",".join(["0x{:02X}".format(d) for d in ahb_item.data]), + ) - if ahb_item.addr != axi_item.addr or \ - ahb_item.data != axi_item.data: + if ahb_item.addr != axi_item.addr or ahb_item.data != axi_item.data: self.logger.error(msg) self.passed = False else: @@ -510,6 +526,7 @@ def final_phase(self): self.logger.critical("{} reports a failure".format(type(self))) assert False + # ============================================================================== diff --git a/verification/block/noxfile.py b/verification/block/noxfile.py index 924137e026e..86bada73591 100644 --- a/verification/block/noxfile.py +++ b/verification/block/noxfile.py @@ -266,20 +266,20 @@ def dccm_verify(session, blockName, testName, coverage): def lib_axi4_to_ahb_verify(session, blockName, testName, coverage): verify_block(session, blockName, testName, coverage) + @nox.session(tags=["tests"]) @nox.parametrize("blockName", ["lib_ahb_to_axi4"]) @nox.parametrize( "testName", [ - "test_axi", -# "test_axi_read_channel", -# "test_axi_write_channel", + "test_write", ], ) @nox.parametrize("coverage", coverageTypes) def lib_ahb_to_axi4_verify(session, blockName, testName, coverage): verify_block(session, blockName, testName, coverage) + @nox.session(tags=["tests"]) @nox.parametrize("blockName", ["pmp"]) @nox.parametrize( From 73cfed302e217cb5ba2f63c524fc8ec3aec59a5a Mon Sep 17 00:00:00 2001 From: Maciej Kurc Date: Wed, 29 Nov 2023 16:04:38 +0100 Subject: [PATCH 06/12] Add AHB read tests Internal-tag: [#51803] Signed-off-by: Maciej Kurc --- verification/block/common/axi.py | 3 +- .../block/lib_ahb_to_axi4/test_read.py | 191 +++++++++++++++--- .../block/lib_ahb_to_axi4/testbench.py | 28 ++- verification/block/noxfile.py | 1 + 4 files changed, 194 insertions(+), 29 deletions(-) diff --git a/verification/block/common/axi.py b/verification/block/common/axi.py index d374a5ab90b..b2046d8f79b 100644 --- a/verification/block/common/axi.py +++ b/verification/block/common/axi.py @@ -44,7 +44,7 @@ class Transfer: def __init__(self, tid, addr=None): self.tid = tid self.addr = addr - self.data = None + self.data = bytearray() def __init__(self, *args, **kwargs): self.bfm = kwargs["bfm"] @@ -85,6 +85,7 @@ async def watch_write(self): Watches the bus for writes """ xfers = dict() + awid = None # Main loop while True: diff --git a/verification/block/lib_ahb_to_axi4/test_read.py b/verification/block/lib_ahb_to_axi4/test_read.py index d464794664f..db471b845a3 100644 --- a/verification/block/lib_ahb_to_axi4/test_read.py +++ b/verification/block/lib_ahb_to_axi4/test_read.py @@ -4,57 +4,198 @@ import random import pyuvm +from cocotb.triggers import ClockCycles, Combine from pyuvm import * +from testbench import ( + AXI4LiteReadyItem, + AXI4LiteResponseItem, + BaseEnv, + BaseTest, + BusReadItem, +) + +# ============================================================================= + + +class AHBReadSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + align = 8 + + addr = 0xF0040000 + random.randrange(0, 0x1000) + addr = (addr // align) * align + + item = BusReadItem(addr) + await self.start_item(item) + await self.finish_item(item) + + +class AXI4LiteReadResponseSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Respond to AR + item = AXI4LiteResponseItem(["ar"]) + await self.start_item(item) + await self.finish_item(item) + + # Emulate latency + await ClockCycles(cocotb.top.clk, 2) + + # Respond on R + item = AXI4LiteResponseItem(["r"]) + await self.start_item(item) + await self.finish_item(item) -from cocotb.triggers import ClockCycles -from testbench import BaseEnv, BaseTest -from testbench import BusWriteItem, BusReadItem +class AXI4LiteNoReadDataResponseSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + async def body(self): + # Respond to AR but not to R + item = AXI4LiteResponseItem(["ar"]) + await self.start_item(item) + await self.finish_item(item) + # ============================================================================= -class AHBReadSequence(uvm_sequence): - """ - """ +class AXI4LiteReadReadySequence(uvm_sequence): def __init__(self, name): super().__init__(name) async def body(self): + # Become ready + item = AXI4LiteReadyItem(["ar"], True) + await self.start_item(item) + await self.finish_item(item) - count = ConfigDB().get(None, "", "TEST_ITERATIONS") - gap = ConfigDB().get(None, "", "TEST_BURST_GAP") - dwidth = 64 - align = 8 +# ============================================================================= - for i in range(count): - addr = 0xF0040000 + random.randrange(0, 0x1000) - addr = (addr // align) * align - # Issue a single read, the converter module does not support - # bursts - item = BusReadItem(addr) +async def later(cr, cycles): + """ + A helper function to start a task after a number of clock cycles + """ + await ClockCycles(cocotb.top.clk, cycles) + await cr - await self.start_item(item) - await self.finish_item(item) - await ClockCycles(cocotb.top.clk, gap) +class NoBackpressureReadSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + axi_rdy = AXI4LiteReadReadySequence("ready") + ahb_seq = AHBReadSequence("stimulus") + axi_seq = AXI4LiteReadResponseSequence("response") + + # Issue am AHB read and do a correct AXI response + await axi_rdy.start(axi_seqr) + + tasks = [ + cocotb.start_soon(ahb_seq.start(ahb_seqr)), + cocotb.start_soon(axi_seq.start(axi_seqr)), + ] + await Combine(*tasks) + + +class BackpressureReadSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + ahb_seq = AHBReadSequence("stimulus") + axi_seq = AXI4LiteReadResponseSequence("response") + + # Issue am AHB read and do a correct AXI response + tasks = [ + cocotb.start_soon(ahb_seq.start(ahb_seqr)), + cocotb.start_soon(later(axi_seq.start(axi_seqr), 5)), + ] + await Combine(*tasks) + + +class NoReadDataResponseSequence(uvm_sequence): + async def body(self): + ahb_seqr = ConfigDB().get(None, "", "AHB_SEQR") + axi_seqr = ConfigDB().get(None, "", "AXI_SEQR") + + axi_rdy = AXI4LiteReadReadySequence("ready") + ahb_seq = AHBReadSequence("stimulus") + axi_seq = AXI4LiteNoReadDataResponseSequence("response") + + # Issue am AHB read and do a correct AXI response + await axi_rdy.start(axi_seqr) + + tasks = [ + cocotb.start_soon(ahb_seq.start(ahb_seqr)), + cocotb.start_soon(axi_seq.start(axi_seqr)), + ] + await Combine(*tasks) + # ============================================================================= + +@pyuvm.test() +class TestReadNoBackpressure(BaseTest): + """ + Read test with no AXI backpressure + """ + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = NoBackpressureReadSequence() + + async def run(self): + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) + + @pyuvm.test() -class TestRead(BaseTest): +class TestReadBackpressure(BaseTest): """ - AHB Lite to AXI4 Lite read test + Read test with AXI backpressure """ - def __init__(self, name, parent): - super().__init__(name, parent) + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = BackpressureReadSequence() + + async def run(self): + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) + + +@pyuvm.test(expect_error=TimeoutError) +class TestReadNoDataResponse(BaseTest): + """ + Read test with response on AR channel but not on R channel + """ def end_of_elaboration_phase(self): super().end_of_elaboration_phase() - self.seq = AHBReadSequence("stimulus") + self.seq = NoReadDataResponseSequence() async def run(self): - await self.seq.start(self.env.ahb_seqr) + count = ConfigDB().get(None, "", "TEST_ITERATIONS") + gap = ConfigDB().get(None, "", "TEST_BURST_GAP") + + for i in range(count): + await self.seq.start() + await ClockCycles(cocotb.top.clk, gap) diff --git a/verification/block/lib_ahb_to_axi4/testbench.py b/verification/block/lib_ahb_to_axi4/testbench.py index f7a768d80c0..0ae1607b496 100644 --- a/verification/block/lib_ahb_to_axi4/testbench.py +++ b/verification/block/lib_ahb_to_axi4/testbench.py @@ -268,7 +268,7 @@ async def watch(self): if hwrite: data = collect_bytes(self.bfm.ahb_hwdata) else: - data = collect_bytes(self.bfm.ahb_hwdata) + data = collect_bytes(self.bfm.ahb_hrdata) hdata += data # Transfer end @@ -428,6 +428,26 @@ async def respond_b(self): await RisingEdge(self.axi_clk) self.axi_bvalid.value = 0 + async def respond_ar(self): + # Assert arready + self.axi_arready.value = 1 + # Wait for arvalid + await self._wait(self.axi_arvalid) + + # Deassert arready + self.axi_arready.value = 0 + + async def respond_r(self): + # Wait for rready + await self._wait(self.axi_rready) + + # Transmitt data + self.axi_rvalid.value = 1 + self.axi_rdata.value = random.randrange(0, (1 << self.dwidth) - 1) + + await RisingEdge(self.axi_clk) + self.axi_rvalid.value = 0 + class AXI4LiteSubordinateDriver(uvm_driver): """ @@ -444,6 +464,8 @@ async def run_phase(self): "aw": self.bfm.respond_aw, "w": self.bfm.respond_w, "b": self.bfm.respond_b, + "ar": self.bfm.respond_ar, + "r": self.bfm.respond_r, } while True: @@ -511,8 +533,8 @@ def check_phase(self): msg += "AXI: {} A:0x{:08X} D:[{}]".format( type(ahb_item).__name__, - ahb_item.addr, - ",".join(["0x{:02X}".format(d) for d in ahb_item.data]), + axi_item.addr, + ",".join(["0x{:02X}".format(d) for d in axi_item.data]), ) if ahb_item.addr != axi_item.addr or ahb_item.data != axi_item.data: diff --git a/verification/block/noxfile.py b/verification/block/noxfile.py index 86bada73591..1cafcf8347a 100644 --- a/verification/block/noxfile.py +++ b/verification/block/noxfile.py @@ -273,6 +273,7 @@ def lib_axi4_to_ahb_verify(session, blockName, testName, coverage): "testName", [ "test_write", + "test_read", ], ) @nox.parametrize("coverage", coverageTypes) From 88eb36dee4d4111913238812c1f3cc8e391551cf Mon Sep 17 00:00:00 2001 From: Maciej Kurc Date: Wed, 6 Dec 2023 14:28:22 +0100 Subject: [PATCH 07/12] Commentary cleanup Internal-tag: [#51803] Signed-off-by: Maciej Kurc --- .../block/lib_ahb_to_axi4/test_read.py | 9 ++++--- .../block/lib_ahb_to_axi4/test_write.py | 10 ++++--- .../block/lib_ahb_to_axi4/testbench.py | 27 +++++-------------- 3 files changed, 17 insertions(+), 29 deletions(-) diff --git a/verification/block/lib_ahb_to_axi4/test_read.py b/verification/block/lib_ahb_to_axi4/test_read.py index db471b845a3..ee6dbe9a773 100644 --- a/verification/block/lib_ahb_to_axi4/test_read.py +++ b/verification/block/lib_ahb_to_axi4/test_read.py @@ -96,7 +96,7 @@ async def body(self): ahb_seq = AHBReadSequence("stimulus") axi_seq = AXI4LiteReadResponseSequence("response") - # Issue am AHB read and do a correct AXI response + # Issue an AHB read and do a correct AXI response await axi_rdy.start(axi_seqr) tasks = [ @@ -114,7 +114,7 @@ async def body(self): ahb_seq = AHBReadSequence("stimulus") axi_seq = AXI4LiteReadResponseSequence("response") - # Issue am AHB read and do a correct AXI response + # Issue an AHB read and do a correct AXI response tasks = [ cocotb.start_soon(ahb_seq.start(ahb_seqr)), cocotb.start_soon(later(axi_seq.start(axi_seqr), 5)), @@ -131,7 +131,7 @@ async def body(self): ahb_seq = AHBReadSequence("stimulus") axi_seq = AXI4LiteNoReadDataResponseSequence("response") - # Issue am AHB read and do a correct AXI response + # Issue an AHB read and do a correct AXI response await axi_rdy.start(axi_seqr) tasks = [ @@ -185,7 +185,8 @@ async def run(self): @pyuvm.test(expect_error=TimeoutError) class TestReadNoDataResponse(BaseTest): """ - Read test with response on AR channel but not on R channel + Read test with response on AR channel but not on R channel. A timeout should + occur due to lack of the response. """ def end_of_elaboration_phase(self): diff --git a/verification/block/lib_ahb_to_axi4/test_write.py b/verification/block/lib_ahb_to_axi4/test_write.py index 2bd1c1b62c2..3eafc97dd0a 100644 --- a/verification/block/lib_ahb_to_axi4/test_write.py +++ b/verification/block/lib_ahb_to_axi4/test_write.py @@ -150,7 +150,7 @@ async def body(self): ahb_seq = AHBWriteSequence("stimulus") axi_seq = AXI4LiteWriteResponseSequence("response") - # Issue am AHB write and do a correct AXI response + # Issue an AHB write and do a correct AXI response await axi_rdy.start(axi_seqr) await ahb_seq.start(ahb_seqr) await axi_seq.start(axi_seqr) @@ -164,7 +164,7 @@ async def body(self): ahb_seq = AHBWriteSequence("stimulus") axi_seq = AXI4LiteWriteResponseSequence("response") - # Issue am AHB write and do a correct AXI response + # Issue an AHB write and do a correct AXI response await ahb_seq.start(ahb_seqr) await axi_seq.start(axi_seqr) @@ -276,7 +276,8 @@ async def run(self): @pyuvm.test(expect_error=TimeoutError) class TestWriteNoAddrResponse(BaseTest): """ - Write test with no AXI backpressure and no response on AW + Write test with no AXI backpressure and no response on AW. A timeout should + occur due to lack of the response. """ def end_of_elaboration_phase(self): @@ -297,7 +298,8 @@ async def run(self): @pyuvm.test(expect_fail=True) # FIXME: should be expect_error=Timeout class TestWriteNoDataResponse(BaseTest): """ - Write test with no AXI backpressure and no response on W + Write test with no AXI backpressure and no response on W. A timeout should + occur due to lack of the response. """ def end_of_elaboration_phase(self): diff --git a/verification/block/lib_ahb_to_axi4/testbench.py b/verification/block/lib_ahb_to_axi4/testbench.py index 0ae1607b496..7c5c318631b 100644 --- a/verification/block/lib_ahb_to_axi4/testbench.py +++ b/verification/block/lib_ahb_to_axi4/testbench.py @@ -82,7 +82,6 @@ class HBURST(Enum): def __init__(self, name, parent, uut, signal_prefix="", signal_map=None): super().__init__(name, parent) - # Collect signals collect_signals( self.SIGNALS, uut, @@ -136,12 +135,12 @@ async def write(self, addr, data): # Address phase await RisingEdge(self.ahb_hclk) self.ahb_hsel.value = 1 - self.ahb_hprot.value = 1 # Data + self.ahb_hprot.value = 1 # Indicates a data transfer self.ahb_hsize.value = hsize[len(data)] self.ahb_haddr.value = addr self.ahb_hwrite.value = 1 self.ahb_htrans.value = self.HTRANS.NONSEQ.value - self.ahb_hburst.value = self.HBURST.INCR.value # TODO: Others? + self.ahb_hburst.value = self.HBURST.INCR.value await self._wait(self.ahb_hreadyout) # Data phase @@ -184,7 +183,7 @@ async def read(self, addr, length): self.ahb_haddr.value = addr self.ahb_hwrite.value = 0 self.ahb_htrans.value = self.HTRANS.NONSEQ.value - self.ahb_hburst.value = self.HBURST.INCR.value # TODO: Others? + self.ahb_hburst.value = self.HBURST.INCR.value await self._wait(self.ahb_hreadyout) # Data phase @@ -397,51 +396,37 @@ async def set_ready(self, channel, ready): sig.value = int(ready) async def respond_aw(self): - # Assert awready self.axi_awready.value = 1 - # Wait for awvalid await self._wait(self.axi_awvalid) - # Deassert awready self.axi_awready.value = 0 async def respond_w(self): - # Assert wready self.axi_wready.value = 1 - - # Wait for valid, receive data for i in range(1): await self._wait(self.axi_wvalid) - # Deassert wready self.axi_wready.value = 0 async def respond_b(self): - # Wait for bready await self._wait(self.axi_bready) - # Transmitt acknowledge self.axi_bvalid.value = 1 - self.axi_bid.value = 0 # TODO - self.axi_bresp.value = 0 # TODO + self.axi_bid.value = 0 # TODO: support providing different BID values + self.axi_bresp.value = 0 # TODO: support providing different BRESP values await RisingEdge(self.axi_clk) self.axi_bvalid.value = 0 async def respond_ar(self): - # Assert arready self.axi_arready.value = 1 - # Wait for arvalid await self._wait(self.axi_arvalid) - # Deassert arready self.axi_arready.value = 0 async def respond_r(self): - # Wait for rready await self._wait(self.axi_rready) - # Transmitt data self.axi_rvalid.value = 1 self.axi_rdata.value = random.randrange(0, (1 << self.dwidth) - 1) @@ -491,7 +476,7 @@ async def run_phase(self): class Scoreboard(uvm_component): """ A scoreboard that compares AHB and AXI transfers and checks if they - refer to the same address and containd the same data. + refer to the same address and contain the same data. """ def __init__(self, name, parent): From cb1d937383529364201cd4a653def1d62c9d7197 Mon Sep 17 00:00:00 2001 From: Maciej Kurc Date: Wed, 6 Dec 2023 14:28:56 +0100 Subject: [PATCH 08/12] Remove "rlast" from AXI interfase since not used, fixed condition in wait() function Internal-tag: [#51803] Signed-off-by: Maciej Kurc --- verification/block/lib_ahb_to_axi4/testbench.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/verification/block/lib_ahb_to_axi4/testbench.py b/verification/block/lib_ahb_to_axi4/testbench.py index 7c5c318631b..89661fccf69 100644 --- a/verification/block/lib_ahb_to_axi4/testbench.py +++ b/verification/block/lib_ahb_to_axi4/testbench.py @@ -107,7 +107,7 @@ async def _wait(self, signal, max_cycles=200): for i in range(max_cycles): await RisingEdge(self.ahb_hclk) - if signal.value != 0: + if signal.value == 1: break else: raise TimeoutError("{} timeout".format(str(signal))) @@ -338,7 +338,6 @@ class AXI4LiteSubordinateBFM(uvm_component): "rid", "rdata", "rresp", - "rlast", ] def __init__(self, name, parent, uut, signal_prefix="", signal_map=None): From 69808e74f53aa109365524559dc94f816b4d31c4 Mon Sep 17 00:00:00 2001 From: Maciej Kurc Date: Wed, 6 Dec 2023 14:33:41 +0100 Subject: [PATCH 09/12] Use non-burst (SINGLE) AHB transfers Internal-tag: [#51803] Signed-off-by: Maciej Kurc --- verification/block/lib_ahb_to_axi4/testbench.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/verification/block/lib_ahb_to_axi4/testbench.py b/verification/block/lib_ahb_to_axi4/testbench.py index 89661fccf69..b1989d0d736 100644 --- a/verification/block/lib_ahb_to_axi4/testbench.py +++ b/verification/block/lib_ahb_to_axi4/testbench.py @@ -140,7 +140,7 @@ async def write(self, addr, data): self.ahb_haddr.value = addr self.ahb_hwrite.value = 1 self.ahb_htrans.value = self.HTRANS.NONSEQ.value - self.ahb_hburst.value = self.HBURST.INCR.value + self.ahb_hburst.value = self.HBURST.SINGLE.value await self._wait(self.ahb_hreadyout) # Data phase @@ -183,7 +183,7 @@ async def read(self, addr, length): self.ahb_haddr.value = addr self.ahb_hwrite.value = 0 self.ahb_htrans.value = self.HTRANS.NONSEQ.value - self.ahb_hburst.value = self.HBURST.INCR.value + self.ahb_hburst.value = self.HBURST.SINGLE.value await self._wait(self.ahb_hreadyout) # Data phase From f4bf4db3c6d6e0dbbd06a6e3fcf949653694006c Mon Sep 17 00:00:00 2001 From: Maciej Kurc Date: Wed, 6 Dec 2023 14:39:04 +0100 Subject: [PATCH 10/12] Code refactoring Internal-tag: [#51803] Signed-off-by: Maciej Kurc --- .../block/lib_ahb_to_axi4/testbench.py | 34 ++++++++----------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/verification/block/lib_ahb_to_axi4/testbench.py b/verification/block/lib_ahb_to_axi4/testbench.py index b1989d0d736..9af630cd968 100644 --- a/verification/block/lib_ahb_to_axi4/testbench.py +++ b/verification/block/lib_ahb_to_axi4/testbench.py @@ -95,6 +95,15 @@ def __init__(self, name, parent, uut, signal_prefix="", signal_map=None): self.awidth = len(self.ahb_haddr) self.dwidth = len(self.ahb_hwdata) # Assuming hrdata is the same + # Calculate HSIZE encoding + self.hsize = { + 64 // self.dwidth: 3, + 128 // self.dwidth: 4, + 256 // self.dwidth: 5, + 512 // self.dwidth: 6, + 1024 // self.dwidth: 7, + } + self.logger.debug("AHB Lite manager BFM:") self.logger.debug(" awidth = {}".format(self.awidth)) self.logger.debug(" dwidth = {}".format(self.dwidth)) @@ -119,14 +128,8 @@ async def write(self, addr, data): one of multiplies of data bus width supported by AHB Lite. """ - hsize = { - 64 // self.dwidth: 3, - 128 // self.dwidth: 4, - 256 // self.dwidth: 5, - 512 // self.dwidth: 6, - 1024 // self.dwidth: 7, - } - assert len(data) in hsize + lnt = len(data) + assert lnt in self.hsize # Wait for reset deassertion if necessary if self.ahb_hreset.value == 0: @@ -136,7 +139,7 @@ async def write(self, addr, data): await RisingEdge(self.ahb_hclk) self.ahb_hsel.value = 1 self.ahb_hprot.value = 1 # Indicates a data transfer - self.ahb_hsize.value = hsize[len(data)] + self.ahb_hsize.value = self.hsize[lnt] self.ahb_haddr.value = addr self.ahb_hwrite.value = 1 self.ahb_htrans.value = self.HTRANS.NONSEQ.value @@ -145,7 +148,7 @@ async def write(self, addr, data): # Data phase for i, word in enumerate(data): - if i != len(data) - 1: + if i != lnt - 1: addr += self.dwidth // 8 self.ahb_haddr.value = addr self.ahb_htrans.value = self.HTRANS.SEQ.value @@ -162,14 +165,7 @@ async def read(self, addr, length): Lite. """ - hsize = { - 64 // self.dwidth: 3, - 128 // self.dwidth: 4, - 256 // self.dwidth: 5, - 512 // self.dwidth: 6, - 1024 // self.dwidth: 7, - } - assert length in hsize + assert length in self.hsize # Wait for reset deassertion if necessary if self.ahb_hreset.value == 0: @@ -179,7 +175,7 @@ async def read(self, addr, length): await RisingEdge(self.ahb_hclk) self.ahb_hsel.value = 1 self.ahb_hprot.value = 1 # Data - self.ahb_hsize.value = hsize[length] + self.ahb_hsize.value = self.hsize[length] self.ahb_haddr.value = addr self.ahb_hwrite.value = 0 self.ahb_htrans.value = self.HTRANS.NONSEQ.value From 956d1b1cb2792543b9fcf63bd14a7c546753e425 Mon Sep 17 00:00:00 2001 From: Maciej Kurc Date: Wed, 6 Dec 2023 14:43:35 +0100 Subject: [PATCH 11/12] Move setting PYTHONPATH from noxfile.py to common.mk Internal-tag: [#51803] Signed-off-by: Maciej Kurc --- verification/block/common.mk | 3 +++ verification/block/noxfile.py | 1 - 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/verification/block/common.mk b/verification/block/common.mk index 4d68b936694..99e53a58c64 100644 --- a/verification/block/common.mk +++ b/verification/block/common.mk @@ -10,6 +10,9 @@ CURDIR := $(abspath $(dir $(lastword $(MAKEFILE_LIST)))) CFGDIR := $(abspath $(CURDIR)/snapshots/default) CONFIG := $(abspath $(CURDIR)/../../configs) +# Set pythonpath so that tests can access common modules +export PYTHONPATH := $(CURDIR)/common + # Common sources COMMON_SOURCES = $(CFGDIR)/common_defines.vh COMMON_SOURCES += $(CFGDIR)/el2_pdef.vh diff --git a/verification/block/noxfile.py b/verification/block/noxfile.py index 1cafcf8347a..b6c5b6841bf 100644 --- a/verification/block/noxfile.py +++ b/verification/block/noxfile.py @@ -89,7 +89,6 @@ def isSimFailure( def verify_block(session, blockName, testName, coverage=""): session.install("-r", pipRequirementsPath) - session.env["PYTHONPATH"] = os.path.abspath(os.path.join(os.path.dirname(__file__), "common")) testPath = os.path.join(blockPath, blockName) testNameXML = os.path.join(testName + ".xml") testNameXMLPath = os.path.join(testPath, testNameXML) From 5544790ae410a306913d0dec730dd8b4a1045b36 Mon Sep 17 00:00:00 2001 From: Maciej Kurc Date: Wed, 6 Dec 2023 14:48:48 +0100 Subject: [PATCH 12/12] Refactor AHB vs. AXI4 PyUVM monitor class Internal-tag: [#51803] Signed-off-by: Maciej Kurc --- verification/block/lib_ahb_to_axi4/testbench.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/verification/block/lib_ahb_to_axi4/testbench.py b/verification/block/lib_ahb_to_axi4/testbench.py index 9af630cd968..9fa840b11ac 100644 --- a/verification/block/lib_ahb_to_axi4/testbench.py +++ b/verification/block/lib_ahb_to_axi4/testbench.py @@ -138,7 +138,7 @@ async def write(self, addr, data): # Address phase await RisingEdge(self.ahb_hclk) self.ahb_hsel.value = 1 - self.ahb_hprot.value = 1 # Indicates a data transfer + self.ahb_hprot.value = 1 # Indicates a data transfer self.ahb_hsize.value = self.hsize[lnt] self.ahb_haddr.value = addr self.ahb_hwrite.value = 1 @@ -491,13 +491,7 @@ def connect_phase(self): def check_phase(self): # Check transactions - while self.ahb_port.can_get() or self.axi_port.can_get(): - # A transaction is missing - if not self.ahb_port.can_get() or not self.axi_port.can_get(): - self.logger.error("A transaction is missing on one of the buses") - self.passed = False - break - + while self.ahb_port.can_get() and self.axi_port.can_get(): self.passed = True # Get items @@ -523,6 +517,12 @@ def check_phase(self): else: self.logger.debug(msg) + # Indicate an error if there is any leftover transaction in any of the + # queues. + if self.ahb_port.can_get() or self.axi_port.can_get(): + self.logger.error("Spurious transaction(s) on one of the buses") + self.passed = False + def final_phase(self): if not self.passed: self.logger.critical("{} reports a failure".format(type(self)))