diff --git a/src/manta/logic_analyzer/fsm.py b/src/manta/logic_analyzer/fsm.py index d146818..7dcbceb 100644 --- a/src/manta/logic_analyzer/fsm.py +++ b/src/manta/logic_analyzer/fsm.py @@ -58,62 +58,160 @@ class LogicAnalyzerFSM(Elaboratable): m.submodules.registers = self.r + sample_depth = self.config["sample_depth"] + request_start = self.r.request_start + request_stop = self.r.request_stop + trigger_mode = self.r.trigger_mode + trigger_location = self.r.trigger_location + state = self.r.state + write_enable = self.write_enable + write_pointer = self.r.write_pointer + read_pointer = self.r.read_pointer + prev_request_start = Signal(1) prev_request_stop = Signal(1) - # Rising edge detection for start/stop requests - m.d.sync += prev_request_start.eq(self.r.request_start) - m.d.sync += prev_request_stop.eq(self.r.request_stop) + next_state = Signal().like(state) + next_read_pointer = Signal().like(read_pointer) + latch_read_pointer = Signal().like(read_pointer) + next_write_pointer = Signal().like(write_pointer) - with m.If(self.r.state == self.states["IDLE"]): - m.d.sync += self.r.write_pointer.eq(0) - m.d.sync += self.r.read_pointer.eq(0) - m.d.sync += self.write_enable.eq(0) + # COMBINATIONAL SHIT + with m.If(trigger_mode == self.trigger_modes["SINGLE_SHOT"]): + with m.If(state == self.states["IDLE"]): + m.d.comb += write_enable.eq(0) + m.d.comb += write_pointer.eq(0) + m.d.comb += read_pointer.eq(0) + m.d.comb += next_state.eq(self.states["IDLE"]) - with m.If((self.r.request_start) & (~prev_request_start)): - m.d.sync += self.write_enable.eq(1) - with m.If(self.r.trigger_mode == self.trigger_modes["IMMEDIATE"]): - m.d.sync += self.r.state.eq(self.states["CAPTURING"]) - m.d.sync += self.r.write_pointer.eq(self.r.write_pointer + 1) + # Rising edge of request_start beings the capture: + with m.If((request_start) & (~prev_request_start)): + m.d.comb += write_enable.eq(1) + # Go straight to IN_POSITION if trigger_location == 0 + with m.If(trigger_location == 0): + m.d.comb += next_state.eq(self.states["IN_POSITION"]) - with m.Else(): - with m.If(self.r.trigger_location == 0): - m.d.sync += self.r.state.eq(self.states["IN_POSITION"]) + # Otherwise go to MOVE_TO_POSITION + with m.Else(): + m.d.comb += next_state.eq(self.states["MOVE_TO_POSITION"]) + + with m.Elif(state == self.states["MOVE_TO_POSITION"]): + m.d.comb += write_enable.eq(1) + m.d.comb += write_pointer.eq(next_write_pointer) + m.d.comb += read_pointer.eq(0) + m.d.comb += next_state.eq(self.states["MOVE_TO_POSITION"]) + + with m.If(write_pointer == trigger_location - 1): + with m.If(self.trigger): + m.d.comb += next_state.eq(self.states["CAPTURING"]) with m.Else(): - m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"]) + m.d.comb += next_state.eq(self.states["IN_POSITION"]) - # m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"]) + with m.Elif(state == self.states["IN_POSITION"]): + m.d.comb += write_enable.eq(1) + m.d.comb += write_pointer.eq(next_write_pointer) + m.d.comb += next_state.eq(self.states["IN_POSITION"]) - with m.Elif(self.r.state == self.states["MOVE_TO_POSITION"]): - m.d.sync += self.r.write_pointer.eq(self.r.write_pointer + 1) - - with m.If(self.r.write_pointer == self.r.trigger_location): with m.If(self.trigger): - m.d.sync += self.r.state.eq(self.states["CAPTURING"]) + m.d.comb += next_state.eq(self.states["CAPTURING"]) + m.d.comb += read_pointer.eq(latch_read_pointer) with m.Else(): - m.d.sync += self.r.state.eq(self.states["IN_POSITION"]) - self.increment_mod_sample_depth(m, self.r.read_pointer) + m.d.comb += read_pointer.eq(next_read_pointer) - with m.Elif(self.r.state == self.states["IN_POSITION"]): - self.increment_mod_sample_depth(m, self.r.write_pointer) + with m.Elif(state == self.states["CAPTURING"]): + m.d.comb += write_enable.eq(1) + m.d.comb += read_pointer.eq(latch_read_pointer) + m.d.comb += next_state.eq(self.states["CAPTURING"]) - with m.If(self.trigger): - m.d.sync += self.r.state.eq(self.states["CAPTURING"]) + with m.If(next_write_pointer == read_pointer): + m.d.comb += write_enable.eq(0) + m.d.comb += next_state.eq(self.states["CAPTURED"]) - with m.Else(): - self.increment_mod_sample_depth(m, self.r.read_pointer) + with m.Else(): + m.d.comb += write_pointer.eq(next_write_pointer) - with m.Elif(self.r.state == self.states["CAPTURING"]): - with m.If(self.r.write_pointer == self.r.read_pointer): - m.d.sync += self.write_enable.eq(0) - m.d.sync += self.r.state.eq(self.states["CAPTURED"]) + with m.Elif(state == self.states["CAPTURED"]): + m.d.comb += next_state.eq(self.states["CAPTURED"]) + m.d.comb += read_pointer.eq(latch_read_pointer) + m.d.comb += write_enable.eq(0) + # m.d.comb += read_pointer.eq(read_pointer) + # m.d.comb += write_pointer.eq(write_pointer) - with m.Else(): - self.increment_mod_sample_depth(m, self.r.write_pointer) + with m.If(self.r.trigger_mode == self.trigger_modes["IMMEDIATE"]): + pass - with m.If((self.r.request_stop) & (~prev_request_stop)): - m.d.sync += self.r.state.eq(self.states["IDLE"]) + with m.If(self.r.trigger_mode == self.trigger_modes["INCREMENTAL"]): + pass + + # Regardless of trigger mode, go back to IDLE if request_stop is pulsed + with m.If((request_stop) & (~prev_request_stop)): + m.d.comb += next_state.eq(self.states["IDLE"]) + + # SEQUENTIAL SHIT + + # Rising edge detection for start/stop requests + m.d.sync += prev_request_start.eq(request_start) + m.d.sync += prev_request_stop.eq(request_stop) + + # Copy next into current + m.d.sync += state.eq(next_state) + m.d.sync += next_write_pointer.eq((write_pointer + 1) % sample_depth) + m.d.sync += next_read_pointer.eq((read_pointer + 1) % sample_depth) + m.d.sync += latch_read_pointer.eq(read_pointer) return m + + #### OLD STUFF FOR REFERENCE #### + + # with m.If(self.r.state == self.states["IDLE"]): + # m.d.sync += self.r.write_pointer.eq(0) + # m.d.sync += self.r.read_pointer.eq(0) + # m.d.sync += self.write_enable.eq(0) + + # with m.If((self.r.request_start) & (~prev_request_start)): + # m.d.sync += self.write_enable.eq(1) + # with m.If(self.r.trigger_mode == self.trigger_modes["IMMEDIATE"]): + # m.d.sync += self.r.state.eq(self.states["CAPTURING"]) + # m.d.sync += self.r.write_pointer.eq(self.r.write_pointer + 1) + + # with m.Else(): + # with m.If(self.r.trigger_location == 0): + # m.d.sync += self.r.state.eq(self.states["IN_POSITION"]) + + # with m.Else(): + # m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"]) + + # # m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"]) + + # with m.Elif(self.r.state == self.states["MOVE_TO_POSITION"]): + # m.d.sync += self.r.write_pointer.eq(self.r.write_pointer + 1) + + # with m.If(self.r.write_pointer == self.r.trigger_location): + # with m.If(self.trigger): + # m.d.sync += self.r.state.eq(self.states["CAPTURING"]) + + # with m.Else(): + # m.d.sync += self.r.state.eq(self.states["IN_POSITION"]) + # self.increment_mod_sample_depth(m, self.r.read_pointer) + + # with m.Elif(self.r.state == self.states["IN_POSITION"]): + # self.increment_mod_sample_depth(m, self.r.write_pointer) + + # with m.If(self.trigger): + # m.d.sync += self.r.state.eq(self.states["CAPTURING"]) + + # with m.Else(): + # self.increment_mod_sample_depth(m, self.r.read_pointer) + + # with m.Elif(self.r.state == self.states["CAPTURING"]): + # with m.If(self.r.write_pointer == self.r.read_pointer): + # m.d.sync += self.write_enable.eq(0) + # m.d.sync += self.r.state.eq(self.states["CAPTURED"]) + + # with m.Else(): + # self.increment_mod_sample_depth(m, self.r.write_pointer) + + # with m.If((self.r.request_stop) & (~prev_request_stop)): + # m.d.sync += self.r.state.eq(self.states["IDLE"]) diff --git a/test/test_logic_analyzer_fsm_sim.py b/test/test_logic_analyzer_fsm_sim.py new file mode 100644 index 0000000..2bd7f8d --- /dev/null +++ b/test/test_logic_analyzer_fsm_sim.py @@ -0,0 +1,73 @@ +from amaranth.sim import Simulator +from manta.logic_analyzer import LogicAnalyzerFSM +from manta.utils import * + +""" +what do we want this to do? + +we want to run a capture in single shot mode, immediate mode, and incremental mode + + +single-shot case: +- exactly the right number of samples are taken +- we only start taking samples once captured + +immediate case: +- exactly the right number of samples are taken +- we only start taking samples once captured + +incremental case: +- exactly the right number of samples are taken +- we only take samples when trig is asserted + +""" +config = {"sample_depth": 8} +fsm = LogicAnalyzerFSM(config, base_addr=0, interface=None) + + +def set_fsm_register(name, data): + addr = fsm.r.mmap[f"{name}_buf"]["addrs"][0] + strobe_addr = fsm.r.base_addr + + yield from write_register(fsm, strobe_addr, 0) + yield from write_register(fsm, addr, data) + yield from write_register(fsm, strobe_addr, 1) + yield from write_register(fsm, strobe_addr, 0) + + +def test_single_shot_always_trigger(): + def testbench(): + if (yield fsm.r.state != fsm.states["IDLE"]): + raise ValueError + + yield fsm.trigger.eq(1) + yield from set_fsm_register("trigger_mode", fsm.trigger_modes["SINGLE_SHOT"]) + yield from set_fsm_register("trigger_location", 4) + yield from set_fsm_register("request_start", 1) + yield from set_fsm_register("request_start", 0) + + for _ in range(100): + yield + + simulate(fsm, testbench, "single_shot_always_trigger.vcd") + + +def test_single_shot_wait_to_trigger(): + def testbench(): + if (yield fsm.r.state != fsm.states["IDLE"]): + raise ValueError + + yield from set_fsm_register("trigger_mode", fsm.trigger_modes["SINGLE_SHOT"]) + yield from set_fsm_register("trigger_location", 4) + yield from set_fsm_register("request_start", 1) + yield from set_fsm_register("request_start", 0) + + for _ in range(8): + yield + + yield fsm.trigger.eq(1) + + for _ in range(100): + yield + + simulate(fsm, testbench, "single_shot_wait_to_trigger.vcd")