add preliminary bidirectional memory core

This commit is contained in:
Fischer Moseley 2024-02-28 10:36:27 -08:00
parent 8a2b9ced76
commit ab7d9105b1
4 changed files with 178 additions and 102 deletions

View File

@ -1,6 +1,6 @@
from amaranth import *
from manta.utils import *
from manta.memory_core import ReadOnlyMemoryCore
from manta.memory_core import MemoryCore
from manta.logic_analyzer.trigger_block import LogicAnalyzerTriggerBlock
from manta.logic_analyzer.fsm import LogicAnalyzerFSM, States, TriggerModes
from manta.logic_analyzer.playback import LogicAnalyzerPlayback

View File

@ -3,7 +3,7 @@ from manta.uart import UARTInterface
from manta.ethernet import EthernetInterface
from manta.io_core import IOCore
from manta.memory_core import ReadOnlyMemoryCore
from manta.memory_core import MemoryCore
from manta.logic_analyzer import LogicAnalyzerCore

View File

@ -1,12 +1,13 @@
from amaranth import *
from amaranth.lib.memory import Memory
from manta.utils import *
from math import ceil
class ReadOnlyMemoryCore(Elaboratable):
class MemoryCore(Elaboratable):
"""
A module for generating a memory on the FPGA, with a read port tied to
Manta's internal bus, and a write port provided to user logic.
A module for generating a memory on the FPGA, with a port tied to Manta's
internal bus, and a port provided to user logic.
Provides methods for generating synthesizable logic for the FPGA, as well
as methods for reading and writing the value of a register.
@ -15,24 +16,50 @@ class ReadOnlyMemoryCore(Elaboratable):
https://fischermoseley.github.io/manta/memory_core/
"""
def __init__(self, width, depth, base_addr, interface):
def __init__(self, mode, width, depth, base_addr, interface):
self._mode = mode
self._width = width
self._depth = depth
self._base_addr = base_addr
self._interface = interface
self._max_addr = self._base_addr + (self._depth * ceil(self._width / 16))
self._n_brams = ceil(self._width / 16)
self._max_addr = self._base_addr + (self._depth * self._n_brams)
# Bus Connections
self.bus_i = Signal(InternalBus())
self.bus_o = Signal(InternalBus())
# User Port
self.user_addr = Signal(range(self._depth))
self.user_data = Signal(self._width)
self.user_we = Signal(1)
# User Ports
if self._mode == "fpga_to_host":
self.user_addr = Signal(range(self._depth))
self.user_data_in = Signal(self._width)
self.user_write_enable = Signal()
self._top_level_ports = [
self.user_addr,
self.user_data_in,
self.user_write_enable,
]
self._define_mems()
elif self._mode == "host_to_fpga":
self.user_addr = Signal(range(self._depth))
self.user_data_out = Signal(self._width)
self._top_level_ports = [
self.user_addr,
self.user_data_out,
]
elif self._mode == "bidirectional":
self.user_addr = Signal(range(self._depth))
self.user_data_in = Signal(self._width)
self.user_data_out = Signal(self._width)
self.user_write_enable = Signal()
self._top_level_ports = [
self.user_addr,
self.user_data_in,
self.user_data_out,
self.user_write_enable,
]
@classmethod
def from_config(cls, config, base_addr, interface):
@ -64,9 +91,86 @@ class ReadOnlyMemoryCore(Elaboratable):
if not width > 0:
raise ValueError("Width of memory core must be positive. ")
return cls(width, depth, base_addr, interface)
# Check mode is provided and is recognized value
mode = config.get("mode")
if not mode:
raise ValueError("Mode of memory core must be specified.")
def _pipeline_bus(self, m):
if mode not in ["fpga_to_host", "host_to_fpga", "bidirectional"]:
raise ValueError("Unrecognized mode provided to memory core.")
return cls(mode, width, depth, base_addr, interface)
def _tie_mems_to_bus(self, m):
for i, mem in enumerate(self._mems):
# Compute address range corresponding to this chunk of memory
start_addr = self._base_addr + (i * self._depth)
stop_addr = start_addr + self._depth - 1
# Handle write ports
if self._mode in ["host_to_fpga", "bidirectional"]:
write_port = mem.write_port()
m.d.sync += write_port.data.eq(self.bus_i.data)
m.d.sync += write_port.en.eq(self.bus_i.rw)
m.d.sync += write_port.addr.eq(self.bus_i.addr - start_addr)
# Handle read ports
if self._mode in ["fpga_to_host", "bidirectional"]:
read_port = mem.read_port()
m.d.comb += read_port.en.eq(1)
# Throw BRAM operations into the front of the pipeline
with m.If(
(self.bus_i.valid)
& (self.bus_i.addr >= start_addr)
& (self.bus_i.addr <= stop_addr)
):
m.d.sync += read_port.addr.eq(self.bus_i.addr - start_addr)
# Pull BRAM reads from the back of the pipeline
with m.If(
(self._bus_pipe[2].valid)
& (self._bus_pipe[2].addr >= start_addr)
& (self._bus_pipe[2].addr <= stop_addr)
):
m.d.sync += self.bus_o.data.eq(read_port.data)
def _tie_mems_to_user_logic(self, m):
# Handle write ports
if self._mode in ["fpga_to_host", "bidirectional"]:
for i, mem in enumerate(self._mems):
write_port = mem.write_port()
m.d.comb += write_port.addr.eq(self.user_addr)
m.d.comb += write_port.data.eq(self.user_data_in[16 * i : 16 * (i + 1)])
m.d.comb += write_port.en.eq(self.user_write_enable)
# Handle read ports
if self._mode in ["host_to_fpga", "bidirectional"]:
read_datas = []
for i, mem in enumerate(self._mems):
read_port = mem.read_port()
m.d.comb += read_port.addr.eq(self.user_addr)
m.d.comb += read_port.en.eq(1)
read_datas.append(read_port.data)
m.d.comb += self.user_data_out.eq(Cat(read_datas))
def elaborate(self, platform):
m = Module()
# Define memories
n_full = self._width // 16
n_partial = self._width % 16
self._mems = [Memory(shape=16, depth=self._depth, init=[0]*self._depth) for _ in range(n_full)]
if n_partial > 0:
self._mems += [Memory(shape=n_partial, depth=self._depth, init=[0]*self._depth)]
# Add memories as submodules
for i, mem in enumerate(self._mems):
m.submodules[f"mem_{i}"] = mem
# Pipeline the bus to accomodate the two clock-cycle delay in the memories
self._bus_pipe = [Signal(InternalBus()) for _ in range(3)]
m.d.sync += self._bus_pipe[0].eq(self.bus_i)
@ -75,75 +179,9 @@ class ReadOnlyMemoryCore(Elaboratable):
m.d.sync += self.bus_o.eq(self._bus_pipe[2])
def _define_mems(self):
# There's three cases that must be handled:
# 1. Integer number of 16 bit mems
# 2. Integer number of 16 bit mems + partial mem
# 3. Just the partial mem (width < 16)
# Only one, partial-width memory is needed
if self._width < 16:
self._mems = [Memory(depth=self._depth, width=self._width)]
# Only full-width memories are needed
elif self._width % 16 == 0:
self._mems = [
Memory(depth=self._depth, width=16) for _ in range(self._width // 16)
]
# Both full-width and partial memories are needed
else:
self._mems = [
Memory(depth=self._depth, width=16) for i in range(self._width // 16)
]
self._mems += [Memory(depth=self._depth, width=self._width % 16)]
def _handle_read_ports(self, m):
# These are tied to the bus
for i, mem in enumerate(self._mems):
read_port = mem.read_port()
m.d.comb += read_port.en.eq(1)
start_addr = self._base_addr + (i * self._depth)
stop_addr = start_addr + self._depth - 1
# Throw BRAM operations into the front of the pipeline
with m.If(
(self.bus_i.valid)
& (~self.bus_i.rw)
& (self.bus_i.addr >= start_addr)
& (self.bus_i.addr <= stop_addr)
):
m.d.sync += read_port.addr.eq(self.bus_i.addr - start_addr)
# Pull BRAM reads from the back of the pipeline
with m.If(
(self._bus_pipe[2].valid)
& (~self._bus_pipe[2].rw)
& (self._bus_pipe[2].addr >= start_addr)
& (self._bus_pipe[2].addr <= stop_addr)
):
m.d.sync += self.bus_o.data.eq(read_port.data)
def _handle_write_ports(self, m):
# These are given to the user
for i, mem in enumerate(self._mems):
write_port = mem.write_port()
m.d.comb += write_port.addr.eq(self.user_addr)
m.d.comb += write_port.data.eq(self.user_data[16 * i : 16 * (i + 1)])
m.d.comb += write_port.en.eq(self.user_we)
def elaborate(self, platform):
m = Module()
# Add memories as submodules
for i, mem in enumerate(self._mems):
m.submodules[f"mem_{i}"] = mem
self._pipeline_bus(m)
self._handle_read_ports(m)
self._handle_write_ports(m)
# Tie memory ports to the internal bus and user logic
self._tie_mems_to_bus(m)
self._tie_mems_to_user_logic(m)
return m
def get_top_level_ports(self):
@ -151,7 +189,7 @@ class ReadOnlyMemoryCore(Elaboratable):
Return the Amaranth signals that should be included as ports in the
top-level Manta module.
"""
return [self.user_addr, self.user_data, self.user_we]
return self._top_level_ports
def get_max_addr(self):
"""
@ -161,26 +199,64 @@ class ReadOnlyMemoryCore(Elaboratable):
"""
return self._max_addr
def read_from_user_addr(self, addrs):
def _convert_user_to_bus_addr(self, addrs):
"""
Convert user address space to bus address space. For instance, for a
core with base address 10 and width 33, reading from address 4 is
actually a read from address 14 and address 14 + depth, and address
14 + (2 * depth).
"""
if isinstance(addrs, int):
return self._convert_user_to_bus_addr([addrs])[0]
bus_addrs = []
for addr in addrs:
for i in range(len(self._mems)):
bus_addrs.append(self._base_addr + addr + (i * self._depth))
return bus_addrs
def read(self, addrs):
"""
Read the memory stored at the provided address, as seen from the user
side.
"""
# Convert user address space to bus address space
# (for instance, for a core with base address 10 and width 33,
# reading from address 4 is actually a read from address 14
# and address 14 + depth, and address 14 + 2*depth)
# Handle a single integer address
if isinstance(addrs, int):
return self.read_from_user_addr([addrs])[0]
return self.read([addrs])[0]
bus_addrs = []
for addr in addrs:
bus_addrs += [
addr + self._base_addr + i * self._depth for i in range(len(self._mems))
]
# Make sure all list elements are integers
if not all(isinstance(a, int) for a in addrs):
raise TypeError("Read address must be an integer or list of integers.")
bus_addrs = self._convert_user_to_bus_addr(addrs)
datas = self._interface.read(bus_addrs)
data_chunks = split_into_chunks(datas, len(self._mems))
data_chunks = split_into_chunks(datas, self._n_brams)
return [words_to_value(chunk) for chunk in data_chunks]
def write(self, addrs, datas):
"""
Write to the memory stored at the provided address, as seen from the
user side.
"""
# Handle a single integer address and data
if isinstance(addrs, int) and isinstance(datas, int):
return self.write([addrs], [datas])
# Make sure address and datas are all integers
if not isinstance(addrs, list) or not isinstance(datas, list):
raise TypeError(
"Write addresses and data must be an integer or list of integers."
)
if not all(isinstance(a, int) for a in addrs):
raise TypeError("Write addresses must be all be integers.")
if not all(isinstance(d, int) for d in datas):
raise TypeError("Write data must all be integers.")
bus_addrs = self._convert_user_to_bus_addr([addrs])[0]
bus_datas = [word for d in datas for word in value_to_words(d, self._n_brams)]
self._interface.write(bus_addrs, bus_datas)

View File

@ -1,4 +1,4 @@
from manta.memory_core import ReadOnlyMemoryCore
from manta.memory_core import MemoryCore
from manta.utils import *
from random import randint, sample
@ -6,16 +6,16 @@ from random import randint, sample
def fill_mem_from_user_port(mem_core, depth):
for i in range(depth):
yield mem_core.user_addr.eq(i)
yield mem_core.user_data.eq(i)
yield mem_core.user_we.eq(1)
yield mem_core.user_data_in.eq(i)
yield mem_core.user_write_enable.eq(1)
yield
yield mem_core.user_we.eq(0)
yield mem_core.user_write_enable.eq(0)
yield
def verify_mem_core(width, depth, base_addr):
mem_core = ReadOnlyMemoryCore(width, depth, base_addr, interface=None)
mem_core = MemoryCore("fpga_to_host", width, depth, base_addr, interface=None)
def testbench():
yield from fill_mem_from_user_port(mem_core, depth)