meta: add boilerplate for Amaranth-native API

This commit is contained in:
Fischer Moseley 2024-07-28 19:22:02 -07:00
parent 3c6ec65511
commit 3c2b69579d
11 changed files with 513 additions and 131 deletions

View File

@ -87,7 +87,7 @@ Most logic analyzers use a single-shot capture by default, so Manta will do the
Once you have your Logic Analyzer core on the FPGA, you can capture data with:
```bash
manta capture [config file] [LA core] [output filename] [[additional output filenames]...]
manta capture [config_file] [la_core_name] [output path] [[additional output paths]...]
```
The capture may be exported as either a VCD or CSV file. If `manta.yaml` contained the configuration at the top of this page, then the following would export a .vcd file containing the captured waveform:

View File

@ -0,0 +1,72 @@
from amaranth import *
from amaranth.lib import io
from manta import *
from manta.io_core import IOCore
from manta.uart import UARTInterface
from time import sleep
class UARTIOCoreExample(Elaboratable):
def __init__(self, platform, port):
self.platform = platform
# Create Manta instance
self.manta = Manta()
# Configure it to communicate over Ethernet
self.manta.interface = UARTInterface(
port=port,
baudrate=2000000,
clock_freq=platform.default_clk_frequency,
)
# Autodetect the number of LEDs on the platform
resources = platform.resources.keys()
self.n_leds = max([i for name, i in resources if name == "led"])
# Add IOCore to Manta instance
self.leds = Signal(self.n_leds)
self.manta.cores.io = IOCore(outputs=[self.leds])
def elaborate(self, platform):
m = Module()
# Add Manta as a submodule
m.submodules.manta = self.manta
# Wire each LED to Manta's IO Core output
for i in range(self.n_leds):
led = io.Buffer("o", platform.request("led", i, dir="-"))
m.d.comb += led.o.eq(self.leds[i])
m.submodules += led
# Wire UART pins to the Manta instance
uart_pins = platform.request("uart", dir={"tx": "-", "rx": "-"})
m.submodules.uart_rx = uart_rx = io.Buffer("i", uart_pins.rx)
m.submodules.uart_tx = uart_tx = io.Buffer("o", uart_pins.tx)
m.d.comb += self.manta.interface.rx.eq(uart_rx.i)
m.d.comb += uart_tx.o.eq(self.manta.interface.tx)
return m
def test(self):
# Build and program the FPGA
self.platform.build(self, do_program=True)
# Iterate through all the LEDs, blinking them off and on
i = 0
while True:
self.manta.cores.io.set_probe("leds", 1 << i)
i = (i + 1) % self.n_leds
sleep(0.1)
# Amaranth has a built-in build system, and well as a set of platform
# definitions for a huge number of FPGA boards. The class defined above is
# very generic, as it specifies a design independent of any particular FGPA
# board. This means that by changing which platform you pass UARTIOCoreExample
# below, you can port this example to any FPGA board!
from amaranth_boards.icestick import ICEStickPlatform
UARTIOCoreExample(platform=ICEStickPlatform(), port="auto").test()

View File

@ -0,0 +1,73 @@
from amaranth import *
from amaranth.lib import io
from manta import *
from manta.memory_core import MemoryCore
from manta.uart import UARTInterface
class UARTMemoryCoreExample(Elaboratable):
def __init__(self, platform, port):
self.platform = platform
# Create Manta instance
self.manta = Manta()
# Configure it to communicate over Ethernet
self.manta.interface = UARTInterface(
port=port,
baudrate=2000000,
clock_freq=platform.default_clk_frequency,
)
# Add IOCore to Manta instance
self.manta.cores.mem = MemoryCore(mode="host_to_fpga", width=16, depth=512)
def elaborate(self, platform):
m = Module()
# Add Manta as a submodule
m.submodules.manta = self.manta
# Wire each LED to the data output of the memory core
for i in range(16):
led = io.Buffer("o", platform.request("led", i, dir="-"))
m.d.comb += led.o.eq(self.manta.cores.mem.user_data_out[i])
m.submodules += led
# Wire each switch to the address input of the memory core
for i in range(9):
sw = io.Buffer("i", platform.request("switch", i, dir="-"))
m.d.comb += self.manta.cores.mem.user_addr[i].eq(sw.i)
m.submodules += sw
# Wire UART pins to the Manta instance
uart_pins = platform.request("uart", dir={"tx": "-", "rx": "-"})
m.submodules.uart_rx = uart_rx = io.Buffer("i", uart_pins.rx)
m.submodules.uart_tx = uart_tx = io.Buffer("o", uart_pins.tx)
m.d.comb += self.manta.interface.rx.eq(uart_rx.i)
m.d.comb += uart_tx.o.eq(self.manta.interface.tx)
return m
def test(self):
# Build and program the FPGA
self.platform.build(self, do_program=True)
# Iterate through all the LEDs, blinking them off and on
i = 0
for i in range(512):
self.manta.cores.mem.write(i, i)
# Amaranth has a built-in build system, and well as a set of platform
# definitions for a huge number of FPGA boards. The class defined above is
# very generic, as it specifies a design independent of any particular FGPA
# board. This means that by changing which platform you pass UARTIOCoreExample
# below, you can port this example to any FPGA board!
from amaranth_boards.nexys4ddr import Nexys4DDRPlatform
UARTMemoryCoreExample(
platform=Nexys4DDRPlatform(),
port="auto",
).test()

