logic analyzer: move __init__ away from config dict
This commit is contained in:
parent
743f434652
commit
b20d7c7822
|
|
@ -0,0 +1,78 @@
|
|||
from amaranth import *
|
||||
from amaranth.lib import io
|
||||
from manta import *
|
||||
from manta.logic_analyzer import LogicAnalyzerCore, TriggerModes
|
||||
from manta.uart import UARTInterface
|
||||
from time import sleep
|
||||
|
||||
|
||||
class UARTLogicAnalyzerExample(Elaboratable):
|
||||
def __init__(self, platform, port):
|
||||
self.platform = platform
|
||||
|
||||
# Create Manta instance
|
||||
self.manta = Manta()
|
||||
|
||||
# Configure it to communicate over Ethernet
|
||||
self.manta.interface = UARTInterface(
|
||||
port=port,
|
||||
baudrate=2000000,
|
||||
clock_freq=platform.default_clk_frequency,
|
||||
)
|
||||
|
||||
self.probe0 = Signal(1)
|
||||
self.probe1 = Signal(2)
|
||||
self.probe2 = Signal(3)
|
||||
self.probe3 = Signal(4)
|
||||
|
||||
self.manta.cores.la = LogicAnalyzerCore(
|
||||
sample_depth=2048,
|
||||
probes=[self.probe0, self.probe1, self.probe2, self.probe3],
|
||||
)
|
||||
|
||||
def elaborate(self, platform):
|
||||
m = Module()
|
||||
|
||||
# Add Manta as a submodule
|
||||
m.submodules.manta = self.manta
|
||||
|
||||
counter = Signal(10)
|
||||
m.d.sync += counter.eq(counter + 1)
|
||||
m.d.comb += self.probe0.eq(counter[0])
|
||||
m.d.comb += self.probe1.eq(counter[1:2])
|
||||
m.d.comb += self.probe2.eq(counter[3:5])
|
||||
m.d.comb += self.probe3.eq(counter[6:])
|
||||
|
||||
# Wire UART pins to the Manta instance
|
||||
uart_pins = platform.request("uart", dir={"tx": "-", "rx": "-"})
|
||||
m.submodules.uart_rx = uart_rx = io.Buffer("i", uart_pins.rx)
|
||||
m.submodules.uart_tx = uart_tx = io.Buffer("o", uart_pins.tx)
|
||||
m.d.comb += self.manta.interface.rx.eq(uart_rx.i)
|
||||
m.d.comb += uart_tx.o.eq(self.manta.interface.tx)
|
||||
|
||||
return m
|
||||
|
||||
def test(self):
|
||||
# Build and program the FPGA
|
||||
self.platform.build(self, do_program=True)
|
||||
|
||||
# Take a capture
|
||||
self.manta.cores.la.trigger_mode = TriggerModes.IMMEDIATE
|
||||
cap = self.manta.cores.la.capture()
|
||||
cap.export_vcd("capture.vcd")
|
||||
cap.export_csv("capture.csv")
|
||||
cap.export_playback_verilog("capture.v")
|
||||
|
||||
|
||||
# Amaranth has a built-in build system, and well as a set of platform
|
||||
# definitions for a huge number of FPGA boards. The class defined above is
|
||||
# very generic, as it specifies a design independent of any particular FGPA
|
||||
# board. This means that by changing which platform you pass UARTIOCoreExample
|
||||
# below, you can port this example to any FPGA board!
|
||||
|
||||
from amaranth_boards.icestick import ICEStickPlatform
|
||||
|
||||
UARTLogicAnalyzerExample(
|
||||
platform=ICEStickPlatform(),
|
||||
port="/dev/serial/by-id/usb-Lattice_Lattice_FTUSB_Interface_Cable-if01-port0",
|
||||
).test()
|
||||
|
|
@ -2,8 +2,8 @@ from amaranth import *
|
|||
from manta.utils import *
|
||||
from manta.memory_core import MemoryCore
|
||||
from manta.logic_analyzer.trigger_block import LogicAnalyzerTriggerBlock
|
||||
from manta.logic_analyzer.fsm import LogicAnalyzerFSM, States, TriggerModes
|
||||
from manta.logic_analyzer.playback import LogicAnalyzerPlayback
|
||||
from manta.logic_analyzer.fsm import LogicAnalyzerFSM, TriggerModes
|
||||
from manta.logic_analyzer.capture import LogicAnalyzerCapture
|
||||
|
||||
import math
|
||||
|
||||
|
|
@ -20,42 +20,37 @@ class LogicAnalyzerCore(MantaCore):
|
|||
https://fischermoseley.github.io/manta/logic_analyzer_core/
|
||||
"""
|
||||
|
||||
def __init__(self, config, base_addr, interface):
|
||||
self._config = config
|
||||
self._interface = interface
|
||||
self._check_config()
|
||||
def __init__(self, sample_depth, probes):
|
||||
self._sample_depth = sample_depth
|
||||
self._probes = probes
|
||||
self.trigger_location = sample_depth // 2
|
||||
self.trigger_mode = TriggerModes.SINGLE_SHOT
|
||||
self.triggers = []
|
||||
|
||||
# Bus Input/Output
|
||||
self.bus_i = Signal(InternalBus())
|
||||
self.bus_o = Signal(InternalBus())
|
||||
|
||||
self._probes = [
|
||||
Signal(width, name=name) for name, width in self._config["probes"].items()
|
||||
]
|
||||
|
||||
# Submodules
|
||||
self._fsm = LogicAnalyzerFSM(self._config, base_addr, self._interface)
|
||||
self._trig_blk = LogicAnalyzerTriggerBlock(
|
||||
self._probes, self._fsm.get_max_addr() + 1, self._interface
|
||||
)
|
||||
|
||||
self._sample_mem = MemoryCore(
|
||||
mode="fpga_to_host",
|
||||
width=sum(self._config["probes"].values()),
|
||||
depth=self._config["sample_depth"],
|
||||
base_addr=self._trig_blk.get_max_addr() + 1,
|
||||
interface=self._interface,
|
||||
)
|
||||
|
||||
@property
|
||||
def max_addr(self):
|
||||
self.define_submodules()
|
||||
return self._sample_mem.max_addr
|
||||
|
||||
@property
|
||||
def top_level_ports(self):
|
||||
return self._probes
|
||||
|
||||
def _check_config(self):
|
||||
def to_config(self):
|
||||
return {
|
||||
"type": "logic_analyzer",
|
||||
"sample_depth": self._sample_depth,
|
||||
"trigger_location": self.trigger_location,
|
||||
"probes": {p.name: len(p) for p in self._probes},
|
||||
"triggers": self.triggers,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_config(cls, config):
|
||||
# Check for unrecognized options
|
||||
valid_options = [
|
||||
"type",
|
||||
|
|
@ -65,12 +60,12 @@ class LogicAnalyzerCore(MantaCore):
|
|||
"trigger_location",
|
||||
"trigger_mode",
|
||||
]
|
||||
for option in self._config:
|
||||
for option in config:
|
||||
if option not in valid_options:
|
||||
warn(f"Ignoring unrecognized option '{option}' in Logic Analyzer.")
|
||||
|
||||
# Check sample depth is provided and positive
|
||||
sample_depth = self._config.get("sample_depth")
|
||||
sample_depth = config.get("sample_depth")
|
||||
if not sample_depth:
|
||||
raise ValueError("Logic Analyzer must have sample_depth specified.")
|
||||
|
||||
|
|
@ -78,35 +73,35 @@ class LogicAnalyzerCore(MantaCore):
|
|||
raise ValueError("Logic Analyzer sample_depth must be a positive integer.")
|
||||
|
||||
# Check probes
|
||||
if "probes" not in self._config or len(self._config["probes"]) == 0:
|
||||
if "probes" not in config or len(config["probes"]) == 0:
|
||||
raise ValueError("Logic Analyzer must have at least one probe specified.")
|
||||
|
||||
for name, width in self._config["probes"].items():
|
||||
for name, width in config["probes"].items():
|
||||
if width < 0:
|
||||
raise ValueError(f"Width of probe {name} must be positive.")
|
||||
|
||||
# Check trigger mode, if provided
|
||||
trigger_mode = self._config.get("trigger_mode")
|
||||
trigger_mode = config.get("trigger_mode")
|
||||
valid_modes = ["single_shot", "incremental", "immediate"]
|
||||
if trigger_mode and trigger_mode not in valid_modes:
|
||||
raise ValueError(
|
||||
f"Unrecognized trigger mode {self._config['trigger_mode']} provided."
|
||||
f"Unrecognized trigger mode {config['trigger_mode']} provided."
|
||||
)
|
||||
|
||||
# Check triggers
|
||||
if trigger_mode and trigger_mode != "immediate":
|
||||
if "triggers" not in self._config or self._config["triggers"] == 0:
|
||||
if "triggers" not in config or config["triggers"] == 0:
|
||||
raise ValueError(
|
||||
"Logic Analyzer must have at least one trigger specified if not running in immediate mode."
|
||||
)
|
||||
|
||||
# Check trigger location
|
||||
trigger_location = self._config.get("trigger_location")
|
||||
trigger_location = config.get("trigger_location")
|
||||
if trigger_location:
|
||||
if not isinstance(trigger_location, int) or trigger_location < 0:
|
||||
raise ValueError("Trigger location must be a positive integer.")
|
||||
|
||||
if trigger_location >= self._config["sample_depth"]:
|
||||
if trigger_location >= config["sample_depth"]:
|
||||
raise ValueError("Trigger location must be less than sample depth.")
|
||||
|
||||
if trigger_mode == "immediate":
|
||||
|
|
@ -116,16 +111,16 @@ class LogicAnalyzerCore(MantaCore):
|
|||
|
||||
# Check triggers themselves
|
||||
if trigger_mode == "immediate":
|
||||
if "triggers" in self._config:
|
||||
if "triggers" in config:
|
||||
warn(
|
||||
"Ignoring triggers as 'trigger_mode' is set to immediate, and there are no triggers to specify."
|
||||
)
|
||||
|
||||
else:
|
||||
if ("triggers" not in self._config) or (len(self._config["triggers"]) == 0):
|
||||
if ("triggers" not in config) or (len(config["triggers"]) == 0):
|
||||
raise ValueError("At least one trigger must be specified.")
|
||||
|
||||
for trigger in self._config.get("triggers"):
|
||||
for trigger in config.get("triggers"):
|
||||
if not isinstance(trigger, str):
|
||||
raise ValueError("Trigger must be specified with a string.")
|
||||
|
||||
|
|
@ -156,9 +151,35 @@ class LogicAnalyzerCore(MantaCore):
|
|||
)
|
||||
|
||||
# Check probe names
|
||||
if components[0] not in self._config["probes"]:
|
||||
if components[0] not in config["probes"]:
|
||||
raise ValueError(f"Unknown probe name '{components[0]}' specified.")
|
||||
|
||||
# Checks complete, create LogicAnalyzerCore
|
||||
probes = [Signal(width, name=name) for name, width in config["probes"].items()]
|
||||
|
||||
return cls(config["sample_depth"], probes)
|
||||
|
||||
def define_submodules(self):
|
||||
self._fsm = LogicAnalyzerFSM(
|
||||
sample_depth=self._sample_depth,
|
||||
base_addr=self.base_addr,
|
||||
interface=self.interface,
|
||||
)
|
||||
|
||||
self._trig_blk = LogicAnalyzerTriggerBlock(
|
||||
probes=self._probes,
|
||||
base_addr=self._fsm.max_addr + 1,
|
||||
interface=self.interface,
|
||||
)
|
||||
|
||||
self._sample_mem = MemoryCore(
|
||||
mode="fpga_to_host",
|
||||
width=sum([len(p) for p in self._probes]),
|
||||
depth=self._sample_depth,
|
||||
)
|
||||
self._sample_mem.base_addr = self._trig_blk.max_addr + 1
|
||||
self._sample_mem.interface = self.interface
|
||||
|
||||
def elaborate(self, platform):
|
||||
m = Module()
|
||||
|
||||
|
|
@ -187,255 +208,41 @@ class LogicAnalyzerCore(MantaCore):
|
|||
|
||||
return m
|
||||
|
||||
def capture(self, verbose=False):
|
||||
def capture(self):
|
||||
"""
|
||||
Performs a capture, recording the state of all input probes to the
|
||||
FPGA's memory, and then returns that as a LogicAnalyzerCapture class
|
||||
on the host.
|
||||
"""
|
||||
print_if_verbose = lambda x: print(x) if verbose else None
|
||||
|
||||
# If core is not in IDLE state, request that it return to IDLE
|
||||
print_if_verbose(" -> Resetting core...")
|
||||
state = self._fsm.registers.get_probe("state")
|
||||
if state != States.IDLE:
|
||||
self._fsm.registers.set_probe("request_start", 0)
|
||||
self._fsm.registers.set_probe("request_stop", 0)
|
||||
self._fsm.registers.set_probe("request_stop", 1)
|
||||
self._fsm.registers.set_probe("request_stop", 0)
|
||||
print(" -> Resetting core...")
|
||||
self._fsm.stop_capture()
|
||||
|
||||
if self._fsm.registers.get_probe("state") != States.IDLE:
|
||||
raise ValueError("Logic analyzer did not reset to IDLE state.")
|
||||
print(" -> Setting triggers...")
|
||||
self._trig_blk.set_triggers(self.triggers)
|
||||
|
||||
# Set triggers
|
||||
print_if_verbose(" -> Setting triggers...")
|
||||
self._trig_blk.clear_triggers()
|
||||
print(" -> Setting trigger mode...")
|
||||
self._fsm.write_register("trigger_mode", self.trigger_mode)
|
||||
|
||||
if self._config.get("trigger_mode") != "immediate":
|
||||
self._trig_blk.set_triggers(self._config)
|
||||
print(" -> Setting trigger location...")
|
||||
self._fsm.write_register("trigger_location", self.trigger_location)
|
||||
|
||||
# Set trigger mode, default to single-shot if user didn't specify a mode
|
||||
print_if_verbose(" -> Setting trigger mode...")
|
||||
if "trigger_mode" in self._config:
|
||||
mode = self._config["trigger_mode"].upper()
|
||||
self._fsm.registers.set_probe("trigger_mode", TriggerModes[mode])
|
||||
print(" -> Starting capture...")
|
||||
self._fsm.start_capture()
|
||||
|
||||
else:
|
||||
self._fsm.registers.set_probe("trigger_mode", TriggerModes.SINGLE_SHOT)
|
||||
print(" -> Waiting for capture to complete...")
|
||||
self._fsm.wait_for_capture()
|
||||
|
||||
# Set trigger location
|
||||
print_if_verbose(" -> Setting trigger location...")
|
||||
if "trigger_location" in self._config:
|
||||
self._fsm.registers.set_probe(
|
||||
"trigger_location", self._config["trigger_location"]
|
||||
)
|
||||
|
||||
else:
|
||||
self._fsm.registers.set_probe(
|
||||
"trigger_location", self._config["sample_depth"] // 2
|
||||
)
|
||||
|
||||
# Send a start request to the state machine
|
||||
print_if_verbose(" -> Starting capture...")
|
||||
self._fsm.registers.set_probe("request_start", 0)
|
||||
self._fsm.registers.set_probe("request_start", 1)
|
||||
self._fsm.registers.set_probe("request_start", 0)
|
||||
|
||||
# Poll the state machine's state, and wait for the capture to complete
|
||||
print_if_verbose(" -> Waiting for capture to complete...")
|
||||
while self._fsm.registers.get_probe("state") != States.CAPTURED:
|
||||
pass
|
||||
|
||||
# Read out the entirety of the sample memory
|
||||
print_if_verbose(" -> Reading sample memory contents...")
|
||||
addrs = list(range(self._config["sample_depth"]))
|
||||
print(" -> Reading sample memory contents...")
|
||||
addrs = list(range(self._sample_depth))
|
||||
raw_capture = self._sample_mem.read(addrs)
|
||||
|
||||
# Revolve the memory around the read_pointer, such that all the beginning
|
||||
# of the caputure is at the first element
|
||||
print_if_verbose(" -> Checking read pointer and revolving memory...")
|
||||
read_pointer = self._fsm.registers.get_probe("read_pointer")
|
||||
print(" -> Checking read pointer and revolving memory...")
|
||||
read_pointer = self._fsm.read_register("read_pointer")
|
||||
|
||||
data = raw_capture[read_pointer:] + raw_capture[:read_pointer]
|
||||
return LogicAnalyzerCapture(data, self._config, self._interface)
|
||||
|
||||
|
||||
class LogicAnalyzerCapture:
|
||||
"""
|
||||
A container class for the data collected by a LogicAnalyzerCore. Contains
|
||||
methods for exporting the data as a VCD waveform file, a Python list, a
|
||||
CSV file, or a Verilog module.
|
||||
"""
|
||||
|
||||
def __init__(self, data, config, interface):
|
||||
self._data = data
|
||||
self._config = config
|
||||
self._interface = interface
|
||||
|
||||
def get_trigger_location(self):
|
||||
"""
|
||||
Gets the location of the trigger in the capture. This will match the
|
||||
value of "trigger_location" provided in the configuration file at the
|
||||
time of capture.
|
||||
"""
|
||||
|
||||
if "trigger_location" in self._config:
|
||||
return self._config["trigger_location"]
|
||||
|
||||
else:
|
||||
return self._config["sample_depth"] // 2
|
||||
|
||||
def get_trace(self, probe_name):
|
||||
"""
|
||||
Gets the value of a single probe over the capture.
|
||||
"""
|
||||
|
||||
# Sum up the widths of all the probes below this one
|
||||
lower = 0
|
||||
for name, width in self._config["probes"].items():
|
||||
if name == probe_name:
|
||||
break
|
||||
|
||||
lower += width
|
||||
|
||||
# Add the width of the probe we'd like
|
||||
upper = lower + self._config["probes"][probe_name]
|
||||
|
||||
total_probe_width = sum(self._config["probes"].values())
|
||||
binary = [f"{d:0{total_probe_width}b}" for d in self._data]
|
||||
return [int(b[lower:upper], 2) for b in binary]
|
||||
|
||||
def export_csv(self, path):
|
||||
"""
|
||||
Export the capture to a CSV file, containing the data of all probes in
|
||||
the core.
|
||||
"""
|
||||
|
||||
names = list(self._config["probes"].keys())
|
||||
values = [self.get_trace(n) for n in names]
|
||||
|
||||
# Transpose list of lists so that data flows top-to-bottom instead of
|
||||
# left-to-right
|
||||
values_t = [list(x) for x in zip(*values)]
|
||||
|
||||
import csv
|
||||
|
||||
with open(path, "w") as f:
|
||||
writer = csv.writer(f)
|
||||
|
||||
writer.writerow(names)
|
||||
writer.writerows(values_t)
|
||||
|
||||
def export_vcd(self, path):
|
||||
"""
|
||||
Export the capture to a VCD file, containing the data of all probes in
|
||||
the core.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from vcd import VCDWriter
|
||||
|
||||
# Use the same datetime format that iVerilog uses
|
||||
timestamp = datetime.now().strftime("%a %b %w %H:%M:%S %Y")
|
||||
vcd_file = open(path, "w")
|
||||
|
||||
# Compute the timescale from the frequency of the provided clock
|
||||
half_period = 1 / (2 * self._interface._clock_freq)
|
||||
exponent = math.floor(math.log10(half_period))
|
||||
exponent_eng = (exponent // 3) * 3
|
||||
|
||||
# The VCD file format specification supports no units larger or smaller
|
||||
# than these
|
||||
units = {
|
||||
0: "s",
|
||||
-3: "ms",
|
||||
-6: "us",
|
||||
-9: "ns",
|
||||
-12: "ps",
|
||||
-15: "fs",
|
||||
}
|
||||
|
||||
timescale_unit = units[exponent_eng]
|
||||
timescale_exponent = 10 ** (exponent - exponent_eng)
|
||||
timescale_exact = half_period / (10**exponent)
|
||||
timescale_integer = round(timescale_exact)
|
||||
|
||||
if abs(timescale_exact - timescale_integer) > 1e-3:
|
||||
warn("VCD file timescale will differ slightly from exact clock frequency.")
|
||||
|
||||
timescale = (timescale_exponent, timescale_unit)
|
||||
|
||||
with VCDWriter(vcd_file, timescale, timestamp, "manta") as writer:
|
||||
# Each probe has a name, width, and writer associated with it
|
||||
signals = []
|
||||
for name, width in self._config["probes"].items():
|
||||
signal = {
|
||||
"name": name,
|
||||
"width": width,
|
||||
"data": self.get_trace(name),
|
||||
"var": writer.register_var("manta", name, "wire", size=width),
|
||||
}
|
||||
signals.append(signal)
|
||||
|
||||
clock = writer.register_var("manta", "clk", "wire", size=1)
|
||||
|
||||
# Include a trigger signal such would be meaningful (ie, we didn't trigger immediately)
|
||||
if (
|
||||
"trigger_mode" not in self._config
|
||||
or self._config["trigger_mode"] == "single_shot"
|
||||
):
|
||||
trigger = writer.register_var("manta", "trigger", "wire", size=1)
|
||||
|
||||
# Add the data to each probe in the vcd file
|
||||
for sample_index in range(0, 2 * len(self._data)):
|
||||
sample_timestamp = timescale_integer * sample_index
|
||||
|
||||
# Run the clock
|
||||
writer.change(clock, sample_timestamp, sample_index % 2 == 0)
|
||||
|
||||
# Set the trigger (if there is one)
|
||||
if (
|
||||
"trigger_mode" not in self._config
|
||||
or self._config["trigger_mode"] == "single_shot"
|
||||
):
|
||||
triggered = (sample_index // 2) >= self.get_trigger_location()
|
||||
writer.change(trigger, sample_timestamp, triggered)
|
||||
|
||||
# Add other signals
|
||||
for signal in signals:
|
||||
var = signal["var"]
|
||||
sample = signal["data"][sample_index // 2]
|
||||
|
||||
writer.change(var, sample_timestamp, sample)
|
||||
|
||||
vcd_file.close()
|
||||
|
||||
def get_playback_module(self):
|
||||
"""
|
||||
Returns an Amaranth module that will playback the captured data. This
|
||||
module is synthesizable, so it may be used in either simulation or
|
||||
on the FPGA directly by including it your build process.
|
||||
"""
|
||||
|
||||
return LogicAnalyzerPlayback(self._data, self._config)
|
||||
|
||||
def export_playback_verilog(self, path):
|
||||
"""
|
||||
Exports a Verilog module that will playback the captured data. This
|
||||
module is synthesizable, so it may be used in either simulation or
|
||||
on the FPGA directly by including it your build process.
|
||||
"""
|
||||
|
||||
lap = self.get_playback_module()
|
||||
from amaranth.back import verilog
|
||||
|
||||
with open(path, "w") as f:
|
||||
f.write(
|
||||
verilog.convert(
|
||||
lap,
|
||||
name="logic_analyzer_playback",
|
||||
ports=lap.get_top_level_ports(),
|
||||
strip_internal_attrs=True,
|
||||
)
|
||||
)
|
||||
return LogicAnalyzerCapture(
|
||||
self._probes, self.trigger_location, self.trigger_mode, data
|
||||
)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,182 @@
|
|||
from manta.logic_analyzer.playback import LogicAnalyzerPlayback
|
||||
from manta.logic_analyzer import TriggerModes
|
||||
|
||||
|
||||
class LogicAnalyzerCapture:
|
||||
"""
|
||||
A container class for the data collected by a LogicAnalyzerCore. Contains
|
||||
methods for exporting the data as a VCD waveform file, a Python list, a
|
||||
CSV file, or a Verilog module.
|
||||
"""
|
||||
|
||||
def __init__(self, probes, trigger_location, trigger_mode, data):
|
||||
self._probes = probes
|
||||
self._trigger_location = trigger_location
|
||||
self._trigger_mode = trigger_mode
|
||||
self._data = data
|
||||
|
||||
print(self._trigger_mode)
|
||||
|
||||
def get_trigger_location(self):
|
||||
"""
|
||||
Returns the location of the trigger in the capture. This will match the
|
||||
value of "trigger_location" provided in the configuration file at the
|
||||
time of capture.
|
||||
"""
|
||||
|
||||
return self._trigger_location
|
||||
|
||||
def get_trace(self, name):
|
||||
"""
|
||||
Gets the value of a single probe over the capture.
|
||||
"""
|
||||
|
||||
# Get index of probe with given name
|
||||
indicies = [i for i, p in enumerate(self._probes) if p.name == name]
|
||||
if len(indicies) == 0:
|
||||
raise ValueError(f"Probe {name} not found in LogicAnalyzerCapture!")
|
||||
|
||||
if len(indicies) > 1:
|
||||
raise ValueError(
|
||||
f"Probe {name} found multiple times in LogicAnalyzerCapture!"
|
||||
)
|
||||
|
||||
idx = indicies[0]
|
||||
|
||||
# Sum up the widths of all the probes below this one
|
||||
lower = sum([len(p) for p in self._probes[:idx]])
|
||||
|
||||
# Add the width of the probe we'd like
|
||||
upper = lower + len(self._probes[idx])
|
||||
|
||||
total_probe_width = sum([len(p) for p in self._probes])
|
||||
binary = [f"{d:0{total_probe_width}b}" for d in self._data]
|
||||
return [int(b[lower:upper], 2) for b in binary]
|
||||
|
||||
def export_csv(self, path):
|
||||
"""
|
||||
Export the capture to a CSV file, containing the data of all probes in
|
||||
the core.
|
||||
"""
|
||||
|
||||
names = [p.name for p in self._probes]
|
||||
values = [self.get_trace(n) for n in names]
|
||||
|
||||
# Transpose list of lists so that data flows top-to-bottom instead of
|
||||
# left-to-right
|
||||
values_transpose = [list(x) for x in zip(*values)]
|
||||
|
||||
import csv
|
||||
|
||||
with open(path, "w") as f:
|
||||
writer = csv.writer(f)
|
||||
|
||||
writer.writerow(names)
|
||||
writer.writerows(values_transpose)
|
||||
|
||||
def export_vcd(self, path):
|
||||
"""
|
||||
Export the capture to a VCD file, containing the data of all probes in
|
||||
the core.
|
||||
"""
|
||||
|
||||
from vcd import VCDWriter
|
||||
from datetime import datetime
|
||||
|
||||
# Compute the timescale from the frequency of the provided clock
|
||||
half_period = 1 / (2 * self._interface._clock_freq)
|
||||
exponent = math.floor(math.log10(half_period))
|
||||
exponent_eng = (exponent // 3) * 3
|
||||
|
||||
# The VCD file format specification supports no units larger or smaller
|
||||
# than these
|
||||
units = {
|
||||
0: "s",
|
||||
-3: "ms",
|
||||
-6: "us",
|
||||
-9: "ns",
|
||||
-12: "ps",
|
||||
-15: "fs",
|
||||
}
|
||||
|
||||
timescale_unit = units[exponent_eng]
|
||||
timescale_exponent = 10 ** (exponent - exponent_eng)
|
||||
timescale_exact = half_period / (10**exponent)
|
||||
timescale_integer = round(timescale_exact)
|
||||
|
||||
if abs(timescale_exact - timescale_integer) > 1e-3:
|
||||
warn("VCD file timescale will differ slightly from exact clock frequency.")
|
||||
|
||||
timescale = (timescale_exponent, timescale_unit)
|
||||
|
||||
# Use the same datetime format that iVerilog uses
|
||||
timestamp = datetime.now().strftime("%a %b %w %H:%M:%S %Y")
|
||||
vcd_file = open(path, "w")
|
||||
|
||||
with VCDWriter(vcd_file, timescale, timestamp, "manta") as writer:
|
||||
# Each probe has a name, width, and writer associated with it
|
||||
signals = []
|
||||
for p in self._probes:
|
||||
signal = {
|
||||
"name": p.name,
|
||||
"width": len(p),
|
||||
"data": self.get_trace(p.name),
|
||||
"var": writer.register_var("manta", p.name, "wire", size=len(p)),
|
||||
}
|
||||
signals.append(signal)
|
||||
|
||||
clock = writer.register_var("manta", "clk", "wire", size=1)
|
||||
|
||||
# Include a trigger signal such would be meaningful (ie, we didn't trigger immediately)
|
||||
if self._trigger_mode == TriggerModes.SINGLE_SHOT:
|
||||
trigger = writer.register_var("manta", "trigger", "wire", size=1)
|
||||
|
||||
# Add the data to each probe in the vcd file
|
||||
for sample_index in range(0, 2 * len(self._data)):
|
||||
sample_timestamp = timescale_integer * sample_index
|
||||
|
||||
# Run the clock
|
||||
writer.change(clock, sample_timestamp, sample_index % 2 == 0)
|
||||
|
||||
# Set the trigger (if there is one)
|
||||
if self._trigger_mode == TriggerModes.SINGLE_SHOT:
|
||||
triggered = (sample_index // 2) >= self._trigger_location
|
||||
writer.change(trigger, sample_timestep, triggered)
|
||||
|
||||
# Add other signals
|
||||
for signal in signals:
|
||||
var = signal["var"]
|
||||
sample = signal["data"][sample_index // 2]
|
||||
|
||||
writer.change(var, sample_timestamp, sample)
|
||||
|
||||
vcd_file.close()
|
||||
|
||||
def get_playback_module(self):
|
||||
"""
|
||||
Returns an Amaranth module that will playback the captured data. This
|
||||
module is synthesizable, so it may be used in either simulation or
|
||||
on the FPGA directly by including it your build process.
|
||||
"""
|
||||
|
||||
return LogicAnalyzerPlayback(self._probes, self._data)
|
||||
|
||||
def export_playback_verilog(self, path):
|
||||
"""
|
||||
Exports a Verilog module that will playback the captured data. This
|
||||
module is synthesizable, so it may be used in either simulation or
|
||||
on the FPGA directly by including it your build process.
|
||||
"""
|
||||
|
||||
lap = self.get_playback_module()
|
||||
from amaranth.back import verilog
|
||||
|
||||
with open(path, "w") as f:
|
||||
f.write(
|
||||
verilog.convert(
|
||||
lap,
|
||||
name="logic_analyzer_playback",
|
||||
ports=lap.get_top_level_ports(),
|
||||
strip_internal_attrs=True,
|
||||
)
|
||||
)
|
||||
|
|
@ -24,8 +24,8 @@ class LogicAnalyzerFSM(Elaboratable):
|
|||
memory in each trigger mode (immediate, incremental, single-shot).
|
||||
"""
|
||||
|
||||
def __init__(self, config, base_addr, interface):
|
||||
self._sample_depth = config["sample_depth"]
|
||||
def __init__(self, sample_depth, base_addr, interface):
|
||||
self._sample_depth = sample_depth
|
||||
|
||||
# Outputs to rest of Logic Analyzer
|
||||
self.trigger = Signal(1)
|
||||
|
|
@ -53,18 +53,16 @@ class LogicAnalyzerFSM(Elaboratable):
|
|||
self.request_stop,
|
||||
]
|
||||
|
||||
self.registers = IOCore(base_addr, interface, inputs, outputs)
|
||||
self.registers = IOCore(inputs, outputs)
|
||||
self.registers.base_addr = base_addr
|
||||
self.registers.interface = interface
|
||||
|
||||
# Bus Input/Output
|
||||
self.bus_i = self.registers.bus_i
|
||||
self.bus_o = self.registers.bus_o
|
||||
|
||||
def get_max_addr(self):
|
||||
"""
|
||||
Return the maximum addresses in memory used by the core. The address
|
||||
space used by the core extends from `base_addr` to the number returned
|
||||
by this function (including the endpoints).
|
||||
"""
|
||||
@property
|
||||
def max_addr(self):
|
||||
return self.registers.max_addr
|
||||
|
||||
def elaborate(self, platform):
|
||||
|
|
@ -171,3 +169,32 @@ class LogicAnalyzerFSM(Elaboratable):
|
|||
m.d.sync += state.eq(States.IDLE)
|
||||
|
||||
return m
|
||||
|
||||
def stop_capture(self):
|
||||
# If core is not in IDLE state, request that it return to IDLE
|
||||
state = self.registers.get_probe("state")
|
||||
if state != States.IDLE:
|
||||
self.registers.set_probe("request_start", 0)
|
||||
self.registers.set_probe("request_stop", 0)
|
||||
self.registers.set_probe("request_stop", 1)
|
||||
self.registers.set_probe("request_stop", 0)
|
||||
|
||||
if self.registers.get_probe("state") != States.IDLE:
|
||||
raise ValueError("Logic analyzer did not reset to IDLE state.")
|
||||
|
||||
def start_capture(self):
|
||||
# Send a start request to the state machine
|
||||
self.registers.set_probe("request_start", 0)
|
||||
self.registers.set_probe("request_start", 1)
|
||||
self.registers.set_probe("request_start", 0)
|
||||
|
||||
def wait_for_capture(self):
|
||||
# Poll the state machine, and wait for the capture to complete
|
||||
while self.registers.get_probe("state") != States.CAPTURED:
|
||||
pass
|
||||
|
||||
def read_register(self, name):
|
||||
return self.registers.get_probe(name)
|
||||
|
||||
def write_register(self, name, value):
|
||||
return self.registers.set_probe(name, value)
|
||||
|
|
|
|||
|
|
@ -8,65 +8,59 @@ class LogicAnalyzerPlayback(Elaboratable):
|
|||
along with the config of the core used to take it.
|
||||
"""
|
||||
|
||||
def __init__(self, data, config):
|
||||
self.data = data
|
||||
self.config = config
|
||||
def __init__(self, probes, data):
|
||||
self._probes = probes
|
||||
self._data = data
|
||||
|
||||
# State Machine
|
||||
self.start = Signal(1)
|
||||
self.valid = Signal(1)
|
||||
|
||||
# Top-Level Probe signals
|
||||
self.top_level_probes = {}
|
||||
for name, width in self.config["probes"].items():
|
||||
self.top_level_probes[name] = Signal(width, name=name)
|
||||
def elaborate(self, platform):
|
||||
m = Module()
|
||||
|
||||
# Instantiate memory
|
||||
self.mem = Memory(
|
||||
depth=self.config["sample_depth"],
|
||||
width=sum(self.config["probes"].values()),
|
||||
init=self.data,
|
||||
depth=len(self._data),
|
||||
width=sum([len(p) for p in self._probes]),
|
||||
init=self._data,
|
||||
)
|
||||
|
||||
self.read_port = self.mem.read_port()
|
||||
|
||||
def elaborate(self, platform):
|
||||
m = Module()
|
||||
m.submodules.mem = self.mem
|
||||
|
||||
m.d.comb += self.read_port.en.eq(1)
|
||||
read_port = self.mem.read_port()
|
||||
|
||||
m.d.comb += read_port.en.eq(1)
|
||||
|
||||
# State Machine
|
||||
busy = Signal(1)
|
||||
with m.If(~busy):
|
||||
with m.If(self.start):
|
||||
m.d.sync += busy.eq(1)
|
||||
# m.d.sync += self.read_port.addr.eq(1)
|
||||
# m.d.sync += read_port.addr.eq(1)
|
||||
|
||||
with m.Else():
|
||||
with m.If(self.read_port.addr == self.config["sample_depth"] - 1):
|
||||
with m.If(read_port.addr == len(self._data) - 1):
|
||||
m.d.sync += busy.eq(0)
|
||||
m.d.sync += self.read_port.addr.eq(0)
|
||||
m.d.sync += read_port.addr.eq(0)
|
||||
|
||||
with m.Else():
|
||||
m.d.sync += self.read_port.addr.eq(self.read_port.addr + 1)
|
||||
m.d.sync += read_port.addr.eq(read_port.addr + 1)
|
||||
|
||||
# Pipeline to accomodate for the 2-cycle latency in the RAM
|
||||
m.d.sync += self.valid.eq(busy)
|
||||
|
||||
# Assign the probe values by part-selecting from the data port
|
||||
lower = 0
|
||||
for name, width in reversed(self.config["probes"].items()):
|
||||
signal = self.top_level_probes[name]
|
||||
for p in reversed(self._probes):
|
||||
|
||||
# Set output probe to zero if we're not
|
||||
with m.If(self.valid):
|
||||
m.d.comb += signal.eq(self.read_port.data[lower : lower + width])
|
||||
m.d.comb += p.eq(read_port.data[lower : lower + len(p)])
|
||||
|
||||
with m.Else():
|
||||
m.d.comb += signal.eq(0)
|
||||
m.d.comb += p.eq(0)
|
||||
|
||||
lower += width
|
||||
lower += len(p)
|
||||
|
||||
return m
|
||||
|
||||
|
|
@ -75,4 +69,4 @@ class LogicAnalyzerPlayback(Elaboratable):
|
|||
Returns the Amaranth signals that should be included as ports in the
|
||||
exported Verilog module.
|
||||
"""
|
||||
return [self.start, self.valid] + list(self.top_level_probes.values())
|
||||
return [self.start, self.valid] + self._probes
|
||||
|
|
|
|||
|
|
@ -19,7 +19,9 @@ class LogicAnalyzerTriggerBlock(Elaboratable):
|
|||
# Make IO core for everything
|
||||
ops = [t.op for t in self._triggers]
|
||||
args = [t.arg for t in self._triggers]
|
||||
self.registers = IOCore(base_addr, interface, outputs=ops + args)
|
||||
self.registers = IOCore(outputs=ops + args)
|
||||
self.registers.base_addr = base_addr
|
||||
self.registers.interface = interface
|
||||
|
||||
# Bus Input/Output
|
||||
self.bus_i = self.registers.bus_i
|
||||
|
|
@ -28,23 +30,18 @@ class LogicAnalyzerTriggerBlock(Elaboratable):
|
|||
# Global trigger. High if any probe is triggered.
|
||||
self.trig = Signal()
|
||||
|
||||
def get_max_addr(self):
|
||||
"""
|
||||
Return the maximum addresses in memory used by the core. The address
|
||||
space used by the core extends from `base_addr` to the number returned
|
||||
by this function (including the endpoints).
|
||||
"""
|
||||
@property
|
||||
def max_addr(self):
|
||||
return self.registers.max_addr
|
||||
|
||||
def clear_triggers(self):
|
||||
def set_triggers(self, triggers):
|
||||
# Reset all triggers to disabled with no argument
|
||||
for p in self._probes:
|
||||
self.registers.set_probe(p.name + "_op", Operations.DISABLE)
|
||||
self.registers.set_probe(p.name + "_arg", 0)
|
||||
|
||||
def set_triggers(self, config):
|
||||
# Set triggers
|
||||
for trigger in config["triggers"]:
|
||||
for trigger in triggers:
|
||||
components = trigger.strip().split(" ")
|
||||
|
||||
# Handle triggers that don't need an argument
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ class MemoryCore(MantaCore):
|
|||
"type": "memory",
|
||||
"mode": self._mode,
|
||||
"width": self._width,
|
||||
"depth": self._depth
|
||||
"depth": self._depth,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
|
|
|
|||
|
|
@ -1,12 +1,15 @@
|
|||
from manta import Manta
|
||||
from manta.io_core import IOCore
|
||||
from manta.memory_core import MemoryCore
|
||||
from manta.logic_analyzer import LogicAnalyzerCore
|
||||
from manta.uart import UARTInterface
|
||||
from manta.ethernet import EthernetInterface
|
||||
from amaranth import *
|
||||
import tempfile
|
||||
import os
|
||||
import yaml
|
||||
|
||||
|
||||
def test_io_core_dump():
|
||||
# Create some dummy signals to pass to the IO Core
|
||||
probe0 = Signal(1)
|
||||
|
|
@ -16,10 +19,7 @@ def test_io_core_dump():
|
|||
|
||||
# Create Manta instance
|
||||
manta = Manta()
|
||||
manta.cores.test_core = IOCore(
|
||||
inputs = [probe0, probe1],
|
||||
outputs = [probe2, probe3]
|
||||
)
|
||||
manta.cores.test_core = IOCore(inputs=[probe0, probe1], outputs=[probe2, probe3])
|
||||
|
||||
# Create Temporary File
|
||||
tf = tempfile.NamedTemporaryFile(delete=False)
|
||||
|
|
@ -36,21 +36,12 @@ def test_io_core_dump():
|
|||
expected = {
|
||||
"cores": {
|
||||
"test_core": {
|
||||
"type": "io",
|
||||
"inputs": {
|
||||
"probe0": 1,
|
||||
"probe1": 2
|
||||
},
|
||||
"type": "io",
|
||||
"inputs": {"probe0": 1, "probe1": 2},
|
||||
"outputs": {
|
||||
"probe2": {
|
||||
"width": 3,
|
||||
"initial_value": 0
|
||||
},
|
||||
"probe3": {
|
||||
"width": 4,
|
||||
"initial_value": 13
|
||||
}
|
||||
}
|
||||
"probe2": {"width": 3, "initial_value": 0},
|
||||
"probe3": {"width": 4, "initial_value": 13},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -58,13 +49,14 @@ def test_io_core_dump():
|
|||
if data != expected:
|
||||
raise ValueError("Exported YAML does not match configuration!")
|
||||
|
||||
|
||||
def test_memory_core_dump():
|
||||
# Create Manta instance
|
||||
manta = Manta()
|
||||
manta.cores.test_core = MemoryCore(
|
||||
mode = "bidirectional",
|
||||
width = 32,
|
||||
depth = 1024,
|
||||
mode="bidirectional",
|
||||
width=32,
|
||||
depth=1024,
|
||||
)
|
||||
|
||||
# Create Temporary File
|
||||
|
|
@ -82,10 +74,10 @@ def test_memory_core_dump():
|
|||
expected = {
|
||||
"cores": {
|
||||
"test_core": {
|
||||
"type": "memory",
|
||||
"mode":"bidirectional",
|
||||
"width":32,
|
||||
"depth":1024
|
||||
"type": "memory",
|
||||
"mode": "bidirectional",
|
||||
"width": 32,
|
||||
"depth": 1024,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -93,15 +85,53 @@ def test_memory_core_dump():
|
|||
if data != expected:
|
||||
raise ValueError("Exported YAML does not match configuration!")
|
||||
|
||||
|
||||
def test_logic_analyzer_core_dump():
|
||||
raise ValueError
|
||||
# Create some dummy signals to pass to the Logic Analyzer
|
||||
probe0 = Signal(1)
|
||||
probe1 = Signal(2)
|
||||
probe2 = Signal(3)
|
||||
|
||||
# Create Manta instance
|
||||
manta = Manta()
|
||||
manta.cores.test_core = LogicAnalyzerCore(
|
||||
sample_depth=2048, probes=[probe0, probe1, probe2]
|
||||
)
|
||||
|
||||
# Create Temporary File
|
||||
tf = tempfile.NamedTemporaryFile(delete=False)
|
||||
tf.close()
|
||||
|
||||
# Export Manta configuration
|
||||
manta.export_config(tf.name)
|
||||
|
||||
# Parse the exported YAML
|
||||
with open(tf.name, "r") as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
print(tf.name)
|
||||
|
||||
# Verify that exported YAML matches configuration
|
||||
expected = {
|
||||
"cores": {
|
||||
"test_core": {
|
||||
"type": "logic_analyzer",
|
||||
"sample_depth": 2048,
|
||||
"trigger_location": 1024,
|
||||
"probes": {"probe0": 1, "probe1": 2, "probe2": 3},
|
||||
"triggers": [],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if data != expected:
|
||||
raise ValueError("Exported YAML does not match configuration!")
|
||||
|
||||
|
||||
def test_uart_interface_dump():
|
||||
manta = Manta()
|
||||
manta.interface = UARTInterface(
|
||||
port = "/dev/ttyUSB0",
|
||||
baudrate = 115200,
|
||||
clock_freq = 100e6
|
||||
port="/dev/ttyUSB0", baudrate=115200, clock_freq=100e6
|
||||
)
|
||||
|
||||
# Create Temporary File
|
||||
|
|
@ -120,18 +150,54 @@ def test_uart_interface_dump():
|
|||
"uart": {
|
||||
"port": "/dev/ttyUSB0",
|
||||
"baudrate": 115200,
|
||||
|
||||
# Be careful with the float comparison here, copy-pasting from the
|
||||
# exported YAML seems to have the best results. Otherwise this test
|
||||
# will fail when it shouldn't.
|
||||
"clock_freq": 100000000.0,
|
||||
"chunk_size": 256
|
||||
"chunk_size": 256,
|
||||
}
|
||||
}
|
||||
|
||||
if data != expected:
|
||||
raise ValueError("Exported YAML does not match configuration!")
|
||||
|
||||
def test_ethernet_interface_dump():
|
||||
raise ValueError
|
||||
|
||||
def test_ethernet_interface_dump():
|
||||
manta = Manta()
|
||||
manta.interface = EthernetInterface(
|
||||
fpga_ip_addr="192.168.0.101",
|
||||
host_ip_addr="192.168.0.100",
|
||||
udp_port=2000,
|
||||
phy="",
|
||||
clk_freq=0,
|
||||
)
|
||||
|
||||
# Create Temporary File
|
||||
tf = tempfile.NamedTemporaryFile(delete=False)
|
||||
tf.close()
|
||||
|
||||
# Export Manta configuration
|
||||
manta.export_config(tf.name)
|
||||
|
||||
# Parse the exported YAML
|
||||
with open(tf.name, "r") as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
# Verify that exported YAML matches configuration
|
||||
expected = {
|
||||
"ethernet": {
|
||||
"phy": "LiteEthPHYRMII",
|
||||
"vendor": "xilinx",
|
||||
"toolchain": "vivado",
|
||||
# Be careful with the float comparison here, copy-pasting from the
|
||||
# exported YAML seems to have the best results. Otherwise this test
|
||||
# will fail when it shouldn't.
|
||||
"clk_freq": 50000000.0,
|
||||
"refclk_freq": 50000000.0,
|
||||
"fpga_ip_addr": "192.168.0.101",
|
||||
"host_ip_addr": "192.168.0.100",
|
||||
}
|
||||
}
|
||||
|
||||
if data != expected:
|
||||
raise ValueError("Exported YAML does not match configuration!")
|
||||
|
|
|
|||
Loading…
Reference in New Issue