move init'ing UARTInterface from config to classmethod

This commit is contained in:
Fischer Moseley 2024-02-12 08:50:43 -08:00
parent 75a0fe46ff
commit a1fddf555e
2 changed files with 83 additions and 81 deletions

View File

@ -63,7 +63,7 @@ class Manta(Elaboratable):
def get_interface(self):
if "uart" in self.config:
return UARTInterface(self.config["uart"])
return UARTInterface.from_config(self.config["uart"])
elif "ethernet" in self.config:
return EthernetInterface(self.config["ethernet"])

View File

@ -8,26 +8,31 @@ from serial import Serial
class UARTInterface(Elaboratable):
def __init__(self, config):
self.config = config
self.check_config(self.config)
self.port = config["port"]
self.clock_freq = config["clock_freq"]
self.baudrate = config["baudrate"]
self.clocks_per_baud = int(self.clock_freq // self.baudrate)
self.define_signals()
def __init__(self, port, baudrate, clock_freq, chunk_size=256):
self._port = port
self._baudrate = baudrate
self._clock_freq = clock_freq
self._chunk_size = chunk_size
self._clocks_per_baud = int(self._clock_freq // self._baudrate)
self._check_config()
# Set chunk_size, which is the max amount of bytes that the core will
# dump to the OS driver at a time. Since the FPGA will return bytes
# almost instantaneously, this prevents the OS's input buffer from
# overflowing, and dropping bytes.
self.chunk_size = 256 # in bytes
if "chunk_size" in config:
self.chunk_size = config["chunk_size"]
def check_config(self, config):
self.rx = Signal()
self.tx = Signal()
self.bus_o = Signal(InternalBus())
self.bus_i = Signal(InternalBus())
@classmethod
def from_config(cls, config):
port = config.get("port")
clock_freq = config.get("clock_freq")
baudrate = config.get("baudrate")
# Warn if unrecognized options have been given
recognized_options = ["port", "clock_freq", "baudrate", "chunk_size"]
for option in config:
@ -36,84 +41,88 @@ class UARTInterface(Elaboratable):
f"Ignoring unrecognized option '{option}' in UART interface config."
)
if "chunk_size" in config:
return cls(port, baudrate, clock_freq, config["chunk_size"])
else:
return cls(port, baudrate, clock_freq)
def _check_config(self):
# Ensure a serial port has been given
if "port" not in config:
if self._port is None:
raise ValueError("No serial port provided to UART interface.")
# Ensure clock frequency is provided and positive
if "clock_freq" not in config:
if self._clock_freq is None:
raise ValueError("No clock frequency provided to UART interface.")
if config["clock_freq"] <= 0:
if self._clock_freq <= 0:
raise ValueError("Non-positive clock frequency provided to UART interface.")
# Check that baudrate is provided and positive
if "baudrate" not in config:
if self._baudrate is None:
raise ValueError("No baudrate provided to UART interface.")
if config["baudrate"] <= 0:
if self._baudrate <= 0:
raise ValueError("Non-positive baudrate provided to UART interface.")
# Confirm the actual baudrate is within 5% of the target baudrate
clock_freq = config["clock_freq"]
baudrate = config["baudrate"]
clocks_per_baud = clock_freq // baudrate
actual_baudrate = clock_freq / clocks_per_baud
error = 100 * abs(actual_baudrate - baudrate) / baudrate
actual_baudrate = self._clock_freq / self._clocks_per_baud
error = 100 * abs(actual_baudrate - self._baudrate) / self._baudrate
if error > 5:
raise ValueError(
"UART interface is unable to match targeted baudrate with specified clock frequency."
)
def get_serial_device(self):
def _get_serial_device(self):
"""
Return an open PySerial serial device if one exists, otherwise, open one.
"""
if hasattr(self, "serial_device"):
return self.serial_device
# Check if we've already opened a device
if hasattr(self, "_serial_device"):
return self._serial_device
if self._port != "auto":
self._serial_device = Serial(self._port, self._baudrate, timeout=1)
return self._serial_device
# Try to autodetect which port to use based on the PID/VID of the device attached.
# This looks for the PID/VID of the FT2232, the primary chip used on the icestick
# and Digilent dev boards. However, folks will likely want to connect other things
# in the future, so in the future we'll probably want to look for other chips as
# well.
# The FT2232 exposes two serial ports - and for whatever reason it usually has the
# 0th device used for JTAG programming, and the 1st used for UART. So we'll grab
# the 1st.
import serial.tools.list_ports
ports = []
for port in serial.tools.list_ports.comports():
if (port.vid == 0x403) and (port.pid == 0x6010):
ports.append(port)
if len(ports) != 2:
raise ValueError(
f"Expected to see two serial ports for FT2232 device, but instead see {len(ports)}."
)
if ports[0].serial_number != ports[1].serial_number:
raise ValueError(
f"Serial numbers should be the same on both FT2232 ports - probably somehow grabbed ports on two different devices."
)
if ports[0].location > ports[1].location:
chosen_port = ports[0].device
else:
if self.port != "auto":
self.serial_device = Serial(self.port, self.baudrate, timeout=1)
return self.serial_device
chosen_port = ports[1].device
else:
# Try to autodetect which port to use based on the PID/VID of the device attached.
# This looks for the PID/VID of the FT2232, the primary chip used on the icestick
# and Digilent dev boards. However, folks will likely want to connect other things
# in the future, so in the future we'll probably want to look for other chips as
# well.
# The FT2232 exposes two serial ports - and for whatever reason it usually has the
# 0th device used for JTAG programming, and the 1st used for UART. So we'll grab
# the 1st.
import serial.tools.list_ports
ports = []
for port in serial.tools.list_ports.comports():
if (port.vid == 0x403) and (port.pid == 0x6010):
ports.append(port)
if len(ports) != 2:
raise ValueError(
f"Expected to see two serial ports for FT2232 device, but instead see {len(ports)}."
)
if ports[0].serial_number != ports[1].serial_number:
raise ValueError(
f"Serial numbers should be the same on both FT2232 ports - probably somehow grabbed ports on two different devices."
)
if ports[0].location > ports[1].location:
chosen_port = ports[0].device
else:
chosen_port = ports[1].device
self.serial_device = Serial(chosen_port, self.baudrate, timeout=1)
return self.serial_device
self._serial_device = Serial(chosen_port, self._baudrate, timeout=1)
return self._serial_device
def get_top_level_ports(self):
return [self.rx, self.tx]
@ -133,8 +142,8 @@ class UARTInterface(Elaboratable):
raise ValueError("Read address must be an integer or list of integers.")
# Send read requests, and get responses
ser = self.get_serial_device()
addr_chunks = split_into_chunks(addrs, self.chunk_size)
ser = self._get_serial_device()
addr_chunks = split_into_chunks(addrs, self._chunk_size)
datas = []
for addr_chunk in addr_chunks:
@ -152,7 +161,7 @@ class UARTInterface(Elaboratable):
# Split received bytes into individual responses and decode
responses = split_into_chunks(bytes_in, 7)
data_chunk = [self.decode_read_response(r) for r in responses]
data_chunk = [self._decode_read_response(r) for r in responses]
datas += data_chunk
return datas
@ -186,10 +195,10 @@ class UARTInterface(Elaboratable):
# Encode addrs and datas into write requests
bytes_out = "".join([f"W{a:04X}{d:04X}\r\n" for a, d in zip(addrs, datas)])
bytes_out = bytes_out.encode("ascii")
ser = self.get_serial_device()
ser = self._get_serial_device()
ser.write(bytes_out)
def decode_read_response(self, response_bytes):
def _decode_read_response(self, response_bytes):
"""
Check that read response is formatted properly, and extract the encoded data if so.
"""
@ -221,21 +230,14 @@ class UARTInterface(Elaboratable):
return int(response_ascii[1:5], 16)
def define_signals(self):
self.rx = Signal()
self.tx = Signal()
self.bus_o = Signal(InternalBus())
self.bus_i = Signal(InternalBus())
def elaborate(self, platform):
# fancy submoduling and such goes in here
m = Module()
m.submodules.uart_rx = uart_rx = UARTReceiver(self.clocks_per_baud)
m.submodules.uart_rx = uart_rx = UARTReceiver(self._clocks_per_baud)
m.submodules.bridge_rx = bridge_rx = ReceiveBridge()
m.submodules.bridge_tx = bridge_tx = TransmitBridge()
m.submodules.uart_tx = uart_tx = UARTTransmitter(self.clocks_per_baud)
m.submodules.uart_tx = uart_tx = UARTTransmitter(self._clocks_per_baud)
m.d.comb += [
# UART RX -> Internal Bus