uart: tidy COBS encoder tests
This commit is contained in:
parent
fbe0d9623c
commit
33cc96f198
|
|
@ -9,47 +9,47 @@ ce = COBSEncode()
|
|||
|
||||
|
||||
@simulate(ce)
|
||||
async def test_cobs_encode(ctx):
|
||||
await ctx.tick().repeat(5)
|
||||
async def test_cobs_encode_static(ctx):
|
||||
testcases = [
|
||||
# Test cases taken from Wikipedia:
|
||||
# https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
|
||||
[0x00],
|
||||
[0x00, 0x00],
|
||||
[0x00, 0x11, 0x00],
|
||||
[0x11, 0x22, 0x00, 0x33],
|
||||
[0x11, 0x22, 0x33, 0x44],
|
||||
[0x11, 0x00, 0x00, 0x00],
|
||||
[i for i in range(1, 255)],
|
||||
[0x00] + [i for i in range(1, 255)],
|
||||
[i for i in range(256)],
|
||||
[i for i in range(2, 256)] + [0x00],
|
||||
[i for i in range(3, 256)] + [0x00, 0x01],
|
||||
# Selected edge and corner cases:
|
||||
[0x00] * 253,
|
||||
[0x00] * 254,
|
||||
[0x00] * 255,
|
||||
([0x11] * 253) + [0],
|
||||
([0x11] * 253) + [0] + ([0x11] * 5),
|
||||
([0x11] * 254) + [0],
|
||||
([0x11] * 255) + [0],
|
||||
]
|
||||
|
||||
ctx.set(ce.source.ready, 1)
|
||||
|
||||
# Test cases taken from Wikipedia:
|
||||
# https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
|
||||
await encode_and_compare(ctx, [0x00])
|
||||
await encode_and_compare(ctx, [0x00, 0x00])
|
||||
await encode_and_compare(ctx, [0x00, 0x11, 0x00])
|
||||
await encode_and_compare(ctx, [0x11, 0x22, 0x00, 0x33])
|
||||
await encode_and_compare(ctx, [0x11, 0x22, 0x33, 0x44])
|
||||
await encode_and_compare(ctx, [0x11, 0x00, 0x00, 0x00])
|
||||
await encode_and_compare(ctx, [i for i in range(1, 255)])
|
||||
await encode_and_compare(ctx, [0x00] + [i for i in range(1, 255)])
|
||||
await encode_and_compare(ctx, [i for i in range(256)])
|
||||
await encode_and_compare(ctx, [i for i in range(2, 256)] + [0x00])
|
||||
await encode_and_compare(ctx, [i for i in range(3, 256)] + [0x00, 0x01])
|
||||
|
||||
# Selected edge and corner cases
|
||||
await encode_and_compare(ctx, [0x00] * 253)
|
||||
await encode_and_compare(ctx, [0x00] * 254)
|
||||
await encode_and_compare(ctx, [0x00] * 255)
|
||||
await encode_and_compare(ctx, ([0x11] * 253) + [0])
|
||||
await encode_and_compare(ctx, ([0x11] * 253) + [0] + ([0x11] * 5))
|
||||
await encode_and_compare(ctx, ([0x11] * 254) + [0])
|
||||
await encode_and_compare(ctx, ([0x11] * 255) + [0])
|
||||
for data in testcases:
|
||||
await encode_and_compare(ctx, data, tx_irritate=True, rx_irritate=True)
|
||||
|
||||
|
||||
@simulate(ce)
|
||||
async def test_cobs_encode_random(ctx):
|
||||
ctx.set(ce.source.ready, 1)
|
||||
|
||||
num_tests = random.randint(1, 5)
|
||||
for _ in range(num_tests):
|
||||
length = random.randint(1, 2000)
|
||||
input_data = [random.randint(0, 255) for _ in range(length)]
|
||||
await encode_and_compare(ctx, input_data)
|
||||
data = [random.randint(0, 255) for _ in range(length)]
|
||||
await encode_and_compare(ctx, data, tx_irritate=True, rx_irritate=True)
|
||||
|
||||
|
||||
async def encode(ctx, data):
|
||||
async def encode(ctx, data, tx_irritate, rx_irritate):
|
||||
await ctx.tick().repeat(5)
|
||||
|
||||
tx_done = False
|
||||
tx_index = 0
|
||||
rx_done = False
|
||||
|
|
@ -57,7 +57,7 @@ async def encode(ctx, data):
|
|||
|
||||
while not (tx_done and rx_done):
|
||||
# Feed data to encoder
|
||||
tx_stall = random.randint(0, 1)
|
||||
tx_stall = random.randint(0, 1) if tx_irritate else False
|
||||
|
||||
if not tx_done and not tx_stall:
|
||||
ctx.set(ce.sink.valid, 1)
|
||||
|
|
@ -75,8 +75,9 @@ async def encode(ctx, data):
|
|||
ctx.set(ce.sink.valid, 0)
|
||||
ctx.set(ce.sink.last, 0)
|
||||
|
||||
# Randomly set source.ready
|
||||
ctx.set(ce.source.ready, random.randint(0, 1))
|
||||
# Randomly set source.ready if irritator is enabled
|
||||
ready = random.randint(0, 1) if rx_irritate else 1
|
||||
ctx.set(ce.source.ready, ready)
|
||||
|
||||
# Pull output data from buffer
|
||||
if ctx.get(ce.source.valid) and ctx.get(ce.source.ready):
|
||||
|
|
@ -91,16 +92,17 @@ async def encode(ctx, data):
|
|||
return rx_buf
|
||||
|
||||
|
||||
async def encode_and_compare(ctx, data, only_print_on_fail=True):
|
||||
async def encode_and_compare(ctx, data, tx_irritate, rx_irritate):
|
||||
expected = cobs.encode(bytes(data)) + b"\0"
|
||||
actual = await encode(ctx, data)
|
||||
actual = await encode(ctx, data, tx_irritate, rx_irritate)
|
||||
matched = bytes(actual) == expected
|
||||
|
||||
if (not only_print_on_fail) or (only_print_on_fail and not matched):
|
||||
print(f" input: {[hex(d) for d in data]}")
|
||||
print(f"expected: {[hex(d) for d in expected]}")
|
||||
print(f" actual: {[hex(d) for d in actual]}")
|
||||
print(f" result: {'PASS' if matched else 'FAIL'}")
|
||||
print("")
|
||||
hex_print = lambda data: " ".join([f"{d:02x}" for d in data])
|
||||
|
||||
return matched
|
||||
if not matched:
|
||||
raise ValueError(
|
||||
"COBS encoder output does not match expected data!\n"
|
||||
f" input: {hex_print(data)}\n"
|
||||
f"expected: {hex_print(expected)}\n"
|
||||
f" actual: {hex_print(actual)}\n"
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in New Issue