add simulate decorator

This commit is contained in:
Fischer Moseley 2024-03-03 02:14:12 -08:00
parent 2e2397013e
commit e2d52a6e2d
11 changed files with 579 additions and 426 deletions

View File

@ -22,8 +22,8 @@ class MemoryCore(Elaboratable):
self._base_addr = base_addr
self._interface = interface
self._n_brams = ceil(self._width / 16)
self._max_addr = self._base_addr + (self._depth * self._n_brams)
self._n_mems = ceil(self._width / 16)
self._max_addr = self._base_addr + (self._depth * self._n_mems)
# Bus Connections
self.bus_i = Signal(InternalBus())
@ -106,23 +106,43 @@ class MemoryCore(Elaboratable):
start_addr = self._base_addr + (i * self._depth)
stop_addr = start_addr + self._depth - 1
# Handle write ports
if self._mode in ["host_to_fpga", "bidirectional"]:
pass
# write_port = mem.write_port()
# m.d.sync += write_port.data.eq(self.bus_i.data)
# m.d.sync += write_port.en.eq(self.bus_i.rw)
# m.d.sync += write_port.addr.eq(self.bus_i.addr - start_addr)
# # Handle write ports
# if self._mode in ["host_to_fpga", "bidirectional"]:
# write_port = mem.write_port()
# m.d.sync += write_port.data.eq(self.bus_i.data)
# m.d.sync += write_port.en.eq(self.bus_i.rw)
# m.d.sync += write_port.addr.eq(self.bus_i.addr - start_addr)
# Handle read ports
if self._mode in ["fpga_to_host", "bidirectional"]:
# # Handle read ports
# if self._mode in ["fpga_to_host", "bidirectional"]:
# read_port = mem.read_port()
# m.d.comb += read_port.en.eq(1)
# # Throw BRAM operations into the front of the pipeline
# with m.If(
# (self.bus_i.valid)
# & (~self.bus_i.rw)
# & (self.bus_i.addr >= start_addr)
# & (self.bus_i.addr <= stop_addr)
# ):
# m.d.sync += read_port.addr.eq(self.bus_i.addr - start_addr)
# # Pull BRAM reads from the back of the pipeline
# with m.If(
# (self._bus_pipe[2].valid)
# & (~self._bus_pipe[2].rw)
# & (self._bus_pipe[2].addr >= start_addr)
# & (self._bus_pipe[2].addr <= stop_addr)
# ):
# m.d.sync += self.bus_o.data.eq(read_port.data)
if self._mode == "fpga_to_host":
read_port = mem.read_port()
m.d.comb += read_port.en.eq(1)
# Throw BRAM operations into the front of the pipeline
with m.If(
(self.bus_i.valid)
& (~self.bus_i.rw)
& (self.bus_i.addr >= start_addr)
& (self.bus_i.addr <= stop_addr)
):
@ -137,6 +157,47 @@ class MemoryCore(Elaboratable):
):
m.d.sync += self.bus_o.data.eq(read_port.data)
elif self._mode == "host_to_fpga":
write_port = mem.write_port()
m.d.sync += write_port.en.eq(0)
# Throw BRAM operations into the front of the pipeline
with m.If(
(self.bus_i.valid)
& (self.bus_i.addr >= start_addr)
& (self.bus_i.addr <= stop_addr)
):
m.d.sync += write_port.addr.eq(self.bus_i.addr - start_addr)
m.d.sync += write_port.data.eq(self.bus_i.data)
m.d.sync += write_port.en.eq(self.bus_i.rw)
elif self._mode == "bidirectional":
read_port = mem.read_port()
m.d.comb += read_port.en.eq(1)
write_port = mem.write_port()
m.d.sync += write_port.en.eq(0)
# Throw BRAM operations into the front of the pipeline
with m.If(
(self.bus_i.valid)
& (self.bus_i.addr >= start_addr)
& (self.bus_i.addr <= stop_addr)
):
m.d.sync += read_port.addr.eq(self.bus_i.addr - start_addr)
m.d.sync += write_port.addr.eq(self.bus_i.addr - start_addr)
m.d.sync += write_port.data.eq(self.bus_i.data)
m.d.sync += write_port.en.eq(self.bus_i.rw)
# Pull BRAM reads from the back of the pipeline
with m.If(
(self._bus_pipe[2].valid)
& (~self._bus_pipe[2].rw)
& (self._bus_pipe[2].addr >= start_addr)
& (self._bus_pipe[2].addr <= stop_addr)
):
m.d.sync += self.bus_o.data.eq(read_port.data)
def _tie_mems_to_user_logic(self, m):
# Handle write ports
if self._mode in ["fpga_to_host", "bidirectional"]:
@ -238,7 +299,7 @@ class MemoryCore(Elaboratable):
bus_addrs = self._convert_user_to_bus_addr(addrs)
datas = self._interface.read(bus_addrs)
data_chunks = split_into_chunks(datas, self._n_brams)
data_chunks = split_into_chunks(datas, self._n_mems)
return [words_to_value(chunk) for chunk in data_chunks]
def write(self, addrs, datas):
@ -264,5 +325,5 @@ class MemoryCore(Elaboratable):
raise TypeError("Write data must all be integers.")
bus_addrs = self._convert_user_to_bus_addr([addrs])[0]
bus_datas = [word for d in datas for word in value_to_words(d, self._n_brams)]
bus_datas = [word for d in datas for word in value_to_words(d, self._n_mems)]
self._interface.write(bus_addrs, bus_datas)

