diff --git a/src/manta/logic_analyzer_core.py b/src/manta/logic_analyzer_core.py index 245b467..8b40794 100644 --- a/src/manta/logic_analyzer_core.py +++ b/src/manta/logic_analyzer_core.py @@ -247,61 +247,83 @@ class LogicAnalyzerCore(Elaboratable): Cat(attrs["triggered"] for attrs in self.probe_signals.values()).any() ) + def increment_mod_sample_depth(self, m, signal): + # m.d.sync += signal.eq((signal + 1) % self.config["sample_depth"]) + + with m.If(signal == self.config["sample_depth"] - 1): + m.d.sync += signal.eq(0) + + with m.Else(): + m.d.sync += signal.eq(signal + 1) + def run_state_machine(self, m): - self.prev_request_start = Signal(1) - self.prev_request_stop = Signal(1) + prev_request_start = Signal(1) + prev_request_stop = Signal(1) + + request_start = self.registers.request_start + request_stop = self.registers.request_stop + trigger_mode = self.registers.trigger_mode + trigger_loc = self.registers.trigger_loc + state = self.registers.state + rp = self.registers.read_pointer + wp = self.registers.write_pointer + we = self.sample_mem.user_we + + m.d.comb += self.sample_mem.user_addr.eq(wp) # Rising edge detection for start/stop requests - m.d.sync += self.prev_request_start.eq(self.registers.request_start) - m.d.sync += self.prev_request_stop.eq(self.registers.request_stop) + m.d.sync += prev_request_start.eq(request_start) + m.d.sync += prev_request_stop.eq(request_stop) - m.d.comb += self.sample_mem.user_addr.eq(self.registers.write_pointer) + with m.If(state == self.states["IDLE"]): + m.d.sync += wp.eq(0) + m.d.sync += rp.eq(0) + m.d.sync += we.eq(0) - with m.If(self.registers.state == self.states["IDLE"]): - m.d.sync += self.registers.write_pointer.eq(0) - m.d.sync += self.registers.read_pointer.eq(0) - m.d.sync += self.sample_mem.user_we.eq(0) # or something like this - - with m.If((self.registers.request_start) & (~self.prev_request_start)): - m.d.sync += self.registers.state.eq(self.states["MOVE_TO_POSITION"]) - - with m.Elif(self.registers.state == self.states["MOVE_TO_POSITION"]): - m.d.sync += self.registers.write_pointer.eq( - self.registers.write_pointer + 1 - ) - m.d.sync += self.sample_mem.user_we.eq(1) - - with m.If(self.registers.write_pointer == self.registers.trigger_loc): - with m.If(self.trig): - m.d.sync += self.registers.state.eq(self.states["CAPTURING"]) + with m.If((request_start) & (~prev_request_start)): + m.d.sync += we.eq(1) + with m.If(trigger_mode == self.trigger_modes["IMMEDIATE"]): + m.d.sync += state.eq(self.states["CAPTURING"]) with m.Else(): - m.d.sync += self.registers.state.eq(self.states["IN_POSITION"]) + with m.If(trigger_loc == 0): + m.d.sync += state.eq(self.states["IN_POSITION"]) - with m.Elif(self.registers.state == self.states["IN_POSITION"]): - m.d.sync += self.registers.write_pointer.eq( - (self.registers.write_pointer + 1) % self.config["sample_depth"] - ) - m.d.sync += self.registers.read_pointer.eq( - (self.registers.read_pointer + 1) % self.config["sample_depth"] - ) - m.d.sync += self.sample_mem.user_we.eq(1) + with m.Else(): + m.d.sync += state.eq(self.states["MOVE_TO_POSITION"]) + + m.d.sync += state.eq(self.states["MOVE_TO_POSITION"]) + + with m.Elif(state == self.states["MOVE_TO_POSITION"]): + m.d.sync += wp.eq(wp + 1) + + with m.If(wp == trigger_loc): + with m.If(self.trig): + m.d.sync += state.eq(self.states["CAPTURING"]) + + with m.Else(): + m.d.sync += state.eq(self.states["IN_POSITION"]) + self.increment_mod_sample_depth(m, rp) + + with m.Elif(state == self.states["IN_POSITION"]): + self.increment_mod_sample_depth(m, wp) with m.If(self.trig): - m.d.sync += self.registers.state.eq(self.states["CAPTURING"]) - - with m.Elif(self.registers.state == self.states["CAPTURING"]): - with m.If(self.registers.write_pointer == self.registers.read_pointer): - m.d.sync += self.sample_mem.user_we.eq(0) - m.d.sync += self.registers.state.eq(self.states["CAPTURED"]) + m.d.sync += state.eq(self.states["CAPTURING"]) with m.Else(): - m.d.sync += self.registers.write_pointer.eq( - (self.registers.write_pointer + 1) % self.config["sample_depth"] - ) + self.increment_mod_sample_depth(m, rp) - with m.If((self.registers.request_stop) & (~self.prev_request_stop)): - m.d.sync += self.registers.state.eq(self.states["IDLE"]) + with m.Elif(state == self.states["CAPTURING"]): + with m.If(wp == rp): + m.d.sync += we.eq(0) + m.d.sync += state.eq(self.states["CAPTURED"]) + + with m.Else(): + self.increment_mod_sample_depth(m, wp) + + with m.If((request_stop) & (~prev_request_stop)): + m.d.sync += state.eq(self.states["IDLE"]) def elaborate(self, platform): m = Module() diff --git a/test/test_logic_analyzer_sim.py b/test/test_logic_analyzer_sim.py index b35e682..0f37aae 100644 --- a/test/test_logic_analyzer_sim.py +++ b/test/test_logic_analyzer_sim.py @@ -6,70 +6,12 @@ from random import sample config = { "type": "logic_analyzer", "sample_depth": 1024, - "trigger_loc": 500, + "trigger_loc": 512, "probes": {"larry": 1, "curly": 3, "moe": 9}, "triggers": ["moe RISING"], } -# class SimulatedBusInterface(): -# def __init__(self, core): -# self.core = core - -# def write(self, addrs, datas): -# # Handle a single integer address and data -# if isinstance(addrs, int) and isinstance(datas, int): -# return self.write([addrs], [datas]) - -# # Make sure address and datas are all integers -# if not isinstance(addrs, list) or not isinstance(datas, list): -# raise ValueError( -# "Write addresses and data must be an integer or list of integers." -# ) - -# if not all(isinstance(a, int) for a in addrs): -# raise ValueError("Write addresses must be all be integers.") - -# if not all(isinstance(d, int) for d in datas): -# raise ValueError("Write data must all be integers.") - -# # I'm not sure if it's necessary to split outputs into chunks -# # I think the output buffer doesn't really drop stuff, just the input buffer - -# for addr, data in zip(addrs, datas): -# yield from write_register(self.core, addr, data) - -# def read(self, addrs): -# # Handle a single integer address -# if isinstance(addrs, int): -# return self.read([addrs])[0] - -# # Make sure all list elements are integers -# if not all(isinstance(a, int) for a in addrs): -# raise ValueError("Read address must be an integer or list of integers.") - -# datas = [] -# for addr in addrs: -# yield la.addr_i.eq(addr) -# yield la.data_i.eq(0) -# yield la.rw_i.eq(0) -# yield la.valid_i.eq(1) -# yield -# yield la.addr_i.eq(0) -# yield la.valid_i.eq(0) - -# # wait for output to be valid -# while not (yield la.valid_o): -# yield - -# datas.append( (yield la.data_o) ) - -# return datas - la = LogicAnalyzerCore(config, base_addr=0, interface=None) -# interface = SimulatedBusInterface(la) -# la.registers.interface = interface # unironically the least cursed part of this -# la.sample_mem.interface = interface # unironically the least cursed part of this - def print_data_at_addr(addr): # place read transaction on the bus @@ -113,7 +55,7 @@ def test_do_you_fucking_work(): ) # right now this is not actually respected...oops # setting trigger location - yield from set_logic_analyzer_register("trigger_loc", 500) + yield from set_logic_analyzer_register("trigger_loc", 511) # starting capture yield from set_logic_analyzer_register("request_start", 1)