From 33cc96f198d3ccba76b7ce9f3bb42dba9e86343c Mon Sep 17 00:00:00 2001 From: Fischer Moseley <42497969+fischermoseley@users.noreply.github.com> Date: Wed, 25 Feb 2026 10:03:02 -0700 Subject: [PATCH] uart: tidy COBS encoder tests --- test/test_cobs_encode.py | 88 ++++++++++++++++++++-------------------- 1 file changed, 45 insertions(+), 43 deletions(-) diff --git a/test/test_cobs_encode.py b/test/test_cobs_encode.py index c5c0bff..1dd5881 100644 --- a/test/test_cobs_encode.py +++ b/test/test_cobs_encode.py @@ -9,47 +9,47 @@ ce = COBSEncode() @simulate(ce) -async def test_cobs_encode(ctx): - await ctx.tick().repeat(5) +async def test_cobs_encode_static(ctx): + testcases = [ + # Test cases taken from Wikipedia: + # https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing + [0x00], + [0x00, 0x00], + [0x00, 0x11, 0x00], + [0x11, 0x22, 0x00, 0x33], + [0x11, 0x22, 0x33, 0x44], + [0x11, 0x00, 0x00, 0x00], + [i for i in range(1, 255)], + [0x00] + [i for i in range(1, 255)], + [i for i in range(256)], + [i for i in range(2, 256)] + [0x00], + [i for i in range(3, 256)] + [0x00, 0x01], + # Selected edge and corner cases: + [0x00] * 253, + [0x00] * 254, + [0x00] * 255, + ([0x11] * 253) + [0], + ([0x11] * 253) + [0] + ([0x11] * 5), + ([0x11] * 254) + [0], + ([0x11] * 255) + [0], + ] - ctx.set(ce.source.ready, 1) - - # Test cases taken from Wikipedia: - # https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing - await encode_and_compare(ctx, [0x00]) - await encode_and_compare(ctx, [0x00, 0x00]) - await encode_and_compare(ctx, [0x00, 0x11, 0x00]) - await encode_and_compare(ctx, [0x11, 0x22, 0x00, 0x33]) - await encode_and_compare(ctx, [0x11, 0x22, 0x33, 0x44]) - await encode_and_compare(ctx, [0x11, 0x00, 0x00, 0x00]) - await encode_and_compare(ctx, [i for i in range(1, 255)]) - await encode_and_compare(ctx, [0x00] + [i for i in range(1, 255)]) - await encode_and_compare(ctx, [i for i in range(256)]) - await encode_and_compare(ctx, [i for i in range(2, 256)] + [0x00]) - await encode_and_compare(ctx, [i for i in range(3, 256)] + [0x00, 0x01]) - - # Selected edge and corner cases - await encode_and_compare(ctx, [0x00] * 253) - await encode_and_compare(ctx, [0x00] * 254) - await encode_and_compare(ctx, [0x00] * 255) - await encode_and_compare(ctx, ([0x11] * 253) + [0]) - await encode_and_compare(ctx, ([0x11] * 253) + [0] + ([0x11] * 5)) - await encode_and_compare(ctx, ([0x11] * 254) + [0]) - await encode_and_compare(ctx, ([0x11] * 255) + [0]) + for data in testcases: + await encode_and_compare(ctx, data, tx_irritate=True, rx_irritate=True) @simulate(ce) async def test_cobs_encode_random(ctx): - ctx.set(ce.source.ready, 1) - num_tests = random.randint(1, 5) for _ in range(num_tests): length = random.randint(1, 2000) - input_data = [random.randint(0, 255) for _ in range(length)] - await encode_and_compare(ctx, input_data) + data = [random.randint(0, 255) for _ in range(length)] + await encode_and_compare(ctx, data, tx_irritate=True, rx_irritate=True) -async def encode(ctx, data): +async def encode(ctx, data, tx_irritate, rx_irritate): + await ctx.tick().repeat(5) + tx_done = False tx_index = 0 rx_done = False @@ -57,7 +57,7 @@ async def encode(ctx, data): while not (tx_done and rx_done): # Feed data to encoder - tx_stall = random.randint(0, 1) + tx_stall = random.randint(0, 1) if tx_irritate else False if not tx_done and not tx_stall: ctx.set(ce.sink.valid, 1) @@ -75,8 +75,9 @@ async def encode(ctx, data): ctx.set(ce.sink.valid, 0) ctx.set(ce.sink.last, 0) - # Randomly set source.ready - ctx.set(ce.source.ready, random.randint(0, 1)) + # Randomly set source.ready if irritator is enabled + ready = random.randint(0, 1) if rx_irritate else 1 + ctx.set(ce.source.ready, ready) # Pull output data from buffer if ctx.get(ce.source.valid) and ctx.get(ce.source.ready): @@ -91,16 +92,17 @@ async def encode(ctx, data): return rx_buf -async def encode_and_compare(ctx, data, only_print_on_fail=True): +async def encode_and_compare(ctx, data, tx_irritate, rx_irritate): expected = cobs.encode(bytes(data)) + b"\0" - actual = await encode(ctx, data) + actual = await encode(ctx, data, tx_irritate, rx_irritate) matched = bytes(actual) == expected - if (not only_print_on_fail) or (only_print_on_fail and not matched): - print(f" input: {[hex(d) for d in data]}") - print(f"expected: {[hex(d) for d in expected]}") - print(f" actual: {[hex(d) for d in actual]}") - print(f" result: {'PASS' if matched else 'FAIL'}") - print("") + hex_print = lambda data: " ".join([f"{d:02x}" for d in data]) - return matched + if not matched: + raise ValueError( + "COBS encoder output does not match expected data!\n" + f" input: {hex_print(data)}\n" + f"expected: {hex_print(expected)}\n" + f" actual: {hex_print(actual)}\n" + )