View File

@ -83,22 +83,41 @@ def split_into_chunks(data, chunk_size):
return [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
def simulate(top, testbench, vcd_path=None):
def simulate(top):
"""
Run a behavior simulation using Amaranth's built-in simulator `pysim`. Takes
the top-level module to simulate, the testbench process to run, and an optional
path to export a VCD file to.
A decorator for running behavioral simulation using Amaranth's built-in
simulator. Requires the top-level module in the simulation as an argument,
and automatically names VCD file containing the waveform dump with the name
of the function being decorated.
"""
sim = Simulator(top)
sim.add_clock(1e-6) # 1 MHz
sim.add_sync_process(testbench)
if vcd_path is None:
sim.run()
def decorator(testbench):
def wrapper(*args, **kwargs):
sim = Simulator(top)
sim.add_clock(1e-6) # 1 MHz
sim.add_sync_process(testbench)
else:
with sim.write_vcd(vcd_path):
sim.run()
vcd_path = testbench.__name__ + ".vcd"
with sim.write_vcd(vcd_path):
sim.run()
return wrapper
return decorator
# def simulate_decorator(testbench):
# def wrapper_accepting_arguments(top):
# sim = Simulator(top)
# sim.add_clock(1e-6) # 1 MHz
# sim.add_sync_process(testbench)
# vcd_path = testbench.__name__ + ".vcd"
# with sim.write_vcd(vcd_path):
# sim.run()
# return wrapper_accepting_arguments
def verify_register(module, addr, expected_data):
@ -143,7 +162,10 @@ def write_register(module, addr, data):
yield module.bus_i.rw.eq(1)
yield module.bus_i.valid.eq(1)
yield
yield module.bus_i.addr.eq(0)
yield module.bus_i.data.eq(0)
yield module.bus_i.valid.eq(0)
yield module.bus_i.rw.eq(0)
yield

View File

@ -86,35 +86,29 @@ def verify_bad_bytes(bytes):
yield bridge_rx.valid_i.eq(0)
@simulate(bridge_rx)
def test_read_decode():
def testbench():
yield from verify_read_decoding(b"R0000\r\n", 0x0000)
yield from verify_read_decoding(b"R1234\r\n", 0x1234)
yield from verify_read_decoding(b"RBABE\r\n", 0xBABE)
yield from verify_read_decoding(b"R5678\n", 0x5678)
yield from verify_read_decoding(b"R9ABC\r", 0x9ABC)
simulate(bridge_rx, testbench)
yield from verify_read_decoding(b"R0000\r\n", 0x0000)
yield from verify_read_decoding(b"R1234\r\n", 0x1234)
yield from verify_read_decoding(b"RBABE\r\n", 0xBABE)
yield from verify_read_decoding(b"R5678\n", 0x5678)
yield from verify_read_decoding(b"R9ABC\r", 0x9ABC)
@simulate(bridge_rx)
def test_write_decode():
def testbench():
yield from verify_write_decoding(b"W12345678\r\n", 0x1234, 0x5678)
yield from verify_write_decoding(b"WDEADBEEF\r\n", 0xDEAD, 0xBEEF)
yield from verify_write_decoding(b"WDEADBEEF\r", 0xDEAD, 0xBEEF)
yield from verify_write_decoding(b"WB0BACAFE\n", 0xB0BA, 0xCAFE)
simulate(bridge_rx, testbench)
yield from verify_write_decoding(b"W12345678\r\n", 0x1234, 0x5678)
yield from verify_write_decoding(b"WDEADBEEF\r\n", 0xDEAD, 0xBEEF)
yield from verify_write_decoding(b"WDEADBEEF\r", 0xDEAD, 0xBEEF)
yield from verify_write_decoding(b"WB0BACAFE\n", 0xB0BA, 0xCAFE)
@simulate(bridge_rx)
def test_no_decode():
def testbench():
yield from verify_bad_bytes(b"RABC\r\n")
yield from verify_bad_bytes(b"R12345\r\n")
yield from verify_bad_bytes(b"M\r\n")
yield from verify_bad_bytes(b"W123456789101112131415161718191201222\r\n")
yield from verify_bad_bytes(b"RABCG\r\n")
yield from verify_bad_bytes(b"WABC[]()##*@\r\n")
yield from verify_bad_bytes(b"R\r\n")
simulate(bridge_rx, testbench)
yield from verify_bad_bytes(b"RABC\r\n")
yield from verify_bad_bytes(b"R12345\r\n")
yield from verify_bad_bytes(b"M\r\n")
yield from verify_bad_bytes(b"W123456789101112131415161718191201222\r\n")
yield from verify_bad_bytes(b"RABCG\r\n")
yield from verify_bad_bytes(b"WABC[]()##*@\r\n")
yield from verify_bad_bytes(b"R\r\n")

View File

@ -57,11 +57,9 @@ def verify_encoding(data, bytes):
raise ValueError(f"Received {sent_bytes} instead of {bytes}.")
@simulate(bridge_tx)
def test_some_random_values():
def testbench():
for i in sample(range(0xFFFF), k=5000):
expected = f"D{i:04X}\r\n".encode("ascii")
print(i)
yield from verify_encoding(i, expected)
simulate(bridge_tx, testbench)
for i in sample(range(0xFFFF), k=5000):
expected = f"D{i:04X}\r\n".encode("ascii")
print(i)
yield from verify_encoding(i, expected)

View File

@ -26,92 +26,82 @@ def pulse_strobe_register():
yield from write_register(io_core, strobe_addr, 0)
@simulate(io_core)
def test_input_probe_buffer_initial_value():
def testbench():
# Verify all input probe buffers initialize to zero
for i in inputs:
addrs = io_core._memory_map[i.name]["addrs"]
# Verify all input probe buffers initialize to zero
for i in inputs:
addrs = io_core._memory_map[i.name]["addrs"]
for addr in addrs:
yield from verify_register(io_core, addr, 0)
simulate(io_core, testbench)
for addr in addrs:
yield from verify_register(io_core, addr, 0)
@simulate(io_core)
def test_output_probe_buffer_initial_value():
def testbench():
# Verify all output probe buffers initialize to the values in the config
for o in outputs:
addrs = io_core._memory_map[o.name]["addrs"]
datas = value_to_words(o.reset, len(addrs))
# Verify all output probe buffers initialize to the values in the config
for o in outputs:
addrs = io_core._memory_map[o.name]["addrs"]
datas = value_to_words(o.reset, len(addrs))
for addr, data in zip(addrs, datas):
yield from verify_register(io_core, addr, data)
simulate(io_core, testbench)
for addr, data in zip(addrs, datas):
yield from verify_register(io_core, addr, data)
@simulate(io_core)
def test_output_probes_are_writeable():
def testbench():
for o in outputs:
addrs = io_core._memory_map[o.name]["addrs"]
test_value = randint(0, (2**o.width) - 1)
datas = value_to_words(test_value, len(addrs))
for o in outputs:
addrs = io_core._memory_map[o.name]["addrs"]
test_value = randint(0, (2**o.width) - 1)
datas = value_to_words(test_value, len(addrs))
# write value to registers
for addr, data in zip(addrs, datas):
yield from write_register(io_core, addr, data)
# write value to registers
for addr, data in zip(addrs, datas):
yield from write_register(io_core, addr, data)
# read value back from registers
for addr, data in zip(addrs, datas):
yield from verify_register(io_core, addr, data)
simulate(io_core, testbench)
# read value back from registers
for addr, data in zip(addrs, datas):
yield from verify_register(io_core, addr, data)
@simulate(io_core)
def test_output_probes_update():
def testbench():
for o in outputs:
addrs = io_core._memory_map[o.name]["addrs"]
test_value = randint(0, (2**o.width) - 1)
datas = value_to_words(test_value, len(addrs))
for o in outputs:
addrs = io_core._memory_map[o.name]["addrs"]
test_value = randint(0, (2**o.width) - 1)
datas = value_to_words(test_value, len(addrs))
# write value to registers
for addr, data in zip(addrs, datas):
yield from write_register(io_core, addr, data)
# write value to registers
for addr, data in zip(addrs, datas):
yield from write_register(io_core, addr, data)
# pulse strobe register
yield from pulse_strobe_register()
# pulse strobe register
yield from pulse_strobe_register()
# check that outputs took updated value
value = yield (o)
# check that outputs took updated value
value = yield (o)
if value != test_value:
raise ValueError(
f"Output probe {o.name} took value {value} instead of {test_value} after pulsing strobe."
)
if value != test_value:
raise ValueError(
f"Output probe {o.name} took value {value} instead of {test_value} after pulsing strobe."
)
else:
print(f"Output probe {o.name} took value {value} after pulsing strobe.")
simulate(io_core, testbench)
else:
print(f"Output probe {o.name} took value {value} after pulsing strobe.")
@simulate(io_core)
def test_input_probes_update():
def testbench():
for i in inputs:
# set input probe value
test_value = randint(0, (2**i.width) - 1)
yield i.eq(test_value)
for i in inputs:
# set input probe value
test_value = randint(0, (2**i.width) - 1)
yield i.eq(test_value)
# pulse strobe register
yield from pulse_strobe_register()
# pulse strobe register
yield from pulse_strobe_register()
# check that values are as expected once read back
addrs = io_core._memory_map[i.name]["addrs"]
datas = value_to_words(test_value, len(addrs))
# check that values are as expected once read back
addrs = io_core._memory_map[i.name]["addrs"]
datas = value_to_words(test_value, len(addrs))
for addr, data in zip(addrs, datas):
yield from verify_register(io_core, addr, data)
simulate(io_core, testbench)
for addr, data in zip(addrs, datas):
yield from verify_register(io_core, addr, data)

View File

@ -6,244 +6,230 @@ config = {"sample_depth": 8}
fsm = LogicAnalyzerFSM(config, base_addr=0, interface=None)
@simulate(fsm)
def test_signals_reset_correctly():
def testbench():
# Make sure pointers and write enable reset to zero
for sig in [fsm.write_pointer, fsm.read_pointer, fsm.write_enable]:
if (yield sig) != 0:
raise ValueError
# Make sure state resets to IDLE
if (yield fsm.state != States.IDLE):
# Make sure pointers and write enable reset to zero
for sig in [fsm.write_pointer, fsm.read_pointer, fsm.write_enable]:
if (yield sig) != 0:
raise ValueError
simulate(fsm, testbench)
# Make sure state resets to IDLE
if (yield fsm.state != States.IDLE):
raise ValueError
@simulate(fsm)
def test_single_shot_no_wait_for_trigger():
def testbench():
# Configure and start FSM
yield fsm.trigger.eq(1)
yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.trigger_location.eq(4)
yield fsm.request_start.eq(1)
# Configure and start FSM
yield fsm.trigger.eq(1)
yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.trigger_location.eq(4)
yield fsm.request_start.eq(1)
# Wait until write_enable is asserted
while not (yield fsm.write_enable):
yield
# Wait 8 clock cycles for capture to complete
for i in range(8):
# Make sure that read_pointer does not increase
if (yield fsm.read_pointer) != 0:
raise ValueError
# Make sure that write_pointer increases by one each cycle
if (yield fsm.write_pointer) != i:
raise ValueError
yield
# Wait one clock cycle (to let BRAM contents cycle in)
# Wait until write_enable is asserted
while not (yield fsm.write_enable):
yield
# Check that write_pointer points to the end of memory
if (yield fsm.write_pointer) != 7:
# Wait 8 clock cycles for capture to complete
for i in range(8):
# Make sure that read_pointer does not increase
if (yield fsm.read_pointer) != 0:
raise ValueError
# Check that state is CAPTURED
if (yield fsm.state) != States.CAPTURED:
# Make sure that write_pointer increases by one each cycle
if (yield fsm.write_pointer) != i:
raise ValueError
simulate(fsm, testbench, "single_shot_no_wait_for_trigger.vcd")
yield
# Wait one clock cycle (to let BRAM contents cycle in)
yield
# Check that write_pointer points to the end of memory
if (yield fsm.write_pointer) != 7:
raise ValueError
# Check that state is CAPTURED
if (yield fsm.state) != States.CAPTURED:
raise ValueError
@simulate(fsm)
def test_single_shot_wait_for_trigger():
def testbench():
# Configure and start FSM
yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.trigger_location.eq(4)
yield fsm.request_start.eq(1)
yield
yield
# Configure and start FSM
yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.trigger_location.eq(4)
yield fsm.request_start.eq(1)
yield
yield
# Check that write_enable is asserted a cycle after request_start
if not (yield fsm.write_enable):
raise ValueError
# Check that write_enable is asserted a cycle after request_start
if not (yield fsm.write_enable):
raise ValueError
# Wait 4 clock cycles to get to IN_POSITION
for i in range(4):
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
# Make sure that read_pointer does not increase
if rp != 0:
raise ValueError
# Make sure that write_pointer increases by one each cycle
if wp != i:
raise ValueError
yield
# Wait a few cycles before triggering
for _ in range(10):
yield
# Provide the trigger, and check that the capture completes 4 cycles later
yield fsm.trigger.eq(1)
yield
for i in range(4):
yield
# Wait one clock cycle (to let BRAM contents cycle in)
yield
# Check that write_pointer points to the end of memory
# Wait 4 clock cycles to get to IN_POSITION
for i in range(4):
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
if (wp + 1) % config["sample_depth"] != rp:
raise ValueError
# Check that state is CAPTURED
if (yield fsm.state) != States.CAPTURED:
raise ValueError
simulate(fsm, testbench, "single_shot_wait_for_trigger.vcd")
def test_immediate():
def testbench():
# Configure and start FSM
yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE)
yield fsm.request_start.eq(1)
yield
yield
# Check that write_enable is asserted a cycle after request_start
if not (yield fsm.write_enable):
raise ValueError
for i in range(config["sample_depth"]):
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
if rp != 0:
raise ValueError
if wp != i:
raise ValueError
yield
# Wait one clock cycle (to let BRAM contents cycle in)
yield
# Check that write_pointer points to the end of memory
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
# Make sure that read_pointer does not increase
if rp != 0:
raise ValueError
if wp != 7:
# Make sure that write_pointer increases by one each cycle
if wp != i:
raise ValueError
# Check that state is CAPTURED
if (yield fsm.state) != States.CAPTURED:
yield
# Wait a few cycles before triggering
for _ in range(10):
yield
# Provide the trigger, and check that the capture completes 4 cycles later
yield fsm.trigger.eq(1)
yield
for i in range(4):
yield
# Wait one clock cycle (to let BRAM contents cycle in)
yield
# Check that write_pointer points to the end of memory
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
if (wp + 1) % config["sample_depth"] != rp:
raise ValueError
# Check that state is CAPTURED
if (yield fsm.state) != States.CAPTURED:
raise ValueError
@simulate(fsm)
def test_immediate():
# Configure and start FSM
yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE)
yield fsm.request_start.eq(1)
yield
yield
# Check that write_enable is asserted a cycle after request_start
if not (yield fsm.write_enable):
raise ValueError
for i in range(config["sample_depth"]):
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
if rp != 0:
raise ValueError
simulate(fsm, testbench, "immediate.vcd")
if wp != i:
raise ValueError
yield
# Wait one clock cycle (to let BRAM contents cycle in)
yield
# Check that write_pointer points to the end of memory
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
if rp != 0:
raise ValueError
if wp != 7:
raise ValueError
# Check that state is CAPTURED
if (yield fsm.state) != States.CAPTURED:
raise ValueError
@simulate(fsm)
def test_incremental():
def testbench():
# Configure and start FSM
yield fsm.trigger_mode.eq(TriggerModes.INCREMENTAL)
yield fsm.request_start.eq(1)
yield
yield
# Configure and start FSM
yield fsm.trigger_mode.eq(TriggerModes.INCREMENTAL)
yield fsm.request_start.eq(1)
yield
yield
# Check that write_enable is asserted on the same edge as request_start
if not (yield fsm.write_enable):
raise ValueError
for _ in range(10):
for _ in range(3):
yield
yield fsm.trigger.eq(1)
yield
yield fsm.trigger.eq(0)
yield
# Check that state is CAPTURED
if (yield fsm.state) != States.CAPTURED:
raise ValueError
simulate(fsm, testbench, "incremental.vcd")
def test_single_shot_write_enable():
def testbench():
# Configure FSM
yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.trigger_location.eq(4)
yield
# Make sure write is not enabled before starting the FSM
if (yield fsm.write_enable):
raise ValueError
# Start the FSM, ensure write enable is asserted throughout the capture
yield fsm.request_start.eq(1)
yield
yield
for _ in range(config["sample_depth"]):
if not (yield fsm.write_enable):
raise ValueError
# Check that write_enable is asserted on the same edge as request_start
if not (yield fsm.write_enable):
raise ValueError
for _ in range(10):
for _ in range(3):
yield
yield fsm.trigger.eq(1)
yield
yield fsm.trigger.eq(0)
yield
for _ in range(4):
if not (yield fsm.write_enable):
raise ValueError
# Check that state is CAPTURED
if (yield fsm.state) != States.CAPTURED:
raise ValueError
yield
# Make sure write_enable is deasserted after
if (yield fsm.write_enable):
@simulate(fsm)
def test_single_shot_write_enable():
# Configure FSM
yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.trigger_location.eq(4)
yield
# Make sure write is not enabled before starting the FSM
if (yield fsm.write_enable):
raise ValueError
# Start the FSM, ensure write enable is asserted throughout the capture
yield fsm.request_start.eq(1)
yield
yield
for _ in range(config["sample_depth"]):
if not (yield fsm.write_enable):
raise ValueError
simulate(fsm, testbench, "single_shot_write_enable.vcd")
yield
yield fsm.trigger.eq(1)
yield
for _ in range(4):
if not (yield fsm.write_enable):
raise ValueError
yield
# Make sure write_enable is deasserted after
if (yield fsm.write_enable):
raise ValueError
@simulate(fsm)
def test_immediate_write_enable():
def testbench():
# Configure FSM
yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE)
yield
# Configure FSM
yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE)
yield
# Make sure write is not enabled before starting the FSM
if (yield fsm.write_enable):
# Make sure write is not enabled before starting the FSM
if (yield fsm.write_enable):
raise ValueError
# Start the FSM, ensure write enable is asserted throughout the capture
yield fsm.request_start.eq(1)
yield
yield
for _ in range(config["sample_depth"]):
if not (yield fsm.write_enable):
raise ValueError
# Start the FSM, ensure write enable is asserted throughout the capture
yield fsm.request_start.eq(1)
yield
yield
for _ in range(config["sample_depth"]):
if not (yield fsm.write_enable):
raise ValueError
yield
# Make sure write_enable is deasserted after
if (yield fsm.write_enable):
raise ValueError
simulate(fsm, testbench, "immediate_write_enable.vcd")
# Make sure write_enable is deasserted after
if (yield fsm.write_enable):
raise ValueError

