meta: set ruff max line length to 100 characters

This should hopefully make the Amaranth source more readable, since indentation and the `m.d.sync +=` prefix take a bit of line space.
This commit is contained in:
Fischer Moseley 2026-02-25 13:15:16 -07:00 committed by Fischer (Moseley) Pettner
parent 40d428614b
commit 1e5a247cf4
18 changed files with 51 additions and 142 deletions

View File

@ -45,3 +45,6 @@ where = ["src"]
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[tool.ruff]
line-length = 100

View File

@ -15,9 +15,7 @@ class EthernetInterface(Elaboratable):
machine and the FPGA.
"""
def __init__(
self, phy, clk_freq, fpga_ip_addr, host_ip_addr, udp_port=2001, **kwargs
):
def __init__(self, phy, clk_freq, fpga_ip_addr, host_ip_addr, udp_port=2001, **kwargs):
"""
This function is the main mechanism for configuring an Ethernet
Interface in an Amaranth-native design.
@ -96,18 +94,14 @@ class EthernetInterface(Elaboratable):
def _check_config(self):
# Make sure UDP port is an integer in the range 0-65535
if not isinstance(self._udp_port, int):
raise TypeError(
"UDP Port must be specified as an integer between 0 and 65535."
)
raise TypeError("UDP Port must be specified as an integer between 0 and 65535.")
if not 0 <= self._udp_port <= 65535:
raise ValueError("UDP Port must be between 0 and 65535.")
# Make sure Host IP address is four bytes separated by a period
if not isinstance(self._host_ip_addr, str):
raise TypeError(
"Host IP must be specified as a string in the form 'xxx.xxx.xxx.xxx'."
)
raise TypeError("Host IP must be specified as a string in the form 'xxx.xxx.xxx.xxx'.")
if len(self._host_ip_addr.split(".")) != 4:
raise ValueError("Host IP must be specified in the form 'xxx.xxx.xxx.xxx'.")
@ -118,9 +112,7 @@ class EthernetInterface(Elaboratable):
# Make sure FPGA IP is four bytes separated by a period
if not isinstance(self._fpga_ip_addr, str):
raise TypeError(
"FPGA IP must be specified as a string in the form 'xxx.xxx.xxx.xxx'."
)
raise TypeError("FPGA IP must be specified as a string in the form 'xxx.xxx.xxx.xxx'.")
if len(self._fpga_ip_addr.split(".")) != 4:
raise ValueError("FPGA IP must be specified in the form 'xxx.xxx.xxx.xxx'.")
@ -587,9 +579,7 @@ class EthernetInterface(Elaboratable):
# Make sure address and datas are all integers
if not isinstance(addrs, list) or not isinstance(datas, list):
raise TypeError(
"Write addresses and data must be an integer or list of integers."
)
raise TypeError("Write addresses and data must be an integer or list of integers.")
if not all(isinstance(a, int) for a in addrs):
raise TypeError("Write addresses must be all be integers.")

View File

