enforce consistent docstrings and underscores in logic analyzer core

This commit is contained in:
Fischer Moseley 2024-02-19 11:23:11 -08:00
parent e2450ddbff
commit 7ee51158d2
4 changed files with 138 additions and 171 deletions

1
.gitignore vendored
View File

@ -14,6 +14,7 @@ build/
*.sv
*.vcd
*.out
*.csv
# Vivado files from the occasional debugging sesh
*.log

View File

@ -19,27 +19,27 @@ class LogicAnalyzerCore(Elaboratable):
"""
def __init__(self, config, base_addr, interface):
self.config = config
self.check_config(config)
self._config = config
self._check_config()
# Bus Input/Output
self.bus_i = Signal(InternalBus())
self.bus_o = Signal(InternalBus())
self.probes = [
Signal(width, name=name) for name, width in self.config["probes"].items()
self._probes = [
Signal(width, name=name) for name, width in self._config["probes"].items()
]
# Submodules
self.fsm = LogicAnalyzerFSM(self.config, base_addr, interface)
self.trig_blk = LogicAnalyzerTriggerBlock(
self.probes, self.fsm.get_max_addr() + 1, interface
self._fsm = LogicAnalyzerFSM(self._config, base_addr, interface)
self._trig_blk = LogicAnalyzerTriggerBlock(
self._probes, self._fsm.get_max_addr() + 1, interface
)
self.sample_mem = LogicAnalyzerSampleMemory(
self.config, self.trig_blk.get_max_addr() + 1, interface
self._sample_mem = LogicAnalyzerSampleMemory(
self._config, self._trig_blk.get_max_addr() + 1, interface
)
def check_config(self, config):
def _check_config(self):
# Check for unrecognized options
valid_options = [
"type",
@ -49,12 +49,12 @@ class LogicAnalyzerCore(Elaboratable):
"trigger_location",
"trigger_mode",
]
for option in config:
for option in self._config:
if option not in valid_options:
warn(f"Ignoring unrecognized option '{option}' in Logic Analyzer.")
# Check sample depth is provided and positive
sample_depth = config.get("sample_depth")
sample_depth = self._config.get("sample_depth")
if not sample_depth:
raise ValueError("Logic Analyzer must have sample_depth specified.")
@ -62,35 +62,35 @@ class LogicAnalyzerCore(Elaboratable):
raise ValueError("Logic Analyzer sample_depth must be a positive integer.")
# Check probes
if "probes" not in config or len(config["probes"]) == 0:
if "probes" not in self._config or len(self._config["probes"]) == 0:
raise ValueError("Logic Analyzer must have at least one probe specified.")
for name, width in config["probes"].items():
for name, width in self._config["probes"].items():
if width < 0:
raise ValueError(f"Width of probe {name} must be positive.")
# Check trigger mode, if provided
trigger_mode = config.get("trigger_mode")
trigger_mode = self._config.get("trigger_mode")
valid_modes = ["single_shot", "incremental", "immediate"]
if trigger_mode and trigger_mode not in valid_modes:
raise ValueError(
f"Unrecognized trigger mode {config['trigger_mode']} provided."
f"Unrecognized trigger mode {self._config['trigger_mode']} provided."
)
# Check triggers
if trigger_mode and trigger_mode != "immediate":
if "triggers" not in config or config["triggers"] == 0:
if "triggers" not in self._config or self._config["triggers"] == 0:
raise ValueError(
"Logic Analyzer must have at least one trigger specified if not running in immediate mode."
)
# Check trigger location
trigger_location = config.get("trigger_location")
trigger_location = self._config.get("trigger_location")
if trigger_location:
if not isinstance(trigger_location, int) or trigger_location < 0:
raise ValueError("Trigger location must be a positive integer.")
if trigger_location >= config["sample_depth"]:
if trigger_location >= self._config["sample_depth"]:
raise ValueError("Trigger location must be less than sample depth.")
if trigger_mode == "immediate":
@ -100,16 +100,16 @@ class LogicAnalyzerCore(Elaboratable):
# Check triggers themselves
if trigger_mode == "immediate":
if "triggers" in config:
if "triggers" in self._config:
warn(
"Ignoring triggers as 'trigger_mode' is set to immediate, and there are no triggers to specify."
)
else:
if ("triggers" not in config) or (len(config["triggers"]) == 0):
if ("triggers" not in self._config) or (len(self._config["triggers"]) == 0):
raise ValueError("At least one trigger must be specified.")
for trigger in config.get("triggers"):
for trigger in self._config.get("triggers"):
if not isinstance(trigger, str):
raise ValueError("Trigger must be specified with a string.")
@ -140,33 +140,33 @@ class LogicAnalyzerCore(Elaboratable):
)
# Check probe names
if components[0] not in config["probes"]:
if components[0] not in self._config["probes"]:
raise ValueError(f"Unknown probe name '{components[0]}' specified.")
def elaborate(self, platform):
m = Module()
# Add submodules
m.submodules.fsm = fsm = self.fsm
m.submodules.sample_mem = sample_mem = self.sample_mem
m.submodules.trig_blk = trig_blk = self.trig_blk
m.submodules.fsm = self._fsm
m.submodules.sample_mem = self._sample_mem
m.submodules.trig_blk = self._trig_blk
# Concat all the probes together, and feed to input of sample memory
# (it is necessary to reverse the order such that first probe occupies
# the lowest location in memory)
m.d.comb += sample_mem.user_data.eq(Cat(self.probes[::-1]))
m.d.comb += self._sample_mem.user_data.eq(Cat(self._probes[::-1]))
# Wire bus connections between internal modules
m.d.comb += [
# Bus Connections
fsm.bus_i.eq(self.bus_i),
trig_blk.bus_i.eq(self.fsm.bus_o),
sample_mem.bus_i.eq(trig_blk.bus_o),
self.bus_o.eq(sample_mem.bus_o),
self._fsm.bus_i.eq(self.bus_i),
self._trig_blk.bus_i.eq(self._fsm.bus_o),
self._sample_mem.bus_i.eq(self._trig_blk.bus_o),
self.bus_o.eq(self._sample_mem.bus_o),
# Non-bus Connections
fsm.trigger.eq(trig_blk.trig),
sample_mem.user_addr.eq(fsm.write_pointer),
sample_mem.user_we.eq(fsm.write_enable),
self._fsm.trigger.eq(self._trig_blk.trig),
self._sample_mem.user_addr.eq(self._fsm.write_pointer),
self._sample_mem.user_we.eq(self._fsm.write_enable),
]
return m
@ -176,14 +176,7 @@ class LogicAnalyzerCore(Elaboratable):
Return the Amaranth signals that should be included as ports in the
top-level Manta module.
"""
return self.probes
def get_probe(self, name):
for p in self.probes:
if p.name == name:
return p
raise ValueError(f"Probe '{name}' not found in Logic Analyzer core.")
return self._probes
def get_max_addr(self):
"""
@ -191,167 +184,150 @@ class LogicAnalyzerCore(Elaboratable):
space used by the core extends from `base_addr` to the number returned
by this function.
"""
return self.sample_mem.get_max_addr()
return self._sample_mem.get_max_addr()
def capture(self, verbose=False):
"""Perform a capture, recording the state of all input probes to the FPGA's memory, and
then reading that out on the host.
Parameters:
----------
verbose : bool
Whether or not to print the status of the capture to stdout as it progresses.
Defaults to False.
Returns:
----------
An instance of LogicAnalyzerCapture.
"""
Performs a capture, recording the state of all input probes to the
FPGA's memory, and then returns that as a LogicAnalyzerCapture class
on the host.
"""
print_if_verbose = lambda x: print(x) if verbose else None
# If core is not in IDLE state, request that it return to IDLE
print_if_verbose(" -> Resetting core...")
state = self.fsm.registers.get_probe("state")
state = self._fsm.registers.get_probe("state")
if state != States.IDLE:
self.fsm.registers.set_probe("request_start", 0)
self.fsm.registers.set_probe("request_stop", 0)
self.fsm.registers.set_probe("request_stop", 1)
self.fsm.registers.set_probe("request_stop", 0)
self._fsm.registers.set_probe("request_start", 0)
self._fsm.registers.set_probe("request_stop", 0)
self._fsm.registers.set_probe("request_stop", 1)
self._fsm.registers.set_probe("request_stop", 0)
if self.fsm.registers.get_probe("state") != States.IDLE:
if self._fsm.registers.get_probe("state") != States.IDLE:
raise ValueError("Logic analyzer did not reset to IDLE state.")
# Set triggers
print_if_verbose(" -> Setting triggers...")
self.trig_blk.clear_triggers()
self._trig_blk.clear_triggers()
if self.config.get("trigger_mode") != "immediate":
self.trig_blk.set_triggers(self.config)
if self._config.get("trigger_mode") != "immediate":
self._trig_blk.set_triggers(self._config)
# Set trigger mode, default to single-shot if user didn't specify a mode
print_if_verbose(" -> Setting trigger mode...")
if "trigger_mode" in self.config:
mode = self.config["trigger_mode"].upper()
self.fsm.registers.set_probe("trigger_mode", TriggerModes[mode])
if "trigger_mode" in self._config:
mode = self._config["trigger_mode"].upper()
self._fsm.registers.set_probe("trigger_mode", TriggerModes[mode])
else:
self.fsm.registers.set_probe("trigger_mode", TriggerModes.SINGLE_SHOT)
self._fsm.registers.set_probe("trigger_mode", TriggerModes.SINGLE_SHOT)
# Set trigger location
print_if_verbose(" -> Setting trigger location...")
if "trigger_location" in self.config:
self.fsm.registers.set_probe(
"trigger_location", self.config["trigger_location"]
if "trigger_location" in self._config:
self._fsm.registers.set_probe(
"trigger_location", self._config["trigger_location"]
)
else:
self.fsm.registers.set_probe(
"trigger_location", self.config["sample_depth"] // 2
self._fsm.registers.set_probe(
"trigger_location", self._config["sample_depth"] // 2
)
# Send a start request to the state machine
print_if_verbose(" -> Starting capture...")
self.fsm.registers.set_probe("request_start", 0)
self.fsm.registers.set_probe("request_start", 1)
self.fsm.registers.set_probe("request_start", 0)
self._fsm.registers.set_probe("request_start", 0)
self._fsm.registers.set_probe("request_start", 1)
self._fsm.registers.set_probe("request_start", 0)
# Poll the state machine's state, and wait for the capture to complete
print_if_verbose(" -> Waiting for capture to complete...")
while self.fsm.registers.get_probe("state") != States.CAPTURED:
while self._fsm.registers.get_probe("state") != States.CAPTURED:
pass
# Read out the entirety of the sample memory
print_if_verbose(" -> Reading sample memory contents...")
addrs = list(range(self.config["sample_depth"]))
raw_capture = self.sample_mem.read_from_user_addr(addrs)
addrs = list(range(self._config["sample_depth"]))
raw_capture = self._sample_mem.read_from_user_addr(addrs)
# Revolve the memory around the read_pointer, such that all the beginning
# of the caputure is at the first element
print_if_verbose(" -> Checking read pointer and revolving memory...")
read_pointer = self.fsm.registers.get_probe("read_pointer")
read_pointer = self._fsm.registers.get_probe("read_pointer")
data = raw_capture[read_pointer:] + raw_capture[:read_pointer]
return LogicAnalyzerCapture(data, self.config)
return LogicAnalyzerCapture(data, self._config)
class LogicAnalyzerCapture:
"""A container for the data collected during a capture from a LogicAnalyzerCore. Contains
methods for exporting the data as a VCD waveform file, or as a Verilog module for playing
back captured data in simulation/synthesis.
Parameters:
----------
data : list[int]
The raw captured data taken by the LogicAnalyzerCore. This consists of the values of
all the input probes concatenated together at every timestep.
config : dict
The configuration of the LogicAnalyzerCore that took this capture.
"""
A container class for the data collected by a LogicAnalyzerCore. Contains
methods for exporting the data as a VCD waveform file, a Python list, a
CSV file, or a Verilog module.
"""
def __init__(self, data, config):
self.data = data
self.config = config
self._data = data
self._config = config
def get_trigger_location(self):
"""Gets the location of the trigger in the capture. This will match the value of
"trigger_location" provided in the configuration file at the time of capture.
Parameters:
----------
None
Returns:
----------
The trigger location as an `int`.
"""
if "trigger_location" in self.config:
return self.config["trigger_location"]
Gets the location of the trigger in the capture. This will match the
value of "trigger_location" provided in the configuration file at the
time of capture.
"""
if "trigger_location" in self._config:
return self._config["trigger_location"]
else:
return self.config["sample_depth"] // 2
return self._config["sample_depth"] // 2
def get_trace(self, probe_name):
"""Gets the value of a single probe over the capture.
Parameters:
----------
probe_name : int
The name of the probe in the LogicAnalyzer Core. This must match the name provided
in the configuration file.
Returns:
----------
The value of the probe at every timestep in the capture, as a list of integers.
"""
Gets the value of a single probe over the capture.
"""
# sum up the widths of all the probes below this one
lower = 0
for name, width in self.config["probes"].items():
for name, width in self._config["probes"].items():
if name == probe_name:
break
lower += width
# add the width of the probe we'd like
upper = lower + self.config["probes"][probe_name]
upper = lower + self._config["probes"][probe_name]
total_probe_width = sum(self.config["probes"].values())
binary = [f"{d:0{total_probe_width}b}" for d in self.data]
total_probe_width = sum(self._config["probes"].values())
binary = [f"{d:0{total_probe_width}b}" for d in self._data]
return [int(b[lower:upper], 2) for b in binary]
def export_csv(self, path):
"""
Export the capture to a CSV file, containing the data of all probes in
the core.
"""
names = list(self._config["probes"].keys())
values = [self.get_trace(n) for n in names]
# Transpose list of lists so that data flows top-to-bottom instead of
# left-to-right
values_t = [list(x) for x in zip(*values)]
import csv
with open(path, 'w') as f:
writer = csv.writer(f)
writer.writerow(names)
writer.writerows(values_t)
def export_vcd(self, path):
"""Export the capture to a VCD file, containing the data of all probes in the core.
Parameters:
----------
path : str
The path of the output file, including the ".vcd" file extension.
Returns:
----------
None
"""
Export the capture to a VCD file, containing the data of all probes in
the core.
"""
from vcd import VCDWriter
@ -364,7 +340,7 @@ class LogicAnalyzerCapture:
with VCDWriter(vcd_file, "10 ns", timestamp, "manta") as writer:
# each probe has a name, width, and writer associated with it
signals = []
for name, width in self.config["probes"].items():
for name, width in self._config["probes"].items():
signal = {
"name": name,
"width": width,
@ -377,20 +353,20 @@ class LogicAnalyzerCapture:
# include a trigger signal such would be meaningful (ie, we didn't trigger immediately)
if (
"trigger_mode" not in self.config
or self.config["trigger_mode"] == "single_shot"
"trigger_mode" not in self._config
or self._config["trigger_mode"] == "single_shot"
):
trigger = writer.register_var("manta", "trigger", "wire", size=1)
# add the data to each probe in the vcd file
for timestamp in range(0, 2 * len(self.data)):
for timestamp in range(0, 2 * len(self._data)):
# run the clock
writer.change(clock, timestamp, timestamp % 2 == 0)
# set the trigger (if there is one)
if (
"trigger_mode" not in self.config
or self.config["trigger_mode"] == "single_shot"
"trigger_mode" not in self._config
or self._config["trigger_mode"] == "single_shot"
):
triggered = (timestamp // 2) >= self.get_trigger_location()
writer.change(trigger, timestamp, triggered)
@ -405,31 +381,19 @@ class LogicAnalyzerCapture:
vcd_file.close()
def get_playback_module(self):
"""Gets an Amaranth module that will playback the captured data. This module is
synthesizable, so it may be used in either simulation or synthesis.
Parameters:
----------
None
Returns:
----------
An instance of LogicAnalyzerPlayback, which is a synthesizable Amaranth module.
"""
return LogicAnalyzerPlayback(self.data, self.config)
Returns an Amaranth module that will playback the captured data. This
module is synthesizable, so it may be used in either simulation or
on the FPGA directly by including it your build process.
"""
return LogicAnalyzerPlayback(self._data, self._config)
def export_playback_verilog(self, path):
"""Exports a Verilog module that will playback the captured data. This module is
synthesizable, so it may be used in either simulation or synthesis.
Parameters:
----------
path : str
The path of the output file, including the ".v" file extension.
Returns:
----------
None
"""
Exports a Verilog module that will playback the captured data. This
module is synthesizable, so it may be used in either simulation or
on the FPGA directly by including it your build process.
"""
lap = self.get_playback_module()

View File

@ -3,10 +3,9 @@ from amaranth import *
class LogicAnalyzerPlayback(Elaboratable):
"""
A synthesizable module that plays back data captured by a LogicAnalyzerCore.
Takes a list of all the samples captured by a core, along with the config
of the core used to take it.
A synthesizable module that plays back data captured by a
LogicAnalyzerCore. Takes a list of all the samples captured by a core,
along with the config of the core used to take it.
"""
def __init__(self, data, config):
@ -73,7 +72,7 @@ class LogicAnalyzerPlayback(Elaboratable):
def get_top_level_ports(self):
"""
Return the Amaranth signals that should be included as ports in the
Returns the Amaranth signals that should be included as ports in the
exported Verilog module.
"""
return [self.start, self.valid] + list(self.top_level_probes.values())

View File

@ -36,9 +36,9 @@ class LogicAnalyzerCounterTest(Elaboratable):
m.submodules.manta = self.manta
uart_pins = platform.request("uart")
larry = self.manta.la.probes[0]
curly = self.manta.la.probes[1]
moe = self.manta.la.probes[2]
larry = self.manta.la._probes[0]
curly = self.manta.la._probes[1]
moe = self.manta.la._probes[2]
m.d.sync += larry.eq(larry + 1)
m.d.sync += curly.eq(curly + 1)
@ -61,11 +61,14 @@ class LogicAnalyzerCounterTest(Elaboratable):
# check that VCD export works
cap.export_vcd("out.vcd")
# check that CSV export works
cap.export_csv("out.csv")
# check that Verilog export works
cap.export_playback_verilog("out.v")
# verify that each signal is just a counter modulo the width of the signal
for name, width in self.manta.la.config["probes"].items():
for name, width in self.manta.la._config["probes"].items():
trace = cap.get_trace(name)
for i in range(len(trace) - 1):