simplify uart/ether APIs, improve lazy loading

This commit is contained in:
Fischer Moseley 2023-04-28 14:49:51 -04:00
parent ab58af0bfc
commit f5caca613a
10 changed files with 176 additions and 176 deletions

View File

@ -3,47 +3,21 @@ from random import randint
m = Manta("manta.yaml")
# for addr in range(m.my_lut_mem.size):
# write_data = randint(0, (2**16)-1)
# m.my_lut_mem.write(addr, write_data)
# The API supports reads/writes to single addresses:
m.my_lut_mem.write(4, 42)
print(m.my_lut_mem.read(4))
# read_data = m.my_lut_mem.read(addr)
# print(f"test addr: {addr} with data: {write_data}")
# print(f" -> correct data received on readback?: {write_data == read_data}")
# addrs = list(range(m.my_lut_mem.size))
# datas = addrs
# print(m.my_lut_mem.interface.read_batch(addrs))
# internal rage intensifies
# try writing a single register at a time which is known good,
# and then read them back all at once
# m.my_lut_mem.write(39, 42069)
# m.my_lut_mem.write(40, 42070)
# print(m.my_lut_mem.read(39))
# print(m.my_lut_mem.read(40))
# print(m.my_lut_mem.interface.read_batch([39,40]))
# exit()
# ok so i think i kind of understand the issue
# so basically what's happening here is that whatever is in the
# 16's place of the write data at the last write persists in all the reads, meaning
#addrs = list(range(48))
# As it does read/writes to multiple addresses at once:
addrs = list(range(m.my_lut_mem.size))
print(addrs)
print('\n')
m.my_lut_mem.write(addrs, addrs)
print(m.my_lut_mem.read(addrs))
from time import sleep
# And here's a little test to write random data and read it back:
for addr in range(m.my_lut_mem.size):
write_data = randint(0, (2**16)-1)
m.my_lut_mem.write(addr, write_data)
for addr in addrs:
m.my_lut_mem.write(addr, addr)
sleep(0.1)
print([m.my_lut_mem.read(addr) for addr in addrs])
print('\n')
read_data = m.my_lut_mem.read(addr)
print(f"test addr: {addr} with data: {hex(write_data)}")
print(f" -> correct data received on readback?: {write_data == read_data}")
assert write_data == read_data, "data read differs from data written!"

View File

@ -1,12 +1,23 @@
from manta import Manta
from random import randint
m = Manta('manta.yaml')
m = Manta("manta.yaml")
# The API supports reads/writes to single addresses:
m.my_lut_mem.write(4, 42)
print(m.my_lut_mem.read(4))
# As it does read/writes to multiple addresses at once:
addrs = list(range(m.my_lut_mem.size))
m.my_lut_mem.write(addrs, addrs)
print(m.my_lut_mem.read(addrs))
# And here's a little test to write random data and read it back:
for addr in range(m.my_lut_mem.size):
write_data = randint(0, (2**16)-1)
m.my_lut_mem.write(addr, write_data)
read_data = m.my_lut_mem.read(addr)
print(f"test addr: {addr} with data: {write_data}")
print(f" -> correct data received on readback?: {write_data == read_data}")
print(f"test addr: {addr} with data: {hex(write_data)}")
print(f" -> correct data received on readback?: {write_data == read_data}")
assert write_data == read_data, "data read differs from data written!"

View File

@ -6,6 +6,4 @@ cores:
depth: 16384
ethernet:
interface: "en8"
turbo_super_sonic_fast_hedgehog_mode: true
tcpreplay: true
interface: "en8"

View File

@ -1,7 +1,5 @@
# Internal Dependencies
from .hdl_utils import *
from .uart_iface import *
from .ether_iface import *
from .la_core import *
from .io_core import *
from .block_mem_core import *
@ -19,9 +17,11 @@ class Manta:
# set interface
if "uart" in config:
from .uart_iface import UARTInterface
self.interface = UARTInterface(config["uart"])
elif "ethernet" in config:
from .ether_iface import EthernetInterface
self.interface = EthernetInterface(config["ethernet"])
else:

View File

@ -61,8 +61,17 @@ class BlockMemoryCore:
tlp.append(f"input wire {self.name}_we")
return tlp
def get_physical_addr(self, addr):
if isinstance(addr, int):
return addr + self.base_addr
elif isinstance(addr, list):
return [a + self.base_addr for a in addr]
raise ValueError("Read address must be integer or list of integers.")
def read(self, addr):
return self.interface.read_register(addr + self.base_addr)
return self.interface.read(self.get_physical_addr(addr))
def write(self, addr, data):
return self.interface.write_register(addr + self.base_addr, data)
return self.interface.write(self.get_physical_addr(addr), data)