View File

@ -61,42 +61,40 @@ def set_probe(name, value):
yield probe.eq(value)
@simulate(la)
def test_single_shot_capture():
def testbench():
# # ok nice what happens if we try to run the core, which includes:
yield from set_fsm_register("request_stop", 1)
yield from set_fsm_register("request_stop", 0)
# # ok nice what happens if we try to run the core, which includes:
yield from set_fsm_register("request_stop", 1)
yield from set_fsm_register("request_stop", 0)
# setting triggers
yield from set_trig_blk_register("curly_op", Operations.EQ)
yield from set_trig_blk_register("curly_arg", 4)
# setting triggers
yield from set_trig_blk_register("curly_op", Operations.EQ)
yield from set_trig_blk_register("curly_arg", 4)
# setting trigger mode
yield from set_fsm_register("trigger_mode", 0)
# setting trigger mode
yield from set_fsm_register("trigger_mode", 0)
# setting trigger location
yield from set_fsm_register("trigger_location", 511)
# setting trigger location
yield from set_fsm_register("trigger_location", 511)
# starting capture
yield from set_fsm_register("request_start", 1)
yield from set_fsm_register("request_start", 0)
# starting capture
yield from set_fsm_register("request_start", 1)
yield from set_fsm_register("request_start", 0)
# wait a few hundred clock cycles, see what happens
for _ in range(700):
yield
# wait a few hundred clock cycles, see what happens
for _ in range(700):
yield
# provide the trigger condition
yield from set_probe("curly", 4)
# provide the trigger condition
yield from set_probe("curly", 4)
for _ in range(700):
yield
for _ in range(700):
yield
# dump sample memory contents
yield from write_register(la, 0, 0)
yield from write_register(la, 0, 1)
yield from write_register(la, 0, 0)
# dump sample memory contents
yield from write_register(la, 0, 0)
yield from write_register(la, 0, 1)
yield from write_register(la, 0, 0)
for addr in range(la.get_max_addr()):
yield from print_data_at_addr(addr)
simulate(la, testbench, "la_core.vcd")
for addr in range(la.get_max_addr()):
yield from print_data_at_addr(addr)

