blacken code

This commit is contained in:
Fischer Moseley 2023-02-05 15:40:16 -05:00
parent e48f60e03a
commit 030a841f53
2 changed files with 149 additions and 108 deletions

View File

@ -1,5 +1,5 @@
import os import os
os.system('echo "#!/usr/bin/python3" > manta') os.system('echo "#!/usr/bin/python3" > manta')
os.system('cat manta.py >> manta') os.system("cat manta.py >> manta")
os.system('chmod +x manta') os.system("chmod +x manta")

253
manta.py
View File

@ -7,136 +7,164 @@ import serial
debug = True debug = True
version = '0.0.0' version = "0.0.0"
def load_source_files(path): def load_source_files(path):
"""concatenates the list of files provided into a single string""" """concatenates the list of files provided into a single string"""
source_files = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))] source_files = [
source_files = [f for f in source_files if f.split('.')[-1] in ['sv', 'v']] f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))
]
source_files = [f for f in source_files if f.split(".")[-1] in ["sv", "v"]]
buf = "" buf = ""
for source_file in source_files: for source_file in source_files:
with open(path + source_file, 'r') as f: with open(path + source_file, "r") as f:
buf += f.read() buf += f.read()
return buf return buf
downlink_template = load_source_files('src/') downlink_template = load_source_files("src/")
def load_config(path): def load_config(path):
"""Take path to configuration file, and retun the configuration as a python list/dict object.""" """Take path to configuration file, and retun the configuration as a python list/dict object."""
extension = path.split('.')[-1] extension = path.split(".")[-1]
if 'json' in extension: if "json" in extension:
with open(path, 'r') as f: with open(path, "r") as f:
config = json.load(f) config = json.load(f)
return config return config
elif 'yaml' in extension or 'yml' in extension: elif "yaml" in extension or "yml" in extension:
with open(path, 'r') as f: with open(path, "r") as f:
config = yaml.safe_load(f) config = yaml.safe_load(f)
return config return config
else: else:
raise ValueError('Unable to recognize configuration file extension.') raise ValueError("Unable to recognize configuration file extension.")
def check_config(config): def check_config(config):
"""Takes a list/dict python object representing a core configuration and throws an error if it is misconfigured.""" """Takes a list/dict python object representing a core configuration and throws an error if it is misconfigured."""
assert 'downlink' in config or 'uplink' in config, 'No downlink or uplink specified.' assert (
"downlink" in config or "uplink" in config
), "No downlink or uplink specified."
if 'downlink' in config: if "downlink" in config:
dl = config["downlink"]
dl = config['downlink'] assert dl[
assert dl['sample_depth'], 'Downlink core specified, but sample_depth not specified.' "sample_depth"
assert dl['clock_freq'], 'Downlink core specified, but clock_freq not specified.' ], "Downlink core specified, but sample_depth not specified."
assert dl['probes'] and len(dl['probes']) > 0, 'Downlink core specified, but no probes specified.' assert dl[
assert dl['triggers'] and len(dl['triggers']) > 0, 'Downlink core specified, but no triggers specified.' "clock_freq"
], "Downlink core specified, but clock_freq not specified."
assert (
dl["probes"] and len(dl["probes"]) > 0
), "Downlink core specified, but no probes specified."
assert (
dl["triggers"] and len(dl["triggers"]) > 0
), "Downlink core specified, but no triggers specified."
# confirm core clock is sufficiently fast # confirm core clock is sufficiently fast
prescaler = dl['clock_freq'] // config['uart']['baudrate'] prescaler = dl["clock_freq"] // config["uart"]["baudrate"]
assert prescaler >= 2 assert prescaler >= 2
# confirm actual baudrate and target baudrate are within 5% # confirm actual baudrate and target baudrate are within 5%
actual_baudrate = config['downlink']['clock_freq'] / prescaler actual_baudrate = config["downlink"]["clock_freq"] / prescaler
baudrate_error = abs(actual_baudrate - config['uart']['baudrate']) / config['uart']['baudrate'] baudrate_error = (
assert baudrate_error <= 0.05, f'Unable to match target baudrate! Actual baudrate differs from target by {round(100*baudrate_error, 2)}%' abs(actual_baudrate - config["uart"]["baudrate"])
/ config["uart"]["baudrate"]
)
assert (
baudrate_error <= 0.05
), f"Unable to match target baudrate! Actual baudrate differs from target by {round(100*baudrate_error, 2)}%"
if debug: if debug:
print(f'UART interface on debug core will the following configuration:') print(f"UART interface on debug core will the following configuration:")
print(f' - target_baudrate: {config["uart"]["baudrate"]}') print(f' - target_baudrate: {config["uart"]["baudrate"]}')
print(f' - prescaler: {prescaler}') print(f" - prescaler: {prescaler}")
print(f' - actual_baudrate: {round(actual_baudrate, 2)}') print(f" - actual_baudrate: {round(actual_baudrate, 2)}")
if 'uplink' in config: if "uplink" in config:
raise NotImplementedError('Cannot check configuration validity for uplinks just yet!') raise NotImplementedError(
"Cannot check configuration validity for uplinks just yet!"
)
if 'uart' in config: if "uart" in config:
uart = config['uart'] uart = config["uart"]
# confirm number of data bits is valid # confirm number of data bits is valid
assert 'data' in uart, 'Number of data bits in UART interface not specified.' assert "data" in uart, "Number of data bits in UART interface not specified."
assert uart['data'] in [8, 7, 6, 5], 'Invalid number of data bits.' assert uart["data"] in [8, 7, 6, 5], "Invalid number of data bits."
# confirm number of stop bits is valid # confirm number of stop bits is valid
assert uart['stop'] in [1, 1.5, 2], 'Invalid number of stop bits.' assert uart["stop"] in [1, 1.5, 2], "Invalid number of stop bits."
# confirm parity is valid # confirm parity is valid
assert uart['parity'] in ['none', 'even', 'odd', 'mark', 'space'], 'Invalid parity setting.' assert uart["parity"] in [
"none",
"even",
"odd",
"mark",
"space",
], "Invalid parity setting."
def gen_downlink_core(config): def gen_downlink_core(config):
buf = downlink_template buf = downlink_template
dl = config['downlink'] dl = config["downlink"]
# add timestamp # add timestamp
timestamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S") timestamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
buf = buf.replace('@TIMESTAMP', timestamp) buf = buf.replace("@TIMESTAMP", timestamp)
# add user # add user
user = os.environ.get('USER', os.environ.get('USERNAME')) user = os.environ.get("USER", os.environ.get("USERNAME"))
buf = buf.replace('@USER', user) buf = buf.replace("@USER", user)
# add trigger # add trigger
trigger = [f'({trigger})' for trigger in dl['triggers']] trigger = [f"({trigger})" for trigger in dl["triggers"]]
trigger = ' || '.join(trigger) trigger = " || ".join(trigger)
buf = buf.replace('@TRIGGER', trigger) buf = buf.replace("@TRIGGER", trigger)
# add concat # add concat
concat = [name for name in dl['probes']] concat = [name for name in dl["probes"]]
concat = ', '.join(concat) concat = ", ".join(concat)
concat = '{' + concat + '};' concat = "{" + concat + "};"
buf = buf.replace('@CONCAT', concat) buf = buf.replace("@CONCAT", concat)
# add probes # add probes
probe_verilog = [] probe_verilog = []
for name, width in dl['probes'].items(): for name, width in dl["probes"].items():
if width == 1: if width == 1:
probe_verilog.append(f'input wire {name},') probe_verilog.append(f"input wire {name},")
else: else:
probe_verilog.append(f'input wire [{width-1}:0] {name},') probe_verilog.append(f"input wire [{width-1}:0] {name},")
probe_verilog = '\n\t'.join(probe_verilog) probe_verilog = "\n\t".join(probe_verilog)
buf = buf.replace('@PROBES', probe_verilog) buf = buf.replace("@PROBES", probe_verilog)
# add sample width # add sample width
sample_width = sum([width for name, width in dl['probes'].items()]) sample_width = sum([width for name, width in dl["probes"].items()])
buf = buf.replace('@SAMPLE_WIDTH', str(sample_width)) buf = buf.replace("@SAMPLE_WIDTH", str(sample_width))
# add sample depth # add sample depth
buf = buf.replace('@SAMPLE_DEPTH', str(dl['sample_depth'])) buf = buf.replace("@SAMPLE_DEPTH", str(dl["sample_depth"]))
# uart config # uart config
buf = buf.replace('@DATA_WIDTH', str(config['uart']['data'])) buf = buf.replace("@DATA_WIDTH", str(config["uart"]["data"]))
buf = buf.replace('@BAUDRATE', str(config['uart']['baudrate'])) buf = buf.replace("@BAUDRATE", str(config["uart"]["baudrate"]))
buf = buf.replace('@CLK_FREQ_HZ', str(dl['clock_freq'])) buf = buf.replace("@CLK_FREQ_HZ", str(dl["clock_freq"]))
return buf return buf
def print_help(): def print_help():
help = f""" help = f"""
Manta v{version} - An In-Situ Debugging Tool for Programmable Hardware Manta v{version} - An In-Situ Debugging Tool for Programmable Hardware
@ -151,6 +179,7 @@ Supported commands:
""" """
print(help) print(help)
def print_ray(): def print_ray():
color_ray = f""" color_ray = f"""
\033[96m (\.-./) \033[96m (\.-./)
@ -168,110 +197,117 @@ def print_ray():
\033[96m '-....-' \033[00m""" \033[96m '-....-' \033[00m"""
print(color_ray) print(color_ray)
def setup_serial(ser, config): def setup_serial(ser, config):
ser.baudrate = config['uart']['baudrate'] ser.baudrate = config["uart"]["baudrate"]
ser.port = config['uart']['port'] ser.port = config["uart"]["port"]
ser.timeout = config['uart']['timeout'] ser.timeout = config["uart"]["timeout"]
# setup number of data bits # setup number of data bits
if config['uart']['data'] == 8: if config["uart"]["data"] == 8:
ser.bytesize = serial.EIGHTBITS ser.bytesize = serial.EIGHTBITS
elif config['uart']['data'] == 7: elif config["uart"]["data"] == 7:
ser.bytesize = serial.SEVENBITS ser.bytesize = serial.SEVENBITS
elif config['uart']['data'] == 6: elif config["uart"]["data"] == 6:
ser.bytesize = serial.SIXBITS ser.bytesize = serial.SIXBITS
elif config['uart']['data'] == 5: elif config["uart"]["data"] == 5:
ser.bytesize = serial.FIVEBITS ser.bytesize = serial.FIVEBITS
else: else:
raise ValueError("Invalid number of data bits in UART configuration.") raise ValueError("Invalid number of data bits in UART configuration.")
# setup number of stop bits # setup number of stop bits
if config['uart']['stop'] == 1: if config["uart"]["stop"] == 1:
ser.stopbits = serial.STOPBITS_ONE ser.stopbits = serial.STOPBITS_ONE
elif config['uart']['stop'] == 1.5: elif config["uart"]["stop"] == 1.5:
ser.stopbits = serial.STOPBITS_ONE_POINT_FIVE ser.stopbits = serial.STOPBITS_ONE_POINT_FIVE
elif config['uart']['stop'] == 2: elif config["uart"]["stop"] == 2:
ser.stopbits = serial.STOPBITS_TWO ser.stopbits = serial.STOPBITS_TWO
else: else:
raise ValueError("Invalid number of stop bits in UART configuration.") raise ValueError("Invalid number of stop bits in UART configuration.")
# setup parity # setup parity
if config['uart']['parity'] == 'none': if config["uart"]["parity"] == "none":
ser.parity = serial.PARITY_NONE ser.parity = serial.PARITY_NONE
elif config['uart']['parity'] == 'even': elif config["uart"]["parity"] == "even":
ser.parity = serial.PARITY_EVEN ser.parity = serial.PARITY_EVEN
elif config['uart']['parity'] == 'odd': elif config["uart"]["parity"] == "odd":
ser.parity = serial.PARITY_ODD ser.parity = serial.PARITY_ODD
elif config['uart']['parity'] == 'mark': elif config["uart"]["parity"] == "mark":
ser.parity = serial.PARITY_MARK ser.parity = serial.PARITY_MARK
elif config['uart']['parity'] == 'space': elif config["uart"]["parity"] == "space":
ser.parity = serial.PARITY_SPACE ser.parity = serial.PARITY_SPACE
else: else:
raise ValueError("Invalid parity setting in UART configuration.") raise ValueError("Invalid parity setting in UART configuration.")
def read_serial(config): def read_serial(config):
# obtain bytestream from FPGA # obtain bytestream from FPGA
with serial.Serial() as ser: with serial.Serial() as ser:
setup_serial(ser, config) setup_serial(ser, config)
ser.open() ser.open()
ser.flushInput() ser.flushInput()
ser.write(b'\x30') ser.write(b"\x30")
data = ser.read(4096) data = ser.read(4096)
return data return data
def part_select(data, width): def part_select(data, width):
top, bottom = width top, bottom = width
assert top >= bottom assert top >= bottom
mask = 2**(top - bottom + 1) - 1 mask = 2 ** (top - bottom + 1) - 1
return (data >> bottom) & mask return (data >> bottom) & mask
def make_widths(config): def make_widths(config):
# {probe0, probe1, probe2} # {probe0, probe1, probe2}
# [12, 1, 3] should produce # [12, 1, 3] should produce
# [ (11,0) , (12, 12), (15,13) ] # [ (11,0) , (12, 12), (15,13) ]
widths = list(config['probes'].values()) widths = list(config["probes"].values())
parts = [] parts = []
for i, width in enumerate(widths): for i, width in enumerate(widths):
if (i == 0): if i == 0:
parts.append( (width - 1, 0) ) parts.append((width - 1, 0))
else: else:
parts.append( ((parts[i-1][1] + width) , (parts[i-1][1] + 1)) ) parts.append(((parts[i - 1][1] + width), (parts[i - 1][1] + 1)))
# reversing this list is a little bit of a hack, should fix/document
return parts[::-1]
# reversing this list is a little bit of a hack, should fix/document
return parts[::-1]
def export_waveform(config, data, path): def export_waveform(config, data, path):
extension = path.split('.')[-1] extension = path.split(".")[-1]
if extension == 'vcd': if extension == "vcd":
from vcd import VCDWriter from vcd import VCDWriter
vcd_file = open(path, 'w')
vcd_file = open(path, "w")
timestamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S") timestamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
with VCDWriter(vcd_file, timescale='10 ns', date=timestamp, version = 'manta') as writer: with VCDWriter(
vcd_file, timescale="10 ns", date=timestamp, version="manta"
) as writer:
# add probes to vcd file # add probes to vcd file
vcd_probes = [] vcd_probes = []
for name, width in config['probes'].items(): for name, width in config["probes"].items():
probe = writer.register_var('ila', name, 'wire', size = width) probe = writer.register_var("ila", name, "wire", size=width)
vcd_probes.append(probe) vcd_probes.append(probe)
# calculate bit widths for part selecting # calculate bit widths for part selecting
@ -286,48 +322,53 @@ def export_waveform(config, data, path):
vcd_file.close() vcd_file.close()
else: else:
raise NotImplementedError('More file formats to come!') raise NotImplementedError("More file formats to come!")
if __name__ == '__main__':
if __name__ == "__main__":
# print help menu if no args passed or help menu requested # print help menu if no args passed or help menu requested
if len(argv) == 1 or argv[1] == 'help': if len(argv) == 1 or argv[1] == "help":
print_help() print_help()
exit() exit()
# open minicom-like serial terminal with given config # open minicom-like serial terminal with given config
elif argv[1] == 'terminal': elif argv[1] == "terminal":
assert len(argv) == 3, 'Not enough (or too many) config files specified.' assert len(argv) == 3, "Not enough (or too many) config files specified."
# TODO: make this work with a looser config file - it should work even if we just have a uart definition # TODO: make this work with a looser config file - it should work even if we just have a uart definition
config = load_config(argv[2]) config = load_config(argv[2])
check_config(config) check_config(config)
raise NotImplementedError('Miniterm console is still under development!') raise NotImplementedError("Miniterm console is still under development!")
# list available serial ports # list available serial ports
elif argv[1] == 'ports': elif argv[1] == "ports":
import serial.tools.list_ports import serial.tools.list_ports
for info in serial.tools.list_ports.comports(): for info in serial.tools.list_ports.comports():
print(info) print(info)
# show splash screen # show splash screen
elif argv[1] == 'ray': elif argv[1] == "ray":
print_ray() print_ray()
exit() exit()
# generate the specified core # generate the specified core
elif argv[1] == 'gen': elif argv[1] == "gen":
assert len(argv) == 4, 'Wrong number of arguments, only a config file and output file must both be specified.' assert (
len(argv) == 4
), "Wrong number of arguments, only a config file and output file must both be specified."
config = load_config(argv[2]) config = load_config(argv[2])
check_config(config) check_config(config)
with open(argv[3], 'w') as f: with open(argv[3], "w") as f:
f.write(gen_downlink_core(config)) f.write(gen_downlink_core(config))
# run the specified core # run the specified core
elif argv[1] == 'run': elif argv[1] == "run":
assert len(argv) == 4, 'Wrong number of arguments, only a config file and output file must both be specified.' assert (
len(argv) == 4
), "Wrong number of arguments, only a config file and output file must both be specified."
config = load_config(argv[2]) config = load_config(argv[2])
check_config(config) check_config(config)
@ -335,5 +376,5 @@ if __name__ == '__main__':
export_waveform(config, data, argv[3]) export_waveform(config, data, argv[3])
else: else:
print('Option not recognized.') print("Option not recognized.")
print_help() print_help()