View File

@ -47,11 +47,19 @@ class EthernetInterface:
self.ethertype = int(config["ethertype"], 16)
# Set whether we use tcpreplay for faster packet blasting
self.send_packet = sendp
if "tcpreplay" in config:
assert isinstance(config["tcpreplay"], bool), \
"tcpreplay configuration option must be boolean!"
self.send_packet = sendpfast if config["tcpreplay"] else sendp
if config["tcpreplay"]:
self.send_packets = lambda p: sendpfast(p, iface=self.iface)
else:
self.send_packets = lambda p: sendp(p, iface=self.iface, verbose=0)
else:
self.send_packets = lambda p: sendp(p, iface=self.iface, verbose=0)
self.verbose = False
if "verbose" in config:
@ -59,51 +67,22 @@ class EthernetInterface:
"verbose configuration option must be boolean!"
self.verbose = config["verbose"]
def read_register(self, addr):
pkt = Ether()
pkt.src = self.host_mac
pkt.dst = self.fpga_mac
pkt.type = self.ethertype
def read(self, addr):
# Perform type checks, output list of addresses
if isinstance(addr, int):
addrs = [addr]
# one byte of rw, two bytes of address, and 44 of padding
# makes the 46 byte minimum length
msg = b'\x00' + addr.to_bytes(2, 'big') + 43*b'\x00'
elif isinstance(addr, list):
assert all(isinstance(a, int) for a in addr), \
"Read addresses must be integer or list of integers."
addrs = addr
pkt = pkt / msg
pkt.load = msg
else:
raise ValueError("Read addresses must be integer or list of integers.")
sniffer = AsyncSniffer(iface = self.iface, filter=f"ether src {self.fpga_mac}")
sniffer.start()
sleep(0.1)
self.send_packet(pkt, iface=self.iface, verbose = 0)
results = sniffer.stop()
assert len(results) == 1, "Received more packets than expected!"
raw_response_bytes = bytes(results[0].payload)[3:5]
return int.from_bytes(raw_response_bytes, 'big')
def write_register(self, addr, data):
pkt = Ether()
pkt.src = self.host_mac
pkt.dst = self.fpga_mac
pkt.type = self.ethertype
# one byte of rw, two bytes of address, two bytes of
# data, and 42 of padding makes the 46 byte
# minimum length
msg = b'\x01' + addr.to_bytes(2, 'big') + data.to_bytes(2, 'big') + 41*b'\x00'
pkt = pkt / msg
pkt.load = msg
self.send_packet(pkt, iface=self.iface, verbose = self.verbose)
def read_batch(self, addrs):
# Prepare packets to read from addresses
pkts = []
for addr in addrs:
# Prepare packets with read requests
request_pkts = []
for a in addrs:
pkt = Ether()
pkt.src = self.host_mac
pkt.dst = self.fpga_mac
@ -111,58 +90,73 @@ class EthernetInterface:
# one byte of rw, two bytes of address, and 44 of padding
# makes the 46 byte minimum length
msg = b'\x00' + addr.to_bytes(2, 'big') + 43*b'\x00'
msg = b'\x00' + a.to_bytes(2, 'big') + 43*b'\x00'
pkt = pkt / msg
pkt.load = msg
pkts.append(pkt)
request_pkts.append(pkt)
# Start sniffer in another thread, send packets, grab responses
sniffer = AsyncSniffer(iface = self.iface, count = len(addrs), filter=f"ether src {self.fpga_mac}")
sniffer.start()
sleep(0.1)
sendp(pkts, iface=self.iface, verbose = 0, inter = 0.05)
sleep(0.5)
self.send_packets(request_pkts)
sniffer.join()
results = sniffer.results
response_pkts = sniffer.results
assert len(response_pkts) == len(request_pkts), "Received wrong number of packets!"
assert len(results) == len(addrs), "Received more packets than expected!"
# Get read data by pulling bytes 3 and 4 from the returned packets
# payload, and interpreting it as big endian
get_read_data = lambda x: int.from_bytes(bytes(x.payload)[3:5], 'big')
read_data = [get_read_data(pkt) for pkt in response_pkts]
# #print(raw(results[1]))
# for packet in results:
# hexdump(packet)
# print( [i for i in bytes(packet.payload)] )
# print( [i for i in raw(packet)] )
# print("\n")
if len(read_data) == 1:
return read_data[0]
# Parse packets
datas = []
for packet in results:
raw_response_bytes = bytes(packet.payload)[3:5]
data = int.from_bytes(raw_response_bytes, 'big')
datas.append(data)
else:
return read_data
return datas
def write(self, addr, data):
# Perform type checks, output list of addresses
if isinstance(addr, int):
assert isinstance(data, int), \
"Data must also be integer if address is integer."
addrs = [addr]
datas = [data]
def write_batch(self, addrs, datas):
assert len(addrs) == len(datas), \
"Number of addresses provided is unequal to number of data provided!"
elif isinstance(addr, list):
assert all(isinstance(a, int) for a in addr), \
"Write addresses must be integer or list of integers."
pkts = []
for addr, data in zip(addrs, datas):
assert all(isinstance(d, int) for d in data), \
"Write data must be integer or list of integers."
assert len(addr) == len(data), \
"There must be equal number of write addresses and data."
addrs = addr
datas = data
else:
raise ValueError("Write addresses and data must be integer or list of integers.")
# Prepare packets with write requests
request_pkts = []
for a, d in zip(addrs, datas):
pkt = Ether()
pkt.src = self.host_mac
pkt.dst = self.fpga_mac
pkt.type = self.ethertype
# one byte of rw, two bytes of address, two bytes of data, and 41
# bytes of paddding make the 46 byte limit.
msg = b'\x01' + addr.to_bytes(2, 'big') + data.to_bytes(2, 'big') + 41*b'\x00'
# one byte of rw, two bytes of address, two bytes of data, and 42 of padding
# makes the 46 byte minimum length
msg = b'\x01' + a.to_bytes(2, 'big') + d.to_bytes(2, 'big') + 41*b'\x00'
pkt = pkt / msg
pkt.load = msg
pkts.append(pkt)
request_pkts.append(pkt)
self.send_packet(pkts, iface=self.iface, verbose = 0)
self.send_packets(request_pkts)
def hdl_top_level_ports(self):
return ["input wire crsdv", \

View File

@ -24,7 +24,7 @@ class IOCoreProbe:
self.interface.write_register(self.base_addr, data)
def get(self):
return self.interface.read_register(self.base_addr)
return self.interface.read(self.base_addr)
class IOCore:
def __init__(self, config, name, base_addr, interface):

View File

@ -253,7 +253,7 @@ class LogicAnalyzerCore:
# reset all the other triggers
for addr in range(self.trigger_block_base_addr, self.block_memory_base_addr):
self.interface.write_register(addr, 0)
self.interface.write(addr, 0)
for trigger in self.triggers:
# determine if the trigger is good
@ -268,7 +268,7 @@ class LogicAnalyzerCore:
op_register = 2*(list(self.probes.keys()).index(probe_name)) + self.trigger_block_base_addr
self.interface.write_register(op_register, operations[op])
self.interface.write(op_register, operations[op])
else:
assert len(statement) == 3, "Missing information in trigger statement."
@ -277,8 +277,8 @@ class LogicAnalyzerCore:
op_register = 2*(list(self.probes.keys()).index(probe_name)) + self.trigger_block_base_addr
arg_register = op_register + 1
self.interface.write_register(op_register, operations[op])
self.interface.write_register(arg_register, int(arg))
self.interface.write(op_register, operations[op])
self.interface.write(arg_register, int(arg))
@ -288,12 +288,12 @@ class LogicAnalyzerCore:
# request to stop the existing capture
print(" -> Resetting core...")
state = self.interface.read_register(self.state_reg_addr)
state = self.interface.read(self.state_reg_addr)
if state != self.IDLE:
self.interface.write_register(self.request_stop_reg_addr, 0)
self.interface.write_register(self.request_stop_reg_addr, 1)
self.interface.write(self.request_stop_reg_addr, 0)
self.interface.write(self.request_stop_reg_addr, 1)
state = self.interface.read_register(self.state_reg_addr)
state = self.interface.read(self.state_reg_addr)
assert state == self.IDLE, "Logic analyzer did not reset to correct state when requested to."
# Configure trigger conditions
@ -302,32 +302,32 @@ class LogicAnalyzerCore:
# Configure the trigger_mode
print(" -> Setting trigger mode")
self.interface.write_register(self.trigger_mode_reg_addr, self.trigger_mode)
self.interface.write(self.trigger_mode_reg_addr, self.trigger_mode)
# Configure the trigger_loc
print(" -> Setting trigger location...")
self.interface.write_register(self.trigger_loc_reg_addr, self.trigger_loc)
self.interface.write(self.trigger_loc_reg_addr, self.trigger_loc)
# Start the capture by pulsing request_start
print(" -> Starting capture...")
self.interface.write_register(self.request_start_reg_addr, 1)
self.interface.write_register(self.request_start_reg_addr, 0)
self.interface.write(self.request_start_reg_addr, 1)
self.interface.write(self.request_start_reg_addr, 0)
# Wait for core to finish capturing data
print(" -> Waiting for capture to complete...")
state = self.interface.read_register(self.state_reg_addr)
state = self.interface.read(self.state_reg_addr)
while(state != self.CAPTURED):
state = self.interface.read_register(self.state_reg_addr)
state = self.interface.read(self.state_reg_addr)
# Read out contents from memory
print(" -> Reading sample memory contents...")
addrs = list(range(self.block_memory_base_addr, self.max_addr))
block_mem_contents = self.interface.read_registers(addrs)
block_mem_contents = self.interface.reads(addrs)
# Revolve BRAM contents around so the data pointed to by the read_pointer
# is at the beginning
print(" -> Reading read_pointer and revolving memory...")
read_pointer = self.interface.read_register(self.read_pointer_reg_addr)
read_pointer = self.interface.read(self.read_pointer_reg_addr)
return block_mem_contents[read_pointer:] + block_mem_contents[:read_pointer]

View File

@ -31,8 +31,17 @@ class LUTMemoryCore:
# no top_level connections since this core just lives on the bus
return ""
def get_physical_addr(self, addr):
if isinstance(addr, int):
return addr + self.base_addr
elif isinstance(addr, list):
return [a + self.base_addr for a in addr]
raise ValueError("Read address must be integer or list of integers.")
def read(self, addr):
return self.interface.read_register(addr + self.base_addr)
return self.interface.read(self.get_physical_addr(addr))
def write(self, addr, data):
return self.interface.write_register(addr + self.base_addr, data)
return self.interface.write(self.get_physical_addr(addr), data)

View File

@ -82,34 +82,19 @@ class UARTInterface:
return int(response_str[1:5], 16)
def read_register(self, addr):
self.open_port_if_not_alredy_open()
# request from the bus
request = f"M{addr:04X}\r\n".encode('ascii')
self.ser.write(request)
def read(self, addr):
# Perform type checks, output list of addresses
if isinstance(addr, int):
addrs = [addr]
# read and parse the response
data = self.decode_response(self.ser.read(7))
elif isinstance(addr, list):
assert all(isinstance(a, int) for a in addr), \
"Read addresses must be integer or list of integers."
addrs = addr
if self.verbose:
print(f"read {data:04X} from {addr:04X}")
return data
def write_register(self, addr, data):
self.open_port_if_not_alredy_open()
# request from the bus
request = f"M{addr:04X}{data:04X}\r\n".encode('ascii')
self.ser.write(request)
if self.verbose:
print(f"wrote {data:04X} to {addr:04X}")
def read_registers(self, addrs):
assert isinstance(addrs, list), "Read addresses must be list of integers."
assert all(isinstance(addr, int) for addr in addrs), "Read addresses must be list of integers."
else:
raise ValueError("Read addresses must be integer or list of integers.")
# send data in chunks because the reponses will fill up the OS's
# input buffer in no time flat
@ -131,16 +116,37 @@ class UARTInterface:
response = inbound_bytes[i:i+7]
data.append(self.decode_response(response))
return data
if len(data) == 1:
return data[0]
def write_registers(self, addrs, datas):
assert isinstance(addrs, list), "Write addresses must be list of integers."
assert isinstance(datas, list), "Write data must be list of integers."
assert all(isinstance(addr, int) for addr in addrs), "Write addresses must be list of integers."
assert all(isinstance(data, int) for data in datas), "Write data must be list of integers."
assert len(addrs) == len(datas), "Write addresses and write data must be of same length."
else:
return data
# send data in chunks because the responses will fill up the OS's
def write(self, addr, data):
# Perform type checks, output list of addresses
if isinstance(addr, int):
assert isinstance(data, int), \
"Data must also be integer if address is integer."
addrs = [addr]
datas = [data]
elif isinstance(addr, list):
assert all(isinstance(a, int) for a in addr), \
"Write addresses must be integer or list of integers."
assert all(isinstance(d, int) for d in data), \
"Write data must be integer or list of integers."
assert len(addr) == len(data), \
"There must be equal number of write addresses and data."
addrs = addr
datas = data
else:
raise ValueError("Write addresses and data must be integer or list of integers.")
# send data in chunks because the reponses will fill up the OS's
# input buffer in no time flat
self.open_port_if_not_alredy_open()
@ -157,7 +163,6 @@ class UARTInterface:
def hdl_top_level_ports(self):
# this should return the probes that we want to connect to top-level, but like as a string of verilog
return ["input wire rx", "output reg tx"]
def rx_hdl_def(self):