Fix fork join kill-accounting callback lifetime and reentrancy.
This commit is contained in:
parent
c298f1e8b3
commit
4ff579f300
|
|
@ -129,8 +129,7 @@ void VlTriggerScheduler::moveToResumeQueue(const char* eventDescription) {
|
|||
if (!m_fired.empty()) {
|
||||
VL_DEBUG_IF(VL_DBG_MSGF(" Moving to resume queue processes waiting for %s:\n",
|
||||
eventDescription);
|
||||
for (const auto& susp
|
||||
: m_fired) {
|
||||
for (const auto& susp : m_fired) {
|
||||
VL_DBG_MSGF(" - ");
|
||||
susp.dump();
|
||||
});
|
||||
|
|
@ -144,8 +143,7 @@ void VlTriggerScheduler::ready(const char* eventDescription) {
|
|||
if (!m_awaiting.empty()) {
|
||||
VL_DEBUG_IF(
|
||||
VL_DBG_MSGF(" Committing processes waiting for %s:\n", eventDescription);
|
||||
for (const auto& susp
|
||||
: m_awaiting) {
|
||||
for (const auto& susp : m_awaiting) {
|
||||
VL_DBG_MSGF(" - ");
|
||||
susp.dump();
|
||||
});
|
||||
|
|
@ -201,8 +199,7 @@ bool VlDynamicTriggerScheduler::evaluate() {
|
|||
void VlDynamicTriggerScheduler::doPostUpdates() {
|
||||
VL_DEBUG_IF(if (!m_post.empty())
|
||||
VL_DBG_MSGF(" Doing post updates for processes:\n"); //
|
||||
for (const auto& susp
|
||||
: m_post) {
|
||||
for (const auto& susp : m_post) {
|
||||
VL_DBG_MSGF(" - ");
|
||||
susp.dump();
|
||||
});
|
||||
|
|
@ -212,8 +209,7 @@ void VlDynamicTriggerScheduler::doPostUpdates() {
|
|||
|
||||
void VlDynamicTriggerScheduler::resume() {
|
||||
VL_DEBUG_IF(if (!m_triggered.empty()) VL_DBG_MSGF(" Resuming processes:\n"); //
|
||||
for (const auto& susp
|
||||
: m_triggered) {
|
||||
for (const auto& susp : m_triggered) {
|
||||
VL_DBG_MSGF(" - ");
|
||||
susp.dump();
|
||||
});
|
||||
|
|
@ -238,12 +234,12 @@ void VlDynamicTriggerScheduler::dump() const {
|
|||
//======================================================================
|
||||
// VlForkSync:: Methods
|
||||
|
||||
void VlProcess::forkSyncOnKill(VlForkSync* forkSyncp) {
|
||||
void VlProcess::forkSyncOnKill(VlForkSyncState* forkSyncp) {
|
||||
m_forkSyncOnKillp = forkSyncp;
|
||||
m_forkSyncOnKillDone = false;
|
||||
}
|
||||
|
||||
void VlProcess::forkSyncOnKillClear(VlForkSync* forkSyncp) {
|
||||
void VlProcess::forkSyncOnKillClear(VlForkSyncState* forkSyncp) {
|
||||
if (m_forkSyncOnKillp != forkSyncp) return;
|
||||
m_forkSyncOnKillp = nullptr;
|
||||
m_forkSyncOnKillDone = false;
|
||||
|
|
@ -260,22 +256,34 @@ void VlProcess::state(int s) {
|
|||
m_state = s;
|
||||
}
|
||||
|
||||
VlForkSync::~VlForkSync() {
|
||||
for (std::weak_ptr<VlProcess>& weakp : m_onKillProcessps) {
|
||||
if (VlProcessRef processp = weakp.lock()) processp->forkSyncOnKillClear(this);
|
||||
}
|
||||
VlForkSyncState::~VlForkSyncState() {
|
||||
for (const VlProcessRef& processp : m_onKillProcessps) processp->forkSyncOnKillClear(this);
|
||||
}
|
||||
|
||||
void VlForkSync::onKill(VlProcessRef process) {
|
||||
if (!process) return;
|
||||
m_onKillProcessps.emplace_back(process);
|
||||
process->forkSyncOnKill(this);
|
||||
m_state->m_onKillProcessps.emplace_back(process);
|
||||
process->forkSyncOnKill(m_state.get());
|
||||
}
|
||||
|
||||
void VlForkSync::done(const char* filename, int lineno) {
|
||||
void VlForkSyncState::done(const char* filename, int lineno) {
|
||||
VL_DEBUG_IF(VL_DBG_MSGF(" Process forked at %s:%d finished\n", filename, lineno););
|
||||
if (m_join->m_counter > 0) m_join->m_counter--;
|
||||
if (m_join->m_counter == 0) m_join->m_susp.resume();
|
||||
if (!m_inited) {
|
||||
++m_pendingDones;
|
||||
return;
|
||||
}
|
||||
if (m_counter > 0) m_counter--;
|
||||
if (m_counter != 0) return;
|
||||
if (m_inDone) {
|
||||
m_resumePending = true;
|
||||
return;
|
||||
}
|
||||
m_inDone = true;
|
||||
do {
|
||||
m_resumePending = false;
|
||||
m_susp.resume();
|
||||
} while (m_resumePending && m_inited && m_counter == 0);
|
||||
m_inDone = false;
|
||||
}
|
||||
|
||||
//======================================================================
|
||||
|
|
|
|||
|
|
@ -383,44 +383,59 @@ struct VlForever final {
|
|||
//=============================================================================
|
||||
// VlForkSync is used to manage fork..join and fork..join_any constructs.
|
||||
|
||||
class VlForkSync final {
|
||||
// VlJoin stores the handle of a suspended coroutine that did a fork..join or fork..join_any.
|
||||
// If the counter reaches 0, the suspended coroutine shall be resumed.
|
||||
struct VlJoin final {
|
||||
size_t m_counter = 0; // When reaches 0, resume suspended coroutine
|
||||
VlCoroutineHandle m_susp; // Coroutine to resume
|
||||
};
|
||||
// Shared fork..join state, because VlForkSync is copied into generated coroutine frames.
|
||||
class VlForkSyncState final {
|
||||
public:
|
||||
size_t m_counter = 0; // When reaches 0, resume suspended coroutine
|
||||
VlCoroutineHandle m_susp; // Coroutine to resume
|
||||
bool m_inited = false;
|
||||
size_t m_pendingDones = 0; // done() calls seen before init() (e.g. early killed branch)
|
||||
bool m_inDone = false; // Guard against re-entrant resume recursion from nested kills
|
||||
bool m_resumePending = false; // Join reached zero again while inside done()
|
||||
std::vector<VlProcessRef> m_onKillProcessps; // Branches registered for kill hooks
|
||||
|
||||
// The join info is shared among all forked processes
|
||||
std::shared_ptr<VlJoin> m_join;
|
||||
std::vector<std::weak_ptr<VlProcess>> m_onKillProcessps; // Branches registered for kill hooks
|
||||
VlForkSyncState() // Construct with a null coroutine handle
|
||||
: m_susp{VlProcessRef{}} {}
|
||||
~VlForkSyncState();
|
||||
void done(const char* filename = VL_UNKNOWN, int lineno = 0);
|
||||
};
|
||||
|
||||
class VlForkSync final {
|
||||
std::shared_ptr<VlForkSyncState> m_state{std::make_shared<VlForkSyncState>()};
|
||||
|
||||
public:
|
||||
~VlForkSync();
|
||||
// Create the join object and set the counter to the specified number
|
||||
void init(size_t count, VlProcessRef process) { m_join.reset(new VlJoin{count, {process}}); }
|
||||
void init(size_t count, VlProcessRef process) {
|
||||
const size_t pendingDones = m_state->m_pendingDones;
|
||||
m_state->m_pendingDones = 0;
|
||||
count = (pendingDones >= count) ? 0 : (count - pendingDones);
|
||||
m_state->m_counter = count;
|
||||
m_state->m_susp = {process};
|
||||
m_state->m_inited = true;
|
||||
}
|
||||
// Register process kill callback so killed fork branches still decrement join counter
|
||||
void onKill(VlProcessRef process);
|
||||
// Called whenever any of the forked processes finishes. If the join counter reaches 0, the
|
||||
// main process gets resumed
|
||||
void done(const char* filename = VL_UNKNOWN, int lineno = 0);
|
||||
void done(const char* filename = VL_UNKNOWN, int lineno = 0) {
|
||||
m_state->done(filename, lineno);
|
||||
}
|
||||
// Used by coroutines for co_awaiting a join
|
||||
auto join(VlProcessRef process, const char* filename = VL_UNKNOWN, int lineno = 0) {
|
||||
assert(m_join);
|
||||
VL_DEBUG_IF(
|
||||
VL_DBG_MSGF(" Awaiting join of fork at: %s:%d\n", filename, lineno););
|
||||
struct Awaitable final {
|
||||
VlProcessRef process; // Data of the suspended process, null if not needed
|
||||
const std::shared_ptr<VlJoin> join; // Join to await on
|
||||
const std::shared_ptr<VlForkSyncState> state; // Join to await on
|
||||
VlFileLineDebug fileline;
|
||||
|
||||
bool await_ready() { return join->m_counter == 0; } // Suspend if join still exists
|
||||
bool await_ready() { return state->m_counter == 0; } // Suspend if join still exists
|
||||
void await_suspend(std::coroutine_handle<> coro) {
|
||||
join->m_susp = {coro, process, fileline};
|
||||
state->m_susp = {coro, process, fileline};
|
||||
}
|
||||
void await_resume() const {}
|
||||
};
|
||||
return Awaitable{process, m_join, VlFileLineDebug{filename, lineno}};
|
||||
return Awaitable{process, m_state, VlFileLineDebug{filename, lineno}};
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -110,13 +110,15 @@ constexpr IData VL_CLOG2_CE_Q(QData lhs) VL_PURE {
|
|||
// Metadata of processes
|
||||
using VlProcessRef = std::shared_ptr<VlProcess>;
|
||||
class VlForkSync;
|
||||
class VlForkSyncState;
|
||||
|
||||
class VlProcess final {
|
||||
// MEMBERS
|
||||
int m_state; // Current state of the process
|
||||
VlProcessRef m_parentp = nullptr; // Parent process, if exists
|
||||
std::set<VlProcess*> m_children; // Active child processes
|
||||
VlForkSync* m_forkSyncOnKillp = nullptr; // Optional fork..join counter to decrement on kill
|
||||
VlForkSyncState* m_forkSyncOnKillp
|
||||
= nullptr; // Optional fork..join counter to decrement on kill
|
||||
bool m_forkSyncOnKillDone = false; // Ensure on-kill callback fires only once
|
||||
|
||||
public:
|
||||
|
|
@ -154,10 +156,12 @@ public:
|
|||
disableFork();
|
||||
}
|
||||
void disableFork() {
|
||||
for (VlProcess* childp : m_children) childp->disable();
|
||||
// childp->disable() may resume coroutines and mutate m_children
|
||||
const std::set<VlProcess*> children = m_children;
|
||||
for (VlProcess* childp : children) childp->disable();
|
||||
}
|
||||
void forkSyncOnKill(VlForkSync* forkSyncp);
|
||||
void forkSyncOnKillClear(VlForkSync* forkSyncp);
|
||||
void forkSyncOnKill(VlForkSyncState* forkSyncp);
|
||||
void forkSyncOnKillClear(VlForkSyncState* forkSyncp);
|
||||
bool completed() const { return state() == FINISHED || state() == KILLED; }
|
||||
bool completedFork() const {
|
||||
for (const VlProcess* const childp : m_children)
|
||||
|
|
@ -1939,7 +1943,7 @@ public:
|
|||
VlClassRef() = default;
|
||||
// Init with nullptr
|
||||
// cppcheck-suppress noExplicitConstructor
|
||||
VlClassRef(VlNull){};
|
||||
VlClassRef(VlNull) {};
|
||||
template <typename... T_Args>
|
||||
VlClassRef(VlDeleter& deleter, T_Args&&... args)
|
||||
// () required here to avoid narrowing conversion warnings,
|
||||
|
|
|
|||
Loading…
Reference in New Issue