add immediate capture mode to logic analyzer

This commit is contained in:
Fischer Moseley 2024-01-03 13:35:09 -07:00
parent 07ae9cc2e8
commit ee18e10ae1
2 changed files with 66 additions and 102 deletions

View File

@ -247,61 +247,83 @@ class LogicAnalyzerCore(Elaboratable):
Cat(attrs["triggered"] for attrs in self.probe_signals.values()).any()
)
def increment_mod_sample_depth(self, m, signal):
# m.d.sync += signal.eq((signal + 1) % self.config["sample_depth"])
with m.If(signal == self.config["sample_depth"] - 1):
m.d.sync += signal.eq(0)
with m.Else():
m.d.sync += signal.eq(signal + 1)
def run_state_machine(self, m):
self.prev_request_start = Signal(1)
self.prev_request_stop = Signal(1)
prev_request_start = Signal(1)
prev_request_stop = Signal(1)
request_start = self.registers.request_start
request_stop = self.registers.request_stop
trigger_mode = self.registers.trigger_mode
trigger_loc = self.registers.trigger_loc
state = self.registers.state
rp = self.registers.read_pointer
wp = self.registers.write_pointer
we = self.sample_mem.user_we
m.d.comb += self.sample_mem.user_addr.eq(wp)
# Rising edge detection for start/stop requests
m.d.sync += self.prev_request_start.eq(self.registers.request_start)
m.d.sync += self.prev_request_stop.eq(self.registers.request_stop)
m.d.sync += prev_request_start.eq(request_start)
m.d.sync += prev_request_stop.eq(request_stop)
m.d.comb += self.sample_mem.user_addr.eq(self.registers.write_pointer)
with m.If(state == self.states["IDLE"]):
m.d.sync += wp.eq(0)
m.d.sync += rp.eq(0)
m.d.sync += we.eq(0)
with m.If(self.registers.state == self.states["IDLE"]):
m.d.sync += self.registers.write_pointer.eq(0)
m.d.sync += self.registers.read_pointer.eq(0)
m.d.sync += self.sample_mem.user_we.eq(0) # or something like this
with m.If((self.registers.request_start) & (~self.prev_request_start)):
m.d.sync += self.registers.state.eq(self.states["MOVE_TO_POSITION"])
with m.Elif(self.registers.state == self.states["MOVE_TO_POSITION"]):
m.d.sync += self.registers.write_pointer.eq(
self.registers.write_pointer + 1
)
m.d.sync += self.sample_mem.user_we.eq(1)
with m.If(self.registers.write_pointer == self.registers.trigger_loc):
with m.If(self.trig):
m.d.sync += self.registers.state.eq(self.states["CAPTURING"])
with m.If((request_start) & (~prev_request_start)):
m.d.sync += we.eq(1)
with m.If(trigger_mode == self.trigger_modes["IMMEDIATE"]):
m.d.sync += state.eq(self.states["CAPTURING"])
with m.Else():
m.d.sync += self.registers.state.eq(self.states["IN_POSITION"])
with m.If(trigger_loc == 0):
m.d.sync += state.eq(self.states["IN_POSITION"])
with m.Elif(self.registers.state == self.states["IN_POSITION"]):
m.d.sync += self.registers.write_pointer.eq(
(self.registers.write_pointer + 1) % self.config["sample_depth"]
)
m.d.sync += self.registers.read_pointer.eq(
(self.registers.read_pointer + 1) % self.config["sample_depth"]
)
m.d.sync += self.sample_mem.user_we.eq(1)
with m.Else():
m.d.sync += state.eq(self.states["MOVE_TO_POSITION"])
m.d.sync += state.eq(self.states["MOVE_TO_POSITION"])
with m.Elif(state == self.states["MOVE_TO_POSITION"]):
m.d.sync += wp.eq(wp + 1)
with m.If(wp == trigger_loc):
with m.If(self.trig):
m.d.sync += state.eq(self.states["CAPTURING"])
with m.Else():
m.d.sync += state.eq(self.states["IN_POSITION"])
self.increment_mod_sample_depth(m, rp)
with m.Elif(state == self.states["IN_POSITION"]):
self.increment_mod_sample_depth(m, wp)
with m.If(self.trig):
m.d.sync += self.registers.state.eq(self.states["CAPTURING"])
with m.Elif(self.registers.state == self.states["CAPTURING"]):
with m.If(self.registers.write_pointer == self.registers.read_pointer):
m.d.sync += self.sample_mem.user_we.eq(0)
m.d.sync += self.registers.state.eq(self.states["CAPTURED"])
m.d.sync += state.eq(self.states["CAPTURING"])
with m.Else():
m.d.sync += self.registers.write_pointer.eq(
(self.registers.write_pointer + 1) % self.config["sample_depth"]
)
self.increment_mod_sample_depth(m, rp)
with m.If((self.registers.request_stop) & (~self.prev_request_stop)):
m.d.sync += self.registers.state.eq(self.states["IDLE"])
with m.Elif(state == self.states["CAPTURING"]):
with m.If(wp == rp):
m.d.sync += we.eq(0)
m.d.sync += state.eq(self.states["CAPTURED"])
with m.Else():
self.increment_mod_sample_depth(m, wp)
with m.If((request_stop) & (~prev_request_stop)):
m.d.sync += state.eq(self.states["IDLE"])
def elaborate(self, platform):
m = Module()

