refactor logic analyzer to use enums, add incremental + immediate trigger modes
This commit is contained in:
parent
6e3fe8cb0e
commit
ab0909d06b
|
|
@ -2,7 +2,7 @@ from amaranth import *
|
|||
from warnings import warn
|
||||
from ..utils import *
|
||||
from .trigger_block import LogicAnalyzerTriggerBlock
|
||||
from .fsm import LogicAnalyzerFSM
|
||||
from .fsm import LogicAnalyzerFSM, States, TriggerModes
|
||||
from .sample_mem import LogicAnalyzerSampleMemory
|
||||
from .playback import LogicAnalyzerPlayback
|
||||
|
||||
|
|
@ -217,13 +217,13 @@ class LogicAnalyzerCore(Elaboratable):
|
|||
# If core is not in IDLE state, request that it return to IDLE
|
||||
print_if_verbose(" -> Resetting core...")
|
||||
state = self.fsm.r.get_probe("state")
|
||||
if state != self.fsm.states["IDLE"]:
|
||||
if state != States.IDLE:
|
||||
self.fsm.r.set_probe("request_start", 0)
|
||||
self.fsm.r.set_probe("request_stop", 0)
|
||||
self.fsm.r.set_probe("request_stop", 1)
|
||||
self.fsm.r.set_probe("request_stop", 0)
|
||||
|
||||
if self.fsm.r.get_probe("state") != self.fsm.states["IDLE"]:
|
||||
if self.fsm.r.get_probe("state") != States.IDLE:
|
||||
raise ValueError("Logic analyzer did not reset to IDLE state.")
|
||||
|
||||
# Set triggers
|
||||
|
|
@ -237,10 +237,10 @@ class LogicAnalyzerCore(Elaboratable):
|
|||
print_if_verbose(" -> Setting trigger mode...")
|
||||
if "trigger_mode" in self.config:
|
||||
mode = self.config["trigger_mode"].upper()
|
||||
self.fsm.r.set_probe("trigger_mode", self.fsm.trigger_modes[mode])
|
||||
self.fsm.r.set_probe("trigger_mode", TriggerModes[mode])
|
||||
|
||||
else:
|
||||
self.fsm.r.set_probe("trigger_mode", self.fsm.trigger_modes["SINGLE_SHOT"])
|
||||
self.fsm.r.set_probe("trigger_mode", TriggerModes.SINGLE_SHOT)
|
||||
|
||||
# Set trigger location
|
||||
print_if_verbose(" -> Setting trigger location...")
|
||||
|
|
@ -258,7 +258,7 @@ class LogicAnalyzerCore(Elaboratable):
|
|||
|
||||
# Poll the state machine's state, and wait for the capture to complete
|
||||
print_if_verbose(" -> Waiting for capture to complete...")
|
||||
while self.fsm.r.get_probe("state") != self.fsm.states["CAPTURED"]:
|
||||
while self.fsm.r.get_probe("state") != States.CAPTURED:
|
||||
pass
|
||||
|
||||
# Read out the entirety of the sample memory
|
||||
|
|
|
|||
|
|
@ -1,23 +1,28 @@
|
|||
from amaranth import *
|
||||
from amaranth.lib.enum import IntEnum
|
||||
from math import ceil, log2
|
||||
from ..io_core import IOCore
|
||||
|
||||
|
||||
class States(IntEnum):
|
||||
IDLE = 0
|
||||
MOVE_TO_POSITION = 1
|
||||
IN_POSITION = 2
|
||||
CAPTURING = 3
|
||||
CAPTURED = 4
|
||||
|
||||
|
||||
class TriggerModes(IntEnum):
|
||||
SINGLE_SHOT = 0
|
||||
INCREMENTAL = 1
|
||||
IMMEDIATE = 2
|
||||
|
||||
|
||||
class LogicAnalyzerFSM(Elaboratable):
|
||||
""" """
|
||||
|
||||
def __init__(self, config, base_addr, interface):
|
||||
self.config = config
|
||||
self.states = {
|
||||
"IDLE": 0,
|
||||
"MOVE_TO_POSITION": 1,
|
||||
"IN_POSITION": 2,
|
||||
"CAPTURING": 3,
|
||||
"CAPTURED": 4,
|
||||
}
|
||||
|
||||
self.trigger_modes = {"SINGLE_SHOT": 0, "INCREMENTAL": 1, "IMMEDIATE": 2}
|
||||
|
||||
self.trigger = Signal(1)
|
||||
self.write_enable = Signal(1)
|
||||
|
||||
|
|
@ -74,83 +79,11 @@ class LogicAnalyzerFSM(Elaboratable):
|
|||
next_state = Signal().like(state)
|
||||
next_read_pointer = Signal().like(read_pointer)
|
||||
latch_read_pointer = Signal().like(read_pointer)
|
||||
latch_write_pointer = Signal().like(write_pointer)
|
||||
latch_write_enable = Signal().like(write_enable)
|
||||
next_write_pointer = Signal().like(write_pointer)
|
||||
|
||||
# COMBINATIONAL SHIT
|
||||
with m.If(trigger_mode == self.trigger_modes["SINGLE_SHOT"]):
|
||||
with m.If(state == self.states["IDLE"]):
|
||||
m.d.comb += write_enable.eq(0)
|
||||
m.d.comb += write_pointer.eq(0)
|
||||
m.d.comb += read_pointer.eq(0)
|
||||
m.d.comb += next_state.eq(self.states["IDLE"])
|
||||
|
||||
# Rising edge of request_start beings the capture:
|
||||
with m.If((request_start) & (~prev_request_start)):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
# Go straight to IN_POSITION if trigger_location == 0
|
||||
with m.If(trigger_location == 0):
|
||||
m.d.comb += next_state.eq(self.states["IN_POSITION"])
|
||||
|
||||
# Otherwise go to MOVE_TO_POSITION
|
||||
with m.Else():
|
||||
m.d.comb += next_state.eq(self.states["MOVE_TO_POSITION"])
|
||||
|
||||
with m.Elif(state == self.states["MOVE_TO_POSITION"]):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
m.d.comb += write_pointer.eq(next_write_pointer)
|
||||
m.d.comb += read_pointer.eq(0)
|
||||
m.d.comb += next_state.eq(self.states["MOVE_TO_POSITION"])
|
||||
|
||||
with m.If(write_pointer == trigger_location - 1):
|
||||
with m.If(self.trigger):
|
||||
m.d.comb += next_state.eq(self.states["CAPTURING"])
|
||||
|
||||
with m.Else():
|
||||
m.d.comb += next_state.eq(self.states["IN_POSITION"])
|
||||
|
||||
with m.Elif(state == self.states["IN_POSITION"]):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
m.d.comb += write_pointer.eq(next_write_pointer)
|
||||
m.d.comb += next_state.eq(self.states["IN_POSITION"])
|
||||
|
||||
with m.If(self.trigger):
|
||||
m.d.comb += next_state.eq(self.states["CAPTURING"])
|
||||
m.d.comb += read_pointer.eq(latch_read_pointer)
|
||||
|
||||
with m.Else():
|
||||
m.d.comb += read_pointer.eq(next_read_pointer)
|
||||
|
||||
with m.Elif(state == self.states["CAPTURING"]):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
m.d.comb += read_pointer.eq(latch_read_pointer)
|
||||
m.d.comb += next_state.eq(self.states["CAPTURING"])
|
||||
|
||||
with m.If(next_write_pointer == read_pointer):
|
||||
m.d.comb += write_enable.eq(0)
|
||||
m.d.comb += next_state.eq(self.states["CAPTURED"])
|
||||
|
||||
with m.Else():
|
||||
m.d.comb += write_pointer.eq(next_write_pointer)
|
||||
|
||||
with m.Elif(state == self.states["CAPTURED"]):
|
||||
m.d.comb += next_state.eq(self.states["CAPTURED"])
|
||||
m.d.comb += read_pointer.eq(latch_read_pointer)
|
||||
m.d.comb += write_enable.eq(0)
|
||||
# m.d.comb += read_pointer.eq(read_pointer)
|
||||
# m.d.comb += write_pointer.eq(write_pointer)
|
||||
|
||||
with m.If(self.r.trigger_mode == self.trigger_modes["IMMEDIATE"]):
|
||||
pass
|
||||
|
||||
with m.If(self.r.trigger_mode == self.trigger_modes["INCREMENTAL"]):
|
||||
pass
|
||||
|
||||
# Regardless of trigger mode, go back to IDLE if request_stop is pulsed
|
||||
with m.If((request_stop) & (~prev_request_stop)):
|
||||
m.d.comb += next_state.eq(self.states["IDLE"])
|
||||
|
||||
# SEQUENTIAL SHIT
|
||||
|
||||
# --- Sequential Logic ---
|
||||
# Rising edge detection for start/stop requests
|
||||
m.d.sync += prev_request_start.eq(request_start)
|
||||
m.d.sync += prev_request_stop.eq(request_stop)
|
||||
|
|
@ -160,58 +93,137 @@ class LogicAnalyzerFSM(Elaboratable):
|
|||
m.d.sync += next_write_pointer.eq((write_pointer + 1) % sample_depth)
|
||||
m.d.sync += next_read_pointer.eq((read_pointer + 1) % sample_depth)
|
||||
m.d.sync += latch_read_pointer.eq(read_pointer)
|
||||
m.d.sync += latch_write_pointer.eq(write_pointer)
|
||||
m.d.sync += latch_write_enable.eq(write_enable)
|
||||
|
||||
# --- Combinational Logic ---
|
||||
|
||||
# --- Single Shot Trigger Mode ---
|
||||
with m.If(trigger_mode == TriggerModes.SINGLE_SHOT):
|
||||
with m.If(state == States.IDLE):
|
||||
m.d.comb += write_enable.eq(0)
|
||||
m.d.comb += write_pointer.eq(0)
|
||||
m.d.comb += read_pointer.eq(0)
|
||||
m.d.comb += next_state.eq(States.IDLE)
|
||||
|
||||
# Rising edge of request_start beings the capture:
|
||||
with m.If((request_start) & (~prev_request_start)):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
# Go straight to IN_POSITION if trigger_location == 0
|
||||
with m.If(trigger_location == 0):
|
||||
m.d.comb += next_state.eq(States.IN_POSITION)
|
||||
|
||||
# Otherwise go to MOVE_TO_POSITION
|
||||
with m.Else():
|
||||
m.d.comb += next_state.eq(States.MOVE_TO_POSITION)
|
||||
|
||||
with m.Elif(state == States.MOVE_TO_POSITION):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
m.d.comb += write_pointer.eq(next_write_pointer)
|
||||
m.d.comb += read_pointer.eq(0)
|
||||
m.d.comb += next_state.eq(States.MOVE_TO_POSITION)
|
||||
|
||||
with m.If(write_pointer == trigger_location - 1):
|
||||
with m.If(self.trigger):
|
||||
m.d.comb += next_state.eq(States.CAPTURING)
|
||||
|
||||
with m.Else():
|
||||
m.d.comb += next_state.eq(States.IN_POSITION)
|
||||
|
||||
with m.Elif(state == States.IN_POSITION):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
m.d.comb += write_pointer.eq(next_write_pointer)
|
||||
m.d.comb += next_state.eq(States.IN_POSITION)
|
||||
|
||||
with m.If(self.trigger):
|
||||
m.d.comb += next_state.eq(States.CAPTURING)
|
||||
m.d.comb += read_pointer.eq(latch_read_pointer)
|
||||
|
||||
with m.Else():
|
||||
m.d.comb += read_pointer.eq(next_read_pointer)
|
||||
|
||||
with m.Elif(state == States.CAPTURING):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
m.d.comb += read_pointer.eq(latch_read_pointer)
|
||||
m.d.comb += next_state.eq(States.CAPTURING)
|
||||
|
||||
with m.If(next_write_pointer == read_pointer):
|
||||
m.d.comb += write_enable.eq(0)
|
||||
m.d.comb += write_pointer.eq(latch_write_pointer)
|
||||
m.d.comb += next_state.eq(States.CAPTURED)
|
||||
|
||||
with m.Else():
|
||||
m.d.comb += write_pointer.eq(next_write_pointer)
|
||||
|
||||
with m.Elif(state == States.CAPTURED):
|
||||
m.d.comb += next_state.eq(States.CAPTURED)
|
||||
m.d.comb += read_pointer.eq(latch_read_pointer)
|
||||
m.d.comb += write_pointer.eq(latch_write_pointer)
|
||||
m.d.comb += write_enable.eq(0)
|
||||
|
||||
# --- Immediate Trigger Mode ---
|
||||
with m.If(self.r.trigger_mode == TriggerModes.IMMEDIATE):
|
||||
m.d.comb += read_pointer.eq(0)
|
||||
with m.If(self.r.state == States.IDLE):
|
||||
m.d.comb += write_enable.eq(0)
|
||||
m.d.comb += write_pointer.eq(0)
|
||||
m.d.comb += next_state.eq(States.IDLE)
|
||||
|
||||
# Rising edge of request_start beings the capture:
|
||||
with m.If((request_start) & (~prev_request_start)):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
m.d.comb += next_state.eq(States.CAPTURING)
|
||||
|
||||
with m.Elif(state == States.CAPTURING):
|
||||
m.d.comb += write_enable.eq(1)
|
||||
m.d.comb += next_state.eq(States.CAPTURING)
|
||||
m.d.comb += write_pointer.eq(next_write_pointer)
|
||||
|
||||
with m.If(next_write_pointer == read_pointer):
|
||||
m.d.comb += write_enable.eq(0)
|
||||
m.d.comb += write_pointer.eq(latch_write_pointer)
|
||||
m.d.comb += next_state.eq(States.CAPTURED)
|
||||
|
||||
with m.Elif(state == States.CAPTURED):
|
||||
m.d.comb += write_enable.eq(0)
|
||||
m.d.comb += write_pointer.eq(latch_write_pointer)
|
||||
m.d.comb += next_state.eq(States.CAPTURED)
|
||||
|
||||
# --- Incremental Trigger Mode ---
|
||||
with m.If(self.r.trigger_mode == TriggerModes.INCREMENTAL):
|
||||
with m.If(state == States.IDLE):
|
||||
m.d.comb += write_enable.eq(0)
|
||||
m.d.comb += write_pointer.eq(0)
|
||||
m.d.comb += read_pointer.eq(0)
|
||||
m.d.comb += next_state.eq(States.IDLE)
|
||||
|
||||
# Rising edge of request_start beings the capture:
|
||||
with m.If((request_start) & (~prev_request_start)):
|
||||
m.d.comb += write_enable.eq(self.trigger)
|
||||
m.d.comb += next_state.eq(States.CAPTURING)
|
||||
|
||||
with m.Elif(state == States.CAPTURING):
|
||||
m.d.comb += read_pointer.eq(0)
|
||||
m.d.comb += next_state.eq(States.CAPTURING)
|
||||
m.d.comb += write_enable.eq(self.trigger)
|
||||
|
||||
with m.If(latch_write_enable):
|
||||
m.d.comb += write_pointer.eq(next_write_pointer)
|
||||
with m.Else():
|
||||
m.d.comb += write_pointer.eq(latch_write_pointer)
|
||||
|
||||
with m.If((self.trigger) & (next_write_pointer == read_pointer)):
|
||||
m.d.comb += write_pointer.eq(latch_write_pointer)
|
||||
m.d.comb += next_state.eq(States.CAPTURED)
|
||||
|
||||
with m.Elif(state == States.CAPTURED):
|
||||
m.d.comb += next_state.eq(States.CAPTURED)
|
||||
m.d.comb += read_pointer.eq(latch_read_pointer)
|
||||
m.d.comb += write_pointer.eq(latch_write_pointer)
|
||||
m.d.comb += write_enable.eq(0)
|
||||
|
||||
# Regardless of trigger mode, go back to IDLE if request_stop is pulsed
|
||||
with m.If((request_stop) & (~prev_request_stop)):
|
||||
m.d.comb += next_state.eq(States.IDLE)
|
||||
|
||||
return m
|
||||
|
||||
#### OLD STUFF FOR REFERENCE ####
|
||||
|
||||
# with m.If(self.r.state == self.states["IDLE"]):
|
||||
# m.d.sync += self.r.write_pointer.eq(0)
|
||||
# m.d.sync += self.r.read_pointer.eq(0)
|
||||
# m.d.sync += self.write_enable.eq(0)
|
||||
|
||||
# with m.If((self.r.request_start) & (~prev_request_start)):
|
||||
# m.d.sync += self.write_enable.eq(1)
|
||||
# with m.If(self.r.trigger_mode == self.trigger_modes["IMMEDIATE"]):
|
||||
# m.d.sync += self.r.state.eq(self.states["CAPTURING"])
|
||||
# m.d.sync += self.r.write_pointer.eq(self.r.write_pointer + 1)
|
||||
|
||||
# with m.Else():
|
||||
# with m.If(self.r.trigger_location == 0):
|
||||
# m.d.sync += self.r.state.eq(self.states["IN_POSITION"])
|
||||
|
||||
# with m.Else():
|
||||
# m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"])
|
||||
|
||||
# # m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"])
|
||||
|
||||
# with m.Elif(self.r.state == self.states["MOVE_TO_POSITION"]):
|
||||
# m.d.sync += self.r.write_pointer.eq(self.r.write_pointer + 1)
|
||||
|
||||
# with m.If(self.r.write_pointer == self.r.trigger_location):
|
||||
# with m.If(self.trigger):
|
||||
# m.d.sync += self.r.state.eq(self.states["CAPTURING"])
|
||||
|
||||
# with m.Else():
|
||||
# m.d.sync += self.r.state.eq(self.states["IN_POSITION"])
|
||||
# self.increment_mod_sample_depth(m, self.r.read_pointer)
|
||||
|
||||
# with m.Elif(self.r.state == self.states["IN_POSITION"]):
|
||||
# self.increment_mod_sample_depth(m, self.r.write_pointer)
|
||||
|
||||
# with m.If(self.trigger):
|
||||
# m.d.sync += self.r.state.eq(self.states["CAPTURING"])
|
||||
|
||||
# with m.Else():
|
||||
# self.increment_mod_sample_depth(m, self.r.read_pointer)
|
||||
|
||||
# with m.Elif(self.r.state == self.states["CAPTURING"]):
|
||||
# with m.If(self.r.write_pointer == self.r.read_pointer):
|
||||
# m.d.sync += self.write_enable.eq(0)
|
||||
# m.d.sync += self.r.state.eq(self.states["CAPTURED"])
|
||||
|
||||
# with m.Else():
|
||||
# self.increment_mod_sample_depth(m, self.r.write_pointer)
|
||||
|
||||
# with m.If((self.r.request_stop) & (~prev_request_stop)):
|
||||
# m.d.sync += self.r.state.eq(self.states["IDLE"])
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
from amaranth.sim import Simulator
|
||||
from manta.logic_analyzer import LogicAnalyzerFSM
|
||||
from manta.logic_analyzer import *
|
||||
from manta.utils import *
|
||||
|
||||
"""
|
||||
|
|
@ -25,49 +25,252 @@ config = {"sample_depth": 8}
|
|||
fsm = LogicAnalyzerFSM(config, base_addr=0, interface=None)
|
||||
|
||||
|
||||
def set_fsm_register(name, data):
|
||||
addr = fsm.r.mmap[f"{name}_buf"]["addrs"][0]
|
||||
strobe_addr = fsm.r.base_addr
|
||||
|
||||
yield from write_register(fsm, strobe_addr, 0)
|
||||
yield from write_register(fsm, addr, data)
|
||||
yield from write_register(fsm, strobe_addr, 1)
|
||||
yield from write_register(fsm, strobe_addr, 0)
|
||||
|
||||
|
||||
def test_single_shot_always_trigger():
|
||||
def test_signals_reset_correctly():
|
||||
def testbench():
|
||||
if (yield fsm.r.state != fsm.states["IDLE"]):
|
||||
# Make sure pointers and write enable reset to zero
|
||||
for sig in [fsm.r.write_pointer, fsm.r.read_pointer, fsm.write_enable]:
|
||||
if (yield sig) != 0:
|
||||
raise ValueError
|
||||
|
||||
# Make sure state resets to IDLE
|
||||
if (yield fsm.r.state != States.IDLE):
|
||||
raise ValueError
|
||||
|
||||
yield fsm.trigger.eq(1)
|
||||
yield from set_fsm_register("trigger_mode", fsm.trigger_modes["SINGLE_SHOT"])
|
||||
yield from set_fsm_register("trigger_location", 4)
|
||||
yield from set_fsm_register("request_start", 1)
|
||||
yield from set_fsm_register("request_start", 0)
|
||||
simulate(fsm, testbench)
|
||||
|
||||
for _ in range(100):
|
||||
|
||||
def test_single_shot_no_wait_for_trigger():
|
||||
def testbench():
|
||||
# Configure and start FSM
|
||||
yield fsm.trigger.eq(1)
|
||||
yield fsm.r.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
|
||||
yield fsm.r.trigger_location.eq(4)
|
||||
yield fsm.r.request_start.eq(1)
|
||||
|
||||
# Wait until write_enable is asserted
|
||||
while not (yield fsm.write_enable):
|
||||
yield
|
||||
|
||||
simulate(fsm, testbench, "single_shot_always_trigger.vcd")
|
||||
# Wait 8 clock cycles for capture to complete
|
||||
for i in range(8):
|
||||
# Make sure that read_pointer does not increase
|
||||
if (yield fsm.r.read_pointer) != 0:
|
||||
raise ValueError
|
||||
|
||||
# Make sure that write_pointer increases by one each cycle
|
||||
if (yield fsm.r.write_pointer) != i:
|
||||
raise ValueError
|
||||
|
||||
def test_single_shot_wait_to_trigger():
|
||||
def testbench():
|
||||
if (yield fsm.r.state != fsm.states["IDLE"]):
|
||||
yield
|
||||
|
||||
# Wait one clock cycle (to let BRAM contents cycle in)
|
||||
yield
|
||||
|
||||
# Check that write_pointer points to the end of memory
|
||||
if (yield fsm.r.write_pointer) != 7:
|
||||
raise ValueError
|
||||
|
||||
yield from set_fsm_register("trigger_mode", fsm.trigger_modes["SINGLE_SHOT"])
|
||||
yield from set_fsm_register("trigger_location", 4)
|
||||
yield from set_fsm_register("request_start", 1)
|
||||
yield from set_fsm_register("request_start", 0)
|
||||
# Check that state is CAPTURED
|
||||
if (yield fsm.r.state) != States.CAPTURED:
|
||||
raise ValueError
|
||||
|
||||
simulate(fsm, testbench, "single_shot_no_wait_for_trigger.vcd")
|
||||
|
||||
|
||||
def test_single_shot_wait_for_trigger():
|
||||
def testbench():
|
||||
# Configure and start FSM
|
||||
yield fsm.r.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
|
||||
yield fsm.r.trigger_location.eq(4)
|
||||
yield fsm.r.request_start.eq(1)
|
||||
yield
|
||||
|
||||
# Check that write_enable is asserted on the same edge as request_start
|
||||
if not (yield fsm.write_enable):
|
||||
raise ValueError
|
||||
|
||||
# Wait 4 clock cycles to get to IN_POSITION
|
||||
for i in range(4):
|
||||
rp = yield fsm.r.read_pointer
|
||||
wp = yield fsm.r.write_pointer
|
||||
|
||||
# Make sure that read_pointer does not increase
|
||||
if rp != 0:
|
||||
raise ValueError
|
||||
|
||||
# Make sure that write_pointer increases by one each cycle
|
||||
if wp != i:
|
||||
raise ValueError
|
||||
|
||||
yield
|
||||
|
||||
# Wait a few cycles before triggering:
|
||||
for _ in range(10):
|
||||
if (rp + 3) % fsm.config["sample_depth"] != wp:
|
||||
raise ValueError
|
||||
|
||||
yield
|
||||
|
||||
# Provide the trigger, and check that the capture completes 4 cycles later
|
||||
yield fsm.trigger.eq(1)
|
||||
yield
|
||||
|
||||
rp_start = yield fsm.r.read_pointer
|
||||
for i in range(4):
|
||||
rp = yield fsm.r.read_pointer
|
||||
wp = yield fsm.r.write_pointer
|
||||
|
||||
if rp != rp_start:
|
||||
raise ValueError
|
||||
|
||||
if (rp_start + 4 + i) % fsm.config["sample_depth"] != wp:
|
||||
raise ValueError
|
||||
|
||||
yield
|
||||
|
||||
# Wait one clock cycle (to let BRAM contents cycle in)
|
||||
yield
|
||||
|
||||
# Check that write_pointer points to the end of memory
|
||||
rp = yield fsm.r.read_pointer
|
||||
wp = yield fsm.r.write_pointer
|
||||
if (wp + 1) % fsm.config["sample_depth"] != rp:
|
||||
raise ValueError
|
||||
|
||||
# Check that state is CAPTURED
|
||||
if (yield fsm.r.state) != States.CAPTURED:
|
||||
raise ValueError
|
||||
|
||||
simulate(fsm, testbench, "single_shot_wait_for_trigger.vcd")
|
||||
|
||||
|
||||
def test_immediate():
|
||||
def testbench():
|
||||
# Configure and start FSM
|
||||
yield fsm.r.trigger_mode.eq(TriggerModes.IMMEDIATE)
|
||||
yield fsm.r.request_start.eq(1)
|
||||
yield
|
||||
|
||||
# Check that write_enable is asserted on the same edge as request_start
|
||||
if not (yield fsm.write_enable):
|
||||
raise ValueError
|
||||
|
||||
for i in range(fsm.config["sample_depth"]):
|
||||
rp = yield fsm.r.read_pointer
|
||||
wp = yield fsm.r.write_pointer
|
||||
|
||||
if rp != 0:
|
||||
raise ValueError
|
||||
|
||||
if wp != i:
|
||||
raise ValueError
|
||||
|
||||
yield
|
||||
|
||||
# Wait one clock cycle (to let BRAM contents cycle in)
|
||||
yield
|
||||
|
||||
# Check that write_pointer points to the end of memory
|
||||
rp = yield fsm.r.read_pointer
|
||||
wp = yield fsm.r.write_pointer
|
||||
if rp != 0:
|
||||
raise ValueError
|
||||
if wp != 7:
|
||||
raise ValueError
|
||||
|
||||
# Check that state is CAPTURED
|
||||
if (yield fsm.r.state) != States.CAPTURED:
|
||||
raise ValueError
|
||||
|
||||
simulate(fsm, testbench, "immediate.vcd")
|
||||
|
||||
|
||||
def test_incremental():
|
||||
def testbench():
|
||||
# Configure and start FSM
|
||||
yield fsm.r.trigger_mode.eq(TriggerModes.INCREMENTAL)
|
||||
yield fsm.r.request_start.eq(1)
|
||||
yield
|
||||
|
||||
# Check that write_enable is asserted on the same edge as request_start
|
||||
# if not (yield fsm.write_enable):
|
||||
# raise ValueError
|
||||
|
||||
for _ in range(10):
|
||||
for _ in range(3):
|
||||
yield
|
||||
|
||||
yield fsm.trigger.eq(1)
|
||||
yield
|
||||
yield fsm.trigger.eq(0)
|
||||
yield
|
||||
|
||||
# # Check that state is CAPTURED
|
||||
# if (yield fsm.r.state) != States.CAPTURED:
|
||||
# raise ValueError
|
||||
|
||||
simulate(fsm, testbench, "incremental.vcd")
|
||||
|
||||
|
||||
def test_single_shot_write_enable():
|
||||
def testbench():
|
||||
# Configure FSM
|
||||
yield fsm.r.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
|
||||
yield fsm.r.trigger_location.eq(4)
|
||||
yield
|
||||
|
||||
# Make sure write is not enabled before starting the FSM
|
||||
if (yield fsm.write_enable):
|
||||
raise ValueError
|
||||
|
||||
# Start the FSM, ensure write enable is asserted throughout the capture
|
||||
yield fsm.r.request_start.eq(1)
|
||||
yield
|
||||
|
||||
for _ in range(fsm.config["sample_depth"]):
|
||||
if not (yield fsm.write_enable):
|
||||
raise ValueError
|
||||
|
||||
for _ in range(8):
|
||||
yield
|
||||
|
||||
yield fsm.trigger.eq(1)
|
||||
yield
|
||||
|
||||
for _ in range(4):
|
||||
if not (yield fsm.write_enable):
|
||||
raise ValueError
|
||||
|
||||
for _ in range(100):
|
||||
yield
|
||||
|
||||
simulate(fsm, testbench, "single_shot_wait_to_trigger.vcd")
|
||||
# Make sure write_enable is deasserted after
|
||||
if (yield fsm.write_enable):
|
||||
raise ValueError
|
||||
|
||||
simulate(fsm, testbench, "single_shot_write_enable.vcd")
|
||||
|
||||
|
||||
def test_immediate_write_enable():
|
||||
def testbench():
|
||||
# Configure FSM
|
||||
yield fsm.r.trigger_mode.eq(TriggerModes.IMMEDIATE)
|
||||
yield
|
||||
|
||||
# Make sure write is not enabled before starting the FSM
|
||||
if (yield fsm.write_enable):
|
||||
raise ValueError
|
||||
|
||||
# Start the FSM, ensure write enable is asserted throughout the capture
|
||||
yield fsm.r.request_start.eq(1)
|
||||
yield
|
||||
|
||||
for _ in range(fsm.config["sample_depth"]):
|
||||
if not (yield fsm.write_enable):
|
||||
raise ValueError
|
||||
|
||||
yield
|
||||
|
||||
# Make sure write_enable is deasserted after
|
||||
if (yield fsm.write_enable):
|
||||
raise ValueError
|
||||
|
||||
simulate(fsm, testbench, "immediate_write_enable.vcd")
|
||||
|
|
|
|||
Loading…
Reference in New Issue