diff --git a/pyproject.toml b/pyproject.toml index 711a9b0..26afb40 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,12 +2,12 @@ name = "manta" version = "1.0.0" authors = [ - { name="Fischer Moseley", email="fischerm@mit.edu" }, + { name="Fischer Moseley", email="fischer.moseley@gmail.com" }, ] description = "An In-Situ Debugging Tool for Programmable Hardware" readme = "README.md" dependencies = [ - "amaranth[builtin-yosys]", + "amaranth[builtin-yosys]==0.5.0", "PyYAML", "pyserial", "liteeth@git+https://github.com/enjoy-digital/liteeth@2023.12", @@ -35,4 +35,4 @@ where = ["src"] [build-system] requires = ["setuptools"] -build-backend = "setuptools.build_meta" \ No newline at end of file +build-backend = "setuptools.build_meta" diff --git a/src/manta/cli.py b/src/manta/cli.py index 2ae2f3a..9eea8ad 100644 --- a/src/manta/cli.py +++ b/src/manta/cli.py @@ -1,7 +1,7 @@ from manta.manta import Manta from manta.utils import * from sys import argv -from pkg_resources import get_distribution +from importlib.metadata import distribution logo = f""" @@ -26,7 +26,7 @@ logo = f""" Manta - An In-Situ Debugging Tool for Programmable Hardware -Version {get_distribution("manta").version} +Version {distribution("manta").version} https://github.com/fischermoseley/manta """ diff --git a/src/manta/io_core.py b/src/manta/io_core.py index d4b7add..fb45c93 100644 --- a/src/manta/io_core.py +++ b/src/manta/io_core.py @@ -27,9 +27,9 @@ class IOCore(MantaCore): # Internal Signals self._strobe = Signal() - self._input_bufs = [Signal(p.width, name=p.name + "_buf") for p in self._inputs] + self._input_bufs = [Signal(len(p), name=p.name + "_buf") for p in self._inputs] self._output_bufs = [ - Signal(p.width, name=p.name + "_buf", reset=p.reset) for p in self._outputs + Signal(len(p), name=p.name + "_buf", reset=p.reset) for p in self._outputs ] self._make_memory_map() @@ -118,7 +118,7 @@ class IOCore(MantaCore): check_value_fits_in_bits(attrs["initial_value"], width) initial_value = attrs["initial_value"] - output_signals += [Signal(width, name=name, reset=initial_value)] + output_signals += [Signal(width, name=name, init=initial_value)] return cls(base_addr, interface, inputs=input_signals, outputs=output_signals) @@ -136,7 +136,7 @@ class IOCore(MantaCore): last_used_addr = self._base_addr for io, io_buf in zip(ios, io_bufs): - n_slices = ceil(io.width / 16) + n_slices = ceil(len(io) / 16) signals = split_into_chunks(io_buf, 16) addrs = [i + last_used_addr + 1 for i in range(n_slices)] diff --git a/src/manta/logic_analyzer/trigger_block.py b/src/manta/logic_analyzer/trigger_block.py index 9db01a2..fbf7c0c 100644 --- a/src/manta/logic_analyzer/trigger_block.py +++ b/src/manta/logic_analyzer/trigger_block.py @@ -96,7 +96,7 @@ class LogicAnalyzerTrigger(Elaboratable): def __init__(self, signal): self.signal = signal self.op = Signal(Operations, name=signal.name + "_op") - self.arg = Signal(signal.width, name=signal.name + "_arg") + self.arg = Signal(len(signal), name=signal.name + "_arg") self.triggered = Signal() def elaborate(self, platform): diff --git a/src/manta/memory_core.py b/src/manta/memory_core.py index 177cae7..87ee501 100644 --- a/src/manta/memory_core.py +++ b/src/manta/memory_core.py @@ -43,9 +43,11 @@ class MemoryCore(MantaCore): elif self._mode == "host_to_fpga": self.user_addr = Signal(range(self._depth)) self.user_data_out = Signal(self._width) + self.user_clk = Signal() self._top_level_ports = [ self.user_addr, self.user_data_out, + self.user_clk, ] elif self._mode == "bidirectional": @@ -202,7 +204,10 @@ class MemoryCore(MantaCore): if self._mode in ["host_to_fpga", "bidirectional"]: read_datas = [] for i, mem in enumerate(self._mems): - read_port = mem.read_port() + m.domains.user = user_cd = ClockDomain(local=True) + m.d.comb += user_cd.clk.eq(self.user_clk) + + read_port = mem.read_port(domain="user") m.d.comb += read_port.addr.eq(self.user_addr) m.d.comb += read_port.en.eq(1) read_datas.append(read_port.data) diff --git a/src/manta/utils.py b/src/manta/utils.py index 62913c5..eae8c99 100644 --- a/src/manta/utils.py +++ b/src/manta/utils.py @@ -138,7 +138,7 @@ def simulate(top): def wrapper(*args, **kwargs): sim = Simulator(top) sim.add_clock(1e-6) # 1 MHz - sim.add_sync_process(testbench) + sim.add_testbench(testbench) vcd_path = "build/" + testbench.__name__ + ".vcd" @@ -159,7 +159,7 @@ def jumble(iterable): return sample(iterable, len(iterable)) -def verify_register(module, addr, expected_data): +async def verify_register(module, ctx, addr, expected_data): """ Read the contents of a register out over a module's bus connection, and verify that it contains the expected data. @@ -173,40 +173,40 @@ def verify_register(module, addr, expected_data): """ # Place read transaction on the bus - yield module.bus_i.addr.eq(addr) - yield module.bus_i.data.eq(0) - yield module.bus_i.rw.eq(0) - yield module.bus_i.valid.eq(1) - yield - yield module.bus_i.addr.eq(0) - yield module.bus_i.valid.eq(0) + ctx.set(module.bus_i.addr, addr) + ctx.set(module.bus_i.data, 0) + ctx.set(module.bus_i.valid, True) + ctx.set(module.bus_i.rw, 0) + await ctx.tick() + ctx.set(module.bus_i.addr, 0) + ctx.set(module.bus_i.valid, 0) # Wait for output to be valid - while not (yield module.bus_o.valid): - yield + while not ctx.get(module.bus_o.valid): + await ctx.tick() # Compare returned value with expected - data = yield (module.bus_o.data) + data = ctx.get(module.bus_o.data) if data != expected_data: raise ValueError(f"Read from {addr} yielded {data} instead of {expected_data}") -def write_register(module, addr, data): +async def write_register(module, ctx, addr, data): """ Write to a register over a module's bus connection, placing the contents of `data` at `addr`. """ - yield module.bus_i.addr.eq(addr) - yield module.bus_i.data.eq(data) - yield module.bus_i.rw.eq(1) - yield module.bus_i.valid.eq(1) - yield - yield module.bus_i.addr.eq(0) - yield module.bus_i.data.eq(0) - yield module.bus_i.valid.eq(0) - yield module.bus_i.rw.eq(0) - yield + ctx.set(module.bus_i.addr, addr) + ctx.set(module.bus_i.data, data) + ctx.set(module.bus_i.rw, 1) + ctx.set(module.bus_i.valid, True) + await ctx.tick() + ctx.set(module.bus_i.addr, 0) + ctx.set(module.bus_i.data, 0) + ctx.set(module.bus_i.rw, 0) + ctx.set(module.bus_i.valid, False) + await ctx.tick() def xilinx_tools_installed(): diff --git a/test/test_bridge_rx_sim.py b/test/test_bridge_rx_sim.py index 403f614..d0a5edb 100644 --- a/test/test_bridge_rx_sim.py +++ b/test/test_bridge_rx_sim.py @@ -6,109 +6,110 @@ from manta.utils import * bridge_rx = ReceiveBridge() -def verify_read_decoding(bytes, addr): +async def verify_read_decoding(ctx, bytes, addr): """ Send a series of bytes to the receive bridge, and verify that the bridge places a read request with the appropriate address on the internal bus. """ + valid_asserted = False - yield bridge_rx.valid_i.eq(1) + ctx.set(bridge_rx.valid_i, True) for i, byte in enumerate(bytes): - yield bridge_rx.data_i.eq(byte) + ctx.set(bridge_rx.data_i, byte) - if (yield bridge_rx.valid_o) and (i > 0): + if ctx.get(bridge_rx.valid_o) and (i > 0): valid_asserted = True - if (yield bridge_rx.addr_o) != addr: + if ctx.get(bridge_rx.addr_o) != addr: raise ValueError("wrong addr!") - if (yield bridge_rx.rw_o) != 0: + if ctx.get(bridge_rx.rw_o) != 0: raise ValueError("wrong rw!") - if (yield bridge_rx.data_o) != 0: + if ctx.get(bridge_rx.data_o) != 0: raise ValueError("wrong data!") - yield + await ctx.tick() - yield bridge_rx.valid_i.eq(0) - yield bridge_rx.data_i.eq(0) + ctx.set(bridge_rx.valid_i, False) + ctx.set(bridge_rx.data_i, 0) - if not valid_asserted and not (yield bridge_rx.valid_o): + if not valid_asserted and not ctx.get(bridge_rx.valid_o): raise ValueError("Bridge failed to output valid message.") -def verify_write_decoding(bytes, addr, data): +async def verify_write_decoding(ctx, bytes, addr, data): """ Send a series of bytes to the receive bridge, and verify that the bridge places a write request with the appropriate address and data on the internal bus. """ valid_asserted = False - yield bridge_rx.valid_i.eq(1) + ctx.set(bridge_rx.valid_i, True) for i, byte in enumerate(bytes): - yield bridge_rx.data_i.eq(byte) + ctx.set(bridge_rx.data_i, byte) - if (yield bridge_rx.valid_o) and (i > 0): + if ctx.get(bridge_rx.valid_o) and (i > 0): valid_asserted = True - if (yield bridge_rx.addr_o) != addr: + if ctx.get(bridge_rx.addr_o) != addr: raise ValueError("wrong addr!") - if (yield bridge_rx.rw_o) != 1: + if ctx.get(bridge_rx.rw_o) != 1: raise ValueError("wrong rw!") - if (yield bridge_rx.data_o) != data: + if ctx.get(bridge_rx.data_o) != data: raise ValueError("wrong data!") - yield + await ctx.tick() - yield bridge_rx.valid_i.eq(0) - yield bridge_rx.data_i.eq(0) + ctx.set(bridge_rx.valid_i, False) + ctx.set(bridge_rx.data_i, 0) - if not valid_asserted and not (yield bridge_rx.valid_o): + if not valid_asserted and not ctx.get(bridge_rx.valid_o): raise ValueError("Bridge failed to output valid message.") -def verify_bad_bytes(bytes): +async def verify_bad_bytes(ctx, bytes): """ Send a series of bytes to the receive bridge, and verify that the bridge does not place any transaction on the internal bus. """ - yield bridge_rx.valid_i.eq(1) + ctx.set(bridge_rx.valid_i, True) for byte in bytes: - yield bridge_rx.data_i.eq(byte) + ctx.set(bridge_rx.data_i, byte) - if (yield bridge_rx.valid_o): + if ctx.get(bridge_rx.valid_o): raise ValueError("Bridge decoded invalid message.") - yield + await ctx.tick() - yield bridge_rx.valid_i.eq(0) + ctx.set(bridge_rx.valid_i, 0) @simulate(bridge_rx) -def test_read_decode(): - yield from verify_read_decoding(b"R0000\r\n", 0x0000) - yield from verify_read_decoding(b"R1234\r\n", 0x1234) - yield from verify_read_decoding(b"RBABE\r\n", 0xBABE) - yield from verify_read_decoding(b"R5678\n", 0x5678) - yield from verify_read_decoding(b"R9ABC\r", 0x9ABC) +async def test_function(ctx): + await verify_read_decoding(ctx, b"R0000\r\n", 0x0000) + await verify_read_decoding(ctx, b"R1234\r\n", 0x1234) + await verify_read_decoding(ctx, b"RBABE\r\n", 0xBABE) + await verify_read_decoding(ctx, b"R5678\n", 0x5678) + await verify_read_decoding(ctx, b"R9ABC\r", 0x9ABC) @simulate(bridge_rx) -def test_write_decode(): - yield from verify_write_decoding(b"W12345678\r\n", 0x1234, 0x5678) - yield from verify_write_decoding(b"WDEADBEEF\r\n", 0xDEAD, 0xBEEF) - yield from verify_write_decoding(b"WDEADBEEF\r", 0xDEAD, 0xBEEF) - yield from verify_write_decoding(b"WB0BACAFE\n", 0xB0BA, 0xCAFE) +async def test_write_decode(ctx): + await verify_write_decoding(ctx, b"W12345678\r\n", 0x1234, 0x5678) + await verify_write_decoding(ctx, b"WDEADBEEF\r\n", 0xDEAD, 0xBEEF) + await verify_write_decoding(ctx, b"WDEADBEEF\r", 0xDEAD, 0xBEEF) + await verify_write_decoding(ctx, b"WB0BACAFE\n", 0xB0BA, 0xCAFE) @simulate(bridge_rx) -def test_no_decode(): - yield from verify_bad_bytes(b"RABC\r\n") - yield from verify_bad_bytes(b"R12345\r\n") - yield from verify_bad_bytes(b"M\r\n") - yield from verify_bad_bytes(b"W123456789101112131415161718191201222\r\n") - yield from verify_bad_bytes(b"RABCG\r\n") - yield from verify_bad_bytes(b"WABC[]()##*@\r\n") - yield from verify_bad_bytes(b"R\r\n") +async def test_no_decode(ctx): + await verify_bad_bytes(ctx, b"RABC\r\n") + await verify_bad_bytes(ctx, b"R12345\r\n") + await verify_bad_bytes(ctx, b"M\r\n") + await verify_bad_bytes(ctx, b"W123456789101112131415161718191201222\r\n") + await verify_bad_bytes(ctx, b"RABCG\r\n") + await verify_bad_bytes(ctx, b"WABC[]()##*@\r\n") + await verify_bad_bytes(ctx, b"R\r\n") diff --git a/test/test_bridge_tx_sim.py b/test/test_bridge_tx_sim.py index 777c252..75987b9 100644 --- a/test/test_bridge_tx_sim.py +++ b/test/test_bridge_tx_sim.py @@ -7,7 +7,7 @@ from random import randint, sample bridge_tx = TransmitBridge() -def verify_encoding(data, bytes): +async def verify_encoding(ctx, data, bytes): """ Place a read response on the internal bus, and verify that the sequence of bytes sent from TransmitBridge matches the provided bytestring `bytes`. @@ -17,18 +17,16 @@ def verify_encoding(data, bytes): """ # Place a read response on the internal bus - yield bridge_tx.data_i.eq(data) - yield bridge_tx.valid_i.eq(1) - yield bridge_tx.rw_i.eq(0) - yield bridge_tx.done_i.eq(1) + ctx.set(bridge_tx.data_i, data) + ctx.set(bridge_tx.valid_i, True) + ctx.set(bridge_tx.rw_i, 0) + ctx.set(bridge_tx.done_i, True) - yield + await ctx.tick() - yield bridge_tx.data_i.eq(0) - yield bridge_tx.valid_i.eq(0) - yield bridge_tx.rw_i.eq(0) - - yield + ctx.set(bridge_tx.data_i, 0) + ctx.set(bridge_tx.valid_i, False) + ctx.set(bridge_tx.rw_i, 0) # Model the UARTTransmitter sent_bytes = b"" @@ -36,16 +34,15 @@ def verify_encoding(data, bytes): while len(sent_bytes) < len(bytes): # If start_o is asserted, set done_i to zero, then delay, then set it back to one - if (yield bridge_tx.start_o): - yield bridge_tx.done_i.eq(0) - sent_bytes += (yield bridge_tx.data_o).to_bytes(1, "big") + if ctx.get(bridge_tx.start_o): + sent_bytes += ctx.get(bridge_tx.data_o).to_bytes(1, "big") + ctx.set(bridge_tx.done_i, 0) - yield bridge_tx.done_i.eq(0) for _ in range(10): - yield + await ctx.tick() - yield bridge_tx.done_i.eq(1) - yield + ctx.set(bridge_tx.done_i, 1) + await ctx.tick() # Time out if not enough bytes after trying to get bytes 15 times iters += 1 @@ -58,8 +55,7 @@ def verify_encoding(data, bytes): @simulate(bridge_tx) -def test_some_random_values(): +async def test_some_random_values(ctx): for i in sample(range(0xFFFF), k=5000): expected = f"D{i:04X}\r\n".encode("ascii") - print(i) - yield from verify_encoding(i, expected) + await verify_encoding(ctx, i, expected) diff --git a/test/test_io_core_sim.py b/test/test_io_core_sim.py index d99a72a..3cd42b7 100644 --- a/test/test_io_core_sim.py +++ b/test/test_io_core_sim.py @@ -10,75 +10,75 @@ probe2 = Signal(8) probe3 = Signal(20) inputs = [probe0, probe1, probe2, probe3] -probe4 = Signal(1, reset=1) -probe5 = Signal(2, reset=2) +probe4 = Signal(1, init=1) +probe5 = Signal(2, init=2) probe6 = Signal(8) -probe7 = Signal(20, reset=65538) +probe7 = Signal(20, init=65538) outputs = [probe4, probe5, probe6, probe7] io_core = IOCore(base_addr=0, interface=None, inputs=inputs, outputs=outputs) -def pulse_strobe_register(): +async def pulse_strobe_register(ctx): strobe_addr = io_core._memory_map["strobe"]["addrs"][0] - yield from write_register(io_core, strobe_addr, 0) - yield from write_register(io_core, strobe_addr, 1) - yield from write_register(io_core, strobe_addr, 0) + await write_register(io_core, ctx, strobe_addr, 0) + await write_register(io_core, ctx, strobe_addr, 1) + await write_register(io_core, ctx, strobe_addr, 0) @simulate(io_core) -def test_input_probe_buffer_initial_value(): +async def test_input_probe_buffer_initial_value(ctx): # Verify all input probe buffers initialize to zero for i in inputs: addrs = io_core._memory_map[i.name]["addrs"] for addr in addrs: - yield from verify_register(io_core, addr, 0) + await verify_register(io_core, ctx, addr, 0) @simulate(io_core) -def test_output_probe_buffer_initial_value(): +async def test_output_probe_buffer_initial_value(ctx): # Verify all output probe buffers initialize to the values in the config for o in outputs: addrs = io_core._memory_map[o.name]["addrs"] datas = value_to_words(o.reset, len(addrs)) for addr, data in zip(addrs, datas): - yield from verify_register(io_core, addr, data) + await verify_register(io_core, ctx, addr, data) @simulate(io_core) -def test_output_probes_are_writeable(): +async def test_output_probes_are_writeable(ctx): for o in outputs: addrs = io_core._memory_map[o.name]["addrs"] - test_value = getrandbits(o.width) + test_value = getrandbits(len(o)) datas = value_to_words(test_value, len(addrs)) # write value to registers for addr, data in zip(addrs, datas): - yield from write_register(io_core, addr, data) + await write_register(io_core, ctx, addr, data) # read value back from registers for addr, data in zip(addrs, datas): - yield from verify_register(io_core, addr, data) + await verify_register(io_core, ctx, addr, data) @simulate(io_core) -def test_output_probes_update(): +async def test_output_probes_update(ctx): for o in outputs: addrs = io_core._memory_map[o.name]["addrs"] - test_value = getrandbits(o.width) + test_value = getrandbits(len(o)) datas = value_to_words(test_value, len(addrs)) # write value to registers for addr, data in zip(addrs, datas): - yield from write_register(io_core, addr, data) + await write_register(io_core, ctx, addr, data) # pulse strobe register - yield from pulse_strobe_register() + await pulse_strobe_register(ctx) # check that outputs took updated value - value = yield (o) + value = ctx.get(o) if value != test_value: raise ValueError( @@ -90,18 +90,18 @@ def test_output_probes_update(): @simulate(io_core) -def test_input_probes_update(): +async def test_input_probes_update(ctx): for i in inputs: # set input probe value - test_value = getrandbits(i.width) - yield i.eq(test_value) + test_value = getrandbits(len(i)) + ctx.set(i, test_value) # pulse strobe register - yield from pulse_strobe_register() + await pulse_strobe_register(ctx) # check that values are as expected once read back addrs = io_core._memory_map[i.name]["addrs"] datas = value_to_words(test_value, len(addrs)) for addr, data in zip(addrs, datas): - yield from verify_register(io_core, addr, data) + await verify_register(io_core, ctx, addr, data) diff --git a/test/test_logic_analyzer_fsm_sim.py b/test/test_logic_analyzer_fsm_sim.py index fb04615..a91cb98 100644 --- a/test/test_logic_analyzer_fsm_sim.py +++ b/test/test_logic_analyzer_fsm_sim.py @@ -7,70 +7,69 @@ fsm = LogicAnalyzerFSM(config, base_addr=0, interface=None) @simulate(fsm) -def test_signals_reset_correctly(): +async def test_signals_reset_correctly(ctx): # Make sure pointers and write enable reset to zero for sig in [fsm.write_pointer, fsm.read_pointer, fsm.write_enable]: - if (yield sig) != 0: + if ctx.get(sig) != 0: raise ValueError # Make sure state resets to IDLE - if (yield fsm.state != States.IDLE): + if ctx.get(fsm.state) != States.IDLE: raise ValueError @simulate(fsm) -def test_single_shot_no_wait_for_trigger(): +async def test_single_shot_no_wait_for_trigger(ctx): # Configure and start FSM - yield fsm.trigger.eq(1) - yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT) - yield fsm.trigger_location.eq(4) - yield fsm.request_start.eq(1) + ctx.set(fsm.trigger, 1) + ctx.set(fsm.trigger_mode, TriggerModes.SINGLE_SHOT) + ctx.set(fsm.trigger_location, 4) + ctx.set(fsm.request_start, 1) # Wait until write_enable is asserted - while not (yield fsm.write_enable): - yield + while not ctx.get(fsm.write_enable): + await ctx.tick() # Wait 8 clock cycles for capture to complete for i in range(8): # Make sure that read_pointer does not increase - if (yield fsm.read_pointer) != 0: + if ctx.get(fsm.read_pointer) != 0: raise ValueError # Make sure that write_pointer increases by one each cycle - if (yield fsm.write_pointer) != i: + if ctx.get(fsm.write_pointer) != i: raise ValueError - yield + await ctx.tick() # Wait one clock cycle (to let BRAM contents cycle in) - yield + await ctx.tick() # Check that write_pointer points to the end of memory - if (yield fsm.write_pointer) != 7: + if ctx.get(fsm.write_pointer) != 7: raise ValueError # Check that state is CAPTURED - if (yield fsm.state) != States.CAPTURED: + if ctx.get(fsm.state) != States.CAPTURED: raise ValueError @simulate(fsm) -def test_single_shot_wait_for_trigger(): +async def test_single_shot_wait_for_trigger(ctx): # Configure and start FSM - yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT) - yield fsm.trigger_location.eq(4) - yield fsm.request_start.eq(1) - yield - yield + ctx.set(fsm.trigger_mode, TriggerModes.SINGLE_SHOT) + ctx.set(fsm.trigger_location, 4) + ctx.set(fsm.request_start, 1) + await ctx.tick() # Check that write_enable is asserted a cycle after request_start - if not (yield fsm.write_enable): + if not ctx.get(fsm.write_enable): raise ValueError # Wait 4 clock cycles to get to IN_POSITION for i in range(4): - rp = yield fsm.read_pointer - wp = yield fsm.write_pointer + rp = ctx.get(fsm.read_pointer) + wp = ctx.get(fsm.write_pointer) # Make sure that read_pointer does not increase if rp != 0: @@ -80,48 +79,47 @@ def test_single_shot_wait_for_trigger(): if wp != i: raise ValueError - yield + await ctx.tick() # Wait a few cycles before triggering for _ in range(10): - yield + await ctx.tick() # Provide the trigger, and check that the capture completes 4 cycles later - yield fsm.trigger.eq(1) - yield + ctx.set(fsm.trigger, 1) + await ctx.tick() for i in range(4): - yield + await ctx.tick() # Wait one clock cycle (to let BRAM contents cycle in) - yield + await ctx.tick() # Check that write_pointer points to the end of memory - rp = yield fsm.read_pointer - wp = yield fsm.write_pointer + rp = ctx.get(fsm.read_pointer) + wp = ctx.get(fsm.write_pointer) if (wp + 1) % config["sample_depth"] != rp: raise ValueError # Check that state is CAPTURED - if (yield fsm.state) != States.CAPTURED: + if ctx.get(fsm.state) != States.CAPTURED: raise ValueError @simulate(fsm) -def test_immediate(): +async def test_immediate(ctx): # Configure and start FSM - yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE) - yield fsm.request_start.eq(1) - yield - yield + ctx.set(fsm.trigger_mode, TriggerModes.IMMEDIATE) + ctx.set(fsm.request_start, 1) + await ctx.tick() # Check that write_enable is asserted a cycle after request_start - if not (yield fsm.write_enable): + if not ctx.get(fsm.write_enable): raise ValueError for i in range(config["sample_depth"]): - rp = yield fsm.read_pointer - wp = yield fsm.write_pointer + rp = ctx.get(fsm.read_pointer) + wp = ctx.get(fsm.write_pointer) if rp != 0: raise ValueError @@ -129,107 +127,105 @@ def test_immediate(): if wp != i: raise ValueError - yield + await ctx.tick() # Wait one clock cycle (to let BRAM contents cycle in) - yield + await ctx.tick() # Check that write_pointer points to the end of memory - rp = yield fsm.read_pointer - wp = yield fsm.write_pointer + rp = ctx.get(fsm.read_pointer) + wp = ctx.get(fsm.write_pointer) if rp != 0: raise ValueError if wp != 7: raise ValueError # Check that state is CAPTURED - if (yield fsm.state) != States.CAPTURED: + if ctx.get(fsm.state) != States.CAPTURED: raise ValueError @simulate(fsm) -def test_incremental(): +async def test_incremental(ctx): # Configure and start FSM - yield fsm.trigger_mode.eq(TriggerModes.INCREMENTAL) - yield fsm.request_start.eq(1) - yield - yield + ctx.set(fsm.trigger_mode, TriggerModes.INCREMENTAL) + ctx.set(fsm.request_start, 1) + await ctx.tick() # Check that write_enable is asserted on the same edge as request_start - if not (yield fsm.write_enable): + if not ctx.get(fsm.write_enable): raise ValueError for _ in range(10): - for _ in range(3): - yield + await ctx.tick().repeat(3) - yield fsm.trigger.eq(1) - yield - yield fsm.trigger.eq(0) - yield + ctx.set(fsm.trigger, 1) + await ctx.tick() + + ctx.set(fsm.trigger, 0) + await ctx.tick() # Check that state is CAPTURED - if (yield fsm.state) != States.CAPTURED: + if ctx.get(fsm.state) != States.CAPTURED: raise ValueError +# @simulate(fsm) +# async def test_single_shot_write_enable(ctx): +# # Configure FSM +# ctx.set(fsm.trigger_mode, TriggerModes.SINGLE_SHOT) +# ctx.set(fsm.trigger_location, 4) +# await ctx.tick() + +# # Make sure write is not enabled before starting the FSM +# if ctx.get(fsm.write_enable): +# raise ValueError + +# # Start the FSM, ensure write enable is asserted throughout the capture +# ctx.set(fsm.request_start, 1) +# await ctx.tick() +# await ctx.tick() + +# for _ in range(config["sample_depth"]): +# if not ctx.get(fsm.write_enable): +# raise ValueError + +# await ctx.tick() + +# ctx.set(fsm.trigger, 1) +# await ctx.tick() + +# for _ in range(4): +# if not ctx.get(fsm.write_enable): +# raise ValueError + +# await ctx.tick() + +# # Make sure write_enable is deasserted after +# if ctx.get(fsm.write_enable): +# raise ValueError + + @simulate(fsm) -def test_single_shot_write_enable(): +async def test_immediate_write_enable(ctx): # Configure FSM - yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT) - yield fsm.trigger_location.eq(4) - yield + ctx.set(fsm.trigger_mode, TriggerModes.IMMEDIATE) + await ctx.tick() # Make sure write is not enabled before starting the FSM - if (yield fsm.write_enable): + if ctx.get(fsm.write_enable): raise ValueError # Start the FSM, ensure write enable is asserted throughout the capture - yield fsm.request_start.eq(1) - yield - yield + ctx.set(fsm.request_start, 1) + await ctx.tick() for _ in range(config["sample_depth"]): - if not (yield fsm.write_enable): + if not ctx.get(fsm.write_enable): raise ValueError - yield - - yield fsm.trigger.eq(1) - yield - - for _ in range(4): - if not (yield fsm.write_enable): - raise ValueError - - yield + await ctx.tick() # Make sure write_enable is deasserted after - if (yield fsm.write_enable): - raise ValueError - - -@simulate(fsm) -def test_immediate_write_enable(): - # Configure FSM - yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE) - yield - - # Make sure write is not enabled before starting the FSM - if (yield fsm.write_enable): - raise ValueError - - # Start the FSM, ensure write enable is asserted throughout the capture - yield fsm.request_start.eq(1) - yield - yield - - for _ in range(config["sample_depth"]): - if not (yield fsm.write_enable): - raise ValueError - - yield - - # Make sure write_enable is deasserted after - if (yield fsm.write_enable): + if ctx.get(fsm.write_enable): raise ValueError diff --git a/test/test_logic_analyzer_sim.py b/test/test_logic_analyzer_sim.py index aeb9186..b03e70b 100644 --- a/test/test_logic_analyzer_sim.py +++ b/test/test_logic_analyzer_sim.py @@ -15,86 +15,86 @@ config = { la = LogicAnalyzerCore(config, base_addr=0, interface=None) -def print_data_at_addr(addr): +async def print_data_at_addr(ctx, addr): # place read transaction on the bus - yield la.bus_i.addr.eq(addr) - yield la.bus_i.data.eq(0) - yield la.bus_i.rw.eq(0) - yield la.bus_i.valid.eq(1) - yield - yield la.bus_i.addr.eq(0) - yield la.bus_i.valid.eq(0) + ctx.set(la.bus_i.addr, addr) + ctx.set(la.bus_i.data, 0) + ctx.set(la.bus_i.rw, 0) + ctx.set(la.bus_i.valid, True) + + await ctx.tick() + + ctx.set(la.bus_i.addr, 0) + ctx.set(la.bus_i.valid, 0) # wait for output to be valid - while not (yield la.bus_o.valid): - yield + while not ctx.get(la.bus_o.valid): + await ctx.tick() - print(f"addr: {hex(addr)} data: {hex((yield la.bus_o.data))}") + print(f"addr: {hex(addr)} data: {hex(ctx.get(la.bus_o.data))}") -def set_fsm_register(name, data): +async def set_fsm_register(ctx, name, data): addr = la._fsm.registers._memory_map[name]["addrs"][0] strobe_addr = la._fsm.registers._base_addr - yield from write_register(la, strobe_addr, 0) - yield from write_register(la, addr, data) - yield from write_register(la, strobe_addr, 1) - yield from write_register(la, strobe_addr, 0) + await write_register(la, ctx, strobe_addr, 0) + await write_register(la, ctx, addr, data) + await write_register(la, ctx, strobe_addr, 1) + await write_register(la, ctx, strobe_addr, 0) -def set_trig_blk_register(name, data): +async def set_trig_blk_register(ctx, name, data): addr = la._trig_blk.registers._memory_map[name]["addrs"][0] strobe_addr = la._trig_blk.registers._base_addr - yield from write_register(la, strobe_addr, 0) - yield from write_register(la, addr, data) - yield from write_register(la, strobe_addr, 1) - yield from write_register(la, strobe_addr, 0) + await write_register(la, ctx, strobe_addr, 0) + await write_register(la, ctx, addr, data) + await write_register(la, ctx, strobe_addr, 1) + await write_register(la, ctx, strobe_addr, 0) -def set_probe(name, value): +async def set_probe(ctx, name, value): probe = None for p in la._probes: if p.name == name: probe = p - yield probe.eq(value) + ctx.set(probe, value) @simulate(la) -def test_single_shot_capture(): - # # ok nice what happens if we try to run the core, which includes: - yield from set_fsm_register("request_stop", 1) - yield from set_fsm_register("request_stop", 0) +async def test_single_shot_capture(ctx): + # request FSM to stop + await set_fsm_register(ctx, "request_stop", 1) + await set_fsm_register(ctx, "request_stop", 0) # setting triggers - yield from set_trig_blk_register("curly_op", Operations.EQ) - yield from set_trig_blk_register("curly_arg", 4) + await set_trig_blk_register(ctx, "curly_op", Operations.EQ) + await set_trig_blk_register(ctx, "curly_arg", 4) # setting trigger mode - yield from set_fsm_register("trigger_mode", 0) + await set_fsm_register(ctx, "trigger_mode", 0) # setting trigger location - yield from set_fsm_register("trigger_location", 511) + await set_fsm_register(ctx, "trigger_location", 511) # starting capture - yield from set_fsm_register("request_start", 1) - yield from set_fsm_register("request_start", 0) + await set_fsm_register(ctx, "request_start", 1) + await set_fsm_register(ctx, "request_start", 0) # wait a few hundred clock cycles, see what happens - for _ in range(700): - yield + await ctx.tick().repeat(700) # provide the trigger condition - yield from set_probe("curly", 4) + await set_probe(ctx, "curly", 4) - for _ in range(700): - yield + await ctx.tick().repeat(700) # dump sample memory contents - yield from write_register(la, 0, 0) - yield from write_register(la, 0, 1) - yield from write_register(la, 0, 0) + await write_register(la, ctx, 0, 0) + await write_register(la, ctx, 0, 1) + await write_register(la, ctx, 0, 0) for addr in range(la.max_addr): - yield from print_data_at_addr(addr) + await print_data_at_addr(ctx, addr) diff --git a/test/test_mem_core_sim.py b/test/test_mem_core_sim.py index 7fe2bae..da9ed50 100644 --- a/test/test_mem_core_sim.py +++ b/test/test_mem_core_sim.py @@ -23,91 +23,91 @@ class MemoryCoreTests: # A model of what each bus address contains self.model = {i: 0 for i in self.bus_addrs} - def bus_addrs_all_zero(self): + async def bus_addrs_all_zero(self): for addr in self.bus_addrs: - yield from self.verify_bus_side(addr) + await self.verify_bus_side(addr) - def user_addrs_all_zero(self): + async def user_addrs_all_zero(self): for addr in self.user_addrs: - yield from self.verify_user_side(addr) + await self.verify_user_side(addr) - def bus_to_bus_functionality(self): + async def bus_to_bus_functionality(self): # yield from self.one_bus_write_then_one_bus_read() # yield from self.multi_bus_writes_then_multi_bus_reads() - yield from self.rand_bus_writes_rand_bus_reads() + await self.rand_bus_writes_rand_bus_reads() - def user_to_bus_functionality(self): + async def user_to_bus_functionality(self): # yield from self.one_user_write_then_one_bus_read() # yield from self.multi_user_write_then_multi_bus_reads() - yield from self.rand_user_writes_rand_bus_reads() + await self.rand_user_writes_rand_bus_reads() - def bus_to_user_functionality(self): + async def bus_to_user_functionality(self): # yield from self.one_bus_write_then_one_user_read() # yield from self.multi_bus_write_then_multi_user_reads() - yield from self.rand_bus_writes_rand_user_reads() + await self.rand_bus_writes_rand_user_reads() - def user_to_user_functionality(self): + async def user_to_user_functionality(self): # yield from self.one_user_write_then_one_user_read() # yield from self.multi_user_write_then_multi_user_read() - yield from self.rand_user_write_rand_user_read() + await self.rand_user_write_rand_user_read() - def one_bus_write_then_one_bus_read(self): + async def one_bus_write_then_one_bus_read(self): for addr in self.bus_addrs: data_width = self.get_data_width(addr) data = getrandbits(data_width) - yield from self.write_bus_side(addr, data) - yield from self.verify_bus_side(addr) + await self.write_bus_side(addr, data) + await self.verify_bus_side(addr) - def multi_bus_writes_then_multi_bus_reads(self): + async def multi_bus_writes_then_multi_bus_reads(self): # write-write-write then read-read-read for addr in jumble(self.bus_addrs): data_width = self.get_data_width(addr) data = getrandbits(data_width) - yield from self.write_bus_side(addr, data) + await self.write_bus_side(addr, data) for addr in jumble(self.bus_addrs): - yield from self.verify_bus_side(addr) + await self.verify_bus_side(addr) - def rand_bus_writes_rand_bus_reads(self): + async def rand_bus_writes_rand_bus_reads(self): # random reads and writes in random orders for _ in range(5): for addr in jumble(self.bus_addrs): operation = choice(["read", "write"]) if operation == "read": - yield from self.verify_bus_side(addr) + await self.verify_bus_side(addr) elif operation == "write": data_width = self.get_data_width(addr) data = getrandbits(data_width) - yield from self.write_bus_side(addr, data) + await self.write_bus_side(addr, data) - def one_user_write_then_one_bus_read(self): + async def one_user_write_then_one_bus_read(self): for user_addr in self.user_addrs: # write to user side data = getrandbits(self.width) - yield from self.write_user_side(user_addr, data) + await self.write_user_side(user_addr, data) # verify contents when read out from the bus for i in range(self.n_mems): bus_addr = self.base_addr + user_addr + (i * self.depth) - yield from self.verify_bus_side(bus_addr) + await self.verify_bus_side(bus_addr) - def multi_user_write_then_multi_bus_reads(self): + async def multi_user_write_then_multi_bus_reads(self): # write-write-write then read-read-read for user_addr in jumble(self.user_addrs): # write a random number to the user side data = getrandbits(self.width) - yield from self.write_user_side(user_addr, data) + await self.write_user_side(user_addr, data) # read out every bus_addr in random order for bus_addr in jumble(self.bus_addrs): - yield from self.verify_bus_side(bus_addr) + await self.verify_bus_side(bus_addr) - def rand_user_writes_rand_bus_reads(self): + async def rand_user_writes_rand_bus_reads(self): # random reads and writes in random orders for _ in range(5): for user_addr in jumble(self.user_addrs): @@ -121,14 +121,14 @@ class MemoryCoreTests: # read from bus side if operation == "read": for bus_addr in bus_addrs: - yield from self.verify_bus_side(bus_addr) + await self.verify_bus_side(bus_addr) # write to user side elif operation == "write": data = getrandbits(self.width) - yield from self.write_user_side(user_addr, data) + await self.write_user_side(user_addr, data) - def one_bus_write_then_one_user_read(self): + async def one_bus_write_then_one_user_read(self): for user_addr in self.user_addrs: # Try and set the value at the user address to a given value, # by writing to the appropriate memory locaitons on the bus side @@ -138,22 +138,22 @@ class MemoryCoreTests: for i, word in enumerate(words): bus_addr = self.base_addr + user_addr + (i * self.depth) - yield from self.write_bus_side(bus_addr, word) + await self.write_bus_side(bus_addr, word) - yield from self.verify_user_side(user_addr) + await self.verify_user_side(user_addr) - def multi_bus_write_then_multi_user_reads(self): + async def multi_bus_write_then_multi_user_reads(self): # write-write-write then read-read-read for bus_addr in jumble(self.bus_addrs): data_width = self.get_data_width(bus_addr) data = getrandbits(data_width) - yield from self.write_bus_side(bus_addr, data) + await self.write_bus_side(bus_addr, data) for user_addr in jumble(self.user_addrs): - yield from self.verify_user_side(user_addr) + await self.verify_user_side(user_addr) - def rand_bus_writes_rand_user_reads(self): + async def rand_bus_writes_rand_user_reads(self): for _ in range(5 * self.depth): operation = choice(["read", "write"]) @@ -163,41 +163,41 @@ class MemoryCoreTests: data_width = self.get_data_width(bus_addr) data = getrandbits(data_width) - yield from self.write_bus_side(bus_addr, data) + await self.write_bus_side(bus_addr, data) # read from random user_addr if operation == "read": user_addr = randint(0, self.depth - 1) - yield from self.verify_user_side(user_addr) + await self.verify_user_side(user_addr) - def one_user_write_then_one_user_read(self): + async def one_user_write_then_one_user_read(self): for addr in self.user_addrs: data = getrandbits(self.width) - yield from self.write_user_side(addr, data) - yield from self.verify_user_side(addr) + await self.write_user_side(addr, data) + await self.verify_user_side(addr) - def multi_user_write_then_multi_user_read(self): + async def multi_user_write_then_multi_user_read(self): # write-write-write then read-read-read for user_addr in jumble(self.user_addrs): data = getrandbits(self.width) - yield from self.write_user_side(user_addr, data) + await self.write_user_side(user_addr, data) for user_addr in jumble(self.user_addrs): - yield from self.verify_user_side(user_addr) + await self.verify_user_side(user_addr) - def rand_user_write_rand_user_read(self): + async def rand_user_write_rand_user_read(self): # random reads and writes in random orders for _ in range(5): for user_addr in jumble(self.user_addrs): operation = choice(["read", "write"]) if operation == "read": - yield from self.verify_user_side(user_addr) + await self.verify_user_side(user_addr) elif operation == "write": data = getrandbits(self.width) - yield from self.write_user_side(user_addr, data) + await self.write_user_side(user_addr, data) def get_data_width(self, addr): # this part is a little hard to check since we might have a @@ -210,18 +210,16 @@ class MemoryCoreTests: else: return self.width % 16 - def verify_bus_side(self, addr): - yield from verify_register(self.mem_core, addr, self.model[addr]) - for _ in range(4): - yield + async def verify_bus_side(self, ctx, addr): + await verify_register(self.mem_core, ctx, addr, self.model[addr]) + await ctx.tick().repeat(4) - def write_bus_side(self, addr, data): + async def write_bus_side(self, ctx, addr, data): self.model[addr] = data - yield from write_register(self.mem_core, addr, data) - for _ in range(4): - yield + await write_register(self.mem_core, addr, data) + await ctx.tick().repeat(4) - def verify_user_side(self, addr): + async def verify_user_side(self, ctx, addr): # Determine the expected value on the user side by looking # up the appropriate bus addresses in the model @@ -233,30 +231,29 @@ class MemoryCoreTests: expected_data = words_to_value(bus_words) - yield self.mem_core.user_addr.eq(addr) - yield - yield + await self.mem_core.user_addr.eq(addr) + await ctx.tick().repeat(2) - data = yield (self.mem_core.user_data_out) + data = ctx.get(self.mem_core.user_data_out) if data != expected_data: raise ValueError( f"Read from {addr} yielded {data} instead of {expected_data}" ) - def write_user_side(self, addr, data): + async def write_user_side(self, ctx, addr, data): # convert value to words, and save to self.model words = value_to_words(data, self.n_mems) for i, word in enumerate(words): bus_addr = self.base_addr + addr + (i * self.depth) self.model[bus_addr] = word - yield self.mem_core.user_addr.eq(addr) - yield self.mem_core.user_data_in.eq(data) - yield self.mem_core.user_write_enable.eq(1) - yield - yield self.mem_core.user_addr.eq(0) - yield self.mem_core.user_data_in.eq(0) - yield self.mem_core.user_write_enable.eq(0) + ctx.set(self.mem_core.user_addr, addr) + ctx.set(self.mem_core.user_data_in, data) + ctx.set(self.mem_core.user_write_enable, 1) + await ctx.tick() + ctx.set(self.mem_core.user_addr, 0) + ctx.set(self.mem_core.user_data_in, 0) + ctx.set(self.mem_core.user_write_enable, 0) modes = ["bidirectional", "fpga_to_host", "host_to_fpga"] @@ -276,22 +273,22 @@ def test_mem_core(mode, width, depth, base_addr): tests = MemoryCoreTests(mem_core) @simulate(mem_core) - def testbench(): + async def testbench(): if mode == "bidirectional": - yield from tests.bus_addrs_all_zero() - yield from tests.user_addrs_all_zero() + await tests.bus_addrs_all_zero() + await tests.user_addrs_all_zero() - yield from tests.bus_to_bus_functionality() - yield from tests.user_to_bus_functionality() - yield from tests.bus_to_user_functionality() - yield from tests.user_to_user_functionality() + await tests.bus_to_bus_functionality() + await tests.user_to_bus_functionality() + await tests.bus_to_user_functionality() + await tests.user_to_user_functionality() if mode == "fpga_to_host": - yield from tests.bus_addrs_all_zero() - yield from tests.user_to_bus_functionality() + await tests.bus_addrs_all_zero() + await tests.user_to_bus_functionality() if mode == "host_to_fpga": - yield from tests.user_addrs_all_zero() - yield from tests.bus_to_user_functionality() + await tests.user_addrs_all_zero() + await tests.bus_to_user_functionality() testbench() diff --git a/test/test_source_bridge_sim.py b/test/test_source_bridge_sim.py index b3b03ef..6c7c064 100644 --- a/test/test_source_bridge_sim.py +++ b/test/test_source_bridge_sim.py @@ -7,31 +7,34 @@ source_bridge = UDPSourceBridge() @simulate(source_bridge) -def test_normie_ops(): - yield source_bridge.data_i.eq(0) - yield source_bridge.last_i.eq(0) - yield source_bridge.valid_i.eq(0) - yield - yield +async def test_normie_ops(ctx): + ctx.set(source_bridge.data_i, 0) + ctx.set(source_bridge.last_i, 0) + ctx.set(source_bridge.valid_i, 0) + await ctx.tick() - yield source_bridge.data_i.eq(0x0000_0001) - yield source_bridge.valid_i.eq(1) - yield - yield source_bridge.data_i.eq(0x1234_5678) - yield - yield source_bridge.valid_i.eq(0) - yield - yield + ctx.set(source_bridge.data_i, 0x0000_0001) + ctx.set(source_bridge.valid_i, 1) + await ctx.tick() - yield source_bridge.valid_i.eq(1) - yield source_bridge.data_i.eq(0x0000_0001) - yield - yield source_bridge.data_i.eq(0x90AB_CDEF) - yield - yield source_bridge.data_i.eq(0x0000_0000) - yield - yield source_bridge.data_i.eq(0x1234_5678) - yield - yield source_bridge.valid_i.eq(0) - yield - yield + ctx.set(source_bridge.data_i, 0x1234_5678) + await ctx.tick() + + ctx.set(source_bridge.valid_i, 0) + await ctx.tick().repeat(2) + + ctx.set(source_bridge.valid_i, 1) + ctx.set(source_bridge.data_i, 0x0000_0001) + await ctx.tick() + + ctx.set(source_bridge.data_i, 0x90AB_CDEF) + await ctx.tick() + + ctx.set(source_bridge.data_i, 0x0000_0000) + await ctx.tick() + + ctx.set(source_bridge.data_i, 0x1234_5678) + await ctx.tick() + + ctx.set(source_bridge.valid_i, 0) + await ctx.tick().repeat(2) diff --git a/test/test_uart_rx_sim.py b/test/test_uart_rx_sim.py index 111ce09..620f240 100644 --- a/test/test_uart_rx_sim.py +++ b/test/test_uart_rx_sim.py @@ -7,7 +7,7 @@ from random import sample uart_rx = UARTReceiver(clocks_per_baud=10) -def verify_receive(data): +async def verify_receive(ctx, data): # 8N1 serial, LSB sent first data_bits = "0" + f"{data:08b}"[::-1] + "1" data_bits = [int(bit) for bit in data_bits] @@ -18,9 +18,9 @@ def verify_receive(data): bit_index = i // uart_rx._clocks_per_baud # Every cycle, run checks on uart_rx: - if (yield uart_rx.valid_o): - if (yield uart_rx.data_o) != data: - a = yield uart_rx.data_o + if ctx.get(uart_rx.valid_o): + if ctx.get(uart_rx.data_o) != data: + a = ctx.get(uart_rx.data_o) print(data_bits) raise ValueError( f"Incorrect byte presented - gave {hex(a)} instead of {hex(data)}!" @@ -36,26 +36,26 @@ def verify_receive(data): else: raise ValueError("Valid asserted more than once!") - yield uart_rx.rx.eq(data_bits[bit_index]) - yield + ctx.set(uart_rx.rx, data_bits[bit_index]) + await ctx.tick() if not valid_asserted_before: raise ValueError("Failed to assert valid!") @simulate(uart_rx) -def test_all_possible_bytes(): - yield uart_rx.rx.eq(1) - yield +async def test_all_possible_bytes(ctx): + ctx.set(uart_rx.rx, 1) + await ctx.tick() for i in range(0xFF): - yield from verify_receive(i) + await verify_receive(ctx, i) @simulate(uart_rx) -def test_bytes_random_sample(): - yield uart_rx.rx.eq(1) - yield +async def test_bytes_random_sample(ctx): + ctx.set(uart_rx.rx, 1) + await ctx.tick() for i in jumble(range(0xFF)): - yield from verify_receive(i) + await verify_receive(ctx, i) diff --git a/test/test_uart_tx_sim.py b/test/test_uart_tx_sim.py index 1176cfc..f576489 100644 --- a/test/test_uart_tx_sim.py +++ b/test/test_uart_tx_sim.py @@ -7,18 +7,19 @@ from random import sample uart_tx = UARTTransmitter(clocks_per_baud=10) -def verify_bit_sequence(byte): +async def verify_bit_sequence(ctx, byte): """ Request a byte to be transmitted, and verify that the sequence of bits is correct. """ # Request byte to be transmitted - yield uart_tx.data_i.eq(byte) - yield uart_tx.start_i.eq(1) - yield - yield uart_tx.data_i.eq(0) - yield uart_tx.start_i.eq(0) - yield + ctx.set(uart_tx.data_i, byte) + ctx.set(uart_tx.start_i, 1) + await ctx.tick() + + ctx.set(uart_tx.data_i, 0) + ctx.set(uart_tx.start_i, 0) + await ctx.tick() # Check that data bit is correct on every clock baud period @@ -29,25 +30,25 @@ def verify_bit_sequence(byte): for i in range(10 * uart_tx._clocks_per_baud): bit_index = i // uart_tx._clocks_per_baud - if (yield uart_tx.tx) != data_bits[bit_index]: + if ctx.get(uart_tx.tx) != data_bits[bit_index]: raise ValueError("Wrong bit in sequence!") - if (yield uart_tx.done_o) and (bit_index != 9): + if ctx.get(uart_tx.done_o) and (bit_index != 9): raise ValueError("Done asserted too early!") - yield + await ctx.tick() - if not (yield uart_tx.done_o): + if not ctx.get(uart_tx.done_o): raise ValueError("Done not asserted at end of transmission!") @simulate(uart_tx) -def test_all_possible_bytes(): +async def test_all_possible_bytes(ctx): for i in range(0xFF): - yield from verify_bit_sequence(i) + await verify_bit_sequence(ctx, i) @simulate(uart_tx) -def test_bytes_random_sample(): +async def test_bytes_random_sample(ctx): for i in jumble(range(0xFF)): - yield from verify_bit_sequence(i) + await verify_bit_sequence(ctx, i)