@ -260,9 +260,7 @@ class PHYCore(SoCMini):
phy = core_config["phy"]
# MII.
if phy in [liteeth_phys.LiteEthPHYMII]:
ethphy = phy(
clock_pads=platform.request("mii_clocks"), pads=platform.request("mii")
)
ethphy = phy(clock_pads=platform.request("mii_clocks"), pads=platform.request("mii"))
# RMII.
elif phy in [liteeth_phys.LiteEthPHYRMII]:
ethphy = phy(
@ -367,9 +365,7 @@ class PHYCore(SoCMini):
if not isinstance(ethphy, LiteEthPHYModel):
self.platform.add_period_constraint(eth_rx_clk, 1e9 / phy.rx_clk_freq)
self.platform.add_period_constraint(eth_tx_clk, 1e9 / phy.tx_clk_freq)
self.platform.add_false_path_constraints(
self.crg.cd_sys.clk, eth_rx_clk, eth_tx_clk
)
self.platform.add_false_path_constraints(self.crg.cd_sys.clk, eth_rx_clk, eth_tx_clk)
# MAC Core -----------------------------------------------------------------------------------------
@ -409,9 +405,7 @@ class MACCore(PHYCore):
# Wishbone Interface -----------------------------------------------------------------------
wb_bus = wishbone.Interface()
platform.add_extension(wb_bus.get_ios("wishbone"))
self.comb += wb_bus.connect_to_pads(
self.platform.request("wishbone"), mode="slave"
)
self.comb += wb_bus.connect_to_pads(self.platform.request("wishbone"), mode="slave")
self.bus.add_master(master=wb_bus)
if bus_standard == "axi-lite":
@ -419,9 +413,7 @@ class MACCore(PHYCore):
axil_bus = axi.AXILiteInterface(address_width=32, data_width=32)
platform.add_extension(axil_bus.get_ios("bus"))
self.submodules += axi.Wishbone2AXILite(ethmac.bus, axil_bus)
self.comb += axil_bus.connect_to_pads(
self.platform.request("bus"), mode="slave"
)
self.comb += axil_bus.connect_to_pads(self.platform.request("bus"), mode="slave")
self.bus.add_master(master=axil_bus)
ethmac_region_size = (nrxslots + ntxslots) * buffer_depth
@ -512,9 +504,7 @@ class UDPCore(PHYCore):
port_ios = platform.request(name)
raw_port = self.core.udp.crossbar.get_port(
port_ios.sink_dst_port, dw=data_width
)
raw_port = self.core.udp.crossbar.get_port(port_ios.sink_dst_port, dw=data_width)
# Connect IOs.
# ------------
@ -618,12 +608,8 @@ class UDPCore(PHYCore):
)
axil_bus = axi.AXILiteInterface(address_width=32, data_width=32)
platform.add_extension(axil_bus.get_ios("mmap"))
self.submodules += axi.Wishbone2AXILite(
self.etherbone.wishbone.bus, axil_bus
)
self.comb += axil_bus.connect_to_pads(
platform.request("mmap"), mode="master"
)
self.submodules += axi.Wishbone2AXILite(self.etherbone.wishbone.bus, axil_bus)
self.comb += axil_bus.connect_to_pads(platform.request("mmap"), mode="master")
# UDP Ports --------------------------------------------------------------------------------
for name, port_cfg in core_config["udp_ports"].items():

View File

@ -72,9 +72,7 @@ class IOCore(MantaCore):
input_signals = []
for name, width in inputs.items():
if not isinstance(name, str):
raise ValueError(
f"Input probe '{name}' has invalid name, names must be strings."
)
raise ValueError(f"Input probe '{name}' has invalid name, names must be strings.")
if not isinstance(width, int):
raise ValueError(f"Input probe '{name}' must have integer width.")
@ -88,9 +86,7 @@ class IOCore(MantaCore):
output_signals = []
for name, attrs in outputs.items():
if not isinstance(name, str):
raise ValueError(
f"Output probe '{name}' has invalid name, names must be strings."
)
raise ValueError(f"Output probe '{name}' has invalid name, names must be strings.")
if not isinstance(attrs, int) and not isinstance(attrs, dict):
raise ValueError(f"Unrecognized format for output probe '{name}'.")
@ -151,9 +147,7 @@ class IOCore(MantaCore):
self._memory_map = {}
# Add strobe register
self._memory_map["strobe"] = dict(
signals=[self._strobe], addrs=[self.base_addr]
)
self._memory_map["strobe"] = dict(signals=[self._strobe], addrs=[self.base_addr])
# Assign memory to all inputs and outputs
ios = self._inputs + self._outputs
@ -252,9 +246,7 @@ class IOCore(MantaCore):
raise KeyError(f"Probe '{probe.name}' is not an output of the IO core.")
if len(probes) > 1:
raise ValueError(
f"Multiple output probes found in IO core for name '{probe.name}'."
)
raise ValueError(f"Multiple output probes found in IO core for name '{probe.name}'.")
# Check that value isn't too big for the register
check_value_fits_in_bits(value, len(probe))
@ -307,9 +299,7 @@ class IOCore(MantaCore):
raise ValueError(f"Probe with name '{probe}' not found in IO core.")
if len(probes) > 1:
raise ValueError(
f"Multiple probes found in IO core for name '{probe}'."
)
raise ValueError(f"Multiple probes found in IO core for name '{probe}'.")
return self.get_probe(probes[0])
@ -321,9 +311,7 @@ class IOCore(MantaCore):
raise KeyError(f"Probe with name '{probe.name}' not found in IO core.")
if len(probes) > 1:
raise ValueError(
f"Multiple probes found in IO core for name '{probe.name}'."
)
raise ValueError(f"Multiple probes found in IO core for name '{probe.name}'.")
# Pulse strobe register
self.interface.write(self.base_addr, 0)

