diff --git a/.gitignore b/.gitignore index 5dc2efa..48226f2 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ build/ *.sv *.vcd *.out +*.csv # Vivado files from the occasional debugging sesh *.log diff --git a/src/manta/logic_analyzer/__init__.py b/src/manta/logic_analyzer/__init__.py index d88cbef..20a8ef8 100644 --- a/src/manta/logic_analyzer/__init__.py +++ b/src/manta/logic_analyzer/__init__.py @@ -19,27 +19,27 @@ class LogicAnalyzerCore(Elaboratable): """ def __init__(self, config, base_addr, interface): - self.config = config - self.check_config(config) + self._config = config + self._check_config() # Bus Input/Output self.bus_i = Signal(InternalBus()) self.bus_o = Signal(InternalBus()) - self.probes = [ - Signal(width, name=name) for name, width in self.config["probes"].items() + self._probes = [ + Signal(width, name=name) for name, width in self._config["probes"].items() ] # Submodules - self.fsm = LogicAnalyzerFSM(self.config, base_addr, interface) - self.trig_blk = LogicAnalyzerTriggerBlock( - self.probes, self.fsm.get_max_addr() + 1, interface + self._fsm = LogicAnalyzerFSM(self._config, base_addr, interface) + self._trig_blk = LogicAnalyzerTriggerBlock( + self._probes, self._fsm.get_max_addr() + 1, interface ) - self.sample_mem = LogicAnalyzerSampleMemory( - self.config, self.trig_blk.get_max_addr() + 1, interface + self._sample_mem = LogicAnalyzerSampleMemory( + self._config, self._trig_blk.get_max_addr() + 1, interface ) - def check_config(self, config): + def _check_config(self): # Check for unrecognized options valid_options = [ "type", @@ -49,12 +49,12 @@ class LogicAnalyzerCore(Elaboratable): "trigger_location", "trigger_mode", ] - for option in config: + for option in self._config: if option not in valid_options: warn(f"Ignoring unrecognized option '{option}' in Logic Analyzer.") # Check sample depth is provided and positive - sample_depth = config.get("sample_depth") + sample_depth = self._config.get("sample_depth") if not sample_depth: raise ValueError("Logic Analyzer must have sample_depth specified.") @@ -62,35 +62,35 @@ class LogicAnalyzerCore(Elaboratable): raise ValueError("Logic Analyzer sample_depth must be a positive integer.") # Check probes - if "probes" not in config or len(config["probes"]) == 0: + if "probes" not in self._config or len(self._config["probes"]) == 0: raise ValueError("Logic Analyzer must have at least one probe specified.") - for name, width in config["probes"].items(): + for name, width in self._config["probes"].items(): if width < 0: raise ValueError(f"Width of probe {name} must be positive.") # Check trigger mode, if provided - trigger_mode = config.get("trigger_mode") + trigger_mode = self._config.get("trigger_mode") valid_modes = ["single_shot", "incremental", "immediate"] if trigger_mode and trigger_mode not in valid_modes: raise ValueError( - f"Unrecognized trigger mode {config['trigger_mode']} provided." + f"Unrecognized trigger mode {self._config['trigger_mode']} provided." ) # Check triggers if trigger_mode and trigger_mode != "immediate": - if "triggers" not in config or config["triggers"] == 0: + if "triggers" not in self._config or self._config["triggers"] == 0: raise ValueError( "Logic Analyzer must have at least one trigger specified if not running in immediate mode." ) # Check trigger location - trigger_location = config.get("trigger_location") + trigger_location = self._config.get("trigger_location") if trigger_location: if not isinstance(trigger_location, int) or trigger_location < 0: raise ValueError("Trigger location must be a positive integer.") - if trigger_location >= config["sample_depth"]: + if trigger_location >= self._config["sample_depth"]: raise ValueError("Trigger location must be less than sample depth.") if trigger_mode == "immediate": @@ -100,16 +100,16 @@ class LogicAnalyzerCore(Elaboratable): # Check triggers themselves if trigger_mode == "immediate": - if "triggers" in config: + if "triggers" in self._config: warn( "Ignoring triggers as 'trigger_mode' is set to immediate, and there are no triggers to specify." ) else: - if ("triggers" not in config) or (len(config["triggers"]) == 0): + if ("triggers" not in self._config) or (len(self._config["triggers"]) == 0): raise ValueError("At least one trigger must be specified.") - for trigger in config.get("triggers"): + for trigger in self._config.get("triggers"): if not isinstance(trigger, str): raise ValueError("Trigger must be specified with a string.") @@ -140,33 +140,33 @@ class LogicAnalyzerCore(Elaboratable): ) # Check probe names - if components[0] not in config["probes"]: + if components[0] not in self._config["probes"]: raise ValueError(f"Unknown probe name '{components[0]}' specified.") def elaborate(self, platform): m = Module() # Add submodules - m.submodules.fsm = fsm = self.fsm - m.submodules.sample_mem = sample_mem = self.sample_mem - m.submodules.trig_blk = trig_blk = self.trig_blk + m.submodules.fsm = self._fsm + m.submodules.sample_mem = self._sample_mem + m.submodules.trig_blk = self._trig_blk # Concat all the probes together, and feed to input of sample memory # (it is necessary to reverse the order such that first probe occupies # the lowest location in memory) - m.d.comb += sample_mem.user_data.eq(Cat(self.probes[::-1])) + m.d.comb += self._sample_mem.user_data.eq(Cat(self._probes[::-1])) # Wire bus connections between internal modules m.d.comb += [ # Bus Connections - fsm.bus_i.eq(self.bus_i), - trig_blk.bus_i.eq(self.fsm.bus_o), - sample_mem.bus_i.eq(trig_blk.bus_o), - self.bus_o.eq(sample_mem.bus_o), + self._fsm.bus_i.eq(self.bus_i), + self._trig_blk.bus_i.eq(self._fsm.bus_o), + self._sample_mem.bus_i.eq(self._trig_blk.bus_o), + self.bus_o.eq(self._sample_mem.bus_o), # Non-bus Connections - fsm.trigger.eq(trig_blk.trig), - sample_mem.user_addr.eq(fsm.write_pointer), - sample_mem.user_we.eq(fsm.write_enable), + self._fsm.trigger.eq(self._trig_blk.trig), + self._sample_mem.user_addr.eq(self._fsm.write_pointer), + self._sample_mem.user_we.eq(self._fsm.write_enable), ] return m @@ -176,14 +176,7 @@ class LogicAnalyzerCore(Elaboratable): Return the Amaranth signals that should be included as ports in the top-level Manta module. """ - return self.probes - - def get_probe(self, name): - for p in self.probes: - if p.name == name: - return p - - raise ValueError(f"Probe '{name}' not found in Logic Analyzer core.") + return self._probes def get_max_addr(self): """ @@ -191,167 +184,150 @@ class LogicAnalyzerCore(Elaboratable): space used by the core extends from `base_addr` to the number returned by this function. """ - return self.sample_mem.get_max_addr() + return self._sample_mem.get_max_addr() def capture(self, verbose=False): - """Perform a capture, recording the state of all input probes to the FPGA's memory, and - then reading that out on the host. - - Parameters: - ---------- - verbose : bool - Whether or not to print the status of the capture to stdout as it progresses. - Defaults to False. - - Returns: - ---------- - An instance of LogicAnalyzerCapture. + """ + Performs a capture, recording the state of all input probes to the + FPGA's memory, and then returns that as a LogicAnalyzerCapture class + on the host. """ print_if_verbose = lambda x: print(x) if verbose else None # If core is not in IDLE state, request that it return to IDLE print_if_verbose(" -> Resetting core...") - state = self.fsm.registers.get_probe("state") + state = self._fsm.registers.get_probe("state") if state != States.IDLE: - self.fsm.registers.set_probe("request_start", 0) - self.fsm.registers.set_probe("request_stop", 0) - self.fsm.registers.set_probe("request_stop", 1) - self.fsm.registers.set_probe("request_stop", 0) + self._fsm.registers.set_probe("request_start", 0) + self._fsm.registers.set_probe("request_stop", 0) + self._fsm.registers.set_probe("request_stop", 1) + self._fsm.registers.set_probe("request_stop", 0) - if self.fsm.registers.get_probe("state") != States.IDLE: + if self._fsm.registers.get_probe("state") != States.IDLE: raise ValueError("Logic analyzer did not reset to IDLE state.") # Set triggers print_if_verbose(" -> Setting triggers...") - self.trig_blk.clear_triggers() + self._trig_blk.clear_triggers() - if self.config.get("trigger_mode") != "immediate": - self.trig_blk.set_triggers(self.config) + if self._config.get("trigger_mode") != "immediate": + self._trig_blk.set_triggers(self._config) # Set trigger mode, default to single-shot if user didn't specify a mode print_if_verbose(" -> Setting trigger mode...") - if "trigger_mode" in self.config: - mode = self.config["trigger_mode"].upper() - self.fsm.registers.set_probe("trigger_mode", TriggerModes[mode]) + if "trigger_mode" in self._config: + mode = self._config["trigger_mode"].upper() + self._fsm.registers.set_probe("trigger_mode", TriggerModes[mode]) else: - self.fsm.registers.set_probe("trigger_mode", TriggerModes.SINGLE_SHOT) + self._fsm.registers.set_probe("trigger_mode", TriggerModes.SINGLE_SHOT) # Set trigger location print_if_verbose(" -> Setting trigger location...") - if "trigger_location" in self.config: - self.fsm.registers.set_probe( - "trigger_location", self.config["trigger_location"] + if "trigger_location" in self._config: + self._fsm.registers.set_probe( + "trigger_location", self._config["trigger_location"] ) else: - self.fsm.registers.set_probe( - "trigger_location", self.config["sample_depth"] // 2 + self._fsm.registers.set_probe( + "trigger_location", self._config["sample_depth"] // 2 ) # Send a start request to the state machine print_if_verbose(" -> Starting capture...") - self.fsm.registers.set_probe("request_start", 0) - self.fsm.registers.set_probe("request_start", 1) - self.fsm.registers.set_probe("request_start", 0) + self._fsm.registers.set_probe("request_start", 0) + self._fsm.registers.set_probe("request_start", 1) + self._fsm.registers.set_probe("request_start", 0) # Poll the state machine's state, and wait for the capture to complete print_if_verbose(" -> Waiting for capture to complete...") - while self.fsm.registers.get_probe("state") != States.CAPTURED: + while self._fsm.registers.get_probe("state") != States.CAPTURED: pass # Read out the entirety of the sample memory print_if_verbose(" -> Reading sample memory contents...") - addrs = list(range(self.config["sample_depth"])) - raw_capture = self.sample_mem.read_from_user_addr(addrs) + addrs = list(range(self._config["sample_depth"])) + raw_capture = self._sample_mem.read_from_user_addr(addrs) # Revolve the memory around the read_pointer, such that all the beginning # of the caputure is at the first element print_if_verbose(" -> Checking read pointer and revolving memory...") - read_pointer = self.fsm.registers.get_probe("read_pointer") + read_pointer = self._fsm.registers.get_probe("read_pointer") data = raw_capture[read_pointer:] + raw_capture[:read_pointer] - return LogicAnalyzerCapture(data, self.config) + return LogicAnalyzerCapture(data, self._config) class LogicAnalyzerCapture: - """A container for the data collected during a capture from a LogicAnalyzerCore. Contains - methods for exporting the data as a VCD waveform file, or as a Verilog module for playing - back captured data in simulation/synthesis. - - Parameters: - ---------- - data : list[int] - The raw captured data taken by the LogicAnalyzerCore. This consists of the values of - all the input probes concatenated together at every timestep. - - config : dict - The configuration of the LogicAnalyzerCore that took this capture. + """ + A container class for the data collected by a LogicAnalyzerCore. Contains + methods for exporting the data as a VCD waveform file, a Python list, a + CSV file, or a Verilog module. """ def __init__(self, data, config): - self.data = data - self.config = config + self._data = data + self._config = config def get_trigger_location(self): - """Gets the location of the trigger in the capture. This will match the value of - "trigger_location" provided in the configuration file at the time of capture. - - Parameters: - ---------- - None - - Returns: - ---------- - The trigger location as an `int`. """ - if "trigger_location" in self.config: - return self.config["trigger_location"] + Gets the location of the trigger in the capture. This will match the + value of "trigger_location" provided in the configuration file at the + time of capture. + """ + + if "trigger_location" in self._config: + return self._config["trigger_location"] else: - return self.config["sample_depth"] // 2 + return self._config["sample_depth"] // 2 def get_trace(self, probe_name): - """Gets the value of a single probe over the capture. - - Parameters: - ---------- - probe_name : int - The name of the probe in the LogicAnalyzer Core. This must match the name provided - in the configuration file. - - Returns: - ---------- - The value of the probe at every timestep in the capture, as a list of integers. + """ + Gets the value of a single probe over the capture. """ # sum up the widths of all the probes below this one lower = 0 - for name, width in self.config["probes"].items(): + for name, width in self._config["probes"].items(): if name == probe_name: break lower += width # add the width of the probe we'd like - upper = lower + self.config["probes"][probe_name] + upper = lower + self._config["probes"][probe_name] - total_probe_width = sum(self.config["probes"].values()) - binary = [f"{d:0{total_probe_width}b}" for d in self.data] + total_probe_width = sum(self._config["probes"].values()) + binary = [f"{d:0{total_probe_width}b}" for d in self._data] return [int(b[lower:upper], 2) for b in binary] + def export_csv(self, path): + """ + Export the capture to a CSV file, containing the data of all probes in + the core. + """ + + names = list(self._config["probes"].keys()) + values = [self.get_trace(n) for n in names] + + # Transpose list of lists so that data flows top-to-bottom instead of + # left-to-right + values_t = [list(x) for x in zip(*values)] + + import csv + with open(path, 'w') as f: + writer = csv.writer(f) + + writer.writerow(names) + writer.writerows(values_t) + + def export_vcd(self, path): - """Export the capture to a VCD file, containing the data of all probes in the core. - - Parameters: - ---------- - path : str - The path of the output file, including the ".vcd" file extension. - - Returns: - ---------- - None - + """ + Export the capture to a VCD file, containing the data of all probes in + the core. """ from vcd import VCDWriter @@ -364,7 +340,7 @@ class LogicAnalyzerCapture: with VCDWriter(vcd_file, "10 ns", timestamp, "manta") as writer: # each probe has a name, width, and writer associated with it signals = [] - for name, width in self.config["probes"].items(): + for name, width in self._config["probes"].items(): signal = { "name": name, "width": width, @@ -377,20 +353,20 @@ class LogicAnalyzerCapture: # include a trigger signal such would be meaningful (ie, we didn't trigger immediately) if ( - "trigger_mode" not in self.config - or self.config["trigger_mode"] == "single_shot" + "trigger_mode" not in self._config + or self._config["trigger_mode"] == "single_shot" ): trigger = writer.register_var("manta", "trigger", "wire", size=1) # add the data to each probe in the vcd file - for timestamp in range(0, 2 * len(self.data)): + for timestamp in range(0, 2 * len(self._data)): # run the clock writer.change(clock, timestamp, timestamp % 2 == 0) # set the trigger (if there is one) if ( - "trigger_mode" not in self.config - or self.config["trigger_mode"] == "single_shot" + "trigger_mode" not in self._config + or self._config["trigger_mode"] == "single_shot" ): triggered = (timestamp // 2) >= self.get_trigger_location() writer.change(trigger, timestamp, triggered) @@ -405,31 +381,19 @@ class LogicAnalyzerCapture: vcd_file.close() def get_playback_module(self): - """Gets an Amaranth module that will playback the captured data. This module is - synthesizable, so it may be used in either simulation or synthesis. - - Parameters: - ---------- - None - - Returns: - ---------- - An instance of LogicAnalyzerPlayback, which is a synthesizable Amaranth module. """ - return LogicAnalyzerPlayback(self.data, self.config) + Returns an Amaranth module that will playback the captured data. This + module is synthesizable, so it may be used in either simulation or + on the FPGA directly by including it your build process. + """ + + return LogicAnalyzerPlayback(self._data, self._config) def export_playback_verilog(self, path): - """Exports a Verilog module that will playback the captured data. This module is - synthesizable, so it may be used in either simulation or synthesis. - - Parameters: - ---------- - path : str - The path of the output file, including the ".v" file extension. - - Returns: - ---------- - None + """ + Exports a Verilog module that will playback the captured data. This + module is synthesizable, so it may be used in either simulation or + on the FPGA directly by including it your build process. """ lap = self.get_playback_module() diff --git a/src/manta/logic_analyzer/playback.py b/src/manta/logic_analyzer/playback.py index 3fc72e4..5ddb6d9 100644 --- a/src/manta/logic_analyzer/playback.py +++ b/src/manta/logic_analyzer/playback.py @@ -3,10 +3,9 @@ from amaranth import * class LogicAnalyzerPlayback(Elaboratable): """ - A synthesizable module that plays back data captured by a LogicAnalyzerCore. - - Takes a list of all the samples captured by a core, along with the config - of the core used to take it. + A synthesizable module that plays back data captured by a + LogicAnalyzerCore. Takes a list of all the samples captured by a core, + along with the config of the core used to take it. """ def __init__(self, data, config): @@ -73,7 +72,7 @@ class LogicAnalyzerPlayback(Elaboratable): def get_top_level_ports(self): """ - Return the Amaranth signals that should be included as ports in the + Returns the Amaranth signals that should be included as ports in the exported Verilog module. """ return [self.start, self.valid] + list(self.top_level_probes.values()) diff --git a/test/test_logic_analyzer_hw.py b/test/test_logic_analyzer_hw.py index 12e8c41..d1dd695 100644 --- a/test/test_logic_analyzer_hw.py +++ b/test/test_logic_analyzer_hw.py @@ -36,9 +36,9 @@ class LogicAnalyzerCounterTest(Elaboratable): m.submodules.manta = self.manta uart_pins = platform.request("uart") - larry = self.manta.la.probes[0] - curly = self.manta.la.probes[1] - moe = self.manta.la.probes[2] + larry = self.manta.la._probes[0] + curly = self.manta.la._probes[1] + moe = self.manta.la._probes[2] m.d.sync += larry.eq(larry + 1) m.d.sync += curly.eq(curly + 1) @@ -61,11 +61,14 @@ class LogicAnalyzerCounterTest(Elaboratable): # check that VCD export works cap.export_vcd("out.vcd") + # check that CSV export works + cap.export_csv("out.csv") + # check that Verilog export works cap.export_playback_verilog("out.v") # verify that each signal is just a counter modulo the width of the signal - for name, width in self.manta.la.config["probes"].items(): + for name, width in self.manta.la._config["probes"].items(): trace = cap.get_trace(name) for i in range(len(trace) - 1):