rewrite IO Core

This commit is contained in:
Fischer Moseley 2024-02-18 13:50:26 -08:00
parent 3f724b3336
commit 0c0f31be64
4 changed files with 232 additions and 294 deletions

View File

@ -15,171 +15,133 @@ class IOCore(Elaboratable):
https://fischermoseley.github.io/manta/io_core/
"""
def __init__(self, config, base_addr, interface):
self._config = config
self.base_addr = base_addr
self.interface = interface
self._check_config(self._config)
def __init__(self, base_addr, interface, inputs=[], outputs=[]):
self._base_addr = base_addr
self._interface = interface
self._inputs = inputs
self._outputs = outputs
self.define_signals()
self.mmap, self.max_addr = self.assign_memory()
# Bus Connections
self.bus_i = Signal(InternalBus())
self.bus_o = Signal(InternalBus())
def _check_config(self, config):
# make sure ports are defined
if "inputs" not in config and "outputs" not in config:
raise ValueError("No input or output ports specified.")
# Internal Signals
self._strobe = Signal()
self._input_bufs = [Signal(p.width, name=p.name + "_buf") for p in self._inputs]
self._output_bufs = [
Signal(p.width, name=p.name + "_buf", reset=p.reset) for p in self._outputs
]
# check for unrecognized options
valid_options = ["type", "inputs", "outputs", "user_clock"]
self._make_memory_map()
@classmethod
def from_config(cls, config, base_addr, interface):
inputs = config.get("inputs", [])
outputs = config.get("outputs", [])
# Make sure IO core has at least one input or output
if not inputs and not outputs:
raise ValueError("Must specify at least one input or output port.")
# Warn about unrecognized options
valid_options = ["type", "inputs", "outputs"]
for option in config:
if option not in valid_options:
warn(f"Ignoring unrecognized option '{option}' in IO core.'")
# check that user_clock is a bool
if "user_clock" in config:
if not isinstance(config["user_clock"], bool):
raise ValueError("Option user_clock must be a boolean.")
# check that inputs is only dicts of format name:width
if "inputs" in config:
for name, attrs in config["inputs"].items():
if not isinstance(name, str):
raise ValueError(
f'Input probe "{name}" has invalid name, names must be strings.'
)
if not isinstance(attrs, int):
raise ValueError(f'Input probe "{name}" must have integer width.')
if not attrs > 0:
raise ValueError(f'Input probe "{name}" must have positive width.')
if "outputs" in config:
for name, attrs in config["outputs"].items():
if not isinstance(name, str):
raise ValueError(
f'Output probe "{name}" has invalid name, names must be strings.'
)
if not isinstance(attrs, int) and not isinstance(attrs, dict):
raise ValueError(f'Unrecognized format for output probe "{name}".')
if isinstance(attrs, int):
if not attrs > 0:
raise ValueError(
f'Output probe "{name}" must have positive width.'
)
if isinstance(attrs, dict):
# check that each output probe has only recognized options
valid_options = ["width", "initial_value"]
for option in attrs:
if option not in valid_options:
warn(f'Ignoring unrecognized option "{option}" in IO core.')
# check that widths are appropriate
if "width" not in attrs:
raise ValueError(f"No width specified for output probe {name}.")
if not isinstance(attrs["width"], int):
raise ValueError(
f'Output probe "{name}" must have integer width.'
)
if not attrs["width"] > 0:
raise ValueError(
f'Input probe "{name}" must have positive width.'
)
def define_signals(self):
# Bus Input/Output
self.bus_i = Signal(InternalBus())
self.bus_o = Signal(InternalBus())
# Input Probes (and buffers)
if "inputs" in self._config:
for name, width in self._config["inputs"].items():
setattr(self, name, Signal(width, name=name))
setattr(self, name + "_buf", Signal(width, name=name + "_buf"))
# Output Probes (and buffers)
if "outputs" in self._config:
for name, attrs in self._config["outputs"].items():
if isinstance(attrs, dict):
width = attrs["width"]
initial_value = attrs["initial_value"]
else:
width = attrs
initial_value = 0
setattr(self, name, Signal(width, name=name, reset=initial_value))
setattr(
self,
name + "_buf",
Signal(width, name=name + "_buf", reset=initial_value),
# Define input signals
input_signals = []
for name, width in inputs.items():
if not isinstance(name, str):
raise ValueError(
f'Input probe "{name}" has invalid name, names must be strings.'
)
# Strobe Register
self.strobe = Signal(reset=0)
if not isinstance(width, int):
raise ValueError(f"Input probe '{name}' must have integer width.")
def assign_memory(self):
"""
the memory map is a dict that maps registers (in memory) to their locations (in memory)
as well as their Signals (from Amaranth). This looks like the following:
if not width > 0:
raise ValueError(f"Input probe '{name}' must have positive width.")
{
strobe:
addrs: [0x0000]
signals: [self.strobe]
probe0_buf:
addrs: [0x0001]
signals: [self.probe0_buf]
probe1_buf:
addrs: [0x0002]
signals: [self.probe1_buf]
probe2_buf:
addrs: [0x0003]
signals: [self.probe2_buf]
probe3_buf:
addrs: [0x0004, 0x0005]
signals: [self.probe3_buf[0:15], self.probe3_buf[16:19]]
... and so on
}
input_signals += [Signal(width, name=name)]
"""
mmap = {}
# Define output signals
output_signals = []
for name, attrs in outputs.items():
if not isinstance(name, str):
raise ValueError(
f'Output probe "{name}" has invalid name, names must be strings.'
)
# Add strobe register first
mmap["strobe"] = dict(addrs=[self.base_addr], signals=[self.strobe])
if not isinstance(attrs, int) and not isinstance(attrs, dict):
raise ValueError(f'Unrecognized format for output probe "{name}".')
# Add all input and output probes
all_probes = {}
if "inputs" in self._config:
all_probes = {**all_probes, **self._config["inputs"]}
if isinstance(attrs, int):
if not attrs > 0:
raise ValueError(f'Output probe "{name}" must have positive width.')
if "outputs" in self._config:
all_probes = {**all_probes, **self._config["outputs"]}
for name, attrs in all_probes.items():
# Handle output probes that might have initial value specified in addition to width
if isinstance(attrs, dict):
width = attrs["width"]
else:
width = attrs
initial_value = 0
# Assign addresses
last_used_addr = list(mmap.values())[-1]["addrs"][-1]
addrs = [last_used_addr + 1 + i for i in range(ceil(width / 16))]
if isinstance(attrs, dict):
# check that each output probe has only recognized options
valid_options = ["width", "initial_value"]
for option in attrs:
if option not in valid_options:
warn(f'Ignoring unrecognized option "{option}" in IO core.')
# Slice signal into 16-bit chunks
signal = getattr(self, name + "_buf")
signals = [signal[16 * i : 16 * (i + 1)] for i in range(ceil(width / 16))]
# check that widths are appropriate
if "width" not in attrs:
raise ValueError(f"No width specified for output probe {name}.")
mmap[name + "_buf"] = {"addrs": addrs, "signals": signals}
if not isinstance(attrs["width"], int):
raise ValueError(f'Output probe "{name}" must have integer width.')
# Compute maximum address used by the core
max_addr = list(mmap.values())[-1]["addrs"][-1]
return mmap, max_addr
if not attrs["width"] > 0:
raise ValueError(f'Input probe "{name}" must have positive width.')
width = attrs["width"]
initial_value = 0
if "initial_value" in attrs:
if not isinstance(attrs["initial_value"], int):
raise ValueError("Initial value must be an integer.")
check_value_fits_in_bits(attrs["initial_value"], width)
initial_value = attrs["initial_value"]
output_signals += [Signal(width, name=name, reset=initial_value)]
return cls(base_addr, interface, inputs=input_signals, outputs=output_signals)
def _make_memory_map(self):
self._memory_map = {}
# Add strobe register
self._memory_map["strobe"] = dict(
signals=[self._strobe], addrs=[self._base_addr]
)
# Assign memory to all inputs and outputs
ios = self._inputs + self._outputs
io_bufs = self._input_bufs + self._output_bufs
last_used_addr = self._base_addr
for io, io_buf in zip(ios, io_bufs):
n_slices = ceil(io.width / 16)
signals = split_into_chunks(io_buf, 16)
addrs = [i + last_used_addr + 1 for i in range(n_slices)]
self._memory_map[io.name] = dict(signals=signals, addrs=addrs)
last_used_addr = addrs[-1]
# Save the last used address, for use later.
# Normally we'd just grab this from self._memory_map, but Python
# dictionaries don't guaruntee that insertion order is preserved,
# so it's more convenient to just save it now.
self._max_addr = last_used_addr
def elaborate(self, platform):
m = Module()
@ -187,34 +149,27 @@ class IOCore(Elaboratable):
# Shuffle bus transactions along
m.d.sync += self.bus_o.eq(self.bus_i)
# Update buffers from probes
with m.If(self.strobe):
# Input buffers
if "inputs" in self._config:
for name in self._config["inputs"]:
input_probe = getattr(self, name)
input_probe_buf = getattr(self, name + "_buf")
m.d.sync += input_probe_buf.eq(input_probe)
# Update input_buffers from inputs
for i, i_buf in zip(self._inputs, self._input_bufs):
with m.If(self._strobe):
m.d.sync += i_buf.eq(i)
# Output buffers
if "outputs" in self._config:
for name in self._config["outputs"]:
output_probe = getattr(self, name)
output_probe_buf = getattr(self, name + "_buf")
m.d.sync += output_probe.eq(output_probe_buf)
# Update outputs from output_buffers
for o, o_buf in zip(self._outputs, self._output_bufs):
with m.If(self._strobe):
m.d.sync += o.eq(o_buf)
# Handle register reads and writes
with m.If((self.bus_i.addr >= self.base_addr)):
with m.If((self.bus_o.addr <= self.max_addr)):
for entry in self.mmap.values():
for addr, signal in zip(entry["addrs"], entry["signals"]):
with m.If(self.bus_i.rw):
with m.If(self.bus_i.addr == addr):
m.d.sync += signal.eq(self.bus_i.data)
for io in self._memory_map.values():
for addr, signal in zip(io["addrs"], io["signals"]):
with m.If(self.bus_i.addr == addr):
# Writes
with m.If(self.bus_i.rw):
m.d.sync += signal.eq(self.bus_i.data)
with m.Else():
with m.If(self.bus_i.addr == addr):
m.d.sync += self.bus_o.data.eq(signal)
# Reads
with m.Else():
m.d.sync += self.bus_o.data.eq(signal)
return m
@ -223,65 +178,64 @@ class IOCore(Elaboratable):
Return the Amaranth signals that should be included as ports in the
top-level Manta module.
"""
ports = []
if "inputs" in self._config:
for name in self._config["inputs"].keys():
ports.append(getattr(self, name))
if "outputs" in self._config:
for name in self._config["outputs"].keys():
ports.append(getattr(self, name))
return ports
return [self._inputs + self._outputs]
def get_max_addr(self):
"""
Return the maximum addresses in memory used by the core. The address space used
by the core extends from `base_addr` to the number returned by this function.
Return the maximum addresses in memory used by the core. The address
space used by the core extends from `base_addr` to the number returned
by this function.
"""
return self.max_addr
return self._max_addr
def set_probe(self, probe_name, value):
# check that probe is an output probe
if probe_name not in self._config["outputs"]:
raise ValueError(f"Output probe '{probe_name}' not found.")
def set_probe(self, name, value):
"""
Set the value of an output probe on the FPGA. The value may be either
an unsigned or signed integer, but must fit within the width of the
probe.
"""
# check that value is an integer
if not isinstance(value, int):
raise ValueError("Value must be an integer.")
# Check that probe exists in memory map
probe = self._memory_map.get(name)
if not probe:
raise KeyError(f"Probe '{name}' not found in IO core.")
# get the width of the probe, make sure value isn't too large for the probe
attrs = self._config["outputs"][probe_name]
if isinstance(attrs, int):
width = attrs
# Check that the probe is an output
if not any([o.name == name for o in self._outputs]):
raise KeyError(f"Probe '{name}' is not an output of the IO core.")
if isinstance(attrs, dict):
width = attrs["width"]
# Check that value isn't too big for the register
n_bits = sum([len(s) for s in probe["signals"]])
check_value_fits_in_bits(value, n_bits)
if value > 0 and value > 2**width - 1:
raise ValueError("Unsigned integer too large.")
if value < 0 and value < -(2 ** (width - 1)):
raise ValueError("Signed integer too large.")
# set value in buffer
addrs = self.mmap[probe_name + "_buf"]["addrs"]
# Write value to core
addrs = probe["addrs"]
datas = value_to_words(value, len(addrs))
self.interface.write(addrs, datas)
self._interface.write(addrs, datas)
# pulse strobe register
strobe_addr = self.mmap["strobe"]["addrs"][0]
self.interface.write(strobe_addr, 0)
self.interface.write(strobe_addr, 1)
self.interface.write(strobe_addr, 0)
# Pulse strobe register
self._interface.write(self._base_addr, 0)
self._interface.write(self._base_addr, 1)
self._interface.write(self._base_addr, 0)
def get_probe(self, probe_name):
# pulse strobe register
strobe_addr = self.mmap["strobe"]["addrs"][0]
self.interface.write(strobe_addr, 0)
self.interface.write(strobe_addr, 1)
self.interface.write(strobe_addr, 0)
def get_probe(self, name):
"""
Get the present value of a probe on the FPGA, which is returned as an
unsigned integer. This function may be called on both input and output
probes, but output probes will return the last value written to them
(or their initial value, if no value has been written to them yet).
"""
# get value from buffer
addrs = self.mmap[probe_name + "_buf"]["addrs"]
return words_to_value(self.interface.read(addrs))
# Check that probe exists in memory map
probe = self._memory_map.get(name)
if not probe:
raise KeyError(f"Probe with name {name} not found in IO core.")
# Pulse strobe register
self._interface.write(self._base_addr, 0)
self._interface.write(self._base_addr, 1)
self._interface.write(self._base_addr, 0)
# Get value from buffer
datas = self._interface.read(probe["addrs"])
return words_to_value(datas)