View File

@ -194,9 +194,7 @@ class LogicAnalyzerCore(MantaCore):
# Check operation
if operation not in ["DISABLE", "RISING", "FALLING", "CHANGING"]:
raise ValueError(
f"Unable to interpret trigger condition '{trigger}'."
)
raise ValueError(f"Unable to interpret trigger condition '{trigger}'.")
# Check three-token triggers
elif len(trigger) == 3:
@ -208,9 +206,7 @@ class LogicAnalyzerCore(MantaCore):
# Check operation
if operation not in ["GT", "LT", "GEQ", "LEQ", "EQ", "NEQ"]:
raise ValueError(
f"Unable to interpret trigger condition '{trigger}'."
)
raise ValueError(f"Unable to interpret trigger condition '{trigger}'.")
else:
raise ValueError(f"Unable to interpret trigger condition '{trigger}'.")
@ -242,9 +238,7 @@ class LogicAnalyzerCore(MantaCore):
# Warn on trigger location
if trigger_location:
warn(
"Ignoring provided trigger_location as trigger mode is set to Immediate."
)
warn("Ignoring provided trigger_location as trigger mode is set to Immediate.")
self._trigger_mode = mode
self._triggers = []
@ -253,9 +247,7 @@ class LogicAnalyzerCore(MantaCore):
elif mode == TriggerModes.INCREMENTAL:
# Warn on trigger location
if trigger_location:
warn(
"Ignoring provided trigger_location as trigger mode is set to Incremental."
)
warn("Ignoring provided trigger_location as trigger mode is set to Incremental.")
# Validate triggers
self._validate_triggers(triggers)

View File

@ -50,9 +50,7 @@ class LogicAnalyzerCapture:
raise ValueError(f"Probe {name} not found in LogicAnalyzerCapture!")
if len(indices) > 1:
raise ValueError(
f"Probe {name} found multiple times in LogicAnalyzerCapture!"
)
raise ValueError(f"Probe {name} found multiple times in LogicAnalyzerCapture!")
idx = indices[0]

View File

@ -139,9 +139,7 @@ class LogicAnalyzerFSM(Elaboratable):
with m.If(write_pointer > trigger_location):
m.d.sync += read_pointer.eq(write_pointer - trigger_location)
with m.Else():
m.d.sync += read_pointer.eq(
write_pointer - trigger_location + sample_depth
)
m.d.sync += read_pointer.eq(write_pointer - trigger_location + sample_depth)
# ok that's all for horrible

View File

@ -32,9 +32,7 @@ class Manta(Elaboratable):
# Load config from YAML
extension = config_path.split(".")[-1]
if extension not in ["yaml", "yml"]:
raise ValueError(
f"Configuration file {config_path} has unrecognized file type."
)
raise ValueError(f"Configuration file {config_path} has unrecognized file type.")
with open(config_path, "r") as f:
config = yaml.safe_load(f)

View File

@ -79,13 +79,10 @@ class MemoryCore(MantaCore):
n_partial = self._width % 16
self._mems = [
Memory(shape=16, depth=self._depth, init=[0] * self._depth)
for _ in range(n_full)
Memory(shape=16, depth=self._depth, init=[0] * self._depth) for _ in range(n_full)
]
if n_partial > 0:
self._mems += [
Memory(shape=n_partial, depth=self._depth, init=[0] * self._depth)
]
self._mems += [Memory(shape=n_partial, depth=self._depth, init=[0] * self._depth)]
@property
def top_level_ports(self):
@ -334,9 +331,7 @@ class MemoryCore(MantaCore):
# Make sure address and datas are all integers
if not isinstance(addrs, list) or not isinstance(datas, list):
raise TypeError(
"Write addresses and data must be an integer or list of integers."
)
raise TypeError("Write addresses and data must be an integer or list of integers.")
if not all(isinstance(a, int) for a in addrs):
raise TypeError("Write addresses must be all be integers.")

View File

