diff --git a/Makefile b/Makefile index d23c409..ee5afab 100644 --- a/Makefile +++ b/Makefile @@ -81,32 +81,28 @@ lut_ram_tb: examples: icestick nexys_a7 -nexys_a7: nexys_a7_video_sprite nexys_a7_io_core nexys_a7_logic_analyzer nexys_a7_lut_ram +nexys_a7: nexys_a7_video_sprite nexys_a7_io_core nexys_a7_ps2_logic_analyzer nexys_a7_lut_ram nexys_a7_video_sprite: cd examples/nexys_a7/video_sprite; \ manta gen manta.yaml src/manta.v; \ - mkdir -p obj/; \ - python3 lab-bc.py + build nexys_a7_io_core: cd examples/nexys_a7/io_core/; \ - manta gen manta.yaml src/manta.v; \ - mkdir -p obj/; \ - python3 lab-bc.py + manta gen manta.yaml manta.v; \ + build -nexys_a7_logic_analyzer: - cd examples/nexys_a7/logic_analyzer/; \ +nexys_a7_ps2_logic_analyzer: + cd examples/nexys_a7/ps2_logic_analyzer/; \ manta gen manta.yaml src/manta.v; \ - manta playback manta.yaml my_logic_analyzer sim/playback.v; \ - mkdir -p obj/; \ - python3 lab-bc.py + manta playback manta.yaml my_logic_analyzer sim/playback.v; \ + build nexys_a7_lut_ram: cd examples/nexys_a7/lut_ram/; \ - manta gen manta.yaml src/manta.v; \ - mkdir -p obj/; \ - python3 lab-bc.py + manta gen manta.yaml manta.v; \ + build icestick: icestick_io_core icestick_lut_ram diff --git a/examples/icestick/build.sh b/examples/icestick/common/build.sh similarity index 100% rename from examples/icestick/build.sh rename to examples/icestick/common/build.sh diff --git a/examples/icestick/io_core/build.sh b/examples/icestick/io_core/build.sh new file mode 120000 index 0000000..f84531b --- /dev/null +++ b/examples/icestick/io_core/build.sh @@ -0,0 +1 @@ +../common/build.sh \ No newline at end of file diff --git a/examples/icestick/lut_ram/build.sh b/examples/icestick/lut_ram/build.sh deleted file mode 100755 index 1b790ea..0000000 --- a/examples/icestick/lut_ram/build.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash -yosys -p 'synth_ice40 -top top_level -json top_level.json' top_level.sv -nextpnr-ice40 --hx1k --json top_level.json --pcf top_level.pcf --asc top_level.asc -icepack top_level.asc top_level.bin -rm -f *.json -rm -f *.asc \ No newline at end of file diff --git a/examples/icestick/lut_ram/build.sh b/examples/icestick/lut_ram/build.sh new file mode 120000 index 0000000..f84531b --- /dev/null +++ b/examples/icestick/lut_ram/build.sh @@ -0,0 +1 @@ +../common/build.sh \ No newline at end of file diff --git a/examples/nexys_a7/ps2_logic_analyzer/manta.yaml b/examples/nexys_a7/ps2_logic_analyzer/manta.yaml index 5c7e1b3..c2d1e5f 100644 --- a/examples/nexys_a7/ps2_logic_analyzer/manta.yaml +++ b/examples/nexys_a7/ps2_logic_analyzer/manta.yaml @@ -3,7 +3,7 @@ cores: my_logic_analyzer: type: logic_analyzer sample_depth: 64000 - trigger_loc: 15000 + trigger_location: 15000 probes: ps2_clk: 1 diff --git a/examples/nexys_a7/ps2_logic_analyzer/sim/playback.v b/examples/nexys_a7/ps2_logic_analyzer/sim/playback.v index 6fd7f68..2e2130c 100644 --- a/examples/nexys_a7/ps2_logic_analyzer/sim/playback.v +++ b/examples/nexys_a7/ps2_logic_analyzer/sim/playback.v @@ -1,5 +1,5 @@ /* -This playback module was generated with Manta v0.0.0 on 18 Apr 2023 at 00:54:57 by fischerm +This playback module was generated with Manta v0.0.5 on 26 Apr 2023 at 12:42:05 by fischerm If this breaks or if you've got dank formal verification memes, contact fischerm [at] mit.edu diff --git a/src/manta/__init__.py b/src/manta/__init__.py index 6f883a7..ecbefaf 100644 --- a/src/manta/__init__.py +++ b/src/manta/__init__.py @@ -1,976 +1,17 @@ -import pkgutil -import math +# Internal Dependencies +from .verilog_manipulator import * +from .uart import * +from .ethernet import * +from .logic_analyzer import * +from .io import * +from .block_memory import * +from .lut_ram import * + +# External Dependencies from sys import argv import os from datetime import datetime - -version = "v0.0.0" - -class VerilogManipulator: - def __init__(self, filepath=None): - if filepath is not None: - self.hdl = pkgutil.get_data(__name__, filepath).decode() - - # scrub any default_nettype or timescale directives from the source - self.hdl = self.hdl.replace("`default_nettype none", "") - self.hdl = self.hdl.replace("`default_nettype wire", "") - self.hdl = self.hdl.replace("`timescale 1ns/1ps", "") - self.hdl = self.hdl.strip() - - # python tries to be cute and automatically convert - # line endings on Windows, but Manta's source comes - # with (and injects) UNIX line endings, so Python - # ends up adding way too many line breaks, so we just - # undo anything it's done when we load the file - self.hdl = self.hdl.replace("\r\n", "\n") - - else: - self.hdl = None - - def sub(self, replace, find): - # sometimes we have integer inputs, want to accomodate - if isinstance(replace, str): - replace_str = replace - - elif isinstance(replace, int): - replace_str = str(replace) - - else: - raise ValueError("Only string and integer arguments supported.") - - - # if the string being subbed in isn't multiline, just - # find-and-replace like normal: - if "\n" not in replace_str: - self.hdl = self.hdl.replace(find, replace_str) - - # if the string being substituted in is multiline, - # make sure the replace text gets put at the same - # indentation level by adding whitespace to left - # of the line. - else: - for line in self.hdl.split("\n"): - if find in line: - # get whitespace that's on the left side of the line - whitespace = line.rstrip().replace(line.lstrip(), "") - - # add it to every line, except the first - replace_as_lines = replace_str.split("\n") - replace_with_whitespace = f"\n{whitespace}".join(replace_as_lines) - - # replace the first occurance in the HDL with it - self.hdl = self.hdl.replace(find, replace_with_whitespace, 1) - - def get_hdl(self): - return self.hdl - - def net_dec(self, nets, net_type, trailing_comma = False): - """Takes a dictonary of nets in the format {probe: width}, and generates - the net declarations that would go in a Verilog module definition. - - For example, calling net_dec({foo : 1, bar : 4}, "input wire") would produce: - - input wire foo, - input [3:0] wire bar - - Which you'd then slap into your module declaration, along with all the other - inputs and outputs the module needs.""" - - dec = [] - for name, width in nets.items(): - if width == 1: - dec.append(f"{net_type} {name}") - - else: - dec.append(f"{net_type} [{width-1}:0] {name}") - - dec = ",\n".join(dec) - dec = dec + "," if trailing_comma else dec - return dec - - def net_conn(self, nets, trailing_comma = False): - """Takes a dictionary of nets in the format {probe: width}, and generates - the net connections that would go in the Verilog module instantiation. - - For example, calling net_conn({foo: 1, bar: 4}) would produce: - - .foo(foo), - .bar(bar) - - Which you'd then slap into your module instantiation, along with all the other - module inputs and outputs that get connected elsewhere.""" - - - conn = [f".{name}({name})" for name in nets] - conn = ",\n".join(conn) - conn = conn + "," if trailing_comma else conn - - return conn - -class UARTInterface: - def __init__(self, config): - # Warn if unrecognized options have been given - for option in config: - if option not in ["port", "clock_freq", "baudrate", "chunk_size", "verbose"]: - print(f"Warning: Ignoring unrecognized option '{option}' in UART interface.") - - # Obtain port. Try to automatically detect port if "auto" is specified - assert "port" in config, "No serial port provided to UART core." - self.port = config["port"] - - # Check that clock frequency is provided and positive - assert "clock_freq" in config, "Clock frequency not provided to UART core." - assert config["clock_freq"] > 0, "Clock frequency must be positive." - self.clock_freq = config["clock_freq"] - - # Check that baudrate is provided and positive - assert "baudrate" in config, "Baudrate not provided to UART core." - assert config["baudrate"] > 0, "Baudrate must be positive." - self.baudrate = config["baudrate"] - - # Confirm core clock is sufficiently fast - clocks_per_baud = self.clock_freq // self.baudrate - assert clocks_per_baud >= 2 - self.clocks_per_baud = clocks_per_baud - - # Confirm we can match baudrate suffeciently well - actual_baudrate = self.clock_freq / clocks_per_baud - baudrate_error = 100 * abs(actual_baudrate - self.baudrate) / self.baudrate - assert baudrate_error <= 5, \ - "Unable to match target baudrate - they differ by {baudrate_error}%" - - # Set chunk_size, which is the max amount of bytes that get dumped - # to the OS driver at a time - self.chunk_size = 256 - if "chunk_size" in config: - self.chunk_size = config["chunk_size"] - - # Set verbosity - self.verbose = False - if "verbose" in config: - self.verbose = config["verbose"] - - def open_port_if_not_alredy_open(self): - if self.port == "auto": - self.port = self.autodetect_port() - - if not hasattr(self, "ser"): - import serial - self.ser = serial.Serial(self.port, self.baudrate) - - def autodetect_port(self): - # as far as I know the FT2232 is the only chip used on the icestick/digilent boards, so just look for that - import serial.tools.list_ports - - recognized_devices = [] - for port in serial.tools.list_ports.comports(): - if (port.vid == 0x403) and (port.pid == 0x6010): - recognized_devices.append(port) - - # board manufacturers seem to always make the 0th serial - # interface on the FT2232 be for programming over JTAG, - # and then the 1st to be for UART. as a result, we always - # grab the device with the larger location - - rd = recognized_devices - assert len(recognized_devices) == 2, f"Expected to see two serial ports for FT2232 device, but instead see {len(recognized_devices)}." - assert rd[0].serial_number == rd[1].serial_number, "Serial numbers should be the same on both FT2232 ports - probably somehow grabbed ports on two different devices." - return rd[0].device if rd[0].location > rd[1].location else rd[1].device - - def decode_response(self, response): - """Make sure reponse from FPGA has the correct format, and return data contained within if so.""" - assert response is not None, "No reponse received." - - response_str = response.decode('ascii') - assert response_str[0] == 'M', "Bad message recieved, incorrect preamble." - assert response_str[-1] == '\n', "Bad message received, incorrect EOL." - assert response_str[-2] == '\r', "Bad message received, incorrect EOL." - assert len(response_str) == 7, f"Wrong number of bytes received, expecting 7 but got {len(response)}." - - return int(response_str[1:5], 16) - - def read_register(self, addr): - self.open_port_if_not_alredy_open() - - # request from the bus - request = f"M{addr:04X}\r\n".encode('ascii') - self.ser.write(request) - - # read and parse the response - data = self.decode_response(self.ser.read(7)) - - if self.verbose: - print(f"read {data:04X} from {addr:04X}") - - return data - - def write_register(self, addr, data): - self.open_port_if_not_alredy_open() - - # request from the bus - request = f"M{addr:04X}{data:04X}\r\n".encode('ascii') - self.ser.write(request) - - if self.verbose: - print(f"wrote {data:04X} to {addr:04X}") - - def read_registers(self, addrs): - assert isinstance(addrs, list), "Read addresses must be list of integers." - assert all(isinstance(addr, int) for addr in addrs), "Read addresses must be list of integers." - - # send data in chunks because the reponses will fill up the OS's - # input buffer in no time flat - self.open_port_if_not_alredy_open() - - inbound_bytes = b"" - for i in range(0, len(addrs), self.chunk_size): - addr_chunk = addrs[i:i+self.chunk_size] - - outbound_bytes = [f"M{addr:04X}\r\n".encode('ascii') for addr in addr_chunk] - outbound_bytes = b"".join(outbound_bytes) - - self.ser.write(outbound_bytes) - - inbound_bytes += self.ser.read(len(outbound_bytes)) - - data = [] - for i in range(0, len(inbound_bytes), 7): - response = inbound_bytes[i:i+7] - data.append(self.decode_response(response)) - - return data - - def write_registers(self, addrs, datas): - assert isinstance(addrs, list), "Write addresses must be list of integers." - assert isinstance(datas, list), "Write data must be list of integers." - assert all(isinstance(addr, int) for addr in addrs), "Write addresses must be list of integers." - assert all(isinstance(data, int) for data in datas), "Write data must be list of integers." - assert len(addrs) == len(datas), "Write addresses and write data must be of same length." - - # send data in chunks because the responses will fill up the OS's - # input buffer in no time flat - self.open_port_if_not_alredy_open() - - for i in range(0, len(addrs), self.chunk_size): - addr_chunk = addrs[i:i+self.chunk_size] - data_chunk = datas[i:i+self.chunk_size] - - - outbound_bytes = [f"M{a:04X}{d:04X}\r\n" for a, d in zip(addr_chunk, data_chunk)] - outbound_bytes = [ob.encode('ascii') for ob in outbound_bytes] - outbound_bytes = b"".join(outbound_bytes) - - self.ser.write(outbound_bytes) - - def hdl_top_level_ports(self): - # this should return the probes that we want to connect to top-level, but like as a string of verilog - - return ["input wire rx", "output reg tx"] - - def rx_hdl_def(self): - uart_rx_def = VerilogManipulator("rx_uart.v").get_hdl() - bridge_rx_def = VerilogManipulator("bridge_rx.v").get_hdl() - return uart_rx_def + '\n' + bridge_rx_def - - def tx_hdl_def(self): - uart_tx_def = VerilogManipulator("uart_tx.v").get_hdl() - bridge_tx_def = VerilogManipulator("bridge_tx.v").get_hdl() - return bridge_tx_def + '\n' + uart_tx_def - - def rx_hdl_inst(self): - rx = VerilogManipulator("uart_rx_bridge_rx_inst_templ.v") - rx.sub(self.clocks_per_baud, "/* CLOCKS_PER_BAUD */") - return rx.get_hdl() - - def tx_hdl_inst(self): - tx = VerilogManipulator("uart_tx_bridge_tx_inst_templ.v") - tx.sub(self.clocks_per_baud, "/* CLOCKS_PER_BAUD */") - return tx.get_hdl() - -class IOCoreProbe: - def __init__(self, name, width, direction, base_addr, interface): - self.name = name - self.width = width - self.direction = direction - self.base_addr = base_addr - self.interface = interface - - def set(self, data): - # make sure that we're an output probe - assert self.direction == "output", "Cannot set value of input port." - - # check that value is within range for the width of the probe - assert isinstance(data, int), "Data must be an integer." - if data > 0: - assert data <= (2**self.width) - 1, f"Unsigned value too large for probe of width {self.width}" - - elif data < 0: - assert data >= -(2**(self.width-1))-1, f"Signed value too large for probe of width {self.width}" - assert data <= (2**(self.width-1))-1, f"Signed value too large for probe of width {self.width}" - - self.interface.write_register(self.base_addr, data) - - def get(self): - return self.interface.read_register(self.base_addr) - -class IOCore: - def __init__(self, config, name, base_addr, interface): - self.name = name - self.base_addr = base_addr - self.interface = interface - - # Warn if unrecognized options have been given - for option in config: - if option not in ["type", "inputs", "outputs"]: - print(f"Warning: Ignoring unrecognized option '{option}' in IO core '{self.name}'") - - # make sure we have ports defined - assert ('inputs' in config) or ('outputs' in config), "No input or output ports specified." - - # add input probes to core - self.probes = [] - probe_base_addr = self.base_addr - if 'inputs' in config: - for name, width in config["inputs"].items(): - # make sure inputs are of reasonable width - assert isinstance(width, int), f"Probe {name} must have integer width." - assert width > 0, f"Probe {name} must have positive width." - - probe = IOCoreProbe(name, width, "input", probe_base_addr, self.interface) - - # add friendly name, so users can do Manta.my_io_core.my_probe.set() for example - setattr(self, name, probe) - self.probes.append(probe) - - self.max_addr = probe_base_addr - probe_base_addr += 1 - - # add output probes to core - if 'outputs' in config: - for name, width in config["outputs"].items(): - # make sure inputs are of reasonable width - assert isinstance(width, int), f"Probe {name} must have integer width." - assert width > 0, f"Probe {name} must have positive width." - - probe = IOCoreProbe(name, width, "output", probe_base_addr, self.interface) - - # add friendly name, so users can do Manta.my_io_core.my_probe.set() for example - setattr(self, name, probe) - self.probes.append(probe) - - self.max_addr = probe_base_addr - probe_base_addr += 1 - - - def hdl_inst(self): - inst = VerilogManipulator("io_core_inst_tmpl.v") - inst.sub(self.name, "/* MODULE_NAME */") - inst.sub(self.name + "_inst", "/* INST_NAME */") - - probes = {probe.name:probe.width for probe in self.probes} - - inst_ports = inst.net_conn(probes, trailing_comma=True) - inst.sub(inst_ports, "/* INST_PORTS */") - - return inst.get_hdl() - - - def hdl_def(self): - io_core = VerilogManipulator("io_core_def_tmpl.v") - io_core.sub(self.name, "/* MODULE_NAME */") - io_core.sub(self.max_addr, "/* MAX_ADDR */") - - # generate declaration - top_level_ports = ',\n'.join(self.hdl_top_level_ports()) - top_level_ports += ',' - io_core.sub(top_level_ports, "/* TOP_LEVEL_PORTS */") - - # generate memory handling - rcsb = "" # read case statement body - wcsb = "" # write case statement body - for probe in self.probes: - - # add to read block - if probe.width == 16: - rcsb += f"{probe.base_addr}: rdata_o <= {probe.name};\n" - - else: - rcsb += f"{probe.base_addr}: rdata_o <= {{{16-probe.width}'b0, {probe.name}}};\n" - - - # if output, add to write block - if probe.direction == "output": - if probe.width == 1: - wcsb += f"{probe.base_addr}: {probe.name} <= wdata_i[0];\n" - - elif probe.width == 16: - wcsb += f"{probe.base_addr}: {probe.name} <= wdata_i;\n" - - else: - wcsb += f"{probe.base_addr}: {probe.name} <= wdata_i[{probe.width-1}:0];\n" - - # remove trailing newline - rcsb = rcsb.rstrip() - wcsb = wcsb.rstrip() - - io_core.sub(rcsb, "/* READ_CASE_STATEMENT_BODY */") - io_core.sub(wcsb, "/* WRITE_CASE_STATEMENT_BODY */") - - return io_core.get_hdl() - - - - def hdl_top_level_ports(self): - ports = [] - for probe in self.probes: - net_type = "input wire " if probe.direction == "input" else "output reg " - name_def = probe.name if probe.width == 1 else f"[{probe.width-1}:0] {probe.name}" - ports.append(net_type + name_def) - - return ports - -class LUTRAMCore: - def __init__(self, config, name, base_addr, interface): - self.name = name - self.base_addr = base_addr - self.interface = interface - - # Warn if unrecognized options have been given - for option in config: - if option not in ["type", "size"]: - print(f"Warning: Ignoring unrecognized option '{option}' in LUT Memory '{self.name}'") - - assert "size" in config, "Size not specified for LUT RAM core." - assert config["size"] > 0, "LUT RAM must have positive size." - assert isinstance(config["size"], int), "LUT RAM must have integer size." - self.size = config["size"] - - self.max_addr = self.base_addr + self.size - 1 - - def hdl_inst(self): - inst = VerilogManipulator("lut_ram_inst_tmpl.v") - inst.sub(self.size, "/* DEPTH */") - inst.sub(self.name, "/* INST_NAME */") - return inst.get_hdl() - - def hdl_def(self): - return VerilogManipulator("lut_ram.v").get_hdl() - - def hdl_top_level_ports(self): - # no top_level connections since this core just lives on the bus - return "" - - def read(self, addr): - return self.interface.read_register(addr + self.base_addr) - - def write(self, addr, data): - return self.interface.write_register(addr + self.base_addr, data) - -class LogicAnalyzerCore: - def __init__(self, config, name, base_addr, interface): - self.name = name - self.base_addr = base_addr - self.interface = interface - - # Warn if unrecognized options have been given - valid_options = ["type", "sample_depth", "probes", "triggers", "trigger_location", "trigger_mode"] - for option in config: - if option not in valid_options: - print(f"Warning: Ignoring unrecognized option '{option}' in Logic Analyzer core '{self.name}'") - - # Load sample depth - assert "sample_depth" in config, \ - "Sample depth not found for Logic Analyzer core {self.name}." - - assert isinstance(config["sample_depth"], int), \ - "Sample depth must be an integer." - - self.sample_depth = config["sample_depth"] - - # Add probes - assert "probes" in config, "No probe definitions found." - assert len(config["probes"]) > 0, "Must specify at least one probe." - - for probe_name, probe_width in config["probes"].items(): - assert probe_width > 0, f"Probe {probe_name} is of invalid width - it must be of at least width one." - - self.probes = config["probes"] - - # Add triggers - assert "triggers" in config, "No triggers found." - assert len(config["triggers"]) > 0, "Must specify at least one trigger." - self.triggers = config["triggers"] - - # Add trigger location - self.trigger_loc = self.sample_depth // 2 - if "trigger_location" in config: - assert isinstance(config["trigger_location"], int), \ - "Trigger location must be an integer." - - assert config["trigger_location"] >= 0, \ - "Trigger location cannot be negative." - - assert config["trigger_location"] <= self.sample_depth, \ - "Trigger location cannot exceed sample depth." - - self.trigger_loc = config["trigger_location"] - - # Add trigger mode - self.SINGLE_SHOT = 0 - self.INCREMENTAL = 1 - self.IMMEDIATE = 2 - - self.trigger_mode = self.SINGLE_SHOT - if "trigger_mode" in config: - assert config["trigger_mode"] in ["single_shot", "incremental", "immediate"], \ - "Unrecognized trigger mode provided." - - if config["trigger_mode"] == "single_shot": - self.trigger_mode = self.SINGLE_SHOT - - elif config["trigger_mode"] == "incremental": - self.trigger_mode = self.INCREMENTAL - - elif config["trigger_mode"] == "immediate": - self.trigger_mode = self.IMMEDIATE - - # compute base addresses - self.fsm_base_addr = self.base_addr - self.trigger_block_base_addr = self.fsm_base_addr + 6 - - self.total_probe_width = sum(self.probes.values()) - n_brams = math.ceil(self.total_probe_width / 16) - self.block_memory_base_addr = self.trigger_block_base_addr + (2*len(self.probes)) - self.max_addr = self.block_memory_base_addr + (n_brams * self.sample_depth) - - # build out self register map: - # these are also defined in logic_analyzer_fsm_registers.v, and should match - self.state_reg_addr = self.base_addr - self.trigger_mode_reg_addr = self.base_addr + 1 - self.trigger_loc_reg_addr = self.base_addr + 2 - self.request_start_reg_addr = self.base_addr + 3 - self.request_stop_reg_addr = self.base_addr + 4 - self.read_pointer_reg_addr = self.base_addr + 5 - self.write_pointer_reg_addr = self.base_addr + 6 - - self.IDLE = 0 - self.MOVE_TO_POSITION = 1 - self.IN_POSITION = 2 - self.CAPTURING = 3 - self.CAPTURED = 4 - - def hdl_inst(self): - la_inst = VerilogManipulator("logic_analyzer_inst_tmpl.v") - - # add module name to instantiation - la_inst.sub(self.name, "/* INST_NAME */") - - # add net connections to instantiation - conns = la_inst.net_conn(self.probes, trailing_comma=True) - la_inst.sub(conns, "/* NET_CONNS */") - return la_inst.get_hdl() - - def gen_trigger_block_def(self): - trigger_block = VerilogManipulator("trigger_block_def_tmpl.v") - - # add probe ports to module declaration - # these ports belong to the logic analyzer, but - # need to be included in the trigger_block module declaration - probe_ports = trigger_block.net_dec(self.probes, "input wire", trailing_comma=True) - trigger_block.sub(probe_ports, "/* PROBE_PORTS */") - - - # add trigger cores to module definition - # these are instances of the trigger module, of which one gets wired - # into each probe - trigger_module_insts = [] - for name, width in self.probes.items(): - trig_inst = VerilogManipulator("trigger_block_inst_tmpl.v") - trig_inst.sub(width, "/* INPUT_WIDTH */") - trig_inst.sub(f"{name}_trigger", "/* NAME */") - - trig_inst.sub(f"reg [3:0] {name}_op = 0;", "/* OP_DEC */") - trig_inst.sub(f"reg {name}_trig;", "/* TRIG_DEC */") - - if width == 1: - trig_inst.sub(f"reg {name}_arg = 0;", "/* ARG_DEC */") - - else: - trig_inst.sub(f"reg [{width-1}:0] {name}_arg = 0;", "/* ARG_DEC */") - - trig_inst.sub(name, "/* PROBE */") - trig_inst.sub(f"{name}_op", "/* OP */") - trig_inst.sub(f"{name}_arg", "/* ARG */") - trig_inst.sub(f"{name}_trig", "/* TRIG */") - - trigger_module_insts.append(trig_inst.get_hdl()) - - trigger_module_insts = "\n".join(trigger_module_insts) - trigger_block.sub(trigger_module_insts, "/* TRIGGER_MODULE_INSTS */") - - # add combined individual triggers - cit = [f"{name}_trig" for name in self.probes] - cit = " || ".join(cit) - cit = f"assign trig = {cit};" - trigger_block.sub(cit, " /* COMBINE_INDIV_TRIGGERS */") - - # add read and write block case statement bodies - rcsb = "" # read case statement body - wcsb = "" # write case statement body - addr = 0 - for i, name in enumerate(self.probes): - addr = 2 * i - rcsb += f"BASE_ADDR + {addr}: rdata_o <= {name}_op;\n" - wcsb += f"BASE_ADDR + {addr}: {name}_op <= wdata_i;\n" - - addr = (2 * i) + 1 - rcsb += f"BASE_ADDR + {addr}: rdata_o <= {name}_arg;\n" - wcsb += f"BASE_ADDR + {addr}: {name}_arg <= wdata_i;\n" - - rcsb = rcsb.strip() - wcsb = wcsb.strip() - - trigger_block.sub(rcsb, "/* READ_CASE_STATEMENT_BODY */") - trigger_block.sub(wcsb, "/* WRITE_CASE_STATEMENT_BODY */") - trigger_block.sub(self.trigger_block_base_addr + addr + 1, "/* MAX_ADDR */") - - return trigger_block.get_hdl() - - def gen_logic_analyzer_def(self): - la = VerilogManipulator("logic_analyzer_def_tmpl.v") - - # add top level probe ports to module declaration - ports = la.net_dec(self.probes, "input wire", trailing_comma=True) - la.sub(ports, "/* TOP_LEVEL_PROBE_PORTS */") - - # assign base addresses to the FSM, trigger block, and sample mem - la.sub(self.fsm_base_addr, "/* FSM_BASE_ADDR */") - la.sub(self.trigger_block_base_addr, "/* TRIGGER_BLOCK_BASE_ADDR */") - la.sub(self.block_memory_base_addr, "/* BLOCK_MEMORY_BASE_ADDR */") - - # set sample depth - la.sub(self.sample_depth, "/* SAMPLE_DEPTH */") - - # set probe ports for the trigger block and sample mem - probe_ports = la.net_conn(self.probes, trailing_comma=True) - la.sub(probe_ports, "/* TRIGGER_BLOCK_PROBE_PORTS */") - - la.sub(self.total_probe_width, "/* TOTAL_PROBE_WIDTH */") - - # concatenate the probes together to make one big register, - # but do so such that the first probe in the config file - # is at the least-significant position in that big register. - # - # this makes part-selecting out from the memory easier to - # implement in python, and because verilog and python conventions - # are different, we would have had to reverse it somwehere anyway - probes_concat = list(self.probes.keys())[::-1] - probes_concat = '{' + ', '.join(probes_concat) + '}' - la.sub(probes_concat, "/* PROBES_CONCAT */") - - return la.get_hdl() - - def hdl_def(self): - # Return an autogenerated verilog module definition for the core. - # load source files - hdl = self.gen_logic_analyzer_def() + "\n" - hdl += VerilogManipulator("logic_analyzer_controller.v").get_hdl() + "\n" - hdl += VerilogManipulator("logic_analyzer_fsm_registers.v").get_hdl() + "\n" - hdl += VerilogManipulator("block_memory.v").get_hdl() + "\n" - hdl += VerilogManipulator("dual_port_bram.v").get_hdl() + "\n" - hdl += self.gen_trigger_block_def() + "\n" - hdl += VerilogManipulator("trigger.v").get_hdl() + "\n" - - return hdl - - def hdl_top_level_ports(self): - # the probes that we want as ports on the top-level manta module - ports = [] - for name, width in self.probes.items(): - if width == 1: - ports.append(f"input wire {name}") - - else: - ports.append(f"input wire [{width-1}:0] {name}") - return ports - #return VerilogManipulator().net_dec(self.probes, "input wire") - - def set_trigger_conditions(self): - - operations = { - "DISABLE" : 0, - "RISING" : 1, - "FALLING" : 2, - "CHANGING" : 3, - "GT" : 4, - "LT" : 5, - "GEQ" : 6, - "LEQ" : 7, - "EQ" : 8, - "NEQ" : 9 - } - - ops_with_no_args = ["DISABLE", "RISING" , "FALLING", "CHANGING"] - - # reset all the other triggers - for addr in range(self.trigger_block_base_addr, self.block_memory_base_addr): - self.interface.write_register(addr, 0) - - for trigger in self.triggers: - # determine if the trigger is good - - # most triggers will have 3 parts - the trigger, the operation, and the argument - # this is true unless the argument is RISING, FALLING, or CHANGING - - statement = trigger.split(' ') - if len(statement) == 2: - assert statement[1] in ops_with_no_args, "Invalid operation in trigger statement." - probe_name, op = statement - - op_register = 2*(list(self.probes.keys()).index(probe_name)) + self.trigger_block_base_addr - - self.interface.write_register(op_register, operations[op]) - - else: - assert len(statement) == 3, "Missing information in trigger statement." - probe_name, op, arg = statement - - op_register = 2*(list(self.probes.keys()).index(probe_name)) + self.trigger_block_base_addr - arg_register = op_register + 1 - - self.interface.write_register(op_register, operations[op]) - self.interface.write_register(arg_register, int(arg)) - - - - # functions for actually using the core: - def capture(self): - # Check state - if it's in anything other than IDLE, - # request to stop the existing capture - - print(" -> Resetting core...") - state = self.interface.read_register(self.state_reg_addr) - if state != self.IDLE: - self.interface.write_register(self.request_stop_reg_addr, 0) - self.interface.write_register(self.request_stop_reg_addr, 1) - - state = self.interface.read_register(self.state_reg_addr) - assert state == self.IDLE, "Logic analyzer did not reset to correct state when requested to." - - # Configure trigger conditions - print(" -> Set trigger conditions...") - self.set_trigger_conditions() - - # Configure the trigger_mode - print(" -> Setting trigger mode") - self.interface.write_register(self.trigger_mode_reg_addr, self.trigger_mode) - - # Configure the trigger_loc - print(" -> Setting trigger location...") - self.interface.write_register(self.trigger_loc_reg_addr, self.trigger_loc) - - # Start the capture by pulsing request_start - print(" -> Starting capture...") - self.interface.write_register(self.request_start_reg_addr, 1) - self.interface.write_register(self.request_start_reg_addr, 0) - - # Wait for core to finish capturing data - print(" -> Waiting for capture to complete...") - state = self.interface.read_register(self.state_reg_addr) - while(state != self.CAPTURED): - state = self.interface.read_register(self.state_reg_addr) - - # Read out contents from memory - print(" -> Reading sample memory contents...") - addrs = list(range(self.block_memory_base_addr, self.max_addr)) - block_mem_contents = self.interface.read_registers(addrs) - - # Revolve BRAM contents around so the data pointed to by the read_pointer - # is at the beginning - print(" -> Reading read_pointer and revolving memory...") - read_pointer = self.interface.read_register(self.read_pointer_reg_addr) - return block_mem_contents[read_pointer:] + block_mem_contents[:read_pointer] - - - def export_vcd(self, capture_data, path): - from vcd import VCDWriter - vcd_file = open(path, "w") - - # Use the same datetime format that iVerilog uses - timestamp = datetime.now().strftime("%a %b %w %H:%M:%S %Y") - - with VCDWriter(vcd_file, '10 ns', timestamp, "manta") as writer: - - # each probe has a name, width, and writer associated with it - signals = [] - for name, width in self.probes.items(): - signal = { - "name" : name, - "width" : width, - "data" : self.part_select_capture_data(capture_data, name), - "var": writer.register_var("manta", name, "wire", size=width) - } - signals.append(signal) - - clock = writer.register_var("manta", "clk", "wire", size=1) - trigger = writer.register_var("manta", "trigger", "wire", size=1) - - # add the data to each probe in the vcd file - for timestamp in range(0, 2*len(capture_data)): - - # run the clock - writer.change(clock, timestamp, timestamp % 2 == 0) - - # set the trigger - triggered = (timestamp // 2) >= self.trigger_loc - writer.change(trigger, timestamp, triggered) - - # add other signals - for signal in signals: - var = signal["var"] - sample = signal["data"][timestamp // 2] - - writer.change(var, timestamp, sample) - - vcd_file.close() - - def export_mem(self, capture_data, path): - with open(path, "w") as f: - # a wee bit of cursed string formatting, but just - # outputs each sample as binary, padded to a fixed length - w = self.total_probe_width - f.writelines([f'{s:0{w}b}\n' for s in capture_data]) - - def export_playback_module(self, path): - playback = VerilogManipulator("logic_analyzer_playback_tmpl.v") - - module_name = f"{self.name}_playback" - playback.sub(module_name, "/* MODULE_NAME */") - - playback.sub(version, "/* VERSION */") - - timestamp = datetime.now().strftime("%d %b %Y at %H:%M:%S") - playback.sub(timestamp, "/* TIMESTAMP */") - - user = os.environ.get("USER", os.environ.get("USERNAME")) - playback.sub(user, "/* USER */") - - ports = [f".{name}({name})" for name in self.probes.keys()] - ports = ",\n".join(ports) - playback.sub(ports, "/* PORTS */") - - playback.sub(self.sample_depth, "/* SAMPLE_DEPTH */") - playback.sub(self.total_probe_width, "/* TOTAL_PROBE_WIDTH */") - - # see the note in generate_logic_analyzer_def about why we do this - probes_concat = list(self.probes.keys())[::-1] - probes_concat = '{' + ', '.join(probes_concat) + '}' - playback.sub(probes_concat, "/* PROBES_CONCAT */") - - - probe_dec = playback.net_dec(self.probes, "output reg") - playback.sub(probe_dec, "/* PROBE_DEC */") - - with open(path, "w") as f: - f.write(playback.get_hdl()) - - - def part_select_capture_data(self, capture_data, probe_name): - """Given the name of the probe, part-select the appropriate bits of capture data, - and return as an integer. Accepts capture_data as an integer or a list of integers.""" - - # sum up the widths of the probes below this one - lower = 0 - for name, width in self.probes.items(): - if name == probe_name: - break - - lower += width - - upper = lower + (self.probes[probe_name] - 1) - - # define the part select - mask = 2 ** (upper - lower + 1) - 1 - part_select = lambda x: (x >> lower) & mask - - # apply the part_select function depending on type - if isinstance(capture_data, int): - return part_select(capture_data) - - elif isinstance(capture_data, list): - for i in capture_data: - assert isinstance(i, int), "Can only part select on integers and list of integers." - - return [part_select(sample) for sample in capture_data] - - else: - raise ValueError("Can only part select on integers and lists of integers.") - -class BlockMemoryCore: - def __init__(self, config, name, base_addr, interface): - self.name = name - self.base_addr = base_addr - self.interface = interface - - # Warn if unrecognized options have been given - for option in config: - if option not in ["type", "depth", "width", "expose_port"]: - print(f"Warning: Ignoring unrecognized option '{option}' in Block Memory core '{self.name}'") - - # Determine if we expose the BRAM's second port to the top of the module - if "expose_port" in config: - assert isinstance(config["expose_port"], bool), "Configuring BRAM exposure must be done with a boolean." - self.expose_port = config["expose_port"] - - else: - self.expose_port = True - - # Get depth - assert "depth" in config, "Depth not specified for Block Memory core." - assert config["depth"] > 0, "Block Memory core must have positive depth." - assert isinstance(config["depth"], int), "Block Memory core must have integer depth." - self.depth = config["depth"] - - # Get width - assert "width" in config, "Width not specified for Block Memory core." - assert config["width"] > 0, "Block Memory core must have positive width." - assert isinstance(config["width"], int), "Block Memory core must have integer width." - self.width = config["width"] - - self.addr_width = math.ceil(math.log2(self.depth)) - self.n_brams = math.ceil(self.width / 16) - self.max_addr = self.base_addr + (self.depth * self.n_brams) - - def hdl_inst(self): - inst = VerilogManipulator("block_memory_inst_tmpl.v") - inst.sub(self.name, "/* INST_NAME */") - inst.sub(self.depth, "/* DEPTH */") - inst.sub(self.width, "/* WIDTH */") - return inst.get_hdl() - - def hdl_def(self): - block_memory = VerilogManipulator("block_memory.v").get_hdl() - dual_port_bram = VerilogManipulator("dual_port_bram.v").get_hdl() - return block_memory + "\n" + dual_port_bram - - def hdl_top_level_ports(self): - if not self.expose_port: - return "" - - tlp = [] - tlp.append(f"input wire {self.name}_clk") - tlp.append(f"input wire [{self.addr_width-1}:0] {self.name}_addr") - tlp.append(f"input wire [{self.width-1}:0] {self.name}_din") - tlp.append(f"output reg [{self.width-1}:0] {self.name}_dout") - tlp.append(f"input wire {self.name}_we") - return tlp - - def read(self, addr): - return self.interface.read_register(addr + self.base_addr) - - def write(self, addr, data): - return self.interface.write_register(addr + self.base_addr, data) - +from pkg_resources import get_distribution class Manta: def __init__(self, config_filepath): @@ -1220,6 +261,7 @@ reg {self.cores[-1].name}_btx_valid;\n""" def generate_hdl(self, output_filepath): manta = VerilogManipulator("manta_def_tmpl.v") + version = "v" + get_distribution('mantaray').version manta.sub(version, "/* VERSION */") timestamp = datetime.now().strftime("%d %b %Y at %H:%M:%S") diff --git a/src/manta/block_memory.py b/src/manta/block_memory.py new file mode 100644 index 0000000..4ce0974 --- /dev/null +++ b/src/manta/block_memory.py @@ -0,0 +1,68 @@ +from .verilog_manipulator import * + +import math + +class BlockMemoryCore: + def __init__(self, config, name, base_addr, interface): + self.name = name + self.base_addr = base_addr + self.interface = interface + + # Warn if unrecognized options have been given + for option in config: + if option not in ["type", "depth", "width", "expose_port"]: + print(f"Warning: Ignoring unrecognized option '{option}' in Block Memory core '{self.name}'") + + # Determine if we expose the BRAM's second port to the top of the module + if "expose_port" in config: + assert isinstance(config["expose_port"], bool), "Configuring BRAM exposure must be done with a boolean." + self.expose_port = config["expose_port"] + + else: + self.expose_port = True + + # Get depth + assert "depth" in config, "Depth not specified for Block Memory core." + assert config["depth"] > 0, "Block Memory core must have positive depth." + assert isinstance(config["depth"], int), "Block Memory core must have integer depth." + self.depth = config["depth"] + + # Get width + assert "width" in config, "Width not specified for Block Memory core." + assert config["width"] > 0, "Block Memory core must have positive width." + assert isinstance(config["width"], int), "Block Memory core must have integer width." + self.width = config["width"] + + self.addr_width = math.ceil(math.log2(self.depth)) + self.n_brams = math.ceil(self.width / 16) + self.max_addr = self.base_addr + (self.depth * self.n_brams) + + def hdl_inst(self): + inst = VerilogManipulator("block_memory/block_memory_inst_tmpl.v") + inst.sub(self.name, "/* INST_NAME */") + inst.sub(self.depth, "/* DEPTH */") + inst.sub(self.width, "/* WIDTH */") + return inst.get_hdl() + + def hdl_def(self): + block_memory = VerilogManipulator("block_memory/block_memory.v").get_hdl() + dual_port_bram = VerilogManipulator("block_memory/dual_port_bram.v").get_hdl() + return block_memory + "\n" + dual_port_bram + + def hdl_top_level_ports(self): + if not self.expose_port: + return "" + + tlp = [] + tlp.append(f"input wire {self.name}_clk") + tlp.append(f"input wire [{self.addr_width-1}:0] {self.name}_addr") + tlp.append(f"input wire [{self.width-1}:0] {self.name}_din") + tlp.append(f"output reg [{self.width-1}:0] {self.name}_dout") + tlp.append(f"input wire {self.name}_we") + return tlp + + def read(self, addr): + return self.interface.read_register(addr + self.base_addr) + + def write(self, addr, data): + return self.interface.write_register(addr + self.base_addr, data) \ No newline at end of file diff --git a/src/manta/block_memory.v b/src/manta/block_memory/block_memory.v similarity index 100% rename from src/manta/block_memory.v rename to src/manta/block_memory/block_memory.v diff --git a/src/manta/block_memory_inst_tmpl.v b/src/manta/block_memory/block_memory_inst_tmpl.v similarity index 100% rename from src/manta/block_memory_inst_tmpl.v rename to src/manta/block_memory/block_memory_inst_tmpl.v diff --git a/src/manta/dual_port_bram.v b/src/manta/block_memory/dual_port_bram.v similarity index 100% rename from src/manta/dual_port_bram.v rename to src/manta/block_memory/dual_port_bram.v diff --git a/src/manta/ethernet.py b/src/manta/ethernet.py new file mode 100644 index 0000000..e69de29 diff --git a/src/manta/aggregate.v b/src/manta/ethernet/aggregate.v similarity index 100% rename from src/manta/aggregate.v rename to src/manta/ethernet/aggregate.v diff --git a/src/manta/bitorder.v b/src/manta/ethernet/bitorder.v similarity index 100% rename from src/manta/bitorder.v rename to src/manta/ethernet/bitorder.v diff --git a/src/manta/cksum.v b/src/manta/ethernet/cksum.v similarity index 100% rename from src/manta/cksum.v rename to src/manta/ethernet/cksum.v diff --git a/src/manta/crc32.v b/src/manta/ethernet/crc32.v similarity index 100% rename from src/manta/crc32.v rename to src/manta/ethernet/crc32.v diff --git a/src/manta/ether.v b/src/manta/ethernet/ether.v similarity index 100% rename from src/manta/ether.v rename to src/manta/ethernet/ether.v diff --git a/src/manta/ethernet_rx.v b/src/manta/ethernet/ethernet_rx.v similarity index 100% rename from src/manta/ethernet_rx.v rename to src/manta/ethernet/ethernet_rx.v diff --git a/src/manta/ethernet_tx.v b/src/manta/ethernet/ethernet_tx.v similarity index 100% rename from src/manta/ethernet_tx.v rename to src/manta/ethernet/ethernet_tx.v diff --git a/src/manta/firewall.v b/src/manta/ethernet/firewall.v similarity index 100% rename from src/manta/firewall.v rename to src/manta/ethernet/firewall.v diff --git a/src/manta/mac_rx.v b/src/manta/ethernet/mac_rx.v similarity index 100% rename from src/manta/mac_rx.v rename to src/manta/ethernet/mac_rx.v diff --git a/src/manta/mac_tx.v b/src/manta/ethernet/mac_tx.v similarity index 100% rename from src/manta/mac_tx.v rename to src/manta/ethernet/mac_tx.v diff --git a/src/manta/io.py b/src/manta/io.py new file mode 100644 index 0000000..1873c48 --- /dev/null +++ b/src/manta/io.py @@ -0,0 +1,143 @@ +from .verilog_manipulator import * + +class IOCoreProbe: + def __init__(self, name, width, direction, base_addr, interface): + self.name = name + self.width = width + self.direction = direction + self.base_addr = base_addr + self.interface = interface + + def set(self, data): + # make sure that we're an output probe + assert self.direction == "output", "Cannot set value of input port." + + # check that value is within range for the width of the probe + assert isinstance(data, int), "Data must be an integer." + if data > 0: + assert data <= (2**self.width) - 1, f"Unsigned value too large for probe of width {self.width}" + + elif data < 0: + assert data >= -(2**(self.width-1))-1, f"Signed value too large for probe of width {self.width}" + assert data <= (2**(self.width-1))-1, f"Signed value too large for probe of width {self.width}" + + self.interface.write_register(self.base_addr, data) + + def get(self): + return self.interface.read_register(self.base_addr) + +class IOCore: + def __init__(self, config, name, base_addr, interface): + self.name = name + self.base_addr = base_addr + self.interface = interface + + # Warn if unrecognized options have been given + for option in config: + if option not in ["type", "inputs", "outputs"]: + print(f"Warning: Ignoring unrecognized option '{option}' in IO core '{self.name}'") + + # make sure we have ports defined + assert ('inputs' in config) or ('outputs' in config), "No input or output ports specified." + + # add input probes to core + self.probes = [] + probe_base_addr = self.base_addr + if 'inputs' in config: + for name, width in config["inputs"].items(): + # make sure inputs are of reasonable width + assert isinstance(width, int), f"Probe {name} must have integer width." + assert width > 0, f"Probe {name} must have positive width." + + probe = IOCoreProbe(name, width, "input", probe_base_addr, self.interface) + + # add friendly name, so users can do Manta.my_io_core.my_probe.set() for example + setattr(self, name, probe) + self.probes.append(probe) + + self.max_addr = probe_base_addr + probe_base_addr += 1 + + # add output probes to core + if 'outputs' in config: + for name, width in config["outputs"].items(): + # make sure inputs are of reasonable width + assert isinstance(width, int), f"Probe {name} must have integer width." + assert width > 0, f"Probe {name} must have positive width." + + probe = IOCoreProbe(name, width, "output", probe_base_addr, self.interface) + + # add friendly name, so users can do Manta.my_io_core.my_probe.set() for example + setattr(self, name, probe) + self.probes.append(probe) + + self.max_addr = probe_base_addr + probe_base_addr += 1 + + + def hdl_inst(self): + inst = VerilogManipulator("io/io_core_inst_tmpl.v") + inst.sub(self.name, "/* MODULE_NAME */") + inst.sub(self.name + "_inst", "/* INST_NAME */") + + probes = {probe.name:probe.width for probe in self.probes} + + inst_ports = inst.net_conn(probes, trailing_comma=True) + inst.sub(inst_ports, "/* INST_PORTS */") + + return inst.get_hdl() + + + def hdl_def(self): + io_core = VerilogManipulator("io/io_core_def_tmpl.v") + io_core.sub(self.name, "/* MODULE_NAME */") + io_core.sub(self.max_addr, "/* MAX_ADDR */") + + # generate declaration + top_level_ports = ',\n'.join(self.hdl_top_level_ports()) + top_level_ports += ',' + io_core.sub(top_level_ports, "/* TOP_LEVEL_PORTS */") + + # generate memory handling + rcsb = "" # read case statement body + wcsb = "" # write case statement body + for probe in self.probes: + + # add to read block + if probe.width == 16: + rcsb += f"{probe.base_addr}: rdata_o <= {probe.name};\n" + + else: + rcsb += f"{probe.base_addr}: rdata_o <= {{{16-probe.width}'b0, {probe.name}}};\n" + + + # if output, add to write block + if probe.direction == "output": + if probe.width == 1: + wcsb += f"{probe.base_addr}: {probe.name} <= wdata_i[0];\n" + + elif probe.width == 16: + wcsb += f"{probe.base_addr}: {probe.name} <= wdata_i;\n" + + else: + wcsb += f"{probe.base_addr}: {probe.name} <= wdata_i[{probe.width-1}:0];\n" + + # remove trailing newline + rcsb = rcsb.rstrip() + wcsb = wcsb.rstrip() + + io_core.sub(rcsb, "/* READ_CASE_STATEMENT_BODY */") + io_core.sub(wcsb, "/* WRITE_CASE_STATEMENT_BODY */") + + return io_core.get_hdl() + + + + def hdl_top_level_ports(self): + ports = [] + for probe in self.probes: + net_type = "input wire " if probe.direction == "input" else "output reg " + name_def = probe.name if probe.width == 1 else f"[{probe.width-1}:0] {probe.name}" + ports.append(net_type + name_def) + + return ports \ No newline at end of file diff --git a/src/manta/io_core_def_tmpl.v b/src/manta/io/io_core_def_tmpl.v similarity index 100% rename from src/manta/io_core_def_tmpl.v rename to src/manta/io/io_core_def_tmpl.v diff --git a/src/manta/io_core_inst_tmpl.v b/src/manta/io/io_core_inst_tmpl.v similarity index 100% rename from src/manta/io_core_inst_tmpl.v rename to src/manta/io/io_core_inst_tmpl.v diff --git a/src/manta/logic_analyzer.py b/src/manta/logic_analyzer.py new file mode 100644 index 0000000..7278da5 --- /dev/null +++ b/src/manta/logic_analyzer.py @@ -0,0 +1,447 @@ +from .verilog_manipulator import * + +from datetime import datetime +from pkg_resources import get_distribution +import math +import os + +class LogicAnalyzerCore: + def __init__(self, config, name, base_addr, interface): + self.name = name + self.base_addr = base_addr + self.interface = interface + + # Warn if unrecognized options have been given + valid_options = ["type", "sample_depth", "probes", "triggers", "trigger_location", "trigger_mode"] + for option in config: + if option not in valid_options: + print(f"Warning: Ignoring unrecognized option '{option}' in Logic Analyzer core '{self.name}'") + + # Load sample depth + assert "sample_depth" in config, \ + "Sample depth not found for Logic Analyzer core {self.name}." + + assert isinstance(config["sample_depth"], int), \ + "Sample depth must be an integer." + + self.sample_depth = config["sample_depth"] + + # Add probes + assert "probes" in config, "No probe definitions found." + assert len(config["probes"]) > 0, "Must specify at least one probe." + + for probe_name, probe_width in config["probes"].items(): + assert probe_width > 0, f"Probe {probe_name} is of invalid width - it must be of at least width one." + + self.probes = config["probes"] + + # Add triggers + assert "triggers" in config, "No triggers found." + assert len(config["triggers"]) > 0, "Must specify at least one trigger." + self.triggers = config["triggers"] + + # Add trigger location + self.trigger_loc = self.sample_depth // 2 + if "trigger_location" in config: + assert isinstance(config["trigger_location"], int), \ + "Trigger location must be an integer." + + assert config["trigger_location"] >= 0, \ + "Trigger location cannot be negative." + + assert config["trigger_location"] <= self.sample_depth, \ + "Trigger location cannot exceed sample depth." + + self.trigger_loc = config["trigger_location"] + + # Add trigger mode + self.SINGLE_SHOT = 0 + self.INCREMENTAL = 1 + self.IMMEDIATE = 2 + + self.trigger_mode = self.SINGLE_SHOT + if "trigger_mode" in config: + assert config["trigger_mode"] in ["single_shot", "incremental", "immediate"], \ + "Unrecognized trigger mode provided." + + if config["trigger_mode"] == "single_shot": + self.trigger_mode = self.SINGLE_SHOT + + elif config["trigger_mode"] == "incremental": + self.trigger_mode = self.INCREMENTAL + + elif config["trigger_mode"] == "immediate": + self.trigger_mode = self.IMMEDIATE + + # compute base addresses + self.fsm_base_addr = self.base_addr + self.trigger_block_base_addr = self.fsm_base_addr + 6 + + self.total_probe_width = sum(self.probes.values()) + n_brams = math.ceil(self.total_probe_width / 16) + self.block_memory_base_addr = self.trigger_block_base_addr + (2*len(self.probes)) + self.max_addr = self.block_memory_base_addr + (n_brams * self.sample_depth) + + # build out self register map: + # these are also defined in logic_analyzer_fsm_registers.v, and should match + self.state_reg_addr = self.base_addr + self.trigger_mode_reg_addr = self.base_addr + 1 + self.trigger_loc_reg_addr = self.base_addr + 2 + self.request_start_reg_addr = self.base_addr + 3 + self.request_stop_reg_addr = self.base_addr + 4 + self.read_pointer_reg_addr = self.base_addr + 5 + self.write_pointer_reg_addr = self.base_addr + 6 + + self.IDLE = 0 + self.MOVE_TO_POSITION = 1 + self.IN_POSITION = 2 + self.CAPTURING = 3 + self.CAPTURED = 4 + + def hdl_inst(self): + la_inst = VerilogManipulator("logic_analyzer/logic_analyzer_inst_tmpl.v") + + # add module name to instantiation + la_inst.sub(self.name, "/* INST_NAME */") + + # add net connections to instantiation + conns = la_inst.net_conn(self.probes, trailing_comma=True) + la_inst.sub(conns, "/* NET_CONNS */") + return la_inst.get_hdl() + + def gen_trigger_block_def(self): + trigger_block = VerilogManipulator("logic_analyzer/trigger_block_def_tmpl.v") + + # add probe ports to module declaration + # these ports belong to the logic analyzer, but + # need to be included in the trigger_block module declaration + probe_ports = trigger_block.net_dec(self.probes, "input wire", trailing_comma=True) + trigger_block.sub(probe_ports, "/* PROBE_PORTS */") + + + # add trigger cores to module definition + # these are instances of the trigger module, of which one gets wired + # into each probe + trigger_module_insts = [] + for name, width in self.probes.items(): + trig_inst = VerilogManipulator("logic_analyzer/trigger_block_inst_tmpl.v") + trig_inst.sub(width, "/* INPUT_WIDTH */") + trig_inst.sub(f"{name}_trigger", "/* NAME */") + + trig_inst.sub(f"reg [3:0] {name}_op = 0;", "/* OP_DEC */") + trig_inst.sub(f"reg {name}_trig;", "/* TRIG_DEC */") + + if width == 1: + trig_inst.sub(f"reg {name}_arg = 0;", "/* ARG_DEC */") + + else: + trig_inst.sub(f"reg [{width-1}:0] {name}_arg = 0;", "/* ARG_DEC */") + + trig_inst.sub(name, "/* PROBE */") + trig_inst.sub(f"{name}_op", "/* OP */") + trig_inst.sub(f"{name}_arg", "/* ARG */") + trig_inst.sub(f"{name}_trig", "/* TRIG */") + + trigger_module_insts.append(trig_inst.get_hdl()) + + trigger_module_insts = "\n".join(trigger_module_insts) + trigger_block.sub(trigger_module_insts, "/* TRIGGER_MODULE_INSTS */") + + # add combined individual triggers + cit = [f"{name}_trig" for name in self.probes] + cit = " || ".join(cit) + cit = f"assign trig = {cit};" + trigger_block.sub(cit, " /* COMBINE_INDIV_TRIGGERS */") + + # add read and write block case statement bodies + rcsb = "" # read case statement body + wcsb = "" # write case statement body + addr = 0 + for i, name in enumerate(self.probes): + addr = 2 * i + rcsb += f"BASE_ADDR + {addr}: rdata_o <= {name}_op;\n" + wcsb += f"BASE_ADDR + {addr}: {name}_op <= wdata_i;\n" + + addr = (2 * i) + 1 + rcsb += f"BASE_ADDR + {addr}: rdata_o <= {name}_arg;\n" + wcsb += f"BASE_ADDR + {addr}: {name}_arg <= wdata_i;\n" + + rcsb = rcsb.strip() + wcsb = wcsb.strip() + + trigger_block.sub(rcsb, "/* READ_CASE_STATEMENT_BODY */") + trigger_block.sub(wcsb, "/* WRITE_CASE_STATEMENT_BODY */") + trigger_block.sub(self.trigger_block_base_addr + addr + 1, "/* MAX_ADDR */") + + return trigger_block.get_hdl() + + def gen_logic_analyzer_def(self): + la = VerilogManipulator("logic_analyzer/logic_analyzer_def_tmpl.v") + + # add top level probe ports to module declaration + ports = la.net_dec(self.probes, "input wire", trailing_comma=True) + la.sub(ports, "/* TOP_LEVEL_PROBE_PORTS */") + + # assign base addresses to the FSM, trigger block, and sample mem + la.sub(self.fsm_base_addr, "/* FSM_BASE_ADDR */") + la.sub(self.trigger_block_base_addr, "/* TRIGGER_BLOCK_BASE_ADDR */") + la.sub(self.block_memory_base_addr, "/* BLOCK_MEMORY_BASE_ADDR */") + + # set sample depth + la.sub(self.sample_depth, "/* SAMPLE_DEPTH */") + + # set probe ports for the trigger block and sample mem + probe_ports = la.net_conn(self.probes, trailing_comma=True) + la.sub(probe_ports, "/* TRIGGER_BLOCK_PROBE_PORTS */") + + la.sub(self.total_probe_width, "/* TOTAL_PROBE_WIDTH */") + + # concatenate the probes together to make one big register, + # but do so such that the first probe in the config file + # is at the least-significant position in that big register. + # + # this makes part-selecting out from the memory easier to + # implement in python, and because verilog and python conventions + # are different, we would have had to reverse it somwehere anyway + probes_concat = list(self.probes.keys())[::-1] + probes_concat = '{' + ', '.join(probes_concat) + '}' + la.sub(probes_concat, "/* PROBES_CONCAT */") + + return la.get_hdl() + + def hdl_def(self): + # Return an autogenerated verilog module definition for the core. + # load source files + hdl = self.gen_logic_analyzer_def() + "\n" + hdl += VerilogManipulator("logic_analyzer/logic_analyzer_controller.v").get_hdl() + "\n" + hdl += VerilogManipulator("logic_analyzer/logic_analyzer_fsm_registers.v").get_hdl() + "\n" + hdl += VerilogManipulator("block_memory/block_memory.v").get_hdl() + "\n" + hdl += VerilogManipulator("block_memory/dual_port_bram.v").get_hdl() + "\n" + hdl += self.gen_trigger_block_def() + "\n" + hdl += VerilogManipulator("logic_analyzer/trigger.v").get_hdl() + "\n" + + return hdl + + def hdl_top_level_ports(self): + # the probes that we want as ports on the top-level manta module + ports = [] + for name, width in self.probes.items(): + if width == 1: + ports.append(f"input wire {name}") + + else: + ports.append(f"input wire [{width-1}:0] {name}") + return ports + #return VerilogManipulator().net_dec(self.probes, "input wire") + + def set_trigger_conditions(self): + + operations = { + "DISABLE" : 0, + "RISING" : 1, + "FALLING" : 2, + "CHANGING" : 3, + "GT" : 4, + "LT" : 5, + "GEQ" : 6, + "LEQ" : 7, + "EQ" : 8, + "NEQ" : 9 + } + + ops_with_no_args = ["DISABLE", "RISING" , "FALLING", "CHANGING"] + + # reset all the other triggers + for addr in range(self.trigger_block_base_addr, self.block_memory_base_addr): + self.interface.write_register(addr, 0) + + for trigger in self.triggers: + # determine if the trigger is good + + # most triggers will have 3 parts - the trigger, the operation, and the argument + # this is true unless the argument is RISING, FALLING, or CHANGING + + statement = trigger.split(' ') + if len(statement) == 2: + assert statement[1] in ops_with_no_args, "Invalid operation in trigger statement." + probe_name, op = statement + + op_register = 2*(list(self.probes.keys()).index(probe_name)) + self.trigger_block_base_addr + + self.interface.write_register(op_register, operations[op]) + + else: + assert len(statement) == 3, "Missing information in trigger statement." + probe_name, op, arg = statement + + op_register = 2*(list(self.probes.keys()).index(probe_name)) + self.trigger_block_base_addr + arg_register = op_register + 1 + + self.interface.write_register(op_register, operations[op]) + self.interface.write_register(arg_register, int(arg)) + + + + # functions for actually using the core: + def capture(self): + # Check state - if it's in anything other than IDLE, + # request to stop the existing capture + + print(" -> Resetting core...") + state = self.interface.read_register(self.state_reg_addr) + if state != self.IDLE: + self.interface.write_register(self.request_stop_reg_addr, 0) + self.interface.write_register(self.request_stop_reg_addr, 1) + + state = self.interface.read_register(self.state_reg_addr) + assert state == self.IDLE, "Logic analyzer did not reset to correct state when requested to." + + # Configure trigger conditions + print(" -> Set trigger conditions...") + self.set_trigger_conditions() + + # Configure the trigger_mode + print(" -> Setting trigger mode") + self.interface.write_register(self.trigger_mode_reg_addr, self.trigger_mode) + + # Configure the trigger_loc + print(" -> Setting trigger location...") + self.interface.write_register(self.trigger_loc_reg_addr, self.trigger_loc) + + # Start the capture by pulsing request_start + print(" -> Starting capture...") + self.interface.write_register(self.request_start_reg_addr, 1) + self.interface.write_register(self.request_start_reg_addr, 0) + + # Wait for core to finish capturing data + print(" -> Waiting for capture to complete...") + state = self.interface.read_register(self.state_reg_addr) + while(state != self.CAPTURED): + state = self.interface.read_register(self.state_reg_addr) + + # Read out contents from memory + print(" -> Reading sample memory contents...") + addrs = list(range(self.block_memory_base_addr, self.max_addr)) + block_mem_contents = self.interface.read_registers(addrs) + + # Revolve BRAM contents around so the data pointed to by the read_pointer + # is at the beginning + print(" -> Reading read_pointer and revolving memory...") + read_pointer = self.interface.read_register(self.read_pointer_reg_addr) + return block_mem_contents[read_pointer:] + block_mem_contents[:read_pointer] + + + def export_vcd(self, capture_data, path): + from vcd import VCDWriter + vcd_file = open(path, "w") + + # Use the same datetime format that iVerilog uses + timestamp = datetime.now().strftime("%a %b %w %H:%M:%S %Y") + + with VCDWriter(vcd_file, '10 ns', timestamp, "manta") as writer: + + # each probe has a name, width, and writer associated with it + signals = [] + for name, width in self.probes.items(): + signal = { + "name" : name, + "width" : width, + "data" : self.part_select_capture_data(capture_data, name), + "var": writer.register_var("manta", name, "wire", size=width) + } + signals.append(signal) + + clock = writer.register_var("manta", "clk", "wire", size=1) + trigger = writer.register_var("manta", "trigger", "wire", size=1) + + # add the data to each probe in the vcd file + for timestamp in range(0, 2*len(capture_data)): + + # run the clock + writer.change(clock, timestamp, timestamp % 2 == 0) + + # set the trigger + triggered = (timestamp // 2) >= self.trigger_loc + writer.change(trigger, timestamp, triggered) + + # add other signals + for signal in signals: + var = signal["var"] + sample = signal["data"][timestamp // 2] + + writer.change(var, timestamp, sample) + + vcd_file.close() + + def export_mem(self, capture_data, path): + with open(path, "w") as f: + # a wee bit of cursed string formatting, but just + # outputs each sample as binary, padded to a fixed length + w = self.total_probe_width + f.writelines([f'{s:0{w}b}\n' for s in capture_data]) + + def export_playback_module(self, path): + playback = VerilogManipulator("logic_analyzer/logic_analyzer_playback_tmpl.v") + + module_name = f"{self.name}_playback" + playback.sub(module_name, "/* MODULE_NAME */") + + version = "v" + get_distribution('mantaray').version + playback.sub(version, "/* VERSION */") + + timestamp = datetime.now().strftime("%d %b %Y at %H:%M:%S") + playback.sub(timestamp, "/* TIMESTAMP */") + + user = os.environ.get("USER", os.environ.get("USERNAME")) + playback.sub(user, "/* USER */") + + ports = [f".{name}({name})" for name in self.probes.keys()] + ports = ",\n".join(ports) + playback.sub(ports, "/* PORTS */") + + playback.sub(self.sample_depth, "/* SAMPLE_DEPTH */") + playback.sub(self.total_probe_width, "/* TOTAL_PROBE_WIDTH */") + + # see the note in generate_logic_analyzer_def about why we do this + probes_concat = list(self.probes.keys())[::-1] + probes_concat = '{' + ', '.join(probes_concat) + '}' + playback.sub(probes_concat, "/* PROBES_CONCAT */") + + + probe_dec = playback.net_dec(self.probes, "output reg") + playback.sub(probe_dec, "/* PROBE_DEC */") + + with open(path, "w") as f: + f.write(playback.get_hdl()) + + + def part_select_capture_data(self, capture_data, probe_name): + """Given the name of the probe, part-select the appropriate bits of capture data, + and return as an integer. Accepts capture_data as an integer or a list of integers.""" + + # sum up the widths of the probes below this one + lower = 0 + for name, width in self.probes.items(): + if name == probe_name: + break + + lower += width + + upper = lower + (self.probes[probe_name] - 1) + + # define the part select + mask = 2 ** (upper - lower + 1) - 1 + part_select = lambda x: (x >> lower) & mask + + # apply the part_select function depending on type + if isinstance(capture_data, int): + return part_select(capture_data) + + elif isinstance(capture_data, list): + for i in capture_data: + assert isinstance(i, int), "Can only part select on integers and list of integers." + + return [part_select(sample) for sample in capture_data] + + else: + raise ValueError("Can only part select on integers and lists of integers.") diff --git a/src/manta/logic_analyzer_controller.v b/src/manta/logic_analyzer/logic_analyzer_controller.v similarity index 100% rename from src/manta/logic_analyzer_controller.v rename to src/manta/logic_analyzer/logic_analyzer_controller.v diff --git a/src/manta/logic_analyzer_def_tmpl.v b/src/manta/logic_analyzer/logic_analyzer_def_tmpl.v similarity index 100% rename from src/manta/logic_analyzer_def_tmpl.v rename to src/manta/logic_analyzer/logic_analyzer_def_tmpl.v diff --git a/src/manta/logic_analyzer_fsm_registers.v b/src/manta/logic_analyzer/logic_analyzer_fsm_registers.v similarity index 100% rename from src/manta/logic_analyzer_fsm_registers.v rename to src/manta/logic_analyzer/logic_analyzer_fsm_registers.v diff --git a/src/manta/logic_analyzer_inst_tmpl.v b/src/manta/logic_analyzer/logic_analyzer_inst_tmpl.v similarity index 100% rename from src/manta/logic_analyzer_inst_tmpl.v rename to src/manta/logic_analyzer/logic_analyzer_inst_tmpl.v diff --git a/src/manta/logic_analyzer_playback_tmpl.v b/src/manta/logic_analyzer/logic_analyzer_playback_tmpl.v similarity index 100% rename from src/manta/logic_analyzer_playback_tmpl.v rename to src/manta/logic_analyzer/logic_analyzer_playback_tmpl.v diff --git a/src/manta/trigger.v b/src/manta/logic_analyzer/trigger.v similarity index 100% rename from src/manta/trigger.v rename to src/manta/logic_analyzer/trigger.v diff --git a/src/manta/trigger_block_def_tmpl.v b/src/manta/logic_analyzer/trigger_block_def_tmpl.v similarity index 100% rename from src/manta/trigger_block_def_tmpl.v rename to src/manta/logic_analyzer/trigger_block_def_tmpl.v diff --git a/src/manta/trigger_block_inst_tmpl.v b/src/manta/logic_analyzer/trigger_block_inst_tmpl.v similarity index 100% rename from src/manta/trigger_block_inst_tmpl.v rename to src/manta/logic_analyzer/trigger_block_inst_tmpl.v diff --git a/src/manta/lut_ram.py b/src/manta/lut_ram.py new file mode 100644 index 0000000..389837f --- /dev/null +++ b/src/manta/lut_ram.py @@ -0,0 +1,38 @@ +from .verilog_manipulator import * + +class LUTRAMCore: + def __init__(self, config, name, base_addr, interface): + self.name = name + self.base_addr = base_addr + self.interface = interface + + # Warn if unrecognized options have been given + for option in config: + if option not in ["type", "size"]: + print(f"Warning: Ignoring unrecognized option '{option}' in LUT Memory '{self.name}'") + + assert "size" in config, "Size not specified for LUT RAM core." + assert config["size"] > 0, "LUT RAM must have positive size." + assert isinstance(config["size"], int), "LUT RAM must have integer size." + self.size = config["size"] + + self.max_addr = self.base_addr + self.size - 1 + + def hdl_inst(self): + inst = VerilogManipulator("lut_ram/lut_ram_inst_tmpl.v") + inst.sub(self.size, "/* DEPTH */") + inst.sub(self.name, "/* INST_NAME */") + return inst.get_hdl() + + def hdl_def(self): + return VerilogManipulator("lut_ram/lut_ram.v").get_hdl() + + def hdl_top_level_ports(self): + # no top_level connections since this core just lives on the bus + return "" + + def read(self, addr): + return self.interface.read_register(addr + self.base_addr) + + def write(self, addr, data): + return self.interface.write_register(addr + self.base_addr, data) \ No newline at end of file diff --git a/src/manta/lut_ram.v b/src/manta/lut_ram/lut_ram.v similarity index 100% rename from src/manta/lut_ram.v rename to src/manta/lut_ram/lut_ram.v diff --git a/src/manta/lut_ram_inst_tmpl.v b/src/manta/lut_ram/lut_ram_inst_tmpl.v similarity index 100% rename from src/manta/lut_ram_inst_tmpl.v rename to src/manta/lut_ram/lut_ram_inst_tmpl.v diff --git a/src/manta/uart.py b/src/manta/uart.py new file mode 100644 index 0000000..9d29f7c --- /dev/null +++ b/src/manta/uart.py @@ -0,0 +1,181 @@ +from .verilog_manipulator import * + +class UARTInterface: + def __init__(self, config): + # Warn if unrecognized options have been given + for option in config: + if option not in ["port", "clock_freq", "baudrate", "chunk_size", "verbose"]: + print(f"Warning: Ignoring unrecognized option '{option}' in UART interface.") + + # Obtain port. Try to automatically detect port if "auto" is specified + assert "port" in config, "No serial port provided to UART core." + self.port = config["port"] + + # Check that clock frequency is provided and positive + assert "clock_freq" in config, "Clock frequency not provided to UART core." + assert config["clock_freq"] > 0, "Clock frequency must be positive." + self.clock_freq = config["clock_freq"] + + # Check that baudrate is provided and positive + assert "baudrate" in config, "Baudrate not provided to UART core." + assert config["baudrate"] > 0, "Baudrate must be positive." + self.baudrate = config["baudrate"] + + # Confirm core clock is sufficiently fast + clocks_per_baud = self.clock_freq // self.baudrate + assert clocks_per_baud >= 2 + self.clocks_per_baud = clocks_per_baud + + # Confirm we can match baudrate suffeciently well + actual_baudrate = self.clock_freq / clocks_per_baud + baudrate_error = 100 * abs(actual_baudrate - self.baudrate) / self.baudrate + assert baudrate_error <= 5, \ + "Unable to match target baudrate - they differ by {baudrate_error}%" + + # Set chunk_size, which is the max amount of bytes that get dumped + # to the OS driver at a time + self.chunk_size = 256 + if "chunk_size" in config: + self.chunk_size = config["chunk_size"] + + # Set verbosity + self.verbose = False + if "verbose" in config: + self.verbose = config["verbose"] + + def open_port_if_not_alredy_open(self): + if self.port == "auto": + self.port = self.autodetect_port() + + if not hasattr(self, "ser"): + import serial + self.ser = serial.Serial(self.port, self.baudrate) + + def autodetect_port(self): + # as far as I know the FT2232 is the only chip used on the icestick/digilent boards, so just look for that + import serial.tools.list_ports + + recognized_devices = [] + for port in serial.tools.list_ports.comports(): + if (port.vid == 0x403) and (port.pid == 0x6010): + recognized_devices.append(port) + + # board manufacturers seem to always make the 0th serial + # interface on the FT2232 be for programming over JTAG, + # and then the 1st to be for UART. as a result, we always + # grab the device with the larger location + + rd = recognized_devices + assert len(recognized_devices) == 2, f"Expected to see two serial ports for FT2232 device, but instead see {len(recognized_devices)}." + assert rd[0].serial_number == rd[1].serial_number, "Serial numbers should be the same on both FT2232 ports - probably somehow grabbed ports on two different devices." + return rd[0].device if rd[0].location > rd[1].location else rd[1].device + + def decode_response(self, response): + """Make sure reponse from FPGA has the correct format, and return data contained within if so.""" + assert response is not None, "No reponse received." + + response_str = response.decode('ascii') + assert response_str[0] == 'M', "Bad message recieved, incorrect preamble." + assert response_str[-1] == '\n', "Bad message received, incorrect EOL." + assert response_str[-2] == '\r', "Bad message received, incorrect EOL." + assert len(response_str) == 7, f"Wrong number of bytes received, expecting 7 but got {len(response)}." + + return int(response_str[1:5], 16) + + def read_register(self, addr): + self.open_port_if_not_alredy_open() + + # request from the bus + request = f"M{addr:04X}\r\n".encode('ascii') + self.ser.write(request) + + # read and parse the response + data = self.decode_response(self.ser.read(7)) + + if self.verbose: + print(f"read {data:04X} from {addr:04X}") + + return data + + def write_register(self, addr, data): + self.open_port_if_not_alredy_open() + + # request from the bus + request = f"M{addr:04X}{data:04X}\r\n".encode('ascii') + self.ser.write(request) + + if self.verbose: + print(f"wrote {data:04X} to {addr:04X}") + + def read_registers(self, addrs): + assert isinstance(addrs, list), "Read addresses must be list of integers." + assert all(isinstance(addr, int) for addr in addrs), "Read addresses must be list of integers." + + # send data in chunks because the reponses will fill up the OS's + # input buffer in no time flat + self.open_port_if_not_alredy_open() + + inbound_bytes = b"" + for i in range(0, len(addrs), self.chunk_size): + addr_chunk = addrs[i:i+self.chunk_size] + + outbound_bytes = [f"M{addr:04X}\r\n".encode('ascii') for addr in addr_chunk] + outbound_bytes = b"".join(outbound_bytes) + + self.ser.write(outbound_bytes) + + inbound_bytes += self.ser.read(len(outbound_bytes)) + + data = [] + for i in range(0, len(inbound_bytes), 7): + response = inbound_bytes[i:i+7] + data.append(self.decode_response(response)) + + return data + + def write_registers(self, addrs, datas): + assert isinstance(addrs, list), "Write addresses must be list of integers." + assert isinstance(datas, list), "Write data must be list of integers." + assert all(isinstance(addr, int) for addr in addrs), "Write addresses must be list of integers." + assert all(isinstance(data, int) for data in datas), "Write data must be list of integers." + assert len(addrs) == len(datas), "Write addresses and write data must be of same length." + + # send data in chunks because the responses will fill up the OS's + # input buffer in no time flat + self.open_port_if_not_alredy_open() + + for i in range(0, len(addrs), self.chunk_size): + addr_chunk = addrs[i:i+self.chunk_size] + data_chunk = datas[i:i+self.chunk_size] + + + outbound_bytes = [f"M{a:04X}{d:04X}\r\n" for a, d in zip(addr_chunk, data_chunk)] + outbound_bytes = [ob.encode('ascii') for ob in outbound_bytes] + outbound_bytes = b"".join(outbound_bytes) + + self.ser.write(outbound_bytes) + + def hdl_top_level_ports(self): + # this should return the probes that we want to connect to top-level, but like as a string of verilog + + return ["input wire rx", "output reg tx"] + + def rx_hdl_def(self): + uart_rx_def = VerilogManipulator("uart/rx_uart.v").get_hdl() + bridge_rx_def = VerilogManipulator("uart/bridge_rx.v").get_hdl() + return uart_rx_def + '\n' + bridge_rx_def + + def tx_hdl_def(self): + uart_tx_def = VerilogManipulator("uart/uart_tx.v").get_hdl() + bridge_tx_def = VerilogManipulator("uart/bridge_tx.v").get_hdl() + return bridge_tx_def + '\n' + uart_tx_def + + def rx_hdl_inst(self): + rx = VerilogManipulator("uart/uart_rx_bridge_rx_inst_templ.v") + rx.sub(self.clocks_per_baud, "/* CLOCKS_PER_BAUD */") + return rx.get_hdl() + + def tx_hdl_inst(self): + tx = VerilogManipulator("uart/uart_tx_bridge_tx_inst_templ.v") + tx.sub(self.clocks_per_baud, "/* CLOCKS_PER_BAUD */") + return tx.get_hdl() \ No newline at end of file diff --git a/src/manta/bridge_rx.v b/src/manta/uart/bridge_rx.v similarity index 100% rename from src/manta/bridge_rx.v rename to src/manta/uart/bridge_rx.v diff --git a/src/manta/bridge_tx.v b/src/manta/uart/bridge_tx.v similarity index 100% rename from src/manta/bridge_tx.v rename to src/manta/uart/bridge_tx.v diff --git a/src/manta/rx_uart.v b/src/manta/uart/rx_uart.v similarity index 100% rename from src/manta/rx_uart.v rename to src/manta/uart/rx_uart.v diff --git a/src/manta/tx_uart.v b/src/manta/uart/tx_uart.v similarity index 100% rename from src/manta/tx_uart.v rename to src/manta/uart/tx_uart.v diff --git a/src/manta/uart_rx_bridge_rx_inst_templ.v b/src/manta/uart/uart_rx_bridge_rx_inst_templ.v similarity index 100% rename from src/manta/uart_rx_bridge_rx_inst_templ.v rename to src/manta/uart/uart_rx_bridge_rx_inst_templ.v diff --git a/src/manta/uart_tx.v b/src/manta/uart/uart_tx.v similarity index 100% rename from src/manta/uart_tx.v rename to src/manta/uart/uart_tx.v diff --git a/src/manta/uart_tx_bridge_tx_inst_templ.v b/src/manta/uart/uart_tx_bridge_tx_inst_templ.v similarity index 100% rename from src/manta/uart_tx_bridge_tx_inst_templ.v rename to src/manta/uart/uart_tx_bridge_tx_inst_templ.v diff --git a/src/manta/verilog_manipulator.py b/src/manta/verilog_manipulator.py new file mode 100644 index 0000000..5939598 --- /dev/null +++ b/src/manta/verilog_manipulator.py @@ -0,0 +1,102 @@ +import pkgutil + +class VerilogManipulator: + def __init__(self, filepath=None): + if filepath is not None: + self.hdl = pkgutil.get_data(__name__, filepath).decode() + + # scrub any default_nettype or timescale directives from the source + self.hdl = self.hdl.replace("`default_nettype none", "") + self.hdl = self.hdl.replace("`default_nettype wire", "") + self.hdl = self.hdl.replace("`timescale 1ns/1ps", "") + self.hdl = self.hdl.strip() + + # python tries to be cute and automatically convert + # line endings on Windows, but Manta's source comes + # with (and injects) UNIX line endings, so Python + # ends up adding way too many line breaks, so we just + # undo anything it's done when we load the file + self.hdl = self.hdl.replace("\r\n", "\n") + + else: + self.hdl = None + + def sub(self, replace, find): + # sometimes we have integer inputs, want to accomodate + if isinstance(replace, str): + replace_str = replace + + elif isinstance(replace, int): + replace_str = str(replace) + + else: + raise ValueError("Only string and integer arguments supported.") + + + # if the string being subbed in isn't multiline, just + # find-and-replace like normal: + if "\n" not in replace_str: + self.hdl = self.hdl.replace(find, replace_str) + + # if the string being substituted in is multiline, + # make sure the replace text gets put at the same + # indentation level by adding whitespace to left + # of the line. + else: + for line in self.hdl.split("\n"): + if find in line: + # get whitespace that's on the left side of the line + whitespace = line.rstrip().replace(line.lstrip(), "") + + # add it to every line, except the first + replace_as_lines = replace_str.split("\n") + replace_with_whitespace = f"\n{whitespace}".join(replace_as_lines) + + # replace the first occurance in the HDL with it + self.hdl = self.hdl.replace(find, replace_with_whitespace, 1) + + def get_hdl(self): + return self.hdl + + def net_dec(self, nets, net_type, trailing_comma = False): + """Takes a dictonary of nets in the format {probe: width}, and generates + the net declarations that would go in a Verilog module definition. + + For example, calling net_dec({foo : 1, bar : 4}, "input wire") would produce: + + input wire foo, + input [3:0] wire bar + + Which you'd then slap into your module declaration, along with all the other + inputs and outputs the module needs.""" + + dec = [] + for name, width in nets.items(): + if width == 1: + dec.append(f"{net_type} {name}") + + else: + dec.append(f"{net_type} [{width-1}:0] {name}") + + dec = ",\n".join(dec) + dec = dec + "," if trailing_comma else dec + return dec + + def net_conn(self, nets, trailing_comma = False): + """Takes a dictionary of nets in the format {probe: width}, and generates + the net connections that would go in the Verilog module instantiation. + + For example, calling net_conn({foo: 1, bar: 4}) would produce: + + .foo(foo), + .bar(bar) + + Which you'd then slap into your module instantiation, along with all the other + module inputs and outputs that get connected elsewhere.""" + + + conn = [f".{name}({name})" for name in nets] + conn = ",\n".join(conn) + conn = conn + "," if trailing_comma else conn + + return conn \ No newline at end of file diff --git a/test/functional_sim/bit_fifo_tb/bit_fifo.v b/test/functional_sim/bit_fifo_tb/bit_fifo.v deleted file mode 100644 index b9817b1..0000000 --- a/test/functional_sim/bit_fifo_tb/bit_fifo.v +++ /dev/null @@ -1,90 +0,0 @@ -`default_nettype none -`timescale 1ns/1ps - -module bit_fifo( - input wire clk, - - input wire en, - input wire [IWIDTH-1:0] in, - input wire in_valid, - output reg [OWIDTH-1:0] out, - output reg out_valid); - - parameter IWIDTH = 0; - parameter OWIDTH = 0; - - localparam BWIDTH = OWIDTH-1 + IWIDTH; - - reg [OWIDTH-1:0] buffer; - reg [$clog2(OWIDTH)-1:0] buffer_size; - - initial begin - buffer = 0; - buffer_size = 0; - out = 0; - out_valid = 0; - end - - reg [OWIDTH-1:0] mask; - reg [OWIDTH-1:0] top_half; - reg [OWIDTH-1:0] bottom_half; - reg [OWIDTH-1:0] joined_halves; - - always @(*) begin - mask = (1 << buffer_size) - 1; - top_half = (buffer & mask) << (OWIDTH - buffer_size); - bottom_half = in >> (IWIDTH- (OWIDTH - buffer_size)); - joined_halves = top_half | bottom_half; - end - - always @(posedge clk) begin - out_valid <= 0; - - // RUN state - if(en && in_valid) begin - // this module should spit out values as soon as it's able, - // so if we'll have enough once the present value's clocked in, - // then we'll need to immediately assign the output as a combination - // of what's in the buffer and what's on the line. - - if(buffer_size + IWIDTH >= OWIDTH) begin - // compute buffer size - // -> everything that was in the buffer is now in the output, - // so what we put back in the buffer is purely what's left over - // from our input data once we've sliced out what we need - buffer_size <= buffer_size + IWIDTH - OWIDTH; - - // compute buffer contents - buffer <= ( (1 << (buffer_size + IWIDTH - OWIDTH)) - 1 ) & in; - - /* - $display("buffer_size: %h in: %b ", buffer_size, in); - $display(" buffer: %b", buffer); - $display(" mask: %b", mask); - $display(" top_half: %b", top_half); - $display(" bottom_half: %b", bottom_half); - $display(" out: %b \n", joined_halves); - */ - - // compute output - out <= joined_halves; - out_valid <= 1; - end - - else begin - // shift into the right side of the buffer - buffer <= {buffer[BWIDTH-1-IWIDTH:0], in}; - buffer_size <= buffer_size + IWIDTH; - end - end - - // FLUSH state - else if(buffer_size > 0) begin - out <= (buffer) & ((1 << buffer_size) - 1); - out_valid <= 1; - buffer_size <= 0; - end - end -endmodule - -`default_nettype wire \ No newline at end of file diff --git a/test/functional_sim/bit_fifo_tb/bit_fifo_tb.sv b/test/functional_sim/bit_fifo_tb/bit_fifo_tb.sv deleted file mode 100644 index b10c764..0000000 --- a/test/functional_sim/bit_fifo_tb/bit_fifo_tb.sv +++ /dev/null @@ -1,75 +0,0 @@ -`default_nettype none -`timescale 1ns/1ps - -`define CP 10 -`define HCP 5 - -module bit_fifo_tb; - - //boilerplate - logic clk; - integer test_num; - - always begin - #`HCP - clk = !clk; - end - - parameter IWIDTH = 3; - parameter OWIDTH = 7; - - logic en; - logic [IWIDTH-1:0] in; - logic in_valid; - logic [OWIDTH-1:0] out; - logic out_valid; - - bit_fifo #(.IWIDTH(IWIDTH), .OWIDTH(OWIDTH)) bfifo ( - .clk(clk), - - .en(en), - .in(in), - .in_valid(in_valid), - .out(out), - .out_valid(out_valid)); - - initial begin - $dumpfile("bit_fifo_tb.vcd"); - $dumpvars(0, bit_fifo_tb); - - // setup and reset - clk = 0; - test_num = 0; - en = 0; - in_valid = 0; - in = 0; - #`HCP - - #(10*`CP); - - /* ==== Test 1 Begin ==== */ - $display("\n=== test 1: make sure invalid data isn't added to buffer ==="); - test_num = 1; - en = 1; - #(10*`CP); - en = 0; - /* ==== Test 1 End ==== */ - - /* ==== Test 2 Begin ==== */ - $display("\n=== test 2: just throw bits at it! ==="); - test_num = 1; - en = 1; - in_valid = 1; - in = 3'b101; - - #(10*`CP); - in_valid = 0; - en = 0; - - #(10*`CP); - /* ==== Test 2 End ==== */ - $finish(); - end -endmodule - -`default_nettype wire \ No newline at end of file