add initial FSM tests
This commit is contained in:
parent
a8b43849ec
commit
6e3fe8cb0e
|
|
@ -58,62 +58,160 @@ class LogicAnalyzerFSM(Elaboratable):
|
|||
|
||||
m.submodules.registers = self.r
|
||||
|
||||
sample_depth = self.config["sample_depth"]
|
||||
request_start = self.r.request_start
|
||||
request_stop = self.r.request_stop
|
||||
trigger_mode = self.r.trigger_mode
|
||||
trigger_location = self.r.trigger_location
|
||||
state = self.r.state
|
||||
write_enable = self.write_enable
|
||||
write_pointer = self.r.write_pointer
|
||||
read_pointer = self.r.read_pointer
|
||||
|
||||
prev_request_start = Signal(1)
|
||||
prev_request_stop = Signal(1)
|
||||
|
||||
# Rising edge detection for start/stop requests
|
||||
m.d.sync += prev_request_start.eq(self.r.request_start)
|
||||
m.d.sync += prev_request_stop.eq(self.r.request_stop)
|
||||
next_state = Signal().like(state)
|
||||
next_read_pointer = Signal().like(read_pointer)
|
||||
latch_read_pointer = Signal().like(read_pointer)
|
||||
next_write_pointer = Signal().like(write_pointer)
|
||||
|
||||
with m.If(self.r.state == self.states["IDLE"]):
|
||||
m.d.sync += self.r.write_pointer.eq(0)
|
||||
m.d.sync += self.r.read_pointer.eq(0)
|
||||
m.d.sync += self.write_enable.eq(0)
|
||||
# COMBINATIONAL SHIT
|
||||
with m.If(trigger_mode == self.trigger_modes["SINGLE_SHOT"]):
|
||||
with m.If(state == self.states["IDLE"]):
|
||||
m.d.comb += write_enable.eq(0)
|
||||
m.d.comb += write_pointer.eq(0)
|
||||
m.d.comb += read_pointer.eq(0)
|
||||
m.d.comb += next_state.eq(self.states["IDLE"])
|
||||
|
||||
with m.If((self.r.request_start) & (~prev_request_start)):
|
||||
m.d.sync += self.write_enable.eq(1)
|
||||
with m.If(self.r.trigger_mode == self.trigger_modes["IMMEDIATE"]):
|
||||
m.d.sync += self.r.state.eq(self.states["CAPTURING"])
|
||||
m.d.sync += self.r.write_pointer.eq(self.r.write_pointer + 1)
|
||||
# Rising edge of request_start beings the capture:
|
||||
with m.If((request_start) & (~prev_request_start)):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
# Go straight to IN_POSITION if trigger_location == 0
|
||||
with m.If(trigger_location == 0):
|
||||
m.d.comb += next_state.eq(self.states["IN_POSITION"])
|
||||
|
||||
with m.Else():
|
||||
with m.If(self.r.trigger_location == 0):
|
||||
m.d.sync += self.r.state.eq(self.states["IN_POSITION"])
|
||||
# Otherwise go to MOVE_TO_POSITION
|
||||
with m.Else():
|
||||
m.d.comb += next_state.eq(self.states["MOVE_TO_POSITION"])
|
||||
|
||||
with m.Elif(state == self.states["MOVE_TO_POSITION"]):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
m.d.comb += write_pointer.eq(next_write_pointer)
|
||||
m.d.comb += read_pointer.eq(0)
|
||||
m.d.comb += next_state.eq(self.states["MOVE_TO_POSITION"])
|
||||
|
||||
with m.If(write_pointer == trigger_location - 1):
|
||||
with m.If(self.trigger):
|
||||
m.d.comb += next_state.eq(self.states["CAPTURING"])
|
||||
|
||||
with m.Else():
|
||||
m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"])
|
||||
m.d.comb += next_state.eq(self.states["IN_POSITION"])
|
||||
|
||||
# m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"])
|
||||
with m.Elif(state == self.states["IN_POSITION"]):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
m.d.comb += write_pointer.eq(next_write_pointer)
|
||||
m.d.comb += next_state.eq(self.states["IN_POSITION"])
|
||||
|
||||
with m.Elif(self.r.state == self.states["MOVE_TO_POSITION"]):
|
||||
m.d.sync += self.r.write_pointer.eq(self.r.write_pointer + 1)
|
||||
|
||||
with m.If(self.r.write_pointer == self.r.trigger_location):
|
||||
with m.If(self.trigger):
|
||||
m.d.sync += self.r.state.eq(self.states["CAPTURING"])
|
||||
m.d.comb += next_state.eq(self.states["CAPTURING"])
|
||||
m.d.comb += read_pointer.eq(latch_read_pointer)
|
||||
|
||||
with m.Else():
|
||||
m.d.sync += self.r.state.eq(self.states["IN_POSITION"])
|
||||
self.increment_mod_sample_depth(m, self.r.read_pointer)
|
||||
m.d.comb += read_pointer.eq(next_read_pointer)
|
||||
|
||||
with m.Elif(self.r.state == self.states["IN_POSITION"]):
|
||||
self.increment_mod_sample_depth(m, self.r.write_pointer)
|
||||
with m.Elif(state == self.states["CAPTURING"]):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
m.d.comb += read_pointer.eq(latch_read_pointer)
|
||||
m.d.comb += next_state.eq(self.states["CAPTURING"])
|
||||
|
||||
with m.If(self.trigger):
|
||||
m.d.sync += self.r.state.eq(self.states["CAPTURING"])
|
||||
with m.If(next_write_pointer == read_pointer):
|
||||
m.d.comb += write_enable.eq(0)
|
||||
m.d.comb += next_state.eq(self.states["CAPTURED"])
|
||||
|
||||
with m.Else():
|
||||
self.increment_mod_sample_depth(m, self.r.read_pointer)
|
||||
with m.Else():
|
||||
m.d.comb += write_pointer.eq(next_write_pointer)
|
||||
|
||||
with m.Elif(self.r.state == self.states["CAPTURING"]):
|
||||
with m.If(self.r.write_pointer == self.r.read_pointer):
|
||||
m.d.sync += self.write_enable.eq(0)
|
||||
m.d.sync += self.r.state.eq(self.states["CAPTURED"])
|
||||
with m.Elif(state == self.states["CAPTURED"]):
|
||||
m.d.comb += next_state.eq(self.states["CAPTURED"])
|
||||
m.d.comb += read_pointer.eq(latch_read_pointer)
|
||||
m.d.comb += write_enable.eq(0)
|
||||
# m.d.comb += read_pointer.eq(read_pointer)
|
||||
# m.d.comb += write_pointer.eq(write_pointer)
|
||||
|
||||
with m.Else():
|
||||
self.increment_mod_sample_depth(m, self.r.write_pointer)
|
||||
with m.If(self.r.trigger_mode == self.trigger_modes["IMMEDIATE"]):
|
||||
pass
|
||||
|
||||
with m.If((self.r.request_stop) & (~prev_request_stop)):
|
||||
m.d.sync += self.r.state.eq(self.states["IDLE"])
|
||||
with m.If(self.r.trigger_mode == self.trigger_modes["INCREMENTAL"]):
|
||||
pass
|
||||
|
||||
# Regardless of trigger mode, go back to IDLE if request_stop is pulsed
|
||||
with m.If((request_stop) & (~prev_request_stop)):
|
||||
m.d.comb += next_state.eq(self.states["IDLE"])
|
||||
|
||||
# SEQUENTIAL SHIT
|
||||
|
||||
# Rising edge detection for start/stop requests
|
||||
m.d.sync += prev_request_start.eq(request_start)
|
||||
m.d.sync += prev_request_stop.eq(request_stop)
|
||||
|
||||
# Copy next into current
|
||||
m.d.sync += state.eq(next_state)
|
||||
m.d.sync += next_write_pointer.eq((write_pointer + 1) % sample_depth)
|
||||
m.d.sync += next_read_pointer.eq((read_pointer + 1) % sample_depth)
|
||||
m.d.sync += latch_read_pointer.eq(read_pointer)
|
||||
|
||||
return m
|
||||
|
||||
#### OLD STUFF FOR REFERENCE ####
|
||||
|
||||
# with m.If(self.r.state == self.states["IDLE"]):
|
||||
# m.d.sync += self.r.write_pointer.eq(0)
|
||||
# m.d.sync += self.r.read_pointer.eq(0)
|
||||
# m.d.sync += self.write_enable.eq(0)
|
||||
|
||||
# with m.If((self.r.request_start) & (~prev_request_start)):
|
||||
# m.d.sync += self.write_enable.eq(1)
|
||||
# with m.If(self.r.trigger_mode == self.trigger_modes["IMMEDIATE"]):
|
||||
# m.d.sync += self.r.state.eq(self.states["CAPTURING"])
|
||||
# m.d.sync += self.r.write_pointer.eq(self.r.write_pointer + 1)
|
||||
|
||||
# with m.Else():
|
||||
# with m.If(self.r.trigger_location == 0):
|
||||
# m.d.sync += self.r.state.eq(self.states["IN_POSITION"])
|
||||
|
||||
# with m.Else():
|
||||
# m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"])
|
||||
|
||||
# # m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"])
|
||||
|
||||
# with m.Elif(self.r.state == self.states["MOVE_TO_POSITION"]):
|
||||
# m.d.sync += self.r.write_pointer.eq(self.r.write_pointer + 1)
|
||||
|
||||
# with m.If(self.r.write_pointer == self.r.trigger_location):
|
||||
# with m.If(self.trigger):
|
||||
# m.d.sync += self.r.state.eq(self.states["CAPTURING"])
|
||||
|
||||
# with m.Else():
|
||||
# m.d.sync += self.r.state.eq(self.states["IN_POSITION"])
|
||||
# self.increment_mod_sample_depth(m, self.r.read_pointer)
|
||||
|
||||
# with m.Elif(self.r.state == self.states["IN_POSITION"]):
|
||||
# self.increment_mod_sample_depth(m, self.r.write_pointer)
|
||||
|
||||
# with m.If(self.trigger):
|
||||
# m.d.sync += self.r.state.eq(self.states["CAPTURING"])
|
||||
|
||||
# with m.Else():
|
||||
# self.increment_mod_sample_depth(m, self.r.read_pointer)
|
||||
|
||||
# with m.Elif(self.r.state == self.states["CAPTURING"]):
|
||||
# with m.If(self.r.write_pointer == self.r.read_pointer):
|
||||
# m.d.sync += self.write_enable.eq(0)
|
||||
# m.d.sync += self.r.state.eq(self.states["CAPTURED"])
|
||||
|
||||
# with m.Else():
|
||||
# self.increment_mod_sample_depth(m, self.r.write_pointer)
|
||||
|
||||
# with m.If((self.r.request_stop) & (~prev_request_stop)):
|
||||
# m.d.sync += self.r.state.eq(self.states["IDLE"])
|
||||
|
|
|
|||
|
|
@ -0,0 +1,73 @@
|
|||
from amaranth.sim import Simulator
|
||||
from manta.logic_analyzer import LogicAnalyzerFSM
|
||||
from manta.utils import *
|
||||
|
||||
"""
|
||||
what do we want this to do?
|
||||
|
||||
we want to run a capture in single shot mode, immediate mode, and incremental mode
|
||||
|
||||
|
||||
single-shot case:
|
||||
- exactly the right number of samples are taken
|
||||
- we only start taking samples once captured
|
||||
|
||||
immediate case:
|
||||
- exactly the right number of samples are taken
|
||||
- we only start taking samples once captured
|
||||
|
||||
incremental case:
|
||||
- exactly the right number of samples are taken
|
||||
- we only take samples when trig is asserted
|
||||
|
||||
"""
|
||||
config = {"sample_depth": 8}
|
||||
fsm = LogicAnalyzerFSM(config, base_addr=0, interface=None)
|
||||
|
||||
|
||||
def set_fsm_register(name, data):
|
||||
addr = fsm.r.mmap[f"{name}_buf"]["addrs"][0]
|
||||
strobe_addr = fsm.r.base_addr
|
||||
|
||||
yield from write_register(fsm, strobe_addr, 0)
|
||||
yield from write_register(fsm, addr, data)
|
||||
yield from write_register(fsm, strobe_addr, 1)
|
||||
yield from write_register(fsm, strobe_addr, 0)
|
||||
|
||||
|
||||
def test_single_shot_always_trigger():
|
||||
def testbench():
|
||||
if (yield fsm.r.state != fsm.states["IDLE"]):
|
||||
raise ValueError
|
||||
|
||||
yield fsm.trigger.eq(1)
|
||||
yield from set_fsm_register("trigger_mode", fsm.trigger_modes["SINGLE_SHOT"])
|
||||
yield from set_fsm_register("trigger_location", 4)
|
||||
yield from set_fsm_register("request_start", 1)
|
||||
yield from set_fsm_register("request_start", 0)
|
||||
|
||||
for _ in range(100):
|
||||
yield
|
||||
|
||||
simulate(fsm, testbench, "single_shot_always_trigger.vcd")
|
||||
|
||||
|
||||
def test_single_shot_wait_to_trigger():
|
||||
def testbench():
|
||||
if (yield fsm.r.state != fsm.states["IDLE"]):
|
||||
raise ValueError
|
||||
|
||||
yield from set_fsm_register("trigger_mode", fsm.trigger_modes["SINGLE_SHOT"])
|
||||
yield from set_fsm_register("trigger_location", 4)
|
||||
yield from set_fsm_register("request_start", 1)
|
||||
yield from set_fsm_register("request_start", 0)
|
||||
|
||||
for _ in range(8):
|
||||
yield
|
||||
|
||||
yield fsm.trigger.eq(1)
|
||||
|
||||
for _ in range(100):
|
||||
yield
|
||||
|
||||
simulate(fsm, testbench, "single_shot_wait_to_trigger.vcd")
|
||||
Loading…
Reference in New Issue