From 0c0f31be64d9208dbed5a9b7e1fb91a901fe05a5 Mon Sep 17 00:00:00 2001 From: Fischer Moseley <42497969+fischermoseley@users.noreply.github.com> Date: Sun, 18 Feb 2024 13:50:26 -0800 Subject: [PATCH] rewrite IO Core --- src/manta/io_core.py | 390 +++++++++++++++++---------------------- src/manta/manta.py | 2 +- test/test_io_core_hw.py | 32 +++- test/test_io_core_sim.py | 102 ++++------ 4 files changed, 232 insertions(+), 294 deletions(-) diff --git a/src/manta/io_core.py b/src/manta/io_core.py index 91322d80..648be7c 100644 --- a/src/manta/io_core.py +++ b/src/manta/io_core.py @@ -15,171 +15,133 @@ class IOCore(Elaboratable): https://fischermoseley.github.io/manta/io_core/ """ - def __init__(self, config, base_addr, interface): - self._config = config - self.base_addr = base_addr - self.interface = interface - self._check_config(self._config) + def __init__(self, base_addr, interface, inputs=[], outputs=[]): + self._base_addr = base_addr + self._interface = interface + self._inputs = inputs + self._outputs = outputs - self.define_signals() - self.mmap, self.max_addr = self.assign_memory() + # Bus Connections + self.bus_i = Signal(InternalBus()) + self.bus_o = Signal(InternalBus()) - def _check_config(self, config): - # make sure ports are defined - if "inputs" not in config and "outputs" not in config: - raise ValueError("No input or output ports specified.") + # Internal Signals + self._strobe = Signal() + self._input_bufs = [Signal(p.width, name=p.name + "_buf") for p in self._inputs] + self._output_bufs = [ + Signal(p.width, name=p.name + "_buf", reset=p.reset) for p in self._outputs + ] - # check for unrecognized options - valid_options = ["type", "inputs", "outputs", "user_clock"] + self._make_memory_map() + + @classmethod + def from_config(cls, config, base_addr, interface): + inputs = config.get("inputs", []) + outputs = config.get("outputs", []) + + # Make sure IO core has at least one input or output + if not inputs and not outputs: + raise ValueError("Must specify at least one input or output port.") + + # Warn about unrecognized options + valid_options = ["type", "inputs", "outputs"] for option in config: if option not in valid_options: warn(f"Ignoring unrecognized option '{option}' in IO core.'") - # check that user_clock is a bool - if "user_clock" in config: - if not isinstance(config["user_clock"], bool): - raise ValueError("Option user_clock must be a boolean.") - - # check that inputs is only dicts of format name:width - if "inputs" in config: - for name, attrs in config["inputs"].items(): - if not isinstance(name, str): - raise ValueError( - f'Input probe "{name}" has invalid name, names must be strings.' - ) - - if not isinstance(attrs, int): - raise ValueError(f'Input probe "{name}" must have integer width.') - - if not attrs > 0: - raise ValueError(f'Input probe "{name}" must have positive width.') - - if "outputs" in config: - for name, attrs in config["outputs"].items(): - if not isinstance(name, str): - raise ValueError( - f'Output probe "{name}" has invalid name, names must be strings.' - ) - - if not isinstance(attrs, int) and not isinstance(attrs, dict): - raise ValueError(f'Unrecognized format for output probe "{name}".') - - if isinstance(attrs, int): - if not attrs > 0: - raise ValueError( - f'Output probe "{name}" must have positive width.' - ) - - if isinstance(attrs, dict): - # check that each output probe has only recognized options - valid_options = ["width", "initial_value"] - for option in attrs: - if option not in valid_options: - warn(f'Ignoring unrecognized option "{option}" in IO core.') - - # check that widths are appropriate - if "width" not in attrs: - raise ValueError(f"No width specified for output probe {name}.") - - if not isinstance(attrs["width"], int): - raise ValueError( - f'Output probe "{name}" must have integer width.' - ) - - if not attrs["width"] > 0: - raise ValueError( - f'Input probe "{name}" must have positive width.' - ) - - def define_signals(self): - # Bus Input/Output - self.bus_i = Signal(InternalBus()) - self.bus_o = Signal(InternalBus()) - - # Input Probes (and buffers) - if "inputs" in self._config: - for name, width in self._config["inputs"].items(): - setattr(self, name, Signal(width, name=name)) - setattr(self, name + "_buf", Signal(width, name=name + "_buf")) - - # Output Probes (and buffers) - if "outputs" in self._config: - for name, attrs in self._config["outputs"].items(): - if isinstance(attrs, dict): - width = attrs["width"] - initial_value = attrs["initial_value"] - else: - width = attrs - initial_value = 0 - - setattr(self, name, Signal(width, name=name, reset=initial_value)) - setattr( - self, - name + "_buf", - Signal(width, name=name + "_buf", reset=initial_value), + # Define input signals + input_signals = [] + for name, width in inputs.items(): + if not isinstance(name, str): + raise ValueError( + f'Input probe "{name}" has invalid name, names must be strings.' ) - # Strobe Register - self.strobe = Signal(reset=0) + if not isinstance(width, int): + raise ValueError(f"Input probe '{name}' must have integer width.") - def assign_memory(self): - """ - the memory map is a dict that maps registers (in memory) to their locations (in memory) - as well as their Signals (from Amaranth). This looks like the following: + if not width > 0: + raise ValueError(f"Input probe '{name}' must have positive width.") - { - strobe: - addrs: [0x0000] - signals: [self.strobe] - probe0_buf: - addrs: [0x0001] - signals: [self.probe0_buf] - probe1_buf: - addrs: [0x0002] - signals: [self.probe1_buf] - probe2_buf: - addrs: [0x0003] - signals: [self.probe2_buf] - probe3_buf: - addrs: [0x0004, 0x0005] - signals: [self.probe3_buf[0:15], self.probe3_buf[16:19]] - ... and so on - } + input_signals += [Signal(width, name=name)] - """ - mmap = {} + # Define output signals + output_signals = [] + for name, attrs in outputs.items(): + if not isinstance(name, str): + raise ValueError( + f'Output probe "{name}" has invalid name, names must be strings.' + ) - # Add strobe register first - mmap["strobe"] = dict(addrs=[self.base_addr], signals=[self.strobe]) + if not isinstance(attrs, int) and not isinstance(attrs, dict): + raise ValueError(f'Unrecognized format for output probe "{name}".') - # Add all input and output probes - all_probes = {} - if "inputs" in self._config: - all_probes = {**all_probes, **self._config["inputs"]} + if isinstance(attrs, int): + if not attrs > 0: + raise ValueError(f'Output probe "{name}" must have positive width.') - if "outputs" in self._config: - all_probes = {**all_probes, **self._config["outputs"]} - - for name, attrs in all_probes.items(): - # Handle output probes that might have initial value specified in addition to width - if isinstance(attrs, dict): - width = attrs["width"] - else: width = attrs + initial_value = 0 - # Assign addresses - last_used_addr = list(mmap.values())[-1]["addrs"][-1] - addrs = [last_used_addr + 1 + i for i in range(ceil(width / 16))] + if isinstance(attrs, dict): + # check that each output probe has only recognized options + valid_options = ["width", "initial_value"] + for option in attrs: + if option not in valid_options: + warn(f'Ignoring unrecognized option "{option}" in IO core.') - # Slice signal into 16-bit chunks - signal = getattr(self, name + "_buf") - signals = [signal[16 * i : 16 * (i + 1)] for i in range(ceil(width / 16))] + # check that widths are appropriate + if "width" not in attrs: + raise ValueError(f"No width specified for output probe {name}.") - mmap[name + "_buf"] = {"addrs": addrs, "signals": signals} + if not isinstance(attrs["width"], int): + raise ValueError(f'Output probe "{name}" must have integer width.') - # Compute maximum address used by the core - max_addr = list(mmap.values())[-1]["addrs"][-1] - return mmap, max_addr + if not attrs["width"] > 0: + raise ValueError(f'Input probe "{name}" must have positive width.') + + width = attrs["width"] + + initial_value = 0 + if "initial_value" in attrs: + if not isinstance(attrs["initial_value"], int): + raise ValueError("Initial value must be an integer.") + + check_value_fits_in_bits(attrs["initial_value"], width) + initial_value = attrs["initial_value"] + + output_signals += [Signal(width, name=name, reset=initial_value)] + + return cls(base_addr, interface, inputs=input_signals, outputs=output_signals) + + def _make_memory_map(self): + self._memory_map = {} + + # Add strobe register + self._memory_map["strobe"] = dict( + signals=[self._strobe], addrs=[self._base_addr] + ) + + # Assign memory to all inputs and outputs + ios = self._inputs + self._outputs + io_bufs = self._input_bufs + self._output_bufs + last_used_addr = self._base_addr + + for io, io_buf in zip(ios, io_bufs): + n_slices = ceil(io.width / 16) + signals = split_into_chunks(io_buf, 16) + addrs = [i + last_used_addr + 1 for i in range(n_slices)] + + self._memory_map[io.name] = dict(signals=signals, addrs=addrs) + + last_used_addr = addrs[-1] + + # Save the last used address, for use later. + # Normally we'd just grab this from self._memory_map, but Python + # dictionaries don't guaruntee that insertion order is preserved, + # so it's more convenient to just save it now. + + self._max_addr = last_used_addr def elaborate(self, platform): m = Module() @@ -187,34 +149,27 @@ class IOCore(Elaboratable): # Shuffle bus transactions along m.d.sync += self.bus_o.eq(self.bus_i) - # Update buffers from probes - with m.If(self.strobe): - # Input buffers - if "inputs" in self._config: - for name in self._config["inputs"]: - input_probe = getattr(self, name) - input_probe_buf = getattr(self, name + "_buf") - m.d.sync += input_probe_buf.eq(input_probe) + # Update input_buffers from inputs + for i, i_buf in zip(self._inputs, self._input_bufs): + with m.If(self._strobe): + m.d.sync += i_buf.eq(i) - # Output buffers - if "outputs" in self._config: - for name in self._config["outputs"]: - output_probe = getattr(self, name) - output_probe_buf = getattr(self, name + "_buf") - m.d.sync += output_probe.eq(output_probe_buf) + # Update outputs from output_buffers + for o, o_buf in zip(self._outputs, self._output_bufs): + with m.If(self._strobe): + m.d.sync += o.eq(o_buf) # Handle register reads and writes - with m.If((self.bus_i.addr >= self.base_addr)): - with m.If((self.bus_o.addr <= self.max_addr)): - for entry in self.mmap.values(): - for addr, signal in zip(entry["addrs"], entry["signals"]): - with m.If(self.bus_i.rw): - with m.If(self.bus_i.addr == addr): - m.d.sync += signal.eq(self.bus_i.data) + for io in self._memory_map.values(): + for addr, signal in zip(io["addrs"], io["signals"]): + with m.If(self.bus_i.addr == addr): + # Writes + with m.If(self.bus_i.rw): + m.d.sync += signal.eq(self.bus_i.data) - with m.Else(): - with m.If(self.bus_i.addr == addr): - m.d.sync += self.bus_o.data.eq(signal) + # Reads + with m.Else(): + m.d.sync += self.bus_o.data.eq(signal) return m @@ -223,65 +178,64 @@ class IOCore(Elaboratable): Return the Amaranth signals that should be included as ports in the top-level Manta module. """ - ports = [] - if "inputs" in self._config: - for name in self._config["inputs"].keys(): - ports.append(getattr(self, name)) - - if "outputs" in self._config: - for name in self._config["outputs"].keys(): - ports.append(getattr(self, name)) - - return ports + return [self._inputs + self._outputs] def get_max_addr(self): """ - Return the maximum addresses in memory used by the core. The address space used - by the core extends from `base_addr` to the number returned by this function. + Return the maximum addresses in memory used by the core. The address + space used by the core extends from `base_addr` to the number returned + by this function. """ - return self.max_addr + return self._max_addr - def set_probe(self, probe_name, value): - # check that probe is an output probe - if probe_name not in self._config["outputs"]: - raise ValueError(f"Output probe '{probe_name}' not found.") + def set_probe(self, name, value): + """ + Set the value of an output probe on the FPGA. The value may be either + an unsigned or signed integer, but must fit within the width of the + probe. + """ - # check that value is an integer - if not isinstance(value, int): - raise ValueError("Value must be an integer.") + # Check that probe exists in memory map + probe = self._memory_map.get(name) + if not probe: + raise KeyError(f"Probe '{name}' not found in IO core.") - # get the width of the probe, make sure value isn't too large for the probe - attrs = self._config["outputs"][probe_name] - if isinstance(attrs, int): - width = attrs + # Check that the probe is an output + if not any([o.name == name for o in self._outputs]): + raise KeyError(f"Probe '{name}' is not an output of the IO core.") - if isinstance(attrs, dict): - width = attrs["width"] + # Check that value isn't too big for the register + n_bits = sum([len(s) for s in probe["signals"]]) + check_value_fits_in_bits(value, n_bits) - if value > 0 and value > 2**width - 1: - raise ValueError("Unsigned integer too large.") - - if value < 0 and value < -(2 ** (width - 1)): - raise ValueError("Signed integer too large.") - - # set value in buffer - addrs = self.mmap[probe_name + "_buf"]["addrs"] + # Write value to core + addrs = probe["addrs"] datas = value_to_words(value, len(addrs)) - self.interface.write(addrs, datas) + self._interface.write(addrs, datas) - # pulse strobe register - strobe_addr = self.mmap["strobe"]["addrs"][0] - self.interface.write(strobe_addr, 0) - self.interface.write(strobe_addr, 1) - self.interface.write(strobe_addr, 0) + # Pulse strobe register + self._interface.write(self._base_addr, 0) + self._interface.write(self._base_addr, 1) + self._interface.write(self._base_addr, 0) - def get_probe(self, probe_name): - # pulse strobe register - strobe_addr = self.mmap["strobe"]["addrs"][0] - self.interface.write(strobe_addr, 0) - self.interface.write(strobe_addr, 1) - self.interface.write(strobe_addr, 0) + def get_probe(self, name): + """ + Get the present value of a probe on the FPGA, which is returned as an + unsigned integer. This function may be called on both input and output + probes, but output probes will return the last value written to them + (or their initial value, if no value has been written to them yet). + """ - # get value from buffer - addrs = self.mmap[probe_name + "_buf"]["addrs"] - return words_to_value(self.interface.read(addrs)) + # Check that probe exists in memory map + probe = self._memory_map.get(name) + if not probe: + raise KeyError(f"Probe with name {name} not found in IO core.") + + # Pulse strobe register + self._interface.write(self._base_addr, 0) + self._interface.write(self._base_addr, 1) + self._interface.write(self._base_addr, 0) + + # Get value from buffer + datas = self._interface.read(probe["addrs"]) + return words_to_value(datas) diff --git a/src/manta/manta.py b/src/manta/manta.py index 3625a19..9384957 100644 --- a/src/manta/manta.py +++ b/src/manta/manta.py @@ -78,7 +78,7 @@ class Manta(Elaboratable): base_addr = 0 for name, attrs in self.config["cores"].items(): if attrs["type"] == "io": - core = IOCore(attrs, base_addr, self.interface) + core = IOCore.from_config(attrs, base_addr, self.interface) elif attrs["type"] == "logic_analyzer": core = LogicAnalyzerCore(attrs, base_addr, self.interface) diff --git a/test/test_io_core_hw.py b/test/test_io_core_hw.py index 1e6f31c..2c984f8 100644 --- a/test/test_io_core_hw.py +++ b/test/test_io_core_hw.py @@ -39,17 +39,39 @@ class IOCoreLoopbackTest(Elaboratable): }, } + def get_probe(self, name): + # This is a hack! And should be removed once the full Amaranth-native + # API is built out + for i in self.manta.io_core._inputs: + if i.name == name: + return i + + for o in self.manta.io_core._outputs: + if o.name == name: + return o + + return None + def elaborate(self, platform): m = Module() m.submodules.manta = self.manta uart_pins = platform.request("uart") + probe0 = self.get_probe("probe0") + probe1 = self.get_probe("probe1") + probe2 = self.get_probe("probe2") + probe3 = self.get_probe("probe3") + probe4 = self.get_probe("probe4") + probe5 = self.get_probe("probe5") + probe6 = self.get_probe("probe6") + probe7 = self.get_probe("probe7") + m.d.comb += [ - self.manta.io_core.probe0.eq(self.manta.io_core.probe4), - self.manta.io_core.probe1.eq(self.manta.io_core.probe5), - self.manta.io_core.probe2.eq(self.manta.io_core.probe6), - self.manta.io_core.probe3.eq(self.manta.io_core.probe7), + probe0.eq(probe4), + probe1.eq(probe5), + probe2.eq(probe6), + probe3.eq(probe7), self.manta.interface.rx.eq(uart_pins.rx.i), uart_pins.tx.o.eq(self.manta.interface.tx), ] @@ -126,4 +148,4 @@ def test_output_probe_initial_values_xilinx(): @pytest.mark.skipif(not ice40_tools_installed(), reason="no toolchain installed") def test_output_probe_initial_values_ice40(): - IOCoreLoopbackTest(ICEStickPlatform(), "/dev/ttyUSB2").verify() + IOCoreLoopbackTest(ICEStickPlatform(), "/dev/ttyUSB3").verify() diff --git a/test/test_io_core_sim.py b/test/test_io_core_sim.py index 5ef1405..3bda812 100644 --- a/test/test_io_core_sim.py +++ b/test/test_io_core_sim.py @@ -1,57 +1,36 @@ +from amaranth import * from amaranth.sim import Simulator from manta.io_core import IOCore from manta.utils import * from random import randint -config = { - "type": "io", - "inputs": {"probe0": 1, "probe1": 2, "probe2": 8, "probe3": 20}, - "outputs": { - "probe4": {"width": 1, "initial_value": 1}, - "probe5": {"width": 2, "initial_value": 2}, - "probe6": 8, - "probe7": {"width": 20, "initial_value": 65538}, - }, -} +probe0 = Signal(1) +probe1 = Signal(2) +probe2 = Signal(8) +probe3 = Signal(20) +inputs = [probe0, probe1, probe2, probe3] -io_core = IOCore(config, base_addr=0, interface=None) +probe4 = Signal(1, reset=1) +probe5 = Signal(2, reset=2) +probe6 = Signal(8) +probe7 = Signal(20, reset=65538) +outputs = [probe4, probe5, probe6, probe7] + +io_core = IOCore(base_addr=0, interface=None, inputs=inputs, outputs=outputs) def pulse_strobe_register(): - strobe_addr = io_core.mmap["strobe"]["addrs"][0] + strobe_addr = io_core._memory_map["strobe"]["addrs"][0] yield from write_register(io_core, strobe_addr, 0) yield from write_register(io_core, strobe_addr, 1) yield from write_register(io_core, strobe_addr, 0) -def test_output_probe_initial_values(): - def testbench(): - # Verify all output probes initialize to the values in the config - for name, attrs in config["outputs"].items(): - initial_value = 0 - if isinstance(attrs, dict): - if "initial_value" in attrs: - initial_value = attrs["initial_value"] - - output_probe = getattr(io_core, name) - value = yield output_probe - - if value != initial_value: - raise ValueError( - f"Output probe {name} initialized to {value} instead of {initial_value}" - ) - - else: - print(f"Output probe {name} initialized to {value} as expected.") - - simulate(io_core, testbench) - - def test_input_probe_buffer_initial_value(): def testbench(): # Verify all input probe buffers initialize to zero - for name, width in config["inputs"].items(): - addrs = io_core.mmap[name + "_buf"]["addrs"] + for i in inputs: + addrs = io_core._memory_map[i.name]["addrs"] for addr in addrs: yield from verify_register(io_core, addr, 0) @@ -62,13 +41,9 @@ def test_input_probe_buffer_initial_value(): def test_output_probe_buffer_initial_value(): def testbench(): # Verify all output probe buffers initialize to the values in the config - for name, attrs in config["outputs"].items(): - addrs = io_core.mmap[name + "_buf"]["addrs"] - - datas = [0] * len(addrs) - if isinstance(attrs, dict): - if "initial_value" in attrs: - datas = value_to_words(attrs["initial_value"], len(addrs)) + for o in outputs: + addrs = io_core._memory_map[o.name]["addrs"] + datas = value_to_words(o.reset, len(addrs)) for addr, data in zip(addrs, datas): yield from verify_register(io_core, addr, data) @@ -78,14 +53,9 @@ def test_output_probe_buffer_initial_value(): def test_output_probes_are_writeable(): def testbench(): - for name, attrs in config["outputs"].items(): - if isinstance(attrs, dict): - width = attrs["width"] - else: - width = attrs - - addrs = io_core.mmap[name + "_buf"]["addrs"] - test_value = randint(0, (2**width) - 1) + for o in outputs: + addrs = io_core._memory_map[o.name]["addrs"] + test_value = randint(0, (2**o.width) - 1) datas = value_to_words(test_value, len(addrs)) # write value to registers @@ -101,14 +71,9 @@ def test_output_probes_are_writeable(): def test_output_probes_update(): def testbench(): - for name, attrs in config["outputs"].items(): - if isinstance(attrs, dict): - width = attrs["width"] - else: - width = attrs - - addrs = io_core.mmap[name + "_buf"]["addrs"] - test_value = randint(0, (2**width) - 1) + for o in outputs: + addrs = io_core._memory_map[o.name]["addrs"] + test_value = randint(0, (2**o.width) - 1) datas = value_to_words(test_value, len(addrs)) # write value to registers @@ -119,34 +84,31 @@ def test_output_probes_update(): yield from pulse_strobe_register() # check that outputs took updated value - output_probe = getattr(io_core, name) - value = yield (output_probe) + value = yield (o) if value != test_value: raise ValueError( - f"Output probe {name} took value {value} instead of {test_value} after pulsing strobe." + f"Output probe {o.name} took value {value} instead of {test_value} after pulsing strobe." ) else: - print(f"Output probe {name} took value {value} after pulsing strobe.") + print(f"Output probe {o.name} took value {value} after pulsing strobe.") simulate(io_core, testbench) def test_input_probes_update(): def testbench(): - for name, width in config["inputs"].items(): - test_value = randint(0, (2**width) - 1) - + for i in inputs: # set input probe value - input_probe = getattr(io_core, name) - yield input_probe.eq(test_value) + test_value = randint(0, (2**i.width) - 1) + yield i.eq(test_value) # pulse strobe register yield from pulse_strobe_register() # check that values are as expected once read back - addrs = io_core.mmap[name + "_buf"]["addrs"] + addrs = io_core._memory_map[i.name]["addrs"] datas = value_to_words(test_value, len(addrs)) for addr, data in zip(addrs, datas):