sim: update testbenches to async API

This commit is contained in:
Fischer Moseley 2024-06-20 11:47:34 -07:00
parent 13bc196a34
commit 8fd943257c
15 changed files with 409 additions and 410 deletions

View File

@ -2,12 +2,12 @@
name = "manta"
version = "1.0.0"
authors = [
{ name="Fischer Moseley", email="fischerm@mit.edu" },
{ name="Fischer Moseley", email="fischer.moseley@gmail.com" },
]
description = "An In-Situ Debugging Tool for Programmable Hardware"
readme = "README.md"
dependencies = [
"amaranth[builtin-yosys]",
"amaranth[builtin-yosys]==0.5.0",
"PyYAML",
"pyserial",
"liteeth@git+https://github.com/enjoy-digital/liteeth@2023.12",
@ -35,4 +35,4 @@ where = ["src"]
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
build-backend = "setuptools.build_meta"

View File

@ -1,7 +1,7 @@
from manta.manta import Manta
from manta.utils import *
from sys import argv
from pkg_resources import get_distribution
from importlib.metadata import distribution
logo = f"""
@ -26,7 +26,7 @@ logo = f"""
Manta - An In-Situ Debugging Tool for Programmable Hardware
Version {get_distribution("manta").version}
Version {distribution("manta").version}
https://github.com/fischermoseley/manta
"""

View File

@ -27,9 +27,9 @@ class IOCore(MantaCore):
# Internal Signals
self._strobe = Signal()
self._input_bufs = [Signal(p.width, name=p.name + "_buf") for p in self._inputs]
self._input_bufs = [Signal(len(p), name=p.name + "_buf") for p in self._inputs]
self._output_bufs = [
Signal(p.width, name=p.name + "_buf", reset=p.reset) for p in self._outputs
Signal(len(p), name=p.name + "_buf", reset=p.reset) for p in self._outputs
]
self._make_memory_map()
@ -118,7 +118,7 @@ class IOCore(MantaCore):
check_value_fits_in_bits(attrs["initial_value"], width)
initial_value = attrs["initial_value"]
output_signals += [Signal(width, name=name, reset=initial_value)]
output_signals += [Signal(width, name=name, init=initial_value)]
return cls(base_addr, interface, inputs=input_signals, outputs=output_signals)
@ -136,7 +136,7 @@ class IOCore(MantaCore):
last_used_addr = self._base_addr
for io, io_buf in zip(ios, io_bufs):
n_slices = ceil(io.width / 16)
n_slices = ceil(len(io) / 16)
signals = split_into_chunks(io_buf, 16)
addrs = [i + last_used_addr + 1 for i in range(n_slices)]

View File

@ -96,7 +96,7 @@ class LogicAnalyzerTrigger(Elaboratable):
def __init__(self, signal):
self.signal = signal
self.op = Signal(Operations, name=signal.name + "_op")
self.arg = Signal(signal.width, name=signal.name + "_arg")
self.arg = Signal(len(signal), name=signal.name + "_arg")
self.triggered = Signal()
def elaborate(self, platform):

View File

@ -43,9 +43,11 @@ class MemoryCore(MantaCore):
elif self._mode == "host_to_fpga":
self.user_addr = Signal(range(self._depth))
self.user_data_out = Signal(self._width)
self.user_clk = Signal()
self._top_level_ports = [
self.user_addr,
self.user_data_out,
self.user_clk,
]
elif self._mode == "bidirectional":
@ -202,7 +204,10 @@ class MemoryCore(MantaCore):
if self._mode in ["host_to_fpga", "bidirectional"]:
read_datas = []
for i, mem in enumerate(self._mems):
read_port = mem.read_port()
m.domains.user = user_cd = ClockDomain(local=True)
m.d.comb += user_cd.clk.eq(self.user_clk)
read_port = mem.read_port(domain="user")
m.d.comb += read_port.addr.eq(self.user_addr)
m.d.comb += read_port.en.eq(1)
read_datas.append(read_port.data)

View File

@ -138,7 +138,7 @@ def simulate(top):
def wrapper(*args, **kwargs):
sim = Simulator(top)
sim.add_clock(1e-6) # 1 MHz
sim.add_sync_process(testbench)
sim.add_testbench(testbench)
vcd_path = "build/" + testbench.__name__ + ".vcd"
@ -159,7 +159,7 @@ def jumble(iterable):
return sample(iterable, len(iterable))
def verify_register(module, addr, expected_data):
async def verify_register(module, ctx, addr, expected_data):
"""
Read the contents of a register out over a module's bus connection, and
verify that it contains the expected data.
@ -173,40 +173,40 @@ def verify_register(module, addr, expected_data):
"""
# Place read transaction on the bus
yield module.bus_i.addr.eq(addr)
yield module.bus_i.data.eq(0)
yield module.bus_i.rw.eq(0)
yield module.bus_i.valid.eq(1)
yield
yield module.bus_i.addr.eq(0)
yield module.bus_i.valid.eq(0)
ctx.set(module.bus_i.addr, addr)
ctx.set(module.bus_i.data, 0)
ctx.set(module.bus_i.valid, True)
ctx.set(module.bus_i.rw, 0)
await ctx.tick()
ctx.set(module.bus_i.addr, 0)
ctx.set(module.bus_i.valid, 0)
# Wait for output to be valid
while not (yield module.bus_o.valid):
yield
while not ctx.get(module.bus_o.valid):
await ctx.tick()
# Compare returned value with expected
data = yield (module.bus_o.data)
data = ctx.get(module.bus_o.data)
if data != expected_data:
raise ValueError(f"Read from {addr} yielded {data} instead of {expected_data}")
def write_register(module, addr, data):
async def write_register(module, ctx, addr, data):
"""
Write to a register over a module's bus connection, placing the contents of `data`
at `addr`.
"""
yield module.bus_i.addr.eq(addr)
yield module.bus_i.data.eq(data)
yield module.bus_i.rw.eq(1)
yield module.bus_i.valid.eq(1)
yield
yield module.bus_i.addr.eq(0)
yield module.bus_i.data.eq(0)
yield module.bus_i.valid.eq(0)
yield module.bus_i.rw.eq(0)
yield
ctx.set(module.bus_i.addr, addr)
ctx.set(module.bus_i.data, data)
ctx.set(module.bus_i.rw, 1)
ctx.set(module.bus_i.valid, True)
await ctx.tick()
ctx.set(module.bus_i.addr, 0)
ctx.set(module.bus_i.data, 0)
ctx.set(module.bus_i.rw, 0)
ctx.set(module.bus_i.valid, False)
await ctx.tick()
def xilinx_tools_installed():

View File

@ -6,109 +6,110 @@ from manta.utils import *
bridge_rx = ReceiveBridge()
def verify_read_decoding(bytes, addr):
async def verify_read_decoding(ctx, bytes, addr):
"""
Send a series of bytes to the receive bridge, and verify that the bridge places
a read request with the appropriate address on the internal bus.
"""
valid_asserted = False
yield bridge_rx.valid_i.eq(1)
ctx.set(bridge_rx.valid_i, True)
for i, byte in enumerate(bytes):
yield bridge_rx.data_i.eq(byte)
ctx.set(bridge_rx.data_i, byte)
if (yield bridge_rx.valid_o) and (i > 0):
if ctx.get(bridge_rx.valid_o) and (i > 0):
valid_asserted = True
if (yield bridge_rx.addr_o) != addr:
if ctx.get(bridge_rx.addr_o) != addr:
raise ValueError("wrong addr!")
if (yield bridge_rx.rw_o) != 0:
if ctx.get(bridge_rx.rw_o) != 0:
raise ValueError("wrong rw!")
if (yield bridge_rx.data_o) != 0:
if ctx.get(bridge_rx.data_o) != 0:
raise ValueError("wrong data!")
yield
await ctx.tick()
yield bridge_rx.valid_i.eq(0)
yield bridge_rx.data_i.eq(0)
ctx.set(bridge_rx.valid_i, False)
ctx.set(bridge_rx.data_i, 0)
if not valid_asserted and not (yield bridge_rx.valid_o):
if not valid_asserted and not ctx.get(bridge_rx.valid_o):
raise ValueError("Bridge failed to output valid message.")
def verify_write_decoding(bytes, addr, data):
async def verify_write_decoding(ctx, bytes, addr, data):
"""
Send a series of bytes to the receive bridge, and verify that the bridge places
a write request with the appropriate address and data on the internal bus.
"""
valid_asserted = False
yield bridge_rx.valid_i.eq(1)
ctx.set(bridge_rx.valid_i, True)
for i, byte in enumerate(bytes):
yield bridge_rx.data_i.eq(byte)
ctx.set(bridge_rx.data_i, byte)
if (yield bridge_rx.valid_o) and (i > 0):
if ctx.get(bridge_rx.valid_o) and (i > 0):
valid_asserted = True
if (yield bridge_rx.addr_o) != addr:
if ctx.get(bridge_rx.addr_o) != addr:
raise ValueError("wrong addr!")
if (yield bridge_rx.rw_o) != 1:
if ctx.get(bridge_rx.rw_o) != 1:
raise ValueError("wrong rw!")
if (yield bridge_rx.data_o) != data:
if ctx.get(bridge_rx.data_o) != data:
raise ValueError("wrong data!")
yield
await ctx.tick()
yield bridge_rx.valid_i.eq(0)
yield bridge_rx.data_i.eq(0)
ctx.set(bridge_rx.valid_i, False)
ctx.set(bridge_rx.data_i, 0)
if not valid_asserted and not (yield bridge_rx.valid_o):
if not valid_asserted and not ctx.get(bridge_rx.valid_o):
raise ValueError("Bridge failed to output valid message.")
def verify_bad_bytes(bytes):
async def verify_bad_bytes(ctx, bytes):
"""
Send a series of bytes to the receive bridge, and verify that the bridge does not
place any transaction on the internal bus.
"""
yield bridge_rx.valid_i.eq(1)
ctx.set(bridge_rx.valid_i, True)
for byte in bytes:
yield bridge_rx.data_i.eq(byte)
ctx.set(bridge_rx.data_i, byte)
if (yield bridge_rx.valid_o):
if ctx.get(bridge_rx.valid_o):
raise ValueError("Bridge decoded invalid message.")
yield
await ctx.tick()
yield bridge_rx.valid_i.eq(0)
ctx.set(bridge_rx.valid_i, 0)
@simulate(bridge_rx)
def test_read_decode():
yield from verify_read_decoding(b"R0000\r\n", 0x0000)
yield from verify_read_decoding(b"R1234\r\n", 0x1234)
yield from verify_read_decoding(b"RBABE\r\n", 0xBABE)
yield from verify_read_decoding(b"R5678\n", 0x5678)
yield from verify_read_decoding(b"R9ABC\r", 0x9ABC)
async def test_function(ctx):
await verify_read_decoding(ctx, b"R0000\r\n", 0x0000)
await verify_read_decoding(ctx, b"R1234\r\n", 0x1234)
await verify_read_decoding(ctx, b"RBABE\r\n", 0xBABE)
await verify_read_decoding(ctx, b"R5678\n", 0x5678)
await verify_read_decoding(ctx, b"R9ABC\r", 0x9ABC)
@simulate(bridge_rx)
def test_write_decode():
yield from verify_write_decoding(b"W12345678\r\n", 0x1234, 0x5678)
yield from verify_write_decoding(b"WDEADBEEF\r\n", 0xDEAD, 0xBEEF)
yield from verify_write_decoding(b"WDEADBEEF\r", 0xDEAD, 0xBEEF)
yield from verify_write_decoding(b"WB0BACAFE\n", 0xB0BA, 0xCAFE)
async def test_write_decode(ctx):
await verify_write_decoding(ctx, b"W12345678\r\n", 0x1234, 0x5678)
await verify_write_decoding(ctx, b"WDEADBEEF\r\n", 0xDEAD, 0xBEEF)
await verify_write_decoding(ctx, b"WDEADBEEF\r", 0xDEAD, 0xBEEF)
await verify_write_decoding(ctx, b"WB0BACAFE\n", 0xB0BA, 0xCAFE)
@simulate(bridge_rx)
def test_no_decode():
yield from verify_bad_bytes(b"RABC\r\n")
yield from verify_bad_bytes(b"R12345\r\n")
yield from verify_bad_bytes(b"M\r\n")
yield from verify_bad_bytes(b"W123456789101112131415161718191201222\r\n")
yield from verify_bad_bytes(b"RABCG\r\n")
yield from verify_bad_bytes(b"WABC[]()##*@\r\n")
yield from verify_bad_bytes(b"R\r\n")
async def test_no_decode(ctx):
await verify_bad_bytes(ctx, b"RABC\r\n")
await verify_bad_bytes(ctx, b"R12345\r\n")
await verify_bad_bytes(ctx, b"M\r\n")
await verify_bad_bytes(ctx, b"W123456789101112131415161718191201222\r\n")
await verify_bad_bytes(ctx, b"RABCG\r\n")
await verify_bad_bytes(ctx, b"WABC[]()##*@\r\n")
await verify_bad_bytes(ctx, b"R\r\n")

View File

@ -7,7 +7,7 @@ from random import randint, sample
bridge_tx = TransmitBridge()
def verify_encoding(data, bytes):
async def verify_encoding(ctx, data, bytes):
"""
Place a read response on the internal bus, and verify that the sequence of bytes
sent from TransmitBridge matches the provided bytestring `bytes`.
@ -17,18 +17,16 @@ def verify_encoding(data, bytes):
"""
# Place a read response on the internal bus
yield bridge_tx.data_i.eq(data)
yield bridge_tx.valid_i.eq(1)
yield bridge_tx.rw_i.eq(0)
yield bridge_tx.done_i.eq(1)
ctx.set(bridge_tx.data_i, data)
ctx.set(bridge_tx.valid_i, True)
ctx.set(bridge_tx.rw_i, 0)
ctx.set(bridge_tx.done_i, True)
yield
await ctx.tick()
yield bridge_tx.data_i.eq(0)
yield bridge_tx.valid_i.eq(0)
yield bridge_tx.rw_i.eq(0)
yield
ctx.set(bridge_tx.data_i, 0)
ctx.set(bridge_tx.valid_i, False)
ctx.set(bridge_tx.rw_i, 0)
# Model the UARTTransmitter
sent_bytes = b""
@ -36,16 +34,15 @@ def verify_encoding(data, bytes):
while len(sent_bytes) < len(bytes):
# If start_o is asserted, set done_i to zero, then delay, then set it back to one
if (yield bridge_tx.start_o):
yield bridge_tx.done_i.eq(0)
sent_bytes += (yield bridge_tx.data_o).to_bytes(1, "big")
if ctx.get(bridge_tx.start_o):
sent_bytes += ctx.get(bridge_tx.data_o).to_bytes(1, "big")
ctx.set(bridge_tx.done_i, 0)
yield bridge_tx.done_i.eq(0)
for _ in range(10):
yield
await ctx.tick()
yield bridge_tx.done_i.eq(1)
yield
ctx.set(bridge_tx.done_i, 1)
await ctx.tick()
# Time out if not enough bytes after trying to get bytes 15 times
iters += 1
@ -58,8 +55,7 @@ def verify_encoding(data, bytes):
@simulate(bridge_tx)
def test_some_random_values():
async def test_some_random_values(ctx):
for i in sample(range(0xFFFF), k=5000):
expected = f"D{i:04X}\r\n".encode("ascii")
print(i)
yield from verify_encoding(i, expected)
await verify_encoding(ctx, i, expected)

View File

@ -10,75 +10,75 @@ probe2 = Signal(8)
probe3 = Signal(20)
inputs = [probe0, probe1, probe2, probe3]
probe4 = Signal(1, reset=1)
probe5 = Signal(2, reset=2)
probe4 = Signal(1, init=1)
probe5 = Signal(2, init=2)
probe6 = Signal(8)
probe7 = Signal(20, reset=65538)
probe7 = Signal(20, init=65538)
outputs = [probe4, probe5, probe6, probe7]
io_core = IOCore(base_addr=0, interface=None, inputs=inputs, outputs=outputs)
def pulse_strobe_register():
async def pulse_strobe_register(ctx):
strobe_addr = io_core._memory_map["strobe"]["addrs"][0]
yield from write_register(io_core, strobe_addr, 0)
yield from write_register(io_core, strobe_addr, 1)
yield from write_register(io_core, strobe_addr, 0)
await write_register(io_core, ctx, strobe_addr, 0)
await write_register(io_core, ctx, strobe_addr, 1)
await write_register(io_core, ctx, strobe_addr, 0)
@simulate(io_core)
def test_input_probe_buffer_initial_value():
async def test_input_probe_buffer_initial_value(ctx):
# Verify all input probe buffers initialize to zero
for i in inputs:
addrs = io_core._memory_map[i.name]["addrs"]
for addr in addrs:
yield from verify_register(io_core, addr, 0)
await verify_register(io_core, ctx, addr, 0)
@simulate(io_core)
def test_output_probe_buffer_initial_value():
async def test_output_probe_buffer_initial_value(ctx):
# Verify all output probe buffers initialize to the values in the config
for o in outputs:
addrs = io_core._memory_map[o.name]["addrs"]
datas = value_to_words(o.reset, len(addrs))
for addr, data in zip(addrs, datas):
yield from verify_register(io_core, addr, data)
await verify_register(io_core, ctx, addr, data)
@simulate(io_core)
def test_output_probes_are_writeable():
async def test_output_probes_are_writeable(ctx):
for o in outputs:
addrs = io_core._memory_map[o.name]["addrs"]
test_value = getrandbits(o.width)
test_value = getrandbits(len(o))
datas = value_to_words(test_value, len(addrs))
# write value to registers
for addr, data in zip(addrs, datas):
yield from write_register(io_core, addr, data)
await write_register(io_core, ctx, addr, data)
# read value back from registers
for addr, data in zip(addrs, datas):
yield from verify_register(io_core, addr, data)
await verify_register(io_core, ctx, addr, data)
@simulate(io_core)
def test_output_probes_update():
async def test_output_probes_update(ctx):
for o in outputs:
addrs = io_core._memory_map[o.name]["addrs"]
test_value = getrandbits(o.width)
test_value = getrandbits(len(o))
datas = value_to_words(test_value, len(addrs))
# write value to registers
for addr, data in zip(addrs, datas):
yield from write_register(io_core, addr, data)
await write_register(io_core, ctx, addr, data)
# pulse strobe register
yield from pulse_strobe_register()
await pulse_strobe_register(ctx)
# check that outputs took updated value
value = yield (o)
value = ctx.get(o)
if value != test_value:
raise ValueError(
@ -90,18 +90,18 @@ def test_output_probes_update():
@simulate(io_core)
def test_input_probes_update():
async def test_input_probes_update(ctx):
for i in inputs:
# set input probe value
test_value = getrandbits(i.width)
yield i.eq(test_value)
test_value = getrandbits(len(i))
ctx.set(i, test_value)
# pulse strobe register
yield from pulse_strobe_register()
await pulse_strobe_register(ctx)
# check that values are as expected once read back
addrs = io_core._memory_map[i.name]["addrs"]
datas = value_to_words(test_value, len(addrs))
for addr, data in zip(addrs, datas):
yield from verify_register(io_core, addr, data)
await verify_register(io_core, ctx, addr, data)

View File

@ -7,70 +7,69 @@ fsm = LogicAnalyzerFSM(config, base_addr=0, interface=None)
@simulate(fsm)
def test_signals_reset_correctly():
async def test_signals_reset_correctly(ctx):
# Make sure pointers and write enable reset to zero
for sig in [fsm.write_pointer, fsm.read_pointer, fsm.write_enable]:
if (yield sig) != 0:
if ctx.get(sig) != 0:
raise ValueError
# Make sure state resets to IDLE
if (yield fsm.state != States.IDLE):
if ctx.get(fsm.state) != States.IDLE:
raise ValueError
@simulate(fsm)
def test_single_shot_no_wait_for_trigger():
async def test_single_shot_no_wait_for_trigger(ctx):
# Configure and start FSM
yield fsm.trigger.eq(1)
yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.trigger_location.eq(4)
yield fsm.request_start.eq(1)
ctx.set(fsm.trigger, 1)
ctx.set(fsm.trigger_mode, TriggerModes.SINGLE_SHOT)
ctx.set(fsm.trigger_location, 4)
ctx.set(fsm.request_start, 1)
# Wait until write_enable is asserted
while not (yield fsm.write_enable):
yield
while not ctx.get(fsm.write_enable):
await ctx.tick()
# Wait 8 clock cycles for capture to complete
for i in range(8):
# Make sure that read_pointer does not increase
if (yield fsm.read_pointer) != 0:
if ctx.get(fsm.read_pointer) != 0:
raise ValueError
# Make sure that write_pointer increases by one each cycle
if (yield fsm.write_pointer) != i:
if ctx.get(fsm.write_pointer) != i:
raise ValueError
yield
await ctx.tick()
# Wait one clock cycle (to let BRAM contents cycle in)
yield
await ctx.tick()
# Check that write_pointer points to the end of memory
if (yield fsm.write_pointer) != 7:
if ctx.get(fsm.write_pointer) != 7:
raise ValueError
# Check that state is CAPTURED
if (yield fsm.state) != States.CAPTURED:
if ctx.get(fsm.state) != States.CAPTURED:
raise ValueError
@simulate(fsm)
def test_single_shot_wait_for_trigger():
async def test_single_shot_wait_for_trigger(ctx):
# Configure and start FSM
yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.trigger_location.eq(4)
yield fsm.request_start.eq(1)
yield
yield
ctx.set(fsm.trigger_mode, TriggerModes.SINGLE_SHOT)
ctx.set(fsm.trigger_location, 4)
ctx.set(fsm.request_start, 1)
await ctx.tick()
# Check that write_enable is asserted a cycle after request_start
if not (yield fsm.write_enable):
if not ctx.get(fsm.write_enable):
raise ValueError
# Wait 4 clock cycles to get to IN_POSITION
for i in range(4):
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
rp = ctx.get(fsm.read_pointer)
wp = ctx.get(fsm.write_pointer)
# Make sure that read_pointer does not increase
if rp != 0:
@ -80,48 +79,47 @@ def test_single_shot_wait_for_trigger():
if wp != i:
raise ValueError
yield
await ctx.tick()
# Wait a few cycles before triggering
for _ in range(10):
yield
await ctx.tick()
# Provide the trigger, and check that the capture completes 4 cycles later
yield fsm.trigger.eq(1)
yield
ctx.set(fsm.trigger, 1)
await ctx.tick()
for i in range(4):
yield
await ctx.tick()
# Wait one clock cycle (to let BRAM contents cycle in)
yield
await ctx.tick()
# Check that write_pointer points to the end of memory
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
rp = ctx.get(fsm.read_pointer)
wp = ctx.get(fsm.write_pointer)
if (wp + 1) % config["sample_depth"] != rp:
raise ValueError
# Check that state is CAPTURED
if (yield fsm.state) != States.CAPTURED:
if ctx.get(fsm.state) != States.CAPTURED:
raise ValueError
@simulate(fsm)
def test_immediate():
async def test_immediate(ctx):
# Configure and start FSM
yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE)
yield fsm.request_start.eq(1)
yield
yield
ctx.set(fsm.trigger_mode, TriggerModes.IMMEDIATE)
ctx.set(fsm.request_start, 1)
await ctx.tick()
# Check that write_enable is asserted a cycle after request_start
if not (yield fsm.write_enable):
if not ctx.get(fsm.write_enable):
raise ValueError
for i in range(config["sample_depth"]):
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
rp = ctx.get(fsm.read_pointer)
wp = ctx.get(fsm.write_pointer)
if rp != 0:
raise ValueError
@ -129,107 +127,105 @@ def test_immediate():
if wp != i:
raise ValueError
yield
await ctx.tick()
# Wait one clock cycle (to let BRAM contents cycle in)
yield
await ctx.tick()
# Check that write_pointer points to the end of memory
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
rp = ctx.get(fsm.read_pointer)
wp = ctx.get(fsm.write_pointer)
if rp != 0:
raise ValueError
if wp != 7:
raise ValueError
# Check that state is CAPTURED
if (yield fsm.state) != States.CAPTURED:
if ctx.get(fsm.state) != States.CAPTURED:
raise ValueError
@simulate(fsm)
def test_incremental():
async def test_incremental(ctx):
# Configure and start FSM
yield fsm.trigger_mode.eq(TriggerModes.INCREMENTAL)
yield fsm.request_start.eq(1)
yield
yield
ctx.set(fsm.trigger_mode, TriggerModes.INCREMENTAL)
ctx.set(fsm.request_start, 1)
await ctx.tick()
# Check that write_enable is asserted on the same edge as request_start
if not (yield fsm.write_enable):
if not ctx.get(fsm.write_enable):
raise ValueError
for _ in range(10):
for _ in range(3):
yield
await ctx.tick().repeat(3)
yield fsm.trigger.eq(1)
yield
yield fsm.trigger.eq(0)
yield
ctx.set(fsm.trigger, 1)
await ctx.tick()
ctx.set(fsm.trigger, 0)
await ctx.tick()
# Check that state is CAPTURED
if (yield fsm.state) != States.CAPTURED:
if ctx.get(fsm.state) != States.CAPTURED:
raise ValueError
# @simulate(fsm)
# async def test_single_shot_write_enable(ctx):
# # Configure FSM
# ctx.set(fsm.trigger_mode, TriggerModes.SINGLE_SHOT)
# ctx.set(fsm.trigger_location, 4)
# await ctx.tick()
# # Make sure write is not enabled before starting the FSM
# if ctx.get(fsm.write_enable):
# raise ValueError
# # Start the FSM, ensure write enable is asserted throughout the capture
# ctx.set(fsm.request_start, 1)
# await ctx.tick()
# await ctx.tick()
# for _ in range(config["sample_depth"]):
# if not ctx.get(fsm.write_enable):
# raise ValueError
# await ctx.tick()
# ctx.set(fsm.trigger, 1)
# await ctx.tick()
# for _ in range(4):
# if not ctx.get(fsm.write_enable):
# raise ValueError
# await ctx.tick()
# # Make sure write_enable is deasserted after
# if ctx.get(fsm.write_enable):
# raise ValueError
@simulate(fsm)
def test_single_shot_write_enable():
async def test_immediate_write_enable(ctx):
# Configure FSM
yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.trigger_location.eq(4)
yield
ctx.set(fsm.trigger_mode, TriggerModes.IMMEDIATE)
await ctx.tick()
# Make sure write is not enabled before starting the FSM
if (yield fsm.write_enable):
if ctx.get(fsm.write_enable):
raise ValueError
# Start the FSM, ensure write enable is asserted throughout the capture
yield fsm.request_start.eq(1)
yield
yield
ctx.set(fsm.request_start, 1)
await ctx.tick()
for _ in range(config["sample_depth"]):
if not (yield fsm.write_enable):
if not ctx.get(fsm.write_enable):
raise ValueError
yield
yield fsm.trigger.eq(1)
yield
for _ in range(4):
if not (yield fsm.write_enable):
raise ValueError
yield
await ctx.tick()
# Make sure write_enable is deasserted after
if (yield fsm.write_enable):
raise ValueError
@simulate(fsm)
def test_immediate_write_enable():
# Configure FSM
yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE)
yield
# Make sure write is not enabled before starting the FSM
if (yield fsm.write_enable):
raise ValueError
# Start the FSM, ensure write enable is asserted throughout the capture
yield fsm.request_start.eq(1)
yield
yield
for _ in range(config["sample_depth"]):
if not (yield fsm.write_enable):
raise ValueError
yield
# Make sure write_enable is deasserted after
if (yield fsm.write_enable):
if ctx.get(fsm.write_enable):
raise ValueError

View File

@ -15,86 +15,86 @@ config = {
la = LogicAnalyzerCore(config, base_addr=0, interface=None)
def print_data_at_addr(addr):
async def print_data_at_addr(ctx, addr):
# place read transaction on the bus
yield la.bus_i.addr.eq(addr)
yield la.bus_i.data.eq(0)
yield la.bus_i.rw.eq(0)
yield la.bus_i.valid.eq(1)
yield
yield la.bus_i.addr.eq(0)
yield la.bus_i.valid.eq(0)
ctx.set(la.bus_i.addr, addr)
ctx.set(la.bus_i.data, 0)
ctx.set(la.bus_i.rw, 0)
ctx.set(la.bus_i.valid, True)
await ctx.tick()
ctx.set(la.bus_i.addr, 0)
ctx.set(la.bus_i.valid, 0)
# wait for output to be valid
while not (yield la.bus_o.valid):
yield
while not ctx.get(la.bus_o.valid):
await ctx.tick()
print(f"addr: {hex(addr)} data: {hex((yield la.bus_o.data))}")
print(f"addr: {hex(addr)} data: {hex(ctx.get(la.bus_o.data))}")
def set_fsm_register(name, data):
async def set_fsm_register(ctx, name, data):
addr = la._fsm.registers._memory_map[name]["addrs"][0]
strobe_addr = la._fsm.registers._base_addr
yield from write_register(la, strobe_addr, 0)
yield from write_register(la, addr, data)
yield from write_register(la, strobe_addr, 1)
yield from write_register(la, strobe_addr, 0)
await write_register(la, ctx, strobe_addr, 0)
await write_register(la, ctx, addr, data)
await write_register(la, ctx, strobe_addr, 1)
await write_register(la, ctx, strobe_addr, 0)
def set_trig_blk_register(name, data):
async def set_trig_blk_register(ctx, name, data):
addr = la._trig_blk.registers._memory_map[name]["addrs"][0]
strobe_addr = la._trig_blk.registers._base_addr
yield from write_register(la, strobe_addr, 0)
yield from write_register(la, addr, data)
yield from write_register(la, strobe_addr, 1)
yield from write_register(la, strobe_addr, 0)
await write_register(la, ctx, strobe_addr, 0)
await write_register(la, ctx, addr, data)
await write_register(la, ctx, strobe_addr, 1)
await write_register(la, ctx, strobe_addr, 0)
def set_probe(name, value):
async def set_probe(ctx, name, value):
probe = None
for p in la._probes:
if p.name == name:
probe = p
yield probe.eq(value)
ctx.set(probe, value)
@simulate(la)
def test_single_shot_capture():
# # ok nice what happens if we try to run the core, which includes:
yield from set_fsm_register("request_stop", 1)
yield from set_fsm_register("request_stop", 0)
async def test_single_shot_capture(ctx):
# request FSM to stop
await set_fsm_register(ctx, "request_stop", 1)
await set_fsm_register(ctx, "request_stop", 0)
# setting triggers
yield from set_trig_blk_register("curly_op", Operations.EQ)
yield from set_trig_blk_register("curly_arg", 4)
await set_trig_blk_register(ctx, "curly_op", Operations.EQ)
await set_trig_blk_register(ctx, "curly_arg", 4)
# setting trigger mode
yield from set_fsm_register("trigger_mode", 0)
await set_fsm_register(ctx, "trigger_mode", 0)
# setting trigger location
yield from set_fsm_register("trigger_location", 511)
await set_fsm_register(ctx, "trigger_location", 511)
# starting capture
yield from set_fsm_register("request_start", 1)
yield from set_fsm_register("request_start", 0)
await set_fsm_register(ctx, "request_start", 1)
await set_fsm_register(ctx, "request_start", 0)
# wait a few hundred clock cycles, see what happens
for _ in range(700):
yield
await ctx.tick().repeat(700)
# provide the trigger condition
yield from set_probe("curly", 4)
await set_probe(ctx, "curly", 4)
for _ in range(700):
yield
await ctx.tick().repeat(700)
# dump sample memory contents
yield from write_register(la, 0, 0)
yield from write_register(la, 0, 1)
yield from write_register(la, 0, 0)
await write_register(la, ctx, 0, 0)
await write_register(la, ctx, 0, 1)
await write_register(la, ctx, 0, 0)
for addr in range(la.max_addr):
yield from print_data_at_addr(addr)
await print_data_at_addr(ctx, addr)

View File

@ -23,91 +23,91 @@ class MemoryCoreTests:
# A model of what each bus address contains
self.model = {i: 0 for i in self.bus_addrs}
def bus_addrs_all_zero(self):
async def bus_addrs_all_zero(self):
for addr in self.bus_addrs:
yield from self.verify_bus_side(addr)
await self.verify_bus_side(addr)
def user_addrs_all_zero(self):
async def user_addrs_all_zero(self):
for addr in self.user_addrs:
yield from self.verify_user_side(addr)
await self.verify_user_side(addr)
def bus_to_bus_functionality(self):
async def bus_to_bus_functionality(self):
# yield from self.one_bus_write_then_one_bus_read()
# yield from self.multi_bus_writes_then_multi_bus_reads()
yield from self.rand_bus_writes_rand_bus_reads()
await self.rand_bus_writes_rand_bus_reads()
def user_to_bus_functionality(self):
async def user_to_bus_functionality(self):
# yield from self.one_user_write_then_one_bus_read()
# yield from self.multi_user_write_then_multi_bus_reads()
yield from self.rand_user_writes_rand_bus_reads()
await self.rand_user_writes_rand_bus_reads()
def bus_to_user_functionality(self):
async def bus_to_user_functionality(self):
# yield from self.one_bus_write_then_one_user_read()
# yield from self.multi_bus_write_then_multi_user_reads()
yield from self.rand_bus_writes_rand_user_reads()
await self.rand_bus_writes_rand_user_reads()
def user_to_user_functionality(self):
async def user_to_user_functionality(self):
# yield from self.one_user_write_then_one_user_read()
# yield from self.multi_user_write_then_multi_user_read()
yield from self.rand_user_write_rand_user_read()
await self.rand_user_write_rand_user_read()
def one_bus_write_then_one_bus_read(self):
async def one_bus_write_then_one_bus_read(self):
for addr in self.bus_addrs:
data_width = self.get_data_width(addr)
data = getrandbits(data_width)
yield from self.write_bus_side(addr, data)
yield from self.verify_bus_side(addr)
await self.write_bus_side(addr, data)
await self.verify_bus_side(addr)
def multi_bus_writes_then_multi_bus_reads(self):
async def multi_bus_writes_then_multi_bus_reads(self):
# write-write-write then read-read-read
for addr in jumble(self.bus_addrs):
data_width = self.get_data_width(addr)
data = getrandbits(data_width)
yield from self.write_bus_side(addr, data)
await self.write_bus_side(addr, data)
for addr in jumble(self.bus_addrs):
yield from self.verify_bus_side(addr)
await self.verify_bus_side(addr)
def rand_bus_writes_rand_bus_reads(self):
async def rand_bus_writes_rand_bus_reads(self):
# random reads and writes in random orders
for _ in range(5):
for addr in jumble(self.bus_addrs):
operation = choice(["read", "write"])
if operation == "read":
yield from self.verify_bus_side(addr)
await self.verify_bus_side(addr)
elif operation == "write":
data_width = self.get_data_width(addr)
data = getrandbits(data_width)
yield from self.write_bus_side(addr, data)
await self.write_bus_side(addr, data)
def one_user_write_then_one_bus_read(self):
async def one_user_write_then_one_bus_read(self):
for user_addr in self.user_addrs:
# write to user side
data = getrandbits(self.width)
yield from self.write_user_side(user_addr, data)
await self.write_user_side(user_addr, data)
# verify contents when read out from the bus
for i in range(self.n_mems):
bus_addr = self.base_addr + user_addr + (i * self.depth)
yield from self.verify_bus_side(bus_addr)
await self.verify_bus_side(bus_addr)
def multi_user_write_then_multi_bus_reads(self):
async def multi_user_write_then_multi_bus_reads(self):
# write-write-write then read-read-read
for user_addr in jumble(self.user_addrs):
# write a random number to the user side
data = getrandbits(self.width)
yield from self.write_user_side(user_addr, data)
await self.write_user_side(user_addr, data)
# read out every bus_addr in random order
for bus_addr in jumble(self.bus_addrs):
yield from self.verify_bus_side(bus_addr)
await self.verify_bus_side(bus_addr)
def rand_user_writes_rand_bus_reads(self):
async def rand_user_writes_rand_bus_reads(self):
# random reads and writes in random orders
for _ in range(5):
for user_addr in jumble(self.user_addrs):
@ -121,14 +121,14 @@ class MemoryCoreTests:
# read from bus side
if operation == "read":
for bus_addr in bus_addrs:
yield from self.verify_bus_side(bus_addr)
await self.verify_bus_side(bus_addr)
# write to user side
elif operation == "write":
data = getrandbits(self.width)
yield from self.write_user_side(user_addr, data)
await self.write_user_side(user_addr, data)
def one_bus_write_then_one_user_read(self):
async def one_bus_write_then_one_user_read(self):
for user_addr in self.user_addrs:
# Try and set the value at the user address to a given value,
# by writing to the appropriate memory locaitons on the bus side
@ -138,22 +138,22 @@ class MemoryCoreTests:
for i, word in enumerate(words):
bus_addr = self.base_addr + user_addr + (i * self.depth)
yield from self.write_bus_side(bus_addr, word)
await self.write_bus_side(bus_addr, word)
yield from self.verify_user_side(user_addr)
await self.verify_user_side(user_addr)
def multi_bus_write_then_multi_user_reads(self):
async def multi_bus_write_then_multi_user_reads(self):
# write-write-write then read-read-read
for bus_addr in jumble(self.bus_addrs):
data_width = self.get_data_width(bus_addr)
data = getrandbits(data_width)
yield from self.write_bus_side(bus_addr, data)
await self.write_bus_side(bus_addr, data)
for user_addr in jumble(self.user_addrs):
yield from self.verify_user_side(user_addr)
await self.verify_user_side(user_addr)
def rand_bus_writes_rand_user_reads(self):
async def rand_bus_writes_rand_user_reads(self):
for _ in range(5 * self.depth):
operation = choice(["read", "write"])
@ -163,41 +163,41 @@ class MemoryCoreTests:
data_width = self.get_data_width(bus_addr)
data = getrandbits(data_width)
yield from self.write_bus_side(bus_addr, data)
await self.write_bus_side(bus_addr, data)
# read from random user_addr
if operation == "read":
user_addr = randint(0, self.depth - 1)
yield from self.verify_user_side(user_addr)
await self.verify_user_side(user_addr)
def one_user_write_then_one_user_read(self):
async def one_user_write_then_one_user_read(self):
for addr in self.user_addrs:
data = getrandbits(self.width)
yield from self.write_user_side(addr, data)
yield from self.verify_user_side(addr)
await self.write_user_side(addr, data)
await self.verify_user_side(addr)
def multi_user_write_then_multi_user_read(self):
async def multi_user_write_then_multi_user_read(self):
# write-write-write then read-read-read
for user_addr in jumble(self.user_addrs):
data = getrandbits(self.width)
yield from self.write_user_side(user_addr, data)
await self.write_user_side(user_addr, data)
for user_addr in jumble(self.user_addrs):
yield from self.verify_user_side(user_addr)
await self.verify_user_side(user_addr)
def rand_user_write_rand_user_read(self):
async def rand_user_write_rand_user_read(self):
# random reads and writes in random orders
for _ in range(5):
for user_addr in jumble(self.user_addrs):
operation = choice(["read", "write"])
if operation == "read":
yield from self.verify_user_side(user_addr)
await self.verify_user_side(user_addr)
elif operation == "write":
data = getrandbits(self.width)
yield from self.write_user_side(user_addr, data)
await self.write_user_side(user_addr, data)
def get_data_width(self, addr):
# this part is a little hard to check since we might have a
@ -210,18 +210,16 @@ class MemoryCoreTests:
else:
return self.width % 16
def verify_bus_side(self, addr):
yield from verify_register(self.mem_core, addr, self.model[addr])
for _ in range(4):
yield
async def verify_bus_side(self, ctx, addr):
await verify_register(self.mem_core, ctx, addr, self.model[addr])
await ctx.tick().repeat(4)
def write_bus_side(self, addr, data):
async def write_bus_side(self, ctx, addr, data):
self.model[addr] = data
yield from write_register(self.mem_core, addr, data)
for _ in range(4):
yield
await write_register(self.mem_core, addr, data)
await ctx.tick().repeat(4)
def verify_user_side(self, addr):
async def verify_user_side(self, ctx, addr):
# Determine the expected value on the user side by looking
# up the appropriate bus addresses in the model
@ -233,30 +231,29 @@ class MemoryCoreTests:
expected_data = words_to_value(bus_words)
yield self.mem_core.user_addr.eq(addr)
yield
yield
await self.mem_core.user_addr.eq(addr)
await ctx.tick().repeat(2)
data = yield (self.mem_core.user_data_out)
data = ctx.get(self.mem_core.user_data_out)
if data != expected_data:
raise ValueError(
f"Read from {addr} yielded {data} instead of {expected_data}"
)
def write_user_side(self, addr, data):
async def write_user_side(self, ctx, addr, data):
# convert value to words, and save to self.model
words = value_to_words(data, self.n_mems)
for i, word in enumerate(words):
bus_addr = self.base_addr + addr + (i * self.depth)
self.model[bus_addr] = word
yield self.mem_core.user_addr.eq(addr)
yield self.mem_core.user_data_in.eq(data)
yield self.mem_core.user_write_enable.eq(1)
yield
yield self.mem_core.user_addr.eq(0)
yield self.mem_core.user_data_in.eq(0)
yield self.mem_core.user_write_enable.eq(0)
ctx.set(self.mem_core.user_addr, addr)
ctx.set(self.mem_core.user_data_in, data)
ctx.set(self.mem_core.user_write_enable, 1)
await ctx.tick()
ctx.set(self.mem_core.user_addr, 0)
ctx.set(self.mem_core.user_data_in, 0)
ctx.set(self.mem_core.user_write_enable, 0)
modes = ["bidirectional", "fpga_to_host", "host_to_fpga"]
@ -276,22 +273,22 @@ def test_mem_core(mode, width, depth, base_addr):
tests = MemoryCoreTests(mem_core)
@simulate(mem_core)
def testbench():
async def testbench():
if mode == "bidirectional":
yield from tests.bus_addrs_all_zero()
yield from tests.user_addrs_all_zero()
await tests.bus_addrs_all_zero()
await tests.user_addrs_all_zero()
yield from tests.bus_to_bus_functionality()
yield from tests.user_to_bus_functionality()
yield from tests.bus_to_user_functionality()
yield from tests.user_to_user_functionality()
await tests.bus_to_bus_functionality()
await tests.user_to_bus_functionality()
await tests.bus_to_user_functionality()
await tests.user_to_user_functionality()
if mode == "fpga_to_host":
yield from tests.bus_addrs_all_zero()
yield from tests.user_to_bus_functionality()
await tests.bus_addrs_all_zero()
await tests.user_to_bus_functionality()
if mode == "host_to_fpga":
yield from tests.user_addrs_all_zero()
yield from tests.bus_to_user_functionality()
await tests.user_addrs_all_zero()
await tests.bus_to_user_functionality()
testbench()

View File

@ -7,31 +7,34 @@ source_bridge = UDPSourceBridge()
@simulate(source_bridge)
def test_normie_ops():
yield source_bridge.data_i.eq(0)
yield source_bridge.last_i.eq(0)
yield source_bridge.valid_i.eq(0)
yield
yield
async def test_normie_ops(ctx):
ctx.set(source_bridge.data_i, 0)
ctx.set(source_bridge.last_i, 0)
ctx.set(source_bridge.valid_i, 0)
await ctx.tick()
yield source_bridge.data_i.eq(0x0000_0001)
yield source_bridge.valid_i.eq(1)
yield
yield source_bridge.data_i.eq(0x1234_5678)
yield
yield source_bridge.valid_i.eq(0)
yield
yield
ctx.set(source_bridge.data_i, 0x0000_0001)
ctx.set(source_bridge.valid_i, 1)
await ctx.tick()
yield source_bridge.valid_i.eq(1)
yield source_bridge.data_i.eq(0x0000_0001)
yield
yield source_bridge.data_i.eq(0x90AB_CDEF)
yield
yield source_bridge.data_i.eq(0x0000_0000)
yield
yield source_bridge.data_i.eq(0x1234_5678)
yield
yield source_bridge.valid_i.eq(0)
yield
yield
ctx.set(source_bridge.data_i, 0x1234_5678)
await ctx.tick()
ctx.set(source_bridge.valid_i, 0)
await ctx.tick().repeat(2)
ctx.set(source_bridge.valid_i, 1)
ctx.set(source_bridge.data_i, 0x0000_0001)
await ctx.tick()
ctx.set(source_bridge.data_i, 0x90AB_CDEF)
await ctx.tick()
ctx.set(source_bridge.data_i, 0x0000_0000)
await ctx.tick()
ctx.set(source_bridge.data_i, 0x1234_5678)
await ctx.tick()
ctx.set(source_bridge.valid_i, 0)
await ctx.tick().repeat(2)

View File

@ -7,7 +7,7 @@ from random import sample
uart_rx = UARTReceiver(clocks_per_baud=10)
def verify_receive(data):
async def verify_receive(ctx, data):
# 8N1 serial, LSB sent first
data_bits = "0" + f"{data:08b}"[::-1] + "1"
data_bits = [int(bit) for bit in data_bits]
@ -18,9 +18,9 @@ def verify_receive(data):
bit_index = i // uart_rx._clocks_per_baud
# Every cycle, run checks on uart_rx:
if (yield uart_rx.valid_o):
if (yield uart_rx.data_o) != data:
a = yield uart_rx.data_o
if ctx.get(uart_rx.valid_o):
if ctx.get(uart_rx.data_o) != data:
a = ctx.get(uart_rx.data_o)
print(data_bits)
raise ValueError(
f"Incorrect byte presented - gave {hex(a)} instead of {hex(data)}!"
@ -36,26 +36,26 @@ def verify_receive(data):
else:
raise ValueError("Valid asserted more than once!")
yield uart_rx.rx.eq(data_bits[bit_index])
yield
ctx.set(uart_rx.rx, data_bits[bit_index])
await ctx.tick()
if not valid_asserted_before:
raise ValueError("Failed to assert valid!")
@simulate(uart_rx)
def test_all_possible_bytes():
yield uart_rx.rx.eq(1)
yield
async def test_all_possible_bytes(ctx):
ctx.set(uart_rx.rx, 1)
await ctx.tick()
for i in range(0xFF):
yield from verify_receive(i)
await verify_receive(ctx, i)
@simulate(uart_rx)
def test_bytes_random_sample():
yield uart_rx.rx.eq(1)
yield
async def test_bytes_random_sample(ctx):
ctx.set(uart_rx.rx, 1)
await ctx.tick()
for i in jumble(range(0xFF)):
yield from verify_receive(i)
await verify_receive(ctx, i)

View File

@ -7,18 +7,19 @@ from random import sample
uart_tx = UARTTransmitter(clocks_per_baud=10)
def verify_bit_sequence(byte):
async def verify_bit_sequence(ctx, byte):
"""
Request a byte to be transmitted, and verify that the sequence of bits is correct.
"""
# Request byte to be transmitted
yield uart_tx.data_i.eq(byte)
yield uart_tx.start_i.eq(1)
yield
yield uart_tx.data_i.eq(0)
yield uart_tx.start_i.eq(0)
yield
ctx.set(uart_tx.data_i, byte)
ctx.set(uart_tx.start_i, 1)
await ctx.tick()
ctx.set(uart_tx.data_i, 0)
ctx.set(uart_tx.start_i, 0)
await ctx.tick()
# Check that data bit is correct on every clock baud period
@ -29,25 +30,25 @@ def verify_bit_sequence(byte):
for i in range(10 * uart_tx._clocks_per_baud):
bit_index = i // uart_tx._clocks_per_baud
if (yield uart_tx.tx) != data_bits[bit_index]:
if ctx.get(uart_tx.tx) != data_bits[bit_index]:
raise ValueError("Wrong bit in sequence!")
if (yield uart_tx.done_o) and (bit_index != 9):
if ctx.get(uart_tx.done_o) and (bit_index != 9):
raise ValueError("Done asserted too early!")
yield
await ctx.tick()
if not (yield uart_tx.done_o):
if not ctx.get(uart_tx.done_o):
raise ValueError("Done not asserted at end of transmission!")
@simulate(uart_tx)
def test_all_possible_bytes():
async def test_all_possible_bytes(ctx):
for i in range(0xFF):
yield from verify_bit_sequence(i)
await verify_bit_sequence(ctx, i)
@simulate(uart_tx)
def test_bytes_random_sample():
async def test_bytes_random_sample(ctx):
for i in jumble(range(0xFF)):
yield from verify_bit_sequence(i)
await verify_bit_sequence(ctx, i)