View File

@ -6,70 +6,12 @@ from random import sample
config = {
"type": "logic_analyzer",
"sample_depth": 1024,
"trigger_loc": 500,
"trigger_loc": 512,
"probes": {"larry": 1, "curly": 3, "moe": 9},
"triggers": ["moe RISING"],
}
# class SimulatedBusInterface():
# def __init__(self, core):
# self.core = core
# def write(self, addrs, datas):
# # Handle a single integer address and data
# if isinstance(addrs, int) and isinstance(datas, int):
# return self.write([addrs], [datas])
# # Make sure address and datas are all integers
# if not isinstance(addrs, list) or not isinstance(datas, list):
# raise ValueError(
# "Write addresses and data must be an integer or list of integers."
# )
# if not all(isinstance(a, int) for a in addrs):
# raise ValueError("Write addresses must be all be integers.")
# if not all(isinstance(d, int) for d in datas):
# raise ValueError("Write data must all be integers.")
# # I'm not sure if it's necessary to split outputs into chunks
# # I think the output buffer doesn't really drop stuff, just the input buffer
# for addr, data in zip(addrs, datas):
# yield from write_register(self.core, addr, data)
# def read(self, addrs):
# # Handle a single integer address
# if isinstance(addrs, int):
# return self.read([addrs])[0]
# # Make sure all list elements are integers
# if not all(isinstance(a, int) for a in addrs):
# raise ValueError("Read address must be an integer or list of integers.")
# datas = []
# for addr in addrs:
# yield la.addr_i.eq(addr)
# yield la.data_i.eq(0)
# yield la.rw_i.eq(0)
# yield la.valid_i.eq(1)
# yield
# yield la.addr_i.eq(0)
# yield la.valid_i.eq(0)
# # wait for output to be valid
# while not (yield la.valid_o):
# yield
# datas.append( (yield la.data_o) )
# return datas
la = LogicAnalyzerCore(config, base_addr=0, interface=None)
# interface = SimulatedBusInterface(la)
# la.registers.interface = interface # unironically the least cursed part of this
# la.sample_mem.interface = interface # unironically the least cursed part of this
def print_data_at_addr(addr):
# place read transaction on the bus
@ -113,7 +55,7 @@ def test_do_you_fucking_work():
) # right now this is not actually respected...oops
# setting trigger location
yield from set_logic_analyzer_register("trigger_loc", 500)
yield from set_logic_analyzer_register("trigger_loc", 511)
# starting capture
yield from set_logic_analyzer_register("request_start", 1)