Batch vcd work item creation

Rather then lock/unlock the work queue ring for every item, save
tons of pthread lock manipulation by allocating to the producer
in batches. Over the long run, this doesn't change the CPU balance
or hold up either thread, but it eliminates almost 3/4 of the
lock/unlock episodes.
This commit is contained in:
Stephen Williams 2010-01-09 10:08:16 -08:00
parent 76ebde4cd2
commit 7fc6b02e96
2 changed files with 74 additions and 36 deletions

View File

@ -79,7 +79,6 @@ typedef enum vcd_work_item_type_e {
struct lxt2_wr_symbol;
# define VAL_CHAR_ARRAY_SIZE 64
struct vcd_work_item_s {
vcd_work_item_type_t type;
uint64_t time;
@ -89,11 +88,7 @@ struct vcd_work_item_s {
union {
double val_double;
#ifdef VAL_CHAR_ARRAY_SIZE
char val_char[VAL_CHAR_ARRAY_SIZE];
#else
char*val_char;
#endif
} op_;
};

View File

@ -83,6 +83,9 @@ extern "C" void vcd_scope_names_delete(void)
static pthread_t work_thread;
static const unsigned WORK_QUEUE_SIZE = 128*1024;
static const unsigned WORK_QUEUE_BATCH_MIN = 4*1024;
static const unsigned WORK_QUEUE_BATCH_MAX = 32*1024;
static struct vcd_work_item_s work_queue[WORK_QUEUE_SIZE];
static volatile unsigned work_queue_next = 0;
static volatile unsigned work_queue_fill = 0;
@ -90,9 +93,8 @@ static volatile unsigned work_queue_fill = 0;
static pthread_mutex_t work_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t work_queue_is_empty_sig = PTHREAD_COND_INITIALIZER;
static pthread_cond_t work_queue_notempty_sig = PTHREAD_COND_INITIALIZER;
static pthread_cond_t work_queue_notfull_sig = PTHREAD_COND_INITIALIZER;
static pthread_cond_t work_queue_minfree_sig = PTHREAD_COND_INITIALIZER;
static uint64_t work_queue_next_time = 0;
struct vcd_work_item_s* vcd_work_thread_peek(void)
{
@ -119,47 +121,64 @@ void vcd_work_thread_pop(void)
work_queue_fill = use_fill;
unsigned use_next = work_queue_next;
#ifndef VAL_CHAR_ARRAY_SIZE
struct vcd_work_item_s*cell = work_queue + use_next;
if (cell->type == WT_EMIT_BITS) {
free(cell->op_.val_char);
}
#endif
use_next += 1;
if (use_next >= WORK_QUEUE_SIZE)
use_next = 0;
work_queue_next = use_next;
if (use_fill == WORK_QUEUE_SIZE-1)
pthread_cond_signal(&work_queue_notfull_sig);
if (use_fill == WORK_QUEUE_SIZE-WORK_QUEUE_BATCH_MIN)
pthread_cond_signal(&work_queue_minfree_sig);
else if (use_fill == 0)
pthread_cond_signal(&work_queue_is_empty_sig);
pthread_mutex_unlock(&work_queue_mutex);
}
/*
* Work queue items are created in batches to reduce thread
* bouncing. When the producer gets a free work item, it actually
* locks the queue in order to produce a batch. The queue stays locked
* until the batch is complete. Then the releases the whole lot to the
* consumer.
*/
static uint64_t work_queue_next_time = 0;
static unsigned current_batch_cnt = 0;
static unsigned current_batch_alloc = 0;
static unsigned current_batch_base = 0;
void vcd_work_start( void* (*fun) (void*), void*arg )
{
pthread_create(&work_thread, 0, fun, arg);
}
void vcd_work_sync(void)
{
if (work_queue_fill > 0) {
pthread_mutex_lock(&work_queue_mutex);
while (work_queue_fill > 0)
pthread_cond_wait(&work_queue_is_empty_sig, &work_queue_mutex);
pthread_mutex_unlock(&work_queue_mutex);
}
}
static struct vcd_work_item_s* grab_item(void)
{
pthread_mutex_lock(&work_queue_mutex);
while (work_queue_fill >= WORK_QUEUE_SIZE)
pthread_cond_wait(&work_queue_notfull_sig, &work_queue_mutex);
if (current_batch_alloc == 0) {
pthread_mutex_lock(&work_queue_mutex);
while ((WORK_QUEUE_SIZE-work_queue_fill) < WORK_QUEUE_BATCH_MIN)
pthread_cond_wait(&work_queue_minfree_sig, &work_queue_mutex);
unsigned cur = work_queue_next + work_queue_fill;
current_batch_base = work_queue_next + work_queue_fill;
current_batch_alloc = WORK_QUEUE_SIZE - work_queue_fill;
pthread_mutex_unlock(&work_queue_mutex);
if (current_batch_base >= WORK_QUEUE_SIZE)
current_batch_base -= WORK_QUEUE_SIZE;
if (current_batch_alloc > WORK_QUEUE_BATCH_MAX)
current_batch_alloc = WORK_QUEUE_BATCH_MAX;
current_batch_cnt = 0;
}
assert(current_batch_cnt < current_batch_alloc);
unsigned cur = current_batch_base + current_batch_cnt;
if (cur >= WORK_QUEUE_SIZE)
cur -= WORK_QUEUE_SIZE;
@ -169,21 +188,50 @@ static struct vcd_work_item_s* grab_item(void)
return cell;
}
static void unlock_item(void)
static void end_batch(void)
{
unsigned use_fill = work_queue_fill + 1;
pthread_mutex_lock(&work_queue_mutex);
unsigned use_fill = work_queue_fill;
bool was_empty_flag = (use_fill==0) && (current_batch_cnt > 0);
use_fill += current_batch_cnt;
work_queue_fill = use_fill;
if (use_fill == 1)
current_batch_alloc = 0;
current_batch_cnt = 0;
if (was_empty_flag)
pthread_cond_signal(&work_queue_notempty_sig);
pthread_mutex_unlock(&work_queue_mutex);
}
static inline void unlock_item(bool flush_batch =false)
{
current_batch_cnt += 1;
if (current_batch_cnt == current_batch_alloc || flush_batch)
end_batch();
}
void vcd_work_sync(void)
{
if (current_batch_alloc > 0)
end_batch();
if (work_queue_fill > 0) {
pthread_mutex_lock(&work_queue_mutex);
while (work_queue_fill > 0)
pthread_cond_wait(&work_queue_is_empty_sig, &work_queue_mutex);
pthread_mutex_unlock(&work_queue_mutex);
}
}
void vcd_work_flush(void)
{
struct vcd_work_item_s*cell = grab_item();
cell->type = WT_FLUSH;
unlock_item();
unlock_item(true);
}
void vcd_work_dumpon(void)
@ -220,13 +268,8 @@ void vcd_work_emit_bits(struct lxt2_wr_symbol*sym, const char* val)
struct vcd_work_item_s*cell = grab_item();
cell->type = WT_EMIT_BITS;
cell->sym_.lxt2 = sym;
#ifdef VAL_CHAR_ARRAY_SIZE
size_t need_len = strlen(val) + 1;
assert(need_len <= VAL_CHAR_ARRAY_SIZE);
memcpy(cell->op_.val_char, val, need_len);
#else
cell->op_.val_char = strdup(val);
#endif
unlock_item();
}
@ -234,6 +277,6 @@ void vcd_work_terminate(void)
{
struct vcd_work_item_s*cell = grab_item();
cell->type = WT_TERMINATE;
unlock_item();
unlock_item(true);
pthread_join(work_thread, 0);
}