View File

@ -1,44 +1,158 @@
from manta.memory_core import MemoryCore
from manta.utils import *
from random import randint, sample
from random import randint, sample, choice
width = 18
depth = 512
base_addr = 0
mem_core = MemoryCore(
mode="bidirectional", width=width, depth=depth, base_addr=base_addr, interface=None
)
max_addr = mem_core.get_max_addr()
bus_addrs = list(range(base_addr, max_addr)) # include the endpoint!
user_addrs = list(range(depth))
def fill_mem_from_user_port(mem_core, depth):
for i in range(depth):
yield mem_core.user_addr.eq(i)
yield mem_core.user_data_in.eq(i)
yield mem_core.user_write_enable.eq(1)
@simulate(mem_core)
def test_bidirectional():
# make sure each address on the bus side contains zero
for addr in bus_addrs:
yield from verify_register(mem_core, addr, 0)
# make sure each address on the user side contains zero
for addr in user_addrs:
yield from verify_user_side(mem_core, addr, 0)
# write then immediately read
for addr in bus_addrs:
# this part is a little hard to check since we might have a
# memory at the end of the address space that's less than
# 16-bits wide. so we'll have to calculate how wide our
# memory is
n_full = width // 16
if addr < base_addr + (n_full * depth):
data_width = 16
else:
data_width = width % 16
data = randint(0, (2**data_width) - 1)
yield from write_register(mem_core, addr, data)
yield
yield
yield
yield
yield from verify_register(mem_core, addr, data)
yield
yield
yield
yield
yield
# write-write-write then read-read-read
model = {}
for addr in sample(bus_addrs, len(bus_addrs)):
n_full = width // 16
if addr < base_addr + (n_full * depth):
data_width = 16
else:
data_width = width % 16
data = randint(0, (2**data_width) - 1)
model[addr] = data
yield from write_register(mem_core, addr, data)
yield
yield
yield
yield
for addr in sample(bus_addrs, len(bus_addrs)):
yield from verify_register(mem_core, addr, model[addr])
yield
yield
yield
yield
# random reads and writes in random orders
for _ in range(5):
for addr in sample(bus_addrs, len(bus_addrs)):
operation = choice(["read", "write"])
if operation == "read":
yield from verify_register(mem_core, addr, model[addr])
yield
yield
yield
yield
yield
yield
elif operation == "write":
n_full = width // 16
if addr < base_addr + (n_full * depth):
data_width = 16
else:
data_width = width % 16
data = randint(0, (2**data_width) - 1)
model[addr] = data
yield from write_register(mem_core, addr, data)
yield
yield
yield
yield
yield
yield
def verify_user_side(mem_core, addr, expected_data):
yield mem_core.user_addr.eq(addr)
yield mem_core.user_write_enable.eq(0)
yield
def verify_mem_core(width, depth, base_addr):
mem_core = MemoryCore("fpga_to_host", width, depth, base_addr, interface=None)
def testbench():
yield from fill_mem_from_user_port(mem_core, depth)
# Read from address sequentially
for i in range(depth):
yield from verify_register(mem_core, i + base_addr, i % (2**width))
# Read from addresses randomly
for i in sample(range(depth), k=depth):
yield from verify_register(mem_core, i + base_addr, i % (2**width))
simulate(mem_core, testbench)
data = yield (mem_core.user_data_out)
if data != expected_data:
raise ValueError(f"Read from {addr} yielded {data} instead of {expected_data}")
def test_sweep_core_widths():
for i in range(1, 64):
verify_mem_core(i, 128, 0)
# def fill_mem_from_user_port(mem_core, depth):
# for i in range(depth):
# yield mem_core.user_addr.eq(i)
# yield mem_core.user_data_in.eq(i)
# yield mem_core.user_write_enable.eq(1)
# yield
# yield mem_core.user_write_enable.eq(0)
# yield
def test_random_cores():
for _ in range(5):
width = randint(0, 512)
depth = randint(0, 1024)
base_addr = randint(0, 2**16 - 1 - depth)
verify_mem_core(width, depth, base_addr)
# def verify_mem_core(width, depth, base_addr):
# mem_core = MemoryCore("bidirectional", width, depth, base_addr, interface=None)
# def testbench():
# yield from fill_mem_from_user_port(mem_core, depth)
# # Read from address sequentially
# for i in range(depth):
# yield from verify_register(mem_core, i + base_addr, i % (2**width))
# # Read from addresses randomly
# for i in sample(range(depth), k=depth):
# yield from verify_register(mem_core, i + base_addr, i % (2**width))
# simulate(mem_core, testbench)
# def test_sweep_core_widths():
# for i in range(1, 64):
# verify_mem_core(i, 128, 0)
# def test_random_cores():
# for _ in range(5):
# width = randint(0, 512)
# depth = randint(0, 1024)
# base_addr = randint(0, 2**16 - 1 - depth)
# verify_mem_core(width, depth, base_addr)

