manta/test/test_cobs_encode.py

107 lines
3.3 KiB
Python
Raw Normal View History

import random
from cobs import cobs
from manta.uart.cobs_encode import COBSEncode
from manta.utils import *
# Module-level encoder instance under test. It is passed to @simulate(...) for
# both testbenches below and is driven directly by the encode() helper.
ce = COBSEncode()
@simulate(ce)
async def test_cobs_encode(ctx):
    """Run a fixed set of vectors through the encoder and check every result.

    Each vector is pushed through ``encode_and_compare``, which compares the
    simulated encoder output against the reference ``cobs`` implementation
    and returns whether they matched. The returned flag is asserted here so
    that a mismatch actually fails the test instead of only being printed.

    Args:
        ctx: Simulation context handed to the testbench by ``@simulate``.
    """
    # Wait five clock ticks before driving anything, then mark the consumer
    # side of the encoder as ready.
    await ctx.tick().repeat(5)
    ctx.set(ce.source.ready, 1)

    test_vectors = [
        # Test cases taken from Wikipedia:
        # https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
        [0x00],
        [0x00, 0x00],
        [0x00, 0x11, 0x00],
        [0x11, 0x22, 0x00, 0x33],
        [0x11, 0x22, 0x33, 0x44],
        [0x11, 0x00, 0x00, 0x00],
        list(range(1, 255)),
        [0x00] + list(range(1, 255)),
        list(range(256)),
        list(range(2, 256)) + [0x00],
        list(range(3, 256)) + [0x00, 0x01],
        # Selected edge and corner cases
        [0x00] * 253,
        [0x00] * 254,
        [0x00] * 255,
        ([0x11] * 253) + [0],
        ([0x11] * 253) + [0] + ([0x11] * 5),
        ([0x11] * 254) + [0],
        ([0x11] * 255) + [0],
    ]

    # Vectors run sequentially, in the order listed, on the same encoder
    # instance. encode_and_compare() prints the input/expected/actual bytes
    # on a mismatch, so the assertion message only needs to identify the
    # failing vector.
    for index, data in enumerate(test_vectors):
        assert await encode_and_compare(ctx, data), (
            f"COBS encoder output mismatch for test vector {index} "
            f"(length {len(data)})"
        )
@simulate(ce)
async def test_cobs_encode_random(ctx):
    """Check the encoder against the reference on randomly generated inputs.

    Runs between 1 and 5 rounds; each round encodes between 1 and 2000
    random byte values. The result of every comparison is asserted so that a
    mismatch fails the test rather than only being printed.

    Args:
        ctx: Simulation context handed to the testbench by ``@simulate``.
    """
    ctx.set(ce.source.ready, 1)

    num_tests = random.randint(1, 5)
    for _ in range(num_tests):
        length = random.randint(1, 2000)
        input_data = [random.randint(0, 255) for _ in range(length)]

        # The inputs are not seeded; on a mismatch encode_and_compare()
        # prints the full failing input so the case can be reproduced.
        assert await encode_and_compare(ctx, input_data), (
            f"COBS encoder output mismatch for random input of length {length}"
        )
async def encode(ctx, data):
    """Push ``data`` into the encoder's sink and collect what leaves its source.

    On every clock tick the sender is randomly stalled and ``source.ready``
    is randomly toggled, so both sides of the encoder see irregular
    handshakes. The loop ends once the final input item has been offered
    while ``sink.ready`` was high and an output item flagged ``last`` has
    been collected; five extra ticks are then awaited.

    Args:
        ctx: Simulation context used to set/get signals and advance the clock.
        data: Indexable sequence of values to feed into ``ce.sink.data``.

    Returns:
        List of values read from ``ce.source.data``, in the order received.
    """
    sent_all = False
    send_pos = 0
    got_last = False
    received = []

    while not sent_all or not got_last:
        # Sender side: either idle the sink or present the current item.
        stall = random.randint(0, 1)
        if sent_all or stall:
            ctx.set(ce.sink.data, 0)
            ctx.set(ce.sink.valid, 0)
            ctx.set(ce.sink.last, 0)
        else:
            ctx.set(ce.sink.valid, 1)
            ctx.set(ce.sink.data, data[send_pos])
            is_final = send_pos == len(data) - 1
            ctx.set(ce.sink.last, is_final)

            # Advance only when the encoder signals it is taking this item.
            if ctx.get(ce.sink.valid) and ctx.get(ce.sink.ready):
                if is_final:
                    sent_all = True
                else:
                    send_pos += 1

        # Receiver side: apply random backpressure for this cycle.
        ctx.set(ce.source.ready, random.randint(0, 1))

        # Capture an output item when both valid and ready are high.
        if ctx.get(ce.source.valid) and ctx.get(ce.source.ready):
            received.append(ctx.get(ce.source.data))
            if ctx.get(ce.source.last):
                got_last = True

        await ctx.tick()

    await ctx.tick().repeat(5)
    return received
async def encode_and_compare(ctx, data, only_print_on_fail=True):
    """Encode ``data`` in simulation and compare with the reference encoder.

    The reference is ``cobs.encode`` applied to ``data`` with a single zero
    byte appended. A report of the input, expected and actual bytes is
    printed unless ``only_print_on_fail`` is truthy and the outputs matched.

    Args:
        ctx: Simulation context forwarded to ``encode``.
        data: Sequence of byte values (0-255) to encode.
        only_print_on_fail: When truthy, stay silent on a match.

    Returns:
        True if the simulated output equals the reference, otherwise False.
    """
    reference = cobs.encode(bytes(data)) + b"\0"
    produced = await encode(ctx, data)
    matched = bytes(produced) == reference

    # Quiet path: a successful comparison with reporting limited to failures.
    if only_print_on_fail and matched:
        return matched

    expected = reference
    actual = produced
    print(f" input: {[hex(d) for d in data]}")
    print(f"expected: {[hex(d) for d in expected]}")
    print(f" actual: {[hex(d) for d in actual]}")
    print(f" result: {'PASS' if matched else 'FAIL'}")
    print("")
    return matched