Migrating &stochsyn to generic concurrency interface.

This commit is contained in:
Alan Mishchenko 2024-08-03 18:12:03 -07:00
parent 43426f0a94
commit 037971d9c9
5 changed files with 256 additions and 107 deletions

View File

@ -4151,6 +4151,10 @@ SOURCE=.\src\misc\util\utilNam.h
# End Source File
# Begin Source File
SOURCE=.\src\misc\util\utilPth.c
# End Source File
# Begin Source File
SOURCE=.\src\misc\util\utilSignal.c
# End Source File
# Begin Source File

View File

@ -1,6 +1,6 @@
/**CFile****************************************************************
FileName [giaDeep.c]
FileName [giaStoch.c]
SystemName [ABC: Logic synthesis and verification system.]
@ -14,7 +14,7 @@
Date [Ver. 1.0. Started - June 20, 2005.]
Revision [$Id: giaDeep.c,v 1.00 2005/06/20 00:00:00 alanmi Exp $]
Revision [$Id: giaStoch.c,v 1.00 2005/06/20 00:00:00 alanmi Exp $]
***********************************************************************/
@ -22,23 +22,13 @@
#include "base/main/main.h"
#include "base/cmd/cmd.h"
#ifdef _MSC_VER
#ifdef WIN32
#include <process.h>
#define unlink _unlink
#else
#include <unistd.h>
#endif
#ifdef ABC_USE_PTHREADS
#ifdef _WIN32
#include "../lib/pthread.h"
#else
#include <pthread.h>
#endif
#endif
ABC_NAMESPACE_IMPL_START
////////////////////////////////////////////////////////////////////////
@ -108,7 +98,7 @@ void Gia_StochProcessArray( Vec_Ptr_t * vGias, char * pScript, int TimeSecs, int
/**Function*************************************************************
Synopsis [Processing on a many cores.]
Synopsis [Processing on many cores.]
Description []
@ -117,27 +107,6 @@ void Gia_StochProcessArray( Vec_Ptr_t * vGias, char * pScript, int TimeSecs, int
SeeAlso []
***********************************************************************/
#ifndef ABC_USE_PTHREADS
void Gia_StochProcess( Vec_Ptr_t * vGias, char * pScript, int nProcs, int TimeSecs, int fVerbose )
{
Gia_StochProcessArray( vGias, pScript, TimeSecs, fVerbose );
}
#else // pthreads are used
#define PAR_THR_MAX 100
typedef struct Gia_StochThData_t_
{
Vec_Ptr_t * vGias;
char * pScript;
int Index;
int Rand;
int nTimeOut;
int fWorking;
} Gia_StochThData_t;
Gia_Man_t * Gia_StochProcessOne( Gia_Man_t * p, char * pScript, int Rand, int TimeSecs )
{
Gia_Man_t * pNew;
@ -162,87 +131,68 @@ Gia_Man_t * Gia_StochProcessOne( Gia_Man_t * p, char * pScript, int Rand, int Ti
return Gia_ManDup(p);
}
void * Gia_StochWorkerThread( void * pArg )
/**Function*************************************************************
Synopsis [Generic concurrent processing.]
Description [User-defined problem-specific data and the way to process it.]
SideEffects []
SeeAlso []
***********************************************************************/
typedef struct StochSynData_t_
{
Gia_StochThData_t * pThData = (Gia_StochThData_t *)pArg;
volatile int * pPlace = &pThData->fWorking;
Gia_Man_t * pGia, * pNew;
while ( 1 )
{
while ( *pPlace == 0 );
assert( pThData->fWorking );
if ( pThData->Index == -1 )
{
pthread_exit( NULL );
assert( 0 );
return NULL;
}
pGia = (Gia_Man_t *)Vec_PtrEntry( pThData->vGias, pThData->Index );
pNew = Gia_StochProcessOne( pGia, pThData->pScript, pThData->Rand, pThData->nTimeOut );
Gia_ManStop( pGia );
Vec_PtrWriteEntry( pThData->vGias, pThData->Index, pNew );
pThData->fWorking = 0;
}
assert( 0 );
return NULL;
Gia_Man_t * pIn;
Gia_Man_t * pOut;
char * pScript;
int Rand;
int TimeOut;
} StochSynData_t;
int Gia_StochProcess1( void * p )
{
StochSynData_t * pData = (StochSynData_t *)p;
assert( pData->pIn != NULL );
assert( pData->pOut == NULL );
pData->pOut = Gia_StochProcessOne( pData->pIn, pData->pScript, pData->Rand, pData->TimeOut );
return 1;
}
void Gia_StochProcess( Vec_Ptr_t * vGias, char * pScript, int nProcs, int TimeSecs, int fVerbose )
{
Gia_StochThData_t ThData[PAR_THR_MAX];
pthread_t WorkerThread[PAR_THR_MAX];
int i, k, status;
if ( fVerbose )
printf( "Running concurrent synthesis with %d processes.\n", nProcs );
fflush( stdout );
if ( nProcs < 2 )
return Gia_StochProcessArray( vGias, pScript, TimeSecs, fVerbose );
// subtract manager thread
nProcs--;
assert( nProcs >= 1 && nProcs <= PAR_THR_MAX );
// start threads
if ( nProcs <= 2 ) {
if ( fVerbose )
printf( "Running non-concurrent synthesis.\n" ), fflush(stdout);
Gia_StochProcessArray( vGias, pScript, TimeSecs, fVerbose );
return;
}
StochSynData_t * pData = ABC_CALLOC( StochSynData_t, Vec_PtrSize(vGias) );
Vec_Ptr_t * vData = Vec_PtrAlloc( Vec_PtrSize(vGias) );
Gia_Man_t * pGia; int i;
Abc_Random(1);
for ( i = 0; i < nProcs; i++ )
{
ThData[i].vGias = vGias;
ThData[i].pScript = pScript;
ThData[i].Index = -1;
ThData[i].Rand = Abc_Random(0) % 0x1000000;
ThData[i].nTimeOut = TimeSecs;
ThData[i].fWorking = 0;
status = pthread_create( WorkerThread + i, NULL, Gia_StochWorkerThread, (void *)(ThData + i) ); assert( status == 0 );
Vec_PtrForEachEntry( Gia_Man_t *, vGias, pGia, i ) {
pData[i].pIn = pGia;
pData[i].pOut = NULL;
pData[i].pScript = pScript;
pData[i].Rand = Abc_Random(0) % 0x1000000;
pData[i].TimeOut = TimeSecs;
Vec_PtrPush( vData, pData+i );
}
// look at the threads
for ( k = 0; k < Vec_PtrSize(vGias); k++ )
{
for ( i = 0; i < nProcs; i++ )
{
if ( ThData[i].fWorking )
continue;
ThData[i].Index = k;
ThData[i].fWorking = 1;
break;
}
if ( i == nProcs )
k--;
}
// wait till threads finish
for ( i = 0; i < nProcs; i++ )
if ( ThData[i].fWorking )
i = -1;
// stop threads
for ( i = 0; i < nProcs; i++ )
{
assert( !ThData[i].fWorking );
// stop
ThData[i].Index = -1;
ThData[i].fWorking = 1;
if ( fVerbose )
printf( "Running concurrent synthesis with %d processes.\n", nProcs ), fflush(stdout);
Util_ProcessThreads( Gia_StochProcess1, vData, nProcs, TimeSecs, fVerbose );
// replace old AIGs by new AIGs
Vec_PtrForEachEntry( Gia_Man_t *, vGias, pGia, i ) {
Gia_ManStop( pGia );
Vec_PtrWriteEntry( vGias, i, pData[i].pOut );
}
Vec_PtrFree( vData );
ABC_FREE( pData );
}
#endif // pthreads are used
/**Function*************************************************************
Synopsis []

View File

@ -549,6 +549,9 @@ extern int * Abc_QuickSortCost( int * pCosts, int nSize, int fDecrease );
extern unsigned Abc_Random( int fReset );
extern word Abc_RandomW( int fReset );
// pthreads
extern void Util_ProcessThreads( int (*pUserFunc)(void *), void * vData, int nProcs, int TimeOut, int fVerbose );
ABC_NAMESPACE_HEADER_END
#endif

View File

@ -4,5 +4,6 @@ SRC += src/misc/util/utilBridge.c \
src/misc/util/utilFile.c \
src/misc/util/utilIsop.c \
src/misc/util/utilNam.c \
src/misc/util/utilPth.c \
src/misc/util/utilSignal.c \
src/misc/util/utilSort.c

191
src/misc/util/utilPth.c Normal file
View File

@ -0,0 +1,191 @@
/**CFile****************************************************************
FileName [utilPth.c]
SystemName [ABC: Logic synthesis and verification system.]
PackageName [Generic interface to pthreads.]
Synopsis [Generic interface to pthreads.]
Author [Alan Mishchenko]
Affiliation [UC Berkeley]
Date [Ver. 1.0. Started - August 3, 2024.]
Revision [$Id: utilPth.c,v 1.00 2024/08/03 00:00:00 alanmi Exp $]
***********************************************************************/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#ifdef ABC_USE_PTHREADS
#ifdef _WIN32
#include "../lib/pthread.h"
#else
#include <pthread.h>
#endif
#ifdef __cplusplus
#include <atomic>
using namespace std;
#else
#include <stdatomic.h>
#endif
#endif
#include "misc/vec/vec.h"
ABC_NAMESPACE_IMPL_START
////////////////////////////////////////////////////////////////////////
/// DECLARATIONS ///
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
/// FUNCTION DEFINITIONS ///
////////////////////////////////////////////////////////////////////////
/**Function*************************************************************
Synopsis []
Description []
SideEffects []
SeeAlso []
***********************************************************************/
#ifndef ABC_USE_PTHREADS
void Util_ProcessThreads( int (*pUserFunc)(void *), void * vData, int nProcs, int TimeOut, int fVerbose )
{
void * pData; int i;
Vec_PtrForEachEntry( void *, (Vec_Ptr_t *)vData, pData, i )
pUserFunc( pData );
}
#else // pthreads are used
#define PAR_THR_MAX 100
typedef struct Util_ThData_t_
{
void * pUserData;
int (*pUserFunc)(void *);
int iThread;
int nTimeOut;
atomic_bool fWorking;
} Util_ThData_t;
void * Util_Thread( void * pArg )
{
struct timespec pause_duration;
pause_duration.tv_sec = 0;
pause_duration.tv_nsec = 10000000L; // 10 milliseconds
Util_ThData_t * pThData = (Util_ThData_t *)pArg;
while ( 1 )
{
while ( !atomic_load_explicit((atomic_bool *)&pThData->fWorking, memory_order_acquire) )
nanosleep(&pause_duration, NULL);
if ( pThData->pUserData == NULL )
{
pthread_exit( NULL );
assert( 0 );
return NULL;
}
pThData->pUserFunc( pThData->pUserData );
atomic_store_explicit(&pThData->fWorking, 0, memory_order_release);
}
assert( 0 );
return NULL;
}
void Util_ProcessThreads( int (*pUserFunc)(void *), void * vData, int nProcs, int TimeOut, int fVerbose )
{
//abctime clkStart = Abc_Clock();
Util_ThData_t ThData[PAR_THR_MAX];
pthread_t WorkerThread[PAR_THR_MAX];
Vec_Ptr_t * vStack = NULL;
int i, status;
fflush( stdout );
if ( nProcs <= 2 ) {
void * pData; int i;
Vec_PtrForEachEntry( void *, (Vec_Ptr_t *)vData, pData, i )
pUserFunc( pData );
return;
}
// subtract manager thread
nProcs--;
assert( nProcs >= 1 && nProcs <= PAR_THR_MAX );
// start threads
for ( i = 0; i < nProcs; i++ )
{
ThData[i].pUserData = NULL;
ThData[i].pUserFunc = pUserFunc;
ThData[i].iThread = i;
ThData[i].nTimeOut = TimeOut;
atomic_store_explicit(&ThData[i].fWorking, 0, memory_order_release);
status = pthread_create( WorkerThread + i, NULL, Util_Thread, (void *)(ThData + i) ); assert( status == 0 );
}
struct timespec pause_duration;
pause_duration.tv_sec = 0;
pause_duration.tv_nsec = 10000000L; // 10 milliseconds
// look at the threads
vStack = Vec_PtrDup( (Vec_Ptr_t *)vData );
while ( Vec_PtrSize(vStack) > 0 )
{
for ( i = 0; i < nProcs; i++ )
{
if ( atomic_load_explicit(&ThData[i].fWorking, memory_order_acquire) )
continue;
ThData[i].pUserData = Vec_PtrPop( vStack );
atomic_store_explicit(&ThData[i].fWorking, 1, memory_order_release);
break;
}
}
Vec_PtrFree( vStack );
// wait till threads finish
for ( i = 0; i < nProcs; i++ )
{
if ( atomic_load_explicit(&ThData[i].fWorking, memory_order_acquire) )
i = -1; // Start from the beginning again
nanosleep(&pause_duration, NULL);
}
// stop threads
for ( i = 0; i < nProcs; i++ )
{
ThData[i].pUserData = NULL;
atomic_store_explicit(&ThData[i].fWorking, 1, memory_order_release);
}
// Join threads
for ( i = 0; i < nProcs; i++ )
pthread_join( WorkerThread[i], NULL );
//if ( fVerbose )
// Abc_PrintTime( 1, "Time", Abc_Clock() - clkStart );
}
#endif // pthreads are used
////////////////////////////////////////////////////////////////////////
/// END OF FILE ///
////////////////////////////////////////////////////////////////////////
ABC_NAMESPACE_IMPL_END