@ -100,9 +100,7 @@ class UARTInterface(Elaboratable):
sanitized_config[option] = config[option]
else:
warn(
f"Ignoring unrecognized option '{option}' in UART interface config."
)
warn(f"Ignoring unrecognized option '{option}' in UART interface config.")
return cls(**sanitized_config)
@ -244,9 +242,7 @@ class UARTInterface(Elaboratable):
bytes_in = set.read(bytes_expected)
if len(bytes_in) != bytes_expected:
raise ValueError(
f"Only got {len(bytes_in)} out of {bytes_expected} bytes."
)
raise ValueError(f"Only got {len(bytes_in)} out of {bytes_expected} bytes.")
# Split received bytes into individual responses and decode
responses = split_into_chunks(bytes_in, 7)
@ -268,9 +264,7 @@ class UARTInterface(Elaboratable):
# Make sure address and data are all integers
if not isinstance(addrs, list) or not isinstance(data, list):
raise TypeError(
"Write addresses and data must be an integer or list of integers."
)
raise TypeError("Write addresses and data must be an integer or list of integers.")
if not all(isinstance(a, int) for a in addrs):
raise TypeError("Write addresses must be all be integers.")
@ -301,9 +295,7 @@ class UARTInterface(Elaboratable):
response_ascii = response_bytes.decode("ascii")
if len(response_ascii) != 7:
raise ValueError(
"Unable to decode read response - wrong number of bytes received."
)
raise ValueError("Unable to decode read response - wrong number of bytes received.")
if response_ascii[0] != "D":
raise ValueError("Unable to decode read response - incorrect preamble.")

View File

@ -55,9 +55,7 @@ class ReceiveBridge(Elaboratable):
m.d.comb += self._is_eol.eq(0)
def _drive_output_bus(self, m):
with m.If(
(self._state == States.READ) & (self._byte_num == 4) & (self._is_eol)
):
with m.If((self._state == States.READ) & (self._byte_num == 4) & (self._is_eol)):
m.d.comb += self.addr_o.eq(
Cat(self._buffer[3], self._buffer[2], self._buffer[1], self._buffer[0])
)
@ -65,9 +63,7 @@ class ReceiveBridge(Elaboratable):
m.d.comb += self.valid_o.eq(1)
m.d.comb += self.rw_o.eq(0)
with m.Elif(
(self._state == States.WRITE) & (self._byte_num == 8) & (self._is_eol)
):
with m.Elif((self._state == States.WRITE) & (self._byte_num == 8) & (self._is_eol)):
m.d.comb += self.addr_o.eq(
Cat(self._buffer[3], self._buffer[2], self._buffer[1], self._buffer[0])
)
@ -103,9 +99,7 @@ class ReceiveBridge(Elaboratable):
# otherwise buffer them
with m.Else():
m.d.sync += self._buffer[self._byte_num].eq(
self._from_ascii_hex
)
m.d.sync += self._buffer[self._byte_num].eq(self._from_ascii_hex)
m.d.sync += self._byte_num.eq(self._byte_num + 1)
with m.Else():
@ -120,9 +114,7 @@ class ReceiveBridge(Elaboratable):
# otherwise buffer them
with m.Else():
m.d.sync += self._buffer[self._byte_num].eq(
self._from_ascii_hex
)
m.d.sync += self._buffer[self._byte_num].eq(self._from_ascii_hex)
m.d.sync += self._byte_num.eq(self._byte_num + 1)
with m.Else():

View File

@ -34,9 +34,7 @@ class UARTTransmitter(Elaboratable):
with m.Elif(~self.done_o):
m.d.sync += self._baud_counter.eq(self._baud_counter - 1)
m.d.sync += self.done_o.eq(
(self._baud_counter == 1) & (self._bit_index == 9)
)
m.d.sync += self.done_o.eq((self._baud_counter == 1) & (self._bit_index == 9))
# A baud period has elapsed
with m.If(self._baud_counter == 0):

View File

@ -90,9 +90,7 @@ def test_logic_analyzer_core_dump():
# Create Manta instance
manta = Manta()
manta.cores.test_core = LogicAnalyzerCore(
sample_depth=2048, probes=[probe0, probe1, probe2]
)
manta.cores.test_core = LogicAnalyzerCore(sample_depth=2048, probes=[probe0, probe1, probe2])
# Create Temporary File
tf = tempfile.NamedTemporaryFile(delete=False)
@ -122,9 +120,7 @@ def test_logic_analyzer_core_dump():
def test_uart_interface_dump():
manta = Manta()
manta.interface = UARTInterface(
port="/dev/ttyUSB0", baudrate=115200, clock_freq=100e6
)
manta.interface = UARTInterface(port="/dev/ttyUSB0", baudrate=115200, clock_freq=100e6)
# Create Temporary File
tf = tempfile.NamedTemporaryFile(delete=False)

