add underscores for private objects

This commit is contained in:
Fischer Moseley 2024-02-17 15:04:50 -08:00
parent 8aedb8e968
commit 68aeb1a4a8
8 changed files with 200 additions and 148 deletions

View File

@ -7,7 +7,8 @@ import socket
class EthernetInterface(Elaboratable): class EthernetInterface(Elaboratable):
"""A module for communicating with Manta over Ethernet, using UDP. """
A module for communicating with Manta over Ethernet, using UDP.
Provides methods for generating synthesizable logic for the FPGA, Provides methods for generating synthesizable logic for the FPGA,
as well as methods for reading and writing to memory by the host. as well as methods for reading and writing to memory by the host.
@ -48,14 +49,18 @@ class EthernetInterface(Elaboratable):
def _check_config(self): def _check_config(self):
# Make sure UDP port is an integer in the range 0-65535 # Make sure UDP port is an integer in the range 0-65535
if not isinstance(self._udp_port, int): if not isinstance(self._udp_port, int):
raise TypeError("UDP Port must be specified as an integer between 0 and 65535.") raise TypeError(
"UDP Port must be specified as an integer between 0 and 65535."
)
if not 0 <= self._udp_port <= 65535: if not 0 <= self._udp_port <= 65535:
raise ValueError("UDP Port must be between 0 and 65535.") raise ValueError("UDP Port must be between 0 and 65535.")
# Make sure Host IP address is four bytes separated by a period # Make sure Host IP address is four bytes separated by a period
if not isinstance(self._host_ip_addr, str): if not isinstance(self._host_ip_addr, str):
raise TypeError("Host IP must be specified as a string in the form 'xxx.xxx.xxx.xxx'.") raise TypeError(
"Host IP must be specified as a string in the form 'xxx.xxx.xxx.xxx'."
)
if len(self._host_ip_addr.split(".")) != 4: if len(self._host_ip_addr.split(".")) != 4:
raise ValueError("Host IP must be specified in the form 'xxx.xxx.xxx.xxx'.") raise ValueError("Host IP must be specified in the form 'xxx.xxx.xxx.xxx'.")
@ -66,7 +71,9 @@ class EthernetInterface(Elaboratable):
# Make sure FPGA IP is four bytes separated by a period # Make sure FPGA IP is four bytes separated by a period
if not isinstance(self._fpga_ip_addr, str): if not isinstance(self._fpga_ip_addr, str):
raise TypeError("FPGA IP must be specified as a string in the form 'xxx.xxx.xxx.xxx'.") raise TypeError(
"FPGA IP must be specified as a string in the form 'xxx.xxx.xxx.xxx'."
)
if len(self._fpga_ip_addr.split(".")) != 4: if len(self._fpga_ip_addr.split(".")) != 4:
raise ValueError("FPGA IP must be specified in the form 'xxx.xxx.xxx.xxx'.") raise ValueError("FPGA IP must be specified in the form 'xxx.xxx.xxx.xxx'.")
@ -75,8 +82,11 @@ class EthernetInterface(Elaboratable):
if not 0 <= int(byte) <= 255: if not 0 <= int(byte) <= 255:
raise ValueError(f"Invalid byte in FPGA IP: {byte}") raise ValueError(f"Invalid byte in FPGA IP: {byte}")
def get_top_level_ports(self): def get_top_level_ports(self):
"""
Return the Amaranth signals that should be included as ports in the top-level
Manta module.
"""
ports = [ ports = [
self.rmii_clocks_ref_clk, self.rmii_clocks_ref_clk,
self.rmii_crs_dv, self.rmii_crs_dv,
@ -166,7 +176,7 @@ class EthernetInterface(Elaboratable):
# Make sure all list elements are integers # Make sure all list elements are integers
if not all(isinstance(a, int) for a in addrs): if not all(isinstance(a, int) for a in addrs):
raise ValueError("Read address must be an integer or list of integers.") raise TypeError("Read address must be an integer or list of integers.")
# Send read requests, and get responses # Send read requests, and get responses
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@ -205,15 +215,15 @@ class EthernetInterface(Elaboratable):
# Make sure address and datas are all integers # Make sure address and datas are all integers
if not isinstance(addrs, list) or not isinstance(datas, list): if not isinstance(addrs, list) or not isinstance(datas, list):
raise ValueError( raise TypeError(
"Write addresses and data must be an integer or list of integers." "Write addresses and data must be an integer or list of integers."
) )
if not all(isinstance(a, int) for a in addrs): if not all(isinstance(a, int) for a in addrs):
raise ValueError("Write addresses must be all be integers.") raise TypeError("Write addresses must be all be integers.")
if not all(isinstance(d, int) for d in datas): if not all(isinstance(d, int) for d in datas):
raise ValueError("Write data must all be integers.") raise TypeError("Write data must all be integers.")
# Since the FPGA doesn't issue any responses to write requests, we # Since the FPGA doesn't issue any responses to write requests, we
# the host's input buffer isn't written to, and we don't need to # the host's input buffer isn't written to, and we don't need to

View File

@ -3,11 +3,13 @@ from manta.utils import *
class UDPSinkBridge(Elaboratable): class UDPSinkBridge(Elaboratable):
"""A module for bridging Manta's internal bus to an AXI stream of UDP """
A module for bridging Manta's internal bus to an AXI stream of UDP
packet data. packet data.
Connects to the LiteEth core's "sink" port. Connects to the LiteEth core's "sink" port.
""" """
def __init__(self): def __init__(self):
self.bus_i = Signal(InternalBus()) self.bus_i = Signal(InternalBus())

View File

@ -3,11 +3,13 @@ from manta.utils import *
class UDPSourceBridge(Elaboratable): class UDPSourceBridge(Elaboratable):
"""A module for bridging the AXI-stream of incoming UDP packet data to """
A module for bridging the AXI-stream of incoming UDP packet data to
Manta's internal bus. Manta's internal bus.
Connects to the LiteEth core's "source" port. Connects to the LiteEth core's "source" port.
""" """
def __init__(self): def __init__(self):
self.bus_o = Signal(InternalBus()) self.bus_o = Signal(InternalBus())

View File

@ -8,6 +8,13 @@ from serial import Serial
class UARTInterface(Elaboratable): class UARTInterface(Elaboratable):
"""
A module for communicating with Manta over UART.
Provides methods for generating synthesizable logic for the FPGA,
as well as methods for reading and writing to memory by the host.
"""
def __init__(self, port, baudrate, clock_freq, chunk_size=256): def __init__(self, port, baudrate, clock_freq, chunk_size=256):
self._port = port self._port = port
self._baudrate = baudrate self._baudrate = baudrate
@ -73,7 +80,7 @@ class UARTInterface(Elaboratable):
def _get_serial_device(self): def _get_serial_device(self):
""" """
Return an open PySerial serial device if one exists, otherwise, open one. Return an open PySerial serial device if one exists, otherwise, open one and return it.
""" """
# Check if we've already opened a device # Check if we've already opened a device
@ -121,6 +128,10 @@ class UARTInterface(Elaboratable):
return self._serial_device return self._serial_device
def get_top_level_ports(self): def get_top_level_ports(self):
"""
Return the Amaranth signals that should be included as ports in the top-level
Manta module.
"""
return [self.rx, self.tx] return [self.rx, self.tx]
def read(self, addrs): def read(self, addrs):
@ -201,7 +212,7 @@ class UARTInterface(Elaboratable):
def _decode_read_response(self, response_bytes): def _decode_read_response(self, response_bytes):
""" """
Check that read response is formatted properly, and extract the encoded data if so. Check that read response is formatted properly, and return the encoded data if so.
""" """
# Make sure response is not empty # Make sure response is not empty
@ -232,7 +243,6 @@ class UARTInterface(Elaboratable):
return int(response_ascii[1:5], 16) return int(response_ascii[1:5], 16)
def elaborate(self, platform): def elaborate(self, platform):
# fancy submoduling and such goes in here
m = Module() m = Module()
m.submodules.uart_rx = uart_rx = UARTReceiver(self._clocks_per_baud) m.submodules.uart_rx = uart_rx = UARTReceiver(self._clocks_per_baud)

View File

@ -1,71 +1,78 @@
from amaranth import * from amaranth import *
from amaranth.lib.enum import IntEnum
from amaranth.lib.data import ArrayLayout from amaranth.lib.data import ArrayLayout
class States(IntEnum):
IDLE = 0
READ = 1
WRITE = 2
class ReceiveBridge(Elaboratable): class ReceiveBridge(Elaboratable):
"""
A module for bridging the stream of bytes from the UARTReceiver
module to Manta's internal bus.
"""
def __init__(self): def __init__(self):
# Top-Level Ports # Top-Level Ports
self.data_i = Signal(8) self.data_i = Signal(8)
self.valid_i = Signal() self.valid_i = Signal()
self.addr_o = Signal(16, reset=0) self.addr_o = Signal(16)
self.data_o = Signal(16, reset=0) self.data_o = Signal(16)
self.rw_o = Signal(1, reset=0) self.rw_o = Signal(1)
self.valid_o = Signal(1, reset=0) self.valid_o = Signal(1)
# State Machine
self.IDLE_STATE = 0
self.READ_STATE = 1
self.WRITE_STATE = 2
# Internal Signals # Internal Signals
self.buffer = Signal(ArrayLayout(4, 8), reset_less=True) self._buffer = Signal(ArrayLayout(4, 8))
self.state = Signal(2, reset=self.IDLE_STATE) self._state = Signal(States)
self.byte_num = Signal(4, reset=0) self._byte_num = Signal(4)
self.is_eol = Signal() self._is_eol = Signal()
self.is_ascii_hex = Signal() self._is_ascii_hex = Signal()
self.from_ascii_hex = Signal(8) self._from_ascii_hex = Signal(8)
def drive_ascii_signals(self, m): def _drive_ascii_signals(self, m):
# Decode 0-9 # Decode 0-9
with m.If((self.data_i >= 0x30) & (self.data_i <= 0x39)): with m.If((self.data_i >= 0x30) & (self.data_i <= 0x39)):
m.d.comb += self.is_ascii_hex.eq(1) m.d.comb += self._is_ascii_hex.eq(1)
m.d.comb += self.from_ascii_hex.eq(self.data_i - 0x30) m.d.comb += self._from_ascii_hex.eq(self.data_i - 0x30)
# Decode A-F # Decode A-F
with m.Elif((self.data_i >= 0x41) & (self.data_i <= 0x46)): with m.Elif((self.data_i >= 0x41) & (self.data_i <= 0x46)):
m.d.comb += self.is_ascii_hex.eq(1) m.d.comb += self._is_ascii_hex.eq(1)
m.d.comb += self.from_ascii_hex.eq(self.data_i - 0x41 + 10) m.d.comb += self._from_ascii_hex.eq(self.data_i - 0x41 + 10)
with m.Else(): with m.Else():
m.d.comb += self.is_ascii_hex.eq(0) m.d.comb += self._is_ascii_hex.eq(0)
m.d.comb += self.from_ascii_hex.eq(0) m.d.comb += self._from_ascii_hex.eq(0)
with m.If((self.data_i == ord("\r")) | (self.data_i == ord("\n"))): with m.If((self.data_i == ord("\r")) | (self.data_i == ord("\n"))):
m.d.comb += self.is_eol.eq(1) m.d.comb += self._is_eol.eq(1)
with m.Else(): with m.Else():
m.d.comb += self.is_eol.eq(0) m.d.comb += self._is_eol.eq(0)
def drive_output_bus(self, m): def _drive_output_bus(self, m):
with m.If( with m.If(
(self.state == self.READ_STATE) & (self.byte_num == 4) & (self.is_eol) (self._state == States.READ) & (self._byte_num == 4) & (self._is_eol)
): ):
m.d.comb += self.addr_o.eq( m.d.comb += self.addr_o.eq(
Cat(self.buffer[3], self.buffer[2], self.buffer[1], self.buffer[0]) Cat(self._buffer[3], self._buffer[2], self._buffer[1], self._buffer[0])
) )
m.d.comb += self.data_o.eq(0) m.d.comb += self.data_o.eq(0)
m.d.comb += self.valid_o.eq(1) m.d.comb += self.valid_o.eq(1)
m.d.comb += self.rw_o.eq(0) m.d.comb += self.rw_o.eq(0)
with m.Elif( with m.Elif(
(self.state == self.WRITE_STATE) & (self.byte_num == 8) & (self.is_eol) (self._state == States.WRITE) & (self._byte_num == 8) & (self._is_eol)
): ):
m.d.comb += self.addr_o.eq( m.d.comb += self.addr_o.eq(
Cat(self.buffer[3], self.buffer[2], self.buffer[1], self.buffer[0]) Cat(self._buffer[3], self._buffer[2], self._buffer[1], self._buffer[0])
) )
m.d.comb += self.data_o.eq( m.d.comb += self.data_o.eq(
Cat(self.buffer[7], self.buffer[6], self.buffer[5], self.buffer[4]) Cat(self._buffer[7], self._buffer[6], self._buffer[5], self._buffer[4])
) )
m.d.comb += self.valid_o.eq(1) m.d.comb += self.valid_o.eq(1)
m.d.comb += self.rw_o.eq(1) m.d.comb += self.rw_o.eq(1)
@ -76,53 +83,57 @@ class ReceiveBridge(Elaboratable):
m.d.comb += self.rw_o.eq(0) m.d.comb += self.rw_o.eq(0)
m.d.comb += self.valid_o.eq(0) m.d.comb += self.valid_o.eq(0)
def drive_fsm(self, m): def _drive_fsm(self, m):
with m.If(self.valid_i): with m.If(self.valid_i):
with m.If(self.state == self.IDLE_STATE): with m.If(self._state == States.IDLE):
m.d.sync += self.byte_num.eq(0) m.d.sync += self._byte_num.eq(0)
with m.If(self.data_i == ord("R")): with m.If(self.data_i == ord("R")):
m.d.sync += self.state.eq(self.READ_STATE) m.d.sync += self._state.eq(States.READ)
with m.Elif(self.data_i == ord("W")): with m.Elif(self.data_i == ord("W")):
m.d.sync += self.state.eq(self.WRITE_STATE) m.d.sync += self._state.eq(States.WRITE)
with m.If(self.state == self.READ_STATE): with m.If(self._state == States.READ):
# buffer bytes if we don't have enough # buffer bytes if we don't have enough
with m.If(self.byte_num < 4): with m.If(self._byte_num < 4):
# if bytes aren't valid ASCII then return to IDLE state # if bytes aren't valid ASCII then return to IDLE state
with m.If(self.is_ascii_hex == 0): with m.If(self._is_ascii_hex == 0):
m.d.sync += self.state.eq(self.IDLE_STATE) m.d.sync += self._state.eq(States.IDLE)
# otherwise buffer them # otherwise buffer them
with m.Else(): with m.Else():
m.d.sync += self.buffer[self.byte_num].eq(self.from_ascii_hex) m.d.sync += self._buffer[self._byte_num].eq(
m.d.sync += self.byte_num.eq(self.byte_num + 1) self._from_ascii_hex
)
m.d.sync += self._byte_num.eq(self._byte_num + 1)
with m.Else(): with m.Else():
m.d.sync += self.state.eq(self.IDLE_STATE) m.d.sync += self._state.eq(States.IDLE)
with m.If(self.state == self.WRITE_STATE): with m.If(self._state == States.WRITE):
# buffer bytes if we don't have enough # buffer bytes if we don't have enough
with m.If(self.byte_num < 8): with m.If(self._byte_num < 8):
# if bytes aren't valid ASCII then return to IDLE state # if bytes aren't valid ASCII then return to IDLE state
with m.If(self.is_ascii_hex == 0): with m.If(self._is_ascii_hex == 0):
m.d.sync += self.state.eq(self.IDLE_STATE) m.d.sync += self._state.eq(States.IDLE)
# otherwise buffer them # otherwise buffer them
with m.Else(): with m.Else():
m.d.sync += self.buffer[self.byte_num].eq(self.from_ascii_hex) m.d.sync += self._buffer[self._byte_num].eq(
m.d.sync += self.byte_num.eq(self.byte_num + 1) self._from_ascii_hex
)
m.d.sync += self._byte_num.eq(self._byte_num + 1)
with m.Else(): with m.Else():
m.d.sync += self.state.eq(self.IDLE_STATE) m.d.sync += self._state.eq(States.IDLE)
pass pass
def elaborate(self, platform): def elaborate(self, platform):
m = Module() m = Module()
self.drive_ascii_signals(m) self._drive_ascii_signals(m)
self.drive_output_bus(m) self._drive_output_bus(m)
self.drive_fsm(m) self._drive_fsm(m)
return m return m

View File

@ -2,58 +2,63 @@ from amaranth import *
class UARTReceiver(Elaboratable): class UARTReceiver(Elaboratable):
"""
A module for receiving bytes on a 8N1 UART at a configurable
baudrate. Outputs bytes as a stream.
"""
def __init__(self, clocks_per_baud): def __init__(self, clocks_per_baud):
self.clocks_per_baud = clocks_per_baud self._clocks_per_baud = clocks_per_baud
# Top-Level Ports # Top-Level Ports
self.rx = Signal() self.rx = Signal()
self.data_o = Signal(8, reset=0) self.data_o = Signal(8)
self.valid_o = Signal(1, reset=0) self.valid_o = Signal(1)
# Internal Signals # Internal Signals
self.busy = Signal() self._busy = Signal()
self.bit_index = Signal(range(10)) self._bit_index = Signal(range(10))
self.baud_counter = Signal(range(2 * clocks_per_baud)) self._baud_counter = Signal(range(2 * clocks_per_baud))
self.rx_d = Signal() self._rx_d = Signal()
self.rx_q = Signal() self._rx_q = Signal()
self.rx_q_prev = Signal() self._rx_q_prev = Signal()
def elaborate(self, platform): def elaborate(self, platform):
m = Module() m = Module()
# Two Flip-Flop Synchronizer # Two Flip-Flop Synchronizer
m.d.sync += [ m.d.sync += [
self.rx_d.eq(self.rx), self._rx_d.eq(self.rx),
self.rx_q.eq(self.rx_d), self._rx_q.eq(self._rx_d),
self.rx_q_prev.eq(self.rx_q), self._rx_q_prev.eq(self._rx_q),
] ]
m.d.sync += self.valid_o.eq(0) m.d.sync += self.valid_o.eq(0)
with m.If(~self.busy): with m.If(~self._busy):
with m.If((~self.rx_q) & (self.rx_q_prev)): with m.If((~self._rx_q) & (self._rx_q_prev)):
m.d.sync += self.busy.eq(1) m.d.sync += self._busy.eq(1)
m.d.sync += self.bit_index.eq(8) m.d.sync += self._bit_index.eq(8)
m.d.sync += self.baud_counter.eq( m.d.sync += self._baud_counter.eq(
self.clocks_per_baud + (self.clocks_per_baud // 2) - 2 self._clocks_per_baud + (self._clocks_per_baud // 2) - 2
) )
with m.Else(): with m.Else():
with m.If(self.baud_counter == 0): with m.If(self._baud_counter == 0):
with m.If(self.bit_index == 0): with m.If(self._bit_index == 0):
m.d.sync += self.valid_o.eq(1) m.d.sync += self.valid_o.eq(1)
m.d.sync += self.busy.eq(0) m.d.sync += self._busy.eq(0)
m.d.sync += self.bit_index.eq(0) m.d.sync += self._bit_index.eq(0)
m.d.sync += self.baud_counter.eq(0) m.d.sync += self._baud_counter.eq(0)
with m.Else(): with m.Else():
# m.d.sync += self.data_o.eq(Cat(self.rx_q, self.data_o[0:7])) # m.d.sync += self.data_o.eq(Cat(self._rx_q, self.data_o[0:7]))
m.d.sync += self.data_o.eq(Cat(self.data_o[1:8], self.rx_q)) m.d.sync += self.data_o.eq(Cat(self.data_o[1:8], self._rx_q))
m.d.sync += self.bit_index.eq(self.bit_index - 1) m.d.sync += self._bit_index.eq(self._bit_index - 1)
m.d.sync += self.baud_counter.eq(self.clocks_per_baud - 1) m.d.sync += self._baud_counter.eq(self._clocks_per_baud - 1)
with m.Else(): with m.Else():
m.d.sync += self.baud_counter.eq(self.baud_counter - 1) m.d.sync += self._baud_counter.eq(self._baud_counter - 1)
return m return m

View File

@ -2,86 +2,91 @@ from amaranth import *
class TransmitBridge(Elaboratable): class TransmitBridge(Elaboratable):
"""
A module for bridging Manta's internal bus to the stream of bytes
expected by the UARTTransmitter module.
"""
def __init__(self): def __init__(self):
# Top-Level Ports # Top-Level Ports
self.data_i = Signal(16) self.data_i = Signal(16)
self.rw_i = Signal() self.rw_i = Signal()
self.valid_i = Signal() self.valid_i = Signal()
self.data_o = Signal(8, reset=0) self.data_o = Signal(8)
self.start_o = Signal(1) self.start_o = Signal(1)
self.done_i = Signal() self.done_i = Signal()
# Internal Signals # Internal Signals
self.buffer = Signal(16, reset=0) self._buffer = Signal(16)
self.count = Signal(4, reset=0) self._count = Signal(4)
self.busy = Signal(1, reset=0) self._busy = Signal(1)
self.to_ascii_hex = Signal(8) self._to_ascii_hex = Signal(8)
self.n = Signal(4) self._n = Signal(4)
def elaborate(self, platform): def elaborate(self, platform):
m = Module() m = Module()
m.d.comb += self.start_o.eq(self.busy) m.d.comb += self.start_o.eq(self._busy)
with m.If(~self.busy): with m.If(~self._busy):
with m.If((self.valid_i) & (~self.rw_i)): with m.If((self.valid_i) & (~self.rw_i)):
m.d.sync += self.busy.eq(1) m.d.sync += self._busy.eq(1)
m.d.sync += self.buffer.eq(self.data_i) m.d.sync += self._buffer.eq(self.data_i)
with m.Else(): with m.Else():
# uart_tx is transmitting a byte: # uart_tx is transmitting a byte:
with m.If(self.done_i): with m.If(self.done_i):
m.d.sync += self.count.eq(self.count + 1) m.d.sync += self._count.eq(self._count + 1)
# Message has been transmitted # Message has been transmitted
with m.If(self.count > 5): with m.If(self._count > 5):
m.d.sync += self.count.eq(0) m.d.sync += self._count.eq(0)
# Go back to idle, or transmit next message # Go back to idle, or transmit next message
with m.If((self.valid_i) & (~self.rw_i)): with m.If((self.valid_i) & (~self.rw_i)):
m.d.sync += self.buffer.eq(self.data_i) m.d.sync += self._buffer.eq(self.data_i)
with m.Else(): with m.Else():
m.d.sync += self.busy.eq(0) m.d.sync += self._busy.eq(0)
# define to_ascii_hex # define to_ascii_hex
with m.If(self.n < 10): with m.If(self._n < 10):
m.d.comb += self.to_ascii_hex.eq(self.n + 0x30) m.d.comb += self._to_ascii_hex.eq(self._n + 0x30)
with m.Else(): with m.Else():
m.d.comb += self.to_ascii_hex.eq(self.n + 0x41 - 10) m.d.comb += self._to_ascii_hex.eq(self._n + 0x41 - 10)
# run the sequence # run the sequence
with m.If(self.count == 0): with m.If(self._count == 0):
m.d.comb += self.n.eq(0) m.d.comb += self._n.eq(0)
m.d.comb += self.data_o.eq(ord("D")) m.d.comb += self.data_o.eq(ord("D"))
with m.Elif(self.count == 1): with m.Elif(self._count == 1):
m.d.comb += self.n.eq(self.buffer[12:16]) m.d.comb += self._n.eq(self._buffer[12:16])
m.d.comb += self.data_o.eq(self.to_ascii_hex) m.d.comb += self.data_o.eq(self._to_ascii_hex)
with m.Elif(self.count == 2): with m.Elif(self._count == 2):
m.d.comb += self.n.eq(self.buffer[8:12]) m.d.comb += self._n.eq(self._buffer[8:12])
m.d.comb += self.data_o.eq(self.to_ascii_hex) m.d.comb += self.data_o.eq(self._to_ascii_hex)
with m.Elif(self.count == 3): with m.Elif(self._count == 3):
m.d.comb += self.n.eq(self.buffer[4:8]) m.d.comb += self._n.eq(self._buffer[4:8])
m.d.comb += self.data_o.eq(self.to_ascii_hex) m.d.comb += self.data_o.eq(self._to_ascii_hex)
with m.Elif(self.count == 4): with m.Elif(self._count == 4):
m.d.comb += self.n.eq(self.buffer[0:4]) m.d.comb += self._n.eq(self._buffer[0:4])
m.d.comb += self.data_o.eq(self.to_ascii_hex) m.d.comb += self.data_o.eq(self._to_ascii_hex)
with m.Elif(self.count == 5): with m.Elif(self._count == 5):
m.d.comb += self.n.eq(0) m.d.comb += self._n.eq(0)
m.d.comb += self.data_o.eq(ord("\r")) m.d.comb += self.data_o.eq(ord("\r"))
with m.Elif(self.count == 6): with m.Elif(self._count == 6):
m.d.comb += self.n.eq(0) m.d.comb += self._n.eq(0)
m.d.comb += self.data_o.eq(ord("\n")) m.d.comb += self.data_o.eq(ord("\n"))
with m.Else(): with m.Else():
m.d.comb += self.n.eq(0) m.d.comb += self._n.eq(0)
m.d.comb += self.data_o.eq(0) m.d.comb += self.data_o.eq(0)
return m return m

View File

@ -2,8 +2,13 @@ from amaranth import *
class UARTTransmitter(Elaboratable): class UARTTransmitter(Elaboratable):
"""
A module for transmitting bytes on a 8N1 UART at a configurable
baudrate. Accepts bytes as a stream.
"""
def __init__(self, clocks_per_baud): def __init__(self, clocks_per_baud):
self.clocks_per_baud = clocks_per_baud self._clocks_per_baud = clocks_per_baud
# Top-Level Ports # Top-Level Ports
self.data_i = Signal(8) self.data_i = Signal(8)
@ -13,38 +18,40 @@ class UARTTransmitter(Elaboratable):
self.tx = Signal(reset=1) self.tx = Signal(reset=1)
# Internal Signals # Internal Signals
self.baud_counter = Signal(range(clocks_per_baud)) self._baud_counter = Signal(range(self._clocks_per_baud))
self.buffer = Signal(9) self._buffer = Signal(9)
self.bit_index = Signal(4) self._bit_index = Signal(4)
def elaborate(self, platform): def elaborate(self, platform):
m = Module() m = Module()
with m.If((self.start_i) & (self.done_o)): with m.If((self.start_i) & (self.done_o)):
m.d.sync += self.baud_counter.eq(self.clocks_per_baud - 1) m.d.sync += self._baud_counter.eq(self._clocks_per_baud - 1)
m.d.sync += self.buffer.eq(Cat(self.data_i, 1)) m.d.sync += self._buffer.eq(Cat(self.data_i, 1))
m.d.sync += self.bit_index.eq(0) m.d.sync += self._bit_index.eq(0)
m.d.sync += self.done_o.eq(0) m.d.sync += self.done_o.eq(0)
m.d.sync += self.tx.eq(0) m.d.sync += self.tx.eq(0)
with m.Elif(~self.done_o): with m.Elif(~self.done_o):
m.d.sync += self.baud_counter.eq(self.baud_counter - 1) m.d.sync += self._baud_counter.eq(self._baud_counter - 1)
m.d.sync += self.done_o.eq((self.baud_counter == 1) & (self.bit_index == 9)) m.d.sync += self.done_o.eq(
(self._baud_counter == 1) & (self._bit_index == 9)
)
# A baud period has elapsed # A baud period has elapsed
with m.If(self.baud_counter == 0): with m.If(self._baud_counter == 0):
m.d.sync += self.baud_counter.eq(self.clocks_per_baud - 1) m.d.sync += self._baud_counter.eq(self._clocks_per_baud - 1)
# Clock out another bit if there are any left # Clock out another bit if there are any left
with m.If(self.bit_index < 9): with m.If(self._bit_index < 9):
m.d.sync += self.tx.eq(self.buffer.bit_select(self.bit_index, 1)) m.d.sync += self.tx.eq(self._buffer.bit_select(self._bit_index, 1))
m.d.sync += self.bit_index.eq(self.bit_index + 1) m.d.sync += self._bit_index.eq(self._bit_index + 1)
# Byte has been sent, send out next one or go to idle # Byte has been sent, send out next one or go to idle
with m.Else(): with m.Else():
with m.If(self.start_i): with m.If(self.start_i):
m.d.sync += self.buffer.eq(Cat(self.data_i, 1)) m.d.sync += self._buffer.eq(Cat(self.data_i, 1))
m.d.sync += self.bit_index.eq(0) m.d.sync += self._bit_index.eq(0)
m.d.sync += self.tx.eq(0) m.d.sync += self.tx.eq(0)
with m.Else(): with m.Else():