diff --git a/src/manta/memory_core.py b/src/manta/memory_core.py index e88a9ec..7e8e0a5 100644 --- a/src/manta/memory_core.py +++ b/src/manta/memory_core.py @@ -22,8 +22,8 @@ class MemoryCore(Elaboratable): self._base_addr = base_addr self._interface = interface - self._n_brams = ceil(self._width / 16) - self._max_addr = self._base_addr + (self._depth * self._n_brams) + self._n_mems = ceil(self._width / 16) + self._max_addr = self._base_addr + (self._depth * self._n_mems) # Bus Connections self.bus_i = Signal(InternalBus()) @@ -106,23 +106,43 @@ class MemoryCore(Elaboratable): start_addr = self._base_addr + (i * self._depth) stop_addr = start_addr + self._depth - 1 - # Handle write ports - if self._mode in ["host_to_fpga", "bidirectional"]: - pass - # write_port = mem.write_port() - # m.d.sync += write_port.data.eq(self.bus_i.data) - # m.d.sync += write_port.en.eq(self.bus_i.rw) - # m.d.sync += write_port.addr.eq(self.bus_i.addr - start_addr) + # # Handle write ports + # if self._mode in ["host_to_fpga", "bidirectional"]: + # write_port = mem.write_port() + # m.d.sync += write_port.data.eq(self.bus_i.data) + # m.d.sync += write_port.en.eq(self.bus_i.rw) + # m.d.sync += write_port.addr.eq(self.bus_i.addr - start_addr) - # Handle read ports - if self._mode in ["fpga_to_host", "bidirectional"]: + # # Handle read ports + # if self._mode in ["fpga_to_host", "bidirectional"]: + # read_port = mem.read_port() + # m.d.comb += read_port.en.eq(1) + + # # Throw BRAM operations into the front of the pipeline + # with m.If( + # (self.bus_i.valid) + # & (~self.bus_i.rw) + # & (self.bus_i.addr >= start_addr) + # & (self.bus_i.addr <= stop_addr) + # ): + # m.d.sync += read_port.addr.eq(self.bus_i.addr - start_addr) + + # # Pull BRAM reads from the back of the pipeline + # with m.If( + # (self._bus_pipe[2].valid) + # & (~self._bus_pipe[2].rw) + # & (self._bus_pipe[2].addr >= start_addr) + # & (self._bus_pipe[2].addr <= stop_addr) + # ): + # m.d.sync += self.bus_o.data.eq(read_port.data) + + if self._mode == "fpga_to_host": read_port = mem.read_port() m.d.comb += read_port.en.eq(1) # Throw BRAM operations into the front of the pipeline with m.If( (self.bus_i.valid) - & (~self.bus_i.rw) & (self.bus_i.addr >= start_addr) & (self.bus_i.addr <= stop_addr) ): @@ -137,6 +157,47 @@ class MemoryCore(Elaboratable): ): m.d.sync += self.bus_o.data.eq(read_port.data) + elif self._mode == "host_to_fpga": + write_port = mem.write_port() + m.d.sync += write_port.en.eq(0) + + # Throw BRAM operations into the front of the pipeline + with m.If( + (self.bus_i.valid) + & (self.bus_i.addr >= start_addr) + & (self.bus_i.addr <= stop_addr) + ): + m.d.sync += write_port.addr.eq(self.bus_i.addr - start_addr) + m.d.sync += write_port.data.eq(self.bus_i.data) + m.d.sync += write_port.en.eq(self.bus_i.rw) + + elif self._mode == "bidirectional": + read_port = mem.read_port() + m.d.comb += read_port.en.eq(1) + + write_port = mem.write_port() + m.d.sync += write_port.en.eq(0) + + # Throw BRAM operations into the front of the pipeline + with m.If( + (self.bus_i.valid) + & (self.bus_i.addr >= start_addr) + & (self.bus_i.addr <= stop_addr) + ): + m.d.sync += read_port.addr.eq(self.bus_i.addr - start_addr) + m.d.sync += write_port.addr.eq(self.bus_i.addr - start_addr) + m.d.sync += write_port.data.eq(self.bus_i.data) + m.d.sync += write_port.en.eq(self.bus_i.rw) + + # Pull BRAM reads from the back of the pipeline + with m.If( + (self._bus_pipe[2].valid) + & (~self._bus_pipe[2].rw) + & (self._bus_pipe[2].addr >= start_addr) + & (self._bus_pipe[2].addr <= stop_addr) + ): + m.d.sync += self.bus_o.data.eq(read_port.data) + def _tie_mems_to_user_logic(self, m): # Handle write ports if self._mode in ["fpga_to_host", "bidirectional"]: @@ -238,7 +299,7 @@ class MemoryCore(Elaboratable): bus_addrs = self._convert_user_to_bus_addr(addrs) datas = self._interface.read(bus_addrs) - data_chunks = split_into_chunks(datas, self._n_brams) + data_chunks = split_into_chunks(datas, self._n_mems) return [words_to_value(chunk) for chunk in data_chunks] def write(self, addrs, datas): @@ -264,5 +325,5 @@ class MemoryCore(Elaboratable): raise TypeError("Write data must all be integers.") bus_addrs = self._convert_user_to_bus_addr([addrs])[0] - bus_datas = [word for d in datas for word in value_to_words(d, self._n_brams)] + bus_datas = [word for d in datas for word in value_to_words(d, self._n_mems)] self._interface.write(bus_addrs, bus_datas) diff --git a/src/manta/utils.py b/src/manta/utils.py index 072ceb5..20dc6bc 100644 --- a/src/manta/utils.py +++ b/src/manta/utils.py @@ -83,22 +83,41 @@ def split_into_chunks(data, chunk_size): return [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)] -def simulate(top, testbench, vcd_path=None): +def simulate(top): """ - Run a behavior simulation using Amaranth's built-in simulator `pysim`. Takes - the top-level module to simulate, the testbench process to run, and an optional - path to export a VCD file to. + A decorator for running behavioral simulation using Amaranth's built-in + simulator. Requires the top-level module in the simulation as an argument, + and automatically names VCD file containing the waveform dump with the name + of the function being decorated. """ - sim = Simulator(top) - sim.add_clock(1e-6) # 1 MHz - sim.add_sync_process(testbench) - if vcd_path is None: - sim.run() + def decorator(testbench): + def wrapper(*args, **kwargs): + sim = Simulator(top) + sim.add_clock(1e-6) # 1 MHz + sim.add_sync_process(testbench) - else: - with sim.write_vcd(vcd_path): - sim.run() + vcd_path = testbench.__name__ + ".vcd" + + with sim.write_vcd(vcd_path): + sim.run() + + return wrapper + + return decorator + + +# def simulate_decorator(testbench): +# def wrapper_accepting_arguments(top): +# sim = Simulator(top) +# sim.add_clock(1e-6) # 1 MHz +# sim.add_sync_process(testbench) + +# vcd_path = testbench.__name__ + ".vcd" +# with sim.write_vcd(vcd_path): +# sim.run() + +# return wrapper_accepting_arguments def verify_register(module, addr, expected_data): @@ -143,7 +162,10 @@ def write_register(module, addr, data): yield module.bus_i.rw.eq(1) yield module.bus_i.valid.eq(1) yield + yield module.bus_i.addr.eq(0) + yield module.bus_i.data.eq(0) yield module.bus_i.valid.eq(0) + yield module.bus_i.rw.eq(0) yield diff --git a/test/test_bridge_rx_sim.py b/test/test_bridge_rx_sim.py index c65a184..403f614 100644 --- a/test/test_bridge_rx_sim.py +++ b/test/test_bridge_rx_sim.py @@ -86,35 +86,29 @@ def verify_bad_bytes(bytes): yield bridge_rx.valid_i.eq(0) +@simulate(bridge_rx) def test_read_decode(): - def testbench(): - yield from verify_read_decoding(b"R0000\r\n", 0x0000) - yield from verify_read_decoding(b"R1234\r\n", 0x1234) - yield from verify_read_decoding(b"RBABE\r\n", 0xBABE) - yield from verify_read_decoding(b"R5678\n", 0x5678) - yield from verify_read_decoding(b"R9ABC\r", 0x9ABC) - - simulate(bridge_rx, testbench) + yield from verify_read_decoding(b"R0000\r\n", 0x0000) + yield from verify_read_decoding(b"R1234\r\n", 0x1234) + yield from verify_read_decoding(b"RBABE\r\n", 0xBABE) + yield from verify_read_decoding(b"R5678\n", 0x5678) + yield from verify_read_decoding(b"R9ABC\r", 0x9ABC) +@simulate(bridge_rx) def test_write_decode(): - def testbench(): - yield from verify_write_decoding(b"W12345678\r\n", 0x1234, 0x5678) - yield from verify_write_decoding(b"WDEADBEEF\r\n", 0xDEAD, 0xBEEF) - yield from verify_write_decoding(b"WDEADBEEF\r", 0xDEAD, 0xBEEF) - yield from verify_write_decoding(b"WB0BACAFE\n", 0xB0BA, 0xCAFE) - - simulate(bridge_rx, testbench) + yield from verify_write_decoding(b"W12345678\r\n", 0x1234, 0x5678) + yield from verify_write_decoding(b"WDEADBEEF\r\n", 0xDEAD, 0xBEEF) + yield from verify_write_decoding(b"WDEADBEEF\r", 0xDEAD, 0xBEEF) + yield from verify_write_decoding(b"WB0BACAFE\n", 0xB0BA, 0xCAFE) +@simulate(bridge_rx) def test_no_decode(): - def testbench(): - yield from verify_bad_bytes(b"RABC\r\n") - yield from verify_bad_bytes(b"R12345\r\n") - yield from verify_bad_bytes(b"M\r\n") - yield from verify_bad_bytes(b"W123456789101112131415161718191201222\r\n") - yield from verify_bad_bytes(b"RABCG\r\n") - yield from verify_bad_bytes(b"WABC[]()##*@\r\n") - yield from verify_bad_bytes(b"R\r\n") - - simulate(bridge_rx, testbench) + yield from verify_bad_bytes(b"RABC\r\n") + yield from verify_bad_bytes(b"R12345\r\n") + yield from verify_bad_bytes(b"M\r\n") + yield from verify_bad_bytes(b"W123456789101112131415161718191201222\r\n") + yield from verify_bad_bytes(b"RABCG\r\n") + yield from verify_bad_bytes(b"WABC[]()##*@\r\n") + yield from verify_bad_bytes(b"R\r\n") diff --git a/test/test_bridge_tx_sim.py b/test/test_bridge_tx_sim.py index 9a1568d..777c252 100644 --- a/test/test_bridge_tx_sim.py +++ b/test/test_bridge_tx_sim.py @@ -57,11 +57,9 @@ def verify_encoding(data, bytes): raise ValueError(f"Received {sent_bytes} instead of {bytes}.") +@simulate(bridge_tx) def test_some_random_values(): - def testbench(): - for i in sample(range(0xFFFF), k=5000): - expected = f"D{i:04X}\r\n".encode("ascii") - print(i) - yield from verify_encoding(i, expected) - - simulate(bridge_tx, testbench) + for i in sample(range(0xFFFF), k=5000): + expected = f"D{i:04X}\r\n".encode("ascii") + print(i) + yield from verify_encoding(i, expected) diff --git a/test/test_io_core_sim.py b/test/test_io_core_sim.py index 3bda812..4edbd37 100644 --- a/test/test_io_core_sim.py +++ b/test/test_io_core_sim.py @@ -26,92 +26,82 @@ def pulse_strobe_register(): yield from write_register(io_core, strobe_addr, 0) +@simulate(io_core) def test_input_probe_buffer_initial_value(): - def testbench(): - # Verify all input probe buffers initialize to zero - for i in inputs: - addrs = io_core._memory_map[i.name]["addrs"] + # Verify all input probe buffers initialize to zero + for i in inputs: + addrs = io_core._memory_map[i.name]["addrs"] - for addr in addrs: - yield from verify_register(io_core, addr, 0) - - simulate(io_core, testbench) + for addr in addrs: + yield from verify_register(io_core, addr, 0) +@simulate(io_core) def test_output_probe_buffer_initial_value(): - def testbench(): - # Verify all output probe buffers initialize to the values in the config - for o in outputs: - addrs = io_core._memory_map[o.name]["addrs"] - datas = value_to_words(o.reset, len(addrs)) + # Verify all output probe buffers initialize to the values in the config + for o in outputs: + addrs = io_core._memory_map[o.name]["addrs"] + datas = value_to_words(o.reset, len(addrs)) - for addr, data in zip(addrs, datas): - yield from verify_register(io_core, addr, data) - - simulate(io_core, testbench) + for addr, data in zip(addrs, datas): + yield from verify_register(io_core, addr, data) +@simulate(io_core) def test_output_probes_are_writeable(): - def testbench(): - for o in outputs: - addrs = io_core._memory_map[o.name]["addrs"] - test_value = randint(0, (2**o.width) - 1) - datas = value_to_words(test_value, len(addrs)) + for o in outputs: + addrs = io_core._memory_map[o.name]["addrs"] + test_value = randint(0, (2**o.width) - 1) + datas = value_to_words(test_value, len(addrs)) - # write value to registers - for addr, data in zip(addrs, datas): - yield from write_register(io_core, addr, data) + # write value to registers + for addr, data in zip(addrs, datas): + yield from write_register(io_core, addr, data) - # read value back from registers - for addr, data in zip(addrs, datas): - yield from verify_register(io_core, addr, data) - - simulate(io_core, testbench) + # read value back from registers + for addr, data in zip(addrs, datas): + yield from verify_register(io_core, addr, data) +@simulate(io_core) def test_output_probes_update(): - def testbench(): - for o in outputs: - addrs = io_core._memory_map[o.name]["addrs"] - test_value = randint(0, (2**o.width) - 1) - datas = value_to_words(test_value, len(addrs)) + for o in outputs: + addrs = io_core._memory_map[o.name]["addrs"] + test_value = randint(0, (2**o.width) - 1) + datas = value_to_words(test_value, len(addrs)) - # write value to registers - for addr, data in zip(addrs, datas): - yield from write_register(io_core, addr, data) + # write value to registers + for addr, data in zip(addrs, datas): + yield from write_register(io_core, addr, data) - # pulse strobe register - yield from pulse_strobe_register() + # pulse strobe register + yield from pulse_strobe_register() - # check that outputs took updated value - value = yield (o) + # check that outputs took updated value + value = yield (o) - if value != test_value: - raise ValueError( - f"Output probe {o.name} took value {value} instead of {test_value} after pulsing strobe." - ) + if value != test_value: + raise ValueError( + f"Output probe {o.name} took value {value} instead of {test_value} after pulsing strobe." + ) - else: - print(f"Output probe {o.name} took value {value} after pulsing strobe.") - - simulate(io_core, testbench) + else: + print(f"Output probe {o.name} took value {value} after pulsing strobe.") +@simulate(io_core) def test_input_probes_update(): - def testbench(): - for i in inputs: - # set input probe value - test_value = randint(0, (2**i.width) - 1) - yield i.eq(test_value) + for i in inputs: + # set input probe value + test_value = randint(0, (2**i.width) - 1) + yield i.eq(test_value) - # pulse strobe register - yield from pulse_strobe_register() + # pulse strobe register + yield from pulse_strobe_register() - # check that values are as expected once read back - addrs = io_core._memory_map[i.name]["addrs"] - datas = value_to_words(test_value, len(addrs)) + # check that values are as expected once read back + addrs = io_core._memory_map[i.name]["addrs"] + datas = value_to_words(test_value, len(addrs)) - for addr, data in zip(addrs, datas): - yield from verify_register(io_core, addr, data) - - simulate(io_core, testbench) + for addr, data in zip(addrs, datas): + yield from verify_register(io_core, addr, data) diff --git a/test/test_logic_analyzer_fsm_sim.py b/test/test_logic_analyzer_fsm_sim.py index 7e7043c..fb04615 100644 --- a/test/test_logic_analyzer_fsm_sim.py +++ b/test/test_logic_analyzer_fsm_sim.py @@ -6,244 +6,230 @@ config = {"sample_depth": 8} fsm = LogicAnalyzerFSM(config, base_addr=0, interface=None) +@simulate(fsm) def test_signals_reset_correctly(): - def testbench(): - # Make sure pointers and write enable reset to zero - for sig in [fsm.write_pointer, fsm.read_pointer, fsm.write_enable]: - if (yield sig) != 0: - raise ValueError - - # Make sure state resets to IDLE - if (yield fsm.state != States.IDLE): + # Make sure pointers and write enable reset to zero + for sig in [fsm.write_pointer, fsm.read_pointer, fsm.write_enable]: + if (yield sig) != 0: raise ValueError - simulate(fsm, testbench) + # Make sure state resets to IDLE + if (yield fsm.state != States.IDLE): + raise ValueError +@simulate(fsm) def test_single_shot_no_wait_for_trigger(): - def testbench(): - # Configure and start FSM - yield fsm.trigger.eq(1) - yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT) - yield fsm.trigger_location.eq(4) - yield fsm.request_start.eq(1) + # Configure and start FSM + yield fsm.trigger.eq(1) + yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT) + yield fsm.trigger_location.eq(4) + yield fsm.request_start.eq(1) - # Wait until write_enable is asserted - while not (yield fsm.write_enable): - yield - - # Wait 8 clock cycles for capture to complete - for i in range(8): - # Make sure that read_pointer does not increase - if (yield fsm.read_pointer) != 0: - raise ValueError - - # Make sure that write_pointer increases by one each cycle - if (yield fsm.write_pointer) != i: - raise ValueError - - yield - - # Wait one clock cycle (to let BRAM contents cycle in) + # Wait until write_enable is asserted + while not (yield fsm.write_enable): yield - # Check that write_pointer points to the end of memory - if (yield fsm.write_pointer) != 7: + # Wait 8 clock cycles for capture to complete + for i in range(8): + # Make sure that read_pointer does not increase + if (yield fsm.read_pointer) != 0: raise ValueError - # Check that state is CAPTURED - if (yield fsm.state) != States.CAPTURED: + # Make sure that write_pointer increases by one each cycle + if (yield fsm.write_pointer) != i: raise ValueError - simulate(fsm, testbench, "single_shot_no_wait_for_trigger.vcd") + yield + + # Wait one clock cycle (to let BRAM contents cycle in) + yield + + # Check that write_pointer points to the end of memory + if (yield fsm.write_pointer) != 7: + raise ValueError + + # Check that state is CAPTURED + if (yield fsm.state) != States.CAPTURED: + raise ValueError +@simulate(fsm) def test_single_shot_wait_for_trigger(): - def testbench(): - # Configure and start FSM - yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT) - yield fsm.trigger_location.eq(4) - yield fsm.request_start.eq(1) - yield - yield + # Configure and start FSM + yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT) + yield fsm.trigger_location.eq(4) + yield fsm.request_start.eq(1) + yield + yield - # Check that write_enable is asserted a cycle after request_start - if not (yield fsm.write_enable): - raise ValueError + # Check that write_enable is asserted a cycle after request_start + if not (yield fsm.write_enable): + raise ValueError - # Wait 4 clock cycles to get to IN_POSITION - for i in range(4): - rp = yield fsm.read_pointer - wp = yield fsm.write_pointer - - # Make sure that read_pointer does not increase - if rp != 0: - raise ValueError - - # Make sure that write_pointer increases by one each cycle - if wp != i: - raise ValueError - - yield - - # Wait a few cycles before triggering - for _ in range(10): - yield - - # Provide the trigger, and check that the capture completes 4 cycles later - yield fsm.trigger.eq(1) - yield - - for i in range(4): - yield - - # Wait one clock cycle (to let BRAM contents cycle in) - yield - - # Check that write_pointer points to the end of memory + # Wait 4 clock cycles to get to IN_POSITION + for i in range(4): rp = yield fsm.read_pointer wp = yield fsm.write_pointer - if (wp + 1) % config["sample_depth"] != rp: - raise ValueError - # Check that state is CAPTURED - if (yield fsm.state) != States.CAPTURED: - raise ValueError - - simulate(fsm, testbench, "single_shot_wait_for_trigger.vcd") - - -def test_immediate(): - def testbench(): - # Configure and start FSM - yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE) - yield fsm.request_start.eq(1) - yield - yield - - # Check that write_enable is asserted a cycle after request_start - if not (yield fsm.write_enable): - raise ValueError - - for i in range(config["sample_depth"]): - rp = yield fsm.read_pointer - wp = yield fsm.write_pointer - - if rp != 0: - raise ValueError - - if wp != i: - raise ValueError - - yield - - # Wait one clock cycle (to let BRAM contents cycle in) - yield - - # Check that write_pointer points to the end of memory - rp = yield fsm.read_pointer - wp = yield fsm.write_pointer + # Make sure that read_pointer does not increase if rp != 0: raise ValueError - if wp != 7: + + # Make sure that write_pointer increases by one each cycle + if wp != i: raise ValueError - # Check that state is CAPTURED - if (yield fsm.state) != States.CAPTURED: + yield + + # Wait a few cycles before triggering + for _ in range(10): + yield + + # Provide the trigger, and check that the capture completes 4 cycles later + yield fsm.trigger.eq(1) + yield + + for i in range(4): + yield + + # Wait one clock cycle (to let BRAM contents cycle in) + yield + + # Check that write_pointer points to the end of memory + rp = yield fsm.read_pointer + wp = yield fsm.write_pointer + if (wp + 1) % config["sample_depth"] != rp: + raise ValueError + + # Check that state is CAPTURED + if (yield fsm.state) != States.CAPTURED: + raise ValueError + + +@simulate(fsm) +def test_immediate(): + # Configure and start FSM + yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE) + yield fsm.request_start.eq(1) + yield + yield + + # Check that write_enable is asserted a cycle after request_start + if not (yield fsm.write_enable): + raise ValueError + + for i in range(config["sample_depth"]): + rp = yield fsm.read_pointer + wp = yield fsm.write_pointer + + if rp != 0: raise ValueError - simulate(fsm, testbench, "immediate.vcd") + if wp != i: + raise ValueError + + yield + + # Wait one clock cycle (to let BRAM contents cycle in) + yield + + # Check that write_pointer points to the end of memory + rp = yield fsm.read_pointer + wp = yield fsm.write_pointer + if rp != 0: + raise ValueError + if wp != 7: + raise ValueError + + # Check that state is CAPTURED + if (yield fsm.state) != States.CAPTURED: + raise ValueError +@simulate(fsm) def test_incremental(): - def testbench(): - # Configure and start FSM - yield fsm.trigger_mode.eq(TriggerModes.INCREMENTAL) - yield fsm.request_start.eq(1) - yield - yield + # Configure and start FSM + yield fsm.trigger_mode.eq(TriggerModes.INCREMENTAL) + yield fsm.request_start.eq(1) + yield + yield - # Check that write_enable is asserted on the same edge as request_start - if not (yield fsm.write_enable): - raise ValueError - - for _ in range(10): - for _ in range(3): - yield - - yield fsm.trigger.eq(1) - yield - yield fsm.trigger.eq(0) - yield - - # Check that state is CAPTURED - if (yield fsm.state) != States.CAPTURED: - raise ValueError - - simulate(fsm, testbench, "incremental.vcd") - - -def test_single_shot_write_enable(): - def testbench(): - # Configure FSM - yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT) - yield fsm.trigger_location.eq(4) - yield - - # Make sure write is not enabled before starting the FSM - if (yield fsm.write_enable): - raise ValueError - - # Start the FSM, ensure write enable is asserted throughout the capture - yield fsm.request_start.eq(1) - yield - yield - - for _ in range(config["sample_depth"]): - if not (yield fsm.write_enable): - raise ValueError + # Check that write_enable is asserted on the same edge as request_start + if not (yield fsm.write_enable): + raise ValueError + for _ in range(10): + for _ in range(3): yield yield fsm.trigger.eq(1) yield + yield fsm.trigger.eq(0) + yield - for _ in range(4): - if not (yield fsm.write_enable): - raise ValueError + # Check that state is CAPTURED + if (yield fsm.state) != States.CAPTURED: + raise ValueError - yield - # Make sure write_enable is deasserted after - if (yield fsm.write_enable): +@simulate(fsm) +def test_single_shot_write_enable(): + # Configure FSM + yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT) + yield fsm.trigger_location.eq(4) + yield + + # Make sure write is not enabled before starting the FSM + if (yield fsm.write_enable): + raise ValueError + + # Start the FSM, ensure write enable is asserted throughout the capture + yield fsm.request_start.eq(1) + yield + yield + + for _ in range(config["sample_depth"]): + if not (yield fsm.write_enable): raise ValueError - simulate(fsm, testbench, "single_shot_write_enable.vcd") + yield + + yield fsm.trigger.eq(1) + yield + + for _ in range(4): + if not (yield fsm.write_enable): + raise ValueError + + yield + + # Make sure write_enable is deasserted after + if (yield fsm.write_enable): + raise ValueError +@simulate(fsm) def test_immediate_write_enable(): - def testbench(): - # Configure FSM - yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE) - yield + # Configure FSM + yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE) + yield - # Make sure write is not enabled before starting the FSM - if (yield fsm.write_enable): + # Make sure write is not enabled before starting the FSM + if (yield fsm.write_enable): + raise ValueError + + # Start the FSM, ensure write enable is asserted throughout the capture + yield fsm.request_start.eq(1) + yield + yield + + for _ in range(config["sample_depth"]): + if not (yield fsm.write_enable): raise ValueError - # Start the FSM, ensure write enable is asserted throughout the capture - yield fsm.request_start.eq(1) - yield yield - for _ in range(config["sample_depth"]): - if not (yield fsm.write_enable): - raise ValueError - - yield - - # Make sure write_enable is deasserted after - if (yield fsm.write_enable): - raise ValueError - - simulate(fsm, testbench, "immediate_write_enable.vcd") + # Make sure write_enable is deasserted after + if (yield fsm.write_enable): + raise ValueError diff --git a/test/test_logic_analyzer_sim.py b/test/test_logic_analyzer_sim.py index f5c7ea2..eb240a5 100644 --- a/test/test_logic_analyzer_sim.py +++ b/test/test_logic_analyzer_sim.py @@ -61,42 +61,40 @@ def set_probe(name, value): yield probe.eq(value) +@simulate(la) def test_single_shot_capture(): - def testbench(): - # # ok nice what happens if we try to run the core, which includes: - yield from set_fsm_register("request_stop", 1) - yield from set_fsm_register("request_stop", 0) + # # ok nice what happens if we try to run the core, which includes: + yield from set_fsm_register("request_stop", 1) + yield from set_fsm_register("request_stop", 0) - # setting triggers - yield from set_trig_blk_register("curly_op", Operations.EQ) - yield from set_trig_blk_register("curly_arg", 4) + # setting triggers + yield from set_trig_blk_register("curly_op", Operations.EQ) + yield from set_trig_blk_register("curly_arg", 4) - # setting trigger mode - yield from set_fsm_register("trigger_mode", 0) + # setting trigger mode + yield from set_fsm_register("trigger_mode", 0) - # setting trigger location - yield from set_fsm_register("trigger_location", 511) + # setting trigger location + yield from set_fsm_register("trigger_location", 511) - # starting capture - yield from set_fsm_register("request_start", 1) - yield from set_fsm_register("request_start", 0) + # starting capture + yield from set_fsm_register("request_start", 1) + yield from set_fsm_register("request_start", 0) - # wait a few hundred clock cycles, see what happens - for _ in range(700): - yield + # wait a few hundred clock cycles, see what happens + for _ in range(700): + yield - # provide the trigger condition - yield from set_probe("curly", 4) + # provide the trigger condition + yield from set_probe("curly", 4) - for _ in range(700): - yield + for _ in range(700): + yield - # dump sample memory contents - yield from write_register(la, 0, 0) - yield from write_register(la, 0, 1) - yield from write_register(la, 0, 0) + # dump sample memory contents + yield from write_register(la, 0, 0) + yield from write_register(la, 0, 1) + yield from write_register(la, 0, 0) - for addr in range(la.get_max_addr()): - yield from print_data_at_addr(addr) - - simulate(la, testbench, "la_core.vcd") + for addr in range(la.get_max_addr()): + yield from print_data_at_addr(addr) diff --git a/test/test_mem_core_sim.py b/test/test_mem_core_sim.py index fce9800..dd8d430 100644 --- a/test/test_mem_core_sim.py +++ b/test/test_mem_core_sim.py @@ -1,44 +1,158 @@ from manta.memory_core import MemoryCore from manta.utils import * -from random import randint, sample +from random import randint, sample, choice + +width = 18 +depth = 512 +base_addr = 0 +mem_core = MemoryCore( + mode="bidirectional", width=width, depth=depth, base_addr=base_addr, interface=None +) + +max_addr = mem_core.get_max_addr() +bus_addrs = list(range(base_addr, max_addr)) # include the endpoint! +user_addrs = list(range(depth)) -def fill_mem_from_user_port(mem_core, depth): - for i in range(depth): - yield mem_core.user_addr.eq(i) - yield mem_core.user_data_in.eq(i) - yield mem_core.user_write_enable.eq(1) +@simulate(mem_core) +def test_bidirectional(): + # make sure each address on the bus side contains zero + for addr in bus_addrs: + yield from verify_register(mem_core, addr, 0) + + # make sure each address on the user side contains zero + for addr in user_addrs: + yield from verify_user_side(mem_core, addr, 0) + + # write then immediately read + for addr in bus_addrs: + # this part is a little hard to check since we might have a + # memory at the end of the address space that's less than + # 16-bits wide. so we'll have to calculate how wide our + # memory is + + n_full = width // 16 + if addr < base_addr + (n_full * depth): + data_width = 16 + else: + data_width = width % 16 + + data = randint(0, (2**data_width) - 1) + yield from write_register(mem_core, addr, data) + yield + yield + yield + yield + yield from verify_register(mem_core, addr, data) + yield + yield + yield + yield yield + # write-write-write then read-read-read + model = {} + for addr in sample(bus_addrs, len(bus_addrs)): + n_full = width // 16 + if addr < base_addr + (n_full * depth): + data_width = 16 + else: + data_width = width % 16 + + data = randint(0, (2**data_width) - 1) + model[addr] = data + yield from write_register(mem_core, addr, data) + yield + yield + yield + yield + + for addr in sample(bus_addrs, len(bus_addrs)): + yield from verify_register(mem_core, addr, model[addr]) + yield + yield + yield + yield + + # random reads and writes in random orders + for _ in range(5): + for addr in sample(bus_addrs, len(bus_addrs)): + + operation = choice(["read", "write"]) + if operation == "read": + yield from verify_register(mem_core, addr, model[addr]) + yield + yield + yield + yield + yield + yield + + elif operation == "write": + n_full = width // 16 + + if addr < base_addr + (n_full * depth): + data_width = 16 + else: + data_width = width % 16 + + data = randint(0, (2**data_width) - 1) + model[addr] = data + + yield from write_register(mem_core, addr, data) + yield + yield + yield + yield + yield + yield + + +def verify_user_side(mem_core, addr, expected_data): + yield mem_core.user_addr.eq(addr) yield mem_core.user_write_enable.eq(0) yield - -def verify_mem_core(width, depth, base_addr): - mem_core = MemoryCore("fpga_to_host", width, depth, base_addr, interface=None) - - def testbench(): - yield from fill_mem_from_user_port(mem_core, depth) - - # Read from address sequentially - for i in range(depth): - yield from verify_register(mem_core, i + base_addr, i % (2**width)) - - # Read from addresses randomly - for i in sample(range(depth), k=depth): - yield from verify_register(mem_core, i + base_addr, i % (2**width)) - - simulate(mem_core, testbench) + data = yield (mem_core.user_data_out) + if data != expected_data: + raise ValueError(f"Read from {addr} yielded {data} instead of {expected_data}") -def test_sweep_core_widths(): - for i in range(1, 64): - verify_mem_core(i, 128, 0) +# def fill_mem_from_user_port(mem_core, depth): +# for i in range(depth): +# yield mem_core.user_addr.eq(i) +# yield mem_core.user_data_in.eq(i) +# yield mem_core.user_write_enable.eq(1) +# yield + +# yield mem_core.user_write_enable.eq(0) +# yield -def test_random_cores(): - for _ in range(5): - width = randint(0, 512) - depth = randint(0, 1024) - base_addr = randint(0, 2**16 - 1 - depth) - verify_mem_core(width, depth, base_addr) +# def verify_mem_core(width, depth, base_addr): +# mem_core = MemoryCore("bidirectional", width, depth, base_addr, interface=None) + +# def testbench(): +# yield from fill_mem_from_user_port(mem_core, depth) + +# # Read from address sequentially +# for i in range(depth): +# yield from verify_register(mem_core, i + base_addr, i % (2**width)) + +# # Read from addresses randomly +# for i in sample(range(depth), k=depth): +# yield from verify_register(mem_core, i + base_addr, i % (2**width)) + +# simulate(mem_core, testbench) + +# def test_sweep_core_widths(): +# for i in range(1, 64): +# verify_mem_core(i, 128, 0) + + +# def test_random_cores(): +# for _ in range(5): +# width = randint(0, 512) +# depth = randint(0, 1024) +# base_addr = randint(0, 2**16 - 1 - depth) +# verify_mem_core(width, depth, base_addr) diff --git a/test/test_source_bridge_sim.py b/test/test_source_bridge_sim.py index 031dec8..b3b03ef 100644 --- a/test/test_source_bridge_sim.py +++ b/test/test_source_bridge_sim.py @@ -6,34 +6,32 @@ from manta.utils import * source_bridge = UDPSourceBridge() +@simulate(source_bridge) def test_normie_ops(): - def testbench(): - yield source_bridge.data_i.eq(0) - yield source_bridge.last_i.eq(0) - yield source_bridge.valid_i.eq(0) - yield - yield + yield source_bridge.data_i.eq(0) + yield source_bridge.last_i.eq(0) + yield source_bridge.valid_i.eq(0) + yield + yield - yield source_bridge.data_i.eq(0x0000_0001) - yield source_bridge.valid_i.eq(1) - yield - yield source_bridge.data_i.eq(0x1234_5678) - yield - yield source_bridge.valid_i.eq(0) - yield - yield + yield source_bridge.data_i.eq(0x0000_0001) + yield source_bridge.valid_i.eq(1) + yield + yield source_bridge.data_i.eq(0x1234_5678) + yield + yield source_bridge.valid_i.eq(0) + yield + yield - yield source_bridge.valid_i.eq(1) - yield source_bridge.data_i.eq(0x0000_0001) - yield - yield source_bridge.data_i.eq(0x90AB_CDEF) - yield - yield source_bridge.data_i.eq(0x0000_0000) - yield - yield source_bridge.data_i.eq(0x1234_5678) - yield - yield source_bridge.valid_i.eq(0) - yield - yield - - simulate(source_bridge, testbench, "source_bridge.vcd") + yield source_bridge.valid_i.eq(1) + yield source_bridge.data_i.eq(0x0000_0001) + yield + yield source_bridge.data_i.eq(0x90AB_CDEF) + yield + yield source_bridge.data_i.eq(0x0000_0000) + yield + yield source_bridge.data_i.eq(0x1234_5678) + yield + yield source_bridge.valid_i.eq(0) + yield + yield diff --git a/test/test_uart_rx_sim.py b/test/test_uart_rx_sim.py index 40111aa..23c9f43 100644 --- a/test/test_uart_rx_sim.py +++ b/test/test_uart_rx_sim.py @@ -43,23 +43,19 @@ def verify_receive(data): raise ValueError("Failed to assert valid!") +@simulate(uart_rx) def test_all_possible_bytes(): - def testbench(): - yield uart_rx.rx.eq(1) - yield + yield uart_rx.rx.eq(1) + yield - for i in range(0xFF): - yield from verify_receive(i) - - simulate(uart_rx, testbench) + for i in range(0xFF): + yield from verify_receive(i) +@simulate(uart_rx) def test_bytes_random_sample(): - def testbench(): - yield uart_rx.rx.eq(1) - yield + yield uart_rx.rx.eq(1) + yield - for i in sample(range(0xFF), k=0xFF): - yield from verify_receive(i) - - simulate(uart_rx, testbench) + for i in sample(range(0xFF), k=0xFF): + yield from verify_receive(i) diff --git a/test/test_uart_tx_sim.py b/test/test_uart_tx_sim.py index 8b07aa4..63bd373 100644 --- a/test/test_uart_tx_sim.py +++ b/test/test_uart_tx_sim.py @@ -41,17 +41,13 @@ def verify_bit_sequence(byte): raise ValueError("Done not asserted at end of transmission!") +@simulate(uart_tx) def test_all_possible_bytes(): - def testbench(): - for i in range(0xFF): - yield from verify_bit_sequence(i) - - simulate(uart_tx, testbench) + for i in range(0xFF): + yield from verify_bit_sequence(i) +@simulate(uart_tx) def test_bytes_random_sample(): - def testbench(): - for i in sample(range(0xFF), k=0xFF): - yield from verify_bit_sequence(i) - - simulate(uart_tx, testbench) + for i in sample(range(0xFF), k=0xFF): + yield from verify_bit_sequence(i)