From ee4a79a4d42bb434c60d6d6634f4ecd885063d48 Mon Sep 17 00:00:00 2001 From: Fischer Moseley <42497969+fischermoseley@users.noreply.github.com> Date: Sun, 21 Jan 2024 23:45:14 -0800 Subject: [PATCH] refactor logic analyzer FSM to be sequential-only for better timing --- src/manta/logic_analyzer/__init__.py | 6 +- src/manta/logic_analyzer/fsm.py | 175 ++++++++------------------- test/test_logic_analyzer_fsm_sim.py | 53 ++------ 3 files changed, 69 insertions(+), 165 deletions(-) diff --git a/src/manta/logic_analyzer/__init__.py b/src/manta/logic_analyzer/__init__.py index fd8dece..470843d 100644 --- a/src/manta/logic_analyzer/__init__.py +++ b/src/manta/logic_analyzer/__init__.py @@ -103,8 +103,8 @@ class LogicAnalyzerCore(Elaboratable): if not isinstance(trigger_location, int) or trigger_location < 0: raise ValueError("Trigger location must be a positive integer.") - if trigger_location > config["sample_depth"]: - raise ValueError("Trigger location cannot exceed sample depth.") + if trigger_location >= config["sample_depth"]: + raise ValueError("Trigger location must be less than sample depth.") if trigger_mode == "immediate": warn( @@ -229,7 +229,7 @@ class LogicAnalyzerCore(Elaboratable): print_if_verbose(" -> Setting triggers...") self.trig_blk.clear_triggers() - if self.config["trigger_mode"] != "immediate": + if self.config.get("trigger_mode") != "immediate": self.trig_blk.set_triggers(self.config) # Set trigger mode, default to single-shot if user didn't specify a mode diff --git a/src/manta/logic_analyzer/fsm.py b/src/manta/logic_analyzer/fsm.py index 38d384d..9efe17a 100644 --- a/src/manta/logic_analyzer/fsm.py +++ b/src/manta/logic_analyzer/fsm.py @@ -76,154 +76,85 @@ class LogicAnalyzerFSM(Elaboratable): prev_request_start = Signal(1) prev_request_stop = Signal(1) - next_state = Signal().like(state) - next_read_pointer = Signal().like(read_pointer) - latch_read_pointer = Signal().like(read_pointer) - latch_write_pointer = Signal().like(write_pointer) - latch_write_enable = Signal().like(write_enable) next_write_pointer = Signal().like(write_pointer) - # --- Sequential Logic --- + m.d.comb += next_write_pointer.eq((write_pointer + 1) % sample_depth) + # Rising edge detection for start/stop requests m.d.sync += prev_request_start.eq(request_start) m.d.sync += prev_request_stop.eq(request_stop) - # Copy next into current - m.d.sync += state.eq(next_state) - m.d.sync += next_write_pointer.eq((write_pointer + 1) % sample_depth) - m.d.sync += next_read_pointer.eq((read_pointer + 1) % sample_depth) - m.d.sync += latch_read_pointer.eq(read_pointer) - m.d.sync += latch_write_pointer.eq(write_pointer) - m.d.sync += latch_write_enable.eq(write_enable) + with m.If(state == States.IDLE): + m.d.sync += write_pointer.eq(0) + m.d.sync += read_pointer.eq(0) + m.d.sync += write_enable.eq(0) - # --- Combinational Logic --- + with m.If((request_start) & (~prev_request_start)): + with m.If(trigger_mode == TriggerModes.IMMEDIATE): + m.d.sync += state.eq(States.CAPTURING) + m.d.sync += write_enable.eq(1) - # --- Single Shot Trigger Mode --- - with m.If(trigger_mode == TriggerModes.SINGLE_SHOT): - with m.If(state == States.IDLE): - m.d.comb += write_enable.eq(0) - m.d.comb += write_pointer.eq(0) - m.d.comb += read_pointer.eq(0) - m.d.comb += next_state.eq(States.IDLE) + with m.Elif(trigger_mode == TriggerModes.INCREMENTAL): + m.d.sync += state.eq(States.CAPTURING) + m.d.sync += write_enable.eq(1) - # Rising edge of request_start beings the capture: - with m.If((request_start) & (~prev_request_start)): - m.d.comb += write_enable.eq(1) - # Go straight to IN_POSITION if trigger_location == 0 + with m.Elif(trigger_mode == TriggerModes.SINGLE_SHOT): with m.If(trigger_location == 0): - m.d.comb += next_state.eq(States.IN_POSITION) - - # Otherwise go to MOVE_TO_POSITION - with m.Else(): - m.d.comb += next_state.eq(States.MOVE_TO_POSITION) - - with m.Elif(state == States.MOVE_TO_POSITION): - m.d.comb += write_enable.eq(1) - m.d.comb += write_pointer.eq(next_write_pointer) - m.d.comb += read_pointer.eq(0) - m.d.comb += next_state.eq(States.MOVE_TO_POSITION) - - with m.If(write_pointer == trigger_location - 1): - with m.If(self.trigger): - m.d.comb += next_state.eq(States.CAPTURING) + m.d.sync += state.eq(States.IN_POSITION) with m.Else(): - m.d.comb += next_state.eq(States.IN_POSITION) + m.d.sync += state.eq(States.MOVE_TO_POSITION) - with m.Elif(state == States.IN_POSITION): - m.d.comb += write_enable.eq(1) - m.d.comb += write_pointer.eq(next_write_pointer) - m.d.comb += next_state.eq(States.IN_POSITION) + m.d.sync += write_enable.eq(1) + with m.Elif(state == States.MOVE_TO_POSITION): + m.d.sync += write_pointer.eq(next_write_pointer) + + with m.If(write_pointer == trigger_location - 1): with m.If(self.trigger): - m.d.comb += next_state.eq(States.CAPTURING) - m.d.comb += read_pointer.eq(latch_read_pointer) + m.d.sync += state.eq(States.CAPTURING) with m.Else(): - m.d.comb += read_pointer.eq(next_read_pointer) + m.d.sync += state.eq(States.IN_POSITION) - with m.Elif(state == States.CAPTURING): - m.d.comb += write_enable.eq(1) - m.d.comb += read_pointer.eq(latch_read_pointer) - m.d.comb += next_state.eq(States.CAPTURING) + with m.Elif(state == States.IN_POSITION): + m.d.sync += write_pointer.eq(next_write_pointer) + with m.If(self.trigger): + m.d.sync += state.eq(States.CAPTURING) + + # kind of horrible, i'll get rid of this later... + with m.If(write_pointer > trigger_location): + m.d.sync += read_pointer.eq(write_pointer - trigger_location) + with m.Else(): + m.d.sync += read_pointer.eq( + write_pointer - trigger_location + sample_depth + ) + + # ok that's all for horrible + + with m.If(state == States.CAPTURING): + # Non- incremental modes + with m.If(trigger_mode != TriggerModes.INCREMENTAL): with m.If(next_write_pointer == read_pointer): - m.d.comb += write_enable.eq(0) - m.d.comb += write_pointer.eq(latch_write_pointer) - m.d.comb += next_state.eq(States.CAPTURED) + m.d.sync += write_enable.eq(0) + m.d.sync += state.eq(States.CAPTURED) with m.Else(): - m.d.comb += write_pointer.eq(next_write_pointer) + m.d.sync += write_pointer.eq(next_write_pointer) - with m.Elif(state == States.CAPTURED): - m.d.comb += next_state.eq(States.CAPTURED) - m.d.comb += read_pointer.eq(latch_read_pointer) - m.d.comb += write_pointer.eq(latch_write_pointer) - m.d.comb += write_enable.eq(0) + # Incremental mode + with m.Else(): + with m.If(self.trigger): + with m.If(next_write_pointer == read_pointer): + m.d.sync += write_enable.eq(0) + m.d.sync += state.eq(States.CAPTURED) - # --- Immediate Trigger Mode --- - with m.If(self.r.trigger_mode == TriggerModes.IMMEDIATE): - m.d.comb += read_pointer.eq(0) - with m.If(self.r.state == States.IDLE): - m.d.comb += write_enable.eq(0) - m.d.comb += write_pointer.eq(0) - m.d.comb += next_state.eq(States.IDLE) - - # Rising edge of request_start beings the capture: - with m.If((request_start) & (~prev_request_start)): - m.d.comb += write_enable.eq(1) - m.d.comb += next_state.eq(States.CAPTURING) - - with m.Elif(state == States.CAPTURING): - m.d.comb += write_enable.eq(1) - m.d.comb += next_state.eq(States.CAPTURING) - m.d.comb += write_pointer.eq(next_write_pointer) - - with m.If(next_write_pointer == read_pointer): - m.d.comb += write_enable.eq(0) - m.d.comb += write_pointer.eq(latch_write_pointer) - m.d.comb += next_state.eq(States.CAPTURED) - - with m.Elif(state == States.CAPTURED): - m.d.comb += write_enable.eq(0) - m.d.comb += write_pointer.eq(latch_write_pointer) - m.d.comb += next_state.eq(States.CAPTURED) - - # --- Incremental Trigger Mode --- - with m.If(self.r.trigger_mode == TriggerModes.INCREMENTAL): - with m.If(state == States.IDLE): - m.d.comb += write_enable.eq(0) - m.d.comb += write_pointer.eq(0) - m.d.comb += read_pointer.eq(0) - m.d.comb += next_state.eq(States.IDLE) - - # Rising edge of request_start beings the capture: - with m.If((request_start) & (~prev_request_start)): - m.d.comb += write_enable.eq(self.trigger) - m.d.comb += next_state.eq(States.CAPTURING) - - with m.Elif(state == States.CAPTURING): - m.d.comb += read_pointer.eq(0) - m.d.comb += next_state.eq(States.CAPTURING) - m.d.comb += write_enable.eq(self.trigger) - - with m.If(latch_write_enable): - m.d.comb += write_pointer.eq(next_write_pointer) - with m.Else(): - m.d.comb += write_pointer.eq(latch_write_pointer) - - with m.If((self.trigger) & (next_write_pointer == read_pointer)): - m.d.comb += write_pointer.eq(latch_write_pointer) - m.d.comb += next_state.eq(States.CAPTURED) - - with m.Elif(state == States.CAPTURED): - m.d.comb += next_state.eq(States.CAPTURED) - m.d.comb += read_pointer.eq(latch_read_pointer) - m.d.comb += write_pointer.eq(latch_write_pointer) - m.d.comb += write_enable.eq(0) + with m.Else(): + m.d.sync += write_pointer.eq(next_write_pointer) # Regardless of trigger mode, go back to IDLE if request_stop is pulsed with m.If((request_stop) & (~prev_request_stop)): - m.d.comb += next_state.eq(States.IDLE) + m.d.sync += state.eq(States.IDLE) return m diff --git a/test/test_logic_analyzer_fsm_sim.py b/test/test_logic_analyzer_fsm_sim.py index 24d8215..c2b4e70 100644 --- a/test/test_logic_analyzer_fsm_sim.py +++ b/test/test_logic_analyzer_fsm_sim.py @@ -2,25 +2,6 @@ from amaranth.sim import Simulator from manta.logic_analyzer import * from manta.utils import * -""" -what do we want this to do? - -we want to run a capture in single shot mode, immediate mode, and incremental mode - - -single-shot case: -- exactly the right number of samples are taken -- we only start taking samples once captured - -immediate case: -- exactly the right number of samples are taken -- we only start taking samples once captured - -incremental case: -- exactly the right number of samples are taken -- we only take samples when trig is asserted - -""" config = {"sample_depth": 8} fsm = LogicAnalyzerFSM(config, base_addr=0, interface=None) @@ -84,8 +65,9 @@ def test_single_shot_wait_for_trigger(): yield fsm.r.trigger_location.eq(4) yield fsm.r.request_start.eq(1) yield + yield - # Check that write_enable is asserted on the same edge as request_start + # Check that write_enable is asserted a cycle after request_start if not (yield fsm.write_enable): raise ValueError @@ -104,28 +86,15 @@ def test_single_shot_wait_for_trigger(): yield - # Wait a few cycles before triggering: + # Wait a few cycles before triggering for _ in range(10): - if (rp + 3) % fsm.config["sample_depth"] != wp: - raise ValueError - yield # Provide the trigger, and check that the capture completes 4 cycles later yield fsm.trigger.eq(1) yield - rp_start = yield fsm.r.read_pointer for i in range(4): - rp = yield fsm.r.read_pointer - wp = yield fsm.r.write_pointer - - if rp != rp_start: - raise ValueError - - if (rp_start + 4 + i) % fsm.config["sample_depth"] != wp: - raise ValueError - yield # Wait one clock cycle (to let BRAM contents cycle in) @@ -150,8 +119,9 @@ def test_immediate(): yield fsm.r.trigger_mode.eq(TriggerModes.IMMEDIATE) yield fsm.r.request_start.eq(1) yield + yield - # Check that write_enable is asserted on the same edge as request_start + # Check that write_enable is asserted a cycle after request_start if not (yield fsm.write_enable): raise ValueError @@ -191,10 +161,11 @@ def test_incremental(): yield fsm.r.trigger_mode.eq(TriggerModes.INCREMENTAL) yield fsm.r.request_start.eq(1) yield + yield # Check that write_enable is asserted on the same edge as request_start - # if not (yield fsm.write_enable): - # raise ValueError + if not (yield fsm.write_enable): + raise ValueError for _ in range(10): for _ in range(3): @@ -205,9 +176,9 @@ def test_incremental(): yield fsm.trigger.eq(0) yield - # # Check that state is CAPTURED - # if (yield fsm.r.state) != States.CAPTURED: - # raise ValueError + # Check that state is CAPTURED + if (yield fsm.r.state) != States.CAPTURED: + raise ValueError simulate(fsm, testbench, "incremental.vcd") @@ -226,6 +197,7 @@ def test_single_shot_write_enable(): # Start the FSM, ensure write enable is asserted throughout the capture yield fsm.r.request_start.eq(1) yield + yield for _ in range(fsm.config["sample_depth"]): if not (yield fsm.write_enable): @@ -262,6 +234,7 @@ def test_immediate_write_enable(): # Start the FSM, ensure write enable is asserted throughout the capture yield fsm.r.request_start.eq(1) yield + yield for _ in range(fsm.config["sample_depth"]): if not (yield fsm.write_enable):