diff --git a/examples/amaranth/uart_logic_analyzer.py b/examples/amaranth/uart_logic_analyzer.py new file mode 100644 index 0000000..0d5a999 --- /dev/null +++ b/examples/amaranth/uart_logic_analyzer.py @@ -0,0 +1,78 @@ +from amaranth import * +from amaranth.lib import io +from manta import * +from manta.logic_analyzer import LogicAnalyzerCore, TriggerModes +from manta.uart import UARTInterface +from time import sleep + + +class UARTLogicAnalyzerExample(Elaboratable): + def __init__(self, platform, port): + self.platform = platform + + # Create Manta instance + self.manta = Manta() + + # Configure it to communicate over Ethernet + self.manta.interface = UARTInterface( + port=port, + baudrate=2000000, + clock_freq=platform.default_clk_frequency, + ) + + self.probe0 = Signal(1) + self.probe1 = Signal(2) + self.probe2 = Signal(3) + self.probe3 = Signal(4) + + self.manta.cores.la = LogicAnalyzerCore( + sample_depth=2048, + probes=[self.probe0, self.probe1, self.probe2, self.probe3], + ) + + def elaborate(self, platform): + m = Module() + + # Add Manta as a submodule + m.submodules.manta = self.manta + + counter = Signal(10) + m.d.sync += counter.eq(counter + 1) + m.d.comb += self.probe0.eq(counter[0]) + m.d.comb += self.probe1.eq(counter[1:2]) + m.d.comb += self.probe2.eq(counter[3:5]) + m.d.comb += self.probe3.eq(counter[6:]) + + # Wire UART pins to the Manta instance + uart_pins = platform.request("uart", dir={"tx": "-", "rx": "-"}) + m.submodules.uart_rx = uart_rx = io.Buffer("i", uart_pins.rx) + m.submodules.uart_tx = uart_tx = io.Buffer("o", uart_pins.tx) + m.d.comb += self.manta.interface.rx.eq(uart_rx.i) + m.d.comb += uart_tx.o.eq(self.manta.interface.tx) + + return m + + def test(self): + # Build and program the FPGA + self.platform.build(self, do_program=True) + + # Take a capture + self.manta.cores.la.trigger_mode = TriggerModes.IMMEDIATE + cap = self.manta.cores.la.capture() + cap.export_vcd("capture.vcd") + cap.export_csv("capture.csv") + cap.export_playback_verilog("capture.v") + + +# Amaranth has a built-in build system, and well as a set of platform +# definitions for a huge number of FPGA boards. The class defined above is +# very generic, as it specifies a design independent of any particular FGPA +# board. This means that by changing which platform you pass UARTIOCoreExample +# below, you can port this example to any FPGA board! + +from amaranth_boards.icestick import ICEStickPlatform + +UARTLogicAnalyzerExample( + platform=ICEStickPlatform(), + port="/dev/serial/by-id/usb-Lattice_Lattice_FTUSB_Interface_Cable-if01-port0", +).test() diff --git a/src/manta/logic_analyzer/__init__.py b/src/manta/logic_analyzer/__init__.py index f0fa8c1..12371f3 100644 --- a/src/manta/logic_analyzer/__init__.py +++ b/src/manta/logic_analyzer/__init__.py @@ -2,8 +2,8 @@ from amaranth import * from manta.utils import * from manta.memory_core import MemoryCore from manta.logic_analyzer.trigger_block import LogicAnalyzerTriggerBlock -from manta.logic_analyzer.fsm import LogicAnalyzerFSM, States, TriggerModes -from manta.logic_analyzer.playback import LogicAnalyzerPlayback +from manta.logic_analyzer.fsm import LogicAnalyzerFSM, TriggerModes +from manta.logic_analyzer.capture import LogicAnalyzerCapture import math @@ -20,42 +20,37 @@ class LogicAnalyzerCore(MantaCore): https://fischermoseley.github.io/manta/logic_analyzer_core/ """ - def __init__(self, config, base_addr, interface): - self._config = config - self._interface = interface - self._check_config() + def __init__(self, sample_depth, probes): + self._sample_depth = sample_depth + self._probes = probes + self.trigger_location = sample_depth // 2 + self.trigger_mode = TriggerModes.SINGLE_SHOT + self.triggers = [] # Bus Input/Output self.bus_i = Signal(InternalBus()) self.bus_o = Signal(InternalBus()) - self._probes = [ - Signal(width, name=name) for name, width in self._config["probes"].items() - ] - - # Submodules - self._fsm = LogicAnalyzerFSM(self._config, base_addr, self._interface) - self._trig_blk = LogicAnalyzerTriggerBlock( - self._probes, self._fsm.get_max_addr() + 1, self._interface - ) - - self._sample_mem = MemoryCore( - mode="fpga_to_host", - width=sum(self._config["probes"].values()), - depth=self._config["sample_depth"], - base_addr=self._trig_blk.get_max_addr() + 1, - interface=self._interface, - ) - @property def max_addr(self): + self.define_submodules() return self._sample_mem.max_addr @property def top_level_ports(self): return self._probes - def _check_config(self): + def to_config(self): + return { + "type": "logic_analyzer", + "sample_depth": self._sample_depth, + "trigger_location": self.trigger_location, + "probes": {p.name: len(p) for p in self._probes}, + "triggers": self.triggers, + } + + @classmethod + def from_config(cls, config): # Check for unrecognized options valid_options = [ "type", @@ -65,12 +60,12 @@ class LogicAnalyzerCore(MantaCore): "trigger_location", "trigger_mode", ] - for option in self._config: + for option in config: if option not in valid_options: warn(f"Ignoring unrecognized option '{option}' in Logic Analyzer.") # Check sample depth is provided and positive - sample_depth = self._config.get("sample_depth") + sample_depth = config.get("sample_depth") if not sample_depth: raise ValueError("Logic Analyzer must have sample_depth specified.") @@ -78,35 +73,35 @@ class LogicAnalyzerCore(MantaCore): raise ValueError("Logic Analyzer sample_depth must be a positive integer.") # Check probes - if "probes" not in self._config or len(self._config["probes"]) == 0: + if "probes" not in config or len(config["probes"]) == 0: raise ValueError("Logic Analyzer must have at least one probe specified.") - for name, width in self._config["probes"].items(): + for name, width in config["probes"].items(): if width < 0: raise ValueError(f"Width of probe {name} must be positive.") # Check trigger mode, if provided - trigger_mode = self._config.get("trigger_mode") + trigger_mode = config.get("trigger_mode") valid_modes = ["single_shot", "incremental", "immediate"] if trigger_mode and trigger_mode not in valid_modes: raise ValueError( - f"Unrecognized trigger mode {self._config['trigger_mode']} provided." + f"Unrecognized trigger mode {config['trigger_mode']} provided." ) # Check triggers if trigger_mode and trigger_mode != "immediate": - if "triggers" not in self._config or self._config["triggers"] == 0: + if "triggers" not in config or config["triggers"] == 0: raise ValueError( "Logic Analyzer must have at least one trigger specified if not running in immediate mode." ) # Check trigger location - trigger_location = self._config.get("trigger_location") + trigger_location = config.get("trigger_location") if trigger_location: if not isinstance(trigger_location, int) or trigger_location < 0: raise ValueError("Trigger location must be a positive integer.") - if trigger_location >= self._config["sample_depth"]: + if trigger_location >= config["sample_depth"]: raise ValueError("Trigger location must be less than sample depth.") if trigger_mode == "immediate": @@ -116,16 +111,16 @@ class LogicAnalyzerCore(MantaCore): # Check triggers themselves if trigger_mode == "immediate": - if "triggers" in self._config: + if "triggers" in config: warn( "Ignoring triggers as 'trigger_mode' is set to immediate, and there are no triggers to specify." ) else: - if ("triggers" not in self._config) or (len(self._config["triggers"]) == 0): + if ("triggers" not in config) or (len(config["triggers"]) == 0): raise ValueError("At least one trigger must be specified.") - for trigger in self._config.get("triggers"): + for trigger in config.get("triggers"): if not isinstance(trigger, str): raise ValueError("Trigger must be specified with a string.") @@ -156,9 +151,35 @@ class LogicAnalyzerCore(MantaCore): ) # Check probe names - if components[0] not in self._config["probes"]: + if components[0] not in config["probes"]: raise ValueError(f"Unknown probe name '{components[0]}' specified.") + # Checks complete, create LogicAnalyzerCore + probes = [Signal(width, name=name) for name, width in config["probes"].items()] + + return cls(config["sample_depth"], probes) + + def define_submodules(self): + self._fsm = LogicAnalyzerFSM( + sample_depth=self._sample_depth, + base_addr=self.base_addr, + interface=self.interface, + ) + + self._trig_blk = LogicAnalyzerTriggerBlock( + probes=self._probes, + base_addr=self._fsm.max_addr + 1, + interface=self.interface, + ) + + self._sample_mem = MemoryCore( + mode="fpga_to_host", + width=sum([len(p) for p in self._probes]), + depth=self._sample_depth, + ) + self._sample_mem.base_addr = self._trig_blk.max_addr + 1 + self._sample_mem.interface = self.interface + def elaborate(self, platform): m = Module() @@ -187,255 +208,41 @@ class LogicAnalyzerCore(MantaCore): return m - def capture(self, verbose=False): + def capture(self): """ Performs a capture, recording the state of all input probes to the FPGA's memory, and then returns that as a LogicAnalyzerCapture class on the host. """ - print_if_verbose = lambda x: print(x) if verbose else None - # If core is not in IDLE state, request that it return to IDLE - print_if_verbose(" -> Resetting core...") - state = self._fsm.registers.get_probe("state") - if state != States.IDLE: - self._fsm.registers.set_probe("request_start", 0) - self._fsm.registers.set_probe("request_stop", 0) - self._fsm.registers.set_probe("request_stop", 1) - self._fsm.registers.set_probe("request_stop", 0) + print(" -> Resetting core...") + self._fsm.stop_capture() - if self._fsm.registers.get_probe("state") != States.IDLE: - raise ValueError("Logic analyzer did not reset to IDLE state.") + print(" -> Setting triggers...") + self._trig_blk.set_triggers(self.triggers) - # Set triggers - print_if_verbose(" -> Setting triggers...") - self._trig_blk.clear_triggers() + print(" -> Setting trigger mode...") + self._fsm.write_register("trigger_mode", self.trigger_mode) - if self._config.get("trigger_mode") != "immediate": - self._trig_blk.set_triggers(self._config) + print(" -> Setting trigger location...") + self._fsm.write_register("trigger_location", self.trigger_location) - # Set trigger mode, default to single-shot if user didn't specify a mode - print_if_verbose(" -> Setting trigger mode...") - if "trigger_mode" in self._config: - mode = self._config["trigger_mode"].upper() - self._fsm.registers.set_probe("trigger_mode", TriggerModes[mode]) + print(" -> Starting capture...") + self._fsm.start_capture() - else: - self._fsm.registers.set_probe("trigger_mode", TriggerModes.SINGLE_SHOT) + print(" -> Waiting for capture to complete...") + self._fsm.wait_for_capture() - # Set trigger location - print_if_verbose(" -> Setting trigger location...") - if "trigger_location" in self._config: - self._fsm.registers.set_probe( - "trigger_location", self._config["trigger_location"] - ) - - else: - self._fsm.registers.set_probe( - "trigger_location", self._config["sample_depth"] // 2 - ) - - # Send a start request to the state machine - print_if_verbose(" -> Starting capture...") - self._fsm.registers.set_probe("request_start", 0) - self._fsm.registers.set_probe("request_start", 1) - self._fsm.registers.set_probe("request_start", 0) - - # Poll the state machine's state, and wait for the capture to complete - print_if_verbose(" -> Waiting for capture to complete...") - while self._fsm.registers.get_probe("state") != States.CAPTURED: - pass - - # Read out the entirety of the sample memory - print_if_verbose(" -> Reading sample memory contents...") - addrs = list(range(self._config["sample_depth"])) + print(" -> Reading sample memory contents...") + addrs = list(range(self._sample_depth)) raw_capture = self._sample_mem.read(addrs) # Revolve the memory around the read_pointer, such that all the beginning # of the caputure is at the first element - print_if_verbose(" -> Checking read pointer and revolving memory...") - read_pointer = self._fsm.registers.get_probe("read_pointer") + print(" -> Checking read pointer and revolving memory...") + read_pointer = self._fsm.read_register("read_pointer") data = raw_capture[read_pointer:] + raw_capture[:read_pointer] - return LogicAnalyzerCapture(data, self._config, self._interface) - - -class LogicAnalyzerCapture: - """ - A container class for the data collected by a LogicAnalyzerCore. Contains - methods for exporting the data as a VCD waveform file, a Python list, a - CSV file, or a Verilog module. - """ - - def __init__(self, data, config, interface): - self._data = data - self._config = config - self._interface = interface - - def get_trigger_location(self): - """ - Gets the location of the trigger in the capture. This will match the - value of "trigger_location" provided in the configuration file at the - time of capture. - """ - - if "trigger_location" in self._config: - return self._config["trigger_location"] - - else: - return self._config["sample_depth"] // 2 - - def get_trace(self, probe_name): - """ - Gets the value of a single probe over the capture. - """ - - # Sum up the widths of all the probes below this one - lower = 0 - for name, width in self._config["probes"].items(): - if name == probe_name: - break - - lower += width - - # Add the width of the probe we'd like - upper = lower + self._config["probes"][probe_name] - - total_probe_width = sum(self._config["probes"].values()) - binary = [f"{d:0{total_probe_width}b}" for d in self._data] - return [int(b[lower:upper], 2) for b in binary] - - def export_csv(self, path): - """ - Export the capture to a CSV file, containing the data of all probes in - the core. - """ - - names = list(self._config["probes"].keys()) - values = [self.get_trace(n) for n in names] - - # Transpose list of lists so that data flows top-to-bottom instead of - # left-to-right - values_t = [list(x) for x in zip(*values)] - - import csv - - with open(path, "w") as f: - writer = csv.writer(f) - - writer.writerow(names) - writer.writerows(values_t) - - def export_vcd(self, path): - """ - Export the capture to a VCD file, containing the data of all probes in - the core. - """ - - from datetime import datetime - - from vcd import VCDWriter - - # Use the same datetime format that iVerilog uses - timestamp = datetime.now().strftime("%a %b %w %H:%M:%S %Y") - vcd_file = open(path, "w") - - # Compute the timescale from the frequency of the provided clock - half_period = 1 / (2 * self._interface._clock_freq) - exponent = math.floor(math.log10(half_period)) - exponent_eng = (exponent // 3) * 3 - - # The VCD file format specification supports no units larger or smaller - # than these - units = { - 0: "s", - -3: "ms", - -6: "us", - -9: "ns", - -12: "ps", - -15: "fs", - } - - timescale_unit = units[exponent_eng] - timescale_exponent = 10 ** (exponent - exponent_eng) - timescale_exact = half_period / (10**exponent) - timescale_integer = round(timescale_exact) - - if abs(timescale_exact - timescale_integer) > 1e-3: - warn("VCD file timescale will differ slightly from exact clock frequency.") - - timescale = (timescale_exponent, timescale_unit) - - with VCDWriter(vcd_file, timescale, timestamp, "manta") as writer: - # Each probe has a name, width, and writer associated with it - signals = [] - for name, width in self._config["probes"].items(): - signal = { - "name": name, - "width": width, - "data": self.get_trace(name), - "var": writer.register_var("manta", name, "wire", size=width), - } - signals.append(signal) - - clock = writer.register_var("manta", "clk", "wire", size=1) - - # Include a trigger signal such would be meaningful (ie, we didn't trigger immediately) - if ( - "trigger_mode" not in self._config - or self._config["trigger_mode"] == "single_shot" - ): - trigger = writer.register_var("manta", "trigger", "wire", size=1) - - # Add the data to each probe in the vcd file - for sample_index in range(0, 2 * len(self._data)): - sample_timestamp = timescale_integer * sample_index - - # Run the clock - writer.change(clock, sample_timestamp, sample_index % 2 == 0) - - # Set the trigger (if there is one) - if ( - "trigger_mode" not in self._config - or self._config["trigger_mode"] == "single_shot" - ): - triggered = (sample_index // 2) >= self.get_trigger_location() - writer.change(trigger, sample_timestamp, triggered) - - # Add other signals - for signal in signals: - var = signal["var"] - sample = signal["data"][sample_index // 2] - - writer.change(var, sample_timestamp, sample) - - vcd_file.close() - - def get_playback_module(self): - """ - Returns an Amaranth module that will playback the captured data. This - module is synthesizable, so it may be used in either simulation or - on the FPGA directly by including it your build process. - """ - - return LogicAnalyzerPlayback(self._data, self._config) - - def export_playback_verilog(self, path): - """ - Exports a Verilog module that will playback the captured data. This - module is synthesizable, so it may be used in either simulation or - on the FPGA directly by including it your build process. - """ - - lap = self.get_playback_module() - from amaranth.back import verilog - - with open(path, "w") as f: - f.write( - verilog.convert( - lap, - name="logic_analyzer_playback", - ports=lap.get_top_level_ports(), - strip_internal_attrs=True, - ) - ) + return LogicAnalyzerCapture( + self._probes, self.trigger_location, self.trigger_mode, data + ) diff --git a/src/manta/logic_analyzer/capture.py b/src/manta/logic_analyzer/capture.py new file mode 100644 index 0000000..d26b894 --- /dev/null +++ b/src/manta/logic_analyzer/capture.py @@ -0,0 +1,182 @@ +from manta.logic_analyzer.playback import LogicAnalyzerPlayback +from manta.logic_analyzer import TriggerModes + + +class LogicAnalyzerCapture: + """ + A container class for the data collected by a LogicAnalyzerCore. Contains + methods for exporting the data as a VCD waveform file, a Python list, a + CSV file, or a Verilog module. + """ + + def __init__(self, probes, trigger_location, trigger_mode, data): + self._probes = probes + self._trigger_location = trigger_location + self._trigger_mode = trigger_mode + self._data = data + + print(self._trigger_mode) + + def get_trigger_location(self): + """ + Returns the location of the trigger in the capture. This will match the + value of "trigger_location" provided in the configuration file at the + time of capture. + """ + + return self._trigger_location + + def get_trace(self, name): + """ + Gets the value of a single probe over the capture. + """ + + # Get index of probe with given name + indicies = [i for i, p in enumerate(self._probes) if p.name == name] + if len(indicies) == 0: + raise ValueError(f"Probe {name} not found in LogicAnalyzerCapture!") + + if len(indicies) > 1: + raise ValueError( + f"Probe {name} found multiple times in LogicAnalyzerCapture!" + ) + + idx = indicies[0] + + # Sum up the widths of all the probes below this one + lower = sum([len(p) for p in self._probes[:idx]]) + + # Add the width of the probe we'd like + upper = lower + len(self._probes[idx]) + + total_probe_width = sum([len(p) for p in self._probes]) + binary = [f"{d:0{total_probe_width}b}" for d in self._data] + return [int(b[lower:upper], 2) for b in binary] + + def export_csv(self, path): + """ + Export the capture to a CSV file, containing the data of all probes in + the core. + """ + + names = [p.name for p in self._probes] + values = [self.get_trace(n) for n in names] + + # Transpose list of lists so that data flows top-to-bottom instead of + # left-to-right + values_transpose = [list(x) for x in zip(*values)] + + import csv + + with open(path, "w") as f: + writer = csv.writer(f) + + writer.writerow(names) + writer.writerows(values_transpose) + + def export_vcd(self, path): + """ + Export the capture to a VCD file, containing the data of all probes in + the core. + """ + + from vcd import VCDWriter + from datetime import datetime + + # Compute the timescale from the frequency of the provided clock + half_period = 1 / (2 * self._interface._clock_freq) + exponent = math.floor(math.log10(half_period)) + exponent_eng = (exponent // 3) * 3 + + # The VCD file format specification supports no units larger or smaller + # than these + units = { + 0: "s", + -3: "ms", + -6: "us", + -9: "ns", + -12: "ps", + -15: "fs", + } + + timescale_unit = units[exponent_eng] + timescale_exponent = 10 ** (exponent - exponent_eng) + timescale_exact = half_period / (10**exponent) + timescale_integer = round(timescale_exact) + + if abs(timescale_exact - timescale_integer) > 1e-3: + warn("VCD file timescale will differ slightly from exact clock frequency.") + + timescale = (timescale_exponent, timescale_unit) + + # Use the same datetime format that iVerilog uses + timestamp = datetime.now().strftime("%a %b %w %H:%M:%S %Y") + vcd_file = open(path, "w") + + with VCDWriter(vcd_file, timescale, timestamp, "manta") as writer: + # Each probe has a name, width, and writer associated with it + signals = [] + for p in self._probes: + signal = { + "name": p.name, + "width": len(p), + "data": self.get_trace(p.name), + "var": writer.register_var("manta", p.name, "wire", size=len(p)), + } + signals.append(signal) + + clock = writer.register_var("manta", "clk", "wire", size=1) + + # Include a trigger signal such would be meaningful (ie, we didn't trigger immediately) + if self._trigger_mode == TriggerModes.SINGLE_SHOT: + trigger = writer.register_var("manta", "trigger", "wire", size=1) + + # Add the data to each probe in the vcd file + for sample_index in range(0, 2 * len(self._data)): + sample_timestamp = timescale_integer * sample_index + + # Run the clock + writer.change(clock, sample_timestamp, sample_index % 2 == 0) + + # Set the trigger (if there is one) + if self._trigger_mode == TriggerModes.SINGLE_SHOT: + triggered = (sample_index // 2) >= self._trigger_location + writer.change(trigger, sample_timestep, triggered) + + # Add other signals + for signal in signals: + var = signal["var"] + sample = signal["data"][sample_index // 2] + + writer.change(var, sample_timestamp, sample) + + vcd_file.close() + + def get_playback_module(self): + """ + Returns an Amaranth module that will playback the captured data. This + module is synthesizable, so it may be used in either simulation or + on the FPGA directly by including it your build process. + """ + + return LogicAnalyzerPlayback(self._probes, self._data) + + def export_playback_verilog(self, path): + """ + Exports a Verilog module that will playback the captured data. This + module is synthesizable, so it may be used in either simulation or + on the FPGA directly by including it your build process. + """ + + lap = self.get_playback_module() + from amaranth.back import verilog + + with open(path, "w") as f: + f.write( + verilog.convert( + lap, + name="logic_analyzer_playback", + ports=lap.get_top_level_ports(), + strip_internal_attrs=True, + ) + ) diff --git a/src/manta/logic_analyzer/fsm.py b/src/manta/logic_analyzer/fsm.py index 5eebb31..7e4966a 100644 --- a/src/manta/logic_analyzer/fsm.py +++ b/src/manta/logic_analyzer/fsm.py @@ -24,8 +24,8 @@ class LogicAnalyzerFSM(Elaboratable): memory in each trigger mode (immediate, incremental, single-shot). """ - def __init__(self, config, base_addr, interface): - self._sample_depth = config["sample_depth"] + def __init__(self, sample_depth, base_addr, interface): + self._sample_depth = sample_depth # Outputs to rest of Logic Analyzer self.trigger = Signal(1) @@ -53,18 +53,16 @@ class LogicAnalyzerFSM(Elaboratable): self.request_stop, ] - self.registers = IOCore(base_addr, interface, inputs, outputs) + self.registers = IOCore(inputs, outputs) + self.registers.base_addr = base_addr + self.registers.interface = interface # Bus Input/Output self.bus_i = self.registers.bus_i self.bus_o = self.registers.bus_o - def get_max_addr(self): - """ - Return the maximum addresses in memory used by the core. The address - space used by the core extends from `base_addr` to the number returned - by this function (including the endpoints). - """ + @property + def max_addr(self): return self.registers.max_addr def elaborate(self, platform): @@ -171,3 +169,32 @@ class LogicAnalyzerFSM(Elaboratable): m.d.sync += state.eq(States.IDLE) return m + + def stop_capture(self): + # If core is not in IDLE state, request that it return to IDLE + state = self.registers.get_probe("state") + if state != States.IDLE: + self.registers.set_probe("request_start", 0) + self.registers.set_probe("request_stop", 0) + self.registers.set_probe("request_stop", 1) + self.registers.set_probe("request_stop", 0) + + if self.registers.get_probe("state") != States.IDLE: + raise ValueError("Logic analyzer did not reset to IDLE state.") + + def start_capture(self): + # Send a start request to the state machine + self.registers.set_probe("request_start", 0) + self.registers.set_probe("request_start", 1) + self.registers.set_probe("request_start", 0) + + def wait_for_capture(self): + # Poll the state machine, and wait for the capture to complete + while self.registers.get_probe("state") != States.CAPTURED: + pass + + def read_register(self, name): + return self.registers.get_probe(name) + + def write_register(self, name, value): + return self.registers.set_probe(name, value) diff --git a/src/manta/logic_analyzer/playback.py b/src/manta/logic_analyzer/playback.py index 5ddb6d9..4940750 100644 --- a/src/manta/logic_analyzer/playback.py +++ b/src/manta/logic_analyzer/playback.py @@ -8,65 +8,59 @@ class LogicAnalyzerPlayback(Elaboratable): along with the config of the core used to take it. """ - def __init__(self, data, config): - self.data = data - self.config = config + def __init__(self, probes, data): + self._probes = probes + self._data = data # State Machine self.start = Signal(1) self.valid = Signal(1) - # Top-Level Probe signals - self.top_level_probes = {} - for name, width in self.config["probes"].items(): - self.top_level_probes[name] = Signal(width, name=name) + def elaborate(self, platform): + m = Module() # Instantiate memory self.mem = Memory( - depth=self.config["sample_depth"], - width=sum(self.config["probes"].values()), - init=self.data, + depth=len(self._data), + width=sum([len(p) for p in self._probes]), + init=self._data, ) - - self.read_port = self.mem.read_port() - - def elaborate(self, platform): - m = Module() m.submodules.mem = self.mem - m.d.comb += self.read_port.en.eq(1) + read_port = self.mem.read_port() + + m.d.comb += read_port.en.eq(1) # State Machine busy = Signal(1) with m.If(~busy): with m.If(self.start): m.d.sync += busy.eq(1) - # m.d.sync += self.read_port.addr.eq(1) + # m.d.sync += read_port.addr.eq(1) with m.Else(): - with m.If(self.read_port.addr == self.config["sample_depth"] - 1): + with m.If(read_port.addr == len(self._data) - 1): m.d.sync += busy.eq(0) - m.d.sync += self.read_port.addr.eq(0) + m.d.sync += read_port.addr.eq(0) with m.Else(): - m.d.sync += self.read_port.addr.eq(self.read_port.addr + 1) + m.d.sync += read_port.addr.eq(read_port.addr + 1) # Pipeline to accomodate for the 2-cycle latency in the RAM m.d.sync += self.valid.eq(busy) # Assign the probe values by part-selecting from the data port lower = 0 - for name, width in reversed(self.config["probes"].items()): - signal = self.top_level_probes[name] + for p in reversed(self._probes): # Set output probe to zero if we're not with m.If(self.valid): - m.d.comb += signal.eq(self.read_port.data[lower : lower + width]) + m.d.comb += p.eq(read_port.data[lower : lower + len(p)]) with m.Else(): - m.d.comb += signal.eq(0) + m.d.comb += p.eq(0) - lower += width + lower += len(p) return m @@ -75,4 +69,4 @@ class LogicAnalyzerPlayback(Elaboratable): Returns the Amaranth signals that should be included as ports in the exported Verilog module. """ - return [self.start, self.valid] + list(self.top_level_probes.values()) + return [self.start, self.valid] + self._probes diff --git a/src/manta/logic_analyzer/trigger_block.py b/src/manta/logic_analyzer/trigger_block.py index fbf7c0c..48051d0 100644 --- a/src/manta/logic_analyzer/trigger_block.py +++ b/src/manta/logic_analyzer/trigger_block.py @@ -19,7 +19,9 @@ class LogicAnalyzerTriggerBlock(Elaboratable): # Make IO core for everything ops = [t.op for t in self._triggers] args = [t.arg for t in self._triggers] - self.registers = IOCore(base_addr, interface, outputs=ops + args) + self.registers = IOCore(outputs=ops + args) + self.registers.base_addr = base_addr + self.registers.interface = interface # Bus Input/Output self.bus_i = self.registers.bus_i @@ -28,23 +30,18 @@ class LogicAnalyzerTriggerBlock(Elaboratable): # Global trigger. High if any probe is triggered. self.trig = Signal() - def get_max_addr(self): - """ - Return the maximum addresses in memory used by the core. The address - space used by the core extends from `base_addr` to the number returned - by this function (including the endpoints). - """ + @property + def max_addr(self): return self.registers.max_addr - def clear_triggers(self): + def set_triggers(self, triggers): # Reset all triggers to disabled with no argument for p in self._probes: self.registers.set_probe(p.name + "_op", Operations.DISABLE) self.registers.set_probe(p.name + "_arg", 0) - def set_triggers(self, config): # Set triggers - for trigger in config["triggers"]: + for trigger in triggers: components = trigger.strip().split(" ") # Handle triggers that don't need an argument diff --git a/src/manta/memory_core.py b/src/manta/memory_core.py index 2010a7a..ed8fa14 100644 --- a/src/manta/memory_core.py +++ b/src/manta/memory_core.py @@ -84,7 +84,7 @@ class MemoryCore(MantaCore): "type": "memory", "mode": self._mode, "width": self._width, - "depth": self._depth + "depth": self._depth, } @classmethod diff --git a/test/test_config_export.py b/test/test_config_export.py index c972699..8d1c86a 100644 --- a/test/test_config_export.py +++ b/test/test_config_export.py @@ -1,12 +1,15 @@ from manta import Manta from manta.io_core import IOCore from manta.memory_core import MemoryCore +from manta.logic_analyzer import LogicAnalyzerCore from manta.uart import UARTInterface +from manta.ethernet import EthernetInterface from amaranth import * import tempfile import os import yaml + def test_io_core_dump(): # Create some dummy signals to pass to the IO Core probe0 = Signal(1) @@ -16,10 +19,7 @@ def test_io_core_dump(): # Create Manta instance manta = Manta() - manta.cores.test_core = IOCore( - inputs = [probe0, probe1], - outputs = [probe2, probe3] - ) + manta.cores.test_core = IOCore(inputs=[probe0, probe1], outputs=[probe2, probe3]) # Create Temporary File tf = tempfile.NamedTemporaryFile(delete=False) @@ -36,21 +36,12 @@ def test_io_core_dump(): expected = { "cores": { "test_core": { - "type": "io", - "inputs": { - "probe0": 1, - "probe1": 2 - }, + "type": "io", + "inputs": {"probe0": 1, "probe1": 2}, "outputs": { - "probe2": { - "width": 3, - "initial_value": 0 - }, - "probe3": { - "width": 4, - "initial_value": 13 - } - } + "probe2": {"width": 3, "initial_value": 0}, + "probe3": {"width": 4, "initial_value": 13}, + }, } } } @@ -58,13 +49,14 @@ def test_io_core_dump(): if data != expected: raise ValueError("Exported YAML does not match configuration!") + def test_memory_core_dump(): # Create Manta instance manta = Manta() manta.cores.test_core = MemoryCore( - mode = "bidirectional", - width = 32, - depth = 1024, + mode="bidirectional", + width=32, + depth=1024, ) # Create Temporary File @@ -82,10 +74,10 @@ def test_memory_core_dump(): expected = { "cores": { "test_core": { - "type": "memory", - "mode":"bidirectional", - "width":32, - "depth":1024 + "type": "memory", + "mode": "bidirectional", + "width": 32, + "depth": 1024, } } } @@ -93,15 +85,53 @@ def test_memory_core_dump(): if data != expected: raise ValueError("Exported YAML does not match configuration!") + def test_logic_analyzer_core_dump(): - raise ValueError + # Create some dummy signals to pass to the Logic Analyzer + probe0 = Signal(1) + probe1 = Signal(2) + probe2 = Signal(3) + + # Create Manta instance + manta = Manta() + manta.cores.test_core = LogicAnalyzerCore( + sample_depth=2048, probes=[probe0, probe1, probe2] + ) + + # Create Temporary File + tf = tempfile.NamedTemporaryFile(delete=False) + tf.close() + + # Export Manta configuration + manta.export_config(tf.name) + + # Parse the exported YAML + with open(tf.name, "r") as f: + data = yaml.safe_load(f) + + print(tf.name) + + # Verify that exported YAML matches configuration + expected = { + "cores": { + "test_core": { + "type": "logic_analyzer", + "sample_depth": 2048, + "trigger_location": 1024, + "probes": {"probe0": 1, "probe1": 2, "probe2": 3}, + "triggers": [], + } + } + } + + if data != expected: + raise ValueError("Exported YAML does not match configuration!") + def test_uart_interface_dump(): manta = Manta() manta.interface = UARTInterface( - port = "/dev/ttyUSB0", - baudrate = 115200, - clock_freq = 100e6 + port="/dev/ttyUSB0", baudrate=115200, clock_freq=100e6 ) # Create Temporary File @@ -120,18 +150,54 @@ def test_uart_interface_dump(): "uart": { "port": "/dev/ttyUSB0", "baudrate": 115200, - # Be careful with the float comparison here, copy-pasting from the # exported YAML seems to have the best results. Otherwise this test # will fail when it shouldn't. "clock_freq": 100000000.0, - "chunk_size": 256 + "chunk_size": 256, } } if data != expected: raise ValueError("Exported YAML does not match configuration!") -def test_ethernet_interface_dump(): - raise ValueError +def test_ethernet_interface_dump(): + manta = Manta() + manta.interface = EthernetInterface( + fpga_ip_addr="192.168.0.101", + host_ip_addr="192.168.0.100", + udp_port=2000, + phy="", + clk_freq=0, + ) + + # Create Temporary File + tf = tempfile.NamedTemporaryFile(delete=False) + tf.close() + + # Export Manta configuration + manta.export_config(tf.name) + + # Parse the exported YAML + with open(tf.name, "r") as f: + data = yaml.safe_load(f) + + # Verify that exported YAML matches configuration + expected = { + "ethernet": { + "phy": "LiteEthPHYRMII", + "vendor": "xilinx", + "toolchain": "vivado", + # Be careful with the float comparison here, copy-pasting from the + # exported YAML seems to have the best results. Otherwise this test + # will fail when it shouldn't. + "clk_freq": 50000000.0, + "refclk_freq": 50000000.0, + "fpga_ip_addr": "192.168.0.101", + "host_ip_addr": "192.168.0.100", + } + } + + if data != expected: + raise ValueError("Exported YAML does not match configuration!")