diff --git a/src/manta/logic_analyzer/__init__.py b/src/manta/logic_analyzer/__init__.py index bfbf295..e78f634 100644 --- a/src/manta/logic_analyzer/__init__.py +++ b/src/manta/logic_analyzer/__init__.py @@ -2,7 +2,7 @@ from amaranth import * from warnings import warn from ..utils import * from .trigger_block import LogicAnalyzerTriggerBlock -from .fsm import LogicAnalyzerFSM +from .fsm import LogicAnalyzerFSM, States, TriggerModes from .sample_mem import LogicAnalyzerSampleMemory from .playback import LogicAnalyzerPlayback @@ -217,13 +217,13 @@ class LogicAnalyzerCore(Elaboratable): # If core is not in IDLE state, request that it return to IDLE print_if_verbose(" -> Resetting core...") state = self.fsm.r.get_probe("state") - if state != self.fsm.states["IDLE"]: + if state != States.IDLE: self.fsm.r.set_probe("request_start", 0) self.fsm.r.set_probe("request_stop", 0) self.fsm.r.set_probe("request_stop", 1) self.fsm.r.set_probe("request_stop", 0) - if self.fsm.r.get_probe("state") != self.fsm.states["IDLE"]: + if self.fsm.r.get_probe("state") != States.IDLE: raise ValueError("Logic analyzer did not reset to IDLE state.") # Set triggers @@ -237,10 +237,10 @@ class LogicAnalyzerCore(Elaboratable): print_if_verbose(" -> Setting trigger mode...") if "trigger_mode" in self.config: mode = self.config["trigger_mode"].upper() - self.fsm.r.set_probe("trigger_mode", self.fsm.trigger_modes[mode]) + self.fsm.r.set_probe("trigger_mode", TriggerModes[mode]) else: - self.fsm.r.set_probe("trigger_mode", self.fsm.trigger_modes["SINGLE_SHOT"]) + self.fsm.r.set_probe("trigger_mode", TriggerModes.SINGLE_SHOT) # Set trigger location print_if_verbose(" -> Setting trigger location...") @@ -258,7 +258,7 @@ class LogicAnalyzerCore(Elaboratable): # Poll the state machine's state, and wait for the capture to complete print_if_verbose(" -> Waiting for capture to complete...") - while self.fsm.r.get_probe("state") != self.fsm.states["CAPTURED"]: + while self.fsm.r.get_probe("state") != States.CAPTURED: pass # Read out the entirety of the sample memory diff --git a/src/manta/logic_analyzer/fsm.py b/src/manta/logic_analyzer/fsm.py index 7dcbceb..38d384d 100644 --- a/src/manta/logic_analyzer/fsm.py +++ b/src/manta/logic_analyzer/fsm.py @@ -1,23 +1,28 @@ from amaranth import * +from amaranth.lib.enum import IntEnum from math import ceil, log2 from ..io_core import IOCore +class States(IntEnum): + IDLE = 0 + MOVE_TO_POSITION = 1 + IN_POSITION = 2 + CAPTURING = 3 + CAPTURED = 4 + + +class TriggerModes(IntEnum): + SINGLE_SHOT = 0 + INCREMENTAL = 1 + IMMEDIATE = 2 + + class LogicAnalyzerFSM(Elaboratable): """ """ def __init__(self, config, base_addr, interface): self.config = config - self.states = { - "IDLE": 0, - "MOVE_TO_POSITION": 1, - "IN_POSITION": 2, - "CAPTURING": 3, - "CAPTURED": 4, - } - - self.trigger_modes = {"SINGLE_SHOT": 0, "INCREMENTAL": 1, "IMMEDIATE": 2} - self.trigger = Signal(1) self.write_enable = Signal(1) @@ -74,83 +79,11 @@ class LogicAnalyzerFSM(Elaboratable): next_state = Signal().like(state) next_read_pointer = Signal().like(read_pointer) latch_read_pointer = Signal().like(read_pointer) + latch_write_pointer = Signal().like(write_pointer) + latch_write_enable = Signal().like(write_enable) next_write_pointer = Signal().like(write_pointer) - # COMBINATIONAL SHIT - with m.If(trigger_mode == self.trigger_modes["SINGLE_SHOT"]): - with m.If(state == self.states["IDLE"]): - m.d.comb += write_enable.eq(0) - m.d.comb += write_pointer.eq(0) - m.d.comb += read_pointer.eq(0) - m.d.comb += next_state.eq(self.states["IDLE"]) - - # Rising edge of request_start beings the capture: - with m.If((request_start) & (~prev_request_start)): - m.d.comb += write_enable.eq(1) - # Go straight to IN_POSITION if trigger_location == 0 - with m.If(trigger_location == 0): - m.d.comb += next_state.eq(self.states["IN_POSITION"]) - - # Otherwise go to MOVE_TO_POSITION - with m.Else(): - m.d.comb += next_state.eq(self.states["MOVE_TO_POSITION"]) - - with m.Elif(state == self.states["MOVE_TO_POSITION"]): - m.d.comb += write_enable.eq(1) - m.d.comb += write_pointer.eq(next_write_pointer) - m.d.comb += read_pointer.eq(0) - m.d.comb += next_state.eq(self.states["MOVE_TO_POSITION"]) - - with m.If(write_pointer == trigger_location - 1): - with m.If(self.trigger): - m.d.comb += next_state.eq(self.states["CAPTURING"]) - - with m.Else(): - m.d.comb += next_state.eq(self.states["IN_POSITION"]) - - with m.Elif(state == self.states["IN_POSITION"]): - m.d.comb += write_enable.eq(1) - m.d.comb += write_pointer.eq(next_write_pointer) - m.d.comb += next_state.eq(self.states["IN_POSITION"]) - - with m.If(self.trigger): - m.d.comb += next_state.eq(self.states["CAPTURING"]) - m.d.comb += read_pointer.eq(latch_read_pointer) - - with m.Else(): - m.d.comb += read_pointer.eq(next_read_pointer) - - with m.Elif(state == self.states["CAPTURING"]): - m.d.comb += write_enable.eq(1) - m.d.comb += read_pointer.eq(latch_read_pointer) - m.d.comb += next_state.eq(self.states["CAPTURING"]) - - with m.If(next_write_pointer == read_pointer): - m.d.comb += write_enable.eq(0) - m.d.comb += next_state.eq(self.states["CAPTURED"]) - - with m.Else(): - m.d.comb += write_pointer.eq(next_write_pointer) - - with m.Elif(state == self.states["CAPTURED"]): - m.d.comb += next_state.eq(self.states["CAPTURED"]) - m.d.comb += read_pointer.eq(latch_read_pointer) - m.d.comb += write_enable.eq(0) - # m.d.comb += read_pointer.eq(read_pointer) - # m.d.comb += write_pointer.eq(write_pointer) - - with m.If(self.r.trigger_mode == self.trigger_modes["IMMEDIATE"]): - pass - - with m.If(self.r.trigger_mode == self.trigger_modes["INCREMENTAL"]): - pass - - # Regardless of trigger mode, go back to IDLE if request_stop is pulsed - with m.If((request_stop) & (~prev_request_stop)): - m.d.comb += next_state.eq(self.states["IDLE"]) - - # SEQUENTIAL SHIT - + # --- Sequential Logic --- # Rising edge detection for start/stop requests m.d.sync += prev_request_start.eq(request_start) m.d.sync += prev_request_stop.eq(request_stop) @@ -160,58 +93,137 @@ class LogicAnalyzerFSM(Elaboratable): m.d.sync += next_write_pointer.eq((write_pointer + 1) % sample_depth) m.d.sync += next_read_pointer.eq((read_pointer + 1) % sample_depth) m.d.sync += latch_read_pointer.eq(read_pointer) + m.d.sync += latch_write_pointer.eq(write_pointer) + m.d.sync += latch_write_enable.eq(write_enable) + + # --- Combinational Logic --- + + # --- Single Shot Trigger Mode --- + with m.If(trigger_mode == TriggerModes.SINGLE_SHOT): + with m.If(state == States.IDLE): + m.d.comb += write_enable.eq(0) + m.d.comb += write_pointer.eq(0) + m.d.comb += read_pointer.eq(0) + m.d.comb += next_state.eq(States.IDLE) + + # Rising edge of request_start beings the capture: + with m.If((request_start) & (~prev_request_start)): + m.d.comb += write_enable.eq(1) + # Go straight to IN_POSITION if trigger_location == 0 + with m.If(trigger_location == 0): + m.d.comb += next_state.eq(States.IN_POSITION) + + # Otherwise go to MOVE_TO_POSITION + with m.Else(): + m.d.comb += next_state.eq(States.MOVE_TO_POSITION) + + with m.Elif(state == States.MOVE_TO_POSITION): + m.d.comb += write_enable.eq(1) + m.d.comb += write_pointer.eq(next_write_pointer) + m.d.comb += read_pointer.eq(0) + m.d.comb += next_state.eq(States.MOVE_TO_POSITION) + + with m.If(write_pointer == trigger_location - 1): + with m.If(self.trigger): + m.d.comb += next_state.eq(States.CAPTURING) + + with m.Else(): + m.d.comb += next_state.eq(States.IN_POSITION) + + with m.Elif(state == States.IN_POSITION): + m.d.comb += write_enable.eq(1) + m.d.comb += write_pointer.eq(next_write_pointer) + m.d.comb += next_state.eq(States.IN_POSITION) + + with m.If(self.trigger): + m.d.comb += next_state.eq(States.CAPTURING) + m.d.comb += read_pointer.eq(latch_read_pointer) + + with m.Else(): + m.d.comb += read_pointer.eq(next_read_pointer) + + with m.Elif(state == States.CAPTURING): + m.d.comb += write_enable.eq(1) + m.d.comb += read_pointer.eq(latch_read_pointer) + m.d.comb += next_state.eq(States.CAPTURING) + + with m.If(next_write_pointer == read_pointer): + m.d.comb += write_enable.eq(0) + m.d.comb += write_pointer.eq(latch_write_pointer) + m.d.comb += next_state.eq(States.CAPTURED) + + with m.Else(): + m.d.comb += write_pointer.eq(next_write_pointer) + + with m.Elif(state == States.CAPTURED): + m.d.comb += next_state.eq(States.CAPTURED) + m.d.comb += read_pointer.eq(latch_read_pointer) + m.d.comb += write_pointer.eq(latch_write_pointer) + m.d.comb += write_enable.eq(0) + + # --- Immediate Trigger Mode --- + with m.If(self.r.trigger_mode == TriggerModes.IMMEDIATE): + m.d.comb += read_pointer.eq(0) + with m.If(self.r.state == States.IDLE): + m.d.comb += write_enable.eq(0) + m.d.comb += write_pointer.eq(0) + m.d.comb += next_state.eq(States.IDLE) + + # Rising edge of request_start beings the capture: + with m.If((request_start) & (~prev_request_start)): + m.d.comb += write_enable.eq(1) + m.d.comb += next_state.eq(States.CAPTURING) + + with m.Elif(state == States.CAPTURING): + m.d.comb += write_enable.eq(1) + m.d.comb += next_state.eq(States.CAPTURING) + m.d.comb += write_pointer.eq(next_write_pointer) + + with m.If(next_write_pointer == read_pointer): + m.d.comb += write_enable.eq(0) + m.d.comb += write_pointer.eq(latch_write_pointer) + m.d.comb += next_state.eq(States.CAPTURED) + + with m.Elif(state == States.CAPTURED): + m.d.comb += write_enable.eq(0) + m.d.comb += write_pointer.eq(latch_write_pointer) + m.d.comb += next_state.eq(States.CAPTURED) + + # --- Incremental Trigger Mode --- + with m.If(self.r.trigger_mode == TriggerModes.INCREMENTAL): + with m.If(state == States.IDLE): + m.d.comb += write_enable.eq(0) + m.d.comb += write_pointer.eq(0) + m.d.comb += read_pointer.eq(0) + m.d.comb += next_state.eq(States.IDLE) + + # Rising edge of request_start beings the capture: + with m.If((request_start) & (~prev_request_start)): + m.d.comb += write_enable.eq(self.trigger) + m.d.comb += next_state.eq(States.CAPTURING) + + with m.Elif(state == States.CAPTURING): + m.d.comb += read_pointer.eq(0) + m.d.comb += next_state.eq(States.CAPTURING) + m.d.comb += write_enable.eq(self.trigger) + + with m.If(latch_write_enable): + m.d.comb += write_pointer.eq(next_write_pointer) + with m.Else(): + m.d.comb += write_pointer.eq(latch_write_pointer) + + with m.If((self.trigger) & (next_write_pointer == read_pointer)): + m.d.comb += write_pointer.eq(latch_write_pointer) + m.d.comb += next_state.eq(States.CAPTURED) + + with m.Elif(state == States.CAPTURED): + m.d.comb += next_state.eq(States.CAPTURED) + m.d.comb += read_pointer.eq(latch_read_pointer) + m.d.comb += write_pointer.eq(latch_write_pointer) + m.d.comb += write_enable.eq(0) + + # Regardless of trigger mode, go back to IDLE if request_stop is pulsed + with m.If((request_stop) & (~prev_request_stop)): + m.d.comb += next_state.eq(States.IDLE) return m - - #### OLD STUFF FOR REFERENCE #### - - # with m.If(self.r.state == self.states["IDLE"]): - # m.d.sync += self.r.write_pointer.eq(0) - # m.d.sync += self.r.read_pointer.eq(0) - # m.d.sync += self.write_enable.eq(0) - - # with m.If((self.r.request_start) & (~prev_request_start)): - # m.d.sync += self.write_enable.eq(1) - # with m.If(self.r.trigger_mode == self.trigger_modes["IMMEDIATE"]): - # m.d.sync += self.r.state.eq(self.states["CAPTURING"]) - # m.d.sync += self.r.write_pointer.eq(self.r.write_pointer + 1) - - # with m.Else(): - # with m.If(self.r.trigger_location == 0): - # m.d.sync += self.r.state.eq(self.states["IN_POSITION"]) - - # with m.Else(): - # m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"]) - - # # m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"]) - - # with m.Elif(self.r.state == self.states["MOVE_TO_POSITION"]): - # m.d.sync += self.r.write_pointer.eq(self.r.write_pointer + 1) - - # with m.If(self.r.write_pointer == self.r.trigger_location): - # with m.If(self.trigger): - # m.d.sync += self.r.state.eq(self.states["CAPTURING"]) - - # with m.Else(): - # m.d.sync += self.r.state.eq(self.states["IN_POSITION"]) - # self.increment_mod_sample_depth(m, self.r.read_pointer) - - # with m.Elif(self.r.state == self.states["IN_POSITION"]): - # self.increment_mod_sample_depth(m, self.r.write_pointer) - - # with m.If(self.trigger): - # m.d.sync += self.r.state.eq(self.states["CAPTURING"]) - - # with m.Else(): - # self.increment_mod_sample_depth(m, self.r.read_pointer) - - # with m.Elif(self.r.state == self.states["CAPTURING"]): - # with m.If(self.r.write_pointer == self.r.read_pointer): - # m.d.sync += self.write_enable.eq(0) - # m.d.sync += self.r.state.eq(self.states["CAPTURED"]) - - # with m.Else(): - # self.increment_mod_sample_depth(m, self.r.write_pointer) - - # with m.If((self.r.request_stop) & (~prev_request_stop)): - # m.d.sync += self.r.state.eq(self.states["IDLE"]) diff --git a/test/test_logic_analyzer_fsm_sim.py b/test/test_logic_analyzer_fsm_sim.py index 2bd7f8d..24d8215 100644 --- a/test/test_logic_analyzer_fsm_sim.py +++ b/test/test_logic_analyzer_fsm_sim.py @@ -1,5 +1,5 @@ from amaranth.sim import Simulator -from manta.logic_analyzer import LogicAnalyzerFSM +from manta.logic_analyzer import * from manta.utils import * """ @@ -25,49 +25,252 @@ config = {"sample_depth": 8} fsm = LogicAnalyzerFSM(config, base_addr=0, interface=None) -def set_fsm_register(name, data): - addr = fsm.r.mmap[f"{name}_buf"]["addrs"][0] - strobe_addr = fsm.r.base_addr - - yield from write_register(fsm, strobe_addr, 0) - yield from write_register(fsm, addr, data) - yield from write_register(fsm, strobe_addr, 1) - yield from write_register(fsm, strobe_addr, 0) - - -def test_single_shot_always_trigger(): +def test_signals_reset_correctly(): def testbench(): - if (yield fsm.r.state != fsm.states["IDLE"]): + # Make sure pointers and write enable reset to zero + for sig in [fsm.r.write_pointer, fsm.r.read_pointer, fsm.write_enable]: + if (yield sig) != 0: + raise ValueError + + # Make sure state resets to IDLE + if (yield fsm.r.state != States.IDLE): raise ValueError - yield fsm.trigger.eq(1) - yield from set_fsm_register("trigger_mode", fsm.trigger_modes["SINGLE_SHOT"]) - yield from set_fsm_register("trigger_location", 4) - yield from set_fsm_register("request_start", 1) - yield from set_fsm_register("request_start", 0) + simulate(fsm, testbench) - for _ in range(100): + +def test_single_shot_no_wait_for_trigger(): + def testbench(): + # Configure and start FSM + yield fsm.trigger.eq(1) + yield fsm.r.trigger_mode.eq(TriggerModes.SINGLE_SHOT) + yield fsm.r.trigger_location.eq(4) + yield fsm.r.request_start.eq(1) + + # Wait until write_enable is asserted + while not (yield fsm.write_enable): yield - simulate(fsm, testbench, "single_shot_always_trigger.vcd") + # Wait 8 clock cycles for capture to complete + for i in range(8): + # Make sure that read_pointer does not increase + if (yield fsm.r.read_pointer) != 0: + raise ValueError + # Make sure that write_pointer increases by one each cycle + if (yield fsm.r.write_pointer) != i: + raise ValueError -def test_single_shot_wait_to_trigger(): - def testbench(): - if (yield fsm.r.state != fsm.states["IDLE"]): + yield + + # Wait one clock cycle (to let BRAM contents cycle in) + yield + + # Check that write_pointer points to the end of memory + if (yield fsm.r.write_pointer) != 7: raise ValueError - yield from set_fsm_register("trigger_mode", fsm.trigger_modes["SINGLE_SHOT"]) - yield from set_fsm_register("trigger_location", 4) - yield from set_fsm_register("request_start", 1) - yield from set_fsm_register("request_start", 0) + # Check that state is CAPTURED + if (yield fsm.r.state) != States.CAPTURED: + raise ValueError + + simulate(fsm, testbench, "single_shot_no_wait_for_trigger.vcd") + + +def test_single_shot_wait_for_trigger(): + def testbench(): + # Configure and start FSM + yield fsm.r.trigger_mode.eq(TriggerModes.SINGLE_SHOT) + yield fsm.r.trigger_location.eq(4) + yield fsm.r.request_start.eq(1) + yield + + # Check that write_enable is asserted on the same edge as request_start + if not (yield fsm.write_enable): + raise ValueError + + # Wait 4 clock cycles to get to IN_POSITION + for i in range(4): + rp = yield fsm.r.read_pointer + wp = yield fsm.r.write_pointer + + # Make sure that read_pointer does not increase + if rp != 0: + raise ValueError + + # Make sure that write_pointer increases by one each cycle + if wp != i: + raise ValueError + + yield + + # Wait a few cycles before triggering: + for _ in range(10): + if (rp + 3) % fsm.config["sample_depth"] != wp: + raise ValueError + + yield + + # Provide the trigger, and check that the capture completes 4 cycles later + yield fsm.trigger.eq(1) + yield + + rp_start = yield fsm.r.read_pointer + for i in range(4): + rp = yield fsm.r.read_pointer + wp = yield fsm.r.write_pointer + + if rp != rp_start: + raise ValueError + + if (rp_start + 4 + i) % fsm.config["sample_depth"] != wp: + raise ValueError + + yield + + # Wait one clock cycle (to let BRAM contents cycle in) + yield + + # Check that write_pointer points to the end of memory + rp = yield fsm.r.read_pointer + wp = yield fsm.r.write_pointer + if (wp + 1) % fsm.config["sample_depth"] != rp: + raise ValueError + + # Check that state is CAPTURED + if (yield fsm.r.state) != States.CAPTURED: + raise ValueError + + simulate(fsm, testbench, "single_shot_wait_for_trigger.vcd") + + +def test_immediate(): + def testbench(): + # Configure and start FSM + yield fsm.r.trigger_mode.eq(TriggerModes.IMMEDIATE) + yield fsm.r.request_start.eq(1) + yield + + # Check that write_enable is asserted on the same edge as request_start + if not (yield fsm.write_enable): + raise ValueError + + for i in range(fsm.config["sample_depth"]): + rp = yield fsm.r.read_pointer + wp = yield fsm.r.write_pointer + + if rp != 0: + raise ValueError + + if wp != i: + raise ValueError + + yield + + # Wait one clock cycle (to let BRAM contents cycle in) + yield + + # Check that write_pointer points to the end of memory + rp = yield fsm.r.read_pointer + wp = yield fsm.r.write_pointer + if rp != 0: + raise ValueError + if wp != 7: + raise ValueError + + # Check that state is CAPTURED + if (yield fsm.r.state) != States.CAPTURED: + raise ValueError + + simulate(fsm, testbench, "immediate.vcd") + + +def test_incremental(): + def testbench(): + # Configure and start FSM + yield fsm.r.trigger_mode.eq(TriggerModes.INCREMENTAL) + yield fsm.r.request_start.eq(1) + yield + + # Check that write_enable is asserted on the same edge as request_start + # if not (yield fsm.write_enable): + # raise ValueError + + for _ in range(10): + for _ in range(3): + yield + + yield fsm.trigger.eq(1) + yield + yield fsm.trigger.eq(0) + yield + + # # Check that state is CAPTURED + # if (yield fsm.r.state) != States.CAPTURED: + # raise ValueError + + simulate(fsm, testbench, "incremental.vcd") + + +def test_single_shot_write_enable(): + def testbench(): + # Configure FSM + yield fsm.r.trigger_mode.eq(TriggerModes.SINGLE_SHOT) + yield fsm.r.trigger_location.eq(4) + yield + + # Make sure write is not enabled before starting the FSM + if (yield fsm.write_enable): + raise ValueError + + # Start the FSM, ensure write enable is asserted throughout the capture + yield fsm.r.request_start.eq(1) + yield + + for _ in range(fsm.config["sample_depth"]): + if not (yield fsm.write_enable): + raise ValueError - for _ in range(8): yield yield fsm.trigger.eq(1) + yield + + for _ in range(4): + if not (yield fsm.write_enable): + raise ValueError - for _ in range(100): yield - simulate(fsm, testbench, "single_shot_wait_to_trigger.vcd") + # Make sure write_enable is deasserted after + if (yield fsm.write_enable): + raise ValueError + + simulate(fsm, testbench, "single_shot_write_enable.vcd") + + +def test_immediate_write_enable(): + def testbench(): + # Configure FSM + yield fsm.r.trigger_mode.eq(TriggerModes.IMMEDIATE) + yield + + # Make sure write is not enabled before starting the FSM + if (yield fsm.write_enable): + raise ValueError + + # Start the FSM, ensure write enable is asserted throughout the capture + yield fsm.r.request_start.eq(1) + yield + + for _ in range(fsm.config["sample_depth"]): + if not (yield fsm.write_enable): + raise ValueError + + yield + + # Make sure write_enable is deasserted after + if (yield fsm.write_enable): + raise ValueError + + simulate(fsm, testbench, "immediate_write_enable.vcd")