complete IO core refactor

This commit is contained in:
Fischer Moseley 2024-02-18 15:50:51 -08:00
parent 0c0f31be64
commit e2450ddbff
10 changed files with 179 additions and 161 deletions

View File

@ -36,8 +36,8 @@ class IOCore(Elaboratable):
@classmethod
def from_config(cls, config, base_addr, interface):
inputs = config.get("inputs", [])
outputs = config.get("outputs", [])
inputs = config.get("inputs", {})
outputs = config.get("outputs", {})
# Make sure IO core has at least one input or output
if not inputs and not outputs:
@ -178,7 +178,7 @@ class IOCore(Elaboratable):
Return the Amaranth signals that should be included as ports in the
top-level Manta module.
"""
return [self._inputs + self._outputs]
return self._inputs + self._outputs
def get_max_addr(self):
"""

View File

@ -165,7 +165,7 @@ class LogicAnalyzerCore(Elaboratable):
self.bus_o.eq(sample_mem.bus_o),
# Non-bus Connections
fsm.trigger.eq(trig_blk.trig),
sample_mem.user_addr.eq(fsm.r.write_pointer),
sample_mem.user_addr.eq(fsm.write_pointer),
sample_mem.user_we.eq(fsm.write_enable),
]
@ -211,14 +211,14 @@ class LogicAnalyzerCore(Elaboratable):
# If core is not in IDLE state, request that it return to IDLE
print_if_verbose(" -> Resetting core...")
state = self.fsm.r.get_probe("state")
state = self.fsm.registers.get_probe("state")
if state != States.IDLE:
self.fsm.r.set_probe("request_start", 0)
self.fsm.r.set_probe("request_stop", 0)
self.fsm.r.set_probe("request_stop", 1)
self.fsm.r.set_probe("request_stop", 0)
self.fsm.registers.set_probe("request_start", 0)
self.fsm.registers.set_probe("request_stop", 0)
self.fsm.registers.set_probe("request_stop", 1)
self.fsm.registers.set_probe("request_stop", 0)
if self.fsm.r.get_probe("state") != States.IDLE:
if self.fsm.registers.get_probe("state") != States.IDLE:
raise ValueError("Logic analyzer did not reset to IDLE state.")
# Set triggers
@ -232,28 +232,32 @@ class LogicAnalyzerCore(Elaboratable):
print_if_verbose(" -> Setting trigger mode...")
if "trigger_mode" in self.config:
mode = self.config["trigger_mode"].upper()
self.fsm.r.set_probe("trigger_mode", TriggerModes[mode])
self.fsm.registers.set_probe("trigger_mode", TriggerModes[mode])
else:
self.fsm.r.set_probe("trigger_mode", TriggerModes.SINGLE_SHOT)
self.fsm.registers.set_probe("trigger_mode", TriggerModes.SINGLE_SHOT)
# Set trigger location
print_if_verbose(" -> Setting trigger location...")
if "trigger_location" in self.config:
self.fsm.r.set_probe("trigger_location", self.config["trigger_location"])
self.fsm.registers.set_probe(
"trigger_location", self.config["trigger_location"]
)
else:
self.fsm.r.set_probe("trigger_location", self.config["sample_depth"] // 2)
self.fsm.registers.set_probe(
"trigger_location", self.config["sample_depth"] // 2
)
# Send a start request to the state machine
print_if_verbose(" -> Starting capture...")
self.fsm.r.set_probe("request_start", 0)
self.fsm.r.set_probe("request_start", 1)
self.fsm.r.set_probe("request_start", 0)
self.fsm.registers.set_probe("request_start", 0)
self.fsm.registers.set_probe("request_start", 1)
self.fsm.registers.set_probe("request_start", 0)
# Poll the state machine's state, and wait for the capture to complete
print_if_verbose(" -> Waiting for capture to complete...")
while self.fsm.r.get_probe("state") != States.CAPTURED:
while self.fsm.registers.get_probe("state") != States.CAPTURED:
pass
# Read out the entirety of the sample memory
@ -264,7 +268,7 @@ class LogicAnalyzerCore(Elaboratable):
# Revolve the memory around the read_pointer, such that all the beginning
# of the caputure is at the first element
print_if_verbose(" -> Checking read pointer and revolving memory...")
read_pointer = self.fsm.r.get_probe("read_pointer")
read_pointer = self.fsm.registers.get_probe("read_pointer")
data = raw_capture[read_pointer:] + raw_capture[:read_pointer]
return LogicAnalyzerCapture(data, self.config)

View File

@ -26,29 +26,39 @@ class LogicAnalyzerFSM(Elaboratable):
"""
def __init__(self, config, base_addr, interface):
self.config = config
self._sample_depth = config["sample_depth"]
# Outputs to rest of Logic Analyzer
self.trigger = Signal(1)
self.write_enable = Signal(1)
register_config = {
"inputs": {
"state": 4,
"read_pointer": ceil(log2(self.config["sample_depth"])),
"write_pointer": ceil(log2(self.config["sample_depth"])),
},
"outputs": {
"trigger_location": ceil(log2(self.config["sample_depth"])),
"trigger_mode": 2,
"request_start": 1,
"request_stop": 1,
},
}
# Outputs from FSM, inputs from IOCore
self.state = Signal(States)
self.read_pointer = Signal(range(self._sample_depth))
self.write_pointer = Signal(range(self._sample_depth))
inputs = [
self.state,
self.read_pointer,
self.write_pointer,
]
self.r = IOCore(register_config, base_addr, interface)
# Inputs to FSM, outputs from IOCore
self.trigger_location = Signal(range(self._sample_depth))
self.trigger_mode = Signal(TriggerModes)
self.request_start = Signal()
self.request_stop = Signal()
outputs = [
self.trigger_location,
self.trigger_mode,
self.request_start,
self.request_stop,
]
self.registers = IOCore(base_addr, interface, inputs, outputs)
# Bus Input/Output
self.bus_i = self.r.bus_i
self.bus_o = self.r.bus_o
self.bus_i = self.registers.bus_i
self.bus_o = self.registers.bus_o
def get_max_addr(self):
"""
@ -56,38 +66,33 @@ class LogicAnalyzerFSM(Elaboratable):
space used by the core extends from `base_addr` to the number returned
by this function.
"""
return self.r.get_max_addr()
def increment_mod_sample_depth(self, m, signal):
# m.d.sync += signal.eq((signal + 1) % self.config["sample_depth"])
with m.If(signal == self.config["sample_depth"] - 1):
m.d.sync += signal.eq(0)
with m.Else():
m.d.sync += signal.eq(signal + 1)
return self.registers.get_max_addr()
def elaborate(self, platform):
m = Module()
m.submodules.registers = self.r
m.submodules.registers = self.registers
sample_depth = self.config["sample_depth"]
request_start = self.r.request_start
request_stop = self.r.request_stop
trigger_mode = self.r.trigger_mode
trigger_location = self.r.trigger_location
state = self.r.state
sample_depth = self._sample_depth
request_start = self.request_start
request_stop = self.request_stop
trigger_mode = self.trigger_mode
trigger_location = self.trigger_location
state = self.state
write_enable = self.write_enable
write_pointer = self.r.write_pointer
read_pointer = self.r.read_pointer
write_pointer = self.write_pointer
read_pointer = self.read_pointer
prev_request_start = Signal(1)
prev_request_stop = Signal(1)
prev_request_start = Signal().like(request_start)
prev_request_stop = Signal().like(request_stop)
# Compute next_write_pointer as write_pointer + 1 % sample_depth
next_write_pointer = Signal().like(write_pointer)
with m.If(write_pointer == self._sample_depth - 1):
m.d.comb += next_write_pointer.eq(0)
m.d.comb += next_write_pointer.eq((write_pointer + 1) % sample_depth)
with m.Else():
m.d.comb += next_write_pointer.eq(write_pointer + 1)
# Rising edge detection for start/stop requests
m.d.sync += prev_request_start.eq(request_start)

View File

@ -1,4 +1,5 @@
from amaranth import *
from amaranth.lib.enum import IntEnum
from manta.io_core import IOCore
@ -12,23 +13,20 @@ class LogicAnalyzerTriggerBlock(Elaboratable):
def __init__(self, probes, base_addr, interface):
# Instantiate a bunch of trigger blocks
self.probes = probes
self.triggers = [LogicAnalyzerTrigger(p) for p in self.probes]
self._probes = probes
self._triggers = [LogicAnalyzerTrigger(p) for p in self._probes]
# Make IO core for everything
outputs = {}
for p in self.probes:
outputs[p.name + "_arg"] = p.width
outputs[p.name + "_op"] = 4
self.r = IOCore({"outputs": outputs}, base_addr, interface)
ops = [t.op for t in self._triggers]
args = [t.arg for t in self._triggers]
self.registers = IOCore(base_addr, interface, outputs=ops + args)
# Bus Input/Output
self.bus_i = self.r.bus_i
self.bus_o = self.r.bus_o
self.bus_i = self.registers.bus_i
self.bus_o = self.registers.bus_o
# Global trigger. High if any probe is triggered.
self.trig = Signal(1)
self.trig = Signal()
def get_max_addr(self):
"""
@ -36,13 +34,13 @@ class LogicAnalyzerTriggerBlock(Elaboratable):
space used by the core extends from `base_addr` to the number returned
by this function.
"""
return self.r.get_max_addr()
return self.registers.get_max_addr()
def clear_triggers(self):
# reset all triggers to disabled with no argument
for p in self.probes:
self.r.set_probe(p.name + "_op", 0)
self.r.set_probe(p.name + "_arg", 0)
for p in self._probes:
self.registers.set_probe(p.name + "_op", Operations.DISABLE)
self.registers.set_probe(p.name + "_arg", 0)
def set_triggers(self, config):
# set triggers
@ -52,34 +50,42 @@ class LogicAnalyzerTriggerBlock(Elaboratable):
# Handle triggers that don't need an argument
if len(components) == 2:
name, op = components
self.r.set_probe(name + "_op", self.triggers[0].operations[op])
self.registers.set_probe(name + "_op", Operations[op].value)
# Handle triggers that do need an argument
elif len(components) == 3:
name, op, arg = components
self.r.set_probe(name + "_op", self.triggers[0].operations[op])
self.r.set_probe(name + "_arg", int(arg))
self.registers.set_probe(name + "_op", Operations[op].value)
self.registers.set_probe(name + "_arg", int(arg))
def elaborate(self, platform):
m = Module()
# Add IO Core as submodule
m.submodules.registers = self.r
m.submodules.registers = self.registers
# Add triggers as submodules
for t in self.triggers:
for t in self._triggers:
m.submodules[t.signal.name + "_trigger"] = t
# Connect IO core registers to triggers
for probe, trigger in zip(self.probes, self.triggers):
m.d.comb += trigger.arg.eq(getattr(self.r, probe.name + "_arg"))
m.d.comb += trigger.op.eq(getattr(self.r, probe.name + "_op"))
m.d.comb += self.trig.eq(Cat([t.triggered for t in self.triggers]).any())
m.d.comb += self.trig.eq(Cat([t.triggered for t in self._triggers]).any())
return m
class Operations(IntEnum):
DISABLE = 0
RISING = 1
FALLING = 2
CHANGING = 3
GT = 4
LT = 5
GEQ = 6
LEQ = 7
EQ = 8
NEQ = 9
class LogicAnalyzerTrigger(Elaboratable):
"""
A module containing a programmable "trigger" for a given input signal,
@ -88,23 +94,10 @@ class LogicAnalyzerTrigger(Elaboratable):
"""
def __init__(self, signal):
self.operations = {
"DISABLE": 0,
"RISING": 1,
"FALLING": 2,
"CHANGING": 3,
"GT": 4,
"LT": 5,
"GEQ": 6,
"LEQ": 7,
"EQ": 8,
"NEQ": 9,
}
self.signal = signal
self.op = Signal(range(len(self.operations)))
self.arg = Signal().like(signal)
self.triggered = Signal(1)
self.op = Signal(Operations, name=signal.name + "_op")
self.arg = Signal(signal.width, name=signal.name + "_arg")
self.triggered = Signal()
def elaborate(self, platform):
m = Module()
@ -113,34 +106,34 @@ class LogicAnalyzerTrigger(Elaboratable):
prev = Signal().like(self.signal)
m.d.sync += prev.eq(self.signal)
with m.If(self.op == self.operations["DISABLE"]):
with m.If(self.op == Operations.DISABLE):
m.d.comb += self.triggered.eq(0)
with m.Elif(self.op == self.operations["RISING"]):
with m.Elif(self.op == Operations.RISING):
m.d.comb += self.triggered.eq(self.signal > prev)
with m.Elif(self.op == self.operations["FALLING"]):
with m.Elif(self.op == Operations.FALLING):
m.d.comb += self.triggered.eq(self.signal < prev)
with m.Elif(self.op == self.operations["CHANGING"]):
with m.Elif(self.op == Operations.CHANGING):
m.d.comb += self.triggered.eq(self.signal != prev)
with m.Elif(self.op == self.operations["GT"]):
with m.Elif(self.op == Operations.GT):
m.d.comb += self.triggered.eq(self.signal > self.arg)
with m.Elif(self.op == self.operations["LT"]):
with m.Elif(self.op == Operations.LT):
m.d.comb += self.triggered.eq(self.signal < self.arg)
with m.Elif(self.op == self.operations["GEQ"]):
with m.Elif(self.op == Operations.GEQ):
m.d.comb += self.triggered.eq(self.signal >= self.arg)
with m.Elif(self.op == self.operations["LEQ"]):
with m.Elif(self.op == Operations.LEQ):
m.d.comb += self.triggered.eq(self.signal <= self.arg)
with m.Elif(self.op == self.operations["EQ"]):
with m.Elif(self.op == Operations.EQ):
m.d.comb += self.triggered.eq(self.signal == self.arg)
with m.Elif(self.op == self.operations["NEQ"]):
with m.Elif(self.op == Operations.NEQ):
m.d.comb += self.triggered.eq(self.signal != self.arg)
with m.Else():

View File

@ -148,4 +148,4 @@ def test_output_probe_initial_values_xilinx():
@pytest.mark.skipif(not ice40_tools_installed(), reason="no toolchain installed")
def test_output_probe_initial_values_ice40():
IOCoreLoopbackTest(ICEStickPlatform(), "/dev/ttyUSB3").verify()
IOCoreLoopbackTest(ICEStickPlatform(), "/dev/ttyUSB2").verify()

View File

@ -9,12 +9,12 @@ fsm = LogicAnalyzerFSM(config, base_addr=0, interface=None)
def test_signals_reset_correctly():
def testbench():
# Make sure pointers and write enable reset to zero
for sig in [fsm.r.write_pointer, fsm.r.read_pointer, fsm.write_enable]:
for sig in [fsm.write_pointer, fsm.read_pointer, fsm.write_enable]:
if (yield sig) != 0:
raise ValueError
# Make sure state resets to IDLE
if (yield fsm.r.state != States.IDLE):
if (yield fsm.state != States.IDLE):
raise ValueError
simulate(fsm, testbench)
@ -24,9 +24,9 @@ def test_single_shot_no_wait_for_trigger():
def testbench():
# Configure and start FSM
yield fsm.trigger.eq(1)
yield fsm.r.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.r.trigger_location.eq(4)
yield fsm.r.request_start.eq(1)
yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.trigger_location.eq(4)
yield fsm.request_start.eq(1)
# Wait until write_enable is asserted
while not (yield fsm.write_enable):
@ -35,11 +35,11 @@ def test_single_shot_no_wait_for_trigger():
# Wait 8 clock cycles for capture to complete
for i in range(8):
# Make sure that read_pointer does not increase
if (yield fsm.r.read_pointer) != 0:
if (yield fsm.read_pointer) != 0:
raise ValueError
# Make sure that write_pointer increases by one each cycle
if (yield fsm.r.write_pointer) != i:
if (yield fsm.write_pointer) != i:
raise ValueError
yield
@ -48,11 +48,11 @@ def test_single_shot_no_wait_for_trigger():
yield
# Check that write_pointer points to the end of memory
if (yield fsm.r.write_pointer) != 7:
if (yield fsm.write_pointer) != 7:
raise ValueError
# Check that state is CAPTURED
if (yield fsm.r.state) != States.CAPTURED:
if (yield fsm.state) != States.CAPTURED:
raise ValueError
simulate(fsm, testbench, "single_shot_no_wait_for_trigger.vcd")
@ -61,9 +61,9 @@ def test_single_shot_no_wait_for_trigger():
def test_single_shot_wait_for_trigger():
def testbench():
# Configure and start FSM
yield fsm.r.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.r.trigger_location.eq(4)
yield fsm.r.request_start.eq(1)
yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.trigger_location.eq(4)
yield fsm.request_start.eq(1)
yield
yield
@ -73,8 +73,8 @@ def test_single_shot_wait_for_trigger():
# Wait 4 clock cycles to get to IN_POSITION
for i in range(4):
rp = yield fsm.r.read_pointer
wp = yield fsm.r.write_pointer
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
# Make sure that read_pointer does not increase
if rp != 0:
@ -101,13 +101,13 @@ def test_single_shot_wait_for_trigger():
yield
# Check that write_pointer points to the end of memory
rp = yield fsm.r.read_pointer
wp = yield fsm.r.write_pointer
if (wp + 1) % fsm.config["sample_depth"] != rp:
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
if (wp + 1) % config["sample_depth"] != rp:
raise ValueError
# Check that state is CAPTURED
if (yield fsm.r.state) != States.CAPTURED:
if (yield fsm.state) != States.CAPTURED:
raise ValueError
simulate(fsm, testbench, "single_shot_wait_for_trigger.vcd")
@ -116,8 +116,8 @@ def test_single_shot_wait_for_trigger():
def test_immediate():
def testbench():
# Configure and start FSM
yield fsm.r.trigger_mode.eq(TriggerModes.IMMEDIATE)
yield fsm.r.request_start.eq(1)
yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE)
yield fsm.request_start.eq(1)
yield
yield
@ -125,9 +125,9 @@ def test_immediate():
if not (yield fsm.write_enable):
raise ValueError
for i in range(fsm.config["sample_depth"]):
rp = yield fsm.r.read_pointer
wp = yield fsm.r.write_pointer
for i in range(config["sample_depth"]):
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
if rp != 0:
raise ValueError
@ -141,15 +141,15 @@ def test_immediate():
yield
# Check that write_pointer points to the end of memory
rp = yield fsm.r.read_pointer
wp = yield fsm.r.write_pointer
rp = yield fsm.read_pointer
wp = yield fsm.write_pointer
if rp != 0:
raise ValueError
if wp != 7:
raise ValueError
# Check that state is CAPTURED
if (yield fsm.r.state) != States.CAPTURED:
if (yield fsm.state) != States.CAPTURED:
raise ValueError
simulate(fsm, testbench, "immediate.vcd")
@ -158,8 +158,8 @@ def test_immediate():
def test_incremental():
def testbench():
# Configure and start FSM
yield fsm.r.trigger_mode.eq(TriggerModes.INCREMENTAL)
yield fsm.r.request_start.eq(1)
yield fsm.trigger_mode.eq(TriggerModes.INCREMENTAL)
yield fsm.request_start.eq(1)
yield
yield
@ -177,7 +177,7 @@ def test_incremental():
yield
# Check that state is CAPTURED
if (yield fsm.r.state) != States.CAPTURED:
if (yield fsm.state) != States.CAPTURED:
raise ValueError
simulate(fsm, testbench, "incremental.vcd")
@ -186,8 +186,8 @@ def test_incremental():
def test_single_shot_write_enable():
def testbench():
# Configure FSM
yield fsm.r.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.r.trigger_location.eq(4)
yield fsm.trigger_mode.eq(TriggerModes.SINGLE_SHOT)
yield fsm.trigger_location.eq(4)
yield
# Make sure write is not enabled before starting the FSM
@ -195,11 +195,11 @@ def test_single_shot_write_enable():
raise ValueError
# Start the FSM, ensure write enable is asserted throughout the capture
yield fsm.r.request_start.eq(1)
yield fsm.request_start.eq(1)
yield
yield
for _ in range(fsm.config["sample_depth"]):
for _ in range(config["sample_depth"]):
if not (yield fsm.write_enable):
raise ValueError
@ -224,7 +224,7 @@ def test_single_shot_write_enable():
def test_immediate_write_enable():
def testbench():
# Configure FSM
yield fsm.r.trigger_mode.eq(TriggerModes.IMMEDIATE)
yield fsm.trigger_mode.eq(TriggerModes.IMMEDIATE)
yield
# Make sure write is not enabled before starting the FSM
@ -232,11 +232,11 @@ def test_immediate_write_enable():
raise ValueError
# Start the FSM, ensure write enable is asserted throughout the capture
yield fsm.r.request_start.eq(1)
yield fsm.request_start.eq(1)
yield
yield
for _ in range(fsm.config["sample_depth"]):
for _ in range(config["sample_depth"]):
if not (yield fsm.write_enable):
raise ValueError

View File

@ -1,5 +1,6 @@
from amaranth.sim import Simulator
from manta.logic_analyzer import LogicAnalyzerCore
from manta.logic_analyzer.trigger_block import Operations
from manta.utils import *
from random import sample
@ -32,8 +33,8 @@ def print_data_at_addr(addr):
def set_fsm_register(name, data):
addr = la.fsm.r.mmap[f"{name}_buf"]["addrs"][0]
strobe_addr = la.fsm.r.base_addr
addr = la.fsm.registers._memory_map[name]["addrs"][0]
strobe_addr = la.fsm.registers._base_addr
yield from write_register(la, strobe_addr, 0)
yield from write_register(la, addr, data)
@ -42,8 +43,8 @@ def set_fsm_register(name, data):
def set_trig_blk_register(name, data):
addr = la.trig_blk.r.mmap[f"{name}_buf"]["addrs"][0]
strobe_addr = la.trig_blk.r.base_addr
addr = la.trig_blk.registers._memory_map[name]["addrs"][0]
strobe_addr = la.trig_blk.registers._base_addr
yield from write_register(la, strobe_addr, 0)
yield from write_register(la, addr, data)
@ -67,9 +68,7 @@ def test_single_shot_capture():
yield from set_fsm_register("request_stop", 0)
# setting triggers
yield from set_trig_blk_register(
"curly_op", la.trig_blk.triggers[0].operations["EQ"]
)
yield from set_trig_blk_register("curly_op", Operations.EQ)
yield from set_trig_blk_register("curly_arg", 4)
# setting trigger mode

View File

@ -48,16 +48,33 @@ class MemoryCoreLoopbackTest(Elaboratable):
},
}
def get_probe(self, name):
# This is a hack! And should be removed once the full Amaranth-native
# API is built out
for i in self.manta.io_core._inputs:
if i.name == name:
return i
for o in self.manta.io_core._outputs:
if o.name == name:
return o
return None
def elaborate(self, platform):
m = Module()
m.submodules.manta = self.manta
uart_pins = platform.request("uart")
addr = self.get_probe("addr")
data = self.get_probe("data")
we = self.get_probe("we")
m.d.comb += [
self.manta.mem_core.user_addr.eq(self.manta.io_core.addr),
self.manta.mem_core.user_data.eq(self.manta.io_core.data),
self.manta.mem_core.user_we.eq(self.manta.io_core.we),
self.manta.mem_core.user_addr.eq(addr),
self.manta.mem_core.user_data.eq(data),
self.manta.mem_core.user_we.eq(we),
self.manta.interface.rx.eq(uart_pins.rx.i),
uart_pins.tx.o.eq(self.manta.interface.tx),
]

View File

@ -14,8 +14,8 @@ def verify_receive(data):
valid_asserted_before = False
for i in range(10 * uart_rx.clocks_per_baud):
bit_index = i // uart_rx.clocks_per_baud
for i in range(10 * uart_rx._clocks_per_baud):
bit_index = i // uart_rx._clocks_per_baud
# Every cycle, run checks on uart_rx:
if (yield uart_rx.valid_o):

View File

@ -26,8 +26,8 @@ def verify_bit_sequence(byte):
data_bits = "0" + f"{byte:08b}"[::-1] + "1"
data_bits = [int(bit) for bit in data_bits]
for i in range(10 * uart_tx.clocks_per_baud):
bit_index = i // uart_tx.clocks_per_baud
for i in range(10 * uart_tx._clocks_per_baud):
bit_index = i // uart_tx._clocks_per_baud
if (yield uart_tx.tx) != data_bits[bit_index]:
raise ValueError("Wrong bit in sequence!")