diff --git a/Makefile b/Makefile index 8e59c94..a6dabd9 100644 --- a/Makefile +++ b/Makefile @@ -48,3 +48,4 @@ uart_tx_tb: clean: rm -f *.out *.vcd rm -rf dist/ + rm -rf src/mantaray.egg-info diff --git a/README.md b/README.md index 7df9220..75a85dd 100644 --- a/README.md +++ b/README.md @@ -5,25 +5,36 @@ [![License: GPL v3](https://img.shields.io/badge/License-GPLv3-blue.svg)](https://www.gnu.org/licenses/gpl-3.0) [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) -Manta is a tool for debugging FPGA designs over an interface like UART or Ethernet. It has two modes for doing this, downlink and uplink. The downlink mode feels similar to a logic analyzer, in that Manta provides a waveform view of a configurable set of signals, which get captured when some trigger condition is met. The uplink mode allows a host machine to remotely set values of registers on the FPGA via a python interface. This permits rapid prototyping of logic in Python, and a means of incrementally migrating it to HDL. A more detailed description of each mode is below. +Manta is a tool for debugging FPGA designs over an interface like UART or Ethernet. It works by allowing the user to instantiate a number of debug cores in a design, and exposes a Python interface to them. This permits rapid prototyping of logic in Python, and a means of incrementally migrating it to HDL. The cores are described below. -Manta is written in Python, and generates SystemVerilog HDL. It's cross-platform, and its only dependencies are pySerial and pyYAML. The SystemVerilog templates are included in the Python source, so only a single python file must be included in your project. +Manta is written in Python, and generates Verilog-2001 HDL. It's cross-platform, and its only dependencies are pySerial and pyYAML. -## Design Philosophy -- Things that are easy to break should be easy to fix. For instance, it's pretty easy to put the wrong number of clock cycles of holdoff in your configuration, but it's a lot harder to accidentally put the wrong number of stop bits in your serial port. Manta supports changing the former post-upload, but not the latter. -- Features are added when they're needed. We won't add features until there's been a use case shown that would benefit from them. This keeps manta lightweight. -- Trust the interface. In the name of simplicity, we don't implement a ton of error checking. If this proves to be a problem, we'll fix it, inline with the philosophy above. +## Cores -## Downlink +### Logic Analayzer Core Manta's downlink mode works by taking a YAML/JSON file describing the ILA configuration, and autogenerating a debug core with SystemVerilog. This gets included in the rest of the project's HDL, and is synthesized and flashed on the FPGA. It can then be controlled by a host machine connected over a serial port. The host can arm the core, and then when a trigger condition is met, the debug output is wired back to the host, where it's saved as a waveform file. This can then be opened and inspected in a waveform viewer like GTKWave. This is similar to Xilinx's Integrated Logic Analyzer (ILA) and Intel/Altera's SignalTap utility. +### I/O Core + +### LUT RAM Core + +### BRAM Core + + ## Getting Started Manta is installed with `pip3 install mantaray`. Or at least it will be, once it's out of alpha. For now, it's installable with `pip install -i https://test.pypi.org/simple/ mantaray`, which just pulls from the PyPI testing registry. ## Examples -Examples can be found under `examples/`. These target the [Nexys4 DDR](https://digilent.com/reference/programmable-logic/nexys-4-ddr/start) and [Nexys A7-100T](https://digilent.com/reference/programmable-logic/nexys-a7/start) from Digilent, which are functionally equivalent. +Examples can be found under `examples/`. These target the Xilinx Series 7 FPGAs on the [Nexys A7](https://digilent.com/reference/programmable-logic/nexys-a7/start)/[Nexys4 DDR](https://digilent.com/reference/programmable-logic/nexys-4-ddr/start) and the Lattice iCE40 on the [Icestick](https://www.latticesemi.com/icestick). + +## Design Philosophy +- Things that are easy to break should be easy to fix. For instance, it's pretty easy to put the wrong number of clock cycles of holdoff in your configuration, but it's a lot harder to accidentally put the wrong number of stop bits in your serial port. Manta supports changing the former post-upload, but not the latter. +- Features are added when they're needed. We won't add features until there's been a use case shown that would benefit from them. This keeps manta lightweight. +- Trust the interface. In the name of simplicity, we don't implement a ton of error checking. If this proves to be a problem, we'll fix it, inline with the philosophy above. +- Don't use macros - there's a possibility that they'll conflict with something in user code. +- Use Verilog 2001 for source for compatibility. Manta uses SystemVerilog 2012 for simulation and test, however. ## About Manta was originally developed as part of my [Master's Thesis at MIT](dspace.mit.edu) in 2023, done under the supervision of Joe Steinmeyer. But I think it's a neat tool, so I'm still working on it :) \ No newline at end of file diff --git a/examples/nexys_a7/single_lut_ram/manta.yaml b/examples/nexys_a7/single_lut_ram/manta.yaml index 63d351e..6788e82 100644 --- a/examples/nexys_a7/single_lut_ram/manta.yaml +++ b/examples/nexys_a7/single_lut_ram/manta.yaml @@ -1,21 +1,18 @@ --- -downlink: - sample_depth: 4096 - clock_freq: 100000000 - - probes: - larry: 1 - curly: 1 - moe: 1 - shemp: 4 +cores: + logic_analyzer: + sample_depth: 4096 - triggers: - - larry && curly && ~moe + probes: + larry: 1 + curly: 1 + moe: 1 + shemp: 4 + + triggers: + - larry && curly && ~moe uart: - baudrate: 115200 port: "/dev/tty.usbserial-2102926963071" - data: 8 - parity: none - stop: 1 - timeout: 1 \ No newline at end of file + baudrate: 115200 + clock_freq: 100000000 \ No newline at end of file diff --git a/src/manta/__init__.py b/src/manta/__init__.py index 0bb0c7c..c991d90 100644 --- a/src/manta/__init__.py +++ b/src/manta/__init__.py @@ -1,169 +1,266 @@ +import pkgutil from sys import argv import os -import json -import yaml from datetime import datetime -import serial -import pkgutil - -debug = True version = "0.0.0" -def load_source_files(): - """loads source files and returns a string of their contents concatenated together""" +class UARTInterface: + def __init__(self, config): + # Obtain port. No way to check if it's valid yet. + assert config["port"], "No serial port provided to UART core." + self.port = config["port"] - downlink_template = pkgutil.get_data(__name__, "manta_template.sv").decode() - downlink_template += pkgutil.get_data(__name__, "fifo.sv").decode() - downlink_template += pkgutil.get_data(__name__, "uart_tx.sv").decode() - downlink_template += pkgutil.get_data(__name__, "uart_rx.sv").decode() - downlink_template += pkgutil.get_data( - __name__, "xilinx_true_dual_port_read_first_2_clock_ram.v" - ).decode() + # Check that clock frequency is provided and positive + assert config["clock_freq"], "Clock frequency not provided to UART core." + assert config["clock_freq"] > 0, "Clock frequency must be positive." + self.clock_freq = config["clock_freq"] - return downlink_template - - -def load_config(path): - """Take path to configuration file, and retun the configuration as a python list/dict object.""" - extension = path.split(".")[-1] - - if "json" in extension: - with open(path, "r") as f: - config = json.load(f) - - return config - - elif "yaml" in extension or "yml" in extension: - with open(path, "r") as f: - config = yaml.safe_load(f) - - return config - - else: - raise ValueError("Unable to recognize configuration file extension.") - - -def check_config(config): - """Takes a list/dict python object representing a core configuration and throws an error if it is misconfigured.""" - - assert ( - "downlink" in config or "uplink" in config - ), "No downlink or uplink specified." - - if "downlink" in config: - dl = config["downlink"] - assert dl[ - "sample_depth" - ], "Downlink core specified, but sample_depth not specified." - assert dl[ - "clock_freq" - ], "Downlink core specified, but clock_freq not specified." - assert ( - dl["probes"] and len(dl["probes"]) > 0 - ), "Downlink core specified, but no probes specified." - assert ( - dl["triggers"] and len(dl["triggers"]) > 0 - ), "Downlink core specified, but no triggers specified." + # Check that baudrate is provided and positive + assert config["baudrate"], "Baudrate not provided to UART core." + assert config["baudrate"] > 0, "Baudrate must be positive." + self.baudrate = config["baudrate"] # confirm core clock is sufficiently fast - prescaler = dl["clock_freq"] // config["uart"]["baudrate"] - assert prescaler >= 2 + clocks_per_baud = self.clock_freq // self.baudrate + assert clocks_per_baud >= 2 + self.clocks_per_baud = clocks_per_baud - # confirm actual baudrate and target baudrate are within 5% - actual_baudrate = config["downlink"]["clock_freq"] / prescaler - baudrate_error = ( - abs(actual_baudrate - config["uart"]["baudrate"]) - / config["uart"]["baudrate"] - ) + # confirm we can match baudrate suffeciently well + actual_baudrate = self.clock_freq / clocks_per_baud + baudrate_error = 100 * abs(actual_baudrate - self.baudrate) / self.baudrate assert ( - baudrate_error <= 0.05 - ), f"Unable to match target baudrate! Actual baudrate differs from target by {round(100*baudrate_error, 2)}%" + baudrate_error <= 5 + ), "Unable to match target baudrate - they differ by {baudrate_error}%" - if debug: - print(f"UART interface on debug core will the following configuration:") - print(f' - target_baudrate: {config["uart"]["baudrate"]}') - print(f" - prescaler: {prescaler}") - print(f" - actual_baudrate: {round(actual_baudrate, 2)}") + import serial - if "uplink" in config: - raise NotImplementedError( - "Cannot check configuration validity for uplinks just yet!" - ) + self.ser = serial.Serial(self.port, self.baudrate) - if "uart" in config: - uart = config["uart"] + def read(self, bytes): + self.ser.read(bytes) - # confirm number of data bits is valid - assert "data" in uart, "Number of data bits in UART interface not specified." - assert uart["data"] in [8, 7, 6, 5], "Invalid number of data bits." + def write(self, bytes): + self.ser.write(bytes) - # confirm number of stop bits is valid - assert uart["stop"] in [1, 1.5, 2], "Invalid number of stop bits." + def tx_hdl(self): + pkgutil.get_data(__name__, "uart_tx.sv").decode() - # confirm parity is valid - assert uart["parity"] in [ - "none", - "even", - "odd", - "mark", - "space", - ], "Invalid parity setting." + def rx_hdl(self): + pkgutil.get_data(__name__, "rx_uart.sv").decode() -def gen_downlink_core(config): - buf = load_source_files() - dl = config["downlink"] +class LogicAnalyzerCore: + def __init__(self, config, interface): + self.interface = interface - # add timestamp - timestamp = datetime.now().strftime("%d %b %Y at %H:%M:%S") - buf = buf.replace("@TIMESTAMP", timestamp) + # load config + assert config["sample_depth"], "Sample depth not found for logic analyzer core." + self.sample_depth = config["sample_depth"] - # add user - user = os.environ.get("USER", os.environ.get("USERNAME")) - buf = buf.replace("@USER", user) + assert config["probes"], "No probe definitions found." + assert len(config["probes"]) > 0, "Must specify at least one probe." - # add trigger - trigger = [f"({trigger})" for trigger in dl["triggers"]] - trigger = " || ".join(trigger) - buf = buf.replace("@TRIGGER", trigger) + for probe in config["probes"]: + assert ( + probe["width"] > 0 + ), "Probe {probe} is of invalid width - it must be of at least width one." - # add concat - concat = [name for name in dl["probes"]] - concat = ", ".join(concat) - concat = "{" + concat + "}" - buf = buf.replace("@CONCAT", concat) + self.probes = config["probes"] - # add probes - probe_verilog = [] - for name, width in dl["probes"].items(): - if width == 1: - probe_verilog.append(f"input wire {name},") + assert config["triggers"], "No triggers found." + assert len(config["triggers"]) > 0, "Must specify at least one trigger." + self.triggers = config["triggers"] + + def run(self): + self.interface.open() + self.interface.flushInput() + self.interface.write(b"\x30") + data = self.interface.read(4096) + + def part_select(self, data, width): + top, bottom = width + + assert top >= bottom + + mask = 2 ** (top - bottom + 1) - 1 + return (data >> bottom) & mask + + def make_widths(self, config): + # {probe0, probe1, probe2} + # [12, 1, 3] should produce + # [ (15, 4) (3, 3) (2,0) ] + + widths = list(config["downlink"]["probes"].values()) + + # easiest to make by summing them and incrementally subtracting + s = sum(widths) + slices = [] + for width in widths: + slices.append((s - 1, s - width)) + s = s - width + + assert s == 0, "Probe sizes are weird, cannot slice bits properly" + return slices + + def export_waveform(self, config, data, path): + extension = path.split(".")[-1] + + assert extension == "vcd", "Unrecognized waveform export format." + from vcd import VCDWriter + + vcd_file = open(path, "w") + + # Use the datetime format that iVerilog uses + timestamp = datetime.now().strftime("%a %b %w %H:%M:%S %Y") + + with VCDWriter( + vcd_file, timescale="10 ns", date=timestamp, version="manta" + ) as writer: + # add probes to vcd file + vcd_probes = [] + for name, width in config["downlink"]["probes"].items(): + probe = writer.register_var("manta", name, "wire", size=width) + vcd_probes.append(probe) + + # add clock to vcd file + clock = writer.register_var("manta", "clk", "wire", size=1) + + # calculate bit widths for part selecting + widths = self.make_widths(config) + + # slice data, and dump to vcd file + for timestamp in range(2 * len(data)): + value = data[timestamp // 2] + + # dump clock values to vcd file + # note: this assumes logic is triggered + # on the rising edge of the clock, @TODO fix this + writer.change(clock, timestamp, timestamp % 2 == 0) + + for probe_num, probe in enumerate(vcd_probes): + val = self.part_select(value, widths[probe_num]) + writer.change(probe, timestamp, val) + vcd_file.close() + + def hdl(self): + # Return an autogenerated verilog module definition for the core. + # load source files + tmpl = pkgutil.get_data(__name__, "la_template.v").decode() + + # add triggers + trigger = [f"({trigger})" for trigger in self.triggers] + trigger = " || ".join(trigger) + templ = templ.replace("@TRIGGER", trigger) + + # add concat + concat = [name for name in self.probes] + concat = ", ".join(concat) + concat = "{" + concat + "}" + templ = templ.replace("@CONCAT", concat) + + # add probes + probe_verilog = [] + for name, width in self.probes.items(): + if width == 1: + probe_verilog.append(f"input wire {name},") + + else: + probe_verilog.append(f"input wire [{width-1}:0] {name},") + + probe_verilog = "\n\t\t".join(probe_verilog) + tmpl = tmpl.replace("@PROBES", probe_verilog) + + # add sample width + sample_width = sum([width for name, width in self.probes.items()]) + tmpl = tmpl.replace("@SAMPLE_WIDTH", str(sample_width)) + + # add sample depth + tmpl = tmpl.replace("@SAMPLE_DEPTH", str(self.sample_depth)) + return tmpl + + +class Manta: + def __init__(self, config_filepath): + config = self.read_config_file(config_filepath) + + # TODO: figure out some better place to put the interface data. + # it should probably go in it's own class. But for now it can go here. + self.interface = self.get_interface(config) + + # add cores to manta + assert "cores" in config, "No cores found." + assert len(config["cores"]) > 0, "Must specify at least one core." + + for core in config: + if core == "LA": + self.cores.append(LogicAnalyzerCore(core, self.interface)) + + def read_config_file(self, path): + """Take path to configuration file, and retun the configuration as a python list/dict object.""" + extension = path.split(".")[-1] + + if "json" in extension: + with open(path, "r") as f: + import json + + config = json.load(f) + + elif "yaml" in extension or "yml" in extension: + with open(path, "r") as f: + import yaml + + config = yaml.safe_load(f) else: - probe_verilog.append(f"input wire [{width-1}:0] {name},") + raise ValueError("Unable to recognize configuration file extension.") - probe_verilog = "\n\t\t".join(probe_verilog) - buf = buf.replace("@PROBES", probe_verilog) + return config - # add sample width - sample_width = sum([width for name, width in dl["probes"].items()]) - buf = buf.replace("@SAMPLE_WIDTH", str(sample_width)) + def get_interface(self, config): + if "uart" in config: + return UARTInterface(config["uart"]) - # add sample depth - buf = buf.replace("@SAMPLE_DEPTH", str(dl["sample_depth"])) + elif "ethernet" in config: + raise NotImplementedError("Ethernet not supported yet!") - # uart config - buf = buf.replace("@DATA_WIDTH", str(config["uart"]["data"])) - buf = buf.replace("@BAUDRATE", str(config["uart"]["baudrate"])) - buf = buf.replace("@CLK_FREQ_HZ", str(dl["clock_freq"])) + elif "jtag" in config: + raise NotImplementedError("JTAG not supported yet!") - return buf + else: + raise ValueError("Interface not recognized") + + def generate(self): + # this occurs in two steps: generating manta and the top-level, + # and pasting in all the HDL from earlier. + + cores_hdl = {core.name: core.generate for core in self.cores} + + uart_rx_hdl = pkgutil.get_data(__name__, "rx_uart.sv").decode() + bridge_rx_hdl = pkgutil.get_data(__name__, "bridge_rx.sv").decode() + bridge_tx_hdl = pkgutil.get_data(__name__, "bridge_tx.sv").decode() + uart_tx_hdl = pkgutil.get_data(__name__, "uart_tx.sv").decode() + + # for core in cores: + # registers = '' + # registers += f'{core}{}' + # # wire cores together + + # add preamble to top of file + user = os.environ.get("USER", os.environ.get("USERNAME")) + timestamp = datetime.now().strftime("%d %b %Y at %H:%M:%S") + + hdl = "This manata definitinon was autogenerated on {timestamp} by {user}\n\n" + hdl += "If this breaks or if you've got dank formal verification memes,\n" + hdl += "please contact fischerm [at] mit.edu\n" -def print_help(): - help = f""" +def main(): + # print help menu if no args passed or help menu requested + if len(argv) == 1 or argv[1] == "help" or argv[1] == "ray" or argv[1] == "bae": + print( + f""" \033[96m (\.-./) \033[96m / \\ \033[96m .' : '. @@ -185,159 +282,14 @@ Supported commands: ports list all available serial ports help, ray display this splash screen (hehe...splash screen) """ - print(help) - - -def setup_serial(ser, config): - ser.baudrate = config["uart"]["baudrate"] - ser.port = config["uart"]["port"] - ser.timeout = config["uart"]["timeout"] - - # setup number of data bits - if config["uart"]["data"] == 8: - ser.bytesize = serial.EIGHTBITS - - elif config["uart"]["data"] == 7: - ser.bytesize = serial.SEVENBITS - - elif config["uart"]["data"] == 6: - ser.bytesize = serial.SIXBITS - - elif config["uart"]["data"] == 5: - ser.bytesize = serial.FIVEBITS - - else: - raise ValueError("Invalid number of data bits in UART configuration.") - - # setup number of stop bits - if config["uart"]["stop"] == 1: - ser.stopbits = serial.STOPBITS_ONE - - elif config["uart"]["stop"] == 1.5: - ser.stopbits = serial.STOPBITS_ONE_POINT_FIVE - - elif config["uart"]["stop"] == 2: - ser.stopbits = serial.STOPBITS_TWO - - else: - raise ValueError("Invalid number of stop bits in UART configuration.") - - # setup parity - if config["uart"]["parity"] == "none": - ser.parity = serial.PARITY_NONE - - elif config["uart"]["parity"] == "even": - ser.parity = serial.PARITY_EVEN - - elif config["uart"]["parity"] == "odd": - ser.parity = serial.PARITY_ODD - - elif config["uart"]["parity"] == "mark": - ser.parity = serial.PARITY_MARK - - elif config["uart"]["parity"] == "space": - ser.parity = serial.PARITY_SPACE - - else: - raise ValueError("Invalid parity setting in UART configuration.") - - -def read_serial(config): - # obtain bytestream from FPGA - with serial.Serial() as ser: - setup_serial(ser, config) - ser.open() - ser.flushInput() - ser.write(b"\x30") - data = ser.read(4096) - - return data - - -def part_select(data, width): - top, bottom = width - - assert top >= bottom - - mask = 2 ** (top - bottom + 1) - 1 - return (data >> bottom) & mask - - -def make_widths(config): - # {probe0, probe1, probe2} - # [12, 1, 3] should produce - # [ (15, 4) (3, 3) (2,0) ] - - widths = list(config["downlink"]["probes"].values()) - - # easiest to make by summing them and incrementally subtracting - s = sum(widths) - slices = [] - for width in widths: - slices.append((s - 1, s - width)) - s = s - width - - assert s == 0, "Probe sizes are weird, cannot slice bits properly" - return slices - - -def export_waveform(config, data, path): - extension = path.split(".")[-1] - - if extension == "vcd": - from vcd import VCDWriter - - vcd_file = open(path, "w") - - # Use the datetime format that iVerilog uses - timestamp = datetime.now().strftime("%a %b %w %H:%M:%S %Y") - - with VCDWriter( - vcd_file, timescale="10 ns", date=timestamp, version="manta" - ) as writer: - # add probes to vcd file - vcd_probes = [] - for name, width in config["downlink"]["probes"].items(): - probe = writer.register_var("manta", name, "wire", size=width) - vcd_probes.append(probe) - - # add clock to vcd file - clock = writer.register_var("manta", "clk", "wire", size=1) - - # calculate bit widths for part selecting - widths = make_widths(config) - - # slice data, and dump to vcd file - for timestamp in range(2 * len(data)): - value = data[timestamp // 2] - - # dump clock values to vcd file - # note: this assumes logic is triggered - # on the rising edge of the clock, @TODO fix this - writer.change(clock, timestamp, timestamp % 2 == 0) - - for probe_num, probe in enumerate(vcd_probes): - val = part_select(value, widths[probe_num]) - writer.change(probe, timestamp, val) - vcd_file.close() - - else: - raise NotImplementedError("More file formats to come!") - - -def main(): - # print help menu if no args passed or help menu requested - if len(argv) == 1 or argv[1] == "help" or argv[1] == "ray" or argv[1] == "bae": - print_help() - exit() + ) # open minicom-like serial terminal with given config elif argv[1] == "terminal": assert len(argv) == 3, "Not enough (or too many) config files specified." - # TODO: make this work with a looser config file - it should work even if we just have a uart definition - config = load_config(argv[2]) - check_config(config) + # TODO: make this work with a looser config file - it should work as long as it has a uart definition + manta = Manta(argv[2]) raise NotImplementedError("Miniterm console is still under development!") @@ -348,31 +300,28 @@ def main(): for info in serial.tools.list_ports.comports(): print(info) - # generate the specified core + # generate the specified configuration elif argv[1] == "gen": assert ( len(argv) == 4 ), "Wrong number of arguments, only a config file and output file must both be specified." - config = load_config(argv[2]) - check_config(config) + manta = Manta(argv[2]) with open(argv[3], "w") as f: - f.write(gen_downlink_core(config)) + f.write(manta.generate()) # run the specified core elif argv[1] == "run": assert ( len(argv) == 4 ), "Wrong number of arguments, only a config file and output file must both be specified." - config = load_config(argv[2]) - check_config(config) - data = read_serial(config) - export_waveform(config, data, argv[3]) + manta = Manta(argv[2]) + manta.la_0.arm() + manta.la_0.export_waveform(argv[3]) else: - print("Option not recognized.") - print_help() + print("Option not recognized! Run 'manta help' for supported commands.") if __name__ == "__main__":