diff --git a/vpi/vcd_priv.h b/vpi/vcd_priv.h index 70b445d62..672cf602c 100644 --- a/vpi/vcd_priv.h +++ b/vpi/vcd_priv.h @@ -79,7 +79,6 @@ typedef enum vcd_work_item_type_e { struct lxt2_wr_symbol; -# define VAL_CHAR_ARRAY_SIZE 64 struct vcd_work_item_s { vcd_work_item_type_t type; uint64_t time; @@ -89,11 +88,7 @@ struct vcd_work_item_s { union { double val_double; -#ifdef VAL_CHAR_ARRAY_SIZE - char val_char[VAL_CHAR_ARRAY_SIZE]; -#else char*val_char; -#endif } op_; }; diff --git a/vpi/vcd_priv2.cc b/vpi/vcd_priv2.cc index ae22a0d03..20213adbf 100644 --- a/vpi/vcd_priv2.cc +++ b/vpi/vcd_priv2.cc @@ -83,6 +83,9 @@ extern "C" void vcd_scope_names_delete(void) static pthread_t work_thread; static const unsigned WORK_QUEUE_SIZE = 128*1024; +static const unsigned WORK_QUEUE_BATCH_MIN = 4*1024; +static const unsigned WORK_QUEUE_BATCH_MAX = 32*1024; + static struct vcd_work_item_s work_queue[WORK_QUEUE_SIZE]; static volatile unsigned work_queue_next = 0; static volatile unsigned work_queue_fill = 0; @@ -90,9 +93,8 @@ static volatile unsigned work_queue_fill = 0; static pthread_mutex_t work_queue_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t work_queue_is_empty_sig = PTHREAD_COND_INITIALIZER; static pthread_cond_t work_queue_notempty_sig = PTHREAD_COND_INITIALIZER; -static pthread_cond_t work_queue_notfull_sig = PTHREAD_COND_INITIALIZER; +static pthread_cond_t work_queue_minfree_sig = PTHREAD_COND_INITIALIZER; -static uint64_t work_queue_next_time = 0; struct vcd_work_item_s* vcd_work_thread_peek(void) { @@ -119,47 +121,64 @@ void vcd_work_thread_pop(void) work_queue_fill = use_fill; unsigned use_next = work_queue_next; -#ifndef VAL_CHAR_ARRAY_SIZE + struct vcd_work_item_s*cell = work_queue + use_next; if (cell->type == WT_EMIT_BITS) { free(cell->op_.val_char); } -#endif + use_next += 1; if (use_next >= WORK_QUEUE_SIZE) use_next = 0; work_queue_next = use_next; - if (use_fill == WORK_QUEUE_SIZE-1) - pthread_cond_signal(&work_queue_notfull_sig); + if (use_fill == WORK_QUEUE_SIZE-WORK_QUEUE_BATCH_MIN) + pthread_cond_signal(&work_queue_minfree_sig); else if (use_fill == 0) pthread_cond_signal(&work_queue_is_empty_sig); pthread_mutex_unlock(&work_queue_mutex); } +/* + * Work queue items are created in batches to reduce thread + * bouncing. When the producer gets a free work item, it actually + * locks the queue in order to produce a batch. The queue stays locked + * until the batch is complete. Then the releases the whole lot to the + * consumer. + */ +static uint64_t work_queue_next_time = 0; +static unsigned current_batch_cnt = 0; +static unsigned current_batch_alloc = 0; +static unsigned current_batch_base = 0; + void vcd_work_start( void* (*fun) (void*), void*arg ) { pthread_create(&work_thread, 0, fun, arg); } -void vcd_work_sync(void) -{ - if (work_queue_fill > 0) { - pthread_mutex_lock(&work_queue_mutex); - while (work_queue_fill > 0) - pthread_cond_wait(&work_queue_is_empty_sig, &work_queue_mutex); - pthread_mutex_unlock(&work_queue_mutex); - } -} - static struct vcd_work_item_s* grab_item(void) { - pthread_mutex_lock(&work_queue_mutex); - while (work_queue_fill >= WORK_QUEUE_SIZE) - pthread_cond_wait(&work_queue_notfull_sig, &work_queue_mutex); + if (current_batch_alloc == 0) { + pthread_mutex_lock(&work_queue_mutex); + while ((WORK_QUEUE_SIZE-work_queue_fill) < WORK_QUEUE_BATCH_MIN) + pthread_cond_wait(&work_queue_minfree_sig, &work_queue_mutex); - unsigned cur = work_queue_next + work_queue_fill; + current_batch_base = work_queue_next + work_queue_fill; + current_batch_alloc = WORK_QUEUE_SIZE - work_queue_fill; + + pthread_mutex_unlock(&work_queue_mutex); + + if (current_batch_base >= WORK_QUEUE_SIZE) + current_batch_base -= WORK_QUEUE_SIZE; + if (current_batch_alloc > WORK_QUEUE_BATCH_MAX) + current_batch_alloc = WORK_QUEUE_BATCH_MAX; + current_batch_cnt = 0; + } + + assert(current_batch_cnt < current_batch_alloc); + + unsigned cur = current_batch_base + current_batch_cnt; if (cur >= WORK_QUEUE_SIZE) cur -= WORK_QUEUE_SIZE; @@ -169,21 +188,50 @@ static struct vcd_work_item_s* grab_item(void) return cell; } -static void unlock_item(void) +static void end_batch(void) { - unsigned use_fill = work_queue_fill + 1; + pthread_mutex_lock(&work_queue_mutex); + + unsigned use_fill = work_queue_fill; + bool was_empty_flag = (use_fill==0) && (current_batch_cnt > 0); + + use_fill += current_batch_cnt; work_queue_fill = use_fill; - if (use_fill == 1) + + current_batch_alloc = 0; + current_batch_cnt = 0; + + if (was_empty_flag) pthread_cond_signal(&work_queue_notempty_sig); pthread_mutex_unlock(&work_queue_mutex); } +static inline void unlock_item(bool flush_batch =false) +{ + current_batch_cnt += 1; + if (current_batch_cnt == current_batch_alloc || flush_batch) + end_batch(); +} + +void vcd_work_sync(void) +{ + if (current_batch_alloc > 0) + end_batch(); + + if (work_queue_fill > 0) { + pthread_mutex_lock(&work_queue_mutex); + while (work_queue_fill > 0) + pthread_cond_wait(&work_queue_is_empty_sig, &work_queue_mutex); + pthread_mutex_unlock(&work_queue_mutex); + } +} + void vcd_work_flush(void) { struct vcd_work_item_s*cell = grab_item(); cell->type = WT_FLUSH; - unlock_item(); + unlock_item(true); } void vcd_work_dumpon(void) @@ -220,13 +268,8 @@ void vcd_work_emit_bits(struct lxt2_wr_symbol*sym, const char* val) struct vcd_work_item_s*cell = grab_item(); cell->type = WT_EMIT_BITS; cell->sym_.lxt2 = sym; -#ifdef VAL_CHAR_ARRAY_SIZE - size_t need_len = strlen(val) + 1; - assert(need_len <= VAL_CHAR_ARRAY_SIZE); - memcpy(cell->op_.val_char, val, need_len); -#else cell->op_.val_char = strdup(val); -#endif + unlock_item(); } @@ -234,6 +277,6 @@ void vcd_work_terminate(void) { struct vcd_work_item_s*cell = grab_item(); cell->type = WT_TERMINATE; - unlock_item(); + unlock_item(true); pthread_join(work_thread, 0); }