refactor __init__.py to be object-oriented

This commit is contained in:
Fischer Moseley 2023-03-07 22:05:06 -05:00
parent 5e2f02ebd6
commit 334aa8c005
4 changed files with 271 additions and 313 deletions

View File

@ -48,3 +48,4 @@ uart_tx_tb:
clean:
rm -f *.out *.vcd
rm -rf dist/
rm -rf src/mantaray.egg-info

View File

@ -5,25 +5,36 @@
[![License: GPL v3](https://img.shields.io/badge/License-GPLv3-blue.svg)](https://www.gnu.org/licenses/gpl-3.0)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
Manta is a tool for debugging FPGA designs over an interface like UART or Ethernet. It has two modes for doing this, downlink and uplink. The downlink mode feels similar to a logic analyzer, in that Manta provides a waveform view of a configurable set of signals, which get captured when some trigger condition is met. The uplink mode allows a host machine to remotely set values of registers on the FPGA via a python interface. This permits rapid prototyping of logic in Python, and a means of incrementally migrating it to HDL. A more detailed description of each mode is below.
Manta is a tool for debugging FPGA designs over an interface like UART or Ethernet. It works by allowing the user to instantiate a number of debug cores in a design, and exposes a Python interface to them. This permits rapid prototyping of logic in Python, and a means of incrementally migrating it to HDL. The cores are described below.
Manta is written in Python, and generates SystemVerilog HDL. It's cross-platform, and its only dependencies are pySerial and pyYAML. The SystemVerilog templates are included in the Python source, so only a single python file must be included in your project.
Manta is written in Python, and generates Verilog-2001 HDL. It's cross-platform, and its only dependencies are pySerial and pyYAML.
## Design Philosophy
- Things that are easy to break should be easy to fix. For instance, it's pretty easy to put the wrong number of clock cycles of holdoff in your configuration, but it's a lot harder to accidentally put the wrong number of stop bits in your serial port. Manta supports changing the former post-upload, but not the latter.
- Features are added when they're needed. We won't add features until there's been a use case shown that would benefit from them. This keeps manta lightweight.
- Trust the interface. In the name of simplicity, we don't implement a ton of error checking. If this proves to be a problem, we'll fix it, inline with the philosophy above.
## Cores
## Downlink
### Logic Analayzer Core
Manta's downlink mode works by taking a YAML/JSON file describing the ILA configuration, and autogenerating a debug core with SystemVerilog. This gets included in the rest of the project's HDL, and is synthesized and flashed on the FPGA. It can then be controlled by a host machine connected over a serial port. The host can arm the core, and then when a trigger condition is met, the debug output is wired back to the host, where it's saved as a waveform file. This can then be opened and inspected in a waveform viewer like GTKWave.
This is similar to Xilinx's Integrated Logic Analyzer (ILA) and Intel/Altera's SignalTap utility.
### I/O Core
### LUT RAM Core
### BRAM Core
## Getting Started
Manta is installed with `pip3 install mantaray`. Or at least it will be, once it's out of alpha. For now, it's installable with `pip install -i https://test.pypi.org/simple/ mantaray`, which just pulls from the PyPI testing registry.
## Examples
Examples can be found under `examples/`. These target the [Nexys4 DDR](https://digilent.com/reference/programmable-logic/nexys-4-ddr/start) and [Nexys A7-100T](https://digilent.com/reference/programmable-logic/nexys-a7/start) from Digilent, which are functionally equivalent.
Examples can be found under `examples/`. These target the Xilinx Series 7 FPGAs on the [Nexys A7](https://digilent.com/reference/programmable-logic/nexys-a7/start)/[Nexys4 DDR](https://digilent.com/reference/programmable-logic/nexys-4-ddr/start) and the Lattice iCE40 on the [Icestick](https://www.latticesemi.com/icestick).
## Design Philosophy
- Things that are easy to break should be easy to fix. For instance, it's pretty easy to put the wrong number of clock cycles of holdoff in your configuration, but it's a lot harder to accidentally put the wrong number of stop bits in your serial port. Manta supports changing the former post-upload, but not the latter.
- Features are added when they're needed. We won't add features until there's been a use case shown that would benefit from them. This keeps manta lightweight.
- Trust the interface. In the name of simplicity, we don't implement a ton of error checking. If this proves to be a problem, we'll fix it, inline with the philosophy above.
- Don't use macros - there's a possibility that they'll conflict with something in user code.
- Use Verilog 2001 for source for compatibility. Manta uses SystemVerilog 2012 for simulation and test, however.
## About
Manta was originally developed as part of my [Master's Thesis at MIT](dspace.mit.edu) in 2023, done under the supervision of Joe Steinmeyer. But I think it's a neat tool, so I'm still working on it :)

View File

@ -1,21 +1,18 @@
---
downlink:
sample_depth: 4096
clock_freq: 100000000
probes:
larry: 1
curly: 1
moe: 1
shemp: 4
cores:
logic_analyzer:
sample_depth: 4096
triggers:
- larry && curly && ~moe
probes:
larry: 1
curly: 1
moe: 1
shemp: 4
triggers:
- larry && curly && ~moe
uart:
baudrate: 115200
port: "/dev/tty.usbserial-2102926963071"
data: 8
parity: none
stop: 1
timeout: 1
baudrate: 115200
clock_freq: 100000000

View File

@ -1,169 +1,266 @@
import pkgutil
from sys import argv
import os
import json
import yaml
from datetime import datetime
import serial
import pkgutil
debug = True
version = "0.0.0"
def load_source_files():
"""loads source files and returns a string of their contents concatenated together"""
class UARTInterface:
def __init__(self, config):
# Obtain port. No way to check if it's valid yet.
assert config["port"], "No serial port provided to UART core."
self.port = config["port"]
downlink_template = pkgutil.get_data(__name__, "manta_template.sv").decode()
downlink_template += pkgutil.get_data(__name__, "fifo.sv").decode()
downlink_template += pkgutil.get_data(__name__, "uart_tx.sv").decode()
downlink_template += pkgutil.get_data(__name__, "uart_rx.sv").decode()
downlink_template += pkgutil.get_data(
__name__, "xilinx_true_dual_port_read_first_2_clock_ram.v"
).decode()
# Check that clock frequency is provided and positive
assert config["clock_freq"], "Clock frequency not provided to UART core."
assert config["clock_freq"] > 0, "Clock frequency must be positive."
self.clock_freq = config["clock_freq"]
return downlink_template
def load_config(path):
"""Take path to configuration file, and retun the configuration as a python list/dict object."""
extension = path.split(".")[-1]
if "json" in extension:
with open(path, "r") as f:
config = json.load(f)
return config
elif "yaml" in extension or "yml" in extension:
with open(path, "r") as f:
config = yaml.safe_load(f)
return config
else:
raise ValueError("Unable to recognize configuration file extension.")
def check_config(config):
"""Takes a list/dict python object representing a core configuration and throws an error if it is misconfigured."""
assert (
"downlink" in config or "uplink" in config
), "No downlink or uplink specified."
if "downlink" in config:
dl = config["downlink"]
assert dl[
"sample_depth"
], "Downlink core specified, but sample_depth not specified."
assert dl[
"clock_freq"
], "Downlink core specified, but clock_freq not specified."
assert (
dl["probes"] and len(dl["probes"]) > 0
), "Downlink core specified, but no probes specified."
assert (
dl["triggers"] and len(dl["triggers"]) > 0
), "Downlink core specified, but no triggers specified."
# Check that baudrate is provided and positive
assert config["baudrate"], "Baudrate not provided to UART core."
assert config["baudrate"] > 0, "Baudrate must be positive."
self.baudrate = config["baudrate"]
# confirm core clock is sufficiently fast
prescaler = dl["clock_freq"] // config["uart"]["baudrate"]
assert prescaler >= 2
clocks_per_baud = self.clock_freq // self.baudrate
assert clocks_per_baud >= 2
self.clocks_per_baud = clocks_per_baud
# confirm actual baudrate and target baudrate are within 5%
actual_baudrate = config["downlink"]["clock_freq"] / prescaler
baudrate_error = (
abs(actual_baudrate - config["uart"]["baudrate"])
/ config["uart"]["baudrate"]
)
# confirm we can match baudrate suffeciently well
actual_baudrate = self.clock_freq / clocks_per_baud
baudrate_error = 100 * abs(actual_baudrate - self.baudrate) / self.baudrate
assert (
baudrate_error <= 0.05
), f"Unable to match target baudrate! Actual baudrate differs from target by {round(100*baudrate_error, 2)}%"
baudrate_error <= 5
), "Unable to match target baudrate - they differ by {baudrate_error}%"
if debug:
print(f"UART interface on debug core will the following configuration:")
print(f' - target_baudrate: {config["uart"]["baudrate"]}')
print(f" - prescaler: {prescaler}")
print(f" - actual_baudrate: {round(actual_baudrate, 2)}")
import serial
if "uplink" in config:
raise NotImplementedError(
"Cannot check configuration validity for uplinks just yet!"
)
self.ser = serial.Serial(self.port, self.baudrate)
if "uart" in config:
uart = config["uart"]
def read(self, bytes):
self.ser.read(bytes)
# confirm number of data bits is valid
assert "data" in uart, "Number of data bits in UART interface not specified."
assert uart["data"] in [8, 7, 6, 5], "Invalid number of data bits."
def write(self, bytes):
self.ser.write(bytes)
# confirm number of stop bits is valid
assert uart["stop"] in [1, 1.5, 2], "Invalid number of stop bits."
def tx_hdl(self):
pkgutil.get_data(__name__, "uart_tx.sv").decode()
# confirm parity is valid
assert uart["parity"] in [
"none",
"even",
"odd",
"mark",
"space",
], "Invalid parity setting."
def rx_hdl(self):
pkgutil.get_data(__name__, "rx_uart.sv").decode()
def gen_downlink_core(config):
buf = load_source_files()
dl = config["downlink"]
class LogicAnalyzerCore:
def __init__(self, config, interface):
self.interface = interface
# add timestamp
timestamp = datetime.now().strftime("%d %b %Y at %H:%M:%S")
buf = buf.replace("@TIMESTAMP", timestamp)
# load config
assert config["sample_depth"], "Sample depth not found for logic analyzer core."
self.sample_depth = config["sample_depth"]
# add user
user = os.environ.get("USER", os.environ.get("USERNAME"))
buf = buf.replace("@USER", user)
assert config["probes"], "No probe definitions found."
assert len(config["probes"]) > 0, "Must specify at least one probe."
# add trigger
trigger = [f"({trigger})" for trigger in dl["triggers"]]
trigger = " || ".join(trigger)
buf = buf.replace("@TRIGGER", trigger)
for probe in config["probes"]:
assert (
probe["width"] > 0
), "Probe {probe} is of invalid width - it must be of at least width one."
# add concat
concat = [name for name in dl["probes"]]
concat = ", ".join(concat)
concat = "{" + concat + "}"
buf = buf.replace("@CONCAT", concat)
self.probes = config["probes"]
# add probes
probe_verilog = []
for name, width in dl["probes"].items():
if width == 1:
probe_verilog.append(f"input wire {name},")
assert config["triggers"], "No triggers found."
assert len(config["triggers"]) > 0, "Must specify at least one trigger."
self.triggers = config["triggers"]
def run(self):
self.interface.open()
self.interface.flushInput()
self.interface.write(b"\x30")
data = self.interface.read(4096)
def part_select(self, data, width):
top, bottom = width
assert top >= bottom
mask = 2 ** (top - bottom + 1) - 1
return (data >> bottom) & mask
def make_widths(self, config):
# {probe0, probe1, probe2}
# [12, 1, 3] should produce
# [ (15, 4) (3, 3) (2,0) ]
widths = list(config["downlink"]["probes"].values())
# easiest to make by summing them and incrementally subtracting
s = sum(widths)
slices = []
for width in widths:
slices.append((s - 1, s - width))
s = s - width
assert s == 0, "Probe sizes are weird, cannot slice bits properly"
return slices
def export_waveform(self, config, data, path):
extension = path.split(".")[-1]
assert extension == "vcd", "Unrecognized waveform export format."
from vcd import VCDWriter
vcd_file = open(path, "w")
# Use the datetime format that iVerilog uses
timestamp = datetime.now().strftime("%a %b %w %H:%M:%S %Y")
with VCDWriter(
vcd_file, timescale="10 ns", date=timestamp, version="manta"
) as writer:
# add probes to vcd file
vcd_probes = []
for name, width in config["downlink"]["probes"].items():
probe = writer.register_var("manta", name, "wire", size=width)
vcd_probes.append(probe)
# add clock to vcd file
clock = writer.register_var("manta", "clk", "wire", size=1)
# calculate bit widths for part selecting
widths = self.make_widths(config)
# slice data, and dump to vcd file
for timestamp in range(2 * len(data)):
value = data[timestamp // 2]
# dump clock values to vcd file
# note: this assumes logic is triggered
# on the rising edge of the clock, @TODO fix this
writer.change(clock, timestamp, timestamp % 2 == 0)
for probe_num, probe in enumerate(vcd_probes):
val = self.part_select(value, widths[probe_num])
writer.change(probe, timestamp, val)
vcd_file.close()
def hdl(self):
# Return an autogenerated verilog module definition for the core.
# load source files
tmpl = pkgutil.get_data(__name__, "la_template.v").decode()
# add triggers
trigger = [f"({trigger})" for trigger in self.triggers]
trigger = " || ".join(trigger)
templ = templ.replace("@TRIGGER", trigger)
# add concat
concat = [name for name in self.probes]
concat = ", ".join(concat)
concat = "{" + concat + "}"
templ = templ.replace("@CONCAT", concat)
# add probes
probe_verilog = []
for name, width in self.probes.items():
if width == 1:
probe_verilog.append(f"input wire {name},")
else:
probe_verilog.append(f"input wire [{width-1}:0] {name},")
probe_verilog = "\n\t\t".join(probe_verilog)
tmpl = tmpl.replace("@PROBES", probe_verilog)
# add sample width
sample_width = sum([width for name, width in self.probes.items()])
tmpl = tmpl.replace("@SAMPLE_WIDTH", str(sample_width))
# add sample depth
tmpl = tmpl.replace("@SAMPLE_DEPTH", str(self.sample_depth))
return tmpl
class Manta:
def __init__(self, config_filepath):
config = self.read_config_file(config_filepath)
# TODO: figure out some better place to put the interface data.
# it should probably go in it's own class. But for now it can go here.
self.interface = self.get_interface(config)
# add cores to manta
assert "cores" in config, "No cores found."
assert len(config["cores"]) > 0, "Must specify at least one core."
for core in config:
if core == "LA":
self.cores.append(LogicAnalyzerCore(core, self.interface))
def read_config_file(self, path):
"""Take path to configuration file, and retun the configuration as a python list/dict object."""
extension = path.split(".")[-1]
if "json" in extension:
with open(path, "r") as f:
import json
config = json.load(f)
elif "yaml" in extension or "yml" in extension:
with open(path, "r") as f:
import yaml
config = yaml.safe_load(f)
else:
probe_verilog.append(f"input wire [{width-1}:0] {name},")
raise ValueError("Unable to recognize configuration file extension.")
probe_verilog = "\n\t\t".join(probe_verilog)
buf = buf.replace("@PROBES", probe_verilog)
return config
# add sample width
sample_width = sum([width for name, width in dl["probes"].items()])
buf = buf.replace("@SAMPLE_WIDTH", str(sample_width))
def get_interface(self, config):
if "uart" in config:
return UARTInterface(config["uart"])
# add sample depth
buf = buf.replace("@SAMPLE_DEPTH", str(dl["sample_depth"]))
elif "ethernet" in config:
raise NotImplementedError("Ethernet not supported yet!")
# uart config
buf = buf.replace("@DATA_WIDTH", str(config["uart"]["data"]))
buf = buf.replace("@BAUDRATE", str(config["uart"]["baudrate"]))
buf = buf.replace("@CLK_FREQ_HZ", str(dl["clock_freq"]))
elif "jtag" in config:
raise NotImplementedError("JTAG not supported yet!")
return buf
else:
raise ValueError("Interface not recognized")
def generate(self):
# this occurs in two steps: generating manta and the top-level,
# and pasting in all the HDL from earlier.
cores_hdl = {core.name: core.generate for core in self.cores}
uart_rx_hdl = pkgutil.get_data(__name__, "rx_uart.sv").decode()
bridge_rx_hdl = pkgutil.get_data(__name__, "bridge_rx.sv").decode()
bridge_tx_hdl = pkgutil.get_data(__name__, "bridge_tx.sv").decode()
uart_tx_hdl = pkgutil.get_data(__name__, "uart_tx.sv").decode()
# for core in cores:
# registers = ''
# registers += f'{core}{}'
# # wire cores together
# add preamble to top of file
user = os.environ.get("USER", os.environ.get("USERNAME"))
timestamp = datetime.now().strftime("%d %b %Y at %H:%M:%S")
hdl = "This manata definitinon was autogenerated on {timestamp} by {user}\n\n"
hdl += "If this breaks or if you've got dank formal verification memes,\n"
hdl += "please contact fischerm [at] mit.edu\n"
def print_help():
help = f"""
def main():
# print help menu if no args passed or help menu requested
if len(argv) == 1 or argv[1] == "help" or argv[1] == "ray" or argv[1] == "bae":
print(
f"""
\033[96m (\.-./)
\033[96m / \\
\033[96m .' : '.
@ -185,159 +282,14 @@ Supported commands:
ports list all available serial ports
help, ray display this splash screen (hehe...splash screen)
"""
print(help)
def setup_serial(ser, config):
ser.baudrate = config["uart"]["baudrate"]
ser.port = config["uart"]["port"]
ser.timeout = config["uart"]["timeout"]
# setup number of data bits
if config["uart"]["data"] == 8:
ser.bytesize = serial.EIGHTBITS
elif config["uart"]["data"] == 7:
ser.bytesize = serial.SEVENBITS
elif config["uart"]["data"] == 6:
ser.bytesize = serial.SIXBITS
elif config["uart"]["data"] == 5:
ser.bytesize = serial.FIVEBITS
else:
raise ValueError("Invalid number of data bits in UART configuration.")
# setup number of stop bits
if config["uart"]["stop"] == 1:
ser.stopbits = serial.STOPBITS_ONE
elif config["uart"]["stop"] == 1.5:
ser.stopbits = serial.STOPBITS_ONE_POINT_FIVE
elif config["uart"]["stop"] == 2:
ser.stopbits = serial.STOPBITS_TWO
else:
raise ValueError("Invalid number of stop bits in UART configuration.")
# setup parity
if config["uart"]["parity"] == "none":
ser.parity = serial.PARITY_NONE
elif config["uart"]["parity"] == "even":
ser.parity = serial.PARITY_EVEN
elif config["uart"]["parity"] == "odd":
ser.parity = serial.PARITY_ODD
elif config["uart"]["parity"] == "mark":
ser.parity = serial.PARITY_MARK
elif config["uart"]["parity"] == "space":
ser.parity = serial.PARITY_SPACE
else:
raise ValueError("Invalid parity setting in UART configuration.")
def read_serial(config):
# obtain bytestream from FPGA
with serial.Serial() as ser:
setup_serial(ser, config)
ser.open()
ser.flushInput()
ser.write(b"\x30")
data = ser.read(4096)
return data
def part_select(data, width):
top, bottom = width
assert top >= bottom
mask = 2 ** (top - bottom + 1) - 1
return (data >> bottom) & mask
def make_widths(config):
# {probe0, probe1, probe2}
# [12, 1, 3] should produce
# [ (15, 4) (3, 3) (2,0) ]
widths = list(config["downlink"]["probes"].values())
# easiest to make by summing them and incrementally subtracting
s = sum(widths)
slices = []
for width in widths:
slices.append((s - 1, s - width))
s = s - width
assert s == 0, "Probe sizes are weird, cannot slice bits properly"
return slices
def export_waveform(config, data, path):
extension = path.split(".")[-1]
if extension == "vcd":
from vcd import VCDWriter
vcd_file = open(path, "w")
# Use the datetime format that iVerilog uses
timestamp = datetime.now().strftime("%a %b %w %H:%M:%S %Y")
with VCDWriter(
vcd_file, timescale="10 ns", date=timestamp, version="manta"
) as writer:
# add probes to vcd file
vcd_probes = []
for name, width in config["downlink"]["probes"].items():
probe = writer.register_var("manta", name, "wire", size=width)
vcd_probes.append(probe)
# add clock to vcd file
clock = writer.register_var("manta", "clk", "wire", size=1)
# calculate bit widths for part selecting
widths = make_widths(config)
# slice data, and dump to vcd file
for timestamp in range(2 * len(data)):
value = data[timestamp // 2]
# dump clock values to vcd file
# note: this assumes logic is triggered
# on the rising edge of the clock, @TODO fix this
writer.change(clock, timestamp, timestamp % 2 == 0)
for probe_num, probe in enumerate(vcd_probes):
val = part_select(value, widths[probe_num])
writer.change(probe, timestamp, val)
vcd_file.close()
else:
raise NotImplementedError("More file formats to come!")
def main():
# print help menu if no args passed or help menu requested
if len(argv) == 1 or argv[1] == "help" or argv[1] == "ray" or argv[1] == "bae":
print_help()
exit()
)
# open minicom-like serial terminal with given config
elif argv[1] == "terminal":
assert len(argv) == 3, "Not enough (or too many) config files specified."
# TODO: make this work with a looser config file - it should work even if we just have a uart definition
config = load_config(argv[2])
check_config(config)
# TODO: make this work with a looser config file - it should work as long as it has a uart definition
manta = Manta(argv[2])
raise NotImplementedError("Miniterm console is still under development!")
@ -348,31 +300,28 @@ def main():
for info in serial.tools.list_ports.comports():
print(info)
# generate the specified core
# generate the specified configuration
elif argv[1] == "gen":
assert (
len(argv) == 4
), "Wrong number of arguments, only a config file and output file must both be specified."
config = load_config(argv[2])
check_config(config)
manta = Manta(argv[2])
with open(argv[3], "w") as f:
f.write(gen_downlink_core(config))
f.write(manta.generate())
# run the specified core
elif argv[1] == "run":
assert (
len(argv) == 4
), "Wrong number of arguments, only a config file and output file must both be specified."
config = load_config(argv[2])
check_config(config)
data = read_serial(config)
export_waveform(config, data, argv[3])
manta = Manta(argv[2])
manta.la_0.arm()
manta.la_0.export_waveform(argv[3])
else:
print("Option not recognized.")
print_help()
print("Option not recognized! Run 'manta help' for supported commands.")
if __name__ == "__main__":