move to private methods in Manta class
This commit is contained in:
parent
b0dcd269bc
commit
ca7d743bf4
|
|
@ -113,7 +113,7 @@ def ports():
|
|||
for port in serial.tools.list_ports.comports():
|
||||
print(port)
|
||||
|
||||
# sometimes macOS will enumerate non-serial devices as serial ports,
|
||||
# Sometimes macOS will enumerate non-serial devices as serial ports,
|
||||
# in which case the PID/VID/serial/location/etc are all None
|
||||
pid = f"0x{port.pid:04X}" if port.pid is not None else "None"
|
||||
vid = f"0x{port.vid:04X}" if port.vid is not None else "None"
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ class UDPSourceBridge(Elaboratable):
|
|||
def elaborate(self, platform):
|
||||
m = Module()
|
||||
|
||||
state = Signal() # can either be 0, for read/write, or 1, for data
|
||||
state = Signal() # Can either be 0, for read/write, or 1, for data
|
||||
rw_buf = Signal().like(self.bus_o.rw)
|
||||
|
||||
# Can always take more data
|
||||
|
|
|
|||
|
|
@ -84,13 +84,13 @@ class IOCore(Elaboratable):
|
|||
initial_value = 0
|
||||
|
||||
if isinstance(attrs, dict):
|
||||
# check that each output probe has only recognized options
|
||||
# Check that each output probe has only recognized options
|
||||
valid_options = ["width", "initial_value"]
|
||||
for option in attrs:
|
||||
if option not in valid_options:
|
||||
warn(f'Ignoring unrecognized option "{option}" in IO core.')
|
||||
|
||||
# check that widths are appropriate
|
||||
# Check that widths are appropriate
|
||||
if "width" not in attrs:
|
||||
raise ValueError(f"No width specified for output probe {name}.")
|
||||
|
||||
|
|
|
|||
|
|
@ -292,7 +292,7 @@ class LogicAnalyzerCapture:
|
|||
Gets the value of a single probe over the capture.
|
||||
"""
|
||||
|
||||
# sum up the widths of all the probes below this one
|
||||
# Sum up the widths of all the probes below this one
|
||||
lower = 0
|
||||
for name, width in self._config["probes"].items():
|
||||
if name == probe_name:
|
||||
|
|
@ -300,7 +300,7 @@ class LogicAnalyzerCapture:
|
|||
|
||||
lower += width
|
||||
|
||||
# add the width of the probe we'd like
|
||||
# Add the width of the probe we'd like
|
||||
upper = lower + self._config["probes"][probe_name]
|
||||
|
||||
total_probe_width = sum(self._config["probes"].values())
|
||||
|
|
@ -342,7 +342,7 @@ class LogicAnalyzerCapture:
|
|||
vcd_file = open(path, "w")
|
||||
|
||||
with VCDWriter(vcd_file, "10 ns", timestamp, "manta") as writer:
|
||||
# each probe has a name, width, and writer associated with it
|
||||
# Each probe has a name, width, and writer associated with it
|
||||
signals = []
|
||||
for name, width in self._config["probes"].items():
|
||||
signal = {
|
||||
|
|
@ -355,19 +355,19 @@ class LogicAnalyzerCapture:
|
|||
|
||||
clock = writer.register_var("manta", "clk", "wire", size=1)
|
||||
|
||||
# include a trigger signal such would be meaningful (ie, we didn't trigger immediately)
|
||||
# Include a trigger signal such would be meaningful (ie, we didn't trigger immediately)
|
||||
if (
|
||||
"trigger_mode" not in self._config
|
||||
or self._config["trigger_mode"] == "single_shot"
|
||||
):
|
||||
trigger = writer.register_var("manta", "trigger", "wire", size=1)
|
||||
|
||||
# add the data to each probe in the vcd file
|
||||
# Add the data to each probe in the vcd file
|
||||
for timestamp in range(0, 2 * len(self._data)):
|
||||
# run the clock
|
||||
# Run the clock
|
||||
writer.change(clock, timestamp, timestamp % 2 == 0)
|
||||
|
||||
# set the trigger (if there is one)
|
||||
# Set the trigger (if there is one)
|
||||
if (
|
||||
"trigger_mode" not in self._config
|
||||
or self._config["trigger_mode"] == "single_shot"
|
||||
|
|
@ -375,7 +375,7 @@ class LogicAnalyzerCapture:
|
|||
triggered = (timestamp // 2) >= self.get_trigger_location()
|
||||
writer.change(trigger, timestamp, triggered)
|
||||
|
||||
# add other signals
|
||||
# Add other signals
|
||||
for signal in signals:
|
||||
var = signal["var"]
|
||||
sample = signal["data"][timestamp // 2]
|
||||
|
|
|
|||
|
|
@ -37,13 +37,13 @@ class LogicAnalyzerTriggerBlock(Elaboratable):
|
|||
return self.registers.get_max_addr()
|
||||
|
||||
def clear_triggers(self):
|
||||
# reset all triggers to disabled with no argument
|
||||
# Reset all triggers to disabled with no argument
|
||||
for p in self._probes:
|
||||
self.registers.set_probe(p.name + "_op", Operations.DISABLE)
|
||||
self.registers.set_probe(p.name + "_arg", 0)
|
||||
|
||||
def set_triggers(self, config):
|
||||
# set triggers
|
||||
# Set triggers
|
||||
for trigger in config["triggers"]:
|
||||
components = trigger.strip().split(" ")
|
||||
|
||||
|
|
|
|||
|
|
@ -9,24 +9,26 @@ from manta.logic_analyzer import LogicAnalyzerCore
|
|||
|
||||
class Manta(Elaboratable):
|
||||
def __init__(self, config):
|
||||
# load config from either a configuration file or a dictionary. Users primarily use the
|
||||
# config file, but the dictionary is included for internal tests.
|
||||
# Load config from either a configuration file or a dictionary.
|
||||
# Users primarily use the config file, but the dictionary is
|
||||
# included for internal tests.
|
||||
|
||||
if isinstance(config, str):
|
||||
self.config = self.read_config_file(config)
|
||||
self._config = self._read_config_file(config)
|
||||
|
||||
if isinstance(config, dict):
|
||||
self.config = config
|
||||
self._config = config
|
||||
|
||||
self.check_config()
|
||||
self._check_config()
|
||||
|
||||
self.interface = self.get_interface()
|
||||
self.cores = self.get_cores()
|
||||
self.add_friendly_core_names()
|
||||
self._get_interface()
|
||||
self._get_cores()
|
||||
self._add_friendly_core_names()
|
||||
|
||||
def read_config_file(self, path):
|
||||
def _read_config_file(self, path):
|
||||
"""
|
||||
Take path to configuration file, and retun the configuration as a python list/dict object.
|
||||
Takes a path to configuration file, and return the configuration as a
|
||||
python dictionary.
|
||||
"""
|
||||
|
||||
extension = path.split(".")[-1]
|
||||
|
|
@ -46,37 +48,46 @@ class Manta(Elaboratable):
|
|||
else:
|
||||
raise ValueError("Unable to recognize configuration file extension.")
|
||||
|
||||
def check_config(self):
|
||||
if "cores" not in self.config:
|
||||
def _check_config(self):
|
||||
if "cores" not in self._config:
|
||||
raise ValueError("No cores specified in configuration file.")
|
||||
|
||||
if not len(self.config["cores"]) > 0:
|
||||
if not len(self._config["cores"]) > 0:
|
||||
raise ValueError("Must specify at least one core.")
|
||||
|
||||
for name, attrs in self.config["cores"].items():
|
||||
# make sure core type is specified
|
||||
for name, attrs in self._config["cores"].items():
|
||||
# Make sure core type is specified
|
||||
if "type" not in attrs:
|
||||
raise ValueError(f"No type specified for core {name}.")
|
||||
|
||||
if attrs["type"] not in ["logic_analyzer", "io", "memory_read_only"]:
|
||||
raise ValueError(f"Unrecognized core type specified for {name}.")
|
||||
|
||||
def get_interface(self):
|
||||
if "uart" in self.config:
|
||||
return UARTInterface.from_config(self.config["uart"])
|
||||
def _get_interface(self):
|
||||
"""
|
||||
Returns an instance of an interface object (UARTInterface or
|
||||
EthernetInterface) configured with the parameters in the
|
||||
config file.
|
||||
"""
|
||||
if "uart" in self._config:
|
||||
self.interface = UARTInterface.from_config(self._config["uart"])
|
||||
|
||||
elif "ethernet" in self.config:
|
||||
return EthernetInterface(self.config["ethernet"])
|
||||
elif "ethernet" in self._config:
|
||||
self.interface = EthernetInterface(self._config["ethernet"])
|
||||
|
||||
else:
|
||||
raise ValueError("No recognized interface specified.")
|
||||
|
||||
def get_cores(self):
|
||||
""" """
|
||||
def _get_cores(self):
|
||||
"""
|
||||
Creates instances of the cores (IOCore, LogicAnalyzerCore,
|
||||
ReadOnlyMemoryCore) specified in the user's configuration, and returns
|
||||
them as a list.
|
||||
"""
|
||||
|
||||
cores = {}
|
||||
self._cores = {}
|
||||
base_addr = 0
|
||||
for name, attrs in self.config["cores"].items():
|
||||
for name, attrs in self._config["cores"].items():
|
||||
if attrs["type"] == "io":
|
||||
core = IOCore.from_config(attrs, base_addr, self.interface)
|
||||
|
||||
|
|
@ -86,7 +97,7 @@ class Manta(Elaboratable):
|
|||
elif attrs["type"] == "memory_read_only":
|
||||
core = ReadOnlyMemoryCore.from_config(attrs, base_addr, self.interface)
|
||||
|
||||
# make sure we're not out of address space
|
||||
# Make sure we're not out of address space
|
||||
if core.get_max_addr() > (2**16) - 1:
|
||||
raise ValueError(
|
||||
f"Ran out of address space to allocate to core {name}."
|
||||
|
|
@ -94,18 +105,16 @@ class Manta(Elaboratable):
|
|||
|
||||
# Make the next core's base address start one address after the previous one's
|
||||
base_addr = core.get_max_addr() + 1
|
||||
cores[name] = core
|
||||
self._cores[name] = core
|
||||
|
||||
return cores
|
||||
|
||||
def add_friendly_core_names(self):
|
||||
def _add_friendly_core_names(self):
|
||||
"""
|
||||
Add cores to the instance under a friendly name - ie, a core named `my_core` belonging
|
||||
to a Manta instance `m` could be obtained with `m.cores["my_core"]`, but this allows
|
||||
it to be obtained with `m.my_core`. Which is way nicer.
|
||||
"""
|
||||
|
||||
for name, instance in self.cores.items():
|
||||
for name, instance in self._cores.items():
|
||||
if not hasattr(self, name):
|
||||
setattr(self, name, instance)
|
||||
|
||||
|
|
@ -115,24 +124,17 @@ class Manta(Elaboratable):
|
|||
)
|
||||
|
||||
def elaborate(self, platform):
|
||||
# make a module object
|
||||
# add all the submodules
|
||||
# connect them together, which consists of:
|
||||
# connect interface to first core
|
||||
# connect cores to each other
|
||||
# connect interface to last core
|
||||
|
||||
m = Module()
|
||||
|
||||
# Add interface as submodule
|
||||
m.submodules.interface = self.interface
|
||||
|
||||
# Add all cores as submodules
|
||||
for name, instance in self.cores.items():
|
||||
for name, instance in self._cores.items():
|
||||
m.submodules[name] = instance
|
||||
|
||||
# Connect first/last cores to interface output/input respectively
|
||||
core_instances = list(self.cores.values())
|
||||
core_instances = list(self._cores.values())
|
||||
first_core = core_instances[0]
|
||||
last_core = core_instances[-1]
|
||||
|
||||
|
|
@ -155,7 +157,7 @@ class Manta(Elaboratable):
|
|||
"""
|
||||
ports = self.interface.get_top_level_ports()
|
||||
|
||||
for name, instance in self.cores.items():
|
||||
for name, instance in self._cores.items():
|
||||
ports += instance.get_top_level_ports()
|
||||
|
||||
return ports
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ class ReadOnlyMemoryCore(Elaboratable):
|
|||
if not width > 0:
|
||||
raise ValueError("Width of memory core must be positive. ")
|
||||
|
||||
cls(width, depth, base_addr, interface)
|
||||
return cls(width, depth, base_addr, interface)
|
||||
|
||||
def _pipeline_bus(self, m):
|
||||
self._bus_pipe = [Signal(InternalBus()) for _ in range(3)]
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ def warn(message):
|
|||
Prints a warning to the user's terminal. Originally the warn() method
|
||||
from the builtin warnings module was used for this, but I don't think the
|
||||
way it outputs on the command line is the most helpful for the users.
|
||||
(They don't care about the stacktrace or the filename/line number, for example.)
|
||||
(ie, Users don't care about the stacktrace or the filename/line number.)
|
||||
"""
|
||||
print("Warning: " + message)
|
||||
|
||||
|
|
@ -68,7 +68,7 @@ def value_to_words(data, n_words):
|
|||
if not isinstance(data, int) or data < 0:
|
||||
raise ValueError("Behavior is only defined for nonnegative integers.")
|
||||
|
||||
# convert to binary, split into 16-bit chunks, and then convert back to list of int
|
||||
# Convert to binary, split into 16-bit chunks, and then convert back to list of int
|
||||
binary = f"{data:0b}".zfill(n_words * 16)
|
||||
return [int(binary[i : i + 16], 2) for i in range(0, 16 * n_words, 16)][::-1]
|
||||
|
||||
|
|
@ -113,7 +113,7 @@ def verify_register(module, addr, expected_data):
|
|||
possible to return a value from here, and compare it in the calling function.
|
||||
"""
|
||||
|
||||
# place read transaction on the bus
|
||||
# Place read transaction on the bus
|
||||
yield module.bus_i.addr.eq(addr)
|
||||
yield module.bus_i.data.eq(0)
|
||||
yield module.bus_i.rw.eq(0)
|
||||
|
|
@ -122,11 +122,11 @@ def verify_register(module, addr, expected_data):
|
|||
yield module.bus_i.addr.eq(0)
|
||||
yield module.bus_i.valid.eq(0)
|
||||
|
||||
# wait for output to be valid
|
||||
# Wait for output to be valid
|
||||
while not (yield module.bus_o.valid):
|
||||
yield
|
||||
|
||||
# compare returned value with expected
|
||||
# Compare returned value with expected
|
||||
data = yield (module.bus_o.data)
|
||||
if data != expected_data:
|
||||
raise ValueError(f"Read from {addr} yielded {data} instead of {expected_data}")
|
||||
|
|
|
|||
Loading…
Reference in New Issue