From f6f9096895a0144de9e414536b49fef25aad574e Mon Sep 17 00:00:00 2001 From: Fischer Moseley <42497969+fischermoseley@users.noreply.github.com> Date: Sun, 16 Apr 2023 15:36:47 -0400 Subject: [PATCH] add batch read/write UART for speedo mode --- examples/nexys_a7/video_sprite/send_image.py | 7 +- src/manta/__init__.py | 101 ++++++++++++++----- 2 files changed, 79 insertions(+), 29 deletions(-) diff --git a/examples/nexys_a7/video_sprite/send_image.py b/examples/nexys_a7/video_sprite/send_image.py index f0b8c42..f259d2a 100644 --- a/examples/nexys_a7/video_sprite/send_image.py +++ b/examples/nexys_a7/video_sprite/send_image.py @@ -33,5 +33,8 @@ if __name__ == "__main__": from manta import Manta m = Manta('manta.yaml') - for addr, pixel in enumerate(pixels): - m.image_mem.write(addr, pixel) \ No newline at end of file + # for addr, pixel in enumerate(pixels): + # m.image_mem.write(addr, pixel) + + addrs = list(range(len(pixels))) + m.image_mem.interface.write_registers(addrs, pixels) \ No newline at end of file diff --git a/src/manta/__init__.py b/src/manta/__init__.py index 583ce7e..b98ef6d 100644 --- a/src/manta/__init__.py +++ b/src/manta/__init__.py @@ -123,19 +123,25 @@ class UARTInterface: assert config["baudrate"] > 0, "Baudrate must be positive." self.baudrate = config["baudrate"] - # confirm core clock is sufficiently fast + # Confirm core clock is sufficiently fast clocks_per_baud = self.clock_freq // self.baudrate assert clocks_per_baud >= 2 self.clocks_per_baud = clocks_per_baud - # confirm we can match baudrate suffeciently well + # Confirm we can match baudrate suffeciently well actual_baudrate = self.clock_freq / clocks_per_baud baudrate_error = 100 * abs(actual_baudrate - self.baudrate) / self.baudrate assert ( baudrate_error <= 5 ), "Unable to match target baudrate - they differ by {baudrate_error}%" - # set verbosity + # Set chunk_size, which is the max amount of bytes that get dumped + # to the OS driver at a time + self.chunk_size = 256 + if "chunk size" in config: + self.chunk_size = config["chunk size"] + + # Set verbosity self.verbose = False if "verbose" in config: self.verbose = config["verbose"] @@ -167,31 +173,30 @@ class UARTInterface: assert rd[0].serial_number == rd[1].serial_number, "Serial numbers should be the same on both FT2232 ports - probably somehow grabbed ports on two different devices." return rd[0].device if rd[0].location > rd[1].location else rd[1].device + def decode_response(self, response): + """Make sure reponse from FPGA has the correct format, and return data contained within if so.""" + assert response is not None, "No reponse received." + + response_str = response.decode('ascii') + assert response_str[0] == 'M', "Bad message recieved, incorrect preamble." + assert response_str[-1] == '\n', "Bad message received, incorrect EOL." + assert response_str[-2] == '\r', "Bad message received, incorrect EOL." + assert len(response_str) == 7, f"Wrong number of bytes received, expecting 7 but got {len(response)}." + + return int(response_str[1:5], 16) + def read_register(self, addr): self.open_port_if_not_alredy_open() # request from the bus - addr_str = '{:04X}'.format(addr) - request = f"M{addr_str}\r\n".encode('ascii') - + request = f"M{addr:04X}\r\n".encode('ascii') self.ser.write(request) # read and parse the response - response = self.ser.read(7) - - assert response is not None, "No reponse received." - response = response.decode('ascii') - assert response[0] == 'M', "Bad message recieved, incorrect preamble." - assert response[-1] == '\n', "Bad message received, incorrect EOL." - assert response[-2] == '\r', "Bad message received, incorrect EOL." - assert len(response) == 7, f"Wrong number of bytes received, expecting 7 but got {len(response)}." - - data = int(response[1:5], 16) - data_hex ='{:04X}'.format(data) - + data = self.decode_response(self.ser.read(7)) if self.verbose: - print(f"read {data_hex} from {addr_str}") + print(f"read {data:04X} from {addr:04X}") return data @@ -199,14 +204,57 @@ class UARTInterface: self.open_port_if_not_alredy_open() # request from the bus - addr_str = '{:04X}'.format(addr) - data_str = '{:04X}'.format(data) - request = f"M{addr_str}{data_str}\r\n" + request = f"M{addr:04X}{data:04X}\r\n".encode('ascii') + self.ser.write(request) if self.verbose: - print(f"wrote {data_str} to {addr_str}") + print(f"wrote {data:04X} to {addr:04X}") - self.ser.write(request.encode('ascii')) + def read_registers(self, addrs): + assert isinstance(addrs, list), "Read addresses must be list of integers." + assert all(isinstance(addr, int) for addr in addrs), "Read addresses must be list of integers." + + # send data in chunks because the reponses will fill up the OS's + # input buffer in no time flat + self.open_port_if_not_alredy_open() + + inbound_bytes = b"" + for i in range(0, len(addrs), self.chunk_size): + addr_chunk = addrs[i:i+self.chunk_size] + + outbound_bytes = [f"M{addr:04X}\r\n".encode('ascii') for addr in addr_chunk] + outbound_bytes = b"".join(outbound_bytes) + + self.ser.write(outbound_bytes) + + inbound_bytes += self.ser.read(len(outbound_bytes)) + + data = [] + for i in range(0, len(inbound_bytes), 7): + response = inbound_bytes[i:i+7] + data = self.decode_response(response) + + return data + + def write_registers(self, addrs, datas): + assert isinstance(addrs, list), "Write addresses must be list of integers." + assert isinstance(datas, list), "Write data must be list of integers." + assert all(isinstance(addr, int) for addr in addrs), "Write addresses must be list of integers." + assert all(isinstance(data, int) for data in datas), "Write data must be list of integers." + assert len(addrs) == len(datas), "Write addresses and write data must be of same length." + + # send data in chunks because the responses will fill up the OS's + # input buffer in no time flat + self.open_port_if_not_alredy_open() + + for i in range(0, len(addrs), self.chunk_size): + addr_chunk = addrs[i:i+self.chunk_size] + + outbound_bytes = [f"M{addrs[i]:04X}{datas[i]:04X}\r\n" for i in range(len(addr_chunk))] + outbound_bytes = [ob.encode('ascii') for ob in outbound_bytes] + outbound_bytes = b"".join(outbound_bytes) + + self.ser.write(outbound_bytes) def hdl_top_level_ports(self): # this should return the probes that we want to connect to top-level, but like as a string of verilog @@ -627,9 +675,8 @@ class LogicAnalyzerCore: # Read out contents from memory print(" -> Reading sample memory contents...") - block_mem_contents = [] - for i in range(self.block_memory_base_addr, self.max_addr): - block_mem_contents.append(self.interface.read_register(i)) + addrs = list(range(self.block_memory_base_addr, self.max_addr)) + block_mem_contents = self.interface.read_registers(addrs) # Revolve BRAM contents around so the data pointed to by the read_pointer # is at the beginning