From a11605b2b7b15b1f8c55fee10aa385cb62ad1a34 Mon Sep 17 00:00:00 2001 From: Fischer Moseley <42497969+fischermoseley@users.noreply.github.com> Date: Fri, 5 Jan 2024 16:50:25 -0800 Subject: [PATCH] refactor logic analyzer --- src/manta/cli.py | 1 + src/manta/logic_analyzer/__init__.py | 351 +++++++++++++ src/manta/logic_analyzer/fsm.py | 126 +++++ src/manta/logic_analyzer/playback.py | 68 +++ src/manta/logic_analyzer/sample_mem.py | 11 + src/manta/logic_analyzer/trigger_block.py | 126 +++++ src/manta/logic_analyzer_core.py | 591 ---------------------- src/manta/manta.py | 2 +- test/test_logic_analyzer_sim.py | 42 +- 9 files changed, 712 insertions(+), 606 deletions(-) create mode 100644 src/manta/logic_analyzer/__init__.py create mode 100644 src/manta/logic_analyzer/fsm.py create mode 100644 src/manta/logic_analyzer/playback.py create mode 100644 src/manta/logic_analyzer/sample_mem.py create mode 100644 src/manta/logic_analyzer/trigger_block.py delete mode 100644 src/manta/logic_analyzer_core.py diff --git a/src/manta/cli.py b/src/manta/cli.py index 4595f51..7528cf8 100644 --- a/src/manta/cli.py +++ b/src/manta/cli.py @@ -15,6 +15,7 @@ Usage: help Display this help menu """ + def help(): print(logo) diff --git a/src/manta/logic_analyzer/__init__.py b/src/manta/logic_analyzer/__init__.py new file mode 100644 index 0000000..68d858d --- /dev/null +++ b/src/manta/logic_analyzer/__init__.py @@ -0,0 +1,351 @@ +from amaranth import * +from warnings import warn +from ..utils import * +from .trigger_block import LogicAnalyzerTriggerBlock +from .fsm import LogicAnalyzerFSM +from .sample_mem import LogicAnalyzerSampleMemory +from .playback import LogicAnalyzerPlayback + + +class LogicAnalyzerCore(Elaboratable): + """ """ + + def __init__(self, config, base_addr, interface): + self.config = config + self.check_config(config) + + # Bus Input + self.addr_i = Signal(16) + self.data_i = Signal(16) + self.rw_i = Signal(1) + self.valid_i = Signal(1) + + # Bus Output + self.addr_o = Signal(16) + self.data_o = Signal(16) + self.rw_o = Signal(1) + self.valid_o = Signal(1) + + self.probes = [ + Signal(width, name=name) for name, width in self.config["probes"].items() + ] + + # Submodules + self.fsm = LogicAnalyzerFSM(self.config, base_addr, interface) + self.trig_blk = LogicAnalyzerTriggerBlock( + self.config, self.fsm.get_max_addr() + 1, interface + ) + self.sample_mem = LogicAnalyzerSampleMemory( + self.config, self.trig_blk.get_max_addr() + 1, interface + ) + + # Top-Level Probes: + for name, width in self.config["probes"].items(): + if hasattr(self, name): + raise ValueError( + f"Unable to assign probe name '{name}' as it clashes with a reserved name in the backend. Please rename the probe." + ) + + setattr(self, name, Signal(width, name=name)) + + def check_config(self, config): + # Check for unrecognized options + valid_options = [ + "type", + "sample_depth", + "probes", + "triggers", + "trigger_loc", + "trigger_mode", + ] + for option in config: + if option not in valid_options: + warn(f"Ignoring unrecognized option '{option}' in Logic Analyzer.") + + # Check sample depth is provided and positive + if "sample_depth" not in config: + raise ValueError("Logic Analyzer must have sample_depth specified.") + + if not isinstance(config["sample_depth"], int): + raise ValueError("Logic Analyzer sample_depth must be an integer.") + + if config["sample_depth"] <= 0: + raise ValueError("Logic Analyzer sample_depth must be positive.") + + # Check probes + if "probes" not in config: + raise ValueError("Logic Analyzer must have at least one probe specified.") + + if len(config["probes"]) == 0: + raise ValueError("Logic Analyzer must have at least one probe specified.") + + for name, width in config["probes"].items(): + if width < 0: + raise ValueError(f"Width of probe {name} must be positive.") + + # Check triggers + if "triggers" not in config: + raise ValueError("Logic Analyzer must have at least one trigger specified.") + + if len(config["triggers"]) == 0: + raise ValueError("Logic Analyzer must have at least one trigger specified.") + + # Check trigger location + if "trigger_loc" in config: + if not isinstance(config["trigger_loc"], int): + raise ValueError("Trigger location must be an integer.") + + if config["trigger_loc"] < 0: + raise ValueError("Trigger location must be positive.") + + if config["trigger_loc"] > config["sample_depth"]: + raise ValueError("Trigger location cannot exceed sample depth.") + + # Check trigger mode, if provided + if "trigger_mode" in config: + valid_modes = ["single_shot", "incremental", "immediate"] + if config["trigger_mode"] not in valid_modes: + raise ValueError( + f"Unrecognized trigger mode {config['trigger_mode']} provided." + ) + + if config["trigger_mode"] == "incremental": + if "trigger_loc" in config: + warn( + "Ignoring option 'trigger_loc', as 'trigger_mode' is set to immediate, and there is no trigger condition to wait for." + ) + + # Check triggers themselves + for trigger in config["triggers"]: + if not isinstance(trigger, str): + raise ValueError("Trigger must be specified with a string.") + + # Trigger conditions may be composed of either two or three components, + # depending on the operation specified. In the case of operations that + # don't need an argument (like DISABLE, RISING, FALLING, CHANGING) or + # three statements in + + # Check the trigger operations + components = trigger.strip().split(" ") + if len(components) == 2: + name, op = components + if op not in ["DISABLE", "RISING", "FALLING", "CHANGING"]: + raise ValueError( + f"Unable to interpret trigger condition '{trigger}'." + ) + + elif len(components) == 3: + name, op, arg = components + if op not in ["GT", "LT", "GEQ", "LEQ", "EQ", "NEQ"]: + raise ValueError( + f"Unable to interpret trigger condition '{trigger}'." + ) + + else: + raise ValueError(f"Unable to interpret trigger condition '{trigger}'.") + + # Check probe names + if components[0] not in config["probes"]: + raise ValueError(f"Unknown probe name '{components[0]}' specified.") + + def elaborate(self, platform): + m = Module() + + # Add submodules + m.submodules["fsm"] = fsm = self.fsm + m.submodules["sample_mem"] = sample_mem = self.sample_mem + m.submodules["trig_blk"] = trig_blk = self.trig_blk + + # Concat all the probes together, and feed to input of sample memory + # (it is necessary to reverse the order such that first probe occupies + # the lowest location in memory) + m.d.comb += sample_mem.user_data.eq(Cat(self.probes[::-1])) + + # Wire bus connections between internal modules + m.d.comb += [ + # Bus Connections + fsm.addr_i.eq(self.addr_i), + fsm.data_i.eq(self.data_i), + fsm.rw_i.eq(self.rw_i), + fsm.valid_i.eq(self.valid_i), + trig_blk.addr_i.eq(fsm.addr_o), + trig_blk.data_i.eq(fsm.data_o), + trig_blk.rw_i.eq(fsm.rw_o), + trig_blk.valid_i.eq(fsm.valid_o), + sample_mem.addr_i.eq(trig_blk.addr_o), + sample_mem.data_i.eq(trig_blk.data_o), + sample_mem.rw_i.eq(trig_blk.rw_o), + sample_mem.valid_i.eq(trig_blk.valid_o), + self.addr_o.eq(sample_mem.addr_o), + self.data_o.eq(sample_mem.data_o), + self.rw_o.eq(sample_mem.rw_o), + self.valid_o.eq(sample_mem.valid_o), + # Non-bus Connections + fsm.trigger.eq(trig_blk.trig), + sample_mem.user_we.eq(fsm.write_enable), + ] + + return m + + def get_top_level_ports(self): + return self.probes + + def get_max_addr(self): + return self.sample_mem.get_max_addr() + + def set_triggers(self): + # reset all triggers to zero + for p in self.probes: + self.trig_blk.r.set_probe(p.name + "_op", 0) + self.trig_blk.r.set_probe(p.name + "_arg", 0) + + # set triggers + for trigger in self.config["triggers"]: + components = trigger.strip().split(" ") + + # Handle triggers that don't need an argument + if len(components) == 2: + name, op = components + self.trig_blk.r.set_probe(name + "_op", self.operations[op]) + + # Handle triggers that do need an argument + elif len(components) == 3: + name, op, arg = components + self.registers.set_probe(name + "_op", self.operations[op]) + self.registers.set_probe(name + "_arg", int(arg)) + + def capture(self, verbose=False): + print_if_verbose = lambda x: print(x) if verbose else None + + # If core is not in IDLE state, request that it return to IDLE + print_if_verbose(" -> Resetting core...") + state = self.registers.get_probe("state") + if state != self.states["IDLE"]: + self.registers.set_probe("request_stop", 0) + self.registers.set_probe("request_stop", 1) + self.registers.set_probe("request_stop", 0) + + if self.registers.get_probe("state") != self.states["IDLE"]: + raise ValueError("Logic analyzer did not reset to IDLE state.") + + # Set triggers + print_if_verbose(" -> Setting triggers...") + self.set_triggers() + + # Set trigger mode, default to single-shot if user didn't specify a mode + print_if_verbose(" -> Setting trigger mode...") + if "trigger_mode" in self.config: + self.registers.set_probe("trigger_mode", self.config["trigger_mode"]) + + else: + self.registers.set_probe("trigger_mode", self.trigger_modes["SINGLE_SHOT"]) + + # Set trigger location + print_if_verbose(" -> Setting trigger location...") + self.registers.set_probe("trigger_loc", self.config["trigger_loc"]) + + # Send a start request to the state machine + print_if_verbose(" -> Starting capture...") + self.registers.set_probe("request_start", 1) + self.registers.set_probe("request_start", 0) + + # Poll the state machine's state, and wait for the capture to complete + print_if_verbose(" -> Waiting for capture to complete...") + while self.registers.get_probe("state") != self.states["CAPTURED"]: + pass + + # Read out the entirety of the sample memory + print_if_verbose(" -> Reading sample memory contents...") + addrs = list(range(self.config["sample_depth"])) + raw_capture = self.sample_mem.read_from_user_addr(addrs) + + # Revolve the memory around the read_pointer, such that all the beginning + # of the caputure is at the first element + print_if_verbose(" -> Checking read pointer and revolving memory...") + read_pointer = self.registers.get_probe("read_pointer") + + data = raw_capture[read_pointer:] + raw_capture[:read_pointer] + return LogicAnalyzerCapture(data, self.config) + + +class LogicAnalyzerCapture: + def __init__(self, data, config): + self.data = data + self.config = config + + def get_trigger_loc(self): + return self.config["trigger_loc"] + + def get_trace(self, probe_name): + # sum up the widths of all the probes below this one + lower = 0 + for name, width in self.config["probes"].items(): + if name == probe_name: + break + + lower += width + + # add the width of the probe we'd like + upper = lower + self.config["probes"][probe_name] + + total_probe_width = sum(self.config["probes"].values()) + binary = [f"{d:0{total_probe_width}b}" for d in self.data] + return [int(b[lower:upper], 2) for b in binary] + + def export_vcd(self, path): + from vcd import VCDWriter + from datetime import datetime + + # Use the same datetime format that iVerilog uses + timestamp = datetime.now().strftime("%a %b %w %H:%M:%S %Y") + vcd_file = open(path, "w") + + with VCDWriter(vcd_file, "10 ns", timestamp, "manta") as writer: + # each probe has a name, width, and writer associated with it + signals = [] + for name, width in self.config["probes"].items(): + signal = { + "name": name, + "width": width, + "data": self.get_trace(name), + "var": writer.register_var("manta", name, "wire", size=width), + } + signals.append(signal) + + clock = writer.register_var("manta", "clk", "wire", size=1) + trigger = writer.register_var("manta", "trigger", "wire", size=1) + + # add the data to each probe in the vcd file + for timestamp in range(0, 2 * len(self.data)): + # run the clock + writer.change(clock, timestamp, timestamp % 2 == 0) + + # set the trigger + triggered = (timestamp // 2) >= self.get_trigger_loc() + writer.change(trigger, timestamp, triggered) + + # add other signals + for signal in signals: + var = signal["var"] + sample = signal["data"][timestamp // 2] + + writer.change(var, timestamp, sample) + + vcd_file.close() + + def export_playback_module(self): + return LogicAnalyzerPlayback(self.data, self.config) + + def export_playback_verilog(self, path): + lap = self.export_playback_module() + from amaranth.back import verilog + + with open(path, "w") as f: + f.write( + verilog.convert( + lap, + name="logic_analyzer_playback", + ports=lap.get_top_level_ports(), + strip_internal_attrs=True, + ) + ) diff --git a/src/manta/logic_analyzer/fsm.py b/src/manta/logic_analyzer/fsm.py new file mode 100644 index 0000000..b2f0059 --- /dev/null +++ b/src/manta/logic_analyzer/fsm.py @@ -0,0 +1,126 @@ +from amaranth import * +from math import ceil, log2 +from ..io_core import IOCore + + +class LogicAnalyzerFSM(Elaboratable): + """ """ + + def __init__(self, config, base_addr, interface): + self.config = config + self.states = { + "IDLE": 0, + "MOVE_TO_POSITION": 1, + "IN_POSITION": 2, + "CAPTURING": 3, + "CAPTURED": 4, + } + + self.trigger_modes = {"SINGLE_SHOT": 0, "INCREMENTAL": 1, "IMMEDIATE": 2} + + self.trigger = Signal(1) + self.write_enable = Signal(1) + + register_config = { + "inputs": { + "state": 4, + "read_pointer": ceil(log2(self.config["sample_depth"])), + "write_pointer": ceil(log2(self.config["sample_depth"])), + }, + "outputs": { + "trigger_location": ceil(log2(self.config["sample_depth"])), + "trigger_mode": 2, + "request_start": 1, + "request_stop": 1, + }, + } + + self.r = IOCore(register_config, base_addr, interface) + + # Bus Input + self.addr_i = self.r.addr_i + self.data_i = self.r.data_i + self.rw_i = self.r.rw_i + self.valid_i = self.r.valid_i + + # Bus Output + self.addr_o = self.r.addr_o + self.data_o = self.r.data_o + self.rw_o = self.r.rw_o + self.valid_o = self.r.valid_o + + def get_max_addr(self): + return self.r.get_max_addr() + + def increment_mod_sample_depth(self, m, signal): + # m.d.sync += signal.eq((signal + 1) % self.config["sample_depth"]) + + with m.If(signal == self.config["sample_depth"] - 1): + m.d.sync += signal.eq(0) + + with m.Else(): + m.d.sync += signal.eq(signal + 1) + + def elaborate(self, platform): + m = Module() + + m.submodules["registers"] = self.r + + prev_request_start = Signal(1) + prev_request_stop = Signal(1) + + # Rising edge detection for start/stop requests + m.d.sync += prev_request_start.eq(self.r.request_start) + m.d.sync += prev_request_stop.eq(self.r.request_stop) + + with m.If(self.r.state == self.states["IDLE"]): + m.d.sync += self.r.write_pointer.eq(0) + m.d.sync += self.r.read_pointer.eq(0) + m.d.sync += self.write_enable.eq(0) + + with m.If((self.r.request_start) & (~prev_request_start)): + m.d.sync += self.write_enable.eq(1) + with m.If(self.r.trigger_mode == self.trigger_modes["IMMEDIATE"]): + m.d.sync += self.r.state.eq(self.states["CAPTURING"]) + + with m.Else(): + with m.If(self.r.trigger_location == 0): + m.d.sync += self.r.state.eq(self.states["IN_POSITION"]) + + with m.Else(): + m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"]) + + m.d.sync += self.r.state.eq(self.states["MOVE_TO_POSITION"]) + + with m.Elif(self.r.state == self.states["MOVE_TO_POSITION"]): + m.d.sync += self.r.write_pointer.eq(self.r.write_pointer + 1) + + with m.If(self.r.write_pointer == self.r.trigger_location): + with m.If(self.trigger): + m.d.sync += self.r.state.eq(self.states["CAPTURING"]) + + with m.Else(): + m.d.sync += self.r.state.eq(self.states["IN_POSITION"]) + self.increment_mod_sample_depth(m, self.r.read_pointer) + + with m.Elif(self.r.state == self.states["IN_POSITION"]): + self.increment_mod_sample_depth(m, self.r.write_pointer) + + with m.If(self.trigger): + m.d.sync += self.r.state.eq(self.states["CAPTURING"]) + + with m.Else(): + self.increment_mod_sample_depth(m, self.r.read_pointer) + + with m.Elif(self.r.state == self.states["CAPTURING"]): + with m.If(self.r.write_pointer == self.r.read_pointer): + m.d.sync += self.write_enable.eq(0) + m.d.sync += self.r.state.eq(self.states["CAPTURED"]) + + with m.Else(): + self.increment_mod_sample_depth(m, self.r.write_pointer) + + with m.If((self.r.request_stop) & (~prev_request_stop)): + m.d.sync += self.r.state.eq(self.states["IDLE"]) + + return m diff --git a/src/manta/logic_analyzer/playback.py b/src/manta/logic_analyzer/playback.py new file mode 100644 index 0000000..d5eea8e --- /dev/null +++ b/src/manta/logic_analyzer/playback.py @@ -0,0 +1,68 @@ +from amaranth import * + + +class LogicAnalyzerPlayback(Elaboratable): + def __init__(self, data, config): + self.data = data + self.config = config + + # State Machine + self.start = Signal(1) + self.valid = Signal(1) + + # Top-Level Probe signals + self.top_level_probes = {} + for name, width in self.config["probes"].items(): + self.top_level_probes[name] = Signal(width, name=name) + + # Instantiate memory + self.mem = Memory( + depth=self.config["sample_depth"], + width=sum(self.config["probes"].values()), + init=self.data, + ) + + self.read_port = self.mem.read_port() + + def elaborate(self, platform): + m = Module() + m.submodules["mem"] = self.mem + + m.d.comb += self.read_port.en.eq(1) + + # State Machine + busy = Signal(1) + with m.If(~busy): + with m.If(self.start): + m.d.sync += busy.eq(1) + # m.d.sync += self.read_port.addr.eq(1) + + with m.Else(): + with m.If(self.read_port.addr == self.config["sample_depth"] - 1): + m.d.sync += busy.eq(0) + m.d.sync += self.read_port.addr.eq(0) + + with m.Else(): + m.d.sync += self.read_port.addr.eq(self.read_port.addr + 1) + + # Pipeline to accomodate for the 2-cycle latency in the RAM + m.d.sync += self.valid.eq(busy) + + # Assign the probe values by part-selecting from the data port + lower = 0 + for name, width in reversed(self.config["probes"].items()): + signal = self.top_level_probes[name] + + # Set output probe to zero if we're not + with m.If(self.valid): + m.d.comb += signal.eq(self.read_port.data[lower : lower + width]) + + with m.Else(): + m.d.comb += signal.eq(0) + + lower += width + + return m + + def get_top_level_ports(self): + return [self.start, self.valid] + list(self.top_level_probes.values()) diff --git a/src/manta/logic_analyzer/sample_mem.py b/src/manta/logic_analyzer/sample_mem.py new file mode 100644 index 0000000..b6806fd --- /dev/null +++ b/src/manta/logic_analyzer/sample_mem.py @@ -0,0 +1,11 @@ +from amaranth import * +from ..memory_core import ReadOnlyMemoryCore + + +class LogicAnalyzerSampleMemory(ReadOnlyMemoryCore): + def __init__(self, config, base_addr, interface): + width = sum(config["probes"].values()) + depth = config["sample_depth"] + mem_config = {"width": width, "depth": depth} + + super().__init__(mem_config, base_addr, interface) diff --git a/src/manta/logic_analyzer/trigger_block.py b/src/manta/logic_analyzer/trigger_block.py new file mode 100644 index 0000000..0958109 --- /dev/null +++ b/src/manta/logic_analyzer/trigger_block.py @@ -0,0 +1,126 @@ +from amaranth import * +from ..io_core import IOCore + + +class LogicAnalyzerTriggerBlock(Elaboratable): + """ """ + + def __init__(self, config, base_addr, interface): + self.config = config + + # Instantiate a bunch of trigger blocks + self.probes = [ + Signal(width, name=name) for name, width in self.config["probes"].items() + ] + self.triggers = [LogicAnalyzerTrigger(p) for p in self.probes] + + # Make IO core for everything + outputs = {} + for p in self.probes: + outputs[p.name + "_arg"] = p.width + outputs[p.name + "_op"] = 4 + + self.r = IOCore({"outputs": outputs}, base_addr, interface) + + # Bus Input + self.addr_i = self.r.addr_i + self.data_i = self.r.data_i + self.rw_i = self.r.rw_i + self.valid_i = self.r.valid_i + + # Bus Output + self.addr_o = self.r.addr_o + self.data_o = self.r.data_o + self.rw_o = self.r.rw_o + self.valid_o = self.r.valid_o + + # Global trigger. High if any probe is triggered. + self.trig = Signal(1) + + def get_max_addr(self): + return self.r.get_max_addr() + + def elaborate(self, platform): + m = Module() + + # Add IO Core as submodule + m.submodules["registers"] = self.r + + # Add triggers as submodules + for t in self.triggers: + m.submodules[t.signal.name + "_trigger"] = t + + for probe, trigger in zip(self.probes, self.triggers): + # Connect trigger input signals to top-level signals + m.d.comb += trigger.signal.eq(probe) + + # Connect IO core registers to triggers + m.d.comb += trigger.arg.eq(getattr(self.r, probe.name + "_arg")) + m.d.comb += trigger.op.eq(getattr(self.r, probe.name + "_op")) + + m.d.comb += self.trig.eq(Cat([t.triggered for t in self.triggers]).any()) + + return m + + +class LogicAnalyzerTrigger(Elaboratable): + def __init__(self, signal): + self.operations = { + "DISABLE": 0, + "RISING": 1, + "FALLING": 2, + "CHANGING": 3, + "GT": 4, + "LT": 5, + "GEQ": 6, + "LEQ": 7, + "EQ": 8, + "NEQ": 9, + } + + self.signal = signal + self.op = Signal(range(len(self.operations))) + self.arg = Signal().like(signal) + self.triggered = Signal(1) + + def elaborate(self, platform): + m = Module() + + # Save previous value to register for edge detection + prev = Signal().like(self.signal) + m.d.sync += prev.eq(self.signal) + + with m.If(self.op == self.operations["DISABLE"]): + m.d.comb += self.triggered.eq(0) + + with m.Elif(self.op == self.operations["RISING"]): + m.d.comb += self.triggered.eq((self.signal) & (~prev)) + + with m.Elif(self.op == self.operations["FALLING"]): + m.d.comb += self.triggered.eq((~self.signal) & (prev)) + + with m.Elif(self.op == self.operations["CHANGING"]): + m.d.comb += self.triggered.eq(self.signal != prev) + + with m.Elif(self.op == self.operations["GT"]): + m.d.comb += self.triggered.eq(self.signal > self.arg) + + with m.Elif(self.op == self.operations["LT"]): + m.d.comb += self.triggered.eq(self.signal < self.arg) + + with m.Elif(self.op == self.operations["GEQ"]): + m.d.comb += self.triggered.eq(self.signal >= self.arg) + + with m.Elif(self.op == self.operations["LEQ"]): + m.d.comb += self.triggered.eq(self.signal <= self.arg) + + with m.Elif(self.op == self.operations["EQ"]): + m.d.comb += self.triggered.eq(self.signal == self.arg) + + with m.Elif(self.op == self.operations["NEQ"]): + m.d.comb += self.triggered.eq(self.signal != self.arg) + + with m.Else(): + m.d.comb += self.triggered.eq(0) + + return m diff --git a/src/manta/logic_analyzer_core.py b/src/manta/logic_analyzer_core.py deleted file mode 100644 index 8b40794..0000000 --- a/src/manta/logic_analyzer_core.py +++ /dev/null @@ -1,591 +0,0 @@ -from amaranth import * -from warnings import warn -from .utils import * -from .io_core import IOCore -from .memory_core import ReadOnlyMemoryCore -from math import ceil, log2 - - -class LogicAnalyzerCore(Elaboratable): - """ """ - - def __init__(self, config, base_addr, interface): - self.config = config - self.base_addr = base_addr - self.interface = interface - - self.check_config(config) - - # State Machine Values - self.states = { - "IDLE": 0, - "MOVE_TO_POSITION": 1, - "IN_POSITION": 2, - "CAPTURING": 3, - "CAPTURED": 4, - } - - # Trigger Modes - self.trigger_modes = {"SINGLE_SHOT": 0, "INCREMENTAL": 1, "IMMEDIATE": 2} - - # Trigger operations - self.operations = { - "DISABLE": 0, - "RISING": 1, - "FALLING": 2, - "CHANGING": 3, - "GT": 4, - "LT": 5, - "GEQ": 6, - "LEQ": 7, - "EQ": 8, - "NEQ": 9, - } - - self.registers = self.make_registers(self.base_addr) - self.sample_mem = self.make_sample_mem(self.registers.max_addr) - self.define_signals() - - def check_config(self, config): - # Check for unrecognized options - valid_options = [ - "type", - "sample_depth", - "probes", - "triggers", - "trigger_loc", - "trigger_mode", - ] - for option in config: - if option not in valid_options: - warn(f"Ignoring unrecognized option '{option}' in Logic Analyzer.") - - # Check sample depth is provided and positive - if "sample_depth" not in config: - raise ValueError("Logic Analyzer must have sample_depth specified.") - - if not isinstance(config["sample_depth"], int): - raise ValueError("Logic Analyzer sample_depth must be an integer.") - - if config["sample_depth"] <= 0: - raise ValueError("Logic Analyzer sample_depth must be positive.") - - # Check probes - if "probes" not in config: - raise ValueError("Logic Analyzer must have at least one probe specified.") - - if len(config["probes"]) == 0: - raise ValueError("Logic Analyzer must have at least one probe specified.") - - for name, width in config["probes"].items(): - if width < 0: - raise ValueError(f"Width of probe {name} must be positive.") - - # Check triggers - if "triggers" not in config: - raise ValueError("Logic Analyzer must have at least one trigger specified.") - - if len(config["triggers"]) == 0: - raise ValueError("Logic Analyzer must have at least one trigger specified.") - - # Check trigger location - if "trigger_loc" in config: - if not isinstance(config["trigger_loc"], int): - raise ValueError("Trigger location must be an integer.") - - if config["trigger_loc"] < 0: - raise ValueError("Trigger location must be positive.") - - if config["trigger_loc"] > config["sample_depth"]: - raise ValueError("Trigger location cannot exceed sample depth.") - - # Check trigger mode - if "trigger_mode" in config: - valid_modes = ["single_shot", "incremental", "immediate"] - if config["trigger_mode"] not in valid_modes: - raise ValueError( - f"Unrecognized trigger mode {config['trigger_mode']} provided." - ) - - # Check triggers themselves - for trigger in config["triggers"]: - if not isinstance(trigger, str): - raise ValueError("Trigger must be specified with a string.") - - # Trigger conditions may be composed of either two or three components, - # depending on the operation specified. In the case of operations that - # don't need an argument (like DISABLE, RISING, FALLING, CHANGING) or - # three statements in - - # Check the trigger operations - components = trigger.strip().split(" ") - if len(components) == 2: - name, op = components - if op not in ["DISABLE", "RISING", "FALLING", "CHANGING"]: - raise ValueError( - f"Unable to interpret trigger condition '{trigger}'." - ) - - elif len(components) == 3: - name, op, arg = components - if op not in ["GT", "LT", "GEQ", "LEQ", "EQ", "NEQ"]: - raise ValueError( - f"Unable to interpret trigger condition '{trigger}'." - ) - - else: - raise ValueError(f"Unable to interpret trigger condition '{trigger}'.") - - # Check probe names - if components[0] not in config["probes"]: - raise ValueError(f"Unknown probe name '{components[0]}' specified.") - - def define_signals(self): - # Bus Input - self.addr_i = Signal(16) - self.data_i = Signal(16) - self.rw_i = Signal(1) - self.valid_i = Signal(1) - - # Bus Output - self.addr_o = Signal(16) - self.data_o = Signal(16) - self.rw_o = Signal(1) - self.valid_o = Signal(1) - - # Probes - self.probe_signals = {} - for name, width in self.config["probes"].items(): - self.probe_signals[name] = { - "top_level": Signal(width), - "prev": Signal(width), - "trigger_arg": getattr(self.registers, f"{name}_arg"), - "trigger_op": getattr(self.registers, f"{name}_op"), - "triggered": Signal(1), - } - - # Global trigger. High if any probe is triggered. - self.trig = Signal(1) - - def make_registers(self, base_addr): - # The logic analyzer uses an IO core to handle inputs to the FSM and trigger comparators - register_config = { - "inputs": { - "state": 4, - "read_pointer": ceil(log2(self.config["sample_depth"])), - "write_pointer": ceil(log2(self.config["sample_depth"])), - }, - "outputs": { - "trigger_loc": ceil(log2(self.config["sample_depth"])), - "trigger_mode": 2, - "request_start": 1, - "request_stop": 1, - }, - } - - for name, width in self.config["probes"].items(): - register_config["outputs"][name + "_arg"] = width - register_config["outputs"][name + "_op"] = 4 - - return IOCore(register_config, base_addr, self.interface) - - def make_sample_mem(self, base_addr): - sample_mem_config = { - "width": sum(self.config["probes"].values()), - "depth": self.config["sample_depth"], - } - - return ReadOnlyMemoryCore(sample_mem_config, base_addr, self.interface) - - def run_triggers(self, m): - # Run the trigger for each individual probe - for name, attrs in self.probe_signals.items(): - top_level = attrs["top_level"] - prev = attrs["prev"] - trigger_arg = attrs["trigger_arg"] - trigger_op = attrs["trigger_op"] - triggered = attrs["triggered"] - - # Save the previous value to a register so we can do rising/falling edge detection later! - m.d.sync += prev.eq(top_level) - - with m.If(trigger_op == self.operations["DISABLE"]): - m.d.comb += triggered.eq(0) - - with m.Elif(trigger_op == self.operations["RISING"]): - m.d.comb += triggered.eq((top_level) & (~prev)) - - with m.Elif(trigger_op == self.operations["FALLING"]): - m.d.comb += triggered.eq((~top_level) & (prev)) - - with m.Elif(trigger_op == self.operations["CHANGING"]): - m.d.comb += triggered.eq(top_level != prev) - - with m.Elif(trigger_op == self.operations["GT"]): - m.d.comb += triggered.eq(top_level > trigger_arg) - - with m.Elif(trigger_op == self.operations["LT"]): - m.d.comb += triggered.eq(top_level < trigger_arg) - - with m.Elif(trigger_op == self.operations["GEQ"]): - m.d.comb += triggered.eq(top_level >= trigger_arg) - - with m.Elif(trigger_op == self.operations["LEQ"]): - m.d.comb += triggered.eq(top_level <= trigger_arg) - - with m.Elif(trigger_op == self.operations["EQ"]): - m.d.comb += triggered.eq(top_level == trigger_arg) - - with m.Elif(trigger_op == self.operations["NEQ"]): - m.d.comb += triggered.eq(top_level != trigger_arg) - - with m.Else(): - m.d.comb += triggered.eq(0) - - # Combine all the triggers - m.d.comb += self.trig.eq( - Cat(attrs["triggered"] for attrs in self.probe_signals.values()).any() - ) - - def increment_mod_sample_depth(self, m, signal): - # m.d.sync += signal.eq((signal + 1) % self.config["sample_depth"]) - - with m.If(signal == self.config["sample_depth"] - 1): - m.d.sync += signal.eq(0) - - with m.Else(): - m.d.sync += signal.eq(signal + 1) - - def run_state_machine(self, m): - prev_request_start = Signal(1) - prev_request_stop = Signal(1) - - request_start = self.registers.request_start - request_stop = self.registers.request_stop - trigger_mode = self.registers.trigger_mode - trigger_loc = self.registers.trigger_loc - state = self.registers.state - rp = self.registers.read_pointer - wp = self.registers.write_pointer - we = self.sample_mem.user_we - - m.d.comb += self.sample_mem.user_addr.eq(wp) - - # Rising edge detection for start/stop requests - m.d.sync += prev_request_start.eq(request_start) - m.d.sync += prev_request_stop.eq(request_stop) - - with m.If(state == self.states["IDLE"]): - m.d.sync += wp.eq(0) - m.d.sync += rp.eq(0) - m.d.sync += we.eq(0) - - with m.If((request_start) & (~prev_request_start)): - m.d.sync += we.eq(1) - with m.If(trigger_mode == self.trigger_modes["IMMEDIATE"]): - m.d.sync += state.eq(self.states["CAPTURING"]) - - with m.Else(): - with m.If(trigger_loc == 0): - m.d.sync += state.eq(self.states["IN_POSITION"]) - - with m.Else(): - m.d.sync += state.eq(self.states["MOVE_TO_POSITION"]) - - m.d.sync += state.eq(self.states["MOVE_TO_POSITION"]) - - with m.Elif(state == self.states["MOVE_TO_POSITION"]): - m.d.sync += wp.eq(wp + 1) - - with m.If(wp == trigger_loc): - with m.If(self.trig): - m.d.sync += state.eq(self.states["CAPTURING"]) - - with m.Else(): - m.d.sync += state.eq(self.states["IN_POSITION"]) - self.increment_mod_sample_depth(m, rp) - - with m.Elif(state == self.states["IN_POSITION"]): - self.increment_mod_sample_depth(m, wp) - - with m.If(self.trig): - m.d.sync += state.eq(self.states["CAPTURING"]) - - with m.Else(): - self.increment_mod_sample_depth(m, rp) - - with m.Elif(state == self.states["CAPTURING"]): - with m.If(wp == rp): - m.d.sync += we.eq(0) - m.d.sync += state.eq(self.states["CAPTURED"]) - - with m.Else(): - self.increment_mod_sample_depth(m, wp) - - with m.If((request_stop) & (~prev_request_stop)): - m.d.sync += state.eq(self.states["IDLE"]) - - def elaborate(self, platform): - m = Module() - - # Add registers and sample memory as submodules - m.submodules["registers"] = self.registers - m.submodules["sample_mem"] = self.sample_mem - - # Concat all the probes together, and feed to input of sample memory - # (it is necessary to reverse the order such that first probe occupies - # the lowest location in memory) - m.d.comb += self.sample_mem.user_data.eq( - Cat([p["top_level"] for p in self.probe_signals.values()][::-1]) - ) - - self.run_state_machine(m) - self.run_triggers(m) - - # Wire internal modules - m.d.comb += [ - self.registers.addr_i.eq(self.addr_i), - self.registers.data_i.eq(self.data_i), - self.registers.rw_i.eq(self.rw_i), - self.registers.valid_i.eq(self.valid_i), - self.sample_mem.addr_i.eq(self.registers.addr_o), - self.sample_mem.data_i.eq(self.registers.data_o), - self.sample_mem.rw_i.eq(self.registers.rw_o), - self.sample_mem.valid_i.eq(self.registers.valid_o), - self.addr_o.eq(self.sample_mem.addr_o), - self.data_o.eq(self.sample_mem.data_o), - self.rw_o.eq(self.sample_mem.rw_o), - self.valid_o.eq(self.sample_mem.valid_o), - ] - - return m - - def get_top_level_ports(self): - return [p["top_level"] for p in self.probe_signals.values()] - - def get_max_addr(self): - return self.sample_mem.get_max_addr() - - def set_triggers(self): - # reset all triggers to zero - for name in self.probe_signals.keys(): - self.registers.set_probe(name + "_op", 0) - self.registers.set_probe(name + "_arg", 0) - - # set triggers - for trigger in self.config["triggers"]: - components = trigger.strip().split(" ") - - # Handle triggers that don't need an argument - if len(components) == 2: - name, op = components - self.registers.set_probe(name + "_op", self.operations[op]) - - # Handle triggers that do need an argument - elif len(components) == 3: - name, op, arg = components - self.registers.set_probe(name + "_op", self.operations[op]) - self.registers.set_probe(name + "_arg", int(arg)) - - def capture(self, verbose=False): - print_if_verbose = lambda x: print(x) if verbose else None - - # If core is not in IDLE state, request that it return to IDLE - print_if_verbose(" -> Resetting core...") - state = self.registers.get_probe("state") - if state != self.states["IDLE"]: - self.registers.set_probe("request_stop", 0) - self.registers.set_probe("request_stop", 1) - self.registers.set_probe("request_stop", 0) - - if self.registers.get_probe("state") != self.states["IDLE"]: - raise ValueError("Logic analyzer did not reset to IDLE state.") - - # Set triggers - print_if_verbose(" -> Setting triggers...") - self.set_triggers() - - # Set trigger mode, default to single-shot if user didn't specify a mode - print_if_verbose(" -> Setting trigger mode...") - if "trigger_mode" in self.config: - self.registers.set_probe("trigger_mode", self.config["trigger_mode"]) - - else: - self.registers.set_probe("trigger_mode", self.trigger_modes["SINGLE_SHOT"]) - - # Set trigger location - print_if_verbose(" -> Setting trigger location...") - self.registers.set_probe("trigger_loc", self.config["trigger_loc"]) - - # Send a start request to the state machine - print_if_verbose(" -> Starting capture...") - self.registers.set_probe("request_start", 1) - self.registers.set_probe("request_start", 0) - - # Poll the state machine's state, and wait for the capture to complete - print_if_verbose(" -> Waiting for capture to complete...") - while self.registers.get_probe("state") != self.states["CAPTURED"]: - pass - - # Read out the entirety of the sample memory - print_if_verbose(" -> Reading sample memory contents...") - addrs = list(range(self.config["sample_depth"])) - raw_capture = self.sample_mem.read_from_user_addr(addrs) - - # Revolve the memory around the read_pointer, such that all the beginning - # of the caputure is at the first element - print_if_verbose(" -> Checking read pointer and revolving memory...") - read_pointer = self.registers.get_probe("read_pointer") - - data = raw_capture[read_pointer:] + raw_capture[:read_pointer] - return LogicAnalyzerCapture(data, self.config) - - -class LogicAnalyzerCapture: - def __init__(self, data, config): - self.data = data - self.config = config - - def get_trigger_loc(self): - return self.config["trigger_loc"] - - def get_trace(self, probe_name): - # sum up the widths of all the probes below this one - lower = 0 - for name, width in self.config["probes"].items(): - if name == probe_name: - break - - lower += width - - # add the width of the probe we'd like - upper = lower + self.config["probes"][probe_name] - - total_probe_width = sum(self.config["probes"].values()) - binary = [f"{d:0{total_probe_width}b}" for d in self.data] - return [int(b[lower:upper], 2) for b in binary] - - def export_vcd(self, path): - from vcd import VCDWriter - from datetime import datetime - - # Use the same datetime format that iVerilog uses - timestamp = datetime.now().strftime("%a %b %w %H:%M:%S %Y") - vcd_file = open(path, "w") - - with VCDWriter(vcd_file, "10 ns", timestamp, "manta") as writer: - # each probe has a name, width, and writer associated with it - signals = [] - for name, width in self.config["probes"].items(): - signal = { - "name": name, - "width": width, - "data": self.get_trace(name), - "var": writer.register_var("manta", name, "wire", size=width), - } - signals.append(signal) - - clock = writer.register_var("manta", "clk", "wire", size=1) - trigger = writer.register_var("manta", "trigger", "wire", size=1) - - # add the data to each probe in the vcd file - for timestamp in range(0, 2 * len(self.data)): - # run the clock - writer.change(clock, timestamp, timestamp % 2 == 0) - - # set the trigger - triggered = (timestamp // 2) >= self.get_trigger_loc() - writer.change(trigger, timestamp, triggered) - - # add other signals - for signal in signals: - var = signal["var"] - sample = signal["data"][timestamp // 2] - - writer.change(var, timestamp, sample) - - vcd_file.close() - - def export_playback_module(self): - return LogicAnalyzerPlayback(self.data, self.config) - - def export_playback_verilog(self, path): - lap = self.export_playback_module() - from amaranth.back import verilog - - with open(path, "w") as f: - f.write( - verilog.convert( - lap, - name="logic_analyzer_playback", - ports=lap.get_top_level_ports(), - strip_internal_attrs=True, - ) - ) - - -class LogicAnalyzerPlayback(Elaboratable): - def __init__(self, data, config): - self.data = data - self.config = config - - # State Machine - self.start = Signal(1) - self.valid = Signal(1) - - # Top-Level Probe signals - self.top_level_probes = {} - for name, width in self.config["probes"].items(): - self.top_level_probes[name] = Signal(width, name=name) - - # Instantiate memory - self.mem = Memory( - depth=self.config["sample_depth"], - width=sum(self.config["probes"].values()), - init=self.data, - ) - - self.read_port = self.mem.read_port() - - def elaborate(self, platform): - m = Module() - m.submodules["mem"] = self.mem - - m.d.comb += self.read_port.en.eq(1) - - # State Machine - busy = Signal(1) - with m.If(~busy): - with m.If(self.start): - m.d.sync += busy.eq(1) - # m.d.sync += self.read_port.addr.eq(1) - - with m.Else(): - with m.If(self.read_port.addr == self.config["sample_depth"] - 1): - m.d.sync += busy.eq(0) - m.d.sync += self.read_port.addr.eq(0) - - with m.Else(): - m.d.sync += self.read_port.addr.eq(self.read_port.addr + 1) - - # Pipeline to accomodate for the 2-cycle latency in the RAM - m.d.sync += self.valid.eq(busy) - - # Assign the probe values by part-selecting from the data port - lower = 0 - for name, width in reversed(self.config["probes"].items()): - signal = self.top_level_probes[name] - - # Set output probe to zero if we're not - with m.If(self.valid): - m.d.comb += signal.eq(self.read_port.data[lower : lower + width]) - - with m.Else(): - m.d.comb += signal.eq(0) - - lower += width - - return m - - def get_top_level_ports(self): - return [self.start, self.valid] + list(self.top_level_probes.values()) diff --git a/src/manta/manta.py b/src/manta/manta.py index 073f55a..97f1b4c 100644 --- a/src/manta/manta.py +++ b/src/manta/manta.py @@ -5,7 +5,7 @@ from .uart import UARTInterface # from .ethernet import EthernetInterface from .io_core import IOCore from .memory_core import ReadOnlyMemoryCore -from .logic_analyzer_core import LogicAnalyzerCore +from .logic_analyzer import LogicAnalyzerCore class Manta(Elaboratable): diff --git a/test/test_logic_analyzer_sim.py b/test/test_logic_analyzer_sim.py index 0f37aae..01594dd 100644 --- a/test/test_logic_analyzer_sim.py +++ b/test/test_logic_analyzer_sim.py @@ -1,5 +1,5 @@ from amaranth.sim import Simulator -from manta.logic_analyzer_core import LogicAnalyzerCore +from manta.logic_analyzer import LogicAnalyzerCore from manta.utils import * from random import sample @@ -13,6 +13,7 @@ config = { la = LogicAnalyzerCore(config, base_addr=0, interface=None) + def print_data_at_addr(addr): # place read transaction on the bus yield la.addr_i.eq(addr) @@ -30,43 +31,56 @@ def print_data_at_addr(addr): print(f"addr: {hex(addr)} data: {hex((yield la.data_o))}") -def set_logic_analyzer_register(name, data): - addr = la.registers.mmap[f"{name}_buf"]["addrs"][0] +def set_fsm_register(name, data): + addr = la.fsm.r.mmap[f"{name}_buf"]["addrs"][0] yield from write_register(la, 0, 0) yield from write_register(la, addr, data) yield from write_register(la, 0, 1) yield from write_register(la, 0, 0) +def set_trig_blk_register(name, data): + addr = la.trig_blk.r.mmap[f"{name}_buf"]["addrs"][0] + + yield from write_register(la, 0, 0) + yield from write_register(la, addr, data) + yield from write_register(la, 0, 1) + yield from write_register(la, 0, 0) + +def set_probe(name, value): + probe = None + for p in la.probes: + if p.name == name: + probe = p + + yield p.eq(value) def test_do_you_fucking_work(): def testbench(): # # ok nice what happens if we try to run the core, which includes: - yield from set_logic_analyzer_register("request_stop", 1) - yield from set_logic_analyzer_register("request_stop", 0) + yield from set_fsm_register("request_stop", 1) + yield from set_fsm_register("request_stop", 0) # setting triggers - yield from set_logic_analyzer_register("curly_op", la.operations["EQ"]) - yield from set_logic_analyzer_register("curly_arg", 4) + yield from set_trig_blk_register("curly_op", la.trig_blk.triggers[0].operations["EQ"]) + yield from set_trig_blk_register("curly_arg", 4) # setting trigger mode - yield from set_logic_analyzer_register( - "trigger_mode", 0 - ) # right now this is not actually respected...oops + yield from set_fsm_register("trigger_mode", 0) # setting trigger location - yield from set_logic_analyzer_register("trigger_loc", 511) + yield from set_fsm_register("trigger_location", 511) # starting capture - yield from set_logic_analyzer_register("request_start", 1) - yield from set_logic_analyzer_register("request_start", 0) + yield from set_fsm_register("request_start", 1) + yield from set_fsm_register("request_start", 0) # wait a few hundred clock cycles, see what happens for _ in range(700): yield # provide the trigger condition - yield la.probe_signals["curly"]["top_level"].eq(4) + yield from set_probe("curly", 4) for _ in range(700): yield