View File

@ -16,9 +16,7 @@ verilog_root_dirs = [
@pytest.mark.parametrize("root_dir", verilog_root_dirs)
def test_verilog_examples_build(root_dir):
result = subprocess.run(
["./build.sh"], cwd=root_dir, capture_output=True, text=True
)
result = subprocess.run(["./build.sh"], cwd=root_dir, capture_output=True, text=True)
if result.returncode != 0:
raise ValueError(f"Command failed with return code {result.returncode}.")

View File

@ -98,9 +98,7 @@ class IOCoreLoopbackTest(Elaboratable):
)
else:
print(
f"Reading {o.name} through {i.name} yielded {readback} as expected."
)
print(f"Reading {o.name} through {i.name} yielded {readback} as expected.")
def verify(self):
self.build_and_program()

View File

@ -28,9 +28,7 @@ class LogicAnalyzerCounterTest(Elaboratable):
manta.interface = UARTInterface(
port=self.port, baudrate=3e6, clock_freq=platform.default_clk_frequency
)
manta.cores.la = LogicAnalyzerCore(
sample_depth=1024, probes=[probe0, probe1, probe2]
)
manta.cores.la = LogicAnalyzerCore(sample_depth=1024, probes=[probe0, probe1, probe2])
m = Module()
m.submodules.manta = manta

View File

@ -17,9 +17,7 @@ class MemoryCoreTests:
self.n_full = self.width // 16
self.n_mems = ceil(self.width / 16)
self.bus_addrs = list(
range(self.base_addr, self.max_addr)
) # include the endpoint!
self.bus_addrs = list(range(self.base_addr, self.max_addr)) # include the endpoint!
self.user_addrs = list(range(self.mem_core._depth))
# A model of what each bus address contains
@ -115,8 +113,7 @@ class MemoryCoreTests:
for _ in range(5):
for user_addr in jumble(self.user_addrs):
bus_addrs = [
self.base_addr + user_addr + (i * self.depth)
for i in range(self.n_mems)
self.base_addr + user_addr + (i * self.depth) for i in range(self.n_mems)
]
operation = choice(["read", "write"])
@ -238,9 +235,7 @@ class MemoryCoreTests:
data = self.ctx.get(self.mem_core.user_data_out)
if data != expected_data:
raise ValueError(
f"Read from {addr} yielded {data} instead of {expected_data}"
)
raise ValueError(f"Read from {addr} yielded {data} instead of {expected_data}")
async def write_user_side(self, addr, data):
# convert value to words, and save to self.model
@ -263,9 +258,7 @@ widths = [23, randint(0, 128)]
depths = [512, randint(0, 1024)]
base_addrs = [0, randint(0, 32678)]
cases = [
(m, w, d, ba) for m in modes for w in widths for d in depths for ba in base_addrs
]
cases = [(m, w, d, ba) for m in modes for w in widths for d in depths for ba in base_addrs]
@pytest.mark.parametrize("mode, width, depth, base_addr", cases)

View File

@ -89,9 +89,7 @@ nexys4ddr_pass_cases = [
@pytest.mark.skipif(not xilinx_tools_installed(), reason="no toolchain installed")
@pytest.mark.parametrize(
"baudrate, percent_slowdown, stall_interval", nexys4ddr_pass_cases
)
@pytest.mark.parametrize("baudrate, percent_slowdown, stall_interval", nexys4ddr_pass_cases)
def test_baudrate_mismatch_xilinx_passes(baudrate, percent_slowdown, stall_interval):
UARTBaudrateMismatchTest(
platform=Nexys4DDRPlatform(),
@ -109,9 +107,7 @@ nexys4ddr_fail_cases = [
@pytest.mark.skipif(not xilinx_tools_installed(), reason="no toolchain installed")
@pytest.mark.parametrize(
"baudrate, percent_slowdown, stall_interval", nexys4ddr_fail_cases
)
@pytest.mark.parametrize("baudrate, percent_slowdown, stall_interval", nexys4ddr_fail_cases)
def test_baudrate_mismatch_xilinx_fails(baudrate, percent_slowdown, stall_interval):
with pytest.raises(ValueError, match="Only got"):
UARTBaudrateMismatchTest(