add lut ram operations to Python API
This commit is contained in:
parent
c85cc4d357
commit
f7077f96d8
|
|
@ -13,4 +13,5 @@
|
|||
|
||||
# Python Packaging output
|
||||
dist/
|
||||
*.egg-info
|
||||
*.egg-info
|
||||
__pycache__/
|
||||
|
|
@ -1,46 +0,0 @@
|
|||
import serial
|
||||
from time import sleep
|
||||
import random
|
||||
|
||||
usb_device = "/dev/tty.usbserial-2102926963071"
|
||||
|
||||
def write_block(data, base_addr = 0):
|
||||
msg = b''
|
||||
|
||||
for addr, data in enumerate(data):
|
||||
addr_str = '{:04X}'.format(base_addr + addr)
|
||||
data_str = '{:04X}'.format(data)
|
||||
msg += f"M{addr_str}{data_str}\r\n".encode('ascii')
|
||||
|
||||
ser.write(msg)
|
||||
|
||||
|
||||
def read_block(addrs, base_addr):
|
||||
msg = b''
|
||||
|
||||
for addr in range(addrs):
|
||||
addr_str = '{:04X}'.format(base_addr + addr)
|
||||
msg += f"M{addr_str}\r\n".encode('ascii')
|
||||
|
||||
ser.write(msg)
|
||||
response = ser.read(7*addrs)
|
||||
response = response.decode('ascii').replace('\r\n', '').split('M')
|
||||
return [int(i, 16) for i in response if i]
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
with serial.Serial(usb_device, 115200) as ser:
|
||||
|
||||
sleep(1)
|
||||
|
||||
for i in range(1000):
|
||||
test_data = [random.randint(0, 65535) for i in range(32)]
|
||||
write_block(test_data, 0)
|
||||
|
||||
received = read_block(32, 0)
|
||||
print(i)
|
||||
|
||||
if(received != test_data):
|
||||
exit()
|
||||
|
|
@ -5,6 +5,6 @@ cores:
|
|||
size: 64
|
||||
|
||||
uart:
|
||||
port: "/dev/tty.usbserial-2102926963071"
|
||||
port: "/dev/ttyUSB1"
|
||||
baudrate: 115200
|
||||
clock_freq: 100000000
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
from manta import Manta
|
||||
from random import randint
|
||||
|
||||
m = Manta('manta.yaml')
|
||||
|
||||
for addr in range(m.my_lut_ram.size):
|
||||
write_data = randint(0, (2**16)-1)
|
||||
m.my_lut_ram.write(addr, write_data)
|
||||
|
||||
read_data = m.my_lut_ram.read(addr)
|
||||
print(f"test addr: {addr} with data: {write_data}")
|
||||
print(f" -> correct data received on readback?: {write_data == read_data}")
|
||||
|
|
@ -26,7 +26,7 @@ module top_level (
|
|||
ssd ssd (
|
||||
.clk_in(clk),
|
||||
.rst_in(btnc),
|
||||
.val_in( (manta_inst.my_lut_ram_btx_rdata << 16) | (manta_inst.brx_my_lut_ram_wdata) ),
|
||||
.val_in( {manta_inst.my_lut_ram_btx_rdata, manta_inst.brx_my_lut_ram_wdata} ),
|
||||
.cat_out(cat),
|
||||
.an_out(an));
|
||||
|
||||
|
|
|
|||
|
|
@ -4,8 +4,6 @@ import os
|
|||
from datetime import datetime
|
||||
|
||||
version = "0.0.0"
|
||||
verbose = True
|
||||
|
||||
|
||||
class UARTInterface:
|
||||
def __init__(self, config):
|
||||
|
|
@ -35,48 +33,59 @@ class UARTInterface:
|
|||
baudrate_error <= 5
|
||||
), "Unable to match target baudrate - they differ by {baudrate_error}%"
|
||||
|
||||
if "verbose" in config:
|
||||
self.verbose = config["verbose"]
|
||||
|
||||
else:
|
||||
self.verbose = False
|
||||
|
||||
def open(self):
|
||||
import serial
|
||||
self.ser = serial.Serial(self.port, self.baudrate)
|
||||
|
||||
def read(self, bytes):
|
||||
self.ser.read(bytes)
|
||||
|
||||
def write(self, bytes):
|
||||
self.ser.write(bytes)
|
||||
|
||||
def read_register(self, addr):
|
||||
# open the port if it's not already open
|
||||
if not hasattr(self, "ser"):
|
||||
self.open()
|
||||
|
||||
# request from the bus
|
||||
addr_hex = hex(addr).split("0x")[-1] # TODO: turn this into format()
|
||||
request = f"M{addr_hex}\r\n".encode('ascii')
|
||||
addr_str = '{:04X}'.format(addr)
|
||||
request = f"M{addr_str}\r\n".encode('ascii')
|
||||
|
||||
if verbose:
|
||||
print(f"reading from {addr_hex} with message {request}")
|
||||
|
||||
self.write(request.encode('ascii'))
|
||||
self.ser.write(request)
|
||||
|
||||
# read and parse the response
|
||||
response = self.read(7)
|
||||
response = self.ser.read(7)
|
||||
|
||||
if verbose:
|
||||
print(f"response {response} received!")
|
||||
|
||||
assert response[0] == 'M'.encode('ascii'), "Bad message recieved, incorrect preamble."
|
||||
assert response[-1] == '\n'.encode('ascii'), "Bad message received, incorrect EOL."
|
||||
assert response[-2] == '\r'.encode('ascii'), "Bad message received, incorrect EOL."
|
||||
assert response is not None, "No reponse received."
|
||||
response = response.decode('ascii')
|
||||
assert response[0] == 'M', "Bad message recieved, incorrect preamble."
|
||||
assert response[-1] == '\n', "Bad message received, incorrect EOL."
|
||||
assert response[-2] == '\r', "Bad message received, incorrect EOL."
|
||||
assert len(response) == 7, f"Wrong number of bytes received, expecting 7 but got {len(response)}."
|
||||
|
||||
return int(response[1:4].decode('ascii'))
|
||||
data = int(response[1:5], 16)
|
||||
data_hex ='{:04X}'.format(data)
|
||||
|
||||
|
||||
if self.verbose:
|
||||
print(f"read {data_hex} from {addr_str}")
|
||||
|
||||
return data
|
||||
|
||||
def write_register(self, addr, data):
|
||||
addr_hex = hex(addr).split("0x")[-1] # TODO: turn this into format()
|
||||
data_hex = hex(data).split("0x")[-1]
|
||||
msg = f"M{addr_hex}{data_hex}\r\n"
|
||||
# open the port if it's not already open
|
||||
if not hasattr(self, "ser"):
|
||||
self.open()
|
||||
|
||||
if verbose:
|
||||
print(f"writing {data_hex} to {addr_hex} with message {msg}")
|
||||
addr_str = '{:04X}'.format(addr)
|
||||
data_str = '{:04X}'.format(data)
|
||||
request = f"M{addr_str}{data_str}\r\n"
|
||||
|
||||
self.write(msg.encode('ascii'))
|
||||
if self.verbose:
|
||||
print(f"wrote {data_str} to {addr_str}")
|
||||
|
||||
self.ser.write(request.encode('ascii'))
|
||||
|
||||
def hdl_top_level_ports(self):
|
||||
# this should return the probes that we want to connect to top-level, but like as a string of verilog
|
||||
|
|
@ -374,6 +383,12 @@ class LUTRAMCore:
|
|||
# no top_level connections since this core just lives on the bus
|
||||
return []
|
||||
|
||||
def read(self, addr):
|
||||
return self.interface.read_register(addr + self.base_addr)
|
||||
|
||||
def write(self, addr, data):
|
||||
return self.interface.write_register(addr + self.base_addr, data)
|
||||
|
||||
class LogicAnalyzerCore:
|
||||
def __init__(self, config, name, base_addr, interface):
|
||||
self.name = name
|
||||
|
|
|
|||
Loading…
Reference in New Issue