add batch read/write UART for speedo mode

This commit is contained in:
Fischer Moseley 2023-04-16 15:36:47 -04:00
parent 9cc2357ea4
commit f6f9096895
2 changed files with 79 additions and 29 deletions

View File

@ -33,5 +33,8 @@ if __name__ == "__main__":
from manta import Manta
m = Manta('manta.yaml')
for addr, pixel in enumerate(pixels):
m.image_mem.write(addr, pixel)
# for addr, pixel in enumerate(pixels):
# m.image_mem.write(addr, pixel)
addrs = list(range(len(pixels)))
m.image_mem.interface.write_registers(addrs, pixels)

View File

@ -123,19 +123,25 @@ class UARTInterface:
assert config["baudrate"] > 0, "Baudrate must be positive."
self.baudrate = config["baudrate"]
# confirm core clock is sufficiently fast
# Confirm core clock is sufficiently fast
clocks_per_baud = self.clock_freq // self.baudrate
assert clocks_per_baud >= 2
self.clocks_per_baud = clocks_per_baud
# confirm we can match baudrate suffeciently well
# Confirm we can match baudrate suffeciently well
actual_baudrate = self.clock_freq / clocks_per_baud
baudrate_error = 100 * abs(actual_baudrate - self.baudrate) / self.baudrate
assert (
baudrate_error <= 5
), "Unable to match target baudrate - they differ by {baudrate_error}%"
# set verbosity
# Set chunk_size, which is the max amount of bytes that get dumped
# to the OS driver at a time
self.chunk_size = 256
if "chunk size" in config:
self.chunk_size = config["chunk size"]
# Set verbosity
self.verbose = False
if "verbose" in config:
self.verbose = config["verbose"]
@ -167,31 +173,30 @@ class UARTInterface:
assert rd[0].serial_number == rd[1].serial_number, "Serial numbers should be the same on both FT2232 ports - probably somehow grabbed ports on two different devices."
return rd[0].device if rd[0].location > rd[1].location else rd[1].device
def decode_response(self, response):
"""Make sure reponse from FPGA has the correct format, and return data contained within if so."""
assert response is not None, "No reponse received."
response_str = response.decode('ascii')
assert response_str[0] == 'M', "Bad message recieved, incorrect preamble."
assert response_str[-1] == '\n', "Bad message received, incorrect EOL."
assert response_str[-2] == '\r', "Bad message received, incorrect EOL."
assert len(response_str) == 7, f"Wrong number of bytes received, expecting 7 but got {len(response)}."
return int(response_str[1:5], 16)
def read_register(self, addr):
self.open_port_if_not_alredy_open()
# request from the bus
addr_str = '{:04X}'.format(addr)
request = f"M{addr_str}\r\n".encode('ascii')
request = f"M{addr:04X}\r\n".encode('ascii')
self.ser.write(request)
# read and parse the response
response = self.ser.read(7)
assert response is not None, "No reponse received."
response = response.decode('ascii')
assert response[0] == 'M', "Bad message recieved, incorrect preamble."
assert response[-1] == '\n', "Bad message received, incorrect EOL."
assert response[-2] == '\r', "Bad message received, incorrect EOL."
assert len(response) == 7, f"Wrong number of bytes received, expecting 7 but got {len(response)}."
data = int(response[1:5], 16)
data_hex ='{:04X}'.format(data)
data = self.decode_response(self.ser.read(7))
if self.verbose:
print(f"read {data_hex} from {addr_str}")
print(f"read {data:04X} from {addr:04X}")
return data
@ -199,14 +204,57 @@ class UARTInterface:
self.open_port_if_not_alredy_open()
# request from the bus
addr_str = '{:04X}'.format(addr)
data_str = '{:04X}'.format(data)
request = f"M{addr_str}{data_str}\r\n"
request = f"M{addr:04X}{data:04X}\r\n".encode('ascii')
self.ser.write(request)
if self.verbose:
print(f"wrote {data_str} to {addr_str}")
print(f"wrote {data:04X} to {addr:04X}")
self.ser.write(request.encode('ascii'))
def read_registers(self, addrs):
assert isinstance(addrs, list), "Read addresses must be list of integers."
assert all(isinstance(addr, int) for addr in addrs), "Read addresses must be list of integers."
# send data in chunks because the reponses will fill up the OS's
# input buffer in no time flat
self.open_port_if_not_alredy_open()
inbound_bytes = b""
for i in range(0, len(addrs), self.chunk_size):
addr_chunk = addrs[i:i+self.chunk_size]
outbound_bytes = [f"M{addr:04X}\r\n".encode('ascii') for addr in addr_chunk]
outbound_bytes = b"".join(outbound_bytes)
self.ser.write(outbound_bytes)
inbound_bytes += self.ser.read(len(outbound_bytes))
data = []
for i in range(0, len(inbound_bytes), 7):
response = inbound_bytes[i:i+7]
data = self.decode_response(response)
return data
def write_registers(self, addrs, datas):
assert isinstance(addrs, list), "Write addresses must be list of integers."
assert isinstance(datas, list), "Write data must be list of integers."
assert all(isinstance(addr, int) for addr in addrs), "Write addresses must be list of integers."
assert all(isinstance(data, int) for data in datas), "Write data must be list of integers."
assert len(addrs) == len(datas), "Write addresses and write data must be of same length."
# send data in chunks because the responses will fill up the OS's
# input buffer in no time flat
self.open_port_if_not_alredy_open()
for i in range(0, len(addrs), self.chunk_size):
addr_chunk = addrs[i:i+self.chunk_size]
outbound_bytes = [f"M{addrs[i]:04X}{datas[i]:04X}\r\n" for i in range(len(addr_chunk))]
outbound_bytes = [ob.encode('ascii') for ob in outbound_bytes]
outbound_bytes = b"".join(outbound_bytes)
self.ser.write(outbound_bytes)
def hdl_top_level_ports(self):
# this should return the probes that we want to connect to top-level, but like as a string of verilog
@ -627,9 +675,8 @@ class LogicAnalyzerCore:
# Read out contents from memory
print(" -> Reading sample memory contents...")
block_mem_contents = []
for i in range(self.block_memory_base_addr, self.max_addr):
block_mem_contents.append(self.interface.read_register(i))
addrs = list(range(self.block_memory_base_addr, self.max_addr))
block_mem_contents = self.interface.read_registers(addrs)
# Revolve BRAM contents around so the data pointed to by the read_pointer
# is at the beginning