manta/test/test_uart_bridge_sim.py

112 lines
3.4 KiB
Python

from amaranth import *
from cobs import cobs
from manta import *
from manta.ethernet.bridge import EthernetBridge
from manta.uart.cobs_decode import COBSDecode
from manta.uart.cobs_encode import COBSEncode
from manta.uart.receiver import UARTReceiver
from manta.uart.stream_packer import StreamPacker
from manta.uart.stream_unpacker import StreamUnpacker
from manta.uart.transmitter import UARTTransmitter
from manta.utils import *
# uart_rx -> COBS decode -> pack_stream -> bridge -> unpack_stream -> COBS encode -> uart_tx
class UARTHardware(Elaboratable):
def __init__(self):
self.rx = Signal()
self.tx = Signal()
self.bus_o = Signal(InternalBus())
self.bus_i = Signal(InternalBus())
self._clocks_per_baud = 10
def elaborate(self, platform):
m = Module()
m.submodules.uart_rx = uart_rx = UARTReceiver(self._clocks_per_baud)
m.submodules.cobs_decode = cobs_decode = COBSDecode()
m.submodules.stream_packer = stream_packer = StreamPacker()
m.submodules.bridge = bridge = EthernetBridge()
m.submodules.stream_unpacker = stream_unpacker = StreamUnpacker()
m.submodules.cobs_encode = cobs_encode = COBSEncode()
m.submodules.uart_tx = uart_tx = UARTTransmitter(self._clocks_per_baud)
wiring.connect(m, uart_rx.source, cobs_decode.sink)
wiring.connect(m, cobs_decode.source, stream_packer.sink)
wiring.connect(m, stream_packer.source, bridge.sink)
wiring.connect(m, bridge.source, stream_unpacker.sink)
wiring.connect(m, stream_unpacker.source, cobs_encode.sink)
wiring.connect(m, cobs_encode.source, uart_tx.sink)
m.d.comb += [
uart_rx.rx.eq(self.rx),
self.tx.eq(uart_tx.tx),
self.bus_o.eq(bridge.bus_o),
bridge.bus_i.eq(self.bus_i),
]
return m
class UARTHardwarePlusMemoryCore(Elaboratable):
def __init__(self):
self.rx = Signal()
self.tx = Signal()
self._clocks_per_baud = 10
def elaborate(self, platform):
m = Module()
m.submodules.uart = uart = UARTHardware()
m.submodules.mem_core = mem_core = MemoryCore("bidirectional", 32, 1024)
mem_core.base_addr = 0
m.d.comb += uart.bus_i.eq(mem_core.bus_o)
m.d.comb += mem_core.bus_i.eq(uart.bus_o)
m.d.comb += [
self.tx.eq(uart.tx),
uart.rx.eq(self.rx),
]
return m
uart_hw = UARTHardwarePlusMemoryCore()
async def send_byte(ctx, module, data):
# 8N1 serial, LSB sent first
data_bits = "0" + f"{data:08b}"[::-1] + "1"
data_bits = [int(bit) for bit in data_bits]
for i in range(10 * uart_hw._clocks_per_baud):
bit_index = i // uart_hw._clocks_per_baud
ctx.set(module.rx, data_bits[bit_index])
await ctx.tick()
@simulate(uart_hw)
async def test_read_request(ctx):
addr = 0x5678_9ABC
header = EthernetMessageHeader.from_params(
MessageTypes.READ_REQUEST, seq_num=0x0, length=1
)
request = bytestring_from_ints([header.as_bits(), addr], byteorder="little")
encoded = cobs.encode(request)
encoded = encoded + int(0).to_bytes(1)
ctx.set(uart_hw.rx, 1)
await ctx.tick()
await ctx.tick()
await ctx.tick()
for byte in encoded:
await send_byte(ctx, uart_hw, int(byte))
print(hex(int(byte)))
await ctx.tick()