tests: refactor to use Amaranth-native API

This commit is contained in:
Fischer Moseley 2024-08-01 07:07:37 -07:00
parent b20d7c7822
commit a01b6981e2
9 changed files with 185 additions and 217 deletions

View File

@ -59,7 +59,7 @@ class IOCore(MantaCore):
for name, width in inputs.items():
if not isinstance(name, str):
raise ValueError(
f'Input probe "{name}" has invalid name, names must be strings.'
f"Input probe '{name}' has invalid name, names must be strings."
)
if not isinstance(width, int):
@ -75,15 +75,15 @@ class IOCore(MantaCore):
for name, attrs in outputs.items():
if not isinstance(name, str):
raise ValueError(
f'Output probe "{name}" has invalid name, names must be strings.'
f"Output probe '{name}' has invalid name, names must be strings."
)
if not isinstance(attrs, int) and not isinstance(attrs, dict):
raise ValueError(f'Unrecognized format for output probe "{name}".')
raise ValueError(f"Unrecognized format for output probe '{name}'.")
if isinstance(attrs, int):
if not attrs > 0:
raise ValueError(f'Output probe "{name}" must have positive width.')
raise ValueError(f"Output probe '{name}' must have positive width.")
width = attrs
initial_value = 0
@ -93,17 +93,17 @@ class IOCore(MantaCore):
valid_options = ["width", "initial_value"]
for option in attrs:
if option not in valid_options:
warn(f'Ignoring unrecognized option "{option}" in IO core.')
warn(f"Ignoring unrecognized option '{option}' in IO core.")
# Check that widths are appropriate
if "width" not in attrs:
raise ValueError(f"No width specified for output probe {name}.")
raise ValueError(f"No width specified for output probe '{name}'.")
if not isinstance(attrs["width"], int):
raise ValueError(f'Output probe "{name}" must have integer width.')
raise ValueError(f"Output probe '{name}' must have integer width.")
if not attrs["width"] > 0:
raise ValueError(f'Input probe "{name}" must have positive width.')
raise ValueError(f"Input probe '{name}' must have positive width.")
width = attrs["width"]
@ -192,28 +192,42 @@ class IOCore(MantaCore):
return m
def set_probe(self, name, value):
def set_probe(self, probe, value):
"""
Set the value of an output probe on the FPGA. The value may be either
an unsigned or signed integer, but must fit within the width of the
probe.
"""
# Check that probe exists in memory map
probe = self._memory_map.get(name)
if not probe:
raise KeyError(f"Probe '{name}' not found in IO core.")
# This function accepts either the name of an output probe, or a
# Signal() object that is the output probe itself.
if isinstance(probe, str):
# The name passed should occur exactly once in the output probes
probes = [o for o in self._outputs if o.name == probe]
if len(probes) == 0:
raise ValueError(f"Probe '{probe}' is not an output of the IO core.")
if len(probes) > 1:
raise ValueError(f"Multiple probes found in IO core for name {probe}.")
return self.set_probe(probes[0], value)
# Check that the probe is an output
if not any([o.name == name for o in self._outputs]):
raise KeyError(f"Probe '{name}' is not an output of the IO core.")
probes = [o for o in self._outputs if probe is o]
if len(probes) == 0:
raise KeyError(f"Probe '{probe.name}' is not an output of the IO core.")
if len(probes) > 1:
raise ValueError(
f"Multiple output probes found in IO core for name '{probe.name}'."
)
# Check that value isn't too big for the register
n_bits = sum([len(s) for s in probe["signals"]])
check_value_fits_in_bits(value, n_bits)
check_value_fits_in_bits(value, len(probe))
# Write value to core
addrs = probe["addrs"]
addrs = self._memory_map[probe.name]["addrs"]
datas = value_to_words(value, len(addrs))
self.interface.write(addrs, datas)
@ -222,7 +236,7 @@ class IOCore(MantaCore):
self.interface.write(self.base_addr, 1)
self.interface.write(self.base_addr, 0)
def get_probe(self, name):
def get_probe(self, probe):
"""
Get the present value of a probe on the FPGA, which is returned as an
unsigned integer. This function may be called on both input and output
@ -230,10 +244,35 @@ class IOCore(MantaCore):
(or their initial value, if no value has been written to them yet).
"""
# Check that probe exists in memory map
probe = self._memory_map.get(name)
if not probe:
raise KeyError(f"Probe with name {name} not found in IO core.")
# This function accepts either the name of an output probe, or a
# Signal() object that is the output probe itself.
if isinstance(probe, str):
# The name passed should occur exactly once in the probes
probes = [o for o in self._outputs if o.name == probe]
probes += [i for i in self._inputs if i.name == probe]
if len(probes) == 0:
raise ValueError(f"Probe with name '{probe}' not found in IO core.")
if len(probes) > 1:
raise ValueError(
f"Multiple probes found in IO core for name '{probe}'."
)
return self.get_probe(probes[0])
# Check that probe exists in core
probes = [o for o in self._outputs if probe is o]
probes += [i for i in self._inputs if probe is i]
if len(probes) == 0:
raise KeyError(f"Probe with name '{probe.name}' not found in IO core.")
if len(probes) > 1:
raise ValueError(
f"Multiple probes found in IO core for name '{probe.name}'."
)
# Pulse strobe register
self.interface.write(self.base_addr, 0)
@ -241,5 +280,5 @@ class IOCore(MantaCore):
self.interface.write(self.base_addr, 0)
# Get value from buffer
datas = self.interface.read(probe["addrs"])
datas = self.interface.read(self._memory_map[probe.name]["addrs"])
return words_to_value(datas)

