From e2450ddbffcb7378d611ab38fa6da341ad858be1 Mon Sep 17 00:00:00 2001 From: Fischer Moseley <42497969+fischermoseley@users.noreply.github.com> Date: Sun, 18 Feb 2024 15:50:51 -0800 Subject: [PATCH] complete IO core refactor --- src/manta/io_core.py | 6 +- src/manta/logic_analyzer/__init__.py | 36 +++++---- src/manta/logic_analyzer/fsm.py | 83 ++++++++++--------- src/manta/logic_analyzer/trigger_block.py | 97 +++++++++++------------ test/test_io_core_hw.py | 2 +- test/test_logic_analyzer_fsm_sim.py | 72 ++++++++--------- test/test_logic_analyzer_sim.py | 13 ++- test/test_mem_core_hw.py | 23 +++++- test/test_uart_rx_sim.py | 4 +- test/test_uart_tx_sim.py | 4 +- 10 files changed, 179 insertions(+), 161 deletions(-) diff --git a/src/manta/io_core.py b/src/manta/io_core.py index 648be7c..61bc5be 100644 --- a/src/manta/io_core.py +++ b/src/manta/io_core.py @@ -36,8 +36,8 @@ class IOCore(Elaboratable): @classmethod def from_config(cls, config, base_addr, interface): - inputs = config.get("inputs", []) - outputs = config.get("outputs", []) + inputs = config.get("inputs", {}) + outputs = config.get("outputs", {}) # Make sure IO core has at least one input or output if not inputs and not outputs: @@ -178,7 +178,7 @@ class IOCore(Elaboratable): Return the Amaranth signals that should be included as ports in the top-level Manta module. """ - return [self._inputs + self._outputs] + return self._inputs + self._outputs def get_max_addr(self): """ diff --git a/src/manta/logic_analyzer/__init__.py b/src/manta/logic_analyzer/__init__.py index b46a449..d88cbef 100644 --- a/src/manta/logic_analyzer/__init__.py +++ b/src/manta/logic_analyzer/__init__.py @@ -165,7 +165,7 @@ class LogicAnalyzerCore(Elaboratable): self.bus_o.eq(sample_mem.bus_o), # Non-bus Connections fsm.trigger.eq(trig_blk.trig), - sample_mem.user_addr.eq(fsm.r.write_pointer), + sample_mem.user_addr.eq(fsm.write_pointer), sample_mem.user_we.eq(fsm.write_enable), ] @@ -211,14 +211,14 @@ class LogicAnalyzerCore(Elaboratable): # If core is not in IDLE state, request that it return to IDLE print_if_verbose(" -> Resetting core...") - state = self.fsm.r.get_probe("state") + state = self.fsm.registers.get_probe("state") if state != States.IDLE: - self.fsm.r.set_probe("request_start", 0) - self.fsm.r.set_probe("request_stop", 0) - self.fsm.r.set_probe("request_stop", 1) - self.fsm.r.set_probe("request_stop", 0) + self.fsm.registers.set_probe("request_start", 0) + self.fsm.registers.set_probe("request_stop", 0) + self.fsm.registers.set_probe("request_stop", 1) + self.fsm.registers.set_probe("request_stop", 0) - if self.fsm.r.get_probe("state") != States.IDLE: + if self.fsm.registers.get_probe("state") != States.IDLE: raise ValueError("Logic analyzer did not reset to IDLE state.") # Set triggers @@ -232,28 +232,32 @@ class LogicAnalyzerCore(Elaboratable): print_if_verbose(" -> Setting trigger mode...") if "trigger_mode" in self.config: mode = self.config["trigger_mode"].upper() - self.fsm.r.set_probe("trigger_mode", TriggerModes[mode]) + self.fsm.registers.set_probe("trigger_mode", TriggerModes[mode]) else: - self.fsm.r.set_probe("trigger_mode", TriggerModes.SINGLE_SHOT) + self.fsm.registers.set_probe("trigger_mode", TriggerModes.SINGLE_SHOT) # Set trigger location print_if_verbose(" -> Setting trigger location...") if "trigger_location" in self.config: - self.fsm.r.set_probe("trigger_location", self.config["trigger_location"]) + self.fsm.registers.set_probe( + "trigger_location", self.config["trigger_location"] + ) else: - self.fsm.r.set_probe("trigger_location", self.config["sample_depth"] // 2) + self.fsm.registers.set_probe( + "trigger_location", self.config["sample_depth"] // 2 + ) # Send a start request to the state machine print_if_verbose(" -> Starting capture...") - self.fsm.r.set_probe("request_start", 0) - self.fsm.r.set_probe("request_start", 1) - self.fsm.r.set_probe("request_start", 0) + self.fsm.registers.set_probe("request_start", 0) + self.fsm.registers.set_probe("request_start", 1) + self.fsm.registers.set_probe("request_start", 0) # Poll the state machine's state, and wait for the capture to complete print_if_verbose(" -> Waiting for capture to complete...") - while self.fsm.r.get_probe("state") != States.CAPTURED: + while self.fsm.registers.get_probe("state") != States.CAPTURED: pass # Read out the entirety of the sample memory @@ -264,7 +268,7 @@ class LogicAnalyzerCore(Elaboratable): # Revolve the memory around the read_pointer, such that all the beginning # of the caputure is at the first element print_if_verbose(" -> Checking read pointer and revolving memory...") - read_pointer = self.fsm.r.get_probe("read_pointer") + read_pointer = self.fsm.registers.get_probe("read_pointer") data = raw_capture[read_pointer:] + raw_capture[:read_pointer] return LogicAnalyzerCapture(data, self.config) diff --git a/src/manta/logic_analyzer/fsm.py b/src/manta/logic_analyzer/fsm.py index 461fa8b..cdf4b3c 100644 --- a/src/manta/logic_analyzer/fsm.py +++ b/src/manta/logic_analyzer/fsm.py @@ -26,29 +26,39 @@ class LogicAnalyzerFSM(Elaboratable): """ def __init__(self, config, base_addr, interface): - self.config = config + self._sample_depth = config["sample_depth"] + + # Outputs to rest of Logic Analyzer self.trigger = Signal(1) self.write_enable = Signal(1) - register_config = { - "inputs": { - "state": 4, - "read_pointer": ceil(log2(self.config["sample_depth"])), - "write_pointer": ceil(log2(self.config["sample_depth"])), - }, - "outputs": { - "trigger_location": ceil(log2(self.config["sample_depth"])), - "trigger_mode": 2, - "request_start": 1, - "request_stop": 1, - }, - } + # Outputs from FSM, inputs from IOCore + self.state = Signal(States) + self.read_pointer = Signal(range(self._sample_depth)) + self.write_pointer = Signal(range(self._sample_depth)) + inputs = [ + self.state, + self.read_pointer, + self.write_pointer, + ] - self.r = IOCore(register_config, base_addr, interface) + # Inputs to FSM, outputs from IOCore + self.trigger_location = Signal(range(self._sample_depth)) + self.trigger_mode = Signal(TriggerModes) + self.request_start = Signal() + self.request_stop = Signal() + outputs = [ + self.trigger_location, + self.trigger_mode, + self.request_start, + self.request_stop, + ] + + self.registers = IOCore(base_addr, interface, inputs, outputs) # Bus Input/Output - self.bus_i = self.r.bus_i - self.bus_o = self.r.bus_o + self.bus_i = self.registers.bus_i + self.bus_o = self.registers.bus_o def get_max_addr(self): """ @@ -56,38 +66,33 @@ class LogicAnalyzerFSM(Elaboratable): space used by the core extends from `base_addr` to the number returned by this function. """ - return self.r.get_max_addr() - - def increment_mod_sample_depth(self, m, signal): - # m.d.sync += signal.eq((signal + 1) % self.config["sample_depth"]) - - with m.If(signal == self.config["sample_depth"] - 1): - m.d.sync += signal.eq(0) - - with m.Else(): - m.d.sync += signal.eq(signal + 1) + return self.registers.get_max_addr() def elaborate(self, platform): m = Module() - m.submodules.registers = self.r + m.submodules.registers = self.registers - sample_depth = self.config["sample_depth"] - request_start = self.r.request_start - request_stop = self.r.request_stop - trigger_mode = self.r.trigger_mode - trigger_location = self.r.trigger_location - state = self.r.state + sample_depth = self._sample_depth + request_start = self.request_start + request_stop = self.request_stop + trigger_mode = self.trigger_mode + trigger_location = self.trigger_location + state = self.state write_enable = self.write_enable - write_pointer = self.r.write_pointer - read_pointer = self.r.read_pointer + write_pointer = self.write_pointer + read_pointer = self.read_pointer - prev_request_start = Signal(1) - prev_request_stop = Signal(1) + prev_request_start = Signal().like(request_start) + prev_request_stop = Signal().like(request_stop) + # Compute next_write_pointer as write_pointer + 1 % sample_depth next_write_pointer = Signal().like(write_pointer) + with m.If(write_pointer == self._sample_depth - 1): + m.d.comb += next_write_pointer.eq(0) - m.d.comb += next_write_pointer.eq((write_pointer + 1) % sample_depth) + with m.Else(): + m.d.comb += next_write_pointer.eq(write_pointer + 1) # Rising edge detection for start/stop requests m.d.sync += prev_request_start.eq(request_start) diff --git a/src/manta/logic_analyzer/trigger_block.py b/src/manta/logic_analyzer/trigger_block.py index 487a9c4..652f5be 100644 --- a/src/manta/logic_analyzer/trigger_block.py +++ b/src/manta/logic_analyzer/trigger_block.py @@ -1,4 +1,5 @@ from amaranth import * +from amaranth.lib.enum import IntEnum from manta.io_core import IOCore @@ -12,23 +13,20 @@ class LogicAnalyzerTriggerBlock(Elaboratable): def __init__(self, probes, base_addr, interface): # Instantiate a bunch of trigger blocks - self.probes = probes - self.triggers = [LogicAnalyzerTrigger(p) for p in self.probes] + self._probes = probes + self._triggers = [LogicAnalyzerTrigger(p) for p in self._probes] # Make IO core for everything - outputs = {} - for p in self.probes: - outputs[p.name + "_arg"] = p.width - outputs[p.name + "_op"] = 4 - - self.r = IOCore({"outputs": outputs}, base_addr, interface) + ops = [t.op for t in self._triggers] + args = [t.arg for t in self._triggers] + self.registers = IOCore(base_addr, interface, outputs=ops + args) # Bus Input/Output - self.bus_i = self.r.bus_i - self.bus_o = self.r.bus_o + self.bus_i = self.registers.bus_i + self.bus_o = self.registers.bus_o # Global trigger. High if any probe is triggered. - self.trig = Signal(1) + self.trig = Signal() def get_max_addr(self): """ @@ -36,13 +34,13 @@ class LogicAnalyzerTriggerBlock(Elaboratable): space used by the core extends from `base_addr` to the number returned by this function. """ - return self.r.get_max_addr() + return self.registers.get_max_addr() def clear_triggers(self): # reset all triggers to disabled with no argument - for p in self.probes: - self.r.set_probe(p.name + "_op", 0) - self.r.set_probe(p.name + "_arg", 0) + for p in self._probes: + self.registers.set_probe(p.name + "_op", Operations.DISABLE) + self.registers.set_probe(p.name + "_arg", 0) def set_triggers(self, config): # set triggers @@ -52,34 +50,42 @@ class LogicAnalyzerTriggerBlock(Elaboratable): # Handle triggers that don't need an argument if len(components) == 2: name, op = components - self.r.set_probe(name + "_op", self.triggers[0].operations[op]) + self.registers.set_probe(name + "_op", Operations[op].value) # Handle triggers that do need an argument elif len(components) == 3: name, op, arg = components - self.r.set_probe(name + "_op", self.triggers[0].operations[op]) - self.r.set_probe(name + "_arg", int(arg)) + self.registers.set_probe(name + "_op", Operations[op].value) + self.registers.set_probe(name + "_arg", int(arg)) def elaborate(self, platform): m = Module() # Add IO Core as submodule - m.submodules.registers = self.r + m.submodules.registers = self.registers # Add triggers as submodules - for t in self.triggers: + for t in self._triggers: m.submodules[t.signal.name + "_trigger"] = t - # Connect IO core registers to triggers - for probe, trigger in zip(self.probes, self.triggers): - m.d.comb += trigger.arg.eq(getattr(self.r, probe.name + "_arg")) - m.d.comb += trigger.op.eq(getattr(self.r, probe.name + "_op")) - - m.d.comb += self.trig.eq(Cat([t.triggered for t in self.triggers]).any()) + m.d.comb += self.trig.eq(Cat([t.triggered for t in self._triggers]).any()) return m +class Operations(IntEnum): + DISABLE = 0 + RISING = 1 + FALLING = 2 + CHANGING = 3 + GT = 4 + LT = 5 + GEQ = 6 + LEQ = 7 + EQ = 8 + NEQ = 9 + + class LogicAnalyzerTrigger(Elaboratable): """ A module containing a programmable "trigger" for a given input signal, @@ -88,23 +94,10 @@ class LogicAnalyzerTrigger(Elaboratable): """ def __init__(self, signal): - self.operations = { - "DISABLE": 0, - "RISING": 1, - "FALLING": 2, - "CHANGING": 3, - "GT": 4, - "LT": 5, - "GEQ": 6, - "LEQ": 7, - "EQ": 8, - "NEQ": 9, - } - self.signal = signal - self.op = Signal(range(len(self.operations))) - self.arg = Signal().like(signal) - self.triggered = Signal(1) + self.op = Signal(Operations, name=signal.name + "_op") + self.arg = Signal(signal.width, name=signal.name + "_arg") + self.triggered = Signal() def elaborate(self, platform): m = Module() @@ -113,34 +106,34 @@ class LogicAnalyzerTrigger(Elaboratable): prev = Signal().like(self.signal) m.d.sync += prev.eq(self.signal) - with m.If(self.op == self.operations["DISABLE"]): + with m.If(self.op == Operations.DISABLE): m.d.comb += self.triggered.eq(0) - with m.Elif(self.op == self.operations["RISING"]): + with m.Elif(self.op == Operations.RISING): m.d.comb += self.triggered.eq(self.signal > prev) - with m.Elif(self.op == self.operations["FALLING"]): + with m.Elif(self.op == Operations.FALLING): m.d.comb += self.triggered.eq(self.signal < prev) - with m.Elif(self.op == self.operations["CHANGING"]): + with m.Elif(self.op == Operations.CHANGING): m.d.comb += self.triggered.eq(self.signal != prev) - with m.Elif(self.op == self.operations["GT"]): + with m.Elif(self.op == Operations.GT): m.d.comb += self.triggered.eq(self.signal > self.arg) - with m.Elif(self.op == self.operations["LT"]): + with m.Elif(self.op == Operations.LT): m.d.comb += self.triggered.eq(self.signal < self.arg) - with m.Elif(self.op == self.operations["GEQ"]): + with m.Elif(self.op == Operations.GEQ): m.d.comb += self.triggered.eq(self.signal >= self.arg) - with m.Elif(self.op == self.operations["LEQ"]): + with m.Elif(self.op == Operations.LEQ): m.d.comb += self.triggered.eq(self.signal <= self.arg) - with m.Elif(self.op == self.operations["EQ"]): + with m.Elif(self.op == Operations.EQ): m.d.comb += self.triggered.eq(self.signal == self.arg) - with m.Elif(self.op == self.operations["NEQ"]): + with m.Elif(self.op == Operations.NEQ): m.d.comb += self.triggered.eq(self.signal != self.arg) with m.Else(): diff --git a/test/test_io_core_hw.py b/test/test_io_core_hw.py index 2c984f8..13f29c6 100644 --- a/test/test_io_core_hw.py +++ b/test/test_io_core_hw.py @@ -148,4 +148,4 @@ def test_output_probe_initial_values_xilinx(): @pytest.mark.skipif(not ice40_tools_installed(), reason="no toolchain installed") def test_output_probe_initial_values_ice40(): - IOCoreLoopbackTest(ICEStickPlatform(), "/dev/ttyUSB3").verify() + IOCoreLoopbackTest(ICEStickPlatform(), "/dev/ttyUSB2").verify() diff --git a/test/test_logic_analyzer_fsm_sim.py b/test/test_logic_analyzer_fsm_sim.py index c2b4e70..7e7043c 100644 --- a/test/test_logic_analyzer_fsm_sim.py +++ b/test/test_logic_analyzer_fsm_sim.py @@ -9,12 +9,12 @@ fsm = LogicAnalyzerFSM(config, base_addr=0, interface=None) def test_signals_reset_correctly(): def testbench(): # Make sure pointers and write enable reset to zero - for sig in [fsm.r.write_pointer, fsm.r.read_pointer, fsm.write_enable]: + for sig in [fsm.write_pointer, fsm.read_pointer, fsm.write_enable]: if (yield sig) != 0: raise ValueError # Make sure state resets to IDLE - if (yield fsm.r.state != States.IDLE): + if (yield fsm.state != States.IDLE): raise ValueError simulate(fsm, testbench) @@ -24,9 +24,9 @@ def test_single_shot_no_wait_for_trigger(): def testbench(): # Configure and start FSM yield fsm.trigger.eq(1) - yield fsm.r.trigger_mode.eq(TriggerModes.SINGLE_SHOT) - yield fsm.r.trigger_location.eq(4) - yield fsm.r.request_start.eq(1) + yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT) + yield fsm.trigger_location.eq(4) + yield fsm.request_start.eq(1) # Wait until write_enable is asserted while not (yield fsm.write_enable): @@ -35,11 +35,11 @@ def test_single_shot_no_wait_for_trigger(): # Wait 8 clock cycles for capture to complete for i in range(8): # Make sure that read_pointer does not increase - if (yield fsm.r.read_pointer) != 0: + if (yield fsm.read_pointer) != 0: raise ValueError # Make sure that write_pointer increases by one each cycle - if (yield fsm.r.write_pointer) != i: + if (yield fsm.write_pointer) != i: raise ValueError yield @@ -48,11 +48,11 @@ def test_single_shot_no_wait_for_trigger(): yield # Check that write_pointer points to the end of memory - if (yield fsm.r.write_pointer) != 7: + if (yield fsm.write_pointer) != 7: raise ValueError # Check that state is CAPTURED - if (yield fsm.r.state) != States.CAPTURED: + if (yield fsm.state) != States.CAPTURED: raise ValueError simulate(fsm, testbench, "single_shot_no_wait_for_trigger.vcd") @@ -61,9 +61,9 @@ def test_single_shot_no_wait_for_trigger(): def test_single_shot_wait_for_trigger(): def testbench(): # Configure and start FSM - yield fsm.r.trigger_mode.eq(TriggerModes.SINGLE_SHOT) - yield fsm.r.trigger_location.eq(4) - yield fsm.r.request_start.eq(1) + yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT) + yield fsm.trigger_location.eq(4) + yield fsm.request_start.eq(1) yield yield @@ -73,8 +73,8 @@ def test_single_shot_wait_for_trigger(): # Wait 4 clock cycles to get to IN_POSITION for i in range(4): - rp = yield fsm.r.read_pointer - wp = yield fsm.r.write_pointer + rp = yield fsm.read_pointer + wp = yield fsm.write_pointer # Make sure that read_pointer does not increase if rp != 0: @@ -101,13 +101,13 @@ def test_single_shot_wait_for_trigger(): yield # Check that write_pointer points to the end of memory - rp = yield fsm.r.read_pointer - wp = yield fsm.r.write_pointer - if (wp + 1) % fsm.config["sample_depth"] != rp: + rp = yield fsm.read_pointer + wp = yield fsm.write_pointer + if (wp + 1) % config["sample_depth"] != rp: raise ValueError # Check that state is CAPTURED - if (yield fsm.r.state) != States.CAPTURED: + if (yield fsm.state) != States.CAPTURED: raise ValueError simulate(fsm, testbench, "single_shot_wait_for_trigger.vcd") @@ -116,8 +116,8 @@ def test_single_shot_wait_for_trigger(): def test_immediate(): def testbench(): # Configure and start FSM - yield fsm.r.trigger_mode.eq(TriggerModes.IMMEDIATE) - yield fsm.r.request_start.eq(1) + yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE) + yield fsm.request_start.eq(1) yield yield @@ -125,9 +125,9 @@ def test_immediate(): if not (yield fsm.write_enable): raise ValueError - for i in range(fsm.config["sample_depth"]): - rp = yield fsm.r.read_pointer - wp = yield fsm.r.write_pointer + for i in range(config["sample_depth"]): + rp = yield fsm.read_pointer + wp = yield fsm.write_pointer if rp != 0: raise ValueError @@ -141,15 +141,15 @@ def test_immediate(): yield # Check that write_pointer points to the end of memory - rp = yield fsm.r.read_pointer - wp = yield fsm.r.write_pointer + rp = yield fsm.read_pointer + wp = yield fsm.write_pointer if rp != 0: raise ValueError if wp != 7: raise ValueError # Check that state is CAPTURED - if (yield fsm.r.state) != States.CAPTURED: + if (yield fsm.state) != States.CAPTURED: raise ValueError simulate(fsm, testbench, "immediate.vcd") @@ -158,8 +158,8 @@ def test_immediate(): def test_incremental(): def testbench(): # Configure and start FSM - yield fsm.r.trigger_mode.eq(TriggerModes.INCREMENTAL) - yield fsm.r.request_start.eq(1) + yield fsm.trigger_mode.eq(TriggerModes.INCREMENTAL) + yield fsm.request_start.eq(1) yield yield @@ -177,7 +177,7 @@ def test_incremental(): yield # Check that state is CAPTURED - if (yield fsm.r.state) != States.CAPTURED: + if (yield fsm.state) != States.CAPTURED: raise ValueError simulate(fsm, testbench, "incremental.vcd") @@ -186,8 +186,8 @@ def test_incremental(): def test_single_shot_write_enable(): def testbench(): # Configure FSM - yield fsm.r.trigger_mode.eq(TriggerModes.SINGLE_SHOT) - yield fsm.r.trigger_location.eq(4) + yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT) + yield fsm.trigger_location.eq(4) yield # Make sure write is not enabled before starting the FSM @@ -195,11 +195,11 @@ def test_single_shot_write_enable(): raise ValueError # Start the FSM, ensure write enable is asserted throughout the capture - yield fsm.r.request_start.eq(1) + yield fsm.request_start.eq(1) yield yield - for _ in range(fsm.config["sample_depth"]): + for _ in range(config["sample_depth"]): if not (yield fsm.write_enable): raise ValueError @@ -224,7 +224,7 @@ def test_single_shot_write_enable(): def test_immediate_write_enable(): def testbench(): # Configure FSM - yield fsm.r.trigger_mode.eq(TriggerModes.IMMEDIATE) + yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE) yield # Make sure write is not enabled before starting the FSM @@ -232,11 +232,11 @@ def test_immediate_write_enable(): raise ValueError # Start the FSM, ensure write enable is asserted throughout the capture - yield fsm.r.request_start.eq(1) + yield fsm.request_start.eq(1) yield yield - for _ in range(fsm.config["sample_depth"]): + for _ in range(config["sample_depth"]): if not (yield fsm.write_enable): raise ValueError diff --git a/test/test_logic_analyzer_sim.py b/test/test_logic_analyzer_sim.py index ebce934..0346bd6 100644 --- a/test/test_logic_analyzer_sim.py +++ b/test/test_logic_analyzer_sim.py @@ -1,5 +1,6 @@ from amaranth.sim import Simulator from manta.logic_analyzer import LogicAnalyzerCore +from manta.logic_analyzer.trigger_block import Operations from manta.utils import * from random import sample @@ -32,8 +33,8 @@ def print_data_at_addr(addr): def set_fsm_register(name, data): - addr = la.fsm.r.mmap[f"{name}_buf"]["addrs"][0] - strobe_addr = la.fsm.r.base_addr + addr = la.fsm.registers._memory_map[name]["addrs"][0] + strobe_addr = la.fsm.registers._base_addr yield from write_register(la, strobe_addr, 0) yield from write_register(la, addr, data) @@ -42,8 +43,8 @@ def set_fsm_register(name, data): def set_trig_blk_register(name, data): - addr = la.trig_blk.r.mmap[f"{name}_buf"]["addrs"][0] - strobe_addr = la.trig_blk.r.base_addr + addr = la.trig_blk.registers._memory_map[name]["addrs"][0] + strobe_addr = la.trig_blk.registers._base_addr yield from write_register(la, strobe_addr, 0) yield from write_register(la, addr, data) @@ -67,9 +68,7 @@ def test_single_shot_capture(): yield from set_fsm_register("request_stop", 0) # setting triggers - yield from set_trig_blk_register( - "curly_op", la.trig_blk.triggers[0].operations["EQ"] - ) + yield from set_trig_blk_register("curly_op", Operations.EQ) yield from set_trig_blk_register("curly_arg", 4) # setting trigger mode diff --git a/test/test_mem_core_hw.py b/test/test_mem_core_hw.py index e9c0710..78bd024 100644 --- a/test/test_mem_core_hw.py +++ b/test/test_mem_core_hw.py @@ -48,16 +48,33 @@ class MemoryCoreLoopbackTest(Elaboratable): }, } + def get_probe(self, name): + # This is a hack! And should be removed once the full Amaranth-native + # API is built out + for i in self.manta.io_core._inputs: + if i.name == name: + return i + + for o in self.manta.io_core._outputs: + if o.name == name: + return o + + return None + def elaborate(self, platform): m = Module() m.submodules.manta = self.manta uart_pins = platform.request("uart") + addr = self.get_probe("addr") + data = self.get_probe("data") + we = self.get_probe("we") + m.d.comb += [ - self.manta.mem_core.user_addr.eq(self.manta.io_core.addr), - self.manta.mem_core.user_data.eq(self.manta.io_core.data), - self.manta.mem_core.user_we.eq(self.manta.io_core.we), + self.manta.mem_core.user_addr.eq(addr), + self.manta.mem_core.user_data.eq(data), + self.manta.mem_core.user_we.eq(we), self.manta.interface.rx.eq(uart_pins.rx.i), uart_pins.tx.o.eq(self.manta.interface.tx), ] diff --git a/test/test_uart_rx_sim.py b/test/test_uart_rx_sim.py index 3604591..40111aa 100644 --- a/test/test_uart_rx_sim.py +++ b/test/test_uart_rx_sim.py @@ -14,8 +14,8 @@ def verify_receive(data): valid_asserted_before = False - for i in range(10 * uart_rx.clocks_per_baud): - bit_index = i // uart_rx.clocks_per_baud + for i in range(10 * uart_rx._clocks_per_baud): + bit_index = i // uart_rx._clocks_per_baud # Every cycle, run checks on uart_rx: if (yield uart_rx.valid_o): diff --git a/test/test_uart_tx_sim.py b/test/test_uart_tx_sim.py index c78d076..8b07aa4 100644 --- a/test/test_uart_tx_sim.py +++ b/test/test_uart_tx_sim.py @@ -26,8 +26,8 @@ def verify_bit_sequence(byte): data_bits = "0" + f"{byte:08b}"[::-1] + "1" data_bits = [int(bit) for bit in data_bits] - for i in range(10 * uart_tx.clocks_per_baud): - bit_index = i // uart_tx.clocks_per_baud + for i in range(10 * uart_tx._clocks_per_baud): + bit_index = i // uart_tx._clocks_per_baud if (yield uart_tx.tx) != data_bits[bit_index]: raise ValueError("Wrong bit in sequence!")