From a1fddf555e7eae265bc9e2339b2a40fbd2ef224e Mon Sep 17 00:00:00 2001 From: Fischer Moseley <42497969+fischermoseley@users.noreply.github.com> Date: Mon, 12 Feb 2024 08:50:43 -0800 Subject: [PATCH] move init'ing UARTInterface from config to classmethod --- src/manta/manta.py | 2 +- src/manta/uart/__init__.py | 162 +++++++++++++++++++------------------ 2 files changed, 83 insertions(+), 81 deletions(-) diff --git a/src/manta/manta.py b/src/manta/manta.py index 35744a5..50a83e7 100644 --- a/src/manta/manta.py +++ b/src/manta/manta.py @@ -63,7 +63,7 @@ class Manta(Elaboratable): def get_interface(self): if "uart" in self.config: - return UARTInterface(self.config["uart"]) + return UARTInterface.from_config(self.config["uart"]) elif "ethernet" in self.config: return EthernetInterface(self.config["ethernet"]) diff --git a/src/manta/uart/__init__.py b/src/manta/uart/__init__.py index a735897..f974209 100644 --- a/src/manta/uart/__init__.py +++ b/src/manta/uart/__init__.py @@ -8,26 +8,31 @@ from serial import Serial class UARTInterface(Elaboratable): - def __init__(self, config): - self.config = config - self.check_config(self.config) - - self.port = config["port"] - self.clock_freq = config["clock_freq"] - self.baudrate = config["baudrate"] - self.clocks_per_baud = int(self.clock_freq // self.baudrate) - - self.define_signals() + def __init__(self, port, baudrate, clock_freq, chunk_size=256): + self._port = port + self._baudrate = baudrate + self._clock_freq = clock_freq + self._chunk_size = chunk_size + self._clocks_per_baud = int(self._clock_freq // self._baudrate) + self._check_config() # Set chunk_size, which is the max amount of bytes that the core will # dump to the OS driver at a time. Since the FPGA will return bytes # almost instantaneously, this prevents the OS's input buffer from # overflowing, and dropping bytes. - self.chunk_size = 256 # in bytes - if "chunk_size" in config: - self.chunk_size = config["chunk_size"] - def check_config(self, config): + self.rx = Signal() + self.tx = Signal() + + self.bus_o = Signal(InternalBus()) + self.bus_i = Signal(InternalBus()) + + @classmethod + def from_config(cls, config): + port = config.get("port") + clock_freq = config.get("clock_freq") + baudrate = config.get("baudrate") + # Warn if unrecognized options have been given recognized_options = ["port", "clock_freq", "baudrate", "chunk_size"] for option in config: @@ -36,84 +41,88 @@ class UARTInterface(Elaboratable): f"Ignoring unrecognized option '{option}' in UART interface config." ) + if "chunk_size" in config: + return cls(port, baudrate, clock_freq, config["chunk_size"]) + + else: + return cls(port, baudrate, clock_freq) + + def _check_config(self): # Ensure a serial port has been given - if "port" not in config: + if self._port is None: raise ValueError("No serial port provided to UART interface.") # Ensure clock frequency is provided and positive - if "clock_freq" not in config: + if self._clock_freq is None: raise ValueError("No clock frequency provided to UART interface.") - if config["clock_freq"] <= 0: + if self._clock_freq <= 0: raise ValueError("Non-positive clock frequency provided to UART interface.") # Check that baudrate is provided and positive - if "baudrate" not in config: + if self._baudrate is None: raise ValueError("No baudrate provided to UART interface.") - if config["baudrate"] <= 0: + if self._baudrate <= 0: raise ValueError("Non-positive baudrate provided to UART interface.") # Confirm the actual baudrate is within 5% of the target baudrate - clock_freq = config["clock_freq"] - baudrate = config["baudrate"] - clocks_per_baud = clock_freq // baudrate - actual_baudrate = clock_freq / clocks_per_baud - error = 100 * abs(actual_baudrate - baudrate) / baudrate + actual_baudrate = self._clock_freq / self._clocks_per_baud + error = 100 * abs(actual_baudrate - self._baudrate) / self._baudrate if error > 5: raise ValueError( "UART interface is unable to match targeted baudrate with specified clock frequency." ) - def get_serial_device(self): + def _get_serial_device(self): """ Return an open PySerial serial device if one exists, otherwise, open one. """ - if hasattr(self, "serial_device"): - return self.serial_device + + # Check if we've already opened a device + if hasattr(self, "_serial_device"): + return self._serial_device + + if self._port != "auto": + self._serial_device = Serial(self._port, self._baudrate, timeout=1) + return self._serial_device + + # Try to autodetect which port to use based on the PID/VID of the device attached. + # This looks for the PID/VID of the FT2232, the primary chip used on the icestick + # and Digilent dev boards. However, folks will likely want to connect other things + # in the future, so in the future we'll probably want to look for other chips as + # well. + + # The FT2232 exposes two serial ports - and for whatever reason it usually has the + # 0th device used for JTAG programming, and the 1st used for UART. So we'll grab + # the 1st. + + import serial.tools.list_ports + + ports = [] + for port in serial.tools.list_ports.comports(): + if (port.vid == 0x403) and (port.pid == 0x6010): + ports.append(port) + + if len(ports) != 2: + raise ValueError( + f"Expected to see two serial ports for FT2232 device, but instead see {len(ports)}." + ) + + if ports[0].serial_number != ports[1].serial_number: + raise ValueError( + f"Serial numbers should be the same on both FT2232 ports - probably somehow grabbed ports on two different devices." + ) + + if ports[0].location > ports[1].location: + chosen_port = ports[0].device else: - if self.port != "auto": - self.serial_device = Serial(self.port, self.baudrate, timeout=1) - return self.serial_device + chosen_port = ports[1].device - else: - # Try to autodetect which port to use based on the PID/VID of the device attached. - # This looks for the PID/VID of the FT2232, the primary chip used on the icestick - # and Digilent dev boards. However, folks will likely want to connect other things - # in the future, so in the future we'll probably want to look for other chips as - # well. - - # The FT2232 exposes two serial ports - and for whatever reason it usually has the - # 0th device used for JTAG programming, and the 1st used for UART. So we'll grab - # the 1st. - - import serial.tools.list_ports - - ports = [] - for port in serial.tools.list_ports.comports(): - if (port.vid == 0x403) and (port.pid == 0x6010): - ports.append(port) - - if len(ports) != 2: - raise ValueError( - f"Expected to see two serial ports for FT2232 device, but instead see {len(ports)}." - ) - - if ports[0].serial_number != ports[1].serial_number: - raise ValueError( - f"Serial numbers should be the same on both FT2232 ports - probably somehow grabbed ports on two different devices." - ) - - if ports[0].location > ports[1].location: - chosen_port = ports[0].device - - else: - chosen_port = ports[1].device - - self.serial_device = Serial(chosen_port, self.baudrate, timeout=1) - return self.serial_device + self._serial_device = Serial(chosen_port, self._baudrate, timeout=1) + return self._serial_device def get_top_level_ports(self): return [self.rx, self.tx] @@ -133,8 +142,8 @@ class UARTInterface(Elaboratable): raise ValueError("Read address must be an integer or list of integers.") # Send read requests, and get responses - ser = self.get_serial_device() - addr_chunks = split_into_chunks(addrs, self.chunk_size) + ser = self._get_serial_device() + addr_chunks = split_into_chunks(addrs, self._chunk_size) datas = [] for addr_chunk in addr_chunks: @@ -152,7 +161,7 @@ class UARTInterface(Elaboratable): # Split received bytes into individual responses and decode responses = split_into_chunks(bytes_in, 7) - data_chunk = [self.decode_read_response(r) for r in responses] + data_chunk = [self._decode_read_response(r) for r in responses] datas += data_chunk return datas @@ -186,10 +195,10 @@ class UARTInterface(Elaboratable): # Encode addrs and datas into write requests bytes_out = "".join([f"W{a:04X}{d:04X}\r\n" for a, d in zip(addrs, datas)]) bytes_out = bytes_out.encode("ascii") - ser = self.get_serial_device() + ser = self._get_serial_device() ser.write(bytes_out) - def decode_read_response(self, response_bytes): + def _decode_read_response(self, response_bytes): """ Check that read response is formatted properly, and extract the encoded data if so. """ @@ -221,21 +230,14 @@ class UARTInterface(Elaboratable): return int(response_ascii[1:5], 16) - def define_signals(self): - self.rx = Signal() - self.tx = Signal() - - self.bus_o = Signal(InternalBus()) - self.bus_i = Signal(InternalBus()) - def elaborate(self, platform): # fancy submoduling and such goes in here m = Module() - m.submodules.uart_rx = uart_rx = UARTReceiver(self.clocks_per_baud) + m.submodules.uart_rx = uart_rx = UARTReceiver(self._clocks_per_baud) m.submodules.bridge_rx = bridge_rx = ReceiveBridge() m.submodules.bridge_tx = bridge_tx = TransmitBridge() - m.submodules.uart_tx = uart_tx = UARTTransmitter(self.clocks_per_baud) + m.submodules.uart_tx = uart_tx = UARTTransmitter(self._clocks_per_baud) m.d.comb += [ # UART RX -> Internal Bus