View File

@ -78,7 +78,7 @@ class Manta(Elaboratable):
base_addr = 0
for name, attrs in self.config["cores"].items():
if attrs["type"] == "io":
core = IOCore(attrs, base_addr, self.interface)
core = IOCore.from_config(attrs, base_addr, self.interface)
elif attrs["type"] == "logic_analyzer":
core = LogicAnalyzerCore(attrs, base_addr, self.interface)

View File

@ -39,17 +39,39 @@ class IOCoreLoopbackTest(Elaboratable):
},
}
def get_probe(self, name):
# This is a hack! And should be removed once the full Amaranth-native
# API is built out
for i in self.manta.io_core._inputs:
if i.name == name:
return i
for o in self.manta.io_core._outputs:
if o.name == name:
return o
return None
def elaborate(self, platform):
m = Module()
m.submodules.manta = self.manta
uart_pins = platform.request("uart")
probe0 = self.get_probe("probe0")
probe1 = self.get_probe("probe1")
probe2 = self.get_probe("probe2")
probe3 = self.get_probe("probe3")
probe4 = self.get_probe("probe4")
probe5 = self.get_probe("probe5")
probe6 = self.get_probe("probe6")
probe7 = self.get_probe("probe7")
m.d.comb += [
self.manta.io_core.probe0.eq(self.manta.io_core.probe4),
self.manta.io_core.probe1.eq(self.manta.io_core.probe5),
self.manta.io_core.probe2.eq(self.manta.io_core.probe6),
self.manta.io_core.probe3.eq(self.manta.io_core.probe7),
probe0.eq(probe4),
probe1.eq(probe5),
probe2.eq(probe6),
probe3.eq(probe7),
self.manta.interface.rx.eq(uart_pins.rx.i),
uart_pins.tx.o.eq(self.manta.interface.tx),
]
@ -126,4 +148,4 @@ def test_output_probe_initial_values_xilinx():
@pytest.mark.skipif(not ice40_tools_installed(), reason="no toolchain installed")
def test_output_probe_initial_values_ice40():
IOCoreLoopbackTest(ICEStickPlatform(), "/dev/ttyUSB2").verify()
IOCoreLoopbackTest(ICEStickPlatform(), "/dev/ttyUSB3").verify()

