add docstrings

This commit is contained in:
Fischer Moseley 2024-02-17 16:07:29 -08:00
parent 0e1bc30802
commit ea6b3f73b9
9 changed files with 167 additions and 105 deletions

View File

@ -12,6 +12,9 @@ class EthernetInterface(Elaboratable):
Provides methods for generating synthesizable logic for the FPGA, as well
as methods for reading and writing to memory by the host.
More information available in the online documentation at:
https://fischermoseley.github.io/manta/ethernet_interface/
"""
def __init__(self, config):

View File

@ -5,20 +5,26 @@ from math import ceil
class IOCore(Elaboratable):
"""
Contains the HDL to instantiate an IO core on a FPGA, and the functions to interact with it. For
more information on the core itself, check out the IO core documentation.
A module for setting and getting the values of registers of arbitrary size
on a FPGA.
Provides methods for generating synthesizable logic for the FPGA, as well
as methods for reading and writing the value of a register.
More information available in the online documentation at:
https://fischermoseley.github.io/manta/io_core/
"""
def __init__(self, config, base_addr, interface):
self.config = config
self._config = config
self.base_addr = base_addr
self.interface = interface
self._check_config(self._config)
self.check_config(self.config)
self.define_signals()
self.mmap, self.max_addr = self.assign_memory()
def check_config(self, config):
def _check_config(self, config):
# make sure ports are defined
if "inputs" not in config and "outputs" not in config:
raise ValueError("No input or output ports specified.")
@ -91,14 +97,14 @@ class IOCore(Elaboratable):
self.bus_o = Signal(InternalBus())
# Input Probes (and buffers)
if "inputs" in self.config:
for name, width in self.config["inputs"].items():
if "inputs" in self._config:
for name, width in self._config["inputs"].items():
setattr(self, name, Signal(width, name=name))
setattr(self, name + "_buf", Signal(width, name=name + "_buf"))
# Output Probes (and buffers)
if "outputs" in self.config:
for name, attrs in self.config["outputs"].items():
if "outputs" in self._config:
for name, attrs in self._config["outputs"].items():
if isinstance(attrs, dict):
width = attrs["width"]
initial_value = attrs["initial_value"]
@ -148,11 +154,11 @@ class IOCore(Elaboratable):
# Add all input and output probes
all_probes = {}
if "inputs" in self.config:
all_probes = {**all_probes, **self.config["inputs"]}
if "inputs" in self._config:
all_probes = {**all_probes, **self._config["inputs"]}
if "outputs" in self.config:
all_probes = {**all_probes, **self.config["outputs"]}
if "outputs" in self._config:
all_probes = {**all_probes, **self._config["outputs"]}
for name, attrs in all_probes.items():
# Handle output probes that might have initial value specified in addition to width
@ -184,15 +190,15 @@ class IOCore(Elaboratable):
# Update buffers from probes
with m.If(self.strobe):
# Input buffers
if "inputs" in self.config:
for name in self.config["inputs"]:
if "inputs" in self._config:
for name in self._config["inputs"]:
input_probe = getattr(self, name)
input_probe_buf = getattr(self, name + "_buf")
m.d.sync += input_probe_buf.eq(input_probe)
# Output buffers
if "outputs" in self.config:
for name in self.config["outputs"]:
if "outputs" in self._config:
for name in self._config["outputs"]:
output_probe = getattr(self, name)
output_probe_buf = getattr(self, name + "_buf")
m.d.sync += output_probe.eq(output_probe_buf)
@ -213,23 +219,31 @@ class IOCore(Elaboratable):
return m
def get_top_level_ports(self):
"""
Return the Amaranth signals that should be included as ports in the
top-level Manta module.
"""
ports = []
if "inputs" in self.config:
for name in self.config["inputs"].keys():
if "inputs" in self._config:
for name in self._config["inputs"].keys():
ports.append(getattr(self, name))
if "outputs" in self.config:
for name in self.config["outputs"].keys():
if "outputs" in self._config:
for name in self._config["outputs"].keys():
ports.append(getattr(self, name))
return ports
def get_max_addr(self):
"""
Return the maximum addresses in memory used by the core. The address space used
by the core extends from `base_addr` to the number returned by this function.
"""
return self.max_addr
def set_probe(self, probe_name, value):
# check that probe is an output probe
if probe_name not in self.config["outputs"]:
if probe_name not in self._config["outputs"]:
raise ValueError(f"Output probe '{probe_name}' not found.")
# check that value is an integer
@ -237,7 +251,7 @@ class IOCore(Elaboratable):
raise ValueError("Value must be an integer.")
# get the width of the probe, make sure value isn't too large for the probe
attrs = self.config["outputs"][probe_name]
attrs = self._config["outputs"][probe_name]
if isinstance(attrs, int):
width = attrs

View File

@ -8,28 +8,14 @@ from manta.logic_analyzer.playback import LogicAnalyzerPlayback
class LogicAnalyzerCore(Elaboratable):
"""
A logic analzyer, implemented in the FPGA fabric. Connects to the rest of the cores
over Manta's internal bus, and may be operated from a user's machine through the Python API.
Parameters:
----------
config : dict
Configuration options. This is taken from the section of Manta's configuration YAML that
describes the core.
base_addr : int
Where to place the core in Manta's internal memory map. This determines the beginning of
the core's address space. The end of the core's address space may be obtained by calling
the get_max_addr() method.
interface : UARTInterface or EthernetInterface
The interface used to communicate with the core.
Attributes:
----------
None
A module for generating a logic analyzer on the FPGA, with configurable
triggers, trigger position, and trigger modes.
Provides methods for generating synthesizable logic for the FPGA, as well
as methods for reading and writing the value of a register.
More information available in the online documentation at:
https://fischermoseley.github.io/manta/logic_analyzer_core/
"""
def __init__(self, config, base_addr, interface):
@ -186,6 +172,10 @@ class LogicAnalyzerCore(Elaboratable):
return m
def get_top_level_ports(self):
"""
Return the Amaranth signals that should be included as ports in the
top-level Manta module.
"""
return self.probes
def get_probe(self, name):
@ -196,6 +186,10 @@ class LogicAnalyzerCore(Elaboratable):
raise ValueError(f"Probe '{name}' not found in Logic Analyzer core.")
def get_max_addr(self):
"""
Return the maximum addresses in memory used by the core. The address space used
by the core extends from `base_addr` to the number returned by this function.
"""
return self.sample_mem.get_max_addr()
def capture(self, verbose=False):

View File

@ -19,7 +19,11 @@ class TriggerModes(IntEnum):
class LogicAnalyzerFSM(Elaboratable):
""" """
"""
A module containing the state machine for a LogicAnalyzerCore. Primarily
responsible for controlling the write port of the Logic Analyzer's sample
memory in each trigger mode (immediate, incremental, single-shot).
"""
def __init__(self, config, base_addr, interface):
self.config = config
@ -47,6 +51,10 @@ class LogicAnalyzerFSM(Elaboratable):
self.bus_o = self.r.bus_o
def get_max_addr(self):
"""
Return the maximum addresses in memory used by the core. The address space used
by the core extends from `base_addr` to the number returned by this function.
"""
return self.r.get_max_addr()
def increment_mod_sample_depth(self, m, signal):

View File

@ -72,4 +72,8 @@ class LogicAnalyzerPlayback(Elaboratable):
return m
def get_top_level_ports(self):
"""
Return the Amaranth signals that should be included as ports in the
exported Verilog module.
"""
return [self.start, self.valid] + list(self.top_level_probes.values())

View File

@ -3,7 +3,12 @@ from manta.io_core import IOCore
class LogicAnalyzerTriggerBlock(Elaboratable):
""" """
"""
A module containing an instance of a LogicAnalyzerTrigger for each input
probe. The operations and arguments of these LogicAnalyzerTriggers are set
with an internal IOCore, which is connected to the internal bus, and allows
the triggers to be reprogrammed without reflashing the FPGA.
"""
def __init__(self, probes, base_addr, interface):
# Instantiate a bunch of trigger blocks
@ -26,6 +31,10 @@ class LogicAnalyzerTriggerBlock(Elaboratable):
self.trig = Signal(1)
def get_max_addr(self):
"""
Return the maximum addresses in memory used by the core. The address space used
by the core extends from `base_addr` to the number returned by this function.
"""
return self.r.get_max_addr()
def clear_triggers(self):
@ -71,6 +80,12 @@ class LogicAnalyzerTriggerBlock(Elaboratable):
class LogicAnalyzerTrigger(Elaboratable):
"""
A module containing a programmable "trigger" for a given input signal,
which asserts its output when the programmed "trigger condition" is met.
This condition is programmed through the `op` and `arg` inputs.
"""
def __init__(self, signal):
self.operations = {
"DISABLE": 0,

View File

@ -149,6 +149,10 @@ class Manta(Elaboratable):
return m
def get_top_level_ports(self):
"""
Return the Amaranth signals that should be included as ports in the
top-level Manta module.
"""
ports = self.interface.get_top_level_ports()
for name, instance in self.cores.items():

View File

@ -4,21 +4,39 @@ from math import ceil
class ReadOnlyMemoryCore(Elaboratable):
"""
A module for generating a memory on the FPGA, with a read port tied to
Manta's internal bus, and a write port provided to user logic.
Provides methods for generating synthesizable logic for the FPGA, as well
as methods for reading and writing the value of a register.
More information available in the online documentation at:
https://fischermoseley.github.io/manta/memory_core/
"""
def __init__(self, config, base_addr, interface):
self.config = config
self.base_addr = base_addr
self.interface = interface
self._config = config
self._base_addr = base_addr
self._interface = interface
self._check_config(config)
self.check_config(config)
self._depth = self._config["depth"]
self._width = self._config["width"]
self._max_addr = self._base_addr + (self._depth * ceil(self._width / 16))
self.depth = self.config["depth"]
self.width = self.config["width"]
self.max_addr = self.base_addr + (self.depth * ceil(self.width / 16))
# Bus Connections
self.bus_i = Signal(InternalBus())
self.bus_o = Signal(InternalBus())
self.define_signals()
self.define_mems()
# User Port
self.user_addr = Signal(range(self._depth))
self.user_data = Signal(self._width)
self.user_we = Signal(1)
def check_config(self, config):
self._define_mems()
def _check_config(self, config):
# Check for unrecognized options
valid_options = ["type", "depth", "width"]
for option in config:
@ -45,56 +63,46 @@ class ReadOnlyMemoryCore(Elaboratable):
if config["width"] <= 0:
raise ValueError("Width of memory core must be positive. ")
def define_signals(self):
# Bus Input/Pipelining/Output
self.bus_i = Signal(InternalBus())
self.bus_pipe = [Signal(InternalBus()) for _ in range(3)]
self.bus_o = Signal(InternalBus())
# User Port
self.user_addr = Signal(range(self.depth))
self.user_data = Signal(self.width)
self.user_we = Signal(1)
def pipeline_bus(self, m):
m.d.sync += self.bus_pipe[0].eq(self.bus_i)
def _pipeline_bus(self, m):
self._bus_pipe = [Signal(InternalBus()) for _ in range(3)]
m.d.sync += self._bus_pipe[0].eq(self.bus_i)
for i in range(1, 3):
m.d.sync += self.bus_pipe[i].eq(self.bus_pipe[i - 1])
m.d.sync += self._bus_pipe[i].eq(self._bus_pipe[i - 1])
m.d.sync += self.bus_o.eq(self.bus_pipe[2])
m.d.sync += self.bus_o.eq(self._bus_pipe[2])
def define_mems(self):
# ok there's three cases:
# 1. integer number of 16 bit mems
# 2. integer number of 16 bit mems + partial mem
# 3. just the partial mem (width < 16)
def _define_mems(self):
# There's three cases that must be handled:
# 1. Integer number of 16 bit mems
# 2. Integer number of 16 bit mems + partial mem
# 3. Just the partial mem (width < 16)
# Only one, partial-width memory is needed
if self.width < 16:
self.mems = [Memory(depth=self.depth, width=self.width)]
if self._width < 16:
self._mems = [Memory(depth=self._depth, width=self._width)]
# Only full-width memories are needed
elif self.width % 16 == 0:
self.mems = [
Memory(depth=self.depth, width=16) for _ in range(self.width // 16)
elif self._width % 16 == 0:
self._mems = [
Memory(depth=self._depth, width=16) for _ in range(self._width // 16)
]
# Both full-width and partial memories are needed
else:
self.mems = [
Memory(depth=self.depth, width=16) for i in range(self.width // 16)
self._mems = [
Memory(depth=self._depth, width=16) for i in range(self._width // 16)
]
self.mems += [Memory(depth=self.depth, width=self.width % 16)]
self._mems += [Memory(depth=self._depth, width=self._width % 16)]
def handle_read_ports(self, m):
def _handle_read_ports(self, m):
# These are tied to the bus
for i, mem in enumerate(self.mems):
for i, mem in enumerate(self._mems):
read_port = mem.read_port()
m.d.comb += read_port.en.eq(1)
start_addr = self.base_addr + (i * self.depth)
stop_addr = start_addr + self.depth - 1
start_addr = self._base_addr + (i * self._depth)
stop_addr = start_addr + self._depth - 1
# Throw BRAM operations into the front of the pipeline
with m.If(
@ -107,16 +115,16 @@ class ReadOnlyMemoryCore(Elaboratable):
# Pull BRAM reads from the back of the pipeline
with m.If(
(self.bus_pipe[2].valid)
& (~self.bus_pipe[2].rw)
& (self.bus_pipe[2].addr >= start_addr)
& (self.bus_pipe[2].addr <= stop_addr)
(self._bus_pipe[2].valid)
& (~self._bus_pipe[2].rw)
& (self._bus_pipe[2].addr >= start_addr)
& (self._bus_pipe[2].addr <= stop_addr)
):
m.d.sync += self.bus_o.data.eq(read_port.data)
def handle_write_ports(self, m):
def _handle_write_ports(self, m):
# These are given to the user
for i, mem in enumerate(self.mems):
for i, mem in enumerate(self._mems):
write_port = mem.write_port()
m.d.comb += write_port.addr.eq(self.user_addr)
@ -127,19 +135,27 @@ class ReadOnlyMemoryCore(Elaboratable):
m = Module()
# Add memories as submodules
for i, mem in enumerate(self.mems):
for i, mem in enumerate(self._mems):
m.submodules[f"mem_{i}"] = mem
self.pipeline_bus(m)
self.handle_read_ports(m)
self.handle_write_ports(m)
self._pipeline_bus(m)
self._handle_read_ports(m)
self._handle_write_ports(m)
return m
def get_top_level_ports(self):
"""
Return the Amaranth signals that should be included as ports in the
top-level Manta module.
"""
return [self.user_addr, self.user_data, self.user_we]
def get_max_addr(self):
return self.max_addr
"""
Return the maximum addresses in memory used by the core. The address space used
by the core extends from `base_addr` to the number returned by this function.
"""
return self._max_addr
def read_from_user_addr(self, addrs):
"""
@ -147,8 +163,9 @@ class ReadOnlyMemoryCore(Elaboratable):
"""
# Convert user address space to bus address space
# (for instance, for a core with base address 10 and width 33,
# reading from address 4 is actually a read from address 14 and address 14 + depth, and address 14 + 2*depth)
# (for instance, for a core with base address 10 and width 33,
# reading from address 4 is actually a read from address 14
# and address 14 + depth, and address 14 + 2*depth)
if isinstance(addrs, int):
return self.read_from_user_addr([addrs])[0]
@ -156,11 +173,11 @@ class ReadOnlyMemoryCore(Elaboratable):
bus_addrs = []
for addr in addrs:
bus_addrs += [
addr + self.base_addr + i * self.depth for i in range(len(self.mems))
addr + self._base_addr + i * self._depth for i in range(len(self._mems))
]
datas = self.interface.read(bus_addrs)
data_chunks = split_into_chunks(datas, len(self.mems))
datas = self._interface.read(bus_addrs)
data_chunks = split_into_chunks(datas, len(self._mems))
return [words_to_value(chunk) for chunk in data_chunks]
# def write_to_user_addr(self, addrs, datas):
@ -171,11 +188,11 @@ class ReadOnlyMemoryCore(Elaboratable):
# bus_addrs = []
# for addr in addrs:
# bus_addrs += [
# addr + self.base_addr + i * self.depth for i in range(len(self.mems))
# addr + self._base_addr + i * self._depth for i in range(len(self._mems))
# ]
# bus_datas = []
# for data in datas:
# bus_datas += value_to_words(data)
# self.interface.write(bus_addrs, bus_datas)
# self._interface.write(bus_addrs, bus_datas)

View File

@ -13,6 +13,9 @@ class UARTInterface(Elaboratable):
Provides methods for generating synthesizable logic for the FPGA, as well
as methods for reading and writing to memory by the host.
More information available in the online documentation at:
https://fischermoseley.github.io/manta/uart_interface/
"""
def __init__(self, port, baudrate, clock_freq, chunk_size=256):