From 0bbdf4faa652eaf707977966de2efc4b738decc5 Mon Sep 17 00:00:00 2001 From: Fischer Moseley <42497969+fischermoseley@users.noreply.github.com> Date: Sun, 3 Sep 2023 18:07:38 -0700 Subject: [PATCH] make io_core python api ready for test --- src/manta/io_core/__init__.py | 259 ++++++++++++++++------------------ 1 file changed, 122 insertions(+), 137 deletions(-) diff --git a/src/manta/io_core/__init__.py b/src/manta/io_core/__init__.py index 7a6db3b..023545f 100644 --- a/src/manta/io_core/__init__.py +++ b/src/manta/io_core/__init__.py @@ -1,46 +1,18 @@ from ..utils import * from math import ceil -class Probe: - def __init__(self, name, width, base_addr, interface): +class IOCoreProbe: + def __init__(self, name, width, base_addr, initial_value = None): assert isinstance(width, int), f"Probe {name} must have integer width." assert width > 0, f"Probe {name} must have positive width." self.name = name self.width = width - self.interface = interface + self.initial_value = initial_value n_addrs = ceil(self.width / 16) self.addrs = list(range(base_addr, base_addr + n_addrs)) - -class InputProbe(Probe): - def __init__(self, name, width, base_addr, interface): - super().__init__(name, width, base_addr, interface) - - def get(self): - return pack_16bit_words(self.interface.read(self.addrs)) - -class OutputProbe(Probe): - def __init__(self, name, width, base_addr, initial_value, interface): - super().__init__(name, width, base_addr, interface) - self.initial_value = initial_value - - def get(self): - return pack_16bit_words(self.interface.read(self.addrs)) - - def set(self, value): - # check that value is an integer - assert isinstance(value, int), "Value must be an integer." - - # check that value is within range for the width of the probe - if value > 0: - assert data <= (2**self.width) - 1, f"Unsigned value too large for probe of width {self.width}" - - elif value < 0: - assert abs(data) <= (2**(self.width-1))-1, f"Signed value too large for probe of width {self.width}" - - data = unpack_16_bit_words(value) - self.interface.write(self.addrs, data) + self.brackets = "" if self.width == 1 else f"[{self.width-1}:0] " class IOCore: def __init__(self, config, name, base_addr, interface): @@ -53,7 +25,7 @@ class IOCore: # check for unrecognized options for option in config: if option not in ["type", "inputs", "outputs", "user_clock"]: - print(f"Warning: Ignoring unrecognized option '{option}' in IO core '{self.name}'") + print(f"Warning: Ignoring unrecognized option '{option}' in IO core '{name}'") # add user clock self.user_clock = False @@ -61,17 +33,17 @@ class IOCore: assert isinstance(config["user_clock"], bool), "Option user_clock must be a boolean." self.user_clock = config["user_clock"] - # add probes to core - self.probes = [] + # add input probes to core + self.input_probes = [] last_used_addr = self.base_addr # start at one since strobe register is at BASE_ADDR if 'inputs' in config: for name, width in config["inputs"].items(): - probe = InputProbe(name, width, last_used_addr + 1, interface) - self.probes.append(probe) - - setattr(self, probe.name, probe) # add friendly name so users can do my_io_core.my_probe.set() + probe = IOCoreProbe(name, width, last_used_addr + 1) last_used_addr = probe.addrs[-1] + self.input_probes.append(probe) + # add output probes to core + self.output_probes = [] if 'outputs' in config: for name, params in config["outputs"].items(): # get width and initial value from config @@ -84,17 +56,68 @@ class IOCore: initial_value = params["initial_value"] else: - raise ValueError(f"Unable to determine probe width and initial value for {probe}") + raise ValueError(f"Unable to determine probe width and initial value for {name}") # add probe to core - probe = OutputProbe(name, width, last_used_addr + 1, initial_value, interface) - self.probes.append(probe) - - setattr(self, probe.name, probe) # add friendly name so users can do my_io_core.my_probe.set() - last_used_addr = self.probes[-1].addrs[-1] + probe = IOCoreProbe(name, width, last_used_addr + 1, initial_value) + last_used_addr = probe.addrs[-1] + self.output_probes.append(probe) self.max_addr = last_used_addr + # add friendly names to each probe + # (so users can do io_core.probe.set() and get() for instance) + for probe in self.input_probes: + setattr(probe, "set", lambda value: self.set(probe, value) ) + setattr(self, probe.name, probe) + + for probe in self.input_probes: + setattr(probe, "set", lambda value: self.set(probe, value) ) + setattr(probe, "get", self.get(probe) ) + setattr(self, probe.name, probe) + + + def get(self, probe): + self.pulse_strobe_register() + return pack_16bit_words(self.interface.read(probe.addrs)) + + def set(self, probe, value): + # check that value is an integer + assert isinstance(value, int), "Value must be an integer." + + # check that value is within range for the width of the probe + if value > 0: + assert data <= (2**self.width) - 1, f"Unsigned value too large for probe of width {self.width}" + + elif value < 0: + assert abs(data) <= (2**(self.width-1))-1, f"Signed value too large for probe of width {self.width}" + + self.pulse_strobe_register() + data = unpack_16_bit_words(value) + self.interface.write(probe.addrs, data) + + def pulse_strobe_register(self): + # pulse the strobe register + self.interface.write(self.base_addr, 1) + self.interface.write(self.base_addr, 0) + strobe = self.interface.read(self.base_addr) + if strobe != 0: + raise ValueError("Unable to set strobe register to zero!") + + def hdl_top_level_ports(self): + ports = [] + + if self.user_clock: + ports.append(f"input wire {self.name}_user_clock") + + for probe in self.input_probes: + ports.append(f"input wire {probe.brackets}{probe.name}") + + for probe in self.input_probes: + ports.append(f"output reg {probe.brackets}{probe.name}") + + return ports + def hdl_inst(self): inst = VerilogManipulator("io_core/io_core_inst_tmpl.v") inst.sub(self.name, "/* MODULE_NAME */") @@ -107,90 +130,12 @@ class IOCore: else: inst.sub(f"{self.name}_user_clk", "/* USER_CLK */") - probes = {probe.name:probe.width for probe in self.probes} - + probes = {p.name:p.width for p in self.input_probes + self.output_probes} inst_ports = inst.net_conn(probes, trailing_comma=True) inst.sub(inst_ports, "/* INST_PORTS */") return inst.get_hdl() - def gen_memory_handling(self): - rcsb = "" # read case statement body - wcsb = "" # write case statement body - for probe in self.probes: - if probe.width <= 16: - rcsb += f"BASE_ADDR + {probe.addrs[0]}: data_o <= {probe.name}_buf;\n" - - if isinstance(probe, OutputProbe): - wcsb += f"BASE_ADDR + {probe.addrs[0]}: {probe.name}_buf <= data_i;\n" - - else: - for i in range(ceil(probe.width/16)): - top = ((i + 1) * 16) - 1 - btm = i * 16 - if top > probe.width - 1: - top = probe.width - 1 - - rcsb += f"BASE_ADDR + {probe.addrs[i]}: data_o <= {probe.name}_buf[{top}:{btm}];\n" - - if isinstance(probe, OutputProbe): - wcsb += f"BASE_ADDR + {probe.addrs[i]}: {probe.name}_buf[{top}:{btm}] <= data_i;\n" - - # remove trailing newline - return rcsb.rstrip(), wcsb.rstrip() - - def gen_input_probe_bufs(self): - ipb = [] - for probe in self.probes: - if isinstance(probe, InputProbe): - if probe.width == 1: - ipb.append(f"reg {probe.name}_buf = 0;") - - else: - ipb.append(f"reg [{probe.width-1}:0] {probe.name}_buf = 0;") - - return '\n'.join(ipb) - - - - def gen_output_probe_bufs(self): - opb = [] - for probe in self.probes: - if isinstance(probe, OutputProbe): - if probe.width == 1: - opb.append(f"reg {probe.name}_buf = {probe.initial_value};") - - else: - opb.append(f"reg [{probe.width-1}:0] {probe.name}_buf = {probe.initial_value};") - - return '\n'.join(opb) - - - def gen_output_probe_initial_values(self): - opiv = [] - for probe in self.probes: - if isinstance(probe, OutputProbe): - opiv.append(f"{probe.name} = {probe.initial_value};") - - return '\n'.join(opiv) - - - def gen_update_input_buffers(self): - uib = [] - for probe in self.probes: - if isinstance(probe, InputProbe): - uib.append(f"{probe.name}_buf <= {probe.name};") - - return '\n'.join(uib) - - def gen_update_output_buffers(self): - uob = [] - for probe in self.probes: - if isinstance(probe, OutputProbe): - uob.append(f"{probe.name} <= {probe.name}_buf;") - - return '\n'.join(uob) - def hdl_def(self): io_core = VerilogManipulator("io_core/io_core_def_tmpl.v") io_core.sub(self.name, "/* MODULE_NAME */") @@ -202,11 +147,10 @@ class IOCore: io_core.sub(top_level_ports, "/* TOP_LEVEL_PORTS */") # generate memory handling - rcsb, wcsb = self.gen_memory_handling() - io_core.sub(rcsb, "/* READ_CASE_STATEMENT_BODY */") - io_core.sub(wcsb, "/* WRITE_CASE_STATEMENT_BODY */") + io_core.sub(self.gen_read_case_statement_body(), "/* READ_CASE_STATEMENT_BODY */") + io_core.sub(self.gen_write_case_statement_body(), "/* WRITE_CASE_STATEMENT_BODY */") - # generate input and output probe buffers + # generate input and output probe buffers with initial values io_core.sub(self.gen_input_probe_bufs(), "/* INPUT_PROBE_BUFFERS */") io_core.sub(self.gen_output_probe_bufs(), "/* OUTPUT_PROBE_BUFFERS */") io_core.sub(self.gen_output_probe_initial_values(), "/* OUTPUT_PROBE_INITIAL_VALUES */") @@ -215,15 +159,56 @@ class IOCore: return io_core.get_hdl() - def hdl_top_level_ports(self): - ports = [] + def gen_read_case_statement_body(self): + lines = [] + for probe in self.input_probes + self.output_probes: + if probe.width <= 16: + lines.append(f"BASE_ADDR + {probe.addrs[0]}: data_o <= {probe.name}_buf;") - if self.user_clock: - ports.append(f"input wire {self.name}_user_clock") + # assign 16-bit slices of each probe's buffer to each address taken by the probe + else: + for i in range(ceil(probe.width/16)): + top = ((i + 1) * 16) - 1 + btm = i * 16 + if top > probe.width - 1: + top = probe.width - 1 + lines.append(f"BASE_ADDR + {probe.addrs[i]}: data_o <= {probe.name}_buf[{top}:{btm}];") - for probe in self.probes: - net_type = "input wire " if isinstance(probe, InputProbe) else "output reg " - name_def = probe.name if probe.width == 1 else f"[{probe.width-1}:0] {probe.name}" - ports.append(net_type + name_def) + return '\n'.join(lines) - return ports \ No newline at end of file + def gen_write_case_statement_body(self): + lines = [] + for probe in self.output_probes: + if probe.width <= 16: + lines.append(f"BASE_ADDR + {probe.addrs[0]}: {probe.name}_buf <= data_i;") + + else: + for i in range(ceil(probe.width/16)): + top = ((i + 1) * 16) - 1 + btm = i * 16 + if top > probe.width - 1: + top = probe.width - 1 + + lines.append(f"BASE_ADDR + {probe.addrs[i]}: {probe.name}_buf[{top}:{btm}] <= data_i;") + + return '\n'.join(lines) + + def gen_input_probe_bufs(self): + lines = [f"reg {p.brackets}{p.name}_buf = 0;" for p in self.input_probes] + return '\n'.join(lines) + + def gen_output_probe_bufs(self): + lines = [f"reg {p.brackets}{p.name}_buf = {p.initial_value};" for p in self.output_probes] + return '\n'.join(lines) + + def gen_output_probe_initial_values(self): + lines = [f"{p.name} = {p.initial_value}" for p in self.output_probes] + return '\n'.join(lines) + + def gen_update_input_buffers(self): + lines = [f"{p.name}_buf <= {p.name};" for p in self.input_probes] + return '\n'.join(lines) + + def gen_update_output_buffers(self): + lines = [f"{p.name} <= {p.name}_buf;" for p in self.output_probes] + return '\n'.join(lines)