View File

@ -1,57 +1,36 @@
from amaranth import *
from amaranth.sim import Simulator
from manta.io_core import IOCore
from manta.utils import *
from random import randint
config = {
"type": "io",
"inputs": {"probe0": 1, "probe1": 2, "probe2": 8, "probe3": 20},
"outputs": {
"probe4": {"width": 1, "initial_value": 1},
"probe5": {"width": 2, "initial_value": 2},
"probe6": 8,
"probe7": {"width": 20, "initial_value": 65538},
},
}
probe0 = Signal(1)
probe1 = Signal(2)
probe2 = Signal(8)
probe3 = Signal(20)
inputs = [probe0, probe1, probe2, probe3]
io_core = IOCore(config, base_addr=0, interface=None)
probe4 = Signal(1, reset=1)
probe5 = Signal(2, reset=2)
probe6 = Signal(8)
probe7 = Signal(20, reset=65538)
outputs = [probe4, probe5, probe6, probe7]
io_core = IOCore(base_addr=0, interface=None, inputs=inputs, outputs=outputs)
def pulse_strobe_register():
strobe_addr = io_core.mmap["strobe"]["addrs"][0]
strobe_addr = io_core._memory_map["strobe"]["addrs"][0]
yield from write_register(io_core, strobe_addr, 0)
yield from write_register(io_core, strobe_addr, 1)
yield from write_register(io_core, strobe_addr, 0)
def test_output_probe_initial_values():
def testbench():
# Verify all output probes initialize to the values in the config
for name, attrs in config["outputs"].items():
initial_value = 0
if isinstance(attrs, dict):
if "initial_value" in attrs:
initial_value = attrs["initial_value"]
output_probe = getattr(io_core, name)
value = yield output_probe
if value != initial_value:
raise ValueError(
f"Output probe {name} initialized to {value} instead of {initial_value}"
)
else:
print(f"Output probe {name} initialized to {value} as expected.")
simulate(io_core, testbench)
def test_input_probe_buffer_initial_value():
def testbench():
# Verify all input probe buffers initialize to zero
for name, width in config["inputs"].items():
addrs = io_core.mmap[name + "_buf"]["addrs"]
for i in inputs:
addrs = io_core._memory_map[i.name]["addrs"]
for addr in addrs:
yield from verify_register(io_core, addr, 0)
@ -62,13 +41,9 @@ def test_input_probe_buffer_initial_value():
def test_output_probe_buffer_initial_value():
def testbench():
# Verify all output probe buffers initialize to the values in the config
for name, attrs in config["outputs"].items():
addrs = io_core.mmap[name + "_buf"]["addrs"]
datas = [0] * len(addrs)
if isinstance(attrs, dict):
if "initial_value" in attrs:
datas = value_to_words(attrs["initial_value"], len(addrs))
for o in outputs:
addrs = io_core._memory_map[o.name]["addrs"]
datas = value_to_words(o.reset, len(addrs))
for addr, data in zip(addrs, datas):
yield from verify_register(io_core, addr, data)
@ -78,14 +53,9 @@ def test_output_probe_buffer_initial_value():
def test_output_probes_are_writeable():
def testbench():
for name, attrs in config["outputs"].items():
if isinstance(attrs, dict):
width = attrs["width"]
else:
width = attrs
addrs = io_core.mmap[name + "_buf"]["addrs"]
test_value = randint(0, (2**width) - 1)
for o in outputs:
addrs = io_core._memory_map[o.name]["addrs"]
test_value = randint(0, (2**o.width) - 1)
datas = value_to_words(test_value, len(addrs))
# write value to registers
@ -101,14 +71,9 @@ def test_output_probes_are_writeable():
def test_output_probes_update():
def testbench():
for name, attrs in config["outputs"].items():
if isinstance(attrs, dict):
width = attrs["width"]
else:
width = attrs
addrs = io_core.mmap[name + "_buf"]["addrs"]
test_value = randint(0, (2**width) - 1)
for o in outputs:
addrs = io_core._memory_map[o.name]["addrs"]
test_value = randint(0, (2**o.width) - 1)
datas = value_to_words(test_value, len(addrs))
# write value to registers
@ -119,34 +84,31 @@ def test_output_probes_update():
yield from pulse_strobe_register()
# check that outputs took updated value
output_probe = getattr(io_core, name)
value = yield (output_probe)
value = yield (o)
if value != test_value:
raise ValueError(
f"Output probe {name} took value {value} instead of {test_value} after pulsing strobe."
f"Output probe {o.name} took value {value} instead of {test_value} after pulsing strobe."
)
else:
print(f"Output probe {name} took value {value} after pulsing strobe.")
print(f"Output probe {o.name} took value {value} after pulsing strobe.")
simulate(io_core, testbench)
def test_input_probes_update():
def testbench():
for name, width in config["inputs"].items():
test_value = randint(0, (2**width) - 1)
for i in inputs:
# set input probe value
input_probe = getattr(io_core, name)
yield input_probe.eq(test_value)
test_value = randint(0, (2**i.width) - 1)
yield i.eq(test_value)
# pulse strobe register
yield from pulse_strobe_register()
# check that values are as expected once read back
addrs = io_core.mmap[name + "_buf"]["addrs"]
addrs = io_core._memory_map[i.name]["addrs"]
datas = value_to_words(test_value, len(addrs))
for addr, data in zip(addrs, datas):