meta: finish moving simulations to new async API
This commit is contained in:
parent
8fd943257c
commit
753a3f9427
|
|
@ -35,7 +35,7 @@ class EthernetInterface(Elaboratable):
|
|||
self._phy_io = self._define_phy_io()
|
||||
|
||||
self._dhcp_start = Signal()
|
||||
self._dhcp_timer = Signal(range(self._clk_freq + 1), reset=self._clk_freq)
|
||||
self._dhcp_timer = Signal(range(self._clk_freq + 1), init=self._clk_freq)
|
||||
|
||||
self._source_data = Signal(32)
|
||||
self._source_last = Signal()
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ class IOCore(MantaCore):
|
|||
self._strobe = Signal()
|
||||
self._input_bufs = [Signal(len(p), name=p.name + "_buf") for p in self._inputs]
|
||||
self._output_bufs = [
|
||||
Signal(len(p), name=p.name + "_buf", reset=p.reset) for p in self._outputs
|
||||
Signal(len(p), name=p.name + "_buf", init=p.init) for p in self._outputs
|
||||
]
|
||||
|
||||
self._make_memory_map()
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
from amaranth import *
|
||||
from manta.utils import *
|
||||
from amaranth.lib.memory import Memory
|
||||
from math import ceil
|
||||
|
||||
|
||||
|
|
@ -67,12 +68,12 @@ class MemoryCore(MantaCore):
|
|||
n_partial = self._width % 16
|
||||
|
||||
self._mems = [
|
||||
Memory(width=16, depth=self._depth, init=[0] * self._depth)
|
||||
Memory(shape=16, depth=self._depth, init=[0] * self._depth)
|
||||
for _ in range(n_full)
|
||||
]
|
||||
if n_partial > 0:
|
||||
self._mems += [
|
||||
Memory(width=n_partial, depth=self._depth, init=[0] * self._depth)
|
||||
Memory(shape=n_partial, depth=self._depth, init=[0] * self._depth)
|
||||
]
|
||||
|
||||
@property
|
||||
|
|
@ -204,10 +205,7 @@ class MemoryCore(MantaCore):
|
|||
if self._mode in ["host_to_fpga", "bidirectional"]:
|
||||
read_datas = []
|
||||
for i, mem in enumerate(self._mems):
|
||||
m.domains.user = user_cd = ClockDomain(local=True)
|
||||
m.d.comb += user_cd.clk.eq(self.user_clk)
|
||||
|
||||
read_port = mem.read_port(domain="user")
|
||||
read_port = mem.read_port()
|
||||
m.d.comb += read_port.addr.eq(self.user_addr)
|
||||
m.d.comb += read_port.en.eq(1)
|
||||
read_datas.append(read_port.data)
|
||||
|
|
|
|||
|
|
@ -13,9 +13,9 @@ class UARTTransmitter(Elaboratable):
|
|||
# Top-Level Ports
|
||||
self.data_i = Signal(8)
|
||||
self.start_i = Signal()
|
||||
self.done_o = Signal(reset=1)
|
||||
self.done_o = Signal(init=1)
|
||||
|
||||
self.tx = Signal(reset=1)
|
||||
self.tx = Signal(init=1)
|
||||
|
||||
# Internal Signals
|
||||
self._baud_counter = Signal(range(self._clocks_per_baud))
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
from amaranth import *
|
||||
from amaranth.lib import io
|
||||
from amaranth_boards.nexys4ddr import Nexys4DDRPlatform
|
||||
from amaranth_boards.icestick import ICEStickPlatform
|
||||
from manta import Manta
|
||||
|
|
@ -57,7 +58,9 @@ class IOCoreLoopbackTest(Elaboratable):
|
|||
m = Module()
|
||||
m.submodules.manta = self.manta
|
||||
|
||||
uart_pins = platform.request("uart")
|
||||
uart_pins = platform.request("uart", dir={"tx": "-", "rx": "-"})
|
||||
m.submodules.uart_rx = uart_rx = io.Buffer("i", uart_pins.rx)
|
||||
m.submodules.uart_tx = uart_tx = io.Buffer("o", uart_pins.tx)
|
||||
|
||||
probe0 = self.get_probe("probe0")
|
||||
probe1 = self.get_probe("probe1")
|
||||
|
|
@ -73,8 +76,8 @@ class IOCoreLoopbackTest(Elaboratable):
|
|||
probe1.eq(probe5),
|
||||
probe2.eq(probe6),
|
||||
probe3.eq(probe7),
|
||||
self.manta.interface.rx.eq(uart_pins.rx.i),
|
||||
uart_pins.tx.o.eq(self.manta.interface.tx),
|
||||
self.manta.interface.rx.eq(uart_rx.i),
|
||||
uart_tx.o.eq(self.manta.interface.tx),
|
||||
]
|
||||
|
||||
return m
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ async def test_output_probe_buffer_initial_value(ctx):
|
|||
# Verify all output probe buffers initialize to the values in the config
|
||||
for o in outputs:
|
||||
addrs = io_core._memory_map[o.name]["addrs"]
|
||||
datas = value_to_words(o.reset, len(addrs))
|
||||
datas = value_to_words(o.init, len(addrs))
|
||||
|
||||
for addr, data in zip(addrs, datas):
|
||||
await verify_register(io_core, ctx, addr, data)
|
||||
|
|
|
|||
|
|
@ -23,6 +23,9 @@ class MemoryCoreTests:
|
|||
# A model of what each bus address contains
|
||||
self.model = {i: 0 for i in self.bus_addrs}
|
||||
|
||||
def set_simulation_context(self, ctx):
|
||||
self.ctx = ctx
|
||||
|
||||
async def bus_addrs_all_zero(self):
|
||||
for addr in self.bus_addrs:
|
||||
await self.verify_bus_side(addr)
|
||||
|
|
@ -32,23 +35,23 @@ class MemoryCoreTests:
|
|||
await self.verify_user_side(addr)
|
||||
|
||||
async def bus_to_bus_functionality(self):
|
||||
# yield from self.one_bus_write_then_one_bus_read()
|
||||
# yield from self.multi_bus_writes_then_multi_bus_reads()
|
||||
await self.one_bus_write_then_one_bus_read()
|
||||
await self.multi_bus_writes_then_multi_bus_reads()
|
||||
await self.rand_bus_writes_rand_bus_reads()
|
||||
|
||||
async def user_to_bus_functionality(self):
|
||||
# yield from self.one_user_write_then_one_bus_read()
|
||||
# yield from self.multi_user_write_then_multi_bus_reads()
|
||||
await self.one_user_write_then_one_bus_read()
|
||||
await self.multi_user_write_then_multi_bus_reads()
|
||||
await self.rand_user_writes_rand_bus_reads()
|
||||
|
||||
async def bus_to_user_functionality(self):
|
||||
# yield from self.one_bus_write_then_one_user_read()
|
||||
# yield from self.multi_bus_write_then_multi_user_reads()
|
||||
await self.one_bus_write_then_one_user_read()
|
||||
await self.multi_bus_write_then_multi_user_reads()
|
||||
await self.rand_bus_writes_rand_user_reads()
|
||||
|
||||
async def user_to_user_functionality(self):
|
||||
# yield from self.one_user_write_then_one_user_read()
|
||||
# yield from self.multi_user_write_then_multi_user_read()
|
||||
await self.one_user_write_then_one_user_read()
|
||||
await self.multi_user_write_then_multi_user_read()
|
||||
await self.rand_user_write_rand_user_read()
|
||||
|
||||
async def one_bus_write_then_one_bus_read(self):
|
||||
|
|
@ -210,16 +213,16 @@ class MemoryCoreTests:
|
|||
else:
|
||||
return self.width % 16
|
||||
|
||||
async def verify_bus_side(self, ctx, addr):
|
||||
await verify_register(self.mem_core, ctx, addr, self.model[addr])
|
||||
await ctx.tick().repeat(4)
|
||||
async def verify_bus_side(self, addr):
|
||||
await verify_register(self.mem_core, self.ctx, addr, self.model[addr])
|
||||
await self.ctx.tick().repeat(4)
|
||||
|
||||
async def write_bus_side(self, ctx, addr, data):
|
||||
async def write_bus_side(self, addr, data):
|
||||
self.model[addr] = data
|
||||
await write_register(self.mem_core, addr, data)
|
||||
await ctx.tick().repeat(4)
|
||||
await write_register(self.mem_core, self.ctx, addr, data)
|
||||
await self.ctx.tick().repeat(4)
|
||||
|
||||
async def verify_user_side(self, ctx, addr):
|
||||
async def verify_user_side(self, addr):
|
||||
# Determine the expected value on the user side by looking
|
||||
# up the appropriate bus addresses in the model
|
||||
|
||||
|
|
@ -231,29 +234,29 @@ class MemoryCoreTests:
|
|||
|
||||
expected_data = words_to_value(bus_words)
|
||||
|
||||
await self.mem_core.user_addr.eq(addr)
|
||||
await ctx.tick().repeat(2)
|
||||
self.ctx.set(self.mem_core.user_addr, addr)
|
||||
await self.ctx.tick().repeat(2)
|
||||
|
||||
data = ctx.get(self.mem_core.user_data_out)
|
||||
data = self.ctx.get(self.mem_core.user_data_out)
|
||||
if data != expected_data:
|
||||
raise ValueError(
|
||||
f"Read from {addr} yielded {data} instead of {expected_data}"
|
||||
)
|
||||
|
||||
async def write_user_side(self, ctx, addr, data):
|
||||
async def write_user_side(self, addr, data):
|
||||
# convert value to words, and save to self.model
|
||||
words = value_to_words(data, self.n_mems)
|
||||
for i, word in enumerate(words):
|
||||
bus_addr = self.base_addr + addr + (i * self.depth)
|
||||
self.model[bus_addr] = word
|
||||
|
||||
ctx.set(self.mem_core.user_addr, addr)
|
||||
ctx.set(self.mem_core.user_data_in, data)
|
||||
ctx.set(self.mem_core.user_write_enable, 1)
|
||||
await ctx.tick()
|
||||
ctx.set(self.mem_core.user_addr, 0)
|
||||
ctx.set(self.mem_core.user_data_in, 0)
|
||||
ctx.set(self.mem_core.user_write_enable, 0)
|
||||
self.ctx.set(self.mem_core.user_addr, addr)
|
||||
self.ctx.set(self.mem_core.user_data_in, data)
|
||||
self.ctx.set(self.mem_core.user_write_enable, 1)
|
||||
await self.ctx.tick()
|
||||
self.ctx.set(self.mem_core.user_addr, 0)
|
||||
self.ctx.set(self.mem_core.user_data_in, 0)
|
||||
self.ctx.set(self.mem_core.user_write_enable, 0)
|
||||
|
||||
|
||||
modes = ["bidirectional", "fpga_to_host", "host_to_fpga"]
|
||||
|
|
@ -273,7 +276,9 @@ def test_mem_core(mode, width, depth, base_addr):
|
|||
tests = MemoryCoreTests(mem_core)
|
||||
|
||||
@simulate(mem_core)
|
||||
async def testbench():
|
||||
async def testbench(ctx):
|
||||
tests.set_simulation_context(ctx)
|
||||
|
||||
if mode == "bidirectional":
|
||||
await tests.bus_addrs_all_zero()
|
||||
await tests.user_addrs_all_zero()
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ async def verify_bit_sequence(ctx, byte):
|
|||
|
||||
ctx.set(uart_tx.data_i, 0)
|
||||
ctx.set(uart_tx.start_i, 0)
|
||||
await ctx.tick()
|
||||
|
||||
# Check that data bit is correct on every clock baud period
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue