Skip to content

Commit

Permalink
Merge pull request #157 from antmicro/mkurc/51803-ahb-to-axi-tests
Browse files Browse the repository at this point in the history
AHB to AXI4 Lite converter tests
  • Loading branch information
tmichalak authored Jan 10, 2024
2 parents 41103f3 + 5544790 commit 685da9e
Show file tree
Hide file tree
Showing 9 changed files with 1,462 additions and 226 deletions.
3 changes: 3 additions & 0 deletions verification/block/common.mk
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ CURDIR := $(abspath $(dir $(lastword $(MAKEFILE_LIST))))
CFGDIR := $(abspath $(CURDIR)/snapshots/default)
CONFIG := $(abspath $(CURDIR)/../../configs)

# Set pythonpath so that tests can access common modules
export PYTHONPATH := $(CURDIR)/common

# Common sources
COMMON_SOURCES = $(CFGDIR)/common_defines.vh
COMMON_SOURCES += $(CFGDIR)/el2_pdef.vh
Expand Down
186 changes: 186 additions & 0 deletions verification/block/common/axi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# Copyright (c) 2023 Antmicro
# SPDX-License-Identifier: Apache-2.0

from cocotb.triggers import RisingEdge
from pyuvm import *
from utils import collect_bytes

# ==============================================================================


class BusWriteItem(uvm_sequence_item):
"""
A generic data bus write request / response
"""

def __init__(self, addr, data, resp=None):
super().__init__("BusWriteItem")
self.addr = addr
self.data = data
self.resp = resp


class BusReadItem(uvm_sequence_item):
"""
A generic data bus read request / response
"""

def __init__(self, addr, data=None, resp=None):
super().__init__("BusReadItem")
self.addr = addr
self.data = data
self.resp = resp


# ==============================================================================


class Axi4LiteMonitor(uvm_component):
"""
A monitor for AXI4 lite bus
"""

class Transfer:
def __init__(self, tid, addr=None):
self.tid = tid
self.addr = addr
self.data = bytearray()

def __init__(self, *args, **kwargs):
self.bfm = kwargs["bfm"]
del kwargs["bfm"]
super().__init__(*args, **kwargs)

def build_phase(self):
self.ap = uvm_analysis_port("ap", self)

def _aw_active(self):
return self.bfm.axi_awready.value != 0 and self.bfm.axi_awvalid.value != 0

def _w_active(self):
return self.bfm.axi_wready.value != 0 and self.bfm.axi_wvalid.value != 0

def _ar_active(self):
return self.bfm.axi_arready.value != 0 and self.bfm.axi_arvalid.value != 0

def _r_active(self):
return self.bfm.axi_rready.value != 0 and self.bfm.axi_rvalid.value != 0

def _b_active(self):
return self.bfm.axi_bready.value != 0 and self.bfm.axi_bvalid.value != 0

def _sample_w(self):
return collect_bytes(
self.bfm.axi_wdata,
self.bfm.axi_wstrb,
)

def _sample_r(self):
return collect_bytes(
self.bfm.axi_rdata,
)

async def watch_write(self):
"""
Watches the bus for writes
"""
xfers = dict()
awid = None

# Main loop
while True:
# Wait for clock
await RisingEdge(self.bfm.axi_clk)

# A new write request
if self._aw_active():
addr = int(self.bfm.axi_awaddr.value)
awid = int(self.bfm.axi_awid.value)

if awid in xfers:
self.logger.error(
"Write request for a pending transaction, awid={}".format(awid)
)

else:
xfers[awid] = self.Transfer(awid, addr)

# Data (for the last seen awid)
if self._w_active():
if awid not in xfers:
self.logger.error("Data write but no transaction is pending")

else:
xfer = xfers[awid]
xfer.data = self._sample_w()

# Write completion
if self._b_active():
bresp = int(self.bfm.axi_bresp.value)
bid = int(self.bfm.axi_bid.value)

if bid not in xfers:
self.logger.error("Response for a non-pending transaction, bid={}".format(bid))

else:
xfer = xfers[bid]
del xfers[bid]

self.ap.write(BusWriteItem(xfer.addr, xfer.data, bresp))

self.logger.debug(
"WR: 0x{:08X} {} 0b{:03b}".format(
xfer.addr, ["0x{:02X}".format(b) for b in xfer.data], bresp
)
)

async def watch_read(self):
"""
Watches the bus for reads
"""
xfers = dict()

# Main loop
while True:
# Wait for clock
await RisingEdge(self.bfm.axi_clk)

# A new read request
if self._ar_active():
addr = int(self.bfm.axi_araddr.value)
arid = int(self.bfm.axi_arid.value)

if arid in xfers:
self.logger.error(
"Read request for a pending transaction, arid={}".format(awid)
)

else:
xfers[arid] = self.Transfer(arid, addr)

# Read completion
if self._r_active():
rresp = int(self.bfm.axi_rresp.value)
rid = int(self.bfm.axi_rid.value)

if rid not in xfers:
self.logger.error("Data read but no transaction is pending")

else:
xfer = xfers[rid]
xfer.data = self._sample_r()

del xfers[rid]

self.ap.write(BusReadItem(xfer.addr, xfer.data, rresp))

self.logger.debug(
"RD: 0x{:08X} {} 0b{:03b}".format(
xfer.addr, ["0x{:02X}".format(b) for b in xfer.data], rresp
)
)

async def run_phase(self):
# Start read & write watchers
cocotb.start_soon(self.watch_write())
cocotb.start_soon(self.watch_read())
48 changes: 48 additions & 0 deletions verification/block/common/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright (c) 2023 Antmicro
# SPDX-License-Identifier: Apache-2.0
import logging

# ==============================================================================


def collect_signals(signals, uut, obj, uut_prefix="", obj_prefix="", signal_map=None):
"""
Collects signal objects from UUT and attaches them to the given object.
Optionally UUT signals can be prefixed with the uut_prefix and object
signals with the obj_prefix. If signal_map is given it should be a dict
mapping signal names to actual UUT signal names.
"""

for sig in signals:
if signal_map is not None:
uut_sig = signal_map.get(sig, uut_prefix + sig)
else:
uut_sig = uut_prefix + sig
obj_sig = obj_prefix + sig
if hasattr(uut, uut_sig):
s = getattr(uut, uut_sig)

else:
s = None
logging.error("Module {} does not have a signal '{}'".format(str(uut), sig))

setattr(obj, obj_sig, s)


def collect_bytes(data, strb=None):
"""
Collects data bytes asserted on a data bus. Uses the strb value to
determine which octets are valid. Both data and strb must be cocotb
signals. strb can be None.
"""

if strb is not None:
assert len(data) == 8 * len(strb)

res = []
for i in range(len(data) // 8):
if strb is None or strb.value & (1 << i):
dat = (int(data.value) >> (8 * i)) & 0xFF
res.append(dat)

return bytes(res)
Loading

0 comments on commit 685da9e

Please sign in to comment.