View File

@ -15,8 +15,6 @@ class LogicAnalyzerCapture:
self._trigger_mode = trigger_mode
self._data = data
print(self._trigger_mode)
def get_trigger_location(self):
"""
Returns the location of the trigger in the capture. This will match the

View File

@ -2,6 +2,8 @@ from manta import Manta
from amaranth.lib import io
from amaranth_boards.nexys4ddr import Nexys4DDRPlatform
from amaranth_boards.icestick import ICEStickPlatform
from manta.io_core import IOCore
from manta.uart import UARTInterface
from manta.utils import *
import pytest
from random import getrandbits
@ -13,70 +15,44 @@ class IOCoreLoopbackTest(Elaboratable):
self.platform = platform
self.port = port
self.config = self.platform_specific_config()
self.manta = Manta(self.config)
def platform_specific_config(self):
return {
"cores": {
"io_core": {
"type": "io",
"inputs": {"probe0": 1, "probe1": 2, "probe2": 8, "probe3": 20},
"outputs": {
"probe4": {"width": 1, "initial_value": 1},
"probe5": {
"width": 2,
"initial_value": 2,
},
"probe6": 8,
"probe7": {"width": 20, "initial_value": 65538},
},
}
},
"uart": {
"port": self.port,
"baudrate": 3e6,
"clock_freq": self.platform.default_clk_frequency,
},
}
def get_probe(self, name):
# This is a hack! And should be removed once the full Amaranth-native
# API is built out
for i in self.manta.io_core._inputs:
if i.name == name:
return i
for o in self.manta.io_core._outputs:
if o.name == name:
return o
return None
def elaborate(self, platform):
# Since we know that all the tests will be called only after the FPGA
# is programmed, we can just push all the wiring into the elaborate
# method, instead of needing to define Manta in the __init__() method
probe0 = Signal()
probe1 = Signal(2)
probe2 = Signal(8)
probe3 = Signal(20)
probe4 = Signal(init=1)
probe5 = Signal(2, init=2)
probe6 = Signal(8)
probe7 = Signal(20, init=65538)
self.manta = manta = Manta()
manta.cores.io = IOCore(
inputs=[probe0, probe1, probe2, probe3],
outputs=[probe4, probe5, probe6, probe7],
)
manta.interface = UARTInterface(
port=self.port, baudrate=3e6, clock_freq=platform.default_clk_frequency
)
m = Module()
m.submodules.manta = self.manta
m.submodules.manta = manta
uart_pins = platform.request("uart", dir={"tx": "-", "rx": "-"})
m.submodules.uart_rx = uart_rx = io.Buffer("i", uart_pins.rx)
m.submodules.uart_tx = uart_tx = io.Buffer("o", uart_pins.tx)
probe0 = self.get_probe("probe0")
probe1 = self.get_probe("probe1")
probe2 = self.get_probe("probe2")
probe3 = self.get_probe("probe3")
probe4 = self.get_probe("probe4")
probe5 = self.get_probe("probe5")
probe6 = self.get_probe("probe6")
probe7 = self.get_probe("probe7")
m.d.comb += [
probe0.eq(probe4),
probe1.eq(probe5),
probe2.eq(probe6),
probe3.eq(probe7),
self.manta.interface.rx.eq(uart_rx.i),
uart_tx.o.eq(self.manta.interface.tx),
manta.interface.rx.eq(uart_rx.i),
uart_tx.o.eq(manta.interface.tx),
]
return m
@ -91,23 +67,11 @@ class IOCoreLoopbackTest(Elaboratable):
strobe register pulses every time the get_probe() method is called.
"""
# Test that all output probes take their initial values
inputs = self.config["cores"]["io_core"]["inputs"]
outputs = self.config["cores"]["io_core"]["outputs"]
for name, attrs in outputs.items():
actual = self.manta.io_core.get_probe(name)
if isinstance(attrs, dict):
if "initial_value" in attrs:
expected = attrs["initial_value"]
else:
expected = 0
if actual != expected:
for p in self.manta.cores.io._outputs:
measured = self.manta.cores.io.get_probe(p)
if measured != p.init:
raise ValueError(
f"Output probe {name} took initial value of {actual} instead of {expected}."
f"Output probe {p.name} took initial value of {measured} instead of {p.init}."
)
def verify_probes_update(self):
@ -115,27 +79,26 @@ class IOCoreLoopbackTest(Elaboratable):
This design ties all the output probes to input probes, so this
test sets the outputs to random values, and verifies the inputs match
"""
inputs = self.config["cores"]["io_core"]["inputs"]
outputs = self.config["cores"]["io_core"]["outputs"]
inputs = self.manta.cores.io._inputs
outputs = self.manta.cores.io._outputs
# The config is specified in such a way that the first output is
# connected to the first output, the second output is connected
# to the second input, and so on...
for input, output in zip(inputs, outputs):
width = self.config["cores"]["io_core"]["inputs"][input]
value = getrandbits(width)
for i, o in zip(inputs, outputs):
value = getrandbits(len(i))
self.manta.io_core.set_probe(output, value)
readback = self.manta.io_core.get_probe(input)
self.manta.cores.io.set_probe(o, value)
readback = self.manta.cores.io.get_probe(i)
if readback != value:
raise ValueError(
f"Reading {output} through {input} yielded {readback} instead of {value}!"
f"Reading {o.name} through {i.name} yielded {readback} instead of {value}!"
)
else:
print(
f"Reading {output} through {input} yielded {readback} as expected."
f"Reading {o.name} through {i.name} yielded {readback} as expected."
)
def verify(self):

View File

@ -15,7 +15,9 @@ probe6 = Signal(8)
probe7 = Signal(20, init=65538)
outputs = [probe4, probe5, probe6, probe7]
io_core = IOCore(base_addr=0, interface=None, inputs=inputs, outputs=outputs)
io_core = IOCore(inputs=inputs, outputs=outputs)
io_core.base_addr = 0
_ = io_core.max_addr
async def pulse_strobe_register(ctx):

View File

@ -1,8 +1,10 @@
from manta.logic_analyzer import *
from manta.logic_analyzer import TriggerModes
from manta.logic_analyzer.fsm import LogicAnalyzerFSM, States
from manta.utils import *
config = {"sample_depth": 8}
fsm = LogicAnalyzerFSM(config, base_addr=0, interface=None)
sample_depth = 8
fsm = LogicAnalyzerFSM(sample_depth, base_addr=0, interface=None)
_ = fsm.max_addr
@simulate(fsm)
@ -97,7 +99,7 @@ async def test_single_shot_wait_for_trigger(ctx):
# Check that write_pointer points to the end of memory
rp = ctx.get(fsm.read_pointer)
wp = ctx.get(fsm.write_pointer)
if (wp + 1) % config["sample_depth"] != rp:
if (wp + 1) % sample_depth != rp:
raise ValueError
# Check that state is CAPTURED
@ -116,7 +118,7 @@ async def test_immediate(ctx):
if not ctx.get(fsm.write_enable):
raise ValueError
for i in range(config["sample_depth"]):
for i in range(sample_depth):
rp = ctx.get(fsm.read_pointer)
wp = ctx.get(fsm.write_pointer)
@ -219,7 +221,7 @@ async def test_immediate_write_enable(ctx):
ctx.set(fsm.request_start, 1)
await ctx.tick()
for _ in range(config["sample_depth"]):
for _ in range(sample_depth):
if not ctx.get(fsm.write_enable):
raise ValueError

View File

@ -3,6 +3,8 @@ from amaranth.lib import io
from amaranth_boards.nexys4ddr import Nexys4DDRPlatform
from amaranth_boards.icestick import ICEStickPlatform
from manta import Manta
from manta.logic_analyzer import LogicAnalyzerCore
from manta.uart import UARTInterface
from manta.utils import *
import pytest
import os
@ -13,41 +15,33 @@ class LogicAnalyzerCounterTest(Elaboratable):
self.platform = platform
self.port = port
self.config = self.platform_specific_config()
self.manta = Manta(self.config)
def platform_specific_config(self):
return {
"cores": {
"la": {
"type": "logic_analyzer",
"sample_depth": 1024,
"trigger_mode": "immediate",
"probes": {"larry": 1, "curly": 3, "moe": 9},
},
},
"uart": {
"port": self.port,
"baudrate": 3e6,
"clock_freq": self.platform.default_clk_frequency,
},
}
def elaborate(self, platform):
# Since we know that all the tests will be called only after the FPGA
# is programmed, we can just push all the wiring into the elaborate
# method, instead of needing to define Manta in the __init__() method
probe0 = Signal()
probe1 = Signal(3)
probe2 = Signal(9)
self.manta = manta = Manta()
manta.interface = UARTInterface(
port=self.port, baudrate=3e6, clock_freq=platform.default_clk_frequency
)
manta.cores.la = LogicAnalyzerCore(
sample_depth=1024, probes=[probe0, probe1, probe2]
)
m = Module()
m.submodules.manta = self.manta
m.submodules.manta = manta
uart_pins = platform.request("uart", dir={"tx": "-", "rx": "-"})
m.submodules.uart_rx = uart_rx = io.Buffer("i", uart_pins.rx)
m.submodules.uart_tx = uart_tx = io.Buffer("o", uart_pins.tx)
larry = self.manta.la._probes[0]
curly = self.manta.la._probes[1]
moe = self.manta.la._probes[2]
m.d.sync += larry.eq(larry + 1)
m.d.sync += curly.eq(curly + 1)
m.d.sync += moe.eq(moe + 1)
m.d.sync += probe0.eq(probe0 + 1)
m.d.sync += probe1.eq(probe1 + 1)
m.d.sync += probe2.eq(probe2 + 1)
m.d.comb += [
self.manta.interface.rx.eq(uart_rx.i),
@ -61,7 +55,9 @@ class LogicAnalyzerCounterTest(Elaboratable):
def verify(self):
self.build_and_program()
cap = self.manta.la.capture()
self.manta.cores.la.triggers = ["probe0 EQ 0"]
cap = self.manta.cores.la.capture()
make_build_dir_if_it_does_not_exist_already()
@ -75,11 +71,11 @@ class LogicAnalyzerCounterTest(Elaboratable):
cap.export_playback_verilog("build/logic_analzyer_capture_playback.v")
# verify that each signal is just a counter modulo the width of the signal
for name, width in self.manta.la._config["probes"].items():
trace = cap.get_trace(name)
for p in self.manta.cores.la._probes:
trace = cap.get_trace(p.name)
for i in range(len(trace) - 1):
if trace[i + 1] != (trace[i] + 1) % (2**width):
if trace[i + 1] != (trace[i] + 1) % (2 ** len(p)):
raise ValueError("Bad counter!")

View File

@ -4,15 +4,12 @@ from manta.logic_analyzer.trigger_block import Operations
from manta.utils import *
from random import sample
config = {
"type": "logic_analyzer",
"sample_depth": 1024,
"trigger_location": 512,
"probes": {"larry": 1, "curly": 3, "moe": 9},
"triggers": ["moe RISING"],
}
larry = Signal(1)
curly = Signal(3)
moe = Signal(9)
la = LogicAnalyzerCore(config, base_addr=0, interface=None)
la = LogicAnalyzerCore(1024, [larry, curly, moe])
la.base_addr = 0
async def print_data_at_addr(ctx, addr):

View File

@ -3,6 +3,9 @@ from amaranth.lib import io
from amaranth_boards.nexys4ddr import Nexys4DDRPlatform
from amaranth_boards.icestick import ICEStickPlatform
from manta import Manta
from manta.memory_core import MemoryCore
from manta.io_core import IOCore
from manta.uart import UARTInterface
from manta.utils import *
import pytest
from random import getrandbits
@ -18,51 +21,25 @@ class MemoryCoreLoopbackTest(Elaboratable):
self.depth = depth
self.port = port
self.config = self.platform_specific_config()
self.manta = Manta(self.config)
def platform_specific_config(self):
return {
"cores": {
"io_core": {
"type": "io",
"outputs": {
"user_addr": ceil(log2(self.depth)),
"user_data_in": self.width,
"user_write_enable": 1,
},
"inputs": {
"user_data_out": self.width,
},
},
"mem_core": {
"type": "memory",
"mode": self.mode,
"width": self.width,
"depth": self.depth,
},
},
"uart": {
"port": self.port,
"baudrate": 3e6,
"clock_freq": self.platform.default_clk_frequency,
},
}
def get_probe(self, name):
# This is a hack! And should be removed once the full Amaranth-native
# API is built out
for i in self.manta.io_core._inputs:
if i.name == name:
return i
for o in self.manta.io_core._outputs:
if o.name == name:
return o
return None
def elaborate(self, platform):
# Since we know that all the tests will be called only after the FPGA
# is programmed, we can just push all the wiring into the elaborate
# method, instead of needing to define Manta in the __init__() method
user_addr = Signal(range(self.depth))
user_data_in = Signal(self.width)
user_data_out = Signal(self.width)
user_write_enable = Signal()
self.manta = manta = Manta()
manta.cores.mem = MemoryCore(self.mode, self.width, self.depth)
manta.cores.io = IOCore(
inputs=[user_data_out], outputs=[user_addr, user_data_in, user_write_enable]
)
manta.interface = UARTInterface(
port=self.port, baudrate=3e6, clock_freq=platform.default_clk_frequency
)
m = Module()
m.submodules.manta = self.manta
@ -70,18 +47,14 @@ class MemoryCoreLoopbackTest(Elaboratable):
m.submodules.uart_rx = uart_rx = io.Buffer("i", uart_pins.rx)
m.submodules.uart_tx = uart_tx = io.Buffer("o", uart_pins.tx)
user_addr = self.get_probe("user_addr")
user_data_in = self.get_probe("user_data_in")
user_data_out = self.get_probe("user_data_out")
user_write_enable = self.get_probe("user_write_enable")
m.d.comb += self.manta.interface.rx.eq(uart_rx.i)
m.d.comb += uart_tx.o.eq(self.manta.interface.tx)
m.d.comb += self.manta.mem_core.user_addr.eq(user_addr)
m.d.comb += self.manta.cores.mem.user_addr.eq(user_addr)
if self.mode in ["bidirectional", "fpga_to_host"]:
m.d.comb += self.manta.mem_core.user_data_in.eq(user_data_in)
m.d.comb += self.manta.mem_core.user_write_enable.eq(user_write_enable)
m.d.comb += self.manta.cores.mem.user_data_in.eq(user_data_in)
m.d.comb += self.manta.cores.mem.user_write_enable.eq(user_write_enable)
if self.mode in ["bidirectional", "host_to_fpga"]:
m.d.comb += user_data_out.eq(self.manta.mem_core.user_data_out)
@ -92,16 +65,16 @@ class MemoryCoreLoopbackTest(Elaboratable):
self.platform.build(self, do_program=True)
def write_user_side(self, addr, data):
self.manta.io_core.set_probe("user_write_enable", 0)
self.manta.io_core.set_probe("user_addr", addr)
self.manta.io_core.set_probe("user_data_in", data)
self.manta.io_core.set_probe("user_write_enable", 1)
self.manta.io_core.set_probe("user_write_enable", 0)
self.manta.cores.io.set_probe("user_write_enable", 0)
self.manta.cores.io.set_probe("user_addr", addr)
self.manta.cores.io.set_probe("user_data_in", data)
self.manta.cores.io.set_probe("user_write_enable", 1)
self.manta.cores.io.set_probe("user_write_enable", 0)
def read_user_side(self, addr):
self.manta.io_core.set_probe("user_write_enable", 0)
self.manta.io_core.set_probe("user_addr", addr)
return self.manta.io_core.get_probe("user_data_out")
self.manta.cores.io.set_probe("user_write_enable", 0)
self.manta.cores.io.set_probe("user_addr", addr)
return self.manta.cores.io.get_probe("user_data_out")
def verify(self):
self.build_and_program()
@ -111,7 +84,7 @@ class MemoryCoreLoopbackTest(Elaboratable):
# Write a random balue to a random bus address
data = getrandbits(self.width)
self.manta.mem_core.write(addr, data)
self.manta.cores.mem.write(addr, data)
# Verify the same number is returned when reading on the user side
readback = self.read_user_side(addr)
@ -128,7 +101,7 @@ class MemoryCoreLoopbackTest(Elaboratable):
self.write_user_side(addr, data)
# Verify the same number is returned when reading on the bus side
readback = self.manta.mem_core.read(addr)
readback = self.manta.cores.mem.read(addr)
if readback != data:
raise ValueError(
f"Memory read from {hex(addr)} returned {hex(data)} instead of {hex(readback)}."
@ -136,9 +109,6 @@ class MemoryCoreLoopbackTest(Elaboratable):
# Nexys4DDR Tests
# Omit the bidirectional mode for now, pending completion of:
# https://github.com/amaranth-lang/amaranth/issues/1011
modes = ["fpga_to_host", "host_to_fpga", "bidirectional"]
widths = [1, 8, 14, 16, 33]
depths = [2, 512, 1024]

View File

@ -8,7 +8,7 @@ import pytest
class MemoryCoreTests:
def __init__(self, mem_core):
self.mem_core = mem_core
self.base_addr = mem_core._base_addr
self.base_addr = mem_core.base_addr
self.max_addr = mem_core.max_addr
self.width = self.mem_core._width
self.depth = self.mem_core._depth
@ -271,7 +271,8 @@ cases = [
@pytest.mark.parametrize("mode, width, depth, base_addr", cases)
def test_mem_core(mode, width, depth, base_addr):
mem_core = MemoryCore(mode, width, depth, base_addr, interface=None)
mem_core = MemoryCore(mode, width, depth)
mem_core.base_addr = 0
tests = MemoryCoreTests(mem_core)