diff --git a/src/manta/logic_analyzer/__init__.py b/src/manta/logic_analyzer/__init__.py index 62dc8c6..c3f7c26 100644 --- a/src/manta/logic_analyzer/__init__.py +++ b/src/manta/logic_analyzer/__init__.py @@ -1,6 +1,6 @@ from amaranth import * from manta.utils import * -from manta.memory_core import ReadOnlyMemoryCore +from manta.memory_core import MemoryCore from manta.logic_analyzer.trigger_block import LogicAnalyzerTriggerBlock from manta.logic_analyzer.fsm import LogicAnalyzerFSM, States, TriggerModes from manta.logic_analyzer.playback import LogicAnalyzerPlayback diff --git a/src/manta/manta.py b/src/manta/manta.py index 0b89726..424a40a 100644 --- a/src/manta/manta.py +++ b/src/manta/manta.py @@ -3,7 +3,7 @@ from manta.uart import UARTInterface from manta.ethernet import EthernetInterface from manta.io_core import IOCore -from manta.memory_core import ReadOnlyMemoryCore +from manta.memory_core import MemoryCore from manta.logic_analyzer import LogicAnalyzerCore diff --git a/src/manta/memory_core.py b/src/manta/memory_core.py index fa52411..f62af64 100644 --- a/src/manta/memory_core.py +++ b/src/manta/memory_core.py @@ -1,12 +1,13 @@ from amaranth import * +from amaranth.lib.memory import Memory from manta.utils import * from math import ceil -class ReadOnlyMemoryCore(Elaboratable): +class MemoryCore(Elaboratable): """ - A module for generating a memory on the FPGA, with a read port tied to - Manta's internal bus, and a write port provided to user logic. + A module for generating a memory on the FPGA, with a port tied to Manta's + internal bus, and a port provided to user logic. Provides methods for generating synthesizable logic for the FPGA, as well as methods for reading and writing the value of a register. @@ -15,24 +16,50 @@ class ReadOnlyMemoryCore(Elaboratable): https://fischermoseley.github.io/manta/memory_core/ """ - def __init__(self, width, depth, base_addr, interface): + def __init__(self, mode, width, depth, base_addr, interface): + self._mode = mode self._width = width self._depth = depth self._base_addr = base_addr self._interface = interface - self._max_addr = self._base_addr + (self._depth * ceil(self._width / 16)) + self._n_brams = ceil(self._width / 16) + self._max_addr = self._base_addr + (self._depth * self._n_brams) # Bus Connections self.bus_i = Signal(InternalBus()) self.bus_o = Signal(InternalBus()) - # User Port - self.user_addr = Signal(range(self._depth)) - self.user_data = Signal(self._width) - self.user_we = Signal(1) + # User Ports + if self._mode == "fpga_to_host": + self.user_addr = Signal(range(self._depth)) + self.user_data_in = Signal(self._width) + self.user_write_enable = Signal() + self._top_level_ports = [ + self.user_addr, + self.user_data_in, + self.user_write_enable, + ] - self._define_mems() + elif self._mode == "host_to_fpga": + self.user_addr = Signal(range(self._depth)) + self.user_data_out = Signal(self._width) + self._top_level_ports = [ + self.user_addr, + self.user_data_out, + ] + + elif self._mode == "bidirectional": + self.user_addr = Signal(range(self._depth)) + self.user_data_in = Signal(self._width) + self.user_data_out = Signal(self._width) + self.user_write_enable = Signal() + self._top_level_ports = [ + self.user_addr, + self.user_data_in, + self.user_data_out, + self.user_write_enable, + ] @classmethod def from_config(cls, config, base_addr, interface): @@ -64,9 +91,86 @@ class ReadOnlyMemoryCore(Elaboratable): if not width > 0: raise ValueError("Width of memory core must be positive. ") - return cls(width, depth, base_addr, interface) + # Check mode is provided and is recognized value + mode = config.get("mode") + if not mode: + raise ValueError("Mode of memory core must be specified.") - def _pipeline_bus(self, m): + if mode not in ["fpga_to_host", "host_to_fpga", "bidirectional"]: + raise ValueError("Unrecognized mode provided to memory core.") + + return cls(mode, width, depth, base_addr, interface) + + def _tie_mems_to_bus(self, m): + for i, mem in enumerate(self._mems): + # Compute address range corresponding to this chunk of memory + start_addr = self._base_addr + (i * self._depth) + stop_addr = start_addr + self._depth - 1 + + # Handle write ports + if self._mode in ["host_to_fpga", "bidirectional"]: + write_port = mem.write_port() + m.d.sync += write_port.data.eq(self.bus_i.data) + m.d.sync += write_port.en.eq(self.bus_i.rw) + m.d.sync += write_port.addr.eq(self.bus_i.addr - start_addr) + + # Handle read ports + if self._mode in ["fpga_to_host", "bidirectional"]: + read_port = mem.read_port() + m.d.comb += read_port.en.eq(1) + + # Throw BRAM operations into the front of the pipeline + with m.If( + (self.bus_i.valid) + & (self.bus_i.addr >= start_addr) + & (self.bus_i.addr <= stop_addr) + ): + m.d.sync += read_port.addr.eq(self.bus_i.addr - start_addr) + + # Pull BRAM reads from the back of the pipeline + with m.If( + (self._bus_pipe[2].valid) + & (self._bus_pipe[2].addr >= start_addr) + & (self._bus_pipe[2].addr <= stop_addr) + ): + m.d.sync += self.bus_o.data.eq(read_port.data) + + def _tie_mems_to_user_logic(self, m): + # Handle write ports + if self._mode in ["fpga_to_host", "bidirectional"]: + for i, mem in enumerate(self._mems): + write_port = mem.write_port() + m.d.comb += write_port.addr.eq(self.user_addr) + m.d.comb += write_port.data.eq(self.user_data_in[16 * i : 16 * (i + 1)]) + m.d.comb += write_port.en.eq(self.user_write_enable) + + # Handle read ports + if self._mode in ["host_to_fpga", "bidirectional"]: + read_datas = [] + for i, mem in enumerate(self._mems): + read_port = mem.read_port() + m.d.comb += read_port.addr.eq(self.user_addr) + m.d.comb += read_port.en.eq(1) + read_datas.append(read_port.data) + + m.d.comb += self.user_data_out.eq(Cat(read_datas)) + + def elaborate(self, platform): + m = Module() + + # Define memories + n_full = self._width // 16 + n_partial = self._width % 16 + + self._mems = [Memory(shape=16, depth=self._depth, init=[0]*self._depth) for _ in range(n_full)] + if n_partial > 0: + self._mems += [Memory(shape=n_partial, depth=self._depth, init=[0]*self._depth)] + + # Add memories as submodules + for i, mem in enumerate(self._mems): + m.submodules[f"mem_{i}"] = mem + + # Pipeline the bus to accomodate the two clock-cycle delay in the memories self._bus_pipe = [Signal(InternalBus()) for _ in range(3)] m.d.sync += self._bus_pipe[0].eq(self.bus_i) @@ -75,75 +179,9 @@ class ReadOnlyMemoryCore(Elaboratable): m.d.sync += self.bus_o.eq(self._bus_pipe[2]) - def _define_mems(self): - # There's three cases that must be handled: - # 1. Integer number of 16 bit mems - # 2. Integer number of 16 bit mems + partial mem - # 3. Just the partial mem (width < 16) - - # Only one, partial-width memory is needed - if self._width < 16: - self._mems = [Memory(depth=self._depth, width=self._width)] - - # Only full-width memories are needed - elif self._width % 16 == 0: - self._mems = [ - Memory(depth=self._depth, width=16) for _ in range(self._width // 16) - ] - - # Both full-width and partial memories are needed - else: - self._mems = [ - Memory(depth=self._depth, width=16) for i in range(self._width // 16) - ] - self._mems += [Memory(depth=self._depth, width=self._width % 16)] - - def _handle_read_ports(self, m): - # These are tied to the bus - for i, mem in enumerate(self._mems): - read_port = mem.read_port() - m.d.comb += read_port.en.eq(1) - - start_addr = self._base_addr + (i * self._depth) - stop_addr = start_addr + self._depth - 1 - - # Throw BRAM operations into the front of the pipeline - with m.If( - (self.bus_i.valid) - & (~self.bus_i.rw) - & (self.bus_i.addr >= start_addr) - & (self.bus_i.addr <= stop_addr) - ): - m.d.sync += read_port.addr.eq(self.bus_i.addr - start_addr) - - # Pull BRAM reads from the back of the pipeline - with m.If( - (self._bus_pipe[2].valid) - & (~self._bus_pipe[2].rw) - & (self._bus_pipe[2].addr >= start_addr) - & (self._bus_pipe[2].addr <= stop_addr) - ): - m.d.sync += self.bus_o.data.eq(read_port.data) - - def _handle_write_ports(self, m): - # These are given to the user - for i, mem in enumerate(self._mems): - write_port = mem.write_port() - - m.d.comb += write_port.addr.eq(self.user_addr) - m.d.comb += write_port.data.eq(self.user_data[16 * i : 16 * (i + 1)]) - m.d.comb += write_port.en.eq(self.user_we) - - def elaborate(self, platform): - m = Module() - - # Add memories as submodules - for i, mem in enumerate(self._mems): - m.submodules[f"mem_{i}"] = mem - - self._pipeline_bus(m) - self._handle_read_ports(m) - self._handle_write_ports(m) + # Tie memory ports to the internal bus and user logic + self._tie_mems_to_bus(m) + self._tie_mems_to_user_logic(m) return m def get_top_level_ports(self): @@ -151,7 +189,7 @@ class ReadOnlyMemoryCore(Elaboratable): Return the Amaranth signals that should be included as ports in the top-level Manta module. """ - return [self.user_addr, self.user_data, self.user_we] + return self._top_level_ports def get_max_addr(self): """ @@ -161,26 +199,64 @@ class ReadOnlyMemoryCore(Elaboratable): """ return self._max_addr - def read_from_user_addr(self, addrs): + def _convert_user_to_bus_addr(self, addrs): + """ + Convert user address space to bus address space. For instance, for a + core with base address 10 and width 33, reading from address 4 is + actually a read from address 14 and address 14 + depth, and address + 14 + (2 * depth). + """ + if isinstance(addrs, int): + return self._convert_user_to_bus_addr([addrs])[0] + + bus_addrs = [] + for addr in addrs: + for i in range(len(self._mems)): + bus_addrs.append(self._base_addr + addr + (i * self._depth)) + + return bus_addrs + + def read(self, addrs): """ Read the memory stored at the provided address, as seen from the user side. """ - # Convert user address space to bus address space - # (for instance, for a core with base address 10 and width 33, - # reading from address 4 is actually a read from address 14 - # and address 14 + depth, and address 14 + 2*depth) - + # Handle a single integer address if isinstance(addrs, int): - return self.read_from_user_addr([addrs])[0] + return self.read([addrs])[0] - bus_addrs = [] - for addr in addrs: - bus_addrs += [ - addr + self._base_addr + i * self._depth for i in range(len(self._mems)) - ] + # Make sure all list elements are integers + if not all(isinstance(a, int) for a in addrs): + raise TypeError("Read address must be an integer or list of integers.") + bus_addrs = self._convert_user_to_bus_addr(addrs) datas = self._interface.read(bus_addrs) - data_chunks = split_into_chunks(datas, len(self._mems)) + data_chunks = split_into_chunks(datas, self._n_brams) return [words_to_value(chunk) for chunk in data_chunks] + + def write(self, addrs, datas): + """ + Write to the memory stored at the provided address, as seen from the + user side. + """ + + # Handle a single integer address and data + if isinstance(addrs, int) and isinstance(datas, int): + return self.write([addrs], [datas]) + + # Make sure address and datas are all integers + if not isinstance(addrs, list) or not isinstance(datas, list): + raise TypeError( + "Write addresses and data must be an integer or list of integers." + ) + + if not all(isinstance(a, int) for a in addrs): + raise TypeError("Write addresses must be all be integers.") + + if not all(isinstance(d, int) for d in datas): + raise TypeError("Write data must all be integers.") + + bus_addrs = self._convert_user_to_bus_addr([addrs])[0] + bus_datas = [word for d in datas for word in value_to_words(d, self._n_brams)] + self._interface.write(bus_addrs, bus_datas) diff --git a/test/test_mem_core_sim.py b/test/test_mem_core_sim.py index e793c4f..fce9800 100644 --- a/test/test_mem_core_sim.py +++ b/test/test_mem_core_sim.py @@ -1,4 +1,4 @@ -from manta.memory_core import ReadOnlyMemoryCore +from manta.memory_core import MemoryCore from manta.utils import * from random import randint, sample @@ -6,16 +6,16 @@ from random import randint, sample def fill_mem_from_user_port(mem_core, depth): for i in range(depth): yield mem_core.user_addr.eq(i) - yield mem_core.user_data.eq(i) - yield mem_core.user_we.eq(1) + yield mem_core.user_data_in.eq(i) + yield mem_core.user_write_enable.eq(1) yield - yield mem_core.user_we.eq(0) + yield mem_core.user_write_enable.eq(0) yield def verify_mem_core(width, depth, base_addr): - mem_core = ReadOnlyMemoryCore(width, depth, base_addr, interface=None) + mem_core = MemoryCore("fpga_to_host", width, depth, base_addr, interface=None) def testbench(): yield from fill_mem_from_user_port(mem_core, depth)