View File

@ -6,34 +6,32 @@ from manta.utils import *
source_bridge = UDPSourceBridge()
@simulate(source_bridge)
def test_normie_ops():
def testbench():
yield source_bridge.data_i.eq(0)
yield source_bridge.last_i.eq(0)
yield source_bridge.valid_i.eq(0)
yield
yield
yield source_bridge.data_i.eq(0)
yield source_bridge.last_i.eq(0)
yield source_bridge.valid_i.eq(0)
yield
yield
yield source_bridge.data_i.eq(0x0000_0001)
yield source_bridge.valid_i.eq(1)
yield
yield source_bridge.data_i.eq(0x1234_5678)
yield
yield source_bridge.valid_i.eq(0)
yield
yield
yield source_bridge.data_i.eq(0x0000_0001)
yield source_bridge.valid_i.eq(1)
yield
yield source_bridge.data_i.eq(0x1234_5678)
yield
yield source_bridge.valid_i.eq(0)
yield
yield
yield source_bridge.valid_i.eq(1)
yield source_bridge.data_i.eq(0x0000_0001)
yield
yield source_bridge.data_i.eq(0x90AB_CDEF)
yield
yield source_bridge.data_i.eq(0x0000_0000)
yield
yield source_bridge.data_i.eq(0x1234_5678)
yield
yield source_bridge.valid_i.eq(0)
yield
yield
simulate(source_bridge, testbench, "source_bridge.vcd")
yield source_bridge.valid_i.eq(1)
yield source_bridge.data_i.eq(0x0000_0001)
yield
yield source_bridge.data_i.eq(0x90AB_CDEF)
yield
yield source_bridge.data_i.eq(0x0000_0000)
yield
yield source_bridge.data_i.eq(0x1234_5678)
yield
yield source_bridge.valid_i.eq(0)
yield
yield

