uart: add random COBS decoder tests

This commit is contained in:
Fischer Moseley 2026-03-05 11:53:46 -07:00
parent 0e22a8d563
commit b1643b3075
2 changed files with 22 additions and 12 deletions

View File

@ -19,7 +19,7 @@ class COBSDecode(wiring.Component):
skip_zero_injection = Signal()
with m.FSM() as fsm:
# m.d.comb += fsm_inject_zero.eq(0)
m.d.comb += fsm_inject_zero.eq(0)
with m.State("IDLE"):
with m.If(self.sink.ready & self.sink.valid):
@ -48,25 +48,16 @@ class COBSDecode(wiring.Component):
m.next = "IDLE"
with m.Elif(self.sink.data == 1):
# m.d.comb += fsm_inject_zero.eq(~skip_zero_injection)
m.d.comb += fsm_inject_zero.eq(~skip_zero_injection)
m.next = "END_OF_GROUP"
with m.Else():
# m.d.comb += fsm_inject_zero.eq(~skip_zero_injection)
m.d.comb += fsm_inject_zero.eq(~skip_zero_injection)
m.next = "STREAM"
m.d.sync += count.eq(self.sink.data - 2)
m.d.sync += skip_zero_injection.eq(self.sink.data == 255)
# an attempt to fix the combo glitch on fsm_inject_zero
m.d.comb += fsm_inject_zero.eq(
fsm.ongoing("END_OF_GROUP")
& self.sink.ready
& self.sink.valid
& (self.sink.data != 0)
& (~skip_zero_injection)
)
m.d.comb += [
self.source.data.eq(fifo.r_data),
self.source.valid.eq(fifo.r_rdy & (fsm.ongoing("IDLE") | (fifo.r_level > 1))),

View File

@ -39,6 +39,25 @@ async def test_cobs_decode_static(ctx):
await decode_and_compare(ctx, data, tx_irritate=True, rx_irritate=True)
@simulate(cd)
async def test_cobs_decode_random(ctx):
for _ in range(10):
length = random.randint(1, 2000)
population = [i for i in range(256)]
no_preference = [1] * 256
prefer_zeros = [10] + [1] * 255
prefer_nonzeros = [1] + [10] * 255
data_no_pref = random.choices(population, weights=no_preference, k=length)
data_pref_zeros = random.choices(population, weights=prefer_zeros, k=length)
data_pref_nonzeros = random.choices(population, weights=prefer_nonzeros, k=length)
await decode_and_compare(ctx, data_no_pref, tx_irritate=True, rx_irritate=True)
await decode_and_compare(ctx, data_pref_zeros, tx_irritate=True, rx_irritate=True)
await decode_and_compare(ctx, data_pref_nonzeros, tx_irritate=True, rx_irritate=True)
async def decode(ctx, data, tx_irritate, rx_irritate):
await ctx.tick().repeat(5)