verilator/test_regress/driver.py

3014 lines
125 KiB
Python
Raw Normal View History

2024-09-08 19:00:03 +02:00
#!/usr/bin/env python3
# pylint: disable=C0103,C0114,C0115,C0116,C0209,C0302,R0902,R0903,R0904,R0912,R0913,R0914,R0915,R0916,W0212,W0511,W0603,W1201
######################################################################
import argparse
import collections
import ctypes
2024-09-08 19:00:03 +02:00
import glob
import hashlib
import json
import logging
import multiprocessing
import os
import pickle
import platform
import pty
2024-09-08 19:00:03 +02:00
import re
import resource
2024-09-08 19:00:03 +02:00
import runpy
import shutil
import signal
import subprocess
import sys
import time
from functools import lru_cache # Eventually use python 3.9's cache
from pprint import pformat, pprint
import distro
2024-09-08 19:00:03 +02:00
if False: # pylint: disable=using-constant-test
pprint(pformat("Ignored")) # Prevent unused warning
# Map of all scenarios, with the names used to enable them
All_Scenarios = {
# yapf: disable
'dist': ['dist'],
'atsim': ['simulator', 'simulator_st', 'atsim'],
'ghdl': ['linter', 'simulator', 'simulator_st', 'ghdl'],
'iv': ['simulator', 'simulator_st', 'iv'],
'ms': ['linter', 'simulator', 'simulator_st', 'ms'],
'nc': ['linter', 'simulator', 'simulator_st', 'nc'],
'vcs': ['linter', 'simulator', 'simulator_st', 'vcs'],
'xrun': ['linter', 'simulator', 'simulator_st', 'xrun'],
'xsim': ['linter', 'simulator', 'simulator_st', 'xsim'],
'vlt': ['linter', 'simulator', 'simulator_st', 'vlt_all', 'vlt'],
'vltmt': ['simulator', 'vlt_all', 'vltmt'],
# yapf: enable
}
# Globals
test = None
Arg_Tests = []
Quitting = False
Vltmt_Threads = 3
forker = None
Start = None
nodist_directory = "../nodist"
2024-09-08 19:00:03 +02:00
# So an 'import vltest_bootstrap' inside test files will do nothing
sys.modules['vltest_bootstrap'] = {}
#######################################################################
#######################################################################
# Decorators
class staticproperty(property):
def __get__(self, owner_self, owner_cls):
return self.fget()
#######################################################################
#######################################################################
# VtOs -- OS extensions
class VtOs:
@staticmethod
def delenv(var: str) -> None:
"""Delete environment variable, if exists"""
if var in os.environ:
del os.environ[var]
@staticmethod
def getenv_def(var: str, default=None) -> str:
"""Return environment variable, returning default if does not exist"""
if var in os.environ:
return os.environ[var]
return default
@staticmethod
def mkdir_ok(path: str) -> None:
"""Make directory, no error if exists"""
try:
os.mkdir(path)
except FileExistsError:
pass
@staticmethod
def run_capture(command: str, check=True) -> str:
"""Run a command and return results"""
proc = subprocess.run([command], capture_output=True, text=True, shell=True, check=False)
if check and proc.returncode:
sys.exit("%Error: command failed '" + command + "':\n" + proc.stderr + proc.stdout)
return str(proc.stdout)
@staticmethod
def unlink_ok(filename: str) -> None:
"""Unlink a file, no error if fails"""
try:
os.unlink(filename)
except OSError:
pass
#######################################################################
#######################################################################
# Capabilities -- What OS/Verilator is built to support
class Capabilities:
# @lru_cache(maxsize=1024) broken with @staticmethod on older pythons we use
_cached_cmake_version = None
_cached_cxx_version = None
_cached_have_coroutines = None
_cached_have_dev_asan = None
_cached_have_dev_gcov = None
2024-09-08 19:00:03 +02:00
_cached_have_gdb = None
_cached_have_sc = None
_cached_have_solver = None
_cached_make_version = None
@staticproperty
def cmake_version() -> str: # pylint: disable=no-method-argument
if Capabilities._cached_cmake_version is None:
out = VtOs.run_capture('cmake --version', check=False)
match = re.search(r'cmake version (\d+)\.(\d+)', out, re.IGNORECASE)
if match:
Capabilities._cached_cmake_version = match.group(1) + "." + match.group(2)
else:
Capabilities._cached_cmake_version = 0
return Capabilities._cached_cmake_version
@staticproperty
def cxx_version() -> str: # pylint: disable=no-method-argument
if Capabilities._cached_cxx_version is None:
Capabilities._cached_cxx_version = VtOs.run_capture(
os.environ['MAKE'] + " --silent -C " + os.environ['TEST_REGRESS'] +
" -f Makefile print-cxx-version",
check=False)
2024-09-08 19:00:03 +02:00
return Capabilities._cached_cxx_version
@staticproperty
def have_coroutines() -> bool: # pylint: disable=no-method-argument
if Capabilities._cached_have_coroutines is None:
Capabilities._cached_have_coroutines = bool(
Capabilities._verilator_get_supported('COROUTINES'))
return Capabilities._cached_have_coroutines
@staticproperty
def have_dev_asan() -> bool: # pylint: disable=no-method-argument
if Capabilities._cached_have_dev_asan is None:
Capabilities._cached_have_dev_asan = bool(
Capabilities._verilator_get_supported('DEV_ASAN'))
return Capabilities._cached_have_dev_asan
@staticproperty
def have_dev_gcov() -> bool: # pylint: disable=no-method-argument
if Capabilities._cached_have_dev_gcov is None:
Capabilities._cached_have_dev_gcov = bool(
Capabilities._verilator_get_supported('DEV_GCOV'))
return Capabilities._cached_have_dev_gcov
2024-09-08 19:00:03 +02:00
@staticproperty
def have_gdb() -> bool: # pylint: disable=no-method-argument
if Capabilities._cached_have_gdb is None:
out = VtOs.run_capture('gdb --version 2>/dev/null', check=False)
Capabilities._cached_have_gdb = bool('Copyright' in out)
return Capabilities._cached_have_gdb
@staticproperty
def have_sc() -> bool: # pylint: disable=no-method-argument
if Capabilities._cached_have_sc is None:
if 'SYSTEMC' in os.environ:
Capabilities._cached_have_sc = True
elif 'SYSTEMC_INCLUDE' in os.environ:
Capabilities._cached_have_sc = True
elif 'CFG_HAVE_SYSTEMC' in os.environ:
Capabilities._cached_have_sc = True
else:
Capabilities._cached_have_sc = bool(
Capabilities._verilator_get_supported('SYSTEMC'))
return Capabilities._cached_have_sc
@staticproperty
def have_solver() -> bool: # pylint: disable=no-method-argument
if Capabilities._cached_have_solver is None:
out = VtOs.run_capture('(z3 --help || cvc5 --help || cvc4 --help) 2>/dev/null',
check=False)
Capabilities._cached_have_solver = bool('usage' in out.casefold())
2024-09-08 19:00:03 +02:00
return Capabilities._cached_have_solver
@staticproperty
@lru_cache(maxsize=1024)
def make_version() -> str: # pylint: disable=no-method-argument
if Capabilities._cached_make_version is None:
out = VtOs.run_capture(os.environ['MAKE'] + ' --version', check=False)
match = re.search(r'make ([0-9]+\.[0-9]+)', out, flags=re.IGNORECASE)
if match:
Capabilities._cached_make_version = match.group(1)
else:
Capabilities._cached_make_version = -1
return Capabilities._cached_make_version
# Fetch
@staticmethod
def warmup_cache() -> None:
_ignore = Capabilities.have_coroutines
_ignore = Capabilities.have_dev_asan
_ignore = Capabilities.have_dev_gcov
2024-09-08 19:00:03 +02:00
_ignore = Capabilities.have_gdb
_ignore = Capabilities.have_sc
_ignore = Capabilities.have_solver
# Internals
@staticmethod
def _verilator_get_supported(feature) -> str:
# Returns if given feature is supported
cmd = "perl " + os.environ['VERILATOR_ROOT'] + "/bin/verilator -get-supported " + feature
out = VtOs.run_capture(cmd, check=False).strip()
if out == '1':
return True
if out in ('', '0'):
return False
sys.exit("%Error: couldn't verilator_get_supported: " + cmd + "\n" + out)
#######################################################################
#######################################################################
# Forker class - run multprocess pool of processes
# Similar interface to Perl's Parallel::Forker.
class Forker:
class Job:
2024-09-08 19:00:03 +02:00
def __init__(self, _id, name, scenario, args, quiet, rerun_skipping, run_pre_start,
run_on_start, run_on_finish):
2024-09-08 19:00:03 +02:00
self.fail_max_skip = False
self.id = _id
self.name = name
self.scenario = scenario
self.args = args
self.quiet = quiet
2024-09-08 19:00:03 +02:00
self.rerun_skipping = rerun_skipping
self.run_pre_start = run_pre_start
self.run_on_start = run_on_start
self.run_on_finish = run_on_finish
self.mprocess = None # Set once call run()
@property
def pid(self) -> int:
return self.mprocess.pid
@property
def exitcode(self) -> int:
return self.mprocess.exitcode
def __init__(self, max_processes):
self._max_processes = max_processes
self._id_next = 0
self._left = collections.deque() # deque of Job
self._running = {} # key of pid, value of Job
2024-09-08 19:00:03 +02:00
def is_any_left(self) -> bool:
return self.num_running() > 0 or (len(self._left) > 0 and not Quitting)
2024-09-08 19:00:03 +02:00
def max_proc(self, n: int) -> None:
self._max_processes = n
def poll(self) -> bool:
"""Run threads, returning if more work to do (if False, sleep)"""
2024-09-08 19:00:03 +02:00
# We don't use SIGCHLD as conflicted with other handler, instead just poll
completed = [] # Need two passes to avoid changing the list we are iterating
nrunning = 0
more_now = False
2024-09-08 19:00:03 +02:00
for process in self._running.values():
if process.exitcode is not None:
completed.append(process)
more_now = True
2024-09-08 19:00:03 +02:00
else:
nrunning += 1
# Start new work now, so running in background while we then collect completions
while len(self._left) and nrunning < self._max_processes and not Quitting:
process = self._left.popleft()
2024-09-08 19:00:03 +02:00
self._run(process)
nrunning += 1
more_now = True
for process in completed:
self._finished(process)
return more_now
2024-09-08 19:00:03 +02:00
def running(self) -> list:
return self._running.values()
def num_running(self) -> int:
return len(self._running)
def schedule(self, name, scenario, args, quiet, rerun_skipping, run_pre_start, run_on_start,
run_on_finish):
2024-09-08 19:00:03 +02:00
# print("-Forker::schedule: [" + name + "]")
process = Forker.Job(self._id_next,
name=name,
scenario=scenario,
args=args,
quiet=quiet,
rerun_skipping=rerun_skipping,
run_pre_start=run_pre_start,
run_on_start=run_on_start,
run_on_finish=run_on_finish)
2024-09-08 19:00:03 +02:00
self._id_next += 1
self._left.append(process)
def kill_tree_all(self) -> None:
# print("-Forker: kill_tree_all")
for process in self._running.values():
process.mprocess.kill()
def _run(self, process) -> None:
# print("-Forker: [" + process.name + "] run_pre_start")
process.run_pre_start(process)
ctx = multiprocessing.get_context('forkserver')
process.mprocess = ctx.Process( #
target=forker.child_start, # pylint: disable=used-before-assignment
args=(process, ))
2024-09-08 19:00:03 +02:00
process.mprocess.start()
# print("-Forker: [" + process.name + "] RUNNING pid=" + str(process.pid))
self._running[process.pid] = process
@staticmethod
def child_start(process) -> None:
# Runs in context of child
# print("-Forker: [" + process.name + "] run_on_start")
process.run_on_start(process)
# print("-Forker: [" + process.name + "] FINISHED run_on_start")
sys.exit(0)
# Don't close anything
def _finished(self, process) -> None:
del self._running[process.pid]
# print("-Forker: [" + process.name + "] run_on_finish exitcode=" + str(process.exitcode))
process.run_on_finish(process)
process.mprocess.close()
#######################################################################
#######################################################################
# Runner class
class Runner:
def __init__(self, driver_log_filename, quiet, ok_cnt=0, fail1_cnt=0, skip_cnt=0):
self.driver_log_filename = driver_log_filename
self.quiet = quiet
# Counts
self.all_cnt = 0
self.left_cnt = 0
self.ok_cnt = ok_cnt
self.fail1_cnt = fail1_cnt
self.fail_cnt = 0
self.skip_cnt = skip_cnt
self.skip_msgs = []
self.fail_msgs = []
self.fail_tests = []
self._last_proc_finish_time = 0
self._last_summary_time = 0
self._last_summary_left = 0
self._running_ids = {}
self._msg_fail_max_skip = False
Runner.runner = self
def one_test(self, py_filename: str, scenario: str, rerun_skipping=False) -> None:
self.all_cnt += 1
self.left_cnt += 1
forker.schedule(name=py_filename,
scenario=scenario,
args=Args,
quiet=self.quiet,
2024-09-08 19:00:03 +02:00
rerun_skipping=rerun_skipping,
run_pre_start=self._run_pre_start_static,
run_on_start=self._run_on_start_static,
run_on_finish=self._run_on_finish_static)
@staticmethod
def _run_pre_start_static(process) -> None:
Runner.runner._run_pre_start(process) # pylint: disable=protected-access
def _run_pre_start(self, process) -> None:
# Running in context of parent, before run_on_start
# Make an identifier that is unique across all current running jobs
i = 1
while i in self._running_ids:
i += 1
process.running_id = i
self._running_ids[process.running_id] = 1
if Args.fail_max and Args.fail_max <= self.fail_cnt:
if not self._msg_fail_max_skip:
self._msg_fail_max_skip = True
print("== Too many test failures; exceeded --fail-max\n", file=sys.stderr)
process.fail_max_skip = 1
@staticmethod
def _run_on_start_static(process) -> None:
# Running in context of child, so can't pass data to parent directly
if process.quiet:
2024-09-08 19:00:03 +02:00
sys.stdout = open(os.devnull, 'w') # pylint: disable=R1732,unspecified-encoding
sys.stderr = open(os.devnull, 'w') # pylint: disable=R1732,unspecified-encoding
print("=" * 70)
global Args
Args = process.args
2024-09-08 19:00:03 +02:00
global test
test = VlTest(py_filename=process.name,
scenario=process.scenario,
running_id=process.running_id)
test.oprint("=" * 50)
test._prep()
if process.rerun_skipping:
print(" ---------- Earlier logfiles below; test was rerunnable = False\n")
os.system("cat " + test.obj_dir + "/*.log")
print(" ---------- Earlier logfiles above; test was rerunnable = False\n")
2024-09-08 19:00:03 +02:00
elif process.fail_max_skip:
test.skip("Too many test failures; exceeded --fail-max")
else:
VtOs.unlink_ok(test._status_filename)
test._read()
# Don't put anything other than _exit after _read,
# as may call _exit via another path
test._exit()
@staticmethod
def _run_on_finish_static(process) -> None:
Runner.runner._run_on_finish(process) # pylint: disable=protected-access
def _run_on_finish(self, process) -> None:
# Running in context of parent
global test
test = VlTest(py_filename=process.name,
scenario=process.scenario,
running_id=process.running_id)
test._quit = Quitting
2024-09-08 19:00:03 +02:00
test._read_status()
if test.ok:
self.ok_cnt += 1
2025-03-27 23:33:18 +01:00
if Args.driver_clean:
test.clean()
elif test._quit:
pass
2024-09-08 19:00:03 +02:00
elif test._scenario_off and not test.errors:
pass
elif test._skips and not test.errors and not test.errors_keep_going:
self.skip_msgs.append("\t#" + test.soprint("-Skip: " + test._skips))
self.skip_cnt += 1
else:
error_msg = test.errors if test.errors else test.errors_keep_going
test.oprint("FAILED: " + error_msg)
makecmd = VtOs.getenv_def('VERILATOR_MAKE', os.environ['MAKE'] + " &&")
2024-09-08 19:00:03 +02:00
upperdir = 'test_regress/' if re.search(r'test_regress', os.getcwd()) else ''
self.fail_msgs.append("\t#" + test.soprint("%Error: " + error_msg) + "\t\t" + makecmd +
" " + upperdir + test.py_filename + ' ' +
2024-09-08 19:00:03 +02:00
' '.join(self._manual_args()) + " --" + test.scenario + "\n")
self.fail_tests.append(test)
self.fail_cnt += 1
self.report(self.driver_log_filename)
other = ""
for proc in forker.running():
other += " " + proc.name
if other != "" and not Args.quiet:
test.oprint("Simultaneous running tests:" + other)
if Args.stop:
sys.exit("%Error: --stop and errors found\n")
self.left_cnt -= 1
self._last_proc_finish_time = time.time()
if process.running_id:
del self._running_ids[process.running_id]
def wait_and_report(self) -> None:
self.print_summary(force=True)
# Wait for all children to finish
while forker.is_any_left():
more_now = forker.poll()
if not Args.interactive_debugger:
2024-09-08 19:00:03 +02:00
self.print_summary(force=False)
if not more_now:
time.sleep(0.1)
2024-09-08 19:00:03 +02:00
self.report(None)
self.report(self.driver_log_filename)
def report(self, filename: str) -> None:
if filename:
with open(filename, "w", encoding="utf8") as fh:
self._report_fh(fh)
else:
self._report_fh(sys.stdout)
def _report_fh(self, fh) -> None:
fh.write("\n")
fh.write('=' * 70 + "\n")
for f in sorted(self.fail_msgs):
fh.write(f.strip() + "\n")
for f in sorted(self.skip_msgs):
fh.write(f.strip() + "\n")
if self.fail_cnt:
sumtxt = 'FAILED'
elif self.skip_cnt:
sumtxt = 'PASSED w/SKIPS'
else:
sumtxt = 'PASSED'
fh.write("==TESTS DONE, " + sumtxt + ": " + self.sprint_summary() + "\n")
def print_summary(self, force=False):
change = self._last_summary_left != self.left_cnt
if (force or ((time.time() - self._last_summary_time) >= 15)
or (not self.quiet and change)):
self._last_summary_left = self.left_cnt
self._last_summary_time = time.time()
print("==SUMMARY: " + self.sprint_summary(), file=sys.stderr)
if (self._last_proc_finish_time != 0
and ((time.time() - self._last_proc_finish_time) > 15)):
self._last_proc_finish_time = time.time()
other = ""
for proc in forker.running():
other += " " + proc.name
print("==STILL RUNNING:" + other, file=sys.stderr)
@staticmethod
def _py_filename_adjust(py_filename: str,
tdir_def: str) -> list: # Return (py_filename, t_dir)
for tdir in Args.test_dirs: # pylint: disable=redefined-outer-name
2024-09-08 19:00:03 +02:00
# t_dir used both absolutely and under obj_dir
try_py_filename = tdir + "/" + os.path.basename(py_filename)
if os.path.exists(try_py_filename):
# Note most tests require error messages of the form t/x.v
# Therefore py_filename must be t/ for local tests
# t_dir must be absolute - used under t or under obj_dir
tdir_abs = os.path.abspath(tdir)
return (try_py_filename, tdir_abs)
return (py_filename, os.path.abspath(tdir_def))
def sprint_summary(self) -> str:
delta = time.time() - Start # pylint: disable=used-before-assignment
2024-09-08 19:00:03 +02:00
# Fudge of 120% works out about right so ETA correctly predicts completion time
eta = 1.2 * ((self.all_cnt * (delta / ((self.all_cnt - self.left_cnt) + 0.001))) - delta)
if delta < 10:
eta = 0
out = ""
if self.left_cnt:
out += "Left " + str(self.left_cnt) + " "
out += "Passed " + str(self.ok_cnt)
# Ordered below most severe to least severe
out += " Failed " + str(self.fail_cnt)
if self.fail1_cnt:
out += " Failed-First " + str(self.fail1_cnt)
if self.skip_cnt:
out += " Skipped " + str(self.skip_cnt)
if forker.num_running():
out += " Running " + str(forker.num_running())
2025-03-22 13:49:40 +01:00
if self.left_cnt > 10 and eta > 10 and self.all_cnt != self.left_cnt:
2024-09-08 19:00:03 +02:00
out += " Eta %d:%02d" % (int(eta / 60), eta % 60)
out += " Time %d:%02d" % (int(delta / 60), delta % 60)
return out
def _manual_args(self) -> str:
# Return command line with scenarios stripped
out = []
for oarg in Args.orig_argv_sw:
2024-09-08 19:00:03 +02:00
showarg = True
for val in All_Scenarios.values():
for allscarg in val:
if oarg == "--" + allscarg:
showarg = False
# Also strip certain flags that per-test debugging won't want
if showarg and oarg != '--rerun' and oarg != '--quiet':
out.append(oarg)
return out
#######################################################################
#######################################################################
# Test exceptions
class VtSkipException(Exception):
pass
class VtErrorException(Exception):
pass
#######################################################################
#######################################################################
# Test class
class VlTest:
_file_contents_cache = {}
# @lru_cache(maxsize=1024) broken with @staticmethod on older pythons we use
_cached_aslr_off = None
_cached_cfg_with_ccache = None
def __init__(self, py_filename, scenario, running_id):
self.py_filename = py_filename # Name of .py file to get setup from
self.running_id = running_id
self.scenario = scenario
self.root = '..' # Relative path to git root (above test_regress)
2024-09-08 19:00:03 +02:00
2024-12-01 16:27:05 +01:00
self._force_pass = False
2024-09-08 19:00:03 +02:00
self._have_solver_called = False
self._inputs = {}
self._ok = False
self._quit = False
2024-09-08 19:00:03 +02:00
self._scenario_off = False # scenarios() didn't match running scenario
self._skips = None
match = re.match(r'^(.*/)?([^/]*)\.py', self.py_filename)
self.name = match.group(2) # Name of this test
self.benchmark = Args.benchmark
self.benchmarksim = False
self.clean_command = None
self.context_threads = 0 # Number of threads to allocate in the context
self.errors = None
self.errors_keep_going = None
self.main_time_multiplier = 1
self.make_main = 1 # Make __main.cpp
self.make_pli = 0 # need to compile pli
self.make_top_shell = 1 # Make a default __top.v file
self.rerunnable = True # Rerun if fails
self.sc_time_resolution = "SC_PS" # Keep - PS is SystemC default
self.sim_time = 1100 # simulation time units for main wrapper
self.threads = -1 # --threads (negative means auto based on scenario)
self.verbose = Args.verbose
self.verilated_randReset = ""
self.vm_prefix = "V" + self.name
# Make e.g. self.vlt, self.vltmt etc
self.vlt = False # Set below also
self.vltmt = False # Set below also
self.xsim = False # Set below also
for ascenario in All_Scenarios:
self.__dict__[ascenario] = False
self.__dict__[scenario] = True
self.vlt_all = self.vlt or self.vltmt # Any Verilator scenario
(self.py_filename, self.t_dir) = Runner._py_filename_adjust(self.py_filename, ".")
for tdir in Args.test_dirs: # pylint: disable=redefined-outer-name
2024-09-08 19:00:03 +02:00
# t_dir used both absolutely and under obj_dir
self.t_dir = None
if os.path.exists(tdir + "/" + self.name + ".py"):
# Note most tests require error messages of the form t/x.v
# Therefore py_filename must be t/ for local tests
self.py_filename = os.path.relpath(tdir + "/" + self.name + ".py")
# t_dir must be absolute - used under t or under obj_dir
self.t_dir = os.path.abspath(tdir)
break
if not self.t_dir:
sys.exit("%Error: Can't locate dir for " + self.name)
scen_dir = os.path.relpath(self.t_dir + "/../obj_" + self.scenario)
# Simplify filenames on local runs
scen_dir = re.sub(r'^t/\.\./', '', scen_dir)
# Not mkpath so error if try to build somewhere odd
VtOs.mkdir_ok(scen_dir)
2024-11-25 03:12:08 +01:00
self.obj_dir = scen_dir + "/" + self.name + Args.obj_suffix
2024-09-08 19:00:03 +02:00
define_opt = self._define_opt_calc()
# All compilers
self.v_flags = []
if self.xsim:
self.v_flags += ["-f input.xsim.vc"]
elif os.path.exists('input.vc'):
self.v_flags += ["-f input.vc"]
if not re.search(r'/test_regress', self.t_dir): # Don't include standard dir, only site's
self.v_flags += ["+incdir+" + self.t_dir + " -y " + self.t_dir]
self.v_flags += [define_opt + "TEST_OBJ_DIR=" + self.obj_dir]
if Args.verbose:
self.v_flags += [define_opt + "TEST_VERBOSE=1"]
if Args.benchmark:
self.v_flags += [define_opt + "TEST_BENCHMARK=Args.benchmark"]
if Args.trace:
self.v_flags += [define_opt + "WAVES=1"]
self.v_flags2 = [] # Overridden in some sim files
self.v_other_filenames = []
self.all_run_flags = []
self.pli_flags = [
"-I" + os.environ['VERILATOR_ROOT'] + "/include/vltstd",
"-I" + os.environ['VERILATOR_ROOT'] + "/include", "-fPIC", "-shared"
2024-09-08 19:00:03 +02:00
]
if platform.system() == 'Darwin':
self.pli_flags += ["-Wl,-undefined,dynamic_lookup"]
else:
self.pli_flags += ["-rdynamic"]
if Args.verbose:
self.pli_flags += ["-DTEST_VERBOSE=1"]
self.pli_flags += ["-o", self.obj_dir + "/libvpi.so"]
self.tool_c_flags = []
# ATSIM
self.atsim_define = 'ATSIM'
self.atsim_flags = [
"-c", "+sv", "+define+ATSIM", ("+sv_dir+" + self.obj_dir + "/.athdl_compile")
]
self.atsim_flags2 = [] # Overridden in some sim files
self.atsim_run_flags = []
# GHDL
self.ghdl_define = 'GHDL'
self.ghdl_work_dir = self.obj_dir + "/ghdl_compile"
self.ghdl_flags = [("-v" if Args.debug else ""),
("--workdir=" + self.obj_dir + "/ghdl_compile")]
self.ghdl_flags2 = [] # Overridden in some sim files
self.ghdl_run_flags = []
# IV
self.iv_define = 'IVERILOG'
self.iv_flags = ["+define+IVERILOG", "-g2012", ("-o" + self.obj_dir + "/simiv")]
self.iv_flags2 = [] # Overridden in some sim files
self.iv_run_flags = []
# VCS
self.vcs_define = 'VCS'
self.vcs_flags = [
"+vcs+lic+wait", "+cli", "-debug_access", "+define+VCS+1", "-q", "-sverilog",
"-CFLAGS", "'-DVCS'"
]
self.vcs_flags2 = [] # Overridden in some sim files
self.vcs_run_flags = ["+vcs+lic_wait"]
# NC
self.nc_define = 'NC'
self.nc_flags = [
"+licqueue", "+nowarn+LIBNOU", "+define+NC=1", "-q", "+assert", "+sv", "-c",
2025-06-27 21:47:13 +02:00
"-xmlibdirname", (self.obj_dir + "/xcelium.d"), ("+access+r" if Args.trace else "")
2024-09-08 19:00:03 +02:00
]
self.nc_flags2 = [] # Overridden in some sim files
2025-06-27 21:47:13 +02:00
self.nc_run_flags = [
"+licqueue",
"-q",
"+assert",
"+sv",
"-R",
"-covoverwrite",
"-xmlibdirname",
(self.obj_dir + "/xcelium.d"),
]
2024-09-08 19:00:03 +02:00
# ModelSim
self.ms_define = 'MS'
self.ms_flags = [
"-sv", "-work", (self.obj_dir + "/work"), "+define+MS=1", "-ccflags", '\"-DMS=1\"'
]
self.ms_flags2 = [] # Overridden in some sim files
self.ms_run_flags = ["-lib", self.obj_dir + "/work", "-c", "-do", "'run -all;quit'"]
# XSim
self.xsim_define = 'XSIM'
self.xsim_flags = [
"--nolog", "--sv", "--define", "XSIM", "--work",
(self.name + "=" + self.obj_dir + "/xsim")
]
self.xsim_flags2 = [] # Overridden in some sim files
self.xsim_run_flags = [
"--nolog", "--runall", "--lib", (self.name + "=" + self.obj_dir + "/xsim"),
(" --debug all" if Args.trace else "")
]
self.xsim_run_flags2 = [] # Overridden in some sim files
# Xcelium (xrun)
self.xrun_define = 'XRUN'
self.xrun_flags = [] # Doesn't really have a compile step
self.xrun_flags2 = [] # Overridden in some sim files
self.xrun_run_flags = [
"-64", "-access", "+rwc", "-newsv", "-sv", "-xmlibdirname", self.obj_dir + "/work",
"-l", self.obj_dir + "/history", "-quiet", "-plinowarn"
]
# Verilator
self.verilator_define = 'VERILATOR'
self.verilator_flags = [
"-cc",
"-Mdir",
self.obj_dir,
"--fdedup", # As currently disabled unless -O3
"--debug-check",
"--comp-limit-members",
"10"
]
self.verilator_flags2 = []
Deprecate clocker attribute and --clk option (#6463) The only use for the clocker attribute and the AstVar::isUsedClock that is actually necessary today for correctness is to mark top level inputs of --lib-create blocks as being (or driving) a clock signal. Correctness of --lib-create (and hence hierarchical blocks) actually used to depend on having the right optimizations eliminate intermediate clocks (e.g.: V3Gate), when the top level port was not used directly in a sensitivity list, or marking top level signals manually via --clk or the clocker attribute. However V3Sched::partition already needs to trace through the logic to figure out what signals might drive a sensitivity list, so it can very easily mark all top level inputs as such. In this patch we remove the AstVar::attrClocker and AstVar::isUsedClock attributes, and replace them with AstVar::isPrimaryClock, automatically set by V3Sched::partition. This eliminates all need for manual annotation so we are deprecating the --clk/--no-clk options and the clocker/no_clocker attributes. This also eliminates the opportunity for any further mis-optimization similar to #6453. Regarding the other uses of the removed AstVar attributes: - As of 5.000, initial edges are triggered via a separate mechanism applied in V3Sched, so the use in V3EmitCFunc.cpp is redundant - Also as of 5.000, we can handle arbitrary sensitivity expressions, so the restriction on eliminating clock signals in V3Gate is unnecessary - Since the recent change when Dfg is applied after V3Scope, it does perform the equivalent of GateClkDecomp, so we can delete that pass.
2025-09-20 16:50:22 +02:00
self.verilator_flags3 = []
2024-09-08 19:00:03 +02:00
self.verilator_make_gmake = True
self.verilator_make_cmake = False
self.verilated_debug = Args.verilated_debug
self._status_filename = self.obj_dir + "/V" + self.name + ".status"
self.coverage_filename = self.obj_dir + "/coverage.dat"
self.golden_filename = re.sub(r'\.py$', '.out', self.py_filename)
self.main_filename = self.obj_dir + "/" + self.vm_prefix + "__main.cpp"
self.compile_log_filename = self.obj_dir + "/vlt_compile.log"
2024-09-08 19:00:03 +02:00
self.run_log_filename = self.obj_dir + "/vlt_sim.log"
self.stats = self.obj_dir + "/V" + self.name + "__stats.txt"
if Args.top_filename:
self.top_filename = Args.top_filename
else:
self.top_filename = re.sub(r'\.py$', '', self.py_filename) + '.' + self.v_suffix
2024-09-08 19:00:03 +02:00
self.pli_filename = re.sub(r'\.py$', '', self.py_filename) + '.cpp'
self.top_shell_filename = self.obj_dir + "/" + self.vm_prefix + "__top.v"
def _define_opt_calc(self) -> str:
return "--define " if self.xsim else "+define+"
def init_benchmarksim(self) -> None:
# Simulations with benchmarksim enabled append to the same file between runs.
# Test files must ensure a clean benchmark data file before executing tests.
filename = self.benchmarksim_filename
with open(filename, 'w', encoding="utf8") as fh:
fh.write("# Verilator simulation benchmark data\n")
fh.write("# Test name: " + self.name + "\n")
fh.write("# Top file: " + self.top_filename + "\n")
fh.write("evals, time[s]\n")
def soprint(self, message: str) -> str:
message = message.rstrip() + "\n"
message = self.scenario + "/" + self.name + ": " + message
return message
def oprint(self, message: str) -> None:
message = message.rstrip() + "\n"
print(self.soprint(message), end="")
def error(self, message: str) -> None:
"""Called from tests as: error("Reason message")
Newline is optional. Only first line is passed to summaries
Throws a VtErrorException, so rest of testing is not executed"""
2024-12-01 16:27:05 +01:00
if self._force_pass:
return
2024-09-08 19:00:03 +02:00
message = message.rstrip() + "\n"
print("%Warning: " + self.scenario + "/" + self.name + ": " + message,
file=sys.stderr,
end="")
if not self.errors:
message = re.sub(r'\n.*', '\n', message)
self.errors = message
raise VtErrorException
def error_keep_going(self, message: str) -> None:
"""Called from tests as: error_keep_going("Reason message")
Newline is optional. Only first line is passed to summaries"""
2024-12-01 16:27:05 +01:00
if self._quit or self._force_pass:
return
2024-09-08 19:00:03 +02:00
message = message.rstrip() + "\n"
print("%Warning: " + self.scenario + "/" + self.name + ": " + message,
file=sys.stderr,
end="")
if not self.errors_keep_going:
message = re.sub(r'\n.*', '\n', message)
self.errors_keep_going = message
def skip(self, message: str) -> None:
"""Called from tests as: skip("Reason message"[, ...])
Newline is optional. Only first line is passed to summaries.
Throws a VtSkipException, so rest of testing is not executed"""
message = message.rstrip() + "\n"
print("-Skip: " + self.scenario + "/" + self.name + ": " + message,
file=sys.stderr,
end="")
if not self._skips:
message = re.sub(r'\n.*', '\n', message)
self._skips = message
raise VtSkipException
def scenarios(self, *scenario_list) -> None:
"""Called from tests as: scenarios(<list_of_scenarios>) to
specify which scenarios this test runs under. Where ... is
one cases listed in All_Scenarios.
All scenarios must be on one line; this is parsed outside Python."""
enabled_scenarios = {}
for param in scenario_list:
hit = False
for allsc, allscs in All_Scenarios.items():
for allscarg in allscs:
if param == allscarg:
hit = True
enabled_scenarios[allsc] = True
if not hit:
self.error("scenarios(...) has unknown scenario '" + param + "'")
if not enabled_scenarios.get(self.scenario, None):
self._scenario_off = True
self.skip("scenario '" + self.scenario + "' not enabled for test")
# self._exit() implied by skip's exception
@staticmethod
def _prefilter_scenario(py_filename: str, scenario: str) -> bool:
"""Read a python file to see if scenarios require it to be run.
Much faster than parsing the file for a runtime check."""
(py_filename, _) = Runner._py_filename_adjust(py_filename, ".")
with open(py_filename, 'r', encoding="utf-8") as fh:
for line in fh:
m = re.search(r'^\s*test.scenarios\((.*?)\)', line)
if m:
for param in re.findall(r"""["']([^,]*)["']""", m.group(1)):
for allscarg in All_Scenarios[scenario]:
if param == allscarg:
return True
return False
def _prep(self) -> None:
VtOs.mkdir_ok(self.obj_dir) # Ok if already exists
def _read(self) -> None:
if not os.path.exists(self.py_filename):
self.error("Can't open " + self.py_filename)
return
global test
test = self
sys.path.append(self.t_dir) # To find vltest_bootstrap.py
# print("_read/exec py_filename=" + self.py_filename)
# print("_read/exec dir=", ' '.join(dir()))
# print("_read/exec vars=", ' '.join(vars().keys()))
# print("_read/exec globals=", ' '.join(globals().keys()))
# print("_read/exec locals=", ' '.join(locals().keys()))
try:
runpy.run_path(self.py_filename, globals())
except (VtErrorException, VtSkipException):
pass
def _exit(self):
if self.ok:
self.oprint("Self PASSED")
elif self._skips and not self.errors:
self.oprint("-Skip: " + self._skips)
else:
# If no worse errors, promote errors_keep_going to normal errors
if not self.errors and self.errors_keep_going:
self.errors = self.errors_keep_going
if not self.errors:
self.error("Missing ok")
self.oprint("%Error: " + self.errors)
self._write_status()
sys.exit(0)
def _write_status(self) -> None:
with open(self._status_filename, "wb") as fh:
pass_to_driver = {
'_ok': self._ok,
'_scenario_off': self._scenario_off,
'_skips': self._skips,
'errors': self.errors,
}
pickle.dump(pass_to_driver, fh)
def _read_status(self) -> None:
filename = self._status_filename
if not os.path.isfile(filename):
self.error_keep_going("Child test did not return status (test has Python error?): " +
self.py_filename)
return
with open(filename, "rb") as fh:
dic = pickle.load(fh)
for k in dic.keys():
# print("_read_status " + filename + ".test['" + k + "]=" + pformat(dic[k]))
setattr(self, k, dic[k])
#----------------------------------------------------------------------
# Methods invoked by tests
2025-03-27 23:33:18 +01:00
def clean(self, for_rerun=False) -> None:
"""Called on a --driver-clean or rerun to cleanup files."""
2024-09-08 19:00:03 +02:00
if self.clean_command:
os.system(self.clean_command)
os.system('/bin/rm -rf ' + self.obj_dir + '__fail1')
2025-03-27 23:33:18 +01:00
if for_rerun:
# Prevents false-failures when switching compilers
# Remove old results to force hard rebuild
os.system('/bin/mv ' + self.obj_dir + ' ' + self.obj_dir + '__fail1')
else:
os.system('/bin/rm -rf ' + self.obj_dir)
2024-09-08 19:00:03 +02:00
def clean_objs(self) -> None:
os.system("/bin/rm -rf " + ' '.join(glob.glob(self.obj_dir + "/*")))
def _checkflags(self, param):
checkflags = (
' ' + ' '.join(param['v_flags']) + #
' ' + ' '.join(param['v_flags2']) + #
' ' + ' '.join(param['verilator_flags']) + #
' ' + ' '.join(param['verilator_flags2']) + #
' ' + ' '.join(param['verilator_flags3']) + ' ')
return checkflags
def compile_vlt_cmd(self, **kwargs) -> list:
"""Return default command list to run verilator"""
param = {'stdout_filename': None}
param.update(vars(self))
param.update(kwargs)
vlt_cmd = [
"perl", os.environ["VERILATOR_ROOT"] + "/bin/verilator",
*self._compile_vlt_flags(**param), param['top_filename'], *param['v_other_filenames']
]
if param['stdout_filename']:
vlt_cmd += ["> " + param['stdout_filename']]
return vlt_cmd
def _compile_vlt_flags(self, **param) -> list:
checkflags = self._checkflags(param)
d_verilator_flags = ' ' + ' '.join(self.driver_verilator_flags) + ' '
self.pins_sc_uint_bool = ( # pylint: disable=attribute-defined-outside-init
bool(
re.search(r'-pins-sc-uint-bool\b', checkflags)
or re.search(r'-pins-sc-uint-bool\b', d_verilator_flags)))
self.savable = ( # pylint: disable=attribute-defined-outside-init
bool(re.search(r'-savable\b', checkflags)))
self.coverage = ( # pylint: disable=attribute-defined-outside-init
bool(re.search(r'-coverage\b', checkflags)))
self.sc = ( # pylint: disable=attribute-defined-outside-init
bool(re.search(r'-sc\b', checkflags)))
self.timing = ( # pylint: disable=attribute-defined-outside-init
bool(re.search(r'( -?-timing\b| -?-binary\b)', checkflags)))
self.trace = ( # pylint: disable=attribute-defined-outside-init
bool(Args.trace or re.search(r'-trace\b|-trace-fst\b', checkflags)))
if re.search(r'-trace-fst', checkflags):
if self.sc:
self.trace_format = 'fst-sc' # pylint: disable=attribute-defined-outside-init
else:
self.trace_format = 'fst-c' # pylint: disable=attribute-defined-outside-init
elif re.search(r'-trace-saif', checkflags):
if self.sc:
self.trace_format = 'saif-sc' # pylint: disable=attribute-defined-outside-init
else:
self.trace_format = 'saif-c' # pylint: disable=attribute-defined-outside-init
2024-09-08 19:00:03 +02:00
elif self.sc:
self.trace_format = 'vcd-sc' # pylint: disable=attribute-defined-outside-init
else:
self.trace_format = 'vcd-c' # pylint: disable=attribute-defined-outside-init
if param.get('benchmarksim', None):
self.benchmarksim = True # pylint: disable=attribute-defined-outside-init
verilator_flags = [*param.get('verilator_flags', "")]
if Args.gdb:
verilator_flags += ["--gdb"]
if Args.gdbbt:
verilator_flags += ["--gdbbt"]
if Args.rr:
verilator_flags += ["--rr"]
if Args.trace:
2025-04-05 16:46:39 +02:00
verilator_flags += ["--trace-vcd"]
2024-09-08 19:00:03 +02:00
if Args.gdbsim or Args.rrsim:
verilator_flags += ["-CFLAGS -ggdb -LDFLAGS -ggdb"]
verilator_flags += ["--x-assign unique"] # More likely to be buggy
if param['vltmt']:
verilator_flags += ["--debug-partition"]
if param['threads'] >= 0:
verilator_flags += ["--threads", str(param['threads'])]
if param['vltmt'] and re.search(r'-trace-fst ', checkflags):
verilator_flags += ["--trace-threads 2"]
if param['make_main'] and param['verilator_make_gmake']:
verilator_flags += ["--exe"]
if param['make_main'] and param['verilator_make_gmake']:
verilator_flags += ["../" + self.main_filename]
cmdargs = [
"--prefix",
param['vm_prefix'],
*verilator_flags,
*param['verilator_flags2'],
*param['verilator_flags3'],
*param['v_flags'],
*param['v_flags2'],
# Flags from driver cmdline override default flags and
# flags from the test itself
*self.driver_verilator_flags,
]
return cmdargs
def lint(self, **kwargs) -> None:
"""Run a linter. Arguments similar to run(); default arguments are from self"""
param = {}
param.update(vars(self))
param.update({ # Lint-specific default overrides
'make_main': False,
'make_top_shell': False,
'verilator_flags2': ["--lint-only"],
'verilator_make_gmake': False
})
param.update(kwargs)
self.compile(**param)
def compile(self, **kwargs) -> None:
"""Run simulation compiler. Arguments similar to run(); default arguments are from self"""
param = {
'expect_filename': None,
'fails': False,
'make_flags': [],
'tee': True,
'timing_loop': False,
}
param.update(vars(self))
param.update(kwargs)
if self.verbose:
self.oprint("Compile")
if param['vlt'] and param['threads'] > 1:
self.error("'threads =' argument must be <= 1 for vlt scenario")
# Compute automatic parameter values
checkflags = self._checkflags(param)
if re.search(r'(^|\s)-?-threads\s', checkflags):
self.error("Specify threads via 'threads=' argument, not as a command line option")
if param['threads'] < 0 and param['vltmt']:
param['threads'] = calc_threads(Vltmt_Threads)
if not param['context_threads']:
param['context_threads'] = param['threads'] if (param['threads'] >= 1) else 1
if re.search(r'( -?-main\b| -?-binary\b)', checkflags):
param['make_main'] = False
if re.search(r'( -?-build\b| -?-binary\b)', checkflags):
param['verilator_make_cmake'] = False
param['verilator_make_gmake'] = False
self.threads = param['threads']
self.context_threads = param['context_threads']
self.compile_vlt_cmd(**param)
if not re.search(r'TEST_DUMPFILE', ' '.join(self.v_flags)):
self.v_flags += [self._define_opt_calc() + "TEST_DUMPFILE=" + self.trace_filename]
if not param['make_top_shell']:
self.top_shell_filename = ""
else:
self.top_shell_filename = self.obj_dir + "/" + self.vm_prefix + "__top." + self.v_suffix
param['top_shell_filename'] = self.top_shell_filename
if param['atsim']:
tool_define = param['atsim_define']
self._make_top(param['make_top_shell'])
self.run(logfile=self.obj_dir + "/atsim_compile.log",
fails=param['fails'],
cmd=[
VtOs.getenv_def('VERILATOR_ATSIM', "atsim"),
' '.join(param['atsim_flags']),
' '.join(param['atsim_flags2']),
' '.join(param['v_flags']),
' '.join(param['v_flags2']),
param['top_filename'],
param['top_shell_filename'],
' '.join(param['v_other_filenames']),
])
elif param['ghdl']:
tool_define = param['ghdl_define']
VtOs.mkdir_ok(self.ghdl_work_dir)
self._make_top(param['make_top_shell'])
tool_exe = VtOs.getenv_def('VERILATOR_GHDL', "ghdl")
self.run(
logfile=self.obj_dir + "/ghdl_compile.log",
fails=param['fails'],
cmd=[
tool_exe,
# Add -c here, as having -c twice freaks it out
("" if re.search(r' -c\b', tool_exe) else "-c"),
' '.join(param['ghdl_flags']),
' '.join(param['ghdl_flags2']),
#' '.join(param['v_flags']), # Not supported
#' '.join(param['v_flags2']), # Not supported
param['top_filename'],
param['top_shell_filename'],
' '.join(param['v_other_filenames']),
"-e t",
])
elif param['vcs']:
tool_define = param['vcs_define']
self._make_top(param['make_top_shell'])
self.run(logfile=self.obj_dir + "/vcs_compile.log",
fails=param['fails'],
cmd=[
VtOs.getenv_def('VERILATOR_VCS', "vcs"),
' '.join(param['vcs_flags']),
' '.join(param['vcs_flags2']),
("-CFLAGS -DTEST_VERBOSE=1" if Args.verbose else ""),
' '.join(param['v_flags']),
' '.join(param['v_flags2']),
param['top_filename'],
param['top_shell_filename'],
' '.join(param['v_other_filenames']),
])
elif param['nc']:
tool_define = param['nc_define']
self._make_top(param['make_top_shell'])
self.run(logfile=self.obj_dir + "/nc_compile.log",
fails=param['fails'],
cmd=[
VtOs.getenv_def('VERILATOR_NCVERILOG', "ncverilog"),
' '.join(param['nc_flags']),
' '.join(param['nc_flags2']),
' '.join(param['v_flags']),
' '.join(param['v_flags2']),
param['top_filename'],
param['top_shell_filename'],
' '.join(param['v_other_filenames']),
])
elif param['ms']:
tool_define = param['ms_define']
self._make_top(param['make_top_shell'])
self.run(logfile=self.obj_dir + "/ms_compile.log",
fails=param['fails'],
cmd=[
("vlib " + self.obj_dir + "/work && "),
VtOs.getenv_def('VERILATOR_MODELSIM', "vlog"),
' '.join(param['ms_flags']),
' '.join(param['ms_flags2']),
' '.join(param['v_flags']),
' '.join(param['v_flags2']),
param['top_filename'],
param['top_shell_filename'],
])
elif param['iv']:
tool_define = param['iv_define']
self._make_top(param['make_top_shell'])
cmd = (VtOs.getenv_def('VERILATOR_IVERILOG', "iverilog"), ' '.join(param['iv_flags']),
' '.join(param['iv_flags2']), ' '.join(param['v_flags']),
' '.join(param['v_flags2']), param['top_filename'], param['top_shell_filename'],
' '.join(param['v_other_filenames']))
cmd = list(map(lambda str: re.sub(r'\+define\+', '-D ', str), cmd))
self.run(logfile=self.obj_dir + "/iv_compile.log", fails=param['fails'], cmd=cmd)
elif param['xrun']:
tool_define = param['xrun_define']
self._make_top(param['make_top_shell'])
elif param['xsim']:
tool_define = param['xsim_define']
self._make_top(param['make_top_shell'])
self.run(logfile=self.obj_dir + "/xsim_compile.log",
fails=param['fails'],
cmd=[
VtOs.getenv_def('VERILATOR_XVLOG', "xvlog"),
' '.join(param['xsim_flags']),
' '.join(param['xsim_flags2']),
' '.join(param['v_flags']),
' '.join(param['v_flags2']),
param['top_filename'],
param['top_shell_filename'],
])
elif param['vlt_all']:
tool_define = param['verilator_define']
if self.sc and not self.have_sc:
self.skip("Test requires SystemC; ignore error since not installed\n")
return
if self.timing and not self.have_coroutines:
self.skip("Test requires Coroutines; ignore error since not available\n")
return
if self.timing and self.sc and re.search(r'Ubuntu 24.04', distro.name(
pretty=True)) and re.search(r'clang', self.cxx_version):
self.skip(
"Test requires SystemC and Coroutines; broken on Ubuntu 24.04 w/clang\n" +
" OS=" + distro.name(pretty=True) + " CXX=" + self.cxx_version)
return
2024-09-08 19:00:03 +02:00
if param['verilator_make_cmake'] and not self.have_cmake:
self.skip(
"Test requires CMake; ignore error since not available or version too old\n")
return
if not param['fails'] and param['make_main']:
self._make_main(param['timing_loop'])
if (param['verilator_make_gmake']
or (not param['verilator_make_gmake'] and not param['verilator_make_cmake'])):
vlt_cmd = self.compile_vlt_cmd(**param)
if self.verbose:
self.oprint("Running Verilator (gmake)")
if Args.verilation:
self.run(logfile=self.obj_dir + "/vlt_compile.log",
fails=param['fails'],
tee=param['tee'],
expect_filename=param['expect_filename'],
verilator_run=True,
cmd=vlt_cmd)
if param['verilator_make_cmake']:
vlt_args = self._compile_vlt_flags(**param)
if self.verbose:
self.oprint("Running cmake")
VtOs.mkdir_ok(self.obj_dir)
csources = []
if param['make_main']:
csources.append(self.main_filename)
self.run(
logfile=self.obj_dir + "/vlt_cmake.log",
fails=param['fails'],
tee=param['tee'],
expect_filename=param['expect_filename'],
verilator_run=True,
cmd=[
"cd \"" + self.obj_dir + "\" && cmake",
"\"" + self.t_dir + "/..\"",
"-DTEST_VERILATOR_ROOT=" + os.environ['VERILATOR_ROOT'],
"-DTEST_NAME=" + self.name,
"-DTEST_CSOURCES=\"" + ' '.join(csources) + "\"",
"-DTEST_VERILATOR_ARGS=\"" + ' '.join(vlt_args) + "\"",
"-DTEST_VERILATOR_SOURCES=\"" + param['top_filename'] + ' ' +
' '.join(param['v_other_filenames']) + "\"",
"-DTEST_VERBOSE=\"" + ("1" if self.verbose else "0") + "\"",
"-DTEST_SYSTEMC=\"" + ("1" if self.sc else "0") + "\"",
"-DCMAKE_PREFIX_PATH=\"" +
(VtOs.getenv_def('SYSTEMC_INCLUDE', VtOs.getenv_def('SYSTEMC', '')) +
"/..\""),
"-DTEST_OPT_FAST=\"" + ("-Os" if param['benchmark'] else "-O0") + "\"",
"-DTEST_OPT_GLOBAL=\"" + ("-Os" if param['benchmark'] else "-O0") + "\"",
"-DTEST_VERILATION=\"" + ("1" if Args.verilation else "0") + "\"",
])
if not param['fails'] and param['verilator_make_gmake']:
if self.verbose:
self.oprint("Running make (gmake)")
self.run(
logfile=self.obj_dir + "/vlt_gcc.log",
entering=self.obj_dir,
cmd=[
os.environ['MAKE'],
2025-05-22 23:53:04 +02:00
(("-j " + str(Args.driver_build_jobs)) if Args.driver_build_jobs else ""),
2024-09-08 19:00:03 +02:00
"-C " + self.obj_dir,
"-f " + os.path.abspath(os.path.dirname(__file__)) + "/Makefile_obj",
("" if self.verbose else "--no-print-directory"),
"VM_PREFIX=" + self.vm_prefix,
"TEST_OBJ_DIR=" + self.obj_dir,
"CPPFLAGS_DRIVER=-D" + self.name.upper(),
("CPPFLAGS_DRIVER2=-DTEST_VERBOSE=1" if self.verbose else ""),
("" if param['benchmark'] else "OPT_FAST=-O0"),
("" if param['benchmark'] else "OPT_GLOBAL=-O0"),
self.vm_prefix, # bypass default rule, as we don't need archive
*param['make_flags'],
])
if not param['fails'] and param['verilator_make_cmake']:
if self.verbose:
self.oprint("Running cmake --build")
self.run(logfile=self.obj_dir + "/vlt_cmake_build.log",
cmd=[
"cmake",
"--build",
self.obj_dir,
("--verbose" if self.verbose else ""),
])
else:
self.error("No compile step defined for '%s' scenario" % self.scenario)
2024-09-08 19:00:03 +02:00
if param['make_pli']:
if self.verbose:
self.oprint("Compile vpi")
cmd = [
os.environ['CXX'], *param['pli_flags'], "-D" + tool_define, "-DIS_VPI",
VtOs.getenv_def('CFLAGS', ''), self.pli_filename
]
self.run(logfile=self.obj_dir + "/pli_compile.log", fails=param['fails'], cmd=cmd)
def timeout(self, seconds):
"""Limit the CPU time of the test - this limit is inherited
by all of the spawned child processess"""
# An unprivileged process may set only its soft limit
# to a value in the range from 0 up to the hard limit
_, hardlimit = resource.getrlimit(resource.RLIMIT_CPU)
softlimit = ctypes.c_long(min(seconds, ctypes.c_ulong(hardlimit).value)).value
# Casting is required due to a quirk in Python,
# rlimit values are interpreted as LONG, instead of ULONG
# https://github.com/python/cpython/issues/137044
rlimit = (softlimit, hardlimit)
resource.setrlimit(resource.RLIMIT_CPU, rlimit)
2025-09-11 13:01:36 +02:00
def leak_check_disable(self):
"""Disable memory leak detection when leaks are expected,
e.g.: on early abnormal termination"""
2025-09-16 20:22:36 +02:00
asan_options = os.environ.get("ASAN_OPTIONS", "")
self.setenv("ASAN_OPTIONS", asan_options + ":detect_leaks=0")
2025-09-11 13:01:36 +02:00
2024-09-08 19:00:03 +02:00
def execute(self, **kwargs) -> None:
"""Run simulation executable.
Arguments similar to run(); default arguments are from self"""
# Default arguments are from self
# params may be expect or {tool}_expect
param = {
'aslr_off': False,
'entering': False,
'check_finished': False,
'executable': None,
'expect_filename': None,
'fails': False,
'run_env': '',
2024-09-15 00:01:36 +02:00
'tee': True,
2024-10-06 10:49:53 +02:00
'use_libvpi': False
2024-09-08 19:00:03 +02:00
}
param.update(vars(self))
param.update(kwargs)
if self.verbose:
self.oprint("Run")
if not self.verbose:
os.environ['SYSTEMC_DISABLE_COPYRIGHT_MESSAGE'] = 'DISABLE'
else:
VtOs.delenv('SYSTEMC_DISABLE_COPYRIGHT_MESSAGE')
if not self._have_solver_called:
os.environ['VERILATOR_SOLVER'] = "test.py-file-needs-have_solver()-call"
if param['check_finished'] is None and not param['fails']:
param['check_finished'] = 1
run_env = param['run_env']
if run_env:
run_env = run_env + ' '
if param['atsim']:
2024-10-06 10:49:53 +02:00
cmd = [
"echo q | " + run_env + self.obj_dir + "/athdl_sv",
' '.join(param['atsim_run_flags']), ' '.join(param['all_run_flags'])
]
self.run(cmd=cmd,
check_finished=param['check_finished'],
entering=param['entering'],
expect_filename=param.get('atsim_run_expect_filename', None),
fails=param['fails'],
logfile=param.get('logfile', self.obj_dir + "/atsim_sim.log"),
tee=param['tee'])
2024-09-08 19:00:03 +02:00
elif param['ghdl']:
2024-10-06 10:49:53 +02:00
cmd = [
run_env + self.obj_dir + "/simghdl", ' '.join(param['ghdl_run_flags']),
' '.join(param['all_run_flags'])
]
self.run(cmd=cmd,
check_finished=param['check_finished'],
entering=param['entering'],
expect_filename=param.get('ghdl_run_expect_filename', None),
fails=param['fails'],
logfile=param.get('logfile', self.obj_dir + "/ghdl_sim.log"),
tee=param['tee'])
2024-09-08 19:00:03 +02:00
elif param['iv']:
cmd = [
2024-10-06 10:49:53 +02:00
run_env + self.obj_dir + "/simiv", ' '.join(param['iv_run_flags']),
' '.join(param['all_run_flags'])
2024-09-08 19:00:03 +02:00
]
if param['use_libvpi']:
# Don't enter command line on $stop, include vpi
cmd += ["vvp -n -m " + self.obj_dir + "/libvpi.so"]
2024-10-06 10:49:53 +02:00
self.run(cmd=cmd,
check_finished=param['check_finished'],
entering=param['entering'],
expect_filename=param.get('iv_run_expect_filename', None),
fails=param['fails'],
logfile=param.get('logfile', self.obj_dir + "/vlt_sim.log"),
tee=param['tee'])
2024-09-08 19:00:03 +02:00
elif param['ms']:
pli_opt = ""
if param['use_libvpi']:
pli_opt = "-pli " + self.obj_dir + "/libvpi.so"
2024-10-06 10:49:53 +02:00
cmd = [
"echo q | " + run_env + VtOs.getenv_def('VERILATOR_MODELSIM', "vsim"),
2025-01-08 15:44:48 +01:00
' '.join(param['ms_run_flags']), ' '.join(param['all_run_flags']), pli_opt, (" t")
2024-10-06 10:49:53 +02:00
]
self.run(cmd=cmd,
check_finished=param['check_finished'],
entering=param['entering'],
expect_filename=param.get('ms_run_expect_filename', None),
fails=param['fails'],
logfile=param.get('logfile', self.obj_dir + "/ms_sim.log"),
tee=param['tee'])
2024-09-08 19:00:03 +02:00
elif param['nc']:
2024-10-06 10:49:53 +02:00
cmd = [
"echo q | " + run_env + VtOs.getenv_def('VERILATOR_NCVERILOG', "ncverilog"),
' '.join(param['nc_run_flags']), ' '.join(param['all_run_flags'])
]
self.run(cmd=cmd,
check_finished=param['check_finished'],
entering=param['entering'],
expect_filename=param.get('nc_run_expect_filename', None),
fails=param['fails'],
logfile=param.get('logfile', self.obj_dir + "/nc_sim.log"),
tee=param['tee'])
2024-09-08 19:00:03 +02:00
elif param['vcs']:
# my $fh = IO::File->new(">simv.key") or die "%Error: $! simv.key,"
# fh.print("quit\n"); fh.close()
2024-10-06 10:49:53 +02:00
cmd = [
"echo q | " + run_env + "./simv", ' '.join(param['vcs_run_flags']),
' '.join(param['all_run_flags'])
]
self.run(cmd=cmd,
check_finished=param['check_finished'],
entering=param['entering'],
expect_filename=param.get('vcs_run_expect_filename', None),
fails=param['fails'],
logfile=param.get('logfile', self.obj_dir + "/vcs_sim.log"),
tee=param['tee'])
2024-09-08 19:00:03 +02:00
elif param['xrun']:
pli_opt = ""
if param['use_libvpi']:
pli_opt = "-loadvpi " + self.obj_dir + "/libvpi.so:vpi_compat_bootstrap"
2024-10-06 10:49:53 +02:00
cmd = [
"echo q | " + run_env + VtOs.getenv_def('VERILATOR_XRUN', "xrun"),
' '.join(param['xrun_run_flags']),
' '.join(param['xrun_flags2']),
' '.join(param['all_run_flags']),
pli_opt,
param['top_filename'],
]
self.run(cmd=cmd,
check_finished=param['check_finished'],
entering=param['entering'],
expect_filename=param.get('xrun_run_expect_filename', None),
fails=param['fails'],
logfile=param.get('logfile', self.obj_dir + "/xrun_sim.log"),
tee=param['tee'])
2024-09-08 19:00:03 +02:00
elif param['xsim']:
2024-10-06 10:49:53 +02:00
cmd = [
run_env + VtOs.getenv_def('VERILATOR_XELAB', "xelab"),
' '.join(param['xsim_run_flags']), ' '.join(param['xsim_run_flags2']),
' '.join(param['all_run_flags']), (" " + self.name + ".top")
]
self.run(cmd=cmd,
check_finished=param['check_finished'],
entering=param['entering'],
expect_filename=param.get('xsim_run_expect_filename', None),
fails=param['fails'],
logfile=param.get('logfile', self.obj_dir + "/xsim_sim.log"),
tee=param['tee'])
2024-09-08 19:00:03 +02:00
elif param['vlt_all']:
if not param['executable']:
param['executable'] = self.obj_dir + "/" + param['vm_prefix']
debugger = ""
if Args.gdbsim:
debugger = VtOs.getenv_def('VERILATOR_GDB', "gdb") + " "
elif Args.rrsim:
debugger = "rr record "
2024-10-06 10:49:53 +02:00
cmd = [
(run_env + debugger + param['executable'] + (" -ex 'run " if Args.gdbsim else "")),
*param['all_run_flags'],
("'" if Args.gdbsim else ""),
]
cmd += self.driver_verilated_flags
2024-09-08 19:00:03 +02:00
self.run(
2024-10-06 10:49:53 +02:00
cmd=cmd,
2024-09-08 19:00:03 +02:00
aslr_off=param['aslr_off'], # Disable address space layour randomization
check_finished=param['check_finished'], # Check for All Finished
entering=param['entering'], # Print entering directory information
expect_filename=param['expect_filename'],
fails=param['fails'],
logfile=param.get('logfile', self.obj_dir + "/vlt_sim.log"),
tee=param['tee'],
verilator_run=True,
)
else:
self.error("No execute step for this simulator")
#---------------------------------------------------------------
# Accessors
@property
def aslr_off(self) -> str:
if VlTest._cached_aslr_off is None:
2024-09-21 17:37:51 +02:00
out = VtOs.run_capture('setarch --addr-no-randomize echo OK 2>/dev/null', check=False)
2024-09-08 19:00:03 +02:00
if re.search(r'OK', out):
VlTest._cached_aslr_off = "setarch --addr-no-randomize "
else:
VlTest._cached_aslr_off = ""
return VlTest._cached_aslr_off
@property
def benchmarksim_filename(self) -> str:
return self.obj_dir + "/" + self.name + "_benchmarksim.csv"
@property
def driver_verilator_flags(self) -> list:
return Args.passdown_verilator_flags
2024-09-08 19:00:03 +02:00
@property
def driver_verilated_flags(self) -> list:
return Args.passdown_verilated_flags
2024-09-08 19:00:03 +02:00
@property
def get_default_vltmt_threads(self) -> int:
return Vltmt_Threads
@property
def ok(self) -> bool:
if self.errors or self.errors_keep_going or self._skips:
self._ok = False
return self._ok
def passes(self, is_ok=True):
if not self.errors:
self._ok = is_ok
@property
def too_few_cores(self) -> bool:
return calc_threads(Vltmt_Threads) < Vltmt_Threads
@property
def trace_filename(self) -> str:
if re.match(r'^fst', self.trace_format):
return self.obj_dir + "/simx.fst"
if re.match(r'^saif', self.trace_format):
return self.obj_dir + "/simx.saif"
2024-09-08 19:00:03 +02:00
return self.obj_dir + "/simx.vcd"
def skip_if_too_few_cores(self) -> None:
if self.too_few_cores:
self.skip("Skipping due to too few cores")
@property
def v_suffix(self) -> str:
return "v"
@property
def wno_unopthreads_for_few_cores(self) -> str:
if self.too_few_cores:
print("Too few cores, using -Wno-UNOPTTHREADS")
return "-Wno-UNOPTTHREADS"
return ""
#---------------------------------------------------------------
# Capabilities
@property
def cmake_version(self) -> str:
return Capabilities.cmake_version
@property
def cxx_version(self) -> str:
return Capabilities.cxx_version
@property
def have_cmake(self) -> bool:
ver = Capabilities.cmake_version
if not ver:
return False
m = re.match(r'^(\d+)\.(\d+)$', ver)
if not m:
return False
return int(m.group(1)) > 3 or int(m.group(2)) >= 8 # >= 3.8
2024-09-08 19:00:03 +02:00
@property
def have_coroutines(self) -> bool:
return Capabilities.have_coroutines
@property
def have_dev_asan(self) -> bool:
return Capabilities.have_dev_asan
@property
def have_dev_gcov(self) -> bool:
return Capabilities.have_dev_gcov
2024-09-08 19:00:03 +02:00
@property
def have_gdb(self) -> bool:
return Capabilities.have_gdb
@property
def have_sc(self) -> bool:
return Capabilities.have_sc
@property
def have_solver(self) -> bool:
self._have_solver_called = True
return Capabilities.have_solver
@property
def make_version(self) -> str:
return Capabilities.make_version
#---------------------------------------------------------------
# OS functions
def getenv_def(self, var: str, default=None) -> str:
"""Return environment variable, returning default if does not exist"""
return VtOs.getenv_def(var, default)
def mkdir_ok(self, filename) -> None:
"""Make directory, no error if exists"""
if test.verbose:
print("\tmkdir " + filename)
VtOs.mkdir_ok(filename)
def run_capture(self, cmd: str, check=True) -> str:
"""Run a command and return results"""
if test.verbose:
print("\t" + cmd)
return VtOs.run_capture(cmd, check=check)
def setenv(self, var: str, val: str) -> None:
2025-01-01 14:30:25 +01:00
"""Set environment variable"""
2024-09-08 19:00:03 +02:00
print("\texport %s='%s'" % (var, val))
os.environ[var] = val
def unlink_ok(self, filename) -> None:
"""Unlink a file, no error if fails"""
if test.verbose:
print("\trm " + filename)
VtOs.unlink_ok(filename)
#----------------------------------------------------------------------
def run(
self, #
cmd: list,
aslr_off=False, # Disable address space layour randomization
check_finished=False, # Check for All Finished
entering=None, # Print entering directory information
expect_filename=None, # Filename that should match logfile
fails=False, # True: normal 1 exit code, 'any': any exit code
2024-09-08 19:00:03 +02:00
logfile=None, # Filename to write putput to
tee=True,
verilator_run=False) -> str: # Move gcov data to parallel area
try:
command = ' '.join(cmd)
except TypeError:
print('run(cmd=' + pformat(cmd))
command = ' '.join(cmd)
if aslr_off and aslr_off != "":
prefix = self.aslr_off
if prefix:
command = prefix + " " + command
if Args.benchmark and re.match(r'^cd ', command):
command = "time " + command
print("\t" + command + ((" > " + logfile) if logfile else ""))
if entering:
print("driver: Entering directory '" + os.path.abspath(entering) + "'")
# Execute command redirecting output, keeping order between stderr and stdout.
# Must do low-level IO so GCC interaction works (can't be line-based)
status = None
# process_caller_block # pylint: disable=using-constant-test
logfh = None
if logfile:
logfh = open(logfile, 'wb') # pylint: disable=consider-using-with
if not Args.interactive_debugger:
# Some parallel job's run() may attempt to capture driver.py's
# terminal, e.g. gdb does this. So, unless known we want to run GDB
# (where we want it to control the terminal), become a controlling
# terminal for this job so such a capture won't break driver.py's
# signaling, which would e.g. break control-C.
pid, fd = pty.fork()
if pid == 0:
os.environ['TERM'] = "dumb"
subprocess.run(["stty", "nl"], check=True) # No carriage returns
os.execlp("bash", "/bin/bash", "-c", command)
else:
while True:
try:
data = os.read(fd, 2048)
self._run_output(data, logfh, tee)
# Parent detects child termination by checking for b''
if not data:
break
except OSError:
break
2024-09-08 19:00:03 +02:00
(pid, rc) = os.waitpid(pid, 0)
2024-09-08 19:00:03 +02:00
else:
# Do not redirect output when using an interactive debugger, so it
# can have direct access to the user terminal (so terminal control
# characters and the like work). That means the log file will be
# empty but hopefully that's ok, just re-run the test without the
# interactive debugger to confirm a fix.
with subprocess.Popen(command, shell=True, bufsize=0) as proc:
proc.wait()
2024-09-08 19:00:03 +02:00
rc = proc.returncode # Negative if killed by signal
if logfh:
logfh.close()
if (rc in (
-4, # SIGILL
-8, # SIGFPA
-11)): # SIGSEGV
self.error("Exec failed with core dump")
status = 128 + (-rc) # So is "normal" shell 0-255 status
elif rc >= 256:
# waitpid returns status << 8; subprocess otherwise; handle both
status = int(rc / 256) # So is shell $?-like
elif rc:
status = rc
else:
status = 0
2024-09-08 19:00:03 +02:00
sys.stdout.flush()
sys.stderr.flush()
if entering:
print("driver: Leaving directory '" + os.path.abspath(entering) + "'")
if not fails and status:
2024-12-01 16:27:05 +01:00
firstline = self._error_log_summary(logfile)
# Strip ANSI escape sequences
firstline = re.sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', firstline)
2024-12-01 16:27:05 +01:00
self.error("Exec of " + self._error_cmd_simplify(cmd) + " failed: " + firstline)
2024-09-08 19:00:03 +02:00
if fails and status:
if not verilator_run:
print("(Exec failed, matching expected fail)")
elif fails == 'any':
print("(Exec failed, matching expected 'any' exit code fail)")
elif fails is True:
if status == 1:
print("(Exec failed, matching expected 'True' exit code 1 fail)")
else:
self.error("Exec of " + self._error_cmd_simplify(cmd) +
" failed with exit code " + str(status) +
", but expected 'True' exit code 1 fail")
else: # Future: support numeric exit code?
self.error("fails=" + str(fails) + " is not legal value")
2024-09-08 19:00:03 +02:00
if fails and not status:
2024-12-01 16:27:05 +01:00
self.error("Exec of " + self._error_cmd_simplify(cmd) + " ok, but expected to fail")
2024-09-08 19:00:03 +02:00
if self.errors or self._skips:
return False
# Read the log file a couple of times to allow for NFS delays
if check_finished:
2024-10-08 03:21:27 +02:00
delay = 0.25
for tryn in range(Args.log_retries - 1, -1, -1):
if tryn != Args.log_retries - 1:
2024-10-08 03:21:27 +02:00
time.sleep(delay)
delay = min(1, delay * 2)
2024-09-08 19:00:03 +02:00
moretry = tryn != 0
if not self._run_log_try(logfile, check_finished, moretry):
2024-09-08 19:00:03 +02:00
break
if expect_filename:
self.files_identical(logfile, expect_filename, is_logfile=True)
return False
return True
def _run_output(self, data, logfh, tee):
if re.search(r'--debug-exit-uvm23: Exiting', str(data)):
self._force_pass = True
print("EXIT: " + str(data))
if tee:
sys.stdout.write(data.decode('latin-1'))
if Args.interactive_debugger:
sys.stdout.flush()
if logfh:
logfh.write(data)
def _run_log_try(self, logfile: str, check_finished: bool, moretry: bool) -> bool:
# If moretry, then return true to try again
2024-09-08 19:00:03 +02:00
with open(logfile, 'r', encoding='latin-1', newline='\n') as fh:
if not fh and moretry:
return True
wholefile = str(fh.read())
# Finished?
if check_finished and not re.search(r'\*\-\* All Finished \*\-\*', wholefile):
if moretry:
return True
self.error("Missing '*-* All Finished *-*'")
return False
#######################################################################
# Little utilities
@staticmethod
2024-12-01 16:27:05 +01:00
def _error_cmd_simplify(cmd: list) -> str:
if cmd[0] == "perl" and re.search(r'/bin/verilator', cmd[1]):
return "verilator"
return cmd[0]
def _error_log_summary(self, filename: str) -> str:
size = ""
if False: # Show test size for fault grading # pylint: disable=using-constant-test
if self.top_filename and os.path.exists(self.top_filename):
size = "(Test " + str(os.stat(self.top_filename).st_size) + " B) "
if not filename:
return size
firstline = ""
with open(filename, 'r', encoding="utf8") as fh:
lineno = 0
for line in fh:
lineno += 1
if lineno > 100:
break
line = line.rstrip()
if re.match(r'^- ', line): # Debug message
continue
if not firstline:
firstline = line
if (re.search(r'error|warn', line, re.IGNORECASE)
and not re.search(r'-Werror', line)):
return size + line
return size + firstline
2024-09-08 19:00:03 +02:00
def _make_main(self, timing_loop: bool) -> None:
if timing_loop and self.sc:
self.error("Cannot use timing loop and SystemC together!")
self._read_inputs_v()
filename = self.main_filename
with open(filename, "w", encoding="utf8") as fh:
fh.write("// Test defines\n")
fh.write("#define MAIN_TIME_MULTIPLIER " +
str(int(round(self.main_time_multiplier, 0))) + "\n")
fh.write("#include <memory>\n")
if self.benchmarksim:
fh.write("#include <fstream>\n")
fh.write("#include <chrono>\n")
fh.write("#include <iomanip>\n")
fh.write("// OS header\n")
fh.write('#include "verilatedos.h"' + "\n")
fh.write("// Generated header\n")
fh.write('#include "' + self.vm_prefix + '.h"' + "\n")
fh.write("// General headers\n")
fh.write('#include "verilated.h"' + "\n")
if self.sc:
fh.write('#include "systemc.h"' + "\n")
if self.trace and self.trace_format == 'fst-c':
fh.write("#include \"verilated_fst_c.h\"\n")
if self.trace and self.trace_format == 'fst-sc':
fh.write("#include \"verilated_fst_sc.h\"\n")
if self.trace and self.trace_format == 'vcd-c':
fh.write("#include \"verilated_vcd_c.h\"\n")
if self.trace and self.trace_format == 'vcd-sc':
fh.write("#include \"verilated_vcd_sc.h\"\n")
if self.trace and self.trace_format == 'saif-c':
fh.write("#include \"verilated_saif_c.h\"\n")
if self.trace and self.trace_format == 'saif-sc':
fh.write("#include \"verilated_saif_sc.h\"\n")
2024-09-08 19:00:03 +02:00
if self.savable:
fh.write("#include \"verilated_save.h\"\n")
fh.write("std::unique_ptr<" + self.vm_prefix + "> topp;\n")
if self.savable:
fh.write("\n")
fh.write("void save_model(const char* filenamep) {\n")
fh.write(" VL_PRINTF(\"Saving model to '%s'\\n\", filenamep);\n")
fh.write(" VerilatedSave os;\n")
fh.write(" os.open(filenamep);\n")
fh.write(" os << *topp;\n")
fh.write(" os.close();\n")
fh.write("}\n")
fh.write("\n")
fh.write("void restore_model(const char* filenamep) {\n")
fh.write(" VL_PRINTF(\"Restoring model from '%s'\\n\", filenamep);\n")
fh.write(" VerilatedRestore os;\n")
fh.write(" os.open(filenamep);\n")
fh.write(" os >> *topp;\n")
fh.write(" os.close();\n")
fh.write("}\n")
#### Main
if self.sc:
fh.write("extern int sc_main(int argc, char** argv);\n")
fh.write("int sc_main(int argc, char** argv) {\n")
if 'fastclk' in self._inputs:
if self.pins_sc_uint_bool:
fh.write(" sc_signal<sc_dt::sc_uint<1>> fastclk;\n")
else:
fh.write(" sc_signal<bool> fastclk;\n")
if 'clk' in self._inputs:
if self.pins_sc_uint_bool:
fh.write(" sc_signal<sc_dt::sc_uint<1>> clk;\n")
else:
fh.write(" sc_signal<bool> clk;\n")
fh.write(" sc_set_time_resolution(1, " + self.sc_time_resolution + ");\n")
fh.write(" sc_time sim_time(" + str(self.sim_time) + ", " +
self.sc_time_resolution + ");\n")
else:
fh.write("int main(int argc, char** argv) {\n")
fh.write(" uint64_t sim_time = " + str(self.sim_time) + ";\n")
fh.write(
" const std::unique_ptr<VerilatedContext> contextp{new VerilatedContext};\n")
fh.write(" contextp->threads(" + str(self.context_threads) + ");\n")
fh.write(" contextp->commandArgs(argc, argv);\n")
fh.write(" contextp->debug(" + ('1' if self.verilated_debug else '0') + ");\n")
fh.write(" srand48(5);\n") # Ensure determinism
if self.verilated_randReset is not None and self.verilated_randReset != "":
fh.write(" contextp->randReset(" + str(self.verilated_randReset) + ");\n")
fh.write(" topp.reset(new " + self.vm_prefix + "{\"top\"});\n")
if self.verilated_debug:
fh.write(" contextp->internalsDump()\n;")
if self.sc:
if 'fastclk' in self._inputs:
fh.write(" topp->fastclk(fastclk);\n")
if 'clk' in self._inputs:
fh.write(" topp->clk(clk);\n")
setp = ""
else:
fh.write(" topp->eval();\n")
setp = "topp->"
if self.benchmarksim:
fh.write(" std::chrono::time_point<std::chrono::steady_clock> starttime;\n")
fh.write(" bool warm = false;\n")
fh.write(" uint64_t n_evals = 0;\n")
if self.trace:
fh.write("\n")
fh.write("#if VM_TRACE\n")
fh.write(" contextp->traceEverOn(true);\n")
if self.trace_format == 'fst-c':
fh.write(" std::unique_ptr<VerilatedFstC> tfp{new VerilatedFstC};\n")
if self.trace_format == 'fst-sc':
fh.write(" std::unique_ptr<VerilatedFstSc> tfp{new VerilatedFstSc};\n")
if self.trace_format == 'vcd-c':
fh.write(" std::unique_ptr<VerilatedVcdC> tfp{new VerilatedVcdC};\n")
if self.trace_format == 'vcd-sc':
fh.write(" std::unique_ptr<VerilatedVcdSc> tfp{new VerilatedVcdSc};\n")
if self.trace_format == 'saif-c':
fh.write(" std::unique_ptr<VerilatedSaifC> tfp{new VerilatedSaifC};\n")
if self.trace_format == 'saif-sc':
fh.write(" std::unique_ptr<VerilatedSaifSc> tfp{new VerilatedSaifSc};\n")
2024-09-08 19:00:03 +02:00
if self.sc:
fh.write(" sc_core::sc_start(sc_core::SC_ZERO_TIME);" +
" // Finish elaboration before trace and open\n")
fh.write(" topp->trace(tfp.get(), 99);\n")
fh.write(" tfp->open(\"" + self.trace_filename + "\");\n")
if self.trace and not self.sc:
fh.write(" if (tfp) tfp->dump(contextp->time());\n")
fh.write("#endif\n")
if self.savable:
fh.write(" const char* save_time_strp"
" = contextp->commandArgsPlusMatch(\"save_time=\");\n")
fh.write(" unsigned int save_time = !save_time_strp[0]"
" ? 0 : std::atoi(save_time_strp + std::strlen(\"+save_time=\"));\n")
fh.write(" const char* save_restore_strp"
" = contextp->commandArgsPlusMatch(\"save_restore=\");\n")
fh.write(" unsigned int save_restore = !save_restore_strp[0] ? 0 : 1;\n")
if self.savable:
fh.write(" if (save_restore) {\n")
fh.write(" restore_model(\"" + self.obj_dir + "/saved.vltsv\");\n")
fh.write(" } else {\n")
else:
fh.write(" {\n")
if 'fastclk' in self._inputs:
fh.write(" " + setp + "fastclk = false;\n")
if 'clk' in self._inputs:
fh.write(" " + setp + "clk = false;\n")
if not timing_loop:
self._print_advance_time(fh, 10, None)
fh.write(" }\n")
timestamp = "sc_time_stamp()" if self.sc else "contextp->time()"
fh.write(" while (")
if not timing_loop or 'clk' in self._inputs:
fh.write("(" + timestamp + " < sim_time * MAIN_TIME_MULTIPLIER) && ")
fh.write("!contextp->gotFinish()) {\n")
if timing_loop:
fh.write(" topp->eval();\n")
if self.trace:
fh.write("#if VM_TRACE\n")
fh.write(" if (tfp) tfp->dump(contextp->time());\n")
fh.write("#endif // VM_TRACE\n")
if 'clk' in self._inputs:
fh.write(" const uint64_t cycles"
" = contextp->time() / MAIN_TIME_MULTIPLIER;\n")
fh.write(" uint64_t new_time = (cycles + 1) * MAIN_TIME_MULTIPLIER;\n")
fh.write(" if (topp->eventsPending() &&\n")
fh.write(" topp->nextTimeSlot()"
" / MAIN_TIME_MULTIPLIER <= cycles) {\n")
fh.write(" new_time = topp->nextTimeSlot();\n")
fh.write(" } else {\n")
if self.pins_sc_uint_bool:
fh.write(" " + setp + "clk.write(!" + setp + "clk.read());\n")
else:
fh.write(" " + setp + "clk = !" + setp + "clk;\n")
fh.write(" }\n")
fh.write(" contextp->time(new_time);\n")
else:
fh.write(" if (!topp->eventsPending()) break;\n")
fh.write(" contextp->time(topp->nextTimeSlot());\n")
else:
for i in range(5):
action = False
if 'fastclk' in self._inputs:
if self.pins_sc_uint_bool:
fh.write(" " + setp + "fastclk.write(!" + setp +
"fastclk.read());\n")
else:
fh.write(" " + setp + "fastclk = !" + setp + "fastclk;\n")
action = True
if i == 0 and 'clk' in self._inputs:
if self.pins_sc_uint_bool:
fh.write(" " + setp + "clk.write(!" + setp + "clk.read());\n")
else:
fh.write(" " + setp + "clk = !" + setp + "clk;\n")
action = True
if self.savable:
fh.write(" if (save_time && " + timestamp + " == save_time) {\n")
fh.write(" save_model(\"" + self.obj_dir + "/saved.vltsv\");\n")
fh.write(" printf(\"Exiting after save_model\\n\");\n")
fh.write(" topp.reset(nullptr);\n")
fh.write(" return 0;\n")
fh.write(" }\n")
self._print_advance_time(fh, 1, action)
if self.benchmarksim:
fh.write(" if (VL_UNLIKELY(!warm)) {\n")
fh.write(" starttime = std::chrono::steady_clock::now();\n")
fh.write(" warm = true;\n")
fh.write(" } else {\n")
fh.write(" ++n_evals;\n")
fh.write(" }\n")
fh.write(" }\n")
if self.benchmarksim:
fh.write(" {\n")
fh.write(" const std::chrono::duration<double> exec_s"
" = std::chrono::steady_clock::now() - starttime;\n")
fh.write(" std::ofstream benchfile(\"" + self.benchmarksim_filename +
"\", std::ofstream::out | std::ofstream::app);\n")
fh.write(" benchfile << std::fixed << std::setprecision(9)"
" << n_evals << \",\" << exec_s.count() << std::endl;\n")
fh.write(" benchfile.close();\n")
fh.write(" }\n")
fh.write(" if (!contextp->gotFinish()) {\n")
fh.write(' vl_fatal(__FILE__, __LINE__, "main",' +
' "%Error: Timeout; never got a $finish");' + "\n")
fh.write(" }\n")
fh.write(" topp->final();\n")
fh.write("\n")
if self.coverage:
fh.write("#if VM_COVERAGE\n")
fh.write(" contextp->coveragep()->write(\"" + self.coverage_filename + "\");\n")
fh.write("#endif // VM_COVERAGE\n")
if self.trace:
fh.write("#if VM_TRACE\n")
fh.write(" if (tfp) tfp->close();\n")
fh.write(" tfp.reset();\n")
fh.write("#endif // VM_TRACE\n")
fh.write(" topp.reset();\n")
fh.write(" return 0;\n")
fh.write("}\n")
def _print_advance_time(self, fh, timeinc: str, action: bool) -> None:
setp = "" if self.sc else "topp->"
if self.sc:
fh.write(" sc_start(" + str(timeinc) + " * MAIN_TIME_MULTIPLIER, " +
self.sc_time_resolution + ");\n")
else:
if action:
fh.write(" " + setp + "eval();\n")
if self.trace and not self.sc:
fh.write("#if VM_TRACE\n")
fh.write(" if (tfp) tfp->dump(contextp->time());\n")
fh.write("#endif // VM_TRACE\n")
fh.write(" contextp->timeInc(" + str(timeinc) + " * MAIN_TIME_MULTIPLIER);\n")
#######################################################################
def _make_top(self, needed=True) -> None:
if not needed:
return
self._make_top_v()
def _make_top_v(self) -> None:
self._read_inputs_v()
2024-10-06 10:49:53 +02:00
with open(self.top_shell_filename, 'w', encoding="utf8") as fh:
2024-09-08 19:00:03 +02:00
fh.write("module top;\n")
for inp in sorted(self._inputs.keys()):
fh.write(" reg " + inp + ";\n")
# Inst
fh.write(" t t (\n")
comma = ""
for inp in sorted(self._inputs.keys()):
fh.write(" " + comma + "." + inp + " (" + inp + ")\n")
comma = ","
fh.write(" );\n")
# Waves
fh.write("\n")
fh.write("`ifdef WAVES\n")
fh.write(" initial begin\n")
fh.write(" $display(\"-Tracing Waves to Dumpfile: " + self.trace_filename +
"\");\n")
fh.write(" $dumpfile(\"" + self.trace_filename + "\");\n")
fh.write(" $dumpvars(0, top);\n")
fh.write(" end\n")
fh.write("`endif\n")
# Test
fh.write("\n")
fh.write(" initial begin\n")
if 'fastclk' in self._inputs:
fh.write(" fastclk = 0;\n")
if 'clk' in self._inputs:
fh.write(" clk = 0;\n")
fh.write(" #10;\n")
if 'fastclk' in self._inputs:
fh.write(" fastclk = 1;\n")
if 'clk' in self._inputs:
fh.write(" clk = 1;\n")
2024-10-06 10:49:53 +02:00
fh.write(" while ($time < " + str(self.sim_time) + ") begin\n")
2024-09-08 19:00:03 +02:00
for i in range(6):
fh.write(" #1;\n")
if 'fastclk' in self._inputs:
fh.write(" fastclk = !fastclk;\n")
if i == 4 and 'clk' in self._inputs:
fh.write(" clk = !clk;\n")
fh.write(" end\n")
fh.write(" end\n")
fh.write("endmodule\n")
#######################################################################
def _read_inputs_v(self) -> None:
filename = self.top_filename
if not os.path.exists(filename):
filename = self.t_dir + '/' + filename
with open(filename, 'r', encoding="utf8") as fh:
get_sigs = True
inputs = {}
for line in fh:
if get_sigs:
2025-09-13 14:53:23 +02:00
# Does not support escaped signals, we only need "clk" and a few others
m = re.match(r'^\s*input\s*(logic|bit|reg|wire)?\s*([A-Za-z0-9_]+)', line)
2024-09-08 19:00:03 +02:00
if m:
2025-09-13 14:53:23 +02:00
inputs[m.group(2)] = m.group(2)
2024-09-08 19:00:03 +02:00
if re.match(r'^\s*(function|task|endmodule)', line):
get_sigs = False
# Ignore any earlier inputs; Module 't' has precedence
if re.match(r'^\s*module\s+t\b', line):
inputs = {}
get_sigs = True
for sig, val in inputs.items():
self._inputs[sig] = val
#######################################################################
# File utilities
def files_identical(self, fn1: str, fn2: str, is_logfile=False, strip_hex=False) -> None:
2024-09-08 19:00:03 +02:00
"""Test if two files have identical contents"""
2024-10-08 03:21:27 +02:00
delay = 0.25
for tryn in range(Args.log_retries, -1, -1):
if tryn != Args.log_retries - 1:
2024-10-08 03:21:27 +02:00
time.sleep(delay)
delay = min(1, delay * 2)
2024-09-08 19:00:03 +02:00
moretry = tryn != 0
if not self._files_identical_try(
fn1=fn1, fn2=fn2, is_logfile=is_logfile, strip_hex=strip_hex, moretry=moretry):
2024-09-08 19:00:03 +02:00
break
def _files_identical_try(self, fn1: str, fn2: str, is_logfile: bool, strip_hex: bool,
moretry: bool) -> bool:
# If moretry, then return true to try again
2024-09-08 19:00:03 +02:00
try:
f1 = open( # pylint: disable=consider-using-with
fn1, 'r', encoding='latin-1', newline='\n')
except FileNotFoundError:
f1 = None
if not moretry:
self.error("Files_identical file does not exist: " + fn1)
return True # Retry
2024-09-08 19:00:03 +02:00
try:
f2 = open( # pylint: disable=consider-using-with
fn2, 'r', encoding='latin-1', newline='\n')
except FileNotFoundError:
f2 = None
if not moretry:
self.copy_if_golden(fn1, fn2)
self.error("Files_identical file does not exist: " + fn2)
return True # Retry
again = self._files_identical_reader(f1,
f2,
fn1=fn1,
fn2=fn2,
is_logfile=is_logfile,
strip_hex=strip_hex,
moretry=moretry)
2024-09-08 19:00:03 +02:00
if f1:
f1.close()
if f2:
f2.close()
return again
2024-09-08 19:00:03 +02:00
def _files_identical_reader(self, f1, f2, fn1: str, fn2: str, is_logfile: bool,
strip_hex: bool, moretry: bool) -> None:
# If moretry, then return true to try again
2024-09-08 19:00:03 +02:00
l1s = f1.readlines()
l2s = f2.readlines() if f2 else []
# print(" rawGOT="+pformat(l1s)+"\n rawEXP="+pformat(l2s))
if is_logfile:
l1o = []
for line in l1s:
if (re.match(r'^- [^\n]+\n', line) #
or re.match(r'^- [a-z.0-9]+:\d+:[^\n]+\n', line)
or re.match(r'^-node:', line) #
or re.match(r'^dot [^\n]+\n', line) #
or re.match(r'^Aborted', line) #
or re.match(r'^In file: .*\/sc_.*:\d+', line) #
or re.match(r'^libgcov.*', line) #
2024-09-08 19:00:03 +02:00
or re.match(r'--- \/tmp\/', line) # t_difftree.py
or re.match(r'\+\+\+ \/tmp\/', line) # t_difftree.py
or re.match(r'^==[0-9]+== ?[^\n]*\n', line)): # valgrind
continue
# Don't put control chars or unstable lines into source repository
while True:
(line, didn) = re.subn(r'(Internal Error: [^\n]+?\.(cpp|h)):[0-9]+', r'\1:#',
line)
if not didn:
break
# --vlt vs --vltmt run differences
line = re.sub(r'^-V\{t[0-9]+,[0-9]+\}', '-V{t#,#}', line)
line = re.sub(r'\r', '<#013>', line)
line = re.sub(r'Command Failed[^\n]+', 'Command Failed', line)
line = re.sub(r'Version: Verilator[^\n]+', 'Version: Verilator ###', line)
line = re.sub(r'"version": "[^"]+"', '"version": "###"', line)
2024-09-08 19:00:03 +02:00
line = re.sub(r'CPU Time: +[0-9.]+ seconds[^\n]+', 'CPU Time: ###', line)
line = re.sub(r'\?v=[0-9.]+', '?v=latest', line) # warning URL
line = re.sub(r'_h[0-9a-f]{8}_', '_h########_', line)
line = re.sub(r'%Error: /[^: ]+/([^/:])', r'%Error: .../\1',
line) # Avoid absolute paths
line = re.sub(r'("file://)/[^: ]+/([^/:])', r'\1/.../\2',
line) # Avoid absolute paths
2024-09-08 19:00:03 +02:00
line = re.sub(r' \/[^ ]+\/verilated_std.sv', ' verilated_std.sv', line)
#
(line, n) = re.subn(r'Exiting due to.*', r"Exiting due to", line)
if n:
l1o.append(line)
break # Trunc rest
l1o.append(line)
#
l1s = l1o
if strip_hex:
l1o = []
for line in l1s:
line = re.sub(r'\b0x[0-9a-f]+', '0x#', line)
l1o.append(line)
l1s = l1o
2024-09-08 19:00:03 +02:00
for lineno_m1 in range(0, max(len(l1s), len(l2s))):
l1 = l1s[lineno_m1] if lineno_m1 < len(l1s) else "*EOF*\n"
l2 = l2s[lineno_m1] if lineno_m1 < len(l2s) else "*EOF*\n"
if l1 != l2:
# print(" clnGOT="+pformat(l1s)+"\n clnEXP="+pformat(l2s))
if moretry:
return True # Retry
2024-09-08 19:00:03 +02:00
self.error_keep_going("Line " + str(lineno_m1) + " miscompares; " + fn1 + " != " +
fn2)
for c in range(min(len(l1), len(l2))):
if ord(l1[c]) != ord(l2[c]):
print("Miscompare starts at column " + str(c) +
(" w/ F1(got)=0x%02x F2(exp)=0x%02x" % (ord(l1[c]), ord(l2[c]))),
file=sys.stderr)
break
print("F1(got): " + l1 + "F2(exp): " + l2, file=sys.stderr)
if 'HARNESS_UPDATE_GOLDEN' in os.environ: # Update golden files with current
print("%Warning: HARNESS_UPDATE_GOLDEN set: cp " + fn1 + " " + fn2,
file=sys.stderr)
with open(fn2, 'w', encoding="utf8") as fhw:
fhw.write(''.join(l1s))
else:
print("To update reference: HARNESS_UPDATE_GOLDEN=1 {command} or --golden",
file=sys.stderr)
return False # No retry - bad
2024-09-08 19:00:03 +02:00
return False # No retry - good
2024-09-08 19:00:03 +02:00
def files_identical_sorted(self, fn1: str, fn2: str, is_logfile=False) -> None:
"""Test if two files, after sorting both, have identical contents"""
# Set LC_ALL as suggested in the sort manpage to avoid sort order
# changes from the locale.
os.environ['LC_ALL'] = 'C'
fn1sort = fn1 + '.sort'
self.run(cmd=['sort', fn1, "> " + fn1sort])
self.files_identical(fn1sort, fn2, is_logfile)
def copy_if_golden(self, fn1: str, fn2: str) -> None:
"""Copy a file if updating golden .out files"""
if 'HARNESS_UPDATE_GOLDEN' in os.environ: # Update golden files with current
print("%Warning: HARNESS_UPDATE_GOLDEN set: cp " + fn1 + " " + fn2, file=sys.stderr)
shutil.copy(fn1, fn2)
def vcd_identical(self, fn1: str, fn2: str) -> None:
"""Test if two VCD files have logically-identical contents"""
# vcddiff to check transitions, if installed
cmd = "vcddiff --help"
out = test.run_capture(cmd, check=True)
cmd = 'vcddiff ' + fn1 + ' ' + fn2
out = test.run_capture(cmd, check=True)
if out != "":
cmd = 'vcddiff ' + fn2 + " " + fn1 # Reversed arguments
out = VtOs.run_capture(cmd, check=False)
if out != "":
print(out)
self.copy_if_golden(fn1, fn2)
self.error("VCD miscompares " + fn2 + " " + fn1)
# vcddiff doesn't check module and variable scope, so check that
# Also provides backup if vcddiff not installed
h1 = self._vcd_read(fn1)
h2 = self._vcd_read(fn2)
a = json.dumps(h1, sort_keys=True, indent=1)
b = json.dumps(h2, sort_keys=True, indent=1)
if a != b:
self.copy_if_golden(fn1, fn2)
self.error("VCD hier miscompares " + fn1 + " " + fn2 + "\nGOT=" + a + "\nEXP=" + b +
"\n")
def fst2vcd(self, fn1: str, fn2: str) -> None:
cmd = "fst2vcd -h"
out = VtOs.run_capture(cmd, check=False)
if out == "" or not re.search(r'Usage:', out):
self.skip("No fst2vcd installed")
return
cmd = 'fst2vcd -e -f "' + fn1 + '" -o "' + fn2 + '"'
print("\t " + cmd + "\n") # Always print to help debug race cases
out = VtOs.run_capture(cmd, check=False)
print(out)
def fst_identical(self, fn1: str, fn2: str) -> None:
"""Test if two FST files have logically-identical contents"""
tmp = fn1 + ".vcd"
self.fst2vcd(fn1, tmp)
self.vcd_identical(tmp, fn2)
def saif_identical(self, fn1: str, fn2: str) -> None:
"""Test if two SAIF files have logically-identical contents"""
cmd = nodist_directory + '/verilator_saif_diff --first "' + fn1 + '" --second "' + fn2 + '"'
print("\t " + cmd + "\n")
out = test.run_capture(cmd, check=True)
if out != '':
print(out)
2025-04-05 21:09:18 +02:00
self.copy_if_golden(fn1, fn2)
self.error("SAIF files don't match!")
2024-09-08 19:00:03 +02:00
def _vcd_read(self, filename: str) -> str:
data = {}
with open(filename, 'r', encoding='latin-1') as fh:
hier_stack = ["TOP"]
var = []
for line in fh:
match1 = re.search(r'\$scope (module|struct|interface)\s+(\S+)', line)
match2 = re.search(r'(\$var (\S+)\s+\d+\s+)\S+\s+(\S+)', line)
match3 = re.search(r'(\$attrbegin .* \$end)', line)
line = line.rstrip()
# print("VR"+ ' '*len(hier_stack) +" L " + line)
if match1: # $scope
name = match1.group(2)
# print("VR"+ ' '*len(hier_stack) +" scope " + line)
hier_stack += [name]
scope = '.'.join(hier_stack)
data[scope] = match1.group(1) + " " + name
elif match2: # $var
# print("VR"+ ' '*len(hier_stack) +" var " + line)
scope = '.'.join(hier_stack)
var = match2.group(2)
data[scope + "." + var] = match2.group(1) + match2.group(3)
elif match3: # $attrbegin
# print("VR"+ ' '*len(hier_stack) +" attr " + line)
if var:
scope = '.'.join(hier_stack)
data[scope + "." + var + "#"] = match3.group(1)
elif re.search(r'\$enddefinitions', line):
break
n = len(re.findall(r'\$upscope', line))
if n:
for i in range(0, n): # pylint: disable=unused-variable
# print("VR"+ ' '*len(hier_stack) +" upscope " + line)
hier_stack.pop()
return data
def inline_checks(self) -> None:
covfn = self.coverage_filename
contents = self.file_contents(covfn)
if self.verbose:
self.oprint("Extract checks")
with open(self.top_filename, 'r', encoding="utf8") as fh:
flineno = 0
for line in fh:
flineno += 1
if re.search(r'CHECK', line):
match1 = re.search(
r'CHECK_COVER *\( *((-|[0-9])+) *,'
r'*"([^"]+)" *, *("([^"]+)" *,|) *(\d+) *\)', line)
match2 = re.search(r'CHECK_COVER_MISSING *\( *((-|[0-9])+) *\)', line)
if match1:
lineno = flineno + int(match1.group(1))
hier = match1.group(3)
comment = match1.group(5)
count = match1.group(6)
regexp = "\001l\002" + str(lineno)
if comment:
regexp += ".*\001o\002" + re.escape(comment)
if hier:
regexp += ".*\001h\002" + re.escape(hier)
regexp += ".*' " + str(count)
if not re.search(regexp, contents):
self.error("CHECK_COVER: " + covfn + ":" + str(flineno) +
": Regexp not found: " + regexp + "\n" + "From " +
self.top_filename + ": " + line)
elif match2:
lineno = flineno + int(match2.group(1))
regexp = "\001l\002" + str(lineno)
if re.search(regexp, contents):
self.error("CHECK_COVER_MISSING: " + covfn + ":" + str(flineno) +
": Regexp found: " + regexp + "\n" + "From " +
self.top_filename + ": " + line)
else:
self.error(self.top_filename + ":" + str(flineno) +
": Unknown CHECK request: " + line)
@staticproperty
def cfg_with_ccache() -> bool: # pylint: disable=no-method-argument
2024-09-08 19:00:03 +02:00
if VlTest._cached_cfg_with_ccache is None:
mkf = VlTest._file_contents_static(os.environ['VERILATOR_ROOT'] +
"/include/verilated.mk")
VlTest._cached_cfg_with_ccache = bool(re.search(r'OBJCACHE \?= ccache', mkf))
2024-09-08 19:00:03 +02:00
return VlTest._cached_cfg_with_ccache
def glob_some(self, pattern: str) -> list:
"""Return list of filenames matching a glob, with at least one match required."""
files = glob.glob(pattern)
# print("glob_some('" + pattern + "') files =\n " + pformat(files))
if not files:
self.error("glob_one: pattern '" + pattern + "' does not match any files")
return ['No_file_found']
return sorted(files)
def glob_one(self, pattern: str) -> str:
"""Return a filename matching a glob, with exactly one match required."""
files = self.glob_some(pattern)
if files and len(files) > 1:
msg = "glob_one: pattern '" + pattern + "' matches multiple files:\n"
for file in files:
msg += file + "\n"
self.error(msg)
return 'Multiple_files_found'
return files[0]
def file_grep_not(self, filename: str, regexp) -> None:
contents = self.file_contents(filename)
if contents == "_Already_Errored_":
return
if re.search(regexp, contents, re.MULTILINE):
self.error("File_grep_not: " + filename + ": Regexp found: '" + regexp + "'")
def file_grep(self, filename: str, regexp, expvalue=None) -> list:
contents = self.file_contents(filename)
if contents == "_Already_Errored_":
return None
match = re.search(regexp, contents, re.MULTILINE)
if not match:
self.error("File_grep: " + filename + ": Regexp not found: " + regexp)
return None
if expvalue is not None and str(expvalue) != match.group(1):
2024-09-08 19:00:03 +02:00
self.error("File_grep: " + filename + ": Got='" + match.group(1) + "' Expected='" +
str(expvalue) + "' in regexp: '" + regexp + "'")
return None
return [match.groups()]
def file_grep_count(self, filename: str, regexp, expcount: int) -> None:
contents = self.file_contents(filename)
if contents == "_Already_Errored_":
return
count = len(re.findall(regexp, contents))
if expcount != count:
self.error("File_grep_count: " + filename + ": Got='" + str(count) + "' Expected='" +
str(expcount) + "' in regexp: '" + regexp + "'")
2024-09-08 19:00:03 +02:00
def file_grep_any(self, filenames: list, regexp, expvalue=None) -> None:
for filename in filenames:
contents = self.file_contents(filename)
if contents == "_Already_Errored_":
return
match = re.search(regexp, contents)
if match:
if expvalue is not None and str(expvalue) != match.group(1):
2024-09-08 19:00:03 +02:00
self.error("file_grep: " + filename + ": Got='" + match.group(1) +
"' Expected='" + str(expvalue) + "' in regexp: " + regexp)
return
msg = "file_grep_any: Regexp '" + regexp + "' not found in any of the following files:\n"
for filename in filenames:
msg += filename + "\n"
self.error(msg)
def file_contents(self, filename: str) -> str:
if filename not in self._file_contents_cache:
if not os.path.exists(filename):
self._file_contents_cache[filename] = "_Already_Errored_"
self.error("File_contents file not found: " + filename)
else:
with open(filename, 'r', encoding='latin-1') as fh:
self._file_contents_cache[filename] = str(fh.read())
return self._file_contents_cache[filename]
@staticmethod
def _file_contents_static(filename: str) -> str:
if filename not in VlTest._file_contents_cache:
if not os.path.exists(filename):
VlTest._file_contents_cache[filename] = "_Already_Errored_"
sys.exit("_file_contents_static file not found: " + filename)
else:
with open(filename, 'r', encoding='latin-1') as fh:
VlTest._file_contents_cache[filename] = str(fh.read())
return VlTest._file_contents_cache[filename]
def write_wholefile(self, filename: str, contents: str) -> None:
with open(filename, 'wb') as fh:
fh.write(contents.encode('latin-1'))
if filename in self._file_contents_cache:
del self._file_contents_cache[filename]
def file_sed(self, in_filename: str, out_filename, edit_lambda) -> None:
contents = self.file_contents(in_filename)
contents = edit_lambda(contents)
self.write_wholefile(out_filename, contents)
def extract(
self,
in_filename: str,
out_filename: str,
regexp=r'.*',
lineno_adjust=-9999, #
lines=None) -> None: #'#, #-#'
2025-09-26 03:08:24 +02:00
if not os.path.exists(test.root + "/.git"):
self.skip("Not in a git repository")
return
2024-09-08 19:00:03 +02:00
temp_fn = out_filename
temp_fn = re.sub(r'.*/', '', temp_fn)
temp_fn = self.obj_dir + "/" + temp_fn
out = []
emph = ""
lineno = 0
lineno_out = 0
with open(in_filename, 'r', encoding="latin-1") as fh:
for line in fh:
lineno += 1
if re.search(regexp, line) and self._lineno_match(lineno, lines):
match = re.search(r't/[A-Za-z0-9_]+.v:(\d+):(\d+):', line)
if match:
mlineno = int(match.group(1)) + lineno_adjust
col = int(match.group(2))
mlineno = max(1, mlineno)
line = re.sub(r't/[A-Za-z0-9_]+.v:(\d+):(\d+):',
'example.v:' + str(mlineno) + ':' + str(col), line)
out.append(" " + line)
lineno_out += 1
if '<--' in line:
if emph:
emph += ","
emph += str(lineno_out)
with open(temp_fn, 'w', encoding="latin-1") as fhw:
lang = " sv" if re.search(r'\.s?vh?$', in_filename) else ""
fhw.write(".. comment: generated by " + self.name + "\n")
fhw.write(".. code-block::" + lang + "\n")
if lang != "" and len(out) > 1:
fhw.write(" :linenos:\n")
if emph:
fhw.write(" :emphasize-lines: " + emph + "\n")
fhw.write("\n")
for line in out:
2025-05-01 02:32:30 +02:00
line = re.sub(r' +$', '', line)
2024-09-08 19:00:03 +02:00
fhw.write(line)
self.files_identical(temp_fn, out_filename)
@staticmethod
def _lineno_match(lineno: int, lines: str) -> bool:
if not lines:
return True
for lc in lines.split(','):
match1 = re.match(r'^(\d+)$', lc)
match2 = re.match(r'^(\d+)-(\d+)$', lc)
if match1 and int(match1.group(1)) == lineno:
return True
if match2 and int(match2.group(1)) <= lineno <= int(match2.group(2)):
return True
return False
######################################################################
######################################################################
# Global Functions
def calc_jobs() -> int:
ok_threads = max_procs()
print("driver.py: Found %d cores, using -j %d" % (ok_threads, ok_threads))
return ok_threads
def calc_threads(default_threads) -> int:
ok_threads = max_procs()
return ok_threads if (ok_threads < default_threads) else default_threads
def _calc_hashset() -> list:
match = re.match(r'^(\d+)/(\d+)$', Args.hashset)
if not match:
sys.exit("%Error: Need number/number format for --hashset: " + Args.hashset)
setn = int(match.group(1))
nsets = int(match.group(2))
new = []
global Arg_Tests
for t in Arg_Tests:
checksum = int(hashlib.sha256(t.encode("utf-8")).hexdigest()[0:4], 16)
if (setn % nsets) == (checksum % nsets):
new.append(t)
Arg_Tests = new
#######################################################################
#######################################################################
# Verilator utilities
def get_cpu_count():
try:
return len(os.sched_getaffinity(0))
except AttributeError:
return multiprocessing.cpu_count()
2024-09-08 19:00:03 +02:00
@lru_cache(maxsize=1)
def max_procs() -> int:
procs = get_cpu_count()
2024-09-08 19:00:03 +02:00
if procs < 2:
print("driver.py: Python didn't find at least two CPUs")
return procs
def _parameter(param: str) -> None:
global _Parameter_Next_Level
if _Parameter_Next_Level:
if not re.match(r'^(\d+)$', param):
2024-09-08 19:00:03 +02:00
sys.exit("%Error: Expected number following " + _Parameter_Next_Level + ": " + param)
Args.passdown_verilator_flags.append(param)
2024-09-08 19:00:03 +02:00
_Parameter_Next_Level = None
elif re.match(r'^(\+verilator\+.*)', param):
Args.passdown_verilated_flags.append(param)
2024-09-08 19:00:03 +02:00
elif re.search(r'\.py', param):
Arg_Tests.append(param)
elif re.match(r'^-?(-debugi|-dumpi)', param):
Args.passdown_verilator_flags.append(param)
2024-09-08 19:00:03 +02:00
_Parameter_Next_Level = param
elif re.match(r'^-?(-W||-debug-check)', param):
Args.passdown_verilator_flags.append(param)
2024-09-08 19:00:03 +02:00
else:
sys.exit("%Error: Unknown parameter: " + param)
def run_them() -> None:
VtOs.mkdir_ok("obj_dist")
timestart = time.strftime("%Y%m%d_%H%M%S")
runner = Runner(driver_log_filename="obj_dist/driver_" + timestart + ".log", quiet=Args.quiet)
for test_py in Arg_Tests:
for scenario in sorted(set(Args.scenarios)):
if VlTest._prefilter_scenario(test_py, scenario):
runner.one_test(py_filename=test_py, scenario=scenario)
runner.wait_and_report()
if Args.rerun and runner.fail_cnt and not Quitting:
2024-09-08 19:00:03 +02:00
print('=' * 70)
print('=' * 70)
print("RERUN ==\n")
# Avoid parallel run to ensure that isn't causing problems
# If > 10 failures something more wrong and get results quickly
if runner.fail_cnt < 10:
forker.max_proc(1)
orig_runner = runner
runner = Runner(driver_log_filename="obj_dist/driver_" + timestart + "_rerun.log",
quiet=False,
fail1_cnt=orig_runner.fail_cnt,
ok_cnt=orig_runner.ok_cnt,
skip_cnt=orig_runner.skip_cnt)
for ftest in orig_runner.fail_tests:
# Reschedule test
if ftest.rerunnable:
2025-03-27 23:33:18 +01:00
ftest.clean(for_rerun=True)
2024-09-08 19:00:03 +02:00
runner.one_test(py_filename=ftest.py_filename,
scenario=ftest.scenario,
rerun_skipping=not ftest.rerunnable)
runner.wait_and_report()
if runner.fail_cnt:
sys.exit(10)
######################################################################
######################################################################
# Main
if __name__ == '__main__':
os.environ['PYTHONUNBUFFERED'] = "1"
if ('VERILATOR_ROOT' not in os.environ) and os.path.isfile('../bin/verilator'):
os.environ['VERILATOR_ROOT'] = os.getcwd() + "/.."
if 'MAKE' not in os.environ:
os.environ['MAKE'] = "make"
if 'CXX' not in os.environ:
os.environ['CXX'] = "c++"
if 'TEST_REGRESS' in os.environ:
sys.exit("%Error: TEST_REGRESS environment variable is already set")
os.environ['TEST_REGRESS'] = os.getcwd()
Start = time.time()
_Parameter_Next_Level = None
def sig_int(signum, env) -> None: # pylint: disable=unused-argument
global Quitting
if Quitting:
sys.exit("\nQuitting (immediately)...")
print("\nQuitting... (send another interrupt signal for immediate quit)")
Quitting = True
if forker:
forker.kill_tree_all()
signal.signal(signal.SIGINT, sig_int)
#---------------------------------------------------------------------
parser = argparse.ArgumentParser(
allow_abbrev=False,
formatter_class=argparse.RawDescriptionHelpFormatter,
description="""Run Verilator regression tests""",
epilog="""driver.py invokes Verilator or another simulator on each test file.
See docs/internals.rst in the distribution for more information.
2025-01-01 14:30:25 +01:00
Copyright 2024-2025 by Wilson Snyder. This program is free software; you
can redistribute it and/or modify it under the terms of either the GNU
Lesser General Public License Version 3 or the Perl Artistic License
Version 2.0.
SPDX-License-Identifier: LGPL-3.0-only OR Artistic-2.0""")
parser.add_argument('--benchmark', action='store', help='enable benchmarking')
parser.add_argument('--debug', action='store_const', const=9, help='enable debug')
# --debugi: see _parameter()
2025-03-27 23:33:18 +01:00
parser.add_argument('--driver-clean', action='store_true', help='clean after test passes')
parser.add_argument('--fail-max',
action='store',
default=None,
help='after specified number of failures, skip remaining tests')
parser.add_argument('--gdb', action='store_true', help='run Verilator executable with gdb')
parser.add_argument('--gdbbt',
action='store_true',
help='run Verilated executable with gdb and backtrace')
parser.add_argument('--gdbsim', action='store_true', help='run Verilated executable with gdb')
parser.add_argument('--golden', '--gold', action='store_true', help='update golden .out files')
parser.add_argument('--hashset', action='store', help='split tests based on <set>/<numsets>')
parser.add_argument('--jobs',
'-j',
action='store',
default=0,
type=int,
help='parallel job count (0=cpu count)')
2024-11-25 03:12:08 +01:00
parser.add_argument('--obj-suffix',
action='store',
default='',
help='suffix to add to obj_ test directory name')
parser.add_argument('--quiet',
action='store_true',
help='suppress output except failures and progress')
parser.add_argument('--rerun', action='store_true', help='rerun all tests that fail')
parser.add_argument('--rr', action='store_true', help='run Verilator executable with rr')
parser.add_argument('--rrsim', action='store_true', help='run Verilated executable with rr')
parser.add_argument('--site',
action='store_true',
help='include VERILATOR_TEST_SITE test list')
parser.add_argument('--stop', action='store_true', help='stop on the first error')
parser.add_argument("--top-filename", help="override the default Verilog file name")
parser.add_argument('--trace', action='store_true', help='enable simulator waveform tracing')
parser.add_argument('--verbose',
action='store_true',
help='compile and run test in verbose mode')
parser.add_argument(
'--verilation', # -no-verilation undocumented debugging
action='store_true',
default=True,
help="don't run verilator compile() phase")
parser.add_argument('--verilated-debug',
action='store_true',
help='enable Verilated executable debug')
## Scenarios
for scen, v in All_Scenarios.items():
parser.add_argument('--' + scen,
dest='scenarios',
action='append_const',
const=scen,
help='scenario-enable ' + scen)
(Args, rest) = parser.parse_known_intermixed_args()
Args.passdown_verilator_flags = []
Args.passdown_verilated_flags = []
for arg in rest:
_parameter(arg)
if Args.debug:
Args.passdown_verilator_flags.append("--debug --no-skip-identical")
logging.basicConfig(level=logging.DEBUG)
logging.info("In driver.py, ARGV=" + ' '.join(sys.argv))
# Use Args for some global information, so gets passed to forked process
Args.interactive_debugger = Args.gdb or Args.gdbsim or Args.rr or Args.rrsim
if Args.jobs > 1 and Args.interactive_debugger:
sys.exit("%Error: Unable to use -j > 1 with --gdb* and --rr* options")
if Args.golden:
os.environ['HARNESS_UPDATE_GOLDEN'] = '1'
if Args.jobs == 0:
Args.jobs = 1 if Args.interactive_debugger else calc_jobs()
if not Args.scenarios:
Args.scenarios = []
Args.scenarios.append('dist')
Args.scenarios.append('vlt')
Args.orig_argv_sw = []
for arg in sys.argv:
if re.match(r'^-', arg) and not re.match(r'^-j$', arg):
Args.orig_argv_sw.append(arg)
Args.test_dirs = ["t"]
if 'VERILATOR_TESTS_SITE' in os.environ:
if Args.site or len(Arg_Tests) >= 1:
for tdir in os.environ['VERILATOR_TESTS_SITE'].split(':'):
Args.test_dirs.append(tdir)
if not Arg_Tests: # Run everything
uniq = {}
for tdir in Args.test_dirs:
# Uniquify by inode, so different paths to same place get combined
stats = os.stat(tdir)
if stats.st_ino not in uniq:
uniq[stats.st_ino] = 1
Arg_Tests += sorted(glob.glob(tdir + "/t_*.py"))
if Args.hashset:
_calc_hashset()
# Number of retries when reading logfiles, generally only need many
# retries when system is busy running a lot of tests
Args.log_retries = 10 if (len(Arg_Tests) > 3) else 2
forker = Forker(Args.jobs)
2025-05-22 23:53:04 +02:00
Args.driver_build_jobs = None
if len(Arg_Tests) >= 2 and Args.jobs >= 2:
# Read supported into master process, so don't call every subprocess
Capabilities.warmup_cache()
# Without this tests such as t_debug_sigsegv_bt_bad.py will occasionally
# block on input and cause a SIGSTOP, then a "fg" was needed to resume testing.
print("== Many jobs; redirecting STDIN", file=sys.stderr)
#
sys.stdin = open("/dev/null", 'r', encoding="utf8") # pylint: disable=consider-using-with
2025-05-22 23:53:04 +02:00
else:
# Speed up single-test makes
Args.driver_build_jobs = calc_jobs()
run_them()