uart: add full datapath test
This commit is contained in:
parent
dd6a3ac467
commit
6a2c29ede4
|
|
@ -0,0 +1,154 @@
|
|||
import random
|
||||
|
||||
from amaranth import *
|
||||
from cobs import cobs
|
||||
|
||||
from manta import *
|
||||
from manta.ethernet.bridge import EthernetBridge
|
||||
from manta.uart.cobs_decode import COBSDecode
|
||||
from manta.uart.cobs_encode import COBSEncode
|
||||
from manta.uart.stream_packer import StreamPacker
|
||||
from manta.uart.stream_unpacker import StreamUnpacker
|
||||
from manta.utils import *
|
||||
|
||||
# Since it takes so many simulation clock cycles to send data in and out,
|
||||
# it's handy to test the UART datapath without the transmitter/receiver modules,
|
||||
# and just leave it 8 bits wide on both ends. That's what this testbench does
|
||||
|
||||
|
||||
class UARTDatapath(wiring.Component):
|
||||
sink: In(StreamSignature(8, has_last=False))
|
||||
source: Out(StreamSignature(8))
|
||||
|
||||
bus_source: Out(InternalBusSignature)
|
||||
bus_sink: In(InternalBusSignature)
|
||||
|
||||
def elaborate(self, platform):
|
||||
m = Module()
|
||||
|
||||
m.submodules.cobs_decode = cobs_decode = COBSDecode()
|
||||
m.submodules.stream_packer = stream_packer = StreamPacker()
|
||||
m.submodules.bridge = bridge = EthernetBridge()
|
||||
m.submodules.stream_unpacker = stream_unpacker = StreamUnpacker()
|
||||
m.submodules.cobs_encode = cobs_encode = COBSEncode()
|
||||
|
||||
wiring.connect(m, wiring.flipped(self.sink), cobs_decode.sink)
|
||||
wiring.connect(m, cobs_decode.source, stream_packer.sink)
|
||||
wiring.connect(m, stream_packer.source, bridge.sink)
|
||||
wiring.connect(m, bridge.source, stream_unpacker.sink)
|
||||
wiring.connect(m, stream_unpacker.source, cobs_encode.sink)
|
||||
wiring.connect(m, cobs_encode.source, wiring.flipped(self.source))
|
||||
|
||||
wiring.connect(m, bridge.bus_source, wiring.flipped(self.bus_source))
|
||||
wiring.connect(m, wiring.flipped(self.bus_sink), bridge.bus_sink)
|
||||
|
||||
return m
|
||||
|
||||
|
||||
class UARTDatapathPlusMemoryCore(wiring.Component):
|
||||
sink: In(StreamSignature(8, has_last=False))
|
||||
source: Out(StreamSignature(8))
|
||||
|
||||
def elaborate(self, platform):
|
||||
m = Module()
|
||||
|
||||
m.submodules.dp = dp = UARTDatapath()
|
||||
m.submodules.mem = mem = MemoryCore(mode="bidirectional", width=32, depth=512)
|
||||
mem.base_addr = 0
|
||||
|
||||
wiring.connect(m, dp.bus_source, mem.bus_sink)
|
||||
wiring.connect(m, mem.bus_source, dp.bus_sink)
|
||||
|
||||
wiring.connect(m, dp.source, wiring.flipped(self.source))
|
||||
wiring.connect(m, wiring.flipped(self.sink), dp.sink)
|
||||
|
||||
return m
|
||||
|
||||
|
||||
dpmc = UARTDatapathPlusMemoryCore()
|
||||
|
||||
|
||||
@simulate(dpmc)
|
||||
async def test_uart_read_request(ctx):
|
||||
await read_request(ctx, seq_num=0, addr=0, length=4)
|
||||
await write_request(
|
||||
ctx, seq_num=1, addr=0, data=[0x1111_1111, 0x2222_2222, 0x3333_3333, 0x4444_4444]
|
||||
)
|
||||
await read_request(ctx, seq_num=2, addr=0, length=4)
|
||||
|
||||
|
||||
async def read_request(ctx, seq_num, addr, length):
|
||||
header = EthernetMessageHeader.from_params(MessageTypes.READ_REQUEST, seq_num, length)
|
||||
request = bytestring_from_ints([header.as_bits(), addr], byteorder="little")
|
||||
await send_request(ctx, request)
|
||||
|
||||
|
||||
async def write_request(ctx, seq_num, addr, data):
|
||||
header = EthernetMessageHeader.from_params(MessageTypes.WRITE_REQUEST, seq_num)
|
||||
request = bytestring_from_ints([header.as_bits(), addr, *data], byteorder="little")
|
||||
await send_request(ctx, request)
|
||||
|
||||
|
||||
async def send_request(ctx, request):
|
||||
request_encoded = cobs.encode(request) + int(0).to_bytes(1)
|
||||
response_encoded = await send_and_receive(
|
||||
ctx, request_encoded, tx_irritate=False, rx_irritate=False
|
||||
)
|
||||
response = cobs.decode(bytes(response_encoded[:-1])) # remove zero delimiter
|
||||
|
||||
hex_print = lambda data: " ".join([f"{d:02x}" for d in data])
|
||||
|
||||
print(
|
||||
"\n"
|
||||
f" request (unencoded): {hex_print(request)}\n"
|
||||
f" request (encoded): {hex_print(request_encoded)}\n"
|
||||
f" response: (encoded): {hex_print(response_encoded)}\n"
|
||||
f" response: (decoded): {hex_print(response)}\n"
|
||||
)
|
||||
|
||||
|
||||
async def send_and_receive(ctx, data, tx_irritate, rx_irritate):
|
||||
await ctx.tick().repeat(5)
|
||||
|
||||
tx_done = False
|
||||
tx_index = 0
|
||||
rx_done = False
|
||||
rx_buf = []
|
||||
|
||||
ctx.set(dpmc.source.ready, 1)
|
||||
await ctx.tick()
|
||||
|
||||
while not (tx_done and rx_done):
|
||||
# Feed data to decoder
|
||||
tx_stall = random.randint(0, 1) if tx_irritate else False
|
||||
|
||||
if tx_done or tx_stall:
|
||||
ctx.set(dpmc.sink.data, 0)
|
||||
ctx.set(dpmc.sink.valid, 0)
|
||||
|
||||
else:
|
||||
ctx.set(dpmc.sink.valid, 1)
|
||||
ctx.set(dpmc.sink.data, data[tx_index])
|
||||
|
||||
if ctx.get(dpmc.sink.valid) and ctx.get(dpmc.sink.ready):
|
||||
if tx_index == len(data) - 1:
|
||||
tx_done = True
|
||||
|
||||
else:
|
||||
tx_index += 1
|
||||
|
||||
# Randomly set source.ready if irritator is enabled
|
||||
ready = random.randint(0, 1) if rx_irritate else 1
|
||||
ctx.set(dpmc.source.ready, ready)
|
||||
|
||||
# Pull output data from buffer
|
||||
if ctx.get(dpmc.source.valid) and ctx.get(dpmc.source.ready):
|
||||
rx_buf += [ctx.get(dpmc.source.data)]
|
||||
|
||||
if ctx.get(dpmc.source.last):
|
||||
rx_done = True
|
||||
|
||||
await ctx.tick()
|
||||
|
||||
await ctx.tick().repeat(5)
|
||||
return rx_buf
|
||||
Loading…
Reference in New Issue