uart: initial commit of updated COBS decoder
This commit is contained in:
parent
7e7b97a57f
commit
f47a415692
|
|
@ -1,54 +1,96 @@
|
|||
from amaranth import *
|
||||
from amaranth.lib import wiring
|
||||
from amaranth.lib.fifo import SyncFIFOBuffered
|
||||
from amaranth.lib.wiring import In, Out
|
||||
|
||||
from manta.utils import *
|
||||
|
||||
|
||||
class COBSDecode(wiring.Component):
|
||||
sink: In(StreamSignature(8, has_last=False, has_ready=False))
|
||||
sink: In(StreamSignature(8, has_last=False))
|
||||
source: Out(StreamSignature(8))
|
||||
|
||||
def elaborate(self, platform):
|
||||
m = Module()
|
||||
m.submodules.fifo = fifo = SyncFIFOBuffered(width=8, depth=3)
|
||||
|
||||
counter = Signal(8)
|
||||
count = Signal(range(256))
|
||||
fsm_inject_zero = Signal()
|
||||
skip_zero_injection = Signal()
|
||||
|
||||
m.d.sync += self.source.data.eq(0)
|
||||
m.d.sync += self.source.valid.eq(0)
|
||||
m.d.sync += self.source.last.eq(0)
|
||||
with m.FSM() as fsm:
|
||||
# m.d.comb += fsm_inject_zero.eq(0)
|
||||
|
||||
# State Machine:
|
||||
with m.FSM():
|
||||
# TODO: determine if wait for packet logic should stay
|
||||
# with m.State("WAIT_FOR_PACKET_START"):
|
||||
# with m.If((self.sink.data == 0) & (self.sink.valid)):
|
||||
# m.next = "START_OF_PACKET"
|
||||
|
||||
with m.State("START_OF_PACKET"):
|
||||
with m.If(self.sink.valid):
|
||||
m.d.sync += counter.eq(self.sink.data - 1)
|
||||
m.next = "DECODING"
|
||||
|
||||
# with m.Else():
|
||||
# m.next = "START_OF_PACKET"
|
||||
|
||||
with m.State("DECODING"):
|
||||
with m.If(self.sink.valid):
|
||||
with m.If(counter > 0):
|
||||
m.d.sync += counter.eq(counter - 1)
|
||||
m.d.sync += self.source.data.eq(self.sink.data)
|
||||
m.d.sync += self.source.valid.eq(1)
|
||||
m.next = "DECODING"
|
||||
|
||||
with m.Else():
|
||||
with m.If(self.sink.data == 0):
|
||||
m.d.sync += self.source.last.eq(1)
|
||||
m.next = "START_OF_PACKET"
|
||||
with m.State("IDLE"):
|
||||
with m.If(self.sink.ready & self.sink.valid):
|
||||
# Re-sync to start of packet
|
||||
with m.If(self.sink.data != 0):
|
||||
with m.If(self.sink.data == 1):
|
||||
m.next = "END_OF_GROUP"
|
||||
|
||||
with m.Else():
|
||||
m.d.sync += counter.eq(self.sink.data - 1)
|
||||
m.d.sync += self.source.valid.eq(1)
|
||||
m.next = "DECODING"
|
||||
m.next = "STREAM"
|
||||
m.d.sync += count.eq(self.sink.data - 2)
|
||||
|
||||
m.d.sync += skip_zero_injection.eq(self.sink.data == 255)
|
||||
|
||||
with m.State("STREAM"):
|
||||
with m.If(self.sink.ready & self.sink.valid):
|
||||
with m.If(count > 0):
|
||||
m.d.sync += count.eq(count - 1)
|
||||
|
||||
with m.Else():
|
||||
m.next = "END_OF_GROUP"
|
||||
|
||||
with m.State("END_OF_GROUP"):
|
||||
with m.If(self.sink.ready & self.sink.valid):
|
||||
with m.If(self.sink.data == 0):
|
||||
m.next = "IDLE"
|
||||
|
||||
with m.Elif(self.sink.data == 1):
|
||||
# m.d.comb += fsm_inject_zero.eq(~skip_zero_injection)
|
||||
m.next = "END_OF_GROUP"
|
||||
|
||||
with m.Else():
|
||||
# m.d.comb += fsm_inject_zero.eq(~skip_zero_injection)
|
||||
m.next = "STREAM"
|
||||
m.d.sync += count.eq(self.sink.data - 2)
|
||||
|
||||
m.d.sync += skip_zero_injection.eq(self.sink.data == 255)
|
||||
|
||||
# an attempt to fix the combo glitch on fsm_inject_zero
|
||||
m.d.comb += fsm_inject_zero.eq(
|
||||
fsm.ongoing("END_OF_GROUP")
|
||||
& self.sink.ready
|
||||
& self.sink.valid
|
||||
& (self.sink.data != 0)
|
||||
& (~skip_zero_injection)
|
||||
)
|
||||
|
||||
m.d.comb += [
|
||||
self.source.data.eq(fifo.r_data),
|
||||
self.source.valid.eq(fifo.r_rdy),
|
||||
self.source.last.eq(fsm.ongoing("IDLE") & self.source.valid & (fifo.r_level == 1)),
|
||||
fifo.r_en.eq(self.source.valid & self.source.ready),
|
||||
]
|
||||
|
||||
with m.If(fsm.ongoing("STREAM")):
|
||||
m.d.comb += [
|
||||
fifo.w_en.eq(self.sink.valid & self.sink.ready),
|
||||
fifo.w_data.eq(self.sink.data),
|
||||
self.sink.ready.eq(fifo.w_rdy),
|
||||
]
|
||||
|
||||
with m.Else():
|
||||
m.d.comb += [
|
||||
fifo.w_en.eq(fsm_inject_zero),
|
||||
fifo.w_data.eq(0),
|
||||
self.sink.ready.eq(
|
||||
~(
|
||||
(fsm.ongoing("IDLE") & fifo.r_rdy)
|
||||
| (fsm.ongoing("END_OF_GROUP") & ~fifo.w_rdy)
|
||||
)
|
||||
),
|
||||
]
|
||||
|
||||
return m
|
||||
|
|
|
|||
|
|
@ -0,0 +1,105 @@
|
|||
import random
|
||||
|
||||
from cobs import cobs
|
||||
|
||||
from manta.uart.cobs_decode import COBSDecode
|
||||
from manta.utils import *
|
||||
|
||||
cd = COBSDecode()
|
||||
|
||||
|
||||
@simulate(cd)
|
||||
async def test_cobs_decode_static(ctx):
|
||||
testcases = [
|
||||
# Test cases taken from Wikipedia:
|
||||
# https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
|
||||
[0x00],
|
||||
[0x00, 0x00],
|
||||
[0x00, 0x11, 0x00],
|
||||
[0x11, 0x22, 0x00, 0x33],
|
||||
[0x11, 0x22, 0x33, 0x44],
|
||||
[0x11, 0x22, 0x33, 0x44, 0x00, 0x55, 0x66],
|
||||
[0x11, 0x00, 0x00, 0x00],
|
||||
[i for i in range(1, 255)],
|
||||
[0x00] + [i for i in range(1, 255)],
|
||||
[i for i in range(256)],
|
||||
[i for i in range(2, 256)] + [0x00],
|
||||
[i for i in range(3, 256)] + [0x00, 0x01],
|
||||
# # Selected edge and corner cases:
|
||||
[0x00] * 253,
|
||||
[0x00] * 254,
|
||||
[0x00] * 255,
|
||||
([0x11] * 253) + [0],
|
||||
([0x11] * 253) + [0] + ([0x11] * 5),
|
||||
([0x11] * 254) + [0],
|
||||
([0x11] * 255) + [0],
|
||||
]
|
||||
|
||||
for data in testcases:
|
||||
await decode_and_compare(ctx, data, tx_irritate=False, rx_irritate=False)
|
||||
|
||||
|
||||
async def decode(ctx, data, tx_irritate, rx_irritate):
|
||||
await ctx.tick().repeat(5)
|
||||
|
||||
tx_done = False
|
||||
tx_index = 0
|
||||
rx_done = False
|
||||
rx_buf = []
|
||||
|
||||
ctx.set(cd.source.ready, 1)
|
||||
await ctx.tick()
|
||||
|
||||
while not (tx_done and rx_done):
|
||||
# Feed data to decoder
|
||||
tx_stall = random.randint(0, 1) if tx_irritate else False
|
||||
|
||||
if tx_done:
|
||||
ctx.set(cd.sink.data, 0)
|
||||
ctx.set(cd.sink.valid, 0)
|
||||
|
||||
elif not tx_stall:
|
||||
ctx.set(cd.sink.valid, 1)
|
||||
ctx.set(cd.sink.data, data[tx_index])
|
||||
|
||||
if ctx.get(cd.sink.valid) and ctx.get(cd.sink.ready):
|
||||
if tx_index == len(data) - 1:
|
||||
tx_done = True
|
||||
|
||||
else:
|
||||
tx_index += 1
|
||||
|
||||
# Randomly set source.ready if irritator is enabled
|
||||
ready = random.randint(0, 1) if rx_irritate else 1
|
||||
ctx.set(cd.source.ready, ready)
|
||||
|
||||
# Pull output data from buffer
|
||||
if ctx.get(cd.source.valid) and ctx.get(cd.source.ready):
|
||||
rx_buf += [ctx.get(cd.source.data)]
|
||||
|
||||
if ctx.get(cd.source.last):
|
||||
rx_done = True
|
||||
|
||||
await ctx.tick()
|
||||
|
||||
await ctx.tick().repeat(5)
|
||||
return rx_buf
|
||||
|
||||
|
||||
async def decode_and_compare(ctx, data, tx_irritate, rx_irritate):
|
||||
encoded = cobs.encode(bytes(data)) + b"\0"
|
||||
|
||||
# print([hex(a) for a in encoded])
|
||||
|
||||
actual = await decode(ctx, encoded, tx_irritate, rx_irritate)
|
||||
matched = actual == data
|
||||
|
||||
hex_print = lambda data: " ".join([f"{d:02x}" for d in data])
|
||||
|
||||
if not matched:
|
||||
raise ValueError(
|
||||
"COBS decoder output does not match expected data!\n"
|
||||
f" input: {hex_print(encoded)}\n"
|
||||
f"expected: {hex_print(data)}\n"
|
||||
f" actual: {hex_print(actual)}\n"
|
||||
)
|
||||
Loading…
Reference in New Issue