View File

@ -43,23 +43,19 @@ def verify_receive(data):
raise ValueError("Failed to assert valid!")
@simulate(uart_rx)
def test_all_possible_bytes():
def testbench():
yield uart_rx.rx.eq(1)
yield
yield uart_rx.rx.eq(1)
yield
for i in range(0xFF):
yield from verify_receive(i)
simulate(uart_rx, testbench)
for i in range(0xFF):
yield from verify_receive(i)
@simulate(uart_rx)
def test_bytes_random_sample():
def testbench():
yield uart_rx.rx.eq(1)
yield
yield uart_rx.rx.eq(1)
yield
for i in sample(range(0xFF), k=0xFF):
yield from verify_receive(i)
simulate(uart_rx, testbench)
for i in sample(range(0xFF), k=0xFF):
yield from verify_receive(i)

View File

@ -41,17 +41,13 @@ def verify_bit_sequence(byte):
raise ValueError("Done not asserted at end of transmission!")
@simulate(uart_tx)
def test_all_possible_bytes():
def testbench():
for i in range(0xFF):
yield from verify_bit_sequence(i)
simulate(uart_tx, testbench)
for i in range(0xFF):
yield from verify_bit_sequence(i)
@simulate(uart_tx)
def test_bytes_random_sample():
def testbench():
for i in sample(range(0xFF), k=0xFF):
yield from verify_bit_sequence(i)
simulate(uart_tx, testbench)
for i in sample(range(0xFF), k=0xFF):
yield from verify_bit_sequence(i)