refactor logic analyzer FSM to be sequential-only for better timing

This commit is contained in:
Fischer Moseley 2024-01-21 23:45:14 -08:00
parent c177e3b5b5
commit ee4a79a4d4
3 changed files with 69 additions and 165 deletions

View File

@ -103,8 +103,8 @@ class LogicAnalyzerCore(Elaboratable):
if not isinstance(trigger_location, int) or trigger_location < 0:
raise ValueError("Trigger location must be a positive integer.")
if trigger_location > config["sample_depth"]:
raise ValueError("Trigger location cannot exceed sample depth.")
if trigger_location >= config["sample_depth"]:
raise ValueError("Trigger location must be less than sample depth.")
if trigger_mode == "immediate":
warn(
@ -229,7 +229,7 @@ class LogicAnalyzerCore(Elaboratable):
print_if_verbose(" -> Setting triggers...")
self.trig_blk.clear_triggers()
if self.config["trigger_mode"] != "immediate":
if self.config.get("trigger_mode") != "immediate":
self.trig_blk.set_triggers(self.config)
# Set trigger mode, default to single-shot if user didn't specify a mode

View File

@ -76,154 +76,85 @@ class LogicAnalyzerFSM(Elaboratable):
prev_request_start = Signal(1)
prev_request_stop = Signal(1)
next_state = Signal().like(state)
next_read_pointer = Signal().like(read_pointer)
latch_read_pointer = Signal().like(read_pointer)
latch_write_pointer = Signal().like(write_pointer)
latch_write_enable = Signal().like(write_enable)
next_write_pointer = Signal().like(write_pointer)
# --- Sequential Logic ---
m.d.comb += next_write_pointer.eq((write_pointer + 1) % sample_depth)
# Rising edge detection for start/stop requests
m.d.sync += prev_request_start.eq(request_start)
m.d.sync += prev_request_stop.eq(request_stop)
# Copy next into current
m.d.sync += state.eq(next_state)
m.d.sync += next_write_pointer.eq((write_pointer + 1) % sample_depth)
m.d.sync += next_read_pointer.eq((read_pointer + 1) % sample_depth)
m.d.sync += latch_read_pointer.eq(read_pointer)
m.d.sync += latch_write_pointer.eq(write_pointer)
m.d.sync += latch_write_enable.eq(write_enable)
with m.If(state == States.IDLE):
m.d.sync += write_pointer.eq(0)
m.d.sync += read_pointer.eq(0)
m.d.sync += write_enable.eq(0)
# --- Combinational Logic ---
with m.If((request_start) & (~prev_request_start)):
with m.If(trigger_mode == TriggerModes.IMMEDIATE):
m.d.sync += state.eq(States.CAPTURING)
m.d.sync += write_enable.eq(1)
# --- Single Shot Trigger Mode ---
with m.If(trigger_mode == TriggerModes.SINGLE_SHOT):
with m.If(state == States.IDLE):
m.d.comb += write_enable.eq(0)
m.d.comb += write_pointer.eq(0)
m.d.comb += read_pointer.eq(0)
m.d.comb += next_state.eq(States.IDLE)
with m.Elif(trigger_mode == TriggerModes.INCREMENTAL):
m.d.sync += state.eq(States.CAPTURING)
m.d.sync += write_enable.eq(1)
# Rising edge of request_start beings the capture:
with m.If((request_start) & (~prev_request_start)):
m.d.comb += write_enable.eq(1)
# Go straight to IN_POSITION if trigger_location == 0
with m.Elif(trigger_mode == TriggerModes.SINGLE_SHOT):
with m.If(trigger_location == 0):
m.d.comb += next_state.eq(States.IN_POSITION)
# Otherwise go to MOVE_TO_POSITION
with m.Else():
m.d.comb += next_state.eq(States.MOVE_TO_POSITION)
with m.Elif(state == States.MOVE_TO_POSITION):
m.d.comb += write_enable.eq(1)
m.d.comb += write_pointer.eq(next_write_pointer)
m.d.comb += read_pointer.eq(0)
m.d.comb += next_state.eq(States.MOVE_TO_POSITION)
with m.If(write_pointer == trigger_location - 1):
with m.If(self.trigger):
m.d.comb += next_state.eq(States.CAPTURING)
m.d.sync += state.eq(States.IN_POSITION)
with m.Else():
m.d.comb += next_state.eq(States.IN_POSITION)
m.d.sync += state.eq(States.MOVE_TO_POSITION)
with m.Elif(state == States.IN_POSITION):
m.d.comb += write_enable.eq(1)
m.d.comb += write_pointer.eq(next_write_pointer)
m.d.comb += next_state.eq(States.IN_POSITION)
m.d.sync += write_enable.eq(1)
with m.Elif(state == States.MOVE_TO_POSITION):
m.d.sync += write_pointer.eq(next_write_pointer)
with m.If(write_pointer == trigger_location - 1):
with m.If(self.trigger):
m.d.comb += next_state.eq(States.CAPTURING)
m.d.comb += read_pointer.eq(latch_read_pointer)
m.d.sync += state.eq(States.CAPTURING)
with m.Else():
m.d.comb += read_pointer.eq(next_read_pointer)
m.d.sync += state.eq(States.IN_POSITION)
with m.Elif(state == States.CAPTURING):
m.d.comb += write_enable.eq(1)
m.d.comb += read_pointer.eq(latch_read_pointer)
m.d.comb += next_state.eq(States.CAPTURING)
with m.Elif(state == States.IN_POSITION):
m.d.sync += write_pointer.eq(next_write_pointer)
with m.If(self.trigger):
m.d.sync += state.eq(States.CAPTURING)
# kind of horrible, i'll get rid of this later...
with m.If(write_pointer > trigger_location):
m.d.sync += read_pointer.eq(write_pointer - trigger_location)
with m.Else():
m.d.sync += read_pointer.eq(
write_pointer - trigger_location + sample_depth
)
# ok that's all for horrible
with m.If(state == States.CAPTURING):
# Non- incremental modes
with m.If(trigger_mode != TriggerModes.INCREMENTAL):
with m.If(next_write_pointer == read_pointer):
m.d.comb += write_enable.eq(0)
m.d.comb += write_pointer.eq(latch_write_pointer)
m.d.comb += next_state.eq(States.CAPTURED)
m.d.sync += write_enable.eq(0)
m.d.sync += state.eq(States.CAPTURED)
with m.Else():
m.d.comb += write_pointer.eq(next_write_pointer)
m.d.sync += write_pointer.eq(next_write_pointer)
with m.Elif(state == States.CAPTURED):
m.d.comb += next_state.eq(States.CAPTURED)
m.d.comb += read_pointer.eq(latch_read_pointer)
m.d.comb += write_pointer.eq(latch_write_pointer)
m.d.comb += write_enable.eq(0)
# Incremental mode
with m.Else():
with m.If(self.trigger):
with m.If(next_write_pointer == read_pointer):
m.d.sync += write_enable.eq(0)
m.d.sync += state.eq(States.CAPTURED)
# --- Immediate Trigger Mode ---
with m.If(self.r.trigger_mode == TriggerModes.IMMEDIATE):
m.d.comb += read_pointer.eq(0)
with m.If(self.r.state == States.IDLE):
m.d.comb += write_enable.eq(0)
m.d.comb += write_pointer.eq(0)
m.d.comb += next_state.eq(States.IDLE)
# Rising edge of request_start beings the capture:
with m.If((request_start) & (~prev_request_start)):
m.d.comb += write_enable.eq(1)
m.d.comb += next_state.eq(States.CAPTURING)
with m.Elif(state == States.CAPTURING):
m.d.comb += write_enable.eq(1)
m.d.comb += next_state.eq(States.CAPTURING)
m.d.comb += write_pointer.eq(next_write_pointer)
with m.If(next_write_pointer == read_pointer):
m.d.comb += write_enable.eq(0)
m.d.comb += write_pointer.eq(latch_write_pointer)
m.d.comb += next_state.eq(States.CAPTURED)
with m.Elif(state == States.CAPTURED):
m.d.comb += write_enable.eq(0)
m.d.comb += write_pointer.eq(latch_write_pointer)
m.d.comb += next_state.eq(States.CAPTURED)
# --- Incremental Trigger Mode ---
with m.If(self.r.trigger_mode == TriggerModes.INCREMENTAL):
with m.If(state == States.IDLE):
m.d.comb += write_enable.eq(0)
m.d.comb += write_pointer.eq(0)
m.d.comb += read_pointer.eq(0)
m.d.comb += next_state.eq(States.IDLE)
# Rising edge of request_start beings the capture:
with m.If((request_start) & (~prev_request_start)):
m.d.comb += write_enable.eq(self.trigger)
m.d.comb += next_state.eq(States.CAPTURING)
with m.Elif(state == States.CAPTURING):
m.d.comb += read_pointer.eq(0)
m.d.comb += next_state.eq(States.CAPTURING)
m.d.comb += write_enable.eq(self.trigger)
with m.If(latch_write_enable):
m.d.comb += write_pointer.eq(next_write_pointer)
with m.Else():
m.d.comb += write_pointer.eq(latch_write_pointer)
with m.If((self.trigger) & (next_write_pointer == read_pointer)):
m.d.comb += write_pointer.eq(latch_write_pointer)
m.d.comb += next_state.eq(States.CAPTURED)
with m.Elif(state == States.CAPTURED):
m.d.comb += next_state.eq(States.CAPTURED)
m.d.comb += read_pointer.eq(latch_read_pointer)
m.d.comb += write_pointer.eq(latch_write_pointer)
m.d.comb += write_enable.eq(0)
with m.Else():
m.d.sync += write_pointer.eq(next_write_pointer)
# Regardless of trigger mode, go back to IDLE if request_stop is pulsed
with m.If((request_stop) & (~prev_request_stop)):
m.d.comb += next_state.eq(States.IDLE)
m.d.sync += state.eq(States.IDLE)
return m

View File

@ -2,25 +2,6 @@ from amaranth.sim import Simulator
from manta.logic_analyzer import *
from manta.utils import *
"""
what do we want this to do?
we want to run a capture in single shot mode, immediate mode, and incremental mode
single-shot case:
- exactly the right number of samples are taken
- we only start taking samples once captured
immediate case:
- exactly the right number of samples are taken
- we only start taking samples once captured
incremental case:
- exactly the right number of samples are taken
- we only take samples when trig is asserted
"""
config = {"sample_depth": 8}
fsm = LogicAnalyzerFSM(config, base_addr=0, interface=None)
@ -84,8 +65,9 @@ def test_single_shot_wait_for_trigger():
yield fsm.r.trigger_location.eq(4)
yield fsm.r.request_start.eq(1)
yield
yield
# Check that write_enable is asserted on the same edge as request_start
# Check that write_enable is asserted a cycle after request_start
if not (yield fsm.write_enable):
raise ValueError
@ -104,28 +86,15 @@ def test_single_shot_wait_for_trigger():
yield
# Wait a few cycles before triggering:
# Wait a few cycles before triggering
for _ in range(10):
if (rp + 3) % fsm.config["sample_depth"] != wp:
raise ValueError
yield
# Provide the trigger, and check that the capture completes 4 cycles later
yield fsm.trigger.eq(1)
yield
rp_start = yield fsm.r.read_pointer
for i in range(4):
rp = yield fsm.r.read_pointer
wp = yield fsm.r.write_pointer
if rp != rp_start:
raise ValueError
if (rp_start + 4 + i) % fsm.config["sample_depth"] != wp:
raise ValueError
yield
# Wait one clock cycle (to let BRAM contents cycle in)
@ -150,8 +119,9 @@ def test_immediate():
yield fsm.r.trigger_mode.eq(TriggerModes.IMMEDIATE)
yield fsm.r.request_start.eq(1)
yield
yield
# Check that write_enable is asserted on the same edge as request_start
# Check that write_enable is asserted a cycle after request_start
if not (yield fsm.write_enable):
raise ValueError
@ -191,10 +161,11 @@ def test_incremental():
yield fsm.r.trigger_mode.eq(TriggerModes.INCREMENTAL)
yield fsm.r.request_start.eq(1)
yield
yield
# Check that write_enable is asserted on the same edge as request_start
# if not (yield fsm.write_enable):
# raise ValueError
if not (yield fsm.write_enable):
raise ValueError
for _ in range(10):
for _ in range(3):
@ -205,9 +176,9 @@ def test_incremental():
yield fsm.trigger.eq(0)
yield
# # Check that state is CAPTURED
# if (yield fsm.r.state) != States.CAPTURED:
# raise ValueError
# Check that state is CAPTURED
if (yield fsm.r.state) != States.CAPTURED:
raise ValueError
simulate(fsm, testbench, "incremental.vcd")
@ -226,6 +197,7 @@ def test_single_shot_write_enable():
# Start the FSM, ensure write enable is asserted throughout the capture
yield fsm.r.request_start.eq(1)
yield
yield
for _ in range(fsm.config["sample_depth"]):
if not (yield fsm.write_enable):
@ -262,6 +234,7 @@ def test_immediate_write_enable():
# Start the FSM, ensure write enable is asserted throughout the capture
yield fsm.r.request_start.eq(1)
yield
yield
for _ in range(fsm.config["sample_depth"]):
if not (yield fsm.write_enable):