View File

@ -41,9 +41,9 @@ Usage:
Generate a copy-pasteable Verilog snippet to instantiate Manta
in your design.
capture [config_file] [la_core_name] [vcd_file] [verilog_file]
Start a capture on the specified core, and save the results to a .vcd
or .v file at the provided path(s).
capture [config_file] [la_core_name] [output path] [[additional output paths]...]
Start a capture on the specified core, and save the results to a .vcd,
.csv, or .v file at the provided path(s).
ports
List all available serial ports.
@ -70,9 +70,7 @@ def wrong_args():
def gen(config_path, output_path):
m = Manta(config_path)
with open(output_path, "w") as f:
f.write(m.generate_verilog())
m.generate_verilog(output_path)
def inst(config_path):
@ -105,10 +103,6 @@ def capture(config_path, logic_analyzer_name, export_paths):
warn(f"Unrecognized file type, skipping {path}.")
def mmap(config_path):
print(Manta(config_path).mmap())
def ports():
import serial.tools.list_ports

View File

@ -83,6 +83,9 @@ class EthernetInterface(Elaboratable):
if not 0 <= int(byte) <= 255:
raise ValueError(f"Invalid byte in FPGA IP: {byte}")
def to_config(self):
return self._config
def get_top_level_ports(self):
"""
Return the Amaranth signals that should be included as ports in the

View File

@ -15,9 +15,7 @@ class IOCore(MantaCore):
https://fischermoseley.github.io/manta/io_core/
"""
def __init__(self, base_addr, interface, inputs=[], outputs=[]):
self._base_addr = base_addr
self._interface = interface
def __init__(self, inputs=[], outputs=[]):
self._inputs = inputs
self._outputs = outputs
@ -32,14 +30,13 @@ class IOCore(MantaCore):
Signal(len(p), name=p.name + "_buf", init=p.init) for p in self._outputs
]
self._make_memory_map()
@property
def top_level_ports(self):
return self._inputs + self._outputs
@property
def max_addr(self):
self._make_memory_map()
return self._max_addr
@classmethod
@ -122,18 +119,32 @@ class IOCore(MantaCore):
return cls(base_addr, interface, inputs=input_signals, outputs=output_signals)
def to_config(self):
config = {}
config["type"] = "io"
if self._inputs:
config["inputs"] = {s.name: len(s) for s in self._inputs}
if self._outputs:
config["outputs"] = {}
for s in self._outputs:
config["outputs"][s.name] = {"width": len(s), "initial_value": s.init}
return config
def _make_memory_map(self):
self._memory_map = {}
# Add strobe register
self._memory_map["strobe"] = dict(
signals=[self._strobe], addrs=[self._base_addr]
signals=[self._strobe], addrs=[self.base_addr]
)
# Assign memory to all inputs and outputs
ios = self._inputs + self._outputs
io_bufs = self._input_bufs + self._output_bufs
last_used_addr = self._base_addr
last_used_addr = self.base_addr
for io, io_buf in zip(ios, io_bufs):
n_slices = ceil(len(io) / 16)
@ -204,12 +215,12 @@ class IOCore(MantaCore):
# Write value to core
addrs = probe["addrs"]
datas = value_to_words(value, len(addrs))
self._interface.write(addrs, datas)
self.interface.write(addrs, datas)
# Pulse strobe register
self._interface.write(self._base_addr, 0)
self._interface.write(self._base_addr, 1)
self._interface.write(self._base_addr, 0)
self.interface.write(self.base_addr, 0)
self.interface.write(self.base_addr, 1)
self.interface.write(self.base_addr, 0)
def get_probe(self, name):
"""
@ -225,10 +236,10 @@ class IOCore(MantaCore):
raise KeyError(f"Probe with name {name} not found in IO core.")
# Pulse strobe register
self._interface.write(self._base_addr, 0)
self._interface.write(self._base_addr, 1)
self._interface.write(self._base_addr, 0)
self.interface.write(self.base_addr, 0)
self.interface.write(self.base_addr, 1)
self.interface.write(self.base_addr, 0)
# Get value from buffer
datas = self._interface.read(probe["addrs"])
datas = self.interface.read(probe["addrs"])
return words_to_value(datas)

View File

@ -4,122 +4,138 @@ from manta.ethernet import EthernetInterface
from manta.io_core import IOCore
from manta.memory_core import MemoryCore
from manta.logic_analyzer import LogicAnalyzerCore
from manta.utils import *
class Manta(Elaboratable):
def __init__(self, config):
# Load config from either a configuration file or a dictionary.
# Users primarily use the config file, but the dictionary is
# included for internal tests.
def __init__(self):
self._interface = None
self.cores = CoreContainer(self)
if isinstance(config, str):
self._config = self._read_config_file(config)
# This treats the `interface` attribute as a property, which allows the
# setter to update the interfaces of all the cores in self.cores whenever
# the user assigns to Manta's `interface` object.
@property
def interface(self):
return self._interface
if isinstance(config, dict):
self._config = config
@interface.setter
def interface(self, value):
self._interface = value
for core in self.cores._cores.values():
core.interface = value
self._check_config()
# def __init__(self, config):
# # Load config from either a configuration file or a dictionary.
# # Users primarily use the config file, but the dictionary is
# # included for internal tests.
self._get_interface()
self._get_cores()
self._add_friendly_core_names()
# if isinstance(config, str):
# self._config = self._read_config_file(config)
def _read_config_file(self, path):
"""
Takes a path to configuration file, and return the configuration as a
python dictionary.
"""
# if isinstance(config, dict):
# self._config = config
extension = path.split(".")[-1]
# self._check_config()
if "json" in extension:
with open(path, "r") as f:
import json
# self._get_interface()
# self._get_cores()
# self._add_friendly_core_names()
return json.load(f)
# def _read_config_file(self, path):
# """
# Takes a path to configuration file, and return the configuration as a
# python dictionary.
# """
elif "yaml" in extension or "yml" in extension:
with open(path, "r") as f:
import yaml
# extension = path.split(".")[-1]
return yaml.safe_load(f)
# if "json" in extension:
# import json
# with open(path, "r") as f:
# return json.load(f)
else:
raise ValueError("Unable to recognize configuration file extension.")
# elif "yaml" in extension or "yml" in extension:
# import yaml
# with open(path, "r") as f:
# return yaml.safe_load(f)
def _check_config(self):
if "cores" not in self._config:
raise ValueError("No cores specified in configuration file.")
# else:
# raise ValueError("Unable to recognize configuration file extension.")
if not len(self._config["cores"]) > 0:
raise ValueError("Must specify at least one core.")
# def _check_config(self):
# if "cores" not in self._config:
# raise ValueError("No cores specified in configuration file.")
for name, attrs in self._config["cores"].items():
# Make sure core type is specified
if "type" not in attrs:
raise ValueError(f"No type specified for core {name}.")
# if not len(self._config["cores"]) > 0:
# raise ValueError("Must specify at least one core.")
if attrs["type"] not in ["logic_analyzer", "io", "memory"]:
raise ValueError(f"Unrecognized core type specified for {name}.")
# for name, attrs in self._config["cores"].items():
# # Make sure core type is specified
# if "type" not in attrs:
# raise ValueError(f"No type specified for core {name}.")
def _get_interface(self):
"""
Returns an instance of an interface object (UARTInterface or
EthernetInterface) configured with the parameters in the
config file.
"""
if "uart" in self._config:
self.interface = UARTInterface.from_config(self._config["uart"])
# if attrs["type"] not in ["logic_analyzer", "io", "memory"]:
# raise ValueError(f"Unrecognized core type specified for {name}.")
elif "ethernet" in self._config:
self.interface = EthernetInterface(self._config["ethernet"])
# def _get_interface(self):
# """
# Returns an instance of an interface object (UARTInterface or
# EthernetInterface) configured with the parameters in the
# config file.
# """
# if "uart" in self._config:
# self.interface = UARTInterface.from_config(self._config["uart"])
else:
raise ValueError("No recognized interface specified.")
# elif "ethernet" in self._config:
# self.interface = EthernetInterface(self._config["ethernet"])
def _get_cores(self):
"""
Creates instances of the cores (IOCore, LogicAnalyzerCore, MemoryCore)
specified in the user's configuration, and returns them as a list.
"""
# else:
# raise ValueError("No recognized interface specified.")
self._cores = {}
base_addr = 0
for name, attrs in self._config["cores"].items():
if attrs["type"] == "io":
core = IOCore.from_config(attrs, base_addr, self.interface)
# def _get_cores(self):
# """
# Creates instances of the cores (IOCore, LogicAnalyzerCore, MemoryCore)
# specified in the user's configuration, and returns them as a list.
# """
elif attrs["type"] == "logic_analyzer":
core = LogicAnalyzerCore(attrs, base_addr, self.interface)
# self._cores = {}
# base_addr = 0
# for name, attrs in self._config["cores"].items():
# if attrs["type"] == "io":
# core = IOCore.from_config(attrs, base_addr, self.interface)
elif attrs["type"] == "memory":
core = MemoryCore.from_config(attrs, base_addr, self.interface)
# elif attrs["type"] == "logic_analyzer":
# core = LogicAnalyzerCore(attrs, base_addr, self.interface)
# Make sure we're not out of address space
if core.max_addr > (2**16) - 1:
raise ValueError(
f"Ran out of address space to allocate to core {name}."
)
# elif attrs["type"] == "memory":
# core = MemoryCore.from_config(attrs, base_addr, self.interface)
# Make the next core's base address start one address after the previous one's
base_addr = core.max_addr + 1
self._cores[name] = core
# # Make sure we're not out of address space
# if core.max_addr > (2**16) - 1:
# raise ValueError(
# f"Ran out of address space to allocate to core {name}."
# )
def _add_friendly_core_names(self):
"""
Add cores to the instance under a friendly name - ie, a core named `my_core` belonging
to a Manta instance `m` could be obtained with `m.cores["my_core"]`, but this allows
it to be obtained with `m.my_core`. Which is way nicer.
"""
# # Make the next core's base address start one address after the previous one's
# base_addr = core.max_addr + 1
# self._cores[name] = core
for name, instance in self._cores.items():
if not hasattr(self, name):
setattr(self, name, instance)
# def _add_friendly_core_names(self):
# """
# Add cores to the instance under a friendly name - ie, a core named `my_core` belonging
# to a Manta instance `m` could be obtained with `m.cores["my_core"]`, but this allows
# it to be obtained with `m.my_core`. Which is way nicer.
# """
else:
raise ValueError(
"Cannot add object to Manta instance - name is already taken!"
)
# for name, instance in self._cores.items():
# if not hasattr(self, name):
# setattr(self, name, instance)
# else:
# raise ValueError(
# "Cannot add object to Manta instance - name is already taken!"
# )
def elaborate(self, platform):
m = Module()
@ -128,11 +144,11 @@ class Manta(Elaboratable):
m.submodules.interface = self.interface
# Add all cores as submodules
for name, instance in self._cores.items():
for name, instance in self.cores._cores.items():
m.submodules[name] = instance
# Connect first/last cores to interface output/input respectively
core_instances = list(self._cores.values())
core_instances = list(self.cores._cores.values())
first_core = core_instances[0]
last_core = core_instances[-1]
@ -155,12 +171,12 @@ class Manta(Elaboratable):
"""
ports = self.interface.get_top_level_ports()
for name, instance in self._cores.items():
for name, instance in self.cores._cores.items():
ports += instance.top_level_ports
return ports
def generate_verilog(self, strip_internal_attrs=False):
def generate_verilog(self, path, strip_internal_attrs=False):
from amaranth.back import verilog
output = verilog.convert(
@ -180,4 +196,27 @@ class Manta(Elaboratable):
if isinstance(self.interface, EthernetInterface):
output += self.interface.generate_liteeth_core()
return output
with open(path, "w") as f:
f.write(output)
def export_config(self, path):
"Export a YAML file containing all the configuration of the core"
config = {}
if self.cores._cores:
config["cores"] = {}
for name, instance in self.cores._cores.items():
config["cores"][name] = instance.to_config()
if self.interface:
if isinstance(self.interface, UARTInterface):
config["uart"] = self.interface.to_config()
if isinstance(self.interface, EthernetInterface):
config["ethernet"] = self.interface.to_config()
import yaml
with open(path, "w") as f:
yaml.dump(config, f)

View File

@ -16,15 +16,12 @@ class MemoryCore(MantaCore):
https://fischermoseley.github.io/manta/memory_core/
"""
def __init__(self, mode, width, depth, base_addr, interface):
def __init__(self, mode, width, depth):
self._mode = mode
self._width = width
self._depth = depth
self._base_addr = base_addr
self._interface = interface
self._n_mems = ceil(self._width / 16)
self._max_addr = self._base_addr + (self._depth * self._n_mems)
# Bus Connections
self.bus_i = Signal(InternalBus())
@ -44,11 +41,9 @@ class MemoryCore(MantaCore):
elif self._mode == "host_to_fpga":
self.user_addr = Signal(range(self._depth))
self.user_data_out = Signal(self._width)
self.user_clk = Signal()
self._top_level_ports = [
self.user_addr,
self.user_data_out,
self.user_clk,
]
elif self._mode == "bidirectional":
@ -82,7 +77,15 @@ class MemoryCore(MantaCore):
@property
def max_addr(self):
return self._max_addr
return self.base_addr + (self._depth * self._n_mems)
def to_config(self):
return {
"type": "memory",
"mode": self._mode,
"width": self._width,
"depth": self._depth
}
@classmethod
def from_config(cls, config, base_addr, interface):
@ -127,7 +130,7 @@ class MemoryCore(MantaCore):
def _tie_mems_to_bus(self, m):
for i, mem in enumerate(self._mems):
# Compute address range corresponding to this chunk of memory
start_addr = self._base_addr + (i * self._depth)
start_addr = self.base_addr + (i * self._depth)
stop_addr = start_addr + self._depth - 1
if self._mode == "fpga_to_host":
@ -245,7 +248,7 @@ class MemoryCore(MantaCore):
bus_addrs = []
for addr in addrs:
for i in range(len(self._mems)):
bus_addrs.append(self._base_addr + addr + (i * self._depth))
bus_addrs.append(self.base_addr + addr + (i * self._depth))
return bus_addrs
@ -264,7 +267,7 @@ class MemoryCore(MantaCore):
raise TypeError("Read address must be an integer or list of integers.")
bus_addrs = self._convert_user_to_bus_addr(addrs)
datas = self._interface.read(bus_addrs)
datas = self.interface.read(bus_addrs)
data_chunks = split_into_chunks(datas, self._n_mems)
return [words_to_value(chunk) for chunk in data_chunks]
@ -292,4 +295,4 @@ class MemoryCore(MantaCore):
bus_addrs = self._convert_user_to_bus_addr(addrs)
bus_datas = [word for d in datas for word in value_to_words(d, self._n_mems)]
self._interface.write(bus_addrs, bus_datas)
self.interface.write(bus_addrs, bus_datas)

