From 06270cdd2b7ab0f6be2be5d8d4252d68a96ca4e4 Mon Sep 17 00:00:00 2001 From: Stephen Williams Date: Fri, 8 Jan 2010 20:20:26 -0800 Subject: [PATCH 1/3] Basic work queue thread for lxt2 output. --- configure.in | 1 + vpi/sys_lxt2.c | 87 +++++++++++++++++-------- vpi/vcd_priv.h | 74 +++++++++++++++++++++ vpi/vcd_priv2.cc | 164 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 299 insertions(+), 27 deletions(-) diff --git a/configure.in b/configure.in index 4497f033c..58022f816 100644 --- a/configure.in +++ b/configure.in @@ -74,6 +74,7 @@ AC_CHECK_HEADERS(readline/readline.h readline/history.h sys/resource.h) case "${host}" in *linux*) AC_DEFINE([LINUX], [1], [Host operating system is Linux.]) ;; esac # vpi uses these +AC_CHECK_LIB(pthread, pthread_create) AC_CHECK_LIB(z, gzwrite) AC_CHECK_LIB(z, gzwrite, HAVE_LIBZ=yes, HAVE_LIBZ=no) AC_SUBST(HAVE_LIBZ) diff --git a/vpi/sys_lxt2.c b/vpi/sys_lxt2.c index ebc9792cb..599a81d9e 100644 --- a/vpi/sys_lxt2.c +++ b/vpi/sys_lxt2.c @@ -32,17 +32,19 @@ # include # include # include -# include # include #ifdef HAVE_MALLOC_H # include #endif # include "stringheap.h" +# include static char *dump_path = NULL; static struct lxt2_wr_trace *dump_file = NULL; +static void* lxt2_thread(void*arg); + struct vcd_info { vpiHandle item; vpiHandle cb; @@ -172,15 +174,12 @@ static void show_this_item(struct vcd_info*info) if (vpi_get(vpiType, info->item) == vpiRealVar) { value.format = vpiRealVal; vpi_get_value(info->item, &value); - lxt2_wr_emit_value_double(dump_file, info->sym, 0, - value.value.real); + vcd_work_emit_double(info->sym, value.value.real); } else { value.format = vpiBinStrVal; vpi_get_value(info->item, &value); - lxt2_wr_emit_value_bit_string(dump_file, info->sym, - 0 /* array row */, - value.value.str); + vcd_work_emit_bits(info->sym, value.value.str); } } @@ -190,18 +189,11 @@ static void show_this_item_x(struct vcd_info*info) if (vpi_get(vpiType,info->item) == vpiRealVar) { /* Should write a NaN here? */ } else { - lxt2_wr_emit_value_bit_string(dump_file, info->sym, 0, "x"); + vcd_work_emit_bits(info->sym, "x"); } } -/* - * managed qsorted list of scope names for duplicates bsearching - */ - -struct vcd_names_list_s lxt_tab; - - static int dumpvars_status = 0; /* 0:fresh 1:cb installed, 2:callback done */ static PLI_UINT64 dumpvars_time; __inline__ static int dump_header_pending(void) @@ -235,7 +227,7 @@ static PLI_INT32 variable_cb_2(p_cb_data cause) PLI_UINT64 now = timerec_to_time64(cause->time); if (now != vcd_cur_time) { - lxt2_wr_set_time64(dump_file, now); + vcd_work_set_time(now); vcd_cur_time = now; } @@ -290,7 +282,7 @@ static PLI_INT32 dumpvars_cb(p_cb_data cause) vcd_cur_time = dumpvars_time; if (!dump_is_off) { - lxt2_wr_set_time64(dump_file, dumpvars_time); + vcd_work_set_time(dumpvars_time); vcd_checkpoint(); } @@ -307,16 +299,17 @@ static PLI_INT32 finish_cb(p_cb_data cause) dumpvars_time = timerec_to_time64(cause->time); if (!dump_is_off && !dump_is_full && dumpvars_time != vcd_cur_time) { - lxt2_wr_set_time64(dump_file, dumpvars_time); + vcd_work_set_time(dumpvars_time); } + vcd_work_terminate(); for (cur = vcd_list ; cur ; cur = next) { next = cur->next; free(cur); } vcd_list = 0; - vcd_names_delete(&lxt_tab); + vcd_scope_names_delete(); nexus_ident_delete(); free(dump_path); dump_path = 0; @@ -373,11 +366,11 @@ static PLI_INT32 sys_dumpoff_calltf(PLI_BYTE8*name) now64 = timerec_to_time64(&now); if (now64 > vcd_cur_time) { - lxt2_wr_set_time(dump_file, now64); + vcd_work_set_time(now64); vcd_cur_time = now64; } - lxt2_wr_set_dumpoff(dump_file); + vcd_work_dumpoff(); vcd_checkpoint_x(); return 0; @@ -400,11 +393,11 @@ static PLI_INT32 sys_dumpon_calltf(PLI_BYTE8*name) now64 = timerec_to_time64(&now); if (now64 > vcd_cur_time) { - lxt2_wr_set_time64(dump_file, now64); + vcd_work_set_time(now64); vcd_cur_time = now64; } - lxt2_wr_set_dumpon(dump_file); + vcd_work_dumpon(); vcd_checkpoint(); return 0; @@ -424,7 +417,7 @@ static PLI_INT32 sys_dumpall_calltf(PLI_BYTE8*name) now64 = timerec_to_time64(&now); if (now64 > vcd_cur_time) { - lxt2_wr_set_time64(dump_file, now64); + vcd_work_set_time(now64); vcd_cur_time = now64; } @@ -435,6 +428,7 @@ static PLI_INT32 sys_dumpall_calltf(PLI_BYTE8*name) static void *close_dumpfile(void) { + vcd_work_terminate(); lxt2_wr_close(dump_file); dump_file = NULL; return NULL; @@ -480,6 +474,7 @@ static void open_dumpfile(vpiHandle callh) lxt2_wr_set_partial_on(dump_file, 1); lxt2_wr_set_break_size(dump_file, use_file_size_limit); + vcd_work_start(lxt2_thread, 0); atexit((void(*)(void))close_dumpfile); } } @@ -527,7 +522,7 @@ static PLI_INT32 sys_dumpfile_calltf(PLI_BYTE8*name) */ static PLI_INT32 sys_dumpflush_calltf(PLI_BYTE8*name) { - if (dump_file) lxt2_wr_flush(dump_file); + if (dump_file) vcd_work_flush(); return 0; } @@ -697,10 +692,10 @@ static void scan_item(unsigned depth, vpiHandle item, int skip) vpi_printf("LXT2 info: scanning scope %s, %u levels\n", fullname, depth); #endif - nskip = 0 != vcd_names_search(&lxt_tab, fullname); + nskip = vcd_scope_names_test(fullname); if (!nskip) - vcd_names_add(&lxt_tab, fullname); + vcd_scope_names_add(fullname); else vpi_printf("LXT2 warning: ignoring signals in " "previously scanned scope %s\n", fullname); @@ -784,7 +779,6 @@ static PLI_INT32 sys_dumpvars_calltf(PLI_BYTE8*name) int dep = draw_scope(item); - vcd_names_sort(&lxt_tab); scan_item(depth, item, 0); while (dep--) pop_scope(); @@ -799,6 +793,45 @@ static PLI_INT32 sys_dumpvars_calltf(PLI_BYTE8*name) return 0; } +static void* lxt2_thread(void*arg) +{ + int run_flag = 1; + while (run_flag) { + struct vcd_work_item_s*cell = vcd_work_thread_peek(); + + switch (cell->type) { + case WT_NONE: + break; + case WT_FLUSH: + lxt2_wr_flush(dump_file); + break; + case WT_DUMPON: + lxt2_wr_set_dumpon(dump_file); + break; + case WT_DUMPOFF: + lxt2_wr_set_dumpoff(dump_file); + break; + case WT_SET_TIME: + lxt2_wr_set_time64(dump_file, cell->op_.val_u64); + break; + case WT_EMIT_DOUBLE: + lxt2_wr_emit_value_double(dump_file, cell->sym_.lxt2, + 0, cell->op_.val_double); + case WT_EMIT_BITS: + lxt2_wr_emit_value_bit_string(dump_file, cell->sym_.lxt2, + 0, cell->op_.val_char); + free(cell->op_.val_char); + break; + case WT_TERMINATE: + run_flag = 0; + break; + } + + vcd_work_thread_pop(); + } + return 0; +} + void sys_lxt2_register() { int idx; diff --git a/vpi/vcd_priv.h b/vpi/vcd_priv.h index a263a571b..b5bbe98c1 100644 --- a/vpi/vcd_priv.h +++ b/vpi/vcd_priv.h @@ -47,11 +47,85 @@ EXTERN void vcd_names_sort(struct vcd_names_list_s*tab); EXTERN void vcd_names_delete(); +/* + * Keep a map of nexus ident's to help with alias detection. + */ EXTERN const char*find_nexus_ident(int nex); EXTERN void set_nexus_ident(int nex, const char *id); EXTERN void nexus_ident_delete(); +/* + * Keep a set of scope names to help with duplicate detection. + */ +EXTERN void vcd_scope_names_add(const char*name); +EXTERN int vcd_scope_names_test(const char*name); +EXTERN void vcd_scope_names_delete(void); + +/* + * Implement a work queue that can be used to send commands to a + * dumper thread. + */ + +typedef enum vcd_work_item_type_e { + WT_NONE, + WT_SET_TIME, + WT_EMIT_BITS, + WT_EMIT_DOUBLE, + WT_DUMPON, + WT_DUMPOFF, + WT_FLUSH, + WT_TERMINATE +} vcd_work_item_type_t; + +struct lxt2_wr_symbol; + +struct vcd_work_item_s { + vcd_work_item_type_t type; + union { + struct lxt2_wr_symbol*lxt2; + } sym_; + union { + double val_double; + char*val_char; + uint64_t val_u64; + } op_; +}; + +/* + * The thread_peek and thread_pop functions work as pairs. The work + * thread processing work items uses vcd_work_thread_peek to look at + * the first item in the work queue. The work thread can be assured + * that the work item it stable. When it is done with the work item, + * it calls vcd_work_thread_pop to cause it to be popped from the work + * queue. + */ +EXTERN struct vcd_work_item_s* vcd_work_thread_peek(void); +EXTERN void vcd_work_thread_pop(void); + +/* + * Create work threads with the vcd_work_start function, and terminate + * the work thread (gracefully) with the vcd_work_terminate + * function. Synchronize with the work thread with the vcd_work_sync + * function. This blocks until the work thread is done all the work it + * has so far. + */ +EXTERN void vcd_work_start( void* (*fun) (void*arg), void*arg); +EXTERN void vcd_work_terminate(void); + +EXTERN void vcd_work_sync(void); + +/* + * The remaining vcd_work_* functions send messages to the work thread + * causing it to perform various VCD-related tasks. + */ +EXTERN void vcd_work_flush(void); /* Drain output caches. */ +EXTERN void vcd_work_set_time(uint64_t val); +EXTERN void vcd_work_dumpon(void); +EXTERN void vcd_work_dumpoff(void); +EXTERN void vcd_work_emit_double(struct lxt2_wr_symbol*sym, double val); +EXTERN void vcd_work_emit_bits(struct lxt2_wr_symbol*sym, const char*bits); + /* The compiletf routines are common for the VCD, LXT and LXT2 dumpers. */ EXTERN PLI_INT32 sys_dumpvars_compiletf(PLI_BYTE8 *name); diff --git a/vpi/vcd_priv2.cc b/vpi/vcd_priv2.cc index a811c710b..0670fe43b 100644 --- a/vpi/vcd_priv2.cc +++ b/vpi/vcd_priv2.cc @@ -19,6 +19,8 @@ # include "vcd_priv.h" # include +# include +# include /* Nexus Id cache @@ -55,3 +57,165 @@ extern "C" void nexus_ident_delete() { nexus_ident_map.clear(); } + + +static std::set vcd_scope_names_set; + +extern "C" void vcd_scope_names_add(const char*name) +{ + vcd_scope_names_set .insert(name); +} + +extern "C" int vcd_scope_names_test(const char*name) +{ + if (vcd_scope_names_set.find(name) == vcd_scope_names_set.end()) + return 0; + else + return 1; +} + +extern "C" void vcd_scope_names_delete(void) +{ + vcd_scope_names_set.clear(); +} + +static pthread_t work_thread; + +static const unsigned WORK_QUEUE_SIZE = 512*1024; +static struct vcd_work_item_s work_queue[WORK_QUEUE_SIZE]; +static volatile unsigned work_queue_next = 0; +static volatile unsigned work_queue_fill = 0; + +static pthread_mutex_t work_queue_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t work_queue_is_empty_sig = PTHREAD_COND_INITIALIZER; +static pthread_cond_t work_queue_notempty_sig = PTHREAD_COND_INITIALIZER; +static pthread_cond_t work_queue_notfull_sig = PTHREAD_COND_INITIALIZER; + +struct vcd_work_item_s* vcd_work_thread_peek(void) +{ + // There must always only be 1 vcd work thread, and only the + // work thread decreases the fill, so if the work_queue_fill + // is non-zero, I can reliably assume that there is at least + // one item that I can peek at. I only need to lock if I must + // wait for the work_queue_fill to become non-zero. + if (work_queue_fill == 0) { + pthread_mutex_lock(&work_queue_mutex); + while (work_queue_fill == 0) + pthread_cond_wait(&work_queue_notempty_sig, &work_queue_mutex); + pthread_mutex_unlock(&work_queue_mutex); + } + + return work_queue + work_queue_next; +} + +void vcd_work_thread_pop(void) +{ + pthread_mutex_lock(&work_queue_mutex); + + unsigned use_fill = work_queue_fill - 1; + work_queue_fill = use_fill; + + unsigned use_next = work_queue_next + 1; + if (use_next >= WORK_QUEUE_SIZE) + use_next = 0; + work_queue_next = use_next; + + if (use_fill == WORK_QUEUE_SIZE-1) + pthread_cond_signal(&work_queue_notfull_sig); + else if (use_fill == 0) + pthread_cond_signal(&work_queue_is_empty_sig); + + pthread_mutex_unlock(&work_queue_mutex); +} + +void vcd_work_start( void* (*fun) (void*), void*arg ) +{ + pthread_create(&work_thread, 0, fun, arg); +} + +void vcd_work_sync(void) +{ + if (work_queue_fill > 0) { + pthread_mutex_lock(&work_queue_mutex); + while (work_queue_fill > 0) + pthread_cond_wait(&work_queue_is_empty_sig, &work_queue_mutex); + pthread_mutex_unlock(&work_queue_mutex); + } +} + +static struct vcd_work_item_s* grab_item(void) +{ + pthread_mutex_lock(&work_queue_mutex); + while (work_queue_fill >= WORK_QUEUE_SIZE) + pthread_cond_wait(&work_queue_notfull_sig, &work_queue_mutex); + + unsigned cur = work_queue_next + work_queue_fill; + if (cur >= WORK_QUEUE_SIZE) + cur -= WORK_QUEUE_SIZE; + + return work_queue + cur; +} + +static void unlock_item(void) +{ + work_queue_fill += 1; + if (work_queue_fill == 1) + pthread_cond_signal(&work_queue_notempty_sig); + + pthread_mutex_unlock(&work_queue_mutex); +} + +void vcd_work_flush(void) +{ + struct vcd_work_item_s*cell = grab_item(); + cell->type = WT_FLUSH; + unlock_item(); +} + +void vcd_work_dumpon(void) +{ + struct vcd_work_item_s*cell = grab_item(); + cell->type = WT_DUMPON; + unlock_item(); +} + +void vcd_work_dumpoff(void) +{ + struct vcd_work_item_s*cell = grab_item(); + cell->type = WT_DUMPOFF; + unlock_item(); +} + +void vcd_work_set_time(uint64_t val) +{ + struct vcd_work_item_s*cell = grab_item(); + cell->type = WT_SET_TIME; + cell->op_.val_u64 = val; + unlock_item(); +} + +void vcd_work_emit_double(struct lxt2_wr_symbol*sym, double val) +{ + struct vcd_work_item_s*cell = grab_item(); + cell->type = WT_EMIT_DOUBLE; + cell->sym_.lxt2 = sym; + cell->op_.val_double = val; + unlock_item(); +} + +void vcd_work_emit_bits(struct lxt2_wr_symbol*sym, const char* val) +{ + struct vcd_work_item_s*cell = grab_item(); + cell->type = WT_EMIT_BITS; + cell->sym_.lxt2 = sym; + cell->op_.val_char = strdup(val); + unlock_item(); +} + +void vcd_work_terminate(void) +{ + struct vcd_work_item_s*cell = grab_item(); + cell->type = WT_TERMINATE; + unlock_item(); + pthread_join(work_thread, 0); +} From 76ebde4cd25b97dab224ecb28ed0fb333cac407e Mon Sep 17 00:00:00 2001 From: Stephen Williams Date: Fri, 8 Jan 2010 21:46:32 -0800 Subject: [PATCH 2/3] Blend time stamp into other work items. The time change is usually a trivial operation, so instead carry a timestamp on all the work items and let the work thread decide on its own when to do a SET_TIME operation. This reduces some pthread overhead and thus gets us some better performance. --- vpi/sys_lxt2.c | 14 ++++++++++---- vpi/vcd_priv.h | 9 +++++++-- vpi/vcd_priv2.cc | 36 +++++++++++++++++++++++++++--------- 3 files changed, 44 insertions(+), 15 deletions(-) diff --git a/vpi/sys_lxt2.c b/vpi/sys_lxt2.c index 599a81d9e..91fb76cb4 100644 --- a/vpi/sys_lxt2.c +++ b/vpi/sys_lxt2.c @@ -795,10 +795,18 @@ static PLI_INT32 sys_dumpvars_calltf(PLI_BYTE8*name) static void* lxt2_thread(void*arg) { + /* Keep track of the current time, and only call the set_time + function when the time changes. */ + uint64_t cur_time = 0; int run_flag = 1; while (run_flag) { struct vcd_work_item_s*cell = vcd_work_thread_peek(); + if (cell->time != cur_time) { + cur_time = cell->time; + lxt2_wr_set_time64(dump_file, cur_time); + } + switch (cell->type) { case WT_NONE: break; @@ -811,16 +819,13 @@ static void* lxt2_thread(void*arg) case WT_DUMPOFF: lxt2_wr_set_dumpoff(dump_file); break; - case WT_SET_TIME: - lxt2_wr_set_time64(dump_file, cell->op_.val_u64); - break; case WT_EMIT_DOUBLE: lxt2_wr_emit_value_double(dump_file, cell->sym_.lxt2, 0, cell->op_.val_double); + break; case WT_EMIT_BITS: lxt2_wr_emit_value_bit_string(dump_file, cell->sym_.lxt2, 0, cell->op_.val_char); - free(cell->op_.val_char); break; case WT_TERMINATE: run_flag = 0; @@ -829,6 +834,7 @@ static void* lxt2_thread(void*arg) vcd_work_thread_pop(); } + return 0; } diff --git a/vpi/vcd_priv.h b/vpi/vcd_priv.h index b5bbe98c1..70b445d62 100644 --- a/vpi/vcd_priv.h +++ b/vpi/vcd_priv.h @@ -69,7 +69,6 @@ EXTERN void vcd_scope_names_delete(void); typedef enum vcd_work_item_type_e { WT_NONE, - WT_SET_TIME, WT_EMIT_BITS, WT_EMIT_DOUBLE, WT_DUMPON, @@ -80,15 +79,21 @@ typedef enum vcd_work_item_type_e { struct lxt2_wr_symbol; +# define VAL_CHAR_ARRAY_SIZE 64 struct vcd_work_item_s { vcd_work_item_type_t type; + uint64_t time; union { struct lxt2_wr_symbol*lxt2; } sym_; + union { double val_double; +#ifdef VAL_CHAR_ARRAY_SIZE + char val_char[VAL_CHAR_ARRAY_SIZE]; +#else char*val_char; - uint64_t val_u64; +#endif } op_; }; diff --git a/vpi/vcd_priv2.cc b/vpi/vcd_priv2.cc index 0670fe43b..ae22a0d03 100644 --- a/vpi/vcd_priv2.cc +++ b/vpi/vcd_priv2.cc @@ -21,6 +21,7 @@ # include # include # include +# include /* Nexus Id cache @@ -81,7 +82,7 @@ extern "C" void vcd_scope_names_delete(void) static pthread_t work_thread; -static const unsigned WORK_QUEUE_SIZE = 512*1024; +static const unsigned WORK_QUEUE_SIZE = 128*1024; static struct vcd_work_item_s work_queue[WORK_QUEUE_SIZE]; static volatile unsigned work_queue_next = 0; static volatile unsigned work_queue_fill = 0; @@ -91,6 +92,8 @@ static pthread_cond_t work_queue_is_empty_sig = PTHREAD_COND_INITIALIZER; static pthread_cond_t work_queue_notempty_sig = PTHREAD_COND_INITIALIZER; static pthread_cond_t work_queue_notfull_sig = PTHREAD_COND_INITIALIZER; +static uint64_t work_queue_next_time = 0; + struct vcd_work_item_s* vcd_work_thread_peek(void) { // There must always only be 1 vcd work thread, and only the @@ -115,7 +118,14 @@ void vcd_work_thread_pop(void) unsigned use_fill = work_queue_fill - 1; work_queue_fill = use_fill; - unsigned use_next = work_queue_next + 1; + unsigned use_next = work_queue_next; +#ifndef VAL_CHAR_ARRAY_SIZE + struct vcd_work_item_s*cell = work_queue + use_next; + if (cell->type == WT_EMIT_BITS) { + free(cell->op_.val_char); + } +#endif + use_next += 1; if (use_next >= WORK_QUEUE_SIZE) use_next = 0; work_queue_next = use_next; @@ -153,13 +163,17 @@ static struct vcd_work_item_s* grab_item(void) if (cur >= WORK_QUEUE_SIZE) cur -= WORK_QUEUE_SIZE; - return work_queue + cur; + // Write the new timestamp into the work item. + struct vcd_work_item_s*cell = work_queue + cur; + cell->time = work_queue_next_time; + return cell; } static void unlock_item(void) { - work_queue_fill += 1; - if (work_queue_fill == 1) + unsigned use_fill = work_queue_fill + 1; + work_queue_fill = use_fill; + if (use_fill == 1) pthread_cond_signal(&work_queue_notempty_sig); pthread_mutex_unlock(&work_queue_mutex); @@ -188,10 +202,7 @@ void vcd_work_dumpoff(void) void vcd_work_set_time(uint64_t val) { - struct vcd_work_item_s*cell = grab_item(); - cell->type = WT_SET_TIME; - cell->op_.val_u64 = val; - unlock_item(); + work_queue_next_time = val; } void vcd_work_emit_double(struct lxt2_wr_symbol*sym, double val) @@ -205,10 +216,17 @@ void vcd_work_emit_double(struct lxt2_wr_symbol*sym, double val) void vcd_work_emit_bits(struct lxt2_wr_symbol*sym, const char* val) { + struct vcd_work_item_s*cell = grab_item(); cell->type = WT_EMIT_BITS; cell->sym_.lxt2 = sym; +#ifdef VAL_CHAR_ARRAY_SIZE + size_t need_len = strlen(val) + 1; + assert(need_len <= VAL_CHAR_ARRAY_SIZE); + memcpy(cell->op_.val_char, val, need_len); +#else cell->op_.val_char = strdup(val); +#endif unlock_item(); } From 7fc6b02e9601dbd10a3d31e4e729dc9b31553e10 Mon Sep 17 00:00:00 2001 From: Stephen Williams Date: Sat, 9 Jan 2010 10:08:16 -0800 Subject: [PATCH 3/3] Batch vcd work item creation Rather then lock/unlock the work queue ring for every item, save tons of pthread lock manipulation by allocating to the producer in batches. Over the long run, this doesn't change the CPU balance or hold up either thread, but it eliminates almost 3/4 of the lock/unlock episodes. --- vpi/vcd_priv.h | 5 --- vpi/vcd_priv2.cc | 105 +++++++++++++++++++++++++++++++++-------------- 2 files changed, 74 insertions(+), 36 deletions(-) diff --git a/vpi/vcd_priv.h b/vpi/vcd_priv.h index 70b445d62..672cf602c 100644 --- a/vpi/vcd_priv.h +++ b/vpi/vcd_priv.h @@ -79,7 +79,6 @@ typedef enum vcd_work_item_type_e { struct lxt2_wr_symbol; -# define VAL_CHAR_ARRAY_SIZE 64 struct vcd_work_item_s { vcd_work_item_type_t type; uint64_t time; @@ -89,11 +88,7 @@ struct vcd_work_item_s { union { double val_double; -#ifdef VAL_CHAR_ARRAY_SIZE - char val_char[VAL_CHAR_ARRAY_SIZE]; -#else char*val_char; -#endif } op_; }; diff --git a/vpi/vcd_priv2.cc b/vpi/vcd_priv2.cc index ae22a0d03..20213adbf 100644 --- a/vpi/vcd_priv2.cc +++ b/vpi/vcd_priv2.cc @@ -83,6 +83,9 @@ extern "C" void vcd_scope_names_delete(void) static pthread_t work_thread; static const unsigned WORK_QUEUE_SIZE = 128*1024; +static const unsigned WORK_QUEUE_BATCH_MIN = 4*1024; +static const unsigned WORK_QUEUE_BATCH_MAX = 32*1024; + static struct vcd_work_item_s work_queue[WORK_QUEUE_SIZE]; static volatile unsigned work_queue_next = 0; static volatile unsigned work_queue_fill = 0; @@ -90,9 +93,8 @@ static volatile unsigned work_queue_fill = 0; static pthread_mutex_t work_queue_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t work_queue_is_empty_sig = PTHREAD_COND_INITIALIZER; static pthread_cond_t work_queue_notempty_sig = PTHREAD_COND_INITIALIZER; -static pthread_cond_t work_queue_notfull_sig = PTHREAD_COND_INITIALIZER; +static pthread_cond_t work_queue_minfree_sig = PTHREAD_COND_INITIALIZER; -static uint64_t work_queue_next_time = 0; struct vcd_work_item_s* vcd_work_thread_peek(void) { @@ -119,47 +121,64 @@ void vcd_work_thread_pop(void) work_queue_fill = use_fill; unsigned use_next = work_queue_next; -#ifndef VAL_CHAR_ARRAY_SIZE + struct vcd_work_item_s*cell = work_queue + use_next; if (cell->type == WT_EMIT_BITS) { free(cell->op_.val_char); } -#endif + use_next += 1; if (use_next >= WORK_QUEUE_SIZE) use_next = 0; work_queue_next = use_next; - if (use_fill == WORK_QUEUE_SIZE-1) - pthread_cond_signal(&work_queue_notfull_sig); + if (use_fill == WORK_QUEUE_SIZE-WORK_QUEUE_BATCH_MIN) + pthread_cond_signal(&work_queue_minfree_sig); else if (use_fill == 0) pthread_cond_signal(&work_queue_is_empty_sig); pthread_mutex_unlock(&work_queue_mutex); } +/* + * Work queue items are created in batches to reduce thread + * bouncing. When the producer gets a free work item, it actually + * locks the queue in order to produce a batch. The queue stays locked + * until the batch is complete. Then the releases the whole lot to the + * consumer. + */ +static uint64_t work_queue_next_time = 0; +static unsigned current_batch_cnt = 0; +static unsigned current_batch_alloc = 0; +static unsigned current_batch_base = 0; + void vcd_work_start( void* (*fun) (void*), void*arg ) { pthread_create(&work_thread, 0, fun, arg); } -void vcd_work_sync(void) -{ - if (work_queue_fill > 0) { - pthread_mutex_lock(&work_queue_mutex); - while (work_queue_fill > 0) - pthread_cond_wait(&work_queue_is_empty_sig, &work_queue_mutex); - pthread_mutex_unlock(&work_queue_mutex); - } -} - static struct vcd_work_item_s* grab_item(void) { - pthread_mutex_lock(&work_queue_mutex); - while (work_queue_fill >= WORK_QUEUE_SIZE) - pthread_cond_wait(&work_queue_notfull_sig, &work_queue_mutex); + if (current_batch_alloc == 0) { + pthread_mutex_lock(&work_queue_mutex); + while ((WORK_QUEUE_SIZE-work_queue_fill) < WORK_QUEUE_BATCH_MIN) + pthread_cond_wait(&work_queue_minfree_sig, &work_queue_mutex); - unsigned cur = work_queue_next + work_queue_fill; + current_batch_base = work_queue_next + work_queue_fill; + current_batch_alloc = WORK_QUEUE_SIZE - work_queue_fill; + + pthread_mutex_unlock(&work_queue_mutex); + + if (current_batch_base >= WORK_QUEUE_SIZE) + current_batch_base -= WORK_QUEUE_SIZE; + if (current_batch_alloc > WORK_QUEUE_BATCH_MAX) + current_batch_alloc = WORK_QUEUE_BATCH_MAX; + current_batch_cnt = 0; + } + + assert(current_batch_cnt < current_batch_alloc); + + unsigned cur = current_batch_base + current_batch_cnt; if (cur >= WORK_QUEUE_SIZE) cur -= WORK_QUEUE_SIZE; @@ -169,21 +188,50 @@ static struct vcd_work_item_s* grab_item(void) return cell; } -static void unlock_item(void) +static void end_batch(void) { - unsigned use_fill = work_queue_fill + 1; + pthread_mutex_lock(&work_queue_mutex); + + unsigned use_fill = work_queue_fill; + bool was_empty_flag = (use_fill==0) && (current_batch_cnt > 0); + + use_fill += current_batch_cnt; work_queue_fill = use_fill; - if (use_fill == 1) + + current_batch_alloc = 0; + current_batch_cnt = 0; + + if (was_empty_flag) pthread_cond_signal(&work_queue_notempty_sig); pthread_mutex_unlock(&work_queue_mutex); } +static inline void unlock_item(bool flush_batch =false) +{ + current_batch_cnt += 1; + if (current_batch_cnt == current_batch_alloc || flush_batch) + end_batch(); +} + +void vcd_work_sync(void) +{ + if (current_batch_alloc > 0) + end_batch(); + + if (work_queue_fill > 0) { + pthread_mutex_lock(&work_queue_mutex); + while (work_queue_fill > 0) + pthread_cond_wait(&work_queue_is_empty_sig, &work_queue_mutex); + pthread_mutex_unlock(&work_queue_mutex); + } +} + void vcd_work_flush(void) { struct vcd_work_item_s*cell = grab_item(); cell->type = WT_FLUSH; - unlock_item(); + unlock_item(true); } void vcd_work_dumpon(void) @@ -220,13 +268,8 @@ void vcd_work_emit_bits(struct lxt2_wr_symbol*sym, const char* val) struct vcd_work_item_s*cell = grab_item(); cell->type = WT_EMIT_BITS; cell->sym_.lxt2 = sym; -#ifdef VAL_CHAR_ARRAY_SIZE - size_t need_len = strlen(val) + 1; - assert(need_len <= VAL_CHAR_ARRAY_SIZE); - memcpy(cell->op_.val_char, val, need_len); -#else cell->op_.val_char = strdup(val); -#endif + unlock_item(); } @@ -234,6 +277,6 @@ void vcd_work_terminate(void) { struct vcd_work_item_s*cell = grab_item(); cell->type = WT_TERMINATE; - unlock_item(); + unlock_item(true); pthread_join(work_thread, 0); }