From c47b0df07bed0697fbf274c3fe1234d4419a6097 Mon Sep 17 00:00:00 2001 From: Fischer Moseley <42497969+fischermoseley@users.noreply.github.com> Date: Wed, 11 Feb 2026 13:54:05 -0700 Subject: [PATCH] uart: update top-level wiring in UARTInterface --- src/manta/ethernet/bridge.py | 2 + src/manta/uart/__init__.py | 45 +++++++++++---------- test/test_uart_bridge_sim.py | 78 +++++++++--------------------------- 3 files changed, 44 insertions(+), 81 deletions(-) diff --git a/src/manta/ethernet/bridge.py b/src/manta/ethernet/bridge.py index 6922e5a..2e3d8b8 100644 --- a/src/manta/ethernet/bridge.py +++ b/src/manta/ethernet/bridge.py @@ -11,6 +11,8 @@ class EthernetBridge(wiring.Component): def __init__(self): super().__init__() + + # TODO: use In() and Out() for InternalBus connections self.bus_o = Signal(InternalBus()) self.bus_i = Signal(InternalBus()) diff --git a/src/manta/uart/__init__.py b/src/manta/uart/__init__.py index 7537fde..1b7e705 100644 --- a/src/manta/uart/__init__.py +++ b/src/manta/uart/__init__.py @@ -1,9 +1,12 @@ from amaranth import * from serial import Serial -from manta.uart.receive_bridge import ReceiveBridge +from manta.ethernet.bridge import EthernetBridge +from manta.uart.cobs_decode import COBSDecode +from manta.uart.cobs_encode import COBSEncode from manta.uart.receiver import UARTReceiver -from manta.uart.transmit_bridge import TransmitBridge +from manta.uart.stream_packer import StreamPacker +from manta.uart.stream_unpacker import StreamUnpacker from manta.uart.transmitter import UARTTransmitter from manta.utils import * @@ -324,26 +327,24 @@ class UARTInterface(Elaboratable): m = Module() m.submodules.uart_rx = uart_rx = UARTReceiver(self._clocks_per_baud) - m.submodules.bridge_rx = bridge_rx = ReceiveBridge() - m.submodules.bridge_tx = bridge_tx = TransmitBridge() + m.submodules.cobs_decode = cobs_decode = COBSDecode() + m.submodules.stream_packer = stream_packer = StreamPacker() + m.submodules.bridge = bridge = EthernetBridge() + m.submodules.stream_unpacker = stream_unpacker = StreamUnpacker() + m.submodules.cobs_encode = cobs_encode = COBSEncode() m.submodules.uart_tx = uart_tx = UARTTransmitter(self._clocks_per_baud) - m.d.comb += [ - # UART RX -> Internal Bus - uart_rx.rx.eq(self.rx), - bridge_rx.data_i.eq(uart_rx.data_o), - bridge_rx.valid_i.eq(uart_rx.valid_o), - self.bus_o.data.eq(bridge_rx.data_o), - self.bus_o.addr.eq(bridge_rx.addr_o), - self.bus_o.rw.eq(bridge_rx.rw_o), - self.bus_o.valid.eq(bridge_rx.valid_o), - # Internal Bus -> UART TX - bridge_tx.data_i.eq(self.bus_i.data), - bridge_tx.rw_i.eq(self.bus_i.rw), - bridge_tx.valid_i.eq(self.bus_i.valid), - uart_tx.data_i.eq(bridge_tx.data_o), - uart_tx.start_i.eq(bridge_tx.start_o), - bridge_tx.done_i.eq(uart_tx.done_o), - self.tx.eq(uart_tx.tx), - ] + m.d.comb += uart_rx.rx.eq(self.rx) + wiring.connect(m, uart_rx.source, cobs_decode.sink) + wiring.connect(m, cobs_decode.source, stream_packer.sink) + wiring.connect(m, stream_packer.source, bridge.sink) + wiring.connect(m, bridge.source, stream_unpacker.sink) + wiring.connect(m, stream_unpacker.source, cobs_encode.sink) + wiring.connect(m, cobs_encode.source, uart_tx.sink) + m.d.comb += self.tx.eq(uart_tx.tx) + + # TODO: replace these with wiring.Connect + m.d.comb += self.bus_o.eq(bridge.bus_o) + m.d.comb += bridge.bus_i.eq(self.bus_i) + return m diff --git a/test/test_uart_bridge_sim.py b/test/test_uart_bridge_sim.py index 5a4105b..2908724 100644 --- a/test/test_uart_bridge_sim.py +++ b/test/test_uart_bridge_sim.py @@ -1,76 +1,36 @@ from amaranth import * +from amaranth.lib import wiring +from amaranth.lib.wiring import In, Out from cobs import cobs from manta import * -from manta.ethernet.bridge import EthernetBridge -from manta.uart.cobs_decode import COBSDecode -from manta.uart.cobs_encode import COBSEncode -from manta.uart.receiver import UARTReceiver -from manta.uart.stream_packer import StreamPacker -from manta.uart.stream_unpacker import StreamUnpacker -from manta.uart.transmitter import UARTTransmitter +from manta.uart import UARTInterface from manta.utils import * -# uart_rx -> COBS decode -> pack_stream -> bridge -> unpack_stream -> COBS encode -> uart_tx - -class UARTHardware(Elaboratable): - def __init__(self): - self.rx = Signal() - self.tx = Signal() - - self.bus_o = Signal(InternalBus()) - self.bus_i = Signal(InternalBus()) - self._clocks_per_baud = 10 +class UARTHardwarePlusMemoryCore(wiring.Component): + rx: In(1) + tx: Out(1) def elaborate(self, platform): m = Module() - - m.submodules.uart_rx = uart_rx = UARTReceiver(self._clocks_per_baud) - m.submodules.cobs_decode = cobs_decode = COBSDecode() - m.submodules.stream_packer = stream_packer = StreamPacker() - m.submodules.bridge = bridge = EthernetBridge() - m.submodules.stream_unpacker = stream_unpacker = StreamUnpacker() - m.submodules.cobs_encode = cobs_encode = COBSEncode() - m.submodules.uart_tx = uart_tx = UARTTransmitter(self._clocks_per_baud) - - wiring.connect(m, uart_rx.source, cobs_decode.sink) - wiring.connect(m, cobs_decode.source, stream_packer.sink) - wiring.connect(m, stream_packer.source, bridge.sink) - wiring.connect(m, bridge.source, stream_unpacker.sink) - wiring.connect(m, stream_unpacker.source, cobs_encode.sink) - wiring.connect(m, cobs_encode.source, uart_tx.sink) - - m.d.comb += [ - uart_rx.rx.eq(self.rx), - self.tx.eq(uart_tx.tx), - self.bus_o.eq(bridge.bus_o), - bridge.bus_i.eq(self.bus_i), - ] - - return m - - -class UARTHardwarePlusMemoryCore(Elaboratable): - def __init__(self): - self.rx = Signal() - self.tx = Signal() - - self._clocks_per_baud = 10 - - def elaborate(self, platform): - m = Module() - m.submodules.uart = uart = UARTHardware() + m.submodules.uart = uart = UARTInterface( + port="None", + baudrate=1e6, + clock_freq=10e6, + ) m.submodules.mem_core = mem_core = MemoryCore("bidirectional", 32, 1024) mem_core.base_addr = 0 + # Expose the UARTInterface externally, so tests can grab + # the _clocks_per_baud attribute + self.uart = uart + m.d.comb += uart.bus_i.eq(mem_core.bus_o) m.d.comb += mem_core.bus_i.eq(uart.bus_o) - m.d.comb += [ - self.tx.eq(uart.tx), - uart.rx.eq(self.rx), - ] + m.d.comb += self.tx.eq(uart.tx) + m.d.comb += uart.rx.eq(self.rx) return m @@ -83,8 +43,8 @@ async def send_byte(ctx, module, data): data_bits = "0" + f"{data:08b}"[::-1] + "1" data_bits = [int(bit) for bit in data_bits] - for i in range(10 * uart_hw._clocks_per_baud): - bit_index = i // uart_hw._clocks_per_baud + for i in range(10 * uart_hw.uart._clocks_per_baud): + bit_index = i // uart_hw.uart._clocks_per_baud ctx.set(module.rx, data_bits[bit_index]) await ctx.tick()