From f47a4156923a01bcc5e69c9407b1df9af84754f0 Mon Sep 17 00:00:00 2001 From: Fischer Moseley <42497969+fischermoseley@users.noreply.github.com> Date: Thu, 5 Mar 2026 10:04:56 -0700 Subject: [PATCH] uart: initial commit of updated COBS decoder --- src/manta/uart/cobs_decode.py | 112 +++++++++++++++++++++++----------- test/test_cobs_decode.py | 105 +++++++++++++++++++++++++++++++ 2 files changed, 182 insertions(+), 35 deletions(-) create mode 100644 test/test_cobs_decode.py diff --git a/src/manta/uart/cobs_decode.py b/src/manta/uart/cobs_decode.py index fffdb47..f7268c0 100644 --- a/src/manta/uart/cobs_decode.py +++ b/src/manta/uart/cobs_decode.py @@ -1,54 +1,96 @@ from amaranth import * from amaranth.lib import wiring +from amaranth.lib.fifo import SyncFIFOBuffered from amaranth.lib.wiring import In, Out from manta.utils import * class COBSDecode(wiring.Component): - sink: In(StreamSignature(8, has_last=False, has_ready=False)) + sink: In(StreamSignature(8, has_last=False)) source: Out(StreamSignature(8)) def elaborate(self, platform): m = Module() + m.submodules.fifo = fifo = SyncFIFOBuffered(width=8, depth=3) - counter = Signal(8) + count = Signal(range(256)) + fsm_inject_zero = Signal() + skip_zero_injection = Signal() - m.d.sync += self.source.data.eq(0) - m.d.sync += self.source.valid.eq(0) - m.d.sync += self.source.last.eq(0) + with m.FSM() as fsm: + # m.d.comb += fsm_inject_zero.eq(0) - # State Machine: - with m.FSM(): - # TODO: determine if wait for packet logic should stay - # with m.State("WAIT_FOR_PACKET_START"): - # with m.If((self.sink.data == 0) & (self.sink.valid)): - # m.next = "START_OF_PACKET" - - with m.State("START_OF_PACKET"): - with m.If(self.sink.valid): - m.d.sync += counter.eq(self.sink.data - 1) - m.next = "DECODING" - - # with m.Else(): - # m.next = "START_OF_PACKET" - - with m.State("DECODING"): - with m.If(self.sink.valid): - with m.If(counter > 0): - m.d.sync += counter.eq(counter - 1) - m.d.sync += self.source.data.eq(self.sink.data) - m.d.sync += self.source.valid.eq(1) - m.next = "DECODING" - - with m.Else(): - with m.If(self.sink.data == 0): - m.d.sync += self.source.last.eq(1) - m.next = "START_OF_PACKET" + with m.State("IDLE"): + with m.If(self.sink.ready & self.sink.valid): + # Re-sync to start of packet + with m.If(self.sink.data != 0): + with m.If(self.sink.data == 1): + m.next = "END_OF_GROUP" with m.Else(): - m.d.sync += counter.eq(self.sink.data - 1) - m.d.sync += self.source.valid.eq(1) - m.next = "DECODING" + m.next = "STREAM" + m.d.sync += count.eq(self.sink.data - 2) + + m.d.sync += skip_zero_injection.eq(self.sink.data == 255) + + with m.State("STREAM"): + with m.If(self.sink.ready & self.sink.valid): + with m.If(count > 0): + m.d.sync += count.eq(count - 1) + + with m.Else(): + m.next = "END_OF_GROUP" + + with m.State("END_OF_GROUP"): + with m.If(self.sink.ready & self.sink.valid): + with m.If(self.sink.data == 0): + m.next = "IDLE" + + with m.Elif(self.sink.data == 1): + # m.d.comb += fsm_inject_zero.eq(~skip_zero_injection) + m.next = "END_OF_GROUP" + + with m.Else(): + # m.d.comb += fsm_inject_zero.eq(~skip_zero_injection) + m.next = "STREAM" + m.d.sync += count.eq(self.sink.data - 2) + + m.d.sync += skip_zero_injection.eq(self.sink.data == 255) + + # an attempt to fix the combo glitch on fsm_inject_zero + m.d.comb += fsm_inject_zero.eq( + fsm.ongoing("END_OF_GROUP") + & self.sink.ready + & self.sink.valid + & (self.sink.data != 0) + & (~skip_zero_injection) + ) + + m.d.comb += [ + self.source.data.eq(fifo.r_data), + self.source.valid.eq(fifo.r_rdy), + self.source.last.eq(fsm.ongoing("IDLE") & self.source.valid & (fifo.r_level == 1)), + fifo.r_en.eq(self.source.valid & self.source.ready), + ] + + with m.If(fsm.ongoing("STREAM")): + m.d.comb += [ + fifo.w_en.eq(self.sink.valid & self.sink.ready), + fifo.w_data.eq(self.sink.data), + self.sink.ready.eq(fifo.w_rdy), + ] + + with m.Else(): + m.d.comb += [ + fifo.w_en.eq(fsm_inject_zero), + fifo.w_data.eq(0), + self.sink.ready.eq( + ~( + (fsm.ongoing("IDLE") & fifo.r_rdy) + | (fsm.ongoing("END_OF_GROUP") & ~fifo.w_rdy) + ) + ), + ] return m diff --git a/test/test_cobs_decode.py b/test/test_cobs_decode.py new file mode 100644 index 0000000..815d44b --- /dev/null +++ b/test/test_cobs_decode.py @@ -0,0 +1,105 @@ +import random + +from cobs import cobs + +from manta.uart.cobs_decode import COBSDecode +from manta.utils import * + +cd = COBSDecode() + + +@simulate(cd) +async def test_cobs_decode_static(ctx): + testcases = [ + # Test cases taken from Wikipedia: + # https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing + [0x00], + [0x00, 0x00], + [0x00, 0x11, 0x00], + [0x11, 0x22, 0x00, 0x33], + [0x11, 0x22, 0x33, 0x44], + [0x11, 0x22, 0x33, 0x44, 0x00, 0x55, 0x66], + [0x11, 0x00, 0x00, 0x00], + [i for i in range(1, 255)], + [0x00] + [i for i in range(1, 255)], + [i for i in range(256)], + [i for i in range(2, 256)] + [0x00], + [i for i in range(3, 256)] + [0x00, 0x01], + # # Selected edge and corner cases: + [0x00] * 253, + [0x00] * 254, + [0x00] * 255, + ([0x11] * 253) + [0], + ([0x11] * 253) + [0] + ([0x11] * 5), + ([0x11] * 254) + [0], + ([0x11] * 255) + [0], + ] + + for data in testcases: + await decode_and_compare(ctx, data, tx_irritate=False, rx_irritate=False) + + +async def decode(ctx, data, tx_irritate, rx_irritate): + await ctx.tick().repeat(5) + + tx_done = False + tx_index = 0 + rx_done = False + rx_buf = [] + + ctx.set(cd.source.ready, 1) + await ctx.tick() + + while not (tx_done and rx_done): + # Feed data to decoder + tx_stall = random.randint(0, 1) if tx_irritate else False + + if tx_done: + ctx.set(cd.sink.data, 0) + ctx.set(cd.sink.valid, 0) + + elif not tx_stall: + ctx.set(cd.sink.valid, 1) + ctx.set(cd.sink.data, data[tx_index]) + + if ctx.get(cd.sink.valid) and ctx.get(cd.sink.ready): + if tx_index == len(data) - 1: + tx_done = True + + else: + tx_index += 1 + + # Randomly set source.ready if irritator is enabled + ready = random.randint(0, 1) if rx_irritate else 1 + ctx.set(cd.source.ready, ready) + + # Pull output data from buffer + if ctx.get(cd.source.valid) and ctx.get(cd.source.ready): + rx_buf += [ctx.get(cd.source.data)] + + if ctx.get(cd.source.last): + rx_done = True + + await ctx.tick() + + await ctx.tick().repeat(5) + return rx_buf + + +async def decode_and_compare(ctx, data, tx_irritate, rx_irritate): + encoded = cobs.encode(bytes(data)) + b"\0" + + # print([hex(a) for a in encoded]) + + actual = await decode(ctx, encoded, tx_irritate, rx_irritate) + matched = actual == data + + hex_print = lambda data: " ".join([f"{d:02x}" for d in data]) + + if not matched: + raise ValueError( + "COBS decoder output does not match expected data!\n" + f" input: {hex_print(encoded)}\n" + f"expected: {hex_print(data)}\n" + f" actual: {hex_print(actual)}\n" + )