View File

@ -53,6 +53,14 @@ class UARTInterface(Elaboratable):
else:
return cls(port, baudrate, clock_freq)
def to_config(self):
return {
"port": self._port,
"baudrate": self._baudrate,
"clock_freq": self._clock_freq,
"chunk_size": self._chunk_size,
}
def _check_config(self):
# Ensure a serial port has been given
if self._port is None:

View File

@ -8,6 +8,12 @@ import os
class MantaCore(ABC, Elaboratable):
# These attributes are meant to be settable and gettable, but max_addr and
# top_level_ports are indended to be only gettable. Do not implement
# setters for them in subclasses.
base_addr = None
interface = None
@property
@abstractmethod
@ -28,13 +34,49 @@ class MantaCore(ABC, Elaboratable):
"""
pass
@abstractmethod
def to_config(self):
"""
Return a dictionary containing the core's configuration (i.e., the
content of the core's section of the `manta.yaml` file).
"""
pass
@abstractmethod
def elaborate(self, platform):
pass
# @abstractclassmethod
# def from_config(cls):
# pass
@classmethod
@abstractmethod
def from_config(cls, config):
"""
Return an instance of the core, given the section of the Manta
configuration file (as a Python dictionary) that contains the core's
specification.
"""
pass
class CoreContainer:
def __init__(self, manta):
self._manta = manta
self._cores = {}
self._base_addr = 0
self._last_used_addr = 0
def __getattr__(self, name):
if name in self._cores:
return self._cores[name]
raise AttributeError(f"No such core: {name}")
def __setattr__(self, name, value):
if name in {"_manta", "_cores", "_base_addr", "_last_used_addr"}:
super().__setattr__(name, value)
else:
self._cores[name] = value
value.interface = self._manta.interface
value.base_addr = self._last_used_addr
self._last_used_addr = value.max_addr + 1
class InternalBus(data.StructLayout):

137
test/test_config_export.py Normal file
View File

@ -0,0 +1,137 @@
from manta import Manta
from manta.io_core import IOCore
from manta.memory_core import MemoryCore
from manta.uart import UARTInterface
from amaranth import *
import tempfile
import os
import yaml
def test_io_core_dump():
# Create some dummy signals to pass to the IO Core
probe0 = Signal(1)
probe1 = Signal(2)
probe2 = Signal(3)
probe3 = Signal(4, init=13)
# Create Manta instance
manta = Manta()
manta.cores.test_core = IOCore(
inputs = [probe0, probe1],
outputs = [probe2, probe3]
)
# Create Temporary File
tf = tempfile.NamedTemporaryFile(delete=False)
tf.close()
# Export Manta configuration
manta.export_config(tf.name)
# Parse the exported YAML
with open(tf.name, "r") as f:
data = yaml.safe_load(f)
# Verify that exported YAML matches configuration
expected = {
"cores": {
"test_core": {
"type": "io",
"inputs": {
"probe0": 1,
"probe1": 2
},
"outputs": {
"probe2": {
"width": 3,
"initial_value": 0
},
"probe3": {
"width": 4,
"initial_value": 13
}
}
}
}
}
if data != expected:
raise ValueError("Exported YAML does not match configuration!")
def test_memory_core_dump():
# Create Manta instance
manta = Manta()
manta.cores.test_core = MemoryCore(
mode = "bidirectional",
width = 32,
depth = 1024,
)
# Create Temporary File
tf = tempfile.NamedTemporaryFile(delete=False)
tf.close()
# Export Manta configuration
manta.export_config(tf.name)
# Parse the exported YAML
with open(tf.name, "r") as f:
data = yaml.safe_load(f)
# Verify that exported YAML matches configuration
expected = {
"cores": {
"test_core": {
"type": "memory",
"mode":"bidirectional",
"width":32,
"depth":1024
}
}
}
if data != expected:
raise ValueError("Exported YAML does not match configuration!")
def test_logic_analyzer_core_dump():
raise ValueError
def test_uart_interface_dump():
manta = Manta()
manta.interface = UARTInterface(
port = "/dev/ttyUSB0",
baudrate = 115200,
clock_freq = 100e6
)
# Create Temporary File
tf = tempfile.NamedTemporaryFile(delete=False)
tf.close()
# Export Manta configuration
manta.export_config(tf.name)
# Parse the exported YAML
with open(tf.name, "r") as f:
data = yaml.safe_load(f)
# Verify that exported YAML matches configuration
expected = {
"uart": {
"port": "/dev/ttyUSB0",
"baudrate": 115200,
# Be careful with the float comparison here, copy-pasting from the
# exported YAML seems to have the best results. Otherwise this test
# will fail when it shouldn't.
"clock_freq": 100000000.0,
"chunk_size": 256
}
}
if data != expected:
raise ValueError("Exported YAML does not match configuration!")
def test_ethernet_interface_dump():
raise ValueError