manta/test/test_uart_datapath.py

155 lines
5.0 KiB
Python

import random
from amaranth import *
from cobs import cobs
from manta import *
from manta.ethernet.bridge import EthernetBridge
from manta.uart.cobs_decode import COBSDecode
from manta.uart.cobs_encode import COBSEncode
from manta.uart.stream_packer import StreamPacker
from manta.uart.stream_unpacker import StreamUnpacker
from manta.utils import *
# Since it takes so many simulation clock cycles to send data in and out,
# it's handy to test the UART datapath without the transmitter/receiver modules,
# and just leave it 8 bits wide on both ends. That's what this testbench does
class UARTDatapath(wiring.Component):
sink: In(StreamSignature(8, has_last=False))
source: Out(StreamSignature(8))
bus_source: Out(InternalBusSignature)
bus_sink: In(InternalBusSignature)
def elaborate(self, platform):
m = Module()
m.submodules.cobs_decode = cobs_decode = COBSDecode()
m.submodules.stream_packer = stream_packer = StreamPacker()
m.submodules.bridge = bridge = EthernetBridge()
m.submodules.stream_unpacker = stream_unpacker = StreamUnpacker()
m.submodules.cobs_encode = cobs_encode = COBSEncode()
wiring.connect(m, wiring.flipped(self.sink), cobs_decode.sink)
wiring.connect(m, cobs_decode.source, stream_packer.sink)
wiring.connect(m, stream_packer.source, bridge.sink)
wiring.connect(m, bridge.source, stream_unpacker.sink)
wiring.connect(m, stream_unpacker.source, cobs_encode.sink)
wiring.connect(m, cobs_encode.source, wiring.flipped(self.source))
wiring.connect(m, bridge.bus_source, wiring.flipped(self.bus_source))
wiring.connect(m, wiring.flipped(self.bus_sink), bridge.bus_sink)
return m
class UARTDatapathPlusMemoryCore(wiring.Component):
sink: In(StreamSignature(8, has_last=False))
source: Out(StreamSignature(8))
def elaborate(self, platform):
m = Module()
m.submodules.dp = dp = UARTDatapath()
m.submodules.mem = mem = MemoryCore(mode="bidirectional", width=32, depth=512)
mem.base_addr = 0
wiring.connect(m, dp.bus_source, mem.bus_sink)
wiring.connect(m, mem.bus_source, dp.bus_sink)
wiring.connect(m, dp.source, wiring.flipped(self.source))
wiring.connect(m, wiring.flipped(self.sink), dp.sink)
return m
dpmc = UARTDatapathPlusMemoryCore()
@simulate(dpmc)
async def test_uart_read_request(ctx):
await read_request(ctx, seq_num=0, addr=0, length=4)
await write_request(
ctx, seq_num=1, addr=0, data=[0x1111_1111, 0x2222_2222, 0x3333_3333, 0x4444_4444]
)
await read_request(ctx, seq_num=2, addr=0, length=4)
async def read_request(ctx, seq_num, addr, length):
header = EthernetMessageHeader.from_params(MessageTypes.READ_REQUEST, seq_num, length)
request = bytestring_from_ints([header.as_bits(), addr], byteorder="little")
await send_request(ctx, request)
async def write_request(ctx, seq_num, addr, data):
header = EthernetMessageHeader.from_params(MessageTypes.WRITE_REQUEST, seq_num)
request = bytestring_from_ints([header.as_bits(), addr, *data], byteorder="little")
await send_request(ctx, request)
async def send_request(ctx, request):
request_encoded = cobs.encode(request) + int(0).to_bytes(1)
response_encoded = await send_and_receive(
ctx, request_encoded, tx_irritate=False, rx_irritate=False
)
response = cobs.decode(bytes(response_encoded[:-1])) # remove zero delimiter
hex_print = lambda data: " ".join([f"{d:02x}" for d in data])
print(
"\n"
f" request (unencoded): {hex_print(request)}\n"
f" request (encoded): {hex_print(request_encoded)}\n"
f" response: (encoded): {hex_print(response_encoded)}\n"
f" response: (decoded): {hex_print(response)}\n"
)
async def send_and_receive(ctx, data, tx_irritate, rx_irritate):
await ctx.tick().repeat(5)
tx_done = False
tx_index = 0
rx_done = False
rx_buf = []
ctx.set(dpmc.source.ready, 1)
await ctx.tick()
while not (tx_done and rx_done):
# Feed data to decoder
tx_stall = random.randint(0, 1) if tx_irritate else False
if tx_done or tx_stall:
ctx.set(dpmc.sink.data, 0)
ctx.set(dpmc.sink.valid, 0)
else:
ctx.set(dpmc.sink.valid, 1)
ctx.set(dpmc.sink.data, data[tx_index])
if ctx.get(dpmc.sink.valid) and ctx.get(dpmc.sink.ready):
if tx_index == len(data) - 1:
tx_done = True
else:
tx_index += 1
# Randomly set source.ready if irritator is enabled
ready = random.randint(0, 1) if rx_irritate else 1
ctx.set(dpmc.source.ready, ready)
# Pull output data from buffer
if ctx.get(dpmc.source.valid) and ctx.get(dpmc.source.ready):
rx_buf += [ctx.get(dpmc.source.data)]
if ctx.get(dpmc.source.last):
rx_done = True
await ctx.tick()
await ctx.tick().repeat(5)
return rx_buf