From ebddd854dc5bb49092c1b82a94d0a0dc71e36dd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arttu=20Yl=C3=A4-Outinen?= Date: Mon, 24 Jul 2017 12:15:17 +0300 Subject: [PATCH 1/3] Fix clobbered warnings in tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds volatile to loop variables in kvazaar tests in order to fix "might be clobbered by ‘longjmp’ or ‘vfork’" warnings when building with -O3. --- tests/coeff_sum_tests.c | 2 +- tests/intra_sad_tests.c | 2 +- tests/sad_tests.c | 2 +- tests/satd_tests.c | 2 +- tests/speed_tests.c | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/coeff_sum_tests.c b/tests/coeff_sum_tests.c index e1530057..7b33e1cb 100644 --- a/tests/coeff_sum_tests.c +++ b/tests/coeff_sum_tests.c @@ -52,7 +52,7 @@ SUITE(coeff_sum_tests) { setup(); - for (int i = 0; i < strategies.count; ++i) { + for (volatile int i = 0; i < strategies.count; ++i) { if (strcmp(strategies.strategies[i].type, "coeff_abs_sum") != 0) { continue; } diff --git a/tests/intra_sad_tests.c b/tests/intra_sad_tests.c index 1497beb4..d0416cd7 100644 --- a/tests/intra_sad_tests.c +++ b/tests/intra_sad_tests.c @@ -177,7 +177,7 @@ SUITE(intra_sad_tests) // Loop through all strategies picking out the intra sad ones and run // selectec strategies though all tests. 
- for (unsigned i = 0; i < strategies.count; ++i) { + for (volatile unsigned i = 0; i < strategies.count; ++i) { const char * type = strategies.strategies[i].type; if (strcmp(type, "sad_4x4") == 0) { diff --git a/tests/sad_tests.c b/tests/sad_tests.c index 2053ecfe..031ece47 100644 --- a/tests/sad_tests.c +++ b/tests/sad_tests.c @@ -378,7 +378,7 @@ SUITE(sad_tests) sad_test_env.tested_func = strategies.strategies[i].fptr; sad_test_env.strategy = &strategies.strategies[i]; int num_dim_tests = sizeof(tested_dims) / sizeof(tested_dims[0]); - for (int dim_test = 0; dim_test < num_dim_tests; ++dim_test) { + for (volatile int dim_test = 0; dim_test < num_dim_tests; ++dim_test) { sad_test_env.width = tested_dims[dim_test].width; sad_test_env.height = tested_dims[dim_test].height; RUN_TEST(test_reg_sad); diff --git a/tests/satd_tests.c b/tests/satd_tests.c index efdcda08..5f0cf93b 100644 --- a/tests/satd_tests.c +++ b/tests/satd_tests.c @@ -167,7 +167,7 @@ SUITE(satd_tests) // Loop through all strategies picking out the intra sad ones and run // selectec strategies though all tests. - for (unsigned i = 0; i < strategies.count; ++i) { + for (volatile unsigned i = 0; i < strategies.count; ++i) { const char * type = strategies.strategies[i].type; if (strcmp(type, "satd_4x4") == 0) { diff --git a/tests/speed_tests.c b/tests/speed_tests.c index f2cd0742..2a0d245c 100644 --- a/tests/speed_tests.c +++ b/tests/speed_tests.c @@ -405,7 +405,7 @@ SUITE(speed_tests) int num_tested_dims = sizeof(tested_dims) / sizeof(*tested_dims); // Call reg_sad with all the sizes it is actually called with. 
- for (int dim_i = 0; dim_i < num_tested_dims; ++dim_i) { + for (volatile int dim_i = 0; dim_i < num_tested_dims; ++dim_i) { test_env.width = tested_dims[dim_i].x; test_env.height = tested_dims[dim_i].y; RUN_TEST(inter_sad); From bc47fe94afcd6fc357f337d161a319f5cb98d51f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arttu=20Yl=C3=A4-Outinen?= Date: Thu, 20 Jul 2017 15:18:30 +0300 Subject: [PATCH 2/3] Drop thread queue debug code --- src/encoder_state-bitstream.c | 12 +-- src/encoderstate.c | 82 ++++------------ src/search.c | 11 --- src/threadqueue.c | 173 +++++++--------------------------- src/threadqueue.h | 62 +----------- 5 files changed, 54 insertions(+), 286 deletions(-) diff --git a/src/encoder_state-bitstream.c b/src/encoder_state-bitstream.c index bbb9407f..67bdbcc8 100644 --- a/src/encoder_state-bitstream.c +++ b/src/encoder_state-bitstream.c @@ -1018,19 +1018,13 @@ static void encoder_state_write_bitstream_main(encoder_state_t * const state) kvz_bitstream_add_rbsp_trailing_bits(stream); } - { - PERFORMANCE_MEASURE_START(KVZ_PERF_FRAME); - encoder_state_write_bitstream_children(state); - PERFORMANCE_MEASURE_END(KVZ_PERF_FRAME, encoder->threadqueue, "type=write_bitstream_append,frame=%d,encoder_type=%c", state->frame->num, state->type); - } - + encoder_state_write_bitstream_children(state); + if (state->encoder_control->cfg.hash != KVZ_HASH_NONE) { - PERFORMANCE_MEASURE_START(KVZ_PERF_FRAME); // Calculate checksum add_checksum(state); - PERFORMANCE_MEASURE_END(KVZ_PERF_FRAME, encoder->threadqueue, "type=write_bitstream_checksum,frame=%d,encoder_type=%c", state->frame->num, state->type); } - + //Get bitstream length for stats uint64_t newpos = kvz_bitstream_tell(stream); state->stats_bitstream_length = (newpos >> 3) - (curpos >> 3); diff --git a/src/encoderstate.c b/src/encoderstate.c index d9c42e94..9750d861 100644 --- a/src/encoderstate.c +++ b/src/encoderstate.c @@ -725,16 +725,7 @@ static void encoder_state_encode_leaf(encoder_state_t * const state) // frame 
is encoded. Deblocking and SAO search is done during LCU encoding. for (int i = 0; i < state->lcu_order_count; ++i) { - PERFORMANCE_MEASURE_START(KVZ_PERF_LCU); - encoder_state_worker_encode_lcu(&state->lcu_order[i]); - -#ifdef KVZ_DEBUG - { - const lcu_order_element_t * const lcu = &state->lcu_order[i]; - PERFORMANCE_MEASURE_END(KVZ_PERF_LCU, ctrl->threadqueue, "type=encode_lcu,frame=%d,tile=%d,slice=%d,px_x=%d-%d,px_y=%d-%d", state->frame->num, state->tile->id, state->slice->id, lcu->position_px.x + state->tile->lcu_offset_x * LCU_WIDTH, lcu->position_px.x + state->tile->lcu_offset_x * LCU_WIDTH + lcu->size.x - 1, lcu->position_px.y + state->tile->lcu_offset_y * LCU_WIDTH, lcu->position_px.y + state->tile->lcu_offset_y * LCU_WIDTH + lcu->size.y - 1); - } -#endif //KVZ_DEBUG } } else { // Add each LCU in the wavefront row as it's own job to the queue. @@ -769,14 +760,8 @@ static void encoder_state_encode_leaf(encoder_state_t * const state) for (int i = 0; i < state->lcu_order_count; ++i) { const lcu_order_element_t * const lcu = &state->lcu_order[i]; -#ifdef KVZ_DEBUG - char job_description[256]; - sprintf(job_description, "type=encode_lcu,frame=%d,tile=%d,slice=%d,px_x=%d-%d,px_y=%d-%d", state->frame->num, state->tile->id, state->slice->id, lcu->position_px.x + state->tile->lcu_offset_x * LCU_WIDTH, lcu->position_px.x + state->tile->lcu_offset_x * LCU_WIDTH + lcu->size.x - 1, lcu->position_px.y + state->tile->lcu_offset_y * LCU_WIDTH, lcu->position_px.y + state->tile->lcu_offset_y * LCU_WIDTH + lcu->size.y - 1); -#else - char* job_description = NULL; -#endif kvz_threadqueue_free_job(&state->tile->wf_jobs[lcu->id]); - state->tile->wf_jobs[lcu->id] = kvz_threadqueue_submit(ctrl->threadqueue, encoder_state_worker_encode_lcu, (void*)lcu, 1, job_description); + state->tile->wf_jobs[lcu->id] = kvz_threadqueue_submit(ctrl->threadqueue, encoder_state_worker_encode_lcu, (void*)lcu, 1); threadqueue_job_t **job = &state->tile->wf_jobs[lcu->id]; // If job object was 
returned, add dependancies and allow it to run. @@ -911,26 +896,8 @@ static void encoder_state_encode(encoder_state_t * const main_state) { for (int i = 0; main_state->children[i].encoder_control; ++i) { //If we don't have wavefronts, parallelize encoding of children. if (main_state->children[i].type != ENCODER_STATE_TYPE_WAVEFRONT_ROW) { -#ifdef KVZ_DEBUG - char job_description[256]; - switch (main_state->children[i].type) { - case ENCODER_STATE_TYPE_TILE: - sprintf(job_description, "type=encode_child,frame=%d,tile=%d,row=%d-%d,px_x=%d-%d,px_y=%d-%d", main_state->children[i].frame->num, main_state->children[i].tile->id, main_state->children[i].lcu_order[0].position.y + main_state->children[i].tile->lcu_offset_y, main_state->children[i].lcu_order[0].position.y + main_state->children[i].tile->lcu_offset_y, - main_state->children[i].lcu_order[0].position_px.x + main_state->children[i].tile->lcu_offset_x * LCU_WIDTH, main_state->children[i].lcu_order[main_state->children[i].lcu_order_count-1].position_px.x + main_state->children[i].lcu_order[main_state->children[i].lcu_order_count-1].size.x + main_state->children[i].tile->lcu_offset_x * LCU_WIDTH - 1, - main_state->children[i].lcu_order[0].position_px.y + main_state->children[i].tile->lcu_offset_y * LCU_WIDTH, main_state->children[i].lcu_order[main_state->children[i].lcu_order_count-1].position_px.y + main_state->children[i].lcu_order[main_state->children[i].lcu_order_count-1].size.y + main_state->children[i].tile->lcu_offset_y * LCU_WIDTH - 1); - break; - case ENCODER_STATE_TYPE_SLICE: - sprintf(job_description, "type=encode_child,frame=%d,slice=%d,start_in_ts=%d", main_state->children[i].frame->num, main_state->children[i].slice->id, main_state->children[i].slice->start_in_ts); - break; - default: - sprintf(job_description, "type=encode_child,frame=%d,invalid", main_state->children[i].frame->num); - break; - } -#else - char* job_description = NULL; -#endif 
kvz_threadqueue_free_job(&main_state->children[i].tqj_recon_done); - main_state->children[i].tqj_recon_done = kvz_threadqueue_submit(main_state->encoder_control->threadqueue, encoder_state_worker_encode_children, &(main_state->children[i]), 1, job_description); + main_state->children[i].tqj_recon_done = kvz_threadqueue_submit(main_state->encoder_control->threadqueue, encoder_state_worker_encode_children, &(main_state->children[i]), 1); if (main_state->children[i].previous_encoder_state != &main_state->children[i] && main_state->children[i].previous_encoder_state->tqj_recon_done && !main_state->children[i].frame->is_idr_frame) { #if 0 // Disabled due to non-determinism. @@ -1256,39 +1223,24 @@ static void _encode_one_frame_add_bitstream_deps(const encoder_state_t * const s void kvz_encode_one_frame(encoder_state_t * const state, kvz_picture* frame) { - { - PERFORMANCE_MEASURE_START(KVZ_PERF_FRAME); - encoder_state_init_new_frame(state, frame); - PERFORMANCE_MEASURE_END(KVZ_PERF_FRAME, state->encoder_control->threadqueue, "type=init_new_frame,frame=%d,poc=%d", state->frame->num, state->frame->poc); - } - { - PERFORMANCE_MEASURE_START(KVZ_PERF_FRAME); - encoder_state_encode(state); - PERFORMANCE_MEASURE_END(KVZ_PERF_FRAME, state->encoder_control->threadqueue, "type=encode,frame=%d", state->frame->num); - } - //kvz_threadqueue_flush(main_state->encoder_control->threadqueue); - { - threadqueue_job_t *job; -#ifdef KVZ_DEBUG - char job_description[256]; - sprintf(job_description, "type=write_bitstream,frame=%d", state->frame->num); -#else - char* job_description = NULL; -#endif + encoder_state_init_new_frame(state, frame); + encoder_state_encode(state); - job = kvz_threadqueue_submit(state->encoder_control->threadqueue, kvz_encoder_state_worker_write_bitstream, (void*) state, 1, job_description); - - _encode_one_frame_add_bitstream_deps(state, job); - if (state->previous_encoder_state != state && state->previous_encoder_state->tqj_bitstream_written) { - //We need to 
depend on previous bitstream generation - kvz_threadqueue_job_dep_add(job, state->previous_encoder_state->tqj_bitstream_written); - } - kvz_threadqueue_job_unwait_job(state->encoder_control->threadqueue, job); - assert(!state->tqj_bitstream_written); - state->tqj_bitstream_written = job; + threadqueue_job_t *job = + kvz_threadqueue_submit(state->encoder_control->threadqueue, + kvz_encoder_state_worker_write_bitstream, + (void*) state, + 1); + + _encode_one_frame_add_bitstream_deps(state, job); + if (state->previous_encoder_state != state && state->previous_encoder_state->tqj_bitstream_written) { + //We need to depend on previous bitstream generation + kvz_threadqueue_job_dep_add(job, state->previous_encoder_state->tqj_bitstream_written); } + kvz_threadqueue_job_unwait_job(state->encoder_control->threadqueue, job); + assert(!state->tqj_bitstream_written); + state->tqj_bitstream_written = job; state->frame->done = 0; - //kvz_threadqueue_flush(main_state->encoder_control->threadqueue); } diff --git a/src/search.c b/src/search.c index 0f2ae83c..d081015e 100644 --- a/src/search.c +++ b/src/search.c @@ -399,10 +399,6 @@ static double search_cu(encoder_state_t * const state, int x, int y, int depth, int x_local = SUB_SCU(x); int y_local = SUB_SCU(y); -#ifdef KVZ_DEBUG - int debug_split = 0; -#endif - PERFORMANCE_MEASURE_START(KVZ_PERF_SEARCHCU); // Stop recursion if the CU is completely outside the frame. 
if (x >= frame->width || y >= frame->height) { @@ -713,13 +709,6 @@ static double search_cu(encoder_state_t * const state, int x, int y, int depth, work_tree_copy_down(x_local, y_local, depth, work_tree); } - PERFORMANCE_MEASURE_END(KVZ_PERF_SEARCHCU, state->encoder_control->threadqueue, "type=search_cu,frame=%d,tile=%d,slice=%d,px_x=%d-%d,px_y=%d-%d,depth=%d,split=%d,cur_cu_is_intra=%d", state->frame->num, state->tile->id, state->slice->id, - state->tile->offset_x + x, - state->tile->offset_x + x + cu_width, - state->tile->offset_y + y, - state->tile->offset_y + y + cu_width, - depth, debug_split, (cur_cu->type == CU_INTRA) ? 1 : 0); - assert(cur_cu->type != CU_NOTSET); return cost; diff --git a/src/threadqueue.c b/src/threadqueue.c index 881c4a5a..990d696c 100644 --- a/src/threadqueue.c +++ b/src/threadqueue.c @@ -37,40 +37,40 @@ typedef struct { #define THREADQUEUE_LIST_REALLOC_SIZE 32 -//#define PTHREAD_COND_SIGNAL(c) fprintf(stderr, "%s:%d pthread_cond_signal(%s=%p)\n", __FUNCTION__, __LINE__, #c, c); if (pthread_cond_signal((c)) != 0) { fprintf(stderr, "pthread_cond_signal(%s=%p) failed!\n", #c, c); assert(0); return 0; } -//#define PTHREAD_COND_BROADCAST(c) fprintf(stderr, "%s:%d pthread_cond_broadcast(%s=%p)\n", __FUNCTION__, __LINE__, #c, c); if (pthread_cond_broadcast((c)) != 0) { fprintf(stderr, "pthread_cond_broadcast(%s=%p) failed!\n", #c, c); assert(0); return 0; } -//#define PTHREAD_COND_WAIT(c,l) fprintf(stderr, "%s:%d pthread_cond_wait(%s=%p, %s=%p)\n", __FUNCTION__, __LINE__, #c, c, #l, l); if (pthread_cond_wait((c),(l)) != 0) { fprintf(stderr, "pthread_cond_wait(%s=%p, %s=%p) failed!\n", #c, c, #l, l); assert(0); return 0; } else {fprintf(stderr, "%s:%d pthread_cond_wait(%s=%p, %s=%p) (done)\n", __FUNCTION__, __LINE__, #c, c, #l, l);} -//#define PTHREAD_LOCK(l) fprintf(stderr, "%s:%d pthread_mutex_lock(%s=%p) (try)\n", __FUNCTION__, __LINE__, #l, l); if (pthread_mutex_lock((l)) != 0) { fprintf(stderr, "pthread_mutex_lock(%s=%p) failed!\n", #l, 
l); assert(0); return 0; } else {fprintf(stderr, "%s:%d pthread_mutex_lock(%s=%p)\n", __FUNCTION__, __LINE__, #l, l);} -//#define PTHREAD_UNLOCK(l) if (pthread_mutex_unlock((l)) != 0) { fprintf(stderr, "pthread_mutex_unlock(%s=%p) failed!\n", #l, l); assert(0); return 0; } else {fprintf(stderr, "%s:%d pthread_mutex_unlock(%s=%p)\n", __FUNCTION__, __LINE__, #l, l);} +#define PTHREAD_COND_SIGNAL(c) \ + if (pthread_cond_signal((c)) != 0) { \ + fprintf(stderr, "pthread_cond_signal(%s=%p) failed!\n", #c, c); \ + assert(0); \ + return 0; \ + } +#define PTHREAD_COND_BROADCAST(c) \ + if (pthread_cond_broadcast((c)) != 0) { \ + fprintf(stderr, "pthread_cond_broadcast(%s=%p) failed!\n", #c, c); \ + assert(0); \ + return 0; \ + } -#define PTHREAD_COND_SIGNAL(c) if (pthread_cond_signal((c)) != 0) { fprintf(stderr, "pthread_cond_signal(%s=%p) failed!\n", #c, c); assert(0); return 0; } -#define PTHREAD_COND_BROADCAST(c) if (pthread_cond_broadcast((c)) != 0) { fprintf(stderr, "pthread_cond_broadcast(%s=%p) failed!\n", #c, c); assert(0); return 0; } +#define PTHREAD_COND_WAIT(c,l) \ + if (pthread_cond_wait((c),(l)) != 0) { \ + fprintf(stderr, "pthread_cond_wait(%s=%p, %s=%p) failed!\n", #c, c, #l, l); \ + assert(0); \ + return 0; \ + } -#ifndef _PTHREAD_DUMP -#define PTHREAD_COND_WAIT(c,l) if (pthread_cond_wait((c),(l)) != 0) { fprintf(stderr, "pthread_cond_wait(%s=%p, %s=%p) failed!\n", #c, c, #l, l); assert(0); return 0; } -#define PTHREAD_LOCK(l) if (pthread_mutex_lock((l)) != 0) { fprintf(stderr, "pthread_mutex_lock(%s) failed!\n", #l); assert(0); return 0; } -#define PTHREAD_UNLOCK(l) if (pthread_mutex_unlock((l)) != 0) { fprintf(stderr, "pthread_mutex_unlock(%s) failed!\n", #l); assert(0); return 0; } +#define PTHREAD_LOCK(l) \ + if (pthread_mutex_lock((l)) != 0) { \ + fprintf(stderr, "pthread_mutex_lock(%s) failed!\n", #l); \ + assert(0); \ + return 0; \ + } -#else //PTHREAD_DUMP -#define PTHREAD_LOCK(l) do { \ - PERFORMANCE_MEASURE_START(); \ - if (pthread_mutex_lock((l)) 
!= 0) { fprintf(stderr, "pthread_mutex_lock(%s) failed!\n", #l); assert(0); return 0; } \ - PERFORMANCE_MEASURE_END(NULL, "pthread_mutex_lock(%s=%p)@%s:%d",#l,l,__FUNCTION__, __LINE__); \ -} while (0); - -#define PTHREAD_UNLOCK(l) do { \ - PERFORMANCE_MEASURE_START(); \ - if (pthread_mutex_unlock((l)) != 0) { fprintf(stderr, "pthread_mutex_unlock(%s) failed!\n", #l); assert(0); return 0; } \ - PERFORMANCE_MEASURE_END(NULL, "pthread_mutex_unlock(%s=%p)@%s:%d",#l,l,__FUNCTION__, __LINE__); \ -} while (0); - -#define PTHREAD_COND_WAIT(c,l) do { \ - PERFORMANCE_MEASURE_START(); \ - if (pthread_cond_wait((c),(l)) != 0) { fprintf(stderr, "pthread_cond_wait(%s=%p, %s=%p) failed!\n", #c, c, #l, l); assert(0); return 0;} \ - PERFORMANCE_MEASURE_END(NULL, "pthread_cond_wait(%s=%p, %s=%p)@%s:%d",#c, c, #l, l,__FUNCTION__, __LINE__); \ -} while (0); -#endif //PTHREAD_DUMP +#define PTHREAD_UNLOCK(l) \ + if (pthread_mutex_unlock((l)) != 0) { \ + fprintf(stderr, "pthread_mutex_unlock(%s) failed!\n", #l); \ + assert(0); \ + return 0; \ + } static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) { @@ -78,10 +78,6 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) threadqueue_queue_t * const threadqueue = threadqueue_worker_spec->threadqueue; threadqueue_job_t * next_job = NULL; -#ifdef KVZ_DEBUG - KVZ_GET_TIME(&threadqueue->debug_clock_thread_start[threadqueue_worker_spec->worker_id]); -#endif //KVZ_DEBUG - for(;;) { threadqueue_job_t * job = NULL; @@ -153,18 +149,8 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) PTHREAD_UNLOCK(&threadqueue->lock); -#ifdef KVZ_DEBUG - job->debug_worker_id = threadqueue_worker_spec->worker_id; - KVZ_GET_TIME(&job->debug_clock_start); -#endif //KVZ_DEBUG - job->fptr(job->arg); -#ifdef KVZ_DEBUG - job->debug_worker_id = threadqueue_worker_spec->worker_id; - KVZ_GET_TIME(&job->debug_clock_stop); -#endif //KVZ_DEBUG - // This lock is necessary because the main thread may try to add // reverse 
dependencies to the job while running. PTHREAD_LOCK(&job->lock); @@ -234,13 +220,7 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) // We got out of the loop because threadqueue->stop is true. The queue is locked. assert(threadqueue->stop); --threadqueue->threads_running; - -#ifdef KVZ_DEBUG - KVZ_GET_TIME(&threadqueue->debug_clock_thread_end[threadqueue_worker_spec->worker_id]); - - fprintf(threadqueue->debug_log, "\t%d\t-\t%lf\t+%lf\t-\tthread\n", threadqueue_worker_spec->worker_id, KVZ_CLOCK_T_AS_DOUBLE(threadqueue->debug_clock_thread_start[threadqueue_worker_spec->worker_id]), KVZ_CLOCK_T_DIFF(threadqueue->debug_clock_thread_start[threadqueue_worker_spec->worker_id], threadqueue->debug_clock_thread_end[threadqueue_worker_spec->worker_id])); -#endif //KVZ_DEBUG - + PTHREAD_UNLOCK(&threadqueue->lock); free(threadqueue_worker_spec_opaque); @@ -280,14 +260,7 @@ int kvz_threadqueue_init(threadqueue_queue_t * const threadqueue, int thread_cou fprintf(stderr, "Could not malloc threadqueue->threads!\n"); return 0; } -#ifdef KVZ_DEBUG - threadqueue->debug_clock_thread_start = MALLOC(KVZ_CLOCK_T, thread_count); - assert(threadqueue->debug_clock_thread_start); - threadqueue->debug_clock_thread_end = MALLOC(KVZ_CLOCK_T, thread_count); - assert(threadqueue->debug_clock_thread_end); - threadqueue->debug_log = fopen("threadqueue.log", "w"); -#endif //KVZ_DEBUG - + threadqueue->queue = NULL; threadqueue->queue_size = 0; threadqueue->queue_count = 0; @@ -358,10 +331,6 @@ void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr) assert(new_refcount >= 0); -#ifdef KVZ_DEBUG - FREE_POINTER(job->debug_description); -#endif - FREE_POINTER(job->rdepends); pthread_mutex_destroy(&job->lock); FREE_POINTER(job); @@ -408,12 +377,6 @@ int kvz_threadqueue_finalize(threadqueue_queue_t * const threadqueue) threadqueue->queue_start = 0; PTHREAD_UNLOCK(&threadqueue->lock); -#ifdef KVZ_DEBUG - FREE_POINTER(threadqueue->debug_clock_thread_start); - 
FREE_POINTER(threadqueue->debug_clock_thread_end); - fclose(threadqueue->debug_log); -#endif - //Free allocated stuff FREE_POINTER(threadqueue->queue); threadqueue->queue_count = 0; @@ -475,14 +438,12 @@ int kvz_threadqueue_waitfor(threadqueue_queue_t * const threadqueue, threadqueue return 1; } -threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * const threadqueue, void (*fptr)(void *arg), void *arg, int wait, const char* const debug_description) { +threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * const threadqueue, void (*fptr)(void *arg), void *arg, int wait) { threadqueue_job_t *job; //No lock here... this should be constant if (threadqueue->threads_count == 0) { //FIXME: This should be improved in order to handle dependencies - PERFORMANCE_MEASURE_START(KVZ_PERF_JOB); fptr(arg); - PERFORMANCE_MEASURE_END(KVZ_PERF_JOB, threadqueue, "%s", debug_description); return NULL; } @@ -491,28 +452,6 @@ threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * const threadque job = MALLOC(threadqueue_job_t, 1); job->refcount = 1; -#ifdef KVZ_DEBUG - if (debug_description) { - size_t desc_len = MIN(255, strlen(debug_description)); - char* desc; - - //Copy description - desc = MALLOC(char, desc_len + 1); - assert(desc); - memcpy(desc, debug_description, desc_len); - desc[desc_len] = 0; - - job->debug_description = desc; - } else { - char* desc; - desc = MALLOC(char, 255); - sprintf(desc, "(*%p)(%p)", fptr, arg); - - job->debug_description = desc; - } - KVZ_GET_TIME(&job->debug_clock_enqueue); -#endif //KVZ_DEBUG - if (!job) { fprintf(stderr, "Could not alloc job!\n"); assert(0); @@ -620,49 +559,3 @@ int kvz_threadqueue_job_unwait_job(threadqueue_queue_t * const threadqueue, thre return 1; } - -#ifdef KVZ_DEBUG -int threadqueue_log(threadqueue_queue_t * threadqueue, const KVZ_CLOCK_T *start, const KVZ_CLOCK_T *stop, const char* debug_description) { - int i, thread_id = -1; - FILE* output; - - assert(start); - - if (threadqueue) { - //We 
need to lock to output safely - PTHREAD_LOCK(&threadqueue->lock); - - output = threadqueue->debug_log; - - //Find the thread - for(i = 0; i < threadqueue->threads_count; i++) { - if(pthread_equal(threadqueue->threads[i], pthread_self()) != 0) { - thread_id = i; - break; - } - } - } else { - thread_id = -1; - output = stderr; - } - - if (thread_id >= 0) { - if (stop) { - fprintf(output, "\t%d\t-\t%lf\t+%lf\t-\t%s\n", thread_id, KVZ_CLOCK_T_AS_DOUBLE(*start), KVZ_CLOCK_T_DIFF(*start, *stop), debug_description); - } else { - fprintf(output, "\t%d\t-\t%lf\t-\t-\t%s\n", thread_id, KVZ_CLOCK_T_AS_DOUBLE(*start), debug_description); - } - } else { - if (stop) { - fprintf(output, "\t\t-\t%lf\t+%lf\t-\t%s\n", KVZ_CLOCK_T_AS_DOUBLE(*start), KVZ_CLOCK_T_DIFF(*start, *stop), debug_description); - } else { - fprintf(output, "\t\t-\t%lf\t-\t-\t%s\n", KVZ_CLOCK_T_AS_DOUBLE(*start), debug_description); - } - } - - if (threadqueue) { - PTHREAD_UNLOCK(&threadqueue->lock); - } - return 1; -} -#endif //KVZ_DEBUG diff --git a/src/threadqueue.h b/src/threadqueue.h index b7527075..090bc602 100644 --- a/src/threadqueue.h +++ b/src/threadqueue.h @@ -53,17 +53,6 @@ typedef struct threadqueue_job_t { //Job function and state to use void (*fptr)(void *arg); void *arg; - -#ifdef KVZ_DEBUG - const char* debug_description; - - int debug_worker_id; - - KVZ_CLOCK_T debug_clock_enqueue; - KVZ_CLOCK_T debug_clock_start; - KVZ_CLOCK_T debug_clock_stop; - KVZ_CLOCK_T debug_clock_dequeue; -#endif } threadqueue_job_t; @@ -89,26 +78,14 @@ typedef struct { unsigned int queue_waiting_execution; //Number of jobs without any dependency which could be run unsigned int queue_waiting_dependency; //Number of jobs waiting for a dependency to complete unsigned int queue_running; //Number of jobs running - -#ifdef KVZ_DEBUG - //Format: pointer worker id time enqueued time started time stopped time dequeued job description - //For threads, pointer = "" and job description == "thread", time enqueued and time dequeued 
are equal to "-" - //For flush, pointer = "" and job description == "FLUSH", time enqueued, time dequeued and time started are equal to "-" - //Each time field, except the first one in the line be expressed in a relative way, by prepending the number of seconds by +. - //Dependencies: pointer -> pointer - FILE *debug_log; - - KVZ_CLOCK_T *debug_clock_thread_start; - KVZ_CLOCK_T *debug_clock_thread_end; -#endif } threadqueue_queue_t; //Init a threadqueue (if fifo, then behave as a FIFO with dependencies, otherwise as a LIFO with dependencies) int kvz_threadqueue_init(threadqueue_queue_t * threadqueue, int thread_count, int fifo); //Add a job to the queue, and returs a threadqueue_job handle. If wait == 1, one has to run kvz_threadqueue_job_unwait_job in order to have it run -threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * threadqueue, void (*fptr)(void *arg), void *arg, int wait, const char* debug_description); +threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * threadqueue, void (*fptr)(void *arg), void *arg, int wait); void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr); @@ -132,43 +109,6 @@ int kvz_threadqueue_waitfor(threadqueue_queue_t * threadqueue, threadqueue_job_t //Free ressources in a threadqueue int kvz_threadqueue_finalize(threadqueue_queue_t * threadqueue); -#ifdef KVZ_DEBUG -int threadqueue_log(threadqueue_queue_t * threadqueue, const KVZ_CLOCK_T *start, const KVZ_CLOCK_T *stop, const char* debug_description); - -// Bitmasks for PERFORMANCE_MEASURE_START and PERFORMANCE_MEASURE_END. -#define KVZ_PERF_FRAME (1 << 0) -#define KVZ_PERF_JOB (1 << 1) -#define KVZ_PERF_LCU (1 << 2) -#define KVZ_PERF_SAOREC (1 << 3) -#define KVZ_PERF_BSLEAF (1 << 4) -#define KVZ_PERF_SEARCHCU (1 << 5) - -#define IMPL_PERFORMANCE_MEASURE_START(mask) KVZ_CLOCK_T start, stop; if ((KVZ_DEBUG) & mask) { KVZ_GET_TIME(&start); } -#define IMPL_PERFORMANCE_MEASURE_END(mask, threadqueue, str, ...) 
{ if ((KVZ_DEBUG) & mask) { KVZ_GET_TIME(&stop); {char job_description[256]; sprintf(job_description, (str), __VA_ARGS__); threadqueue_log((threadqueue), &start, &stop, job_description);}} } \ - -#ifdef _MSC_VER -// Disable VS conditional expression warning from debug code. -# define WITHOUT_CONSTANT_EXP_WARNING(macro) \ - __pragma(warning(push)) \ - __pragma(warning(disable:4127)) \ - macro \ - __pragma(warning(pop)) -# define PERFORMANCE_MEASURE_START(mask) \ - WITHOUT_CONSTANT_EXP_WARNING(IMPL_PERFORMANCE_MEASURE_START(mask)) -# define PERFORMANCE_MEASURE_END(mask, threadqueue, str, ...) \ - WITHOUT_CONSTANT_EXP_WARNING(IMPL_PERFORMANCE_MEASURE_END(mask, threadqueue, str, ##__VA_ARGS__)) -#else -# define PERFORMANCE_MEASURE_START(mask) \ - IMPL_PERFORMANCE_MEASURE_START(mask) -# define PERFORMANCE_MEASURE_END(mask, threadqueue, str, ...) \ - IMPL_PERFORMANCE_MEASURE_END(mask, threadqueue, str, ##__VA_ARGS__) -#endif - -#else -#define PERFORMANCE_MEASURE_START(mask) -#define PERFORMANCE_MEASURE_END(mask, threadqueue, str, ...) -#endif - /* Constraints: * * - Always first lock threadqueue, than a job inside it From 7144a00beb511778e981258a82b7339e91569059 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arttu=20Yl=C3=A4-Outinen?= Date: Thu, 20 Jul 2017 15:23:18 +0300 Subject: [PATCH 3/3] Rewrite thread queue Changes thread queue so that only the jobs that are ready to run are stored in the queue. Other jobs are kept track of by pointers in the reverse dependency lists of other jobs. When a job is ready to run it is appended to the queue. The job queue is stored as a linked list. The definitions of threadqueue_queue_t and threadqueue_job_t are moved to the .c file, turning them into opaque structs. Makes thread queue code simpler. Fixes some TSan errors. 
--- src/encoder.c | 13 +- src/encoderstate.c | 22 +- src/threadqueue.c | 811 ++++++++++++++++++++++++--------------------- src/threadqueue.h | 89 +---- 4 files changed, 466 insertions(+), 469 deletions(-) diff --git a/src/encoder.c b/src/encoder.c index cad51b98..a1f87126 100644 --- a/src/encoder.c +++ b/src/encoder.c @@ -286,11 +286,8 @@ encoder_control_t* kvz_encoder_control_init(const kvz_config *const cfg) } } - encoder->threadqueue = MALLOC(threadqueue_queue_t, 1); - if (!encoder->threadqueue || - !kvz_threadqueue_init(encoder->threadqueue, - encoder->cfg.threads, - encoder->cfg.owf > 0)) { + encoder->threadqueue = kvz_threadqueue_init(encoder->cfg.threads); + if (!encoder->threadqueue) { fprintf(stderr, "Could not initialize threadqueue.\n"); goto init_failed; } @@ -653,10 +650,8 @@ void kvz_encoder_control_free(encoder_control_t *const encoder) kvz_scalinglist_destroy(&encoder->scaling_list); - if (encoder->threadqueue) { - kvz_threadqueue_finalize(encoder->threadqueue); - } - FREE_POINTER(encoder->threadqueue); + kvz_threadqueue_free(encoder->threadqueue); + encoder->threadqueue = NULL; free(encoder); } diff --git a/src/encoderstate.c b/src/encoderstate.c index 9750d861..7fcee3ee 100644 --- a/src/encoderstate.c +++ b/src/encoderstate.c @@ -761,7 +761,7 @@ static void encoder_state_encode_leaf(encoder_state_t * const state) const lcu_order_element_t * const lcu = &state->lcu_order[i]; kvz_threadqueue_free_job(&state->tile->wf_jobs[lcu->id]); - state->tile->wf_jobs[lcu->id] = kvz_threadqueue_submit(ctrl->threadqueue, encoder_state_worker_encode_lcu, (void*)lcu, 1); + state->tile->wf_jobs[lcu->id] = kvz_threadqueue_job_create(encoder_state_worker_encode_lcu, (void*)lcu); threadqueue_job_t **job = &state->tile->wf_jobs[lcu->id]; // If job object was returned, add dependancies and allow it to run. 
@@ -799,7 +799,7 @@ static void encoder_state_encode_leaf(encoder_state_t * const state) } } - kvz_threadqueue_job_unwait_job(state->encoder_control->threadqueue, state->tile->wf_jobs[lcu->id]); + kvz_threadqueue_submit(state->encoder_control->threadqueue, state->tile->wf_jobs[lcu->id]); // The wavefront row is done when the last LCU in the row is done. if (i + 1 == state->lcu_order_count) { @@ -897,8 +897,12 @@ static void encoder_state_encode(encoder_state_t * const main_state) { //If we don't have wavefronts, parallelize encoding of children. if (main_state->children[i].type != ENCODER_STATE_TYPE_WAVEFRONT_ROW) { kvz_threadqueue_free_job(&main_state->children[i].tqj_recon_done); - main_state->children[i].tqj_recon_done = kvz_threadqueue_submit(main_state->encoder_control->threadqueue, encoder_state_worker_encode_children, &(main_state->children[i]), 1); - if (main_state->children[i].previous_encoder_state != &main_state->children[i] && main_state->children[i].previous_encoder_state->tqj_recon_done && !main_state->children[i].frame->is_idr_frame) { + main_state->children[i].tqj_recon_done = + kvz_threadqueue_job_create(encoder_state_worker_encode_children, &main_state->children[i]); + if (main_state->children[i].previous_encoder_state != &main_state->children[i] && + main_state->children[i].previous_encoder_state->tqj_recon_done && + !main_state->children[i].frame->is_idr_frame) + { #if 0 // Disabled due to non-determinism. if (main_state->encoder_control->cfg->mv_constraint == KVZ_MV_CONSTRAIN_FRAME_AND_TILE_MARGIN) @@ -914,7 +918,7 @@ static void encoder_state_encode(encoder_state_t * const main_state) { } } } - kvz_threadqueue_job_unwait_job(main_state->encoder_control->threadqueue, main_state->children[i].tqj_recon_done); + kvz_threadqueue_submit(main_state->encoder_control->threadqueue, main_state->children[i].tqj_recon_done); } else { //Wavefront rows have parallelism at LCU level, so we should not launch multiple threads here! 
//FIXME: add an assert: we can only have wavefront children @@ -1227,19 +1231,17 @@ void kvz_encode_one_frame(encoder_state_t * const state, kvz_picture* frame) encoder_state_encode(state); threadqueue_job_t *job = - kvz_threadqueue_submit(state->encoder_control->threadqueue, - kvz_encoder_state_worker_write_bitstream, - (void*) state, - 1); + kvz_threadqueue_job_create(kvz_encoder_state_worker_write_bitstream, state); _encode_one_frame_add_bitstream_deps(state, job); if (state->previous_encoder_state != state && state->previous_encoder_state->tqj_bitstream_written) { //We need to depend on previous bitstream generation kvz_threadqueue_job_dep_add(job, state->previous_encoder_state->tqj_bitstream_written); } - kvz_threadqueue_job_unwait_job(state->encoder_control->threadqueue, job); + kvz_threadqueue_submit(state->encoder_control->threadqueue, job); assert(!state->tqj_bitstream_written); state->tqj_bitstream_written = job; + state->frame->done = 0; } diff --git a/src/threadqueue.c b/src/threadqueue.c index 990d696c..3b0a69cd 100644 --- a/src/threadqueue.c +++ b/src/threadqueue.c @@ -30,10 +30,20 @@ #include "threads.h" -typedef struct { - threadqueue_queue_t * threadqueue; - int worker_id; -} threadqueue_worker_spec; +/** + * \file + * + * Lock acquisition order: + * + * 1. When locking a job and its dependency, the dependency must be locked + * first and then the job depending on it. + * + * 2. When locking a job and the thread queue, the thread queue must be + * locked first and then the job. + * + * 3. When accessing threadqueue_job_t.next, the thread queue must be + * locked. + */ #define THREADQUEUE_LIST_REALLOC_SIZE 32 @@ -72,229 +82,417 @@ typedef struct { return 0; \ } -static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) + +typedef enum { + /** + * \brief Job has been submitted, but is not allowed to run yet. + */ + THREADQUEUE_JOB_STATE_PAUSED, + + /** + * \brief Job is waiting for dependencies.
+ */ + THREADQUEUE_JOB_STATE_WAITING, + + /** + * \brief Job is ready to run. + */ + THREADQUEUE_JOB_STATE_READY, + + /** + * \brief Job is running. + */ + THREADQUEUE_JOB_STATE_RUNNING, + + /** + * \brief Job is completed. + */ + THREADQUEUE_JOB_STATE_DONE, + +} threadqueue_job_state; + + +struct threadqueue_job_t { + pthread_mutex_t lock; + + threadqueue_job_state state; + + /** + * \brief Number of dependencies that have not been completed yet. + */ + int ndepends; + + /** + * \brief Reverse dependencies. + * + * Array of pointers to jobs that depend on this one. They have to exist + * when the thread finishes, because they cannot be run before. + */ + struct threadqueue_job_t **rdepends; + + /** + * \brief Number of elements in rdepends. + */ + int rdepends_count; + + /** + * \brief Allocated size of rdepends. + */ + int rdepends_size; + + /** + * \brief Reference count + */ + int refcount; + + /** + * \brief Pointer to the function to execute. + */ + void (*fptr)(void *arg); + + /** + * \brief Argument for fptr. + */ + void *arg; + + /** + * \brief Pointer to the next job in the queue. + */ + struct threadqueue_job_t *next; + +}; + + +struct threadqueue_queue_t { + pthread_mutex_t lock; + + /** + * \brief Job available condition variable + * + * Signalled when there is a new job to do. + */ + pthread_cond_t job_available; + + /** + * \brief Job done condition variable + * + * Signalled when a job has been completed. + */ + pthread_cond_t job_done; + + /** + * Array containing spawned threads + */ + pthread_t *threads; + + /** + * \brief Number of threads spawned + */ + int thread_count; + + /** + * \brief Number of threads running + */ + int thread_running_count; + + /** + * \brief If true, threads should stop ASAP. 
+ */ + bool stop; + + /** + * \brief Pointer to the first ready job + */ + threadqueue_job_t *first; + + /** + * \brief Pointer to the last ready job + */ + threadqueue_job_t *last; +}; + + +/** + * \brief Add a job to the queue of jobs ready to run. + * + * The caller must have locked the thread queue and the job. This function + * takes the ownership of the job. + */ +static void threadqueue_push_job(threadqueue_queue_t * threadqueue, + threadqueue_job_t *job) { - threadqueue_worker_spec * const threadqueue_worker_spec = threadqueue_worker_spec_opaque; - threadqueue_queue_t * const threadqueue = threadqueue_worker_spec->threadqueue; - threadqueue_job_t * next_job = NULL; + assert(job->ndepends == 0); + job->state = THREADQUEUE_JOB_STATE_READY; - for(;;) { - threadqueue_job_t * job = NULL; + if (threadqueue->first == NULL) { + threadqueue->first = job; + } else { + threadqueue->last->next = job; + } - PTHREAD_LOCK(&threadqueue->lock); + threadqueue->last = job; + job->next = NULL; +} - while(!threadqueue->stop && threadqueue->queue_waiting_execution == 0 && !next_job) { + +/** + * \brief Retrieve a job from the queue of jobs ready to run. + * + * The caller must have locked the thread queue. The calling function + * receives the ownership of the job. + */ +static threadqueue_job_t * threadqueue_pop_job(threadqueue_queue_t * threadqueue) +{ + assert(threadqueue->first != NULL); + + threadqueue_job_t *job = threadqueue->first; + threadqueue->first = job->next; + job->next = NULL; + + if (threadqueue->first == NULL) { + threadqueue->last = NULL; + } + + return job; +} + + +/** + * \brief Function executed by worker threads. + */ +static void* threadqueue_worker(void* threadqueue_opaque) +{ + threadqueue_queue_t * const threadqueue = (threadqueue_queue_t *) threadqueue_opaque; + + PTHREAD_LOCK(&threadqueue->lock); + + for (;;) { + while (!threadqueue->stop && threadqueue->first == NULL) { // Wait until there is something to do in the queue. 
- PTHREAD_COND_WAIT(&threadqueue->cond, &threadqueue->lock); + PTHREAD_COND_WAIT(&threadqueue->job_available, &threadqueue->lock); } - if(threadqueue->stop) { - if (next_job) { - // Put a job we had already reserved back into the queue. - PTHREAD_LOCK(&next_job->lock); - next_job->state = THREADQUEUE_JOB_STATE_QUEUED; - PTHREAD_UNLOCK(&next_job->lock); - } + if (threadqueue->stop) { break; } - //Find a task (should be fast enough) - job = NULL; - if (next_job) { - assert(next_job->ndepends == 0); - job = next_job; - } else { - //FIXME: if not using OWF, the first is better than the second, otherwise we should use the second order - //for (i = threadqueue->queue_count - 1; i >= threadqueue->queue_start; --i) { - //for (i = threadqueue->queue_start; i < threadqueue->queue_count; ++i) { + // Get a job and remove it from the queue. + threadqueue_job_t *job = threadqueue_pop_job(threadqueue); - for (int i = (threadqueue->fifo ? threadqueue->queue_start : threadqueue->queue_count - 1); - (threadqueue->fifo ? i < threadqueue->queue_count : i >= threadqueue->queue_start); - (threadqueue->fifo ? ++i : --i)) { - threadqueue_job_t * const i_job = threadqueue->queue[i]; + PTHREAD_LOCK(&job->lock); + assert(job->state == THREADQUEUE_JOB_STATE_READY); + job->state = THREADQUEUE_JOB_STATE_RUNNING; + PTHREAD_UNLOCK(&job->lock); + PTHREAD_UNLOCK(&threadqueue->lock); - if (i_job->state == THREADQUEUE_JOB_STATE_QUEUED && i_job->ndepends == 0) { - // Once we found the job with no dependancies, lock it and change - // its state to running, so nobody else can claim it. 
- PTHREAD_LOCK(&i_job->lock); - if (i_job->state == THREADQUEUE_JOB_STATE_QUEUED && i_job->ndepends == 0) { - job = i_job; - job->state = THREADQUEUE_JOB_STATE_RUNNING; - } - PTHREAD_UNLOCK(&i_job->lock); - if (job) break; - } + job->fptr(job->arg); + + PTHREAD_LOCK(&threadqueue->lock); + PTHREAD_LOCK(&job->lock); + assert(job->state == THREADQUEUE_JOB_STATE_RUNNING); + job->state = THREADQUEUE_JOB_STATE_DONE; + + PTHREAD_COND_SIGNAL(&threadqueue->job_done); + + // Go through all the jobs that depend on this one, decreasing their + // ndepends. Count how many jobs can now start executing so we know how + // many threads to wake up. + int num_new_jobs = 0; + for (int i = 0; i < job->rdepends_count; ++i) { + threadqueue_job_t * const depjob = job->rdepends[i]; + // The dependency (job) is locked before the job depending on it. + // This must be the same order as in kvz_threadqueue_job_dep_add. + PTHREAD_LOCK(&depjob->lock); + + assert(depjob->state == THREADQUEUE_JOB_STATE_WAITING || + depjob->state == THREADQUEUE_JOB_STATE_PAUSED); + assert(depjob->ndepends > 0); + depjob->ndepends--; + + if (depjob->ndepends == 0 && depjob->state == THREADQUEUE_JOB_STATE_WAITING) { + // Move the job to ready jobs. + threadqueue_push_job(threadqueue, kvz_threadqueue_copy_ref(depjob)); + num_new_jobs++; } + + // Clear this reference to the job. + PTHREAD_UNLOCK(&depjob->lock); + kvz_threadqueue_free_job(&job->rdepends[i]); } + job->rdepends_count = 0; - if (!job) { - // We have no job. Probably because more threads were woken up than - // there were jobs to do. - PTHREAD_UNLOCK(&threadqueue->lock); - } else { - // We have a job with ndepends==0 and its state is running. - assert(job->state == THREADQUEUE_JOB_STATE_RUNNING); + PTHREAD_UNLOCK(&job->lock); + kvz_threadqueue_free_job(&job); - // Advance queue_start to skip all the running jobs. 
- while (threadqueue->queue_start < threadqueue->queue_count && - threadqueue->queue[threadqueue->queue_start]->state != THREADQUEUE_JOB_STATE_QUEUED) - { - threadqueue->queue_start++; - } - - if (!next_job) { - --threadqueue->queue_waiting_execution; - ++threadqueue->queue_running; - } - - PTHREAD_UNLOCK(&threadqueue->lock); - - job->fptr(job->arg); - - // This lock is necessary because the main thread may try to add - // reverse dependencies to the job while running. - PTHREAD_LOCK(&job->lock); - assert(job->state == THREADQUEUE_JOB_STATE_RUNNING); - - job->state = THREADQUEUE_JOB_STATE_DONE; - - next_job = NULL; - - int queue_waiting_dependency_decr = 0; - int queue_waiting_execution_incr = 0; - - // Go throught all the jobs that depend on this one, decresing their ndepends. - for (int i = 0; i < job->rdepends_count; ++i) { - threadqueue_job_t * const depjob = job->rdepends[i]; - // The dependency (job) is locked before the job depending on it. - // This must be the same order as in kvz_threadqueue_job_dep_add. - PTHREAD_LOCK(&depjob->lock); - - assert(depjob->state == THREADQUEUE_JOB_STATE_QUEUED); - assert(depjob->ndepends > 0); - --depjob->ndepends; - - // Count how many jobs can now start executing so we know how many - // threads to wake up. - if (depjob->ndepends == 0) { - if (!next_job) { - // Avoid having to find a new job for this worker through the - // queue by taking one of the jobs that depended on current job. - next_job = depjob; - depjob->state = THREADQUEUE_JOB_STATE_RUNNING; - } else { - ++queue_waiting_execution_incr; - } - ++queue_waiting_dependency_decr; - } - - PTHREAD_UNLOCK(&depjob->lock); - } - - PTHREAD_UNLOCK(&job->lock); - - PTHREAD_LOCK(&threadqueue->lock); - - assert(threadqueue->queue_waiting_dependency >= queue_waiting_dependency_decr); - - // This thread will - if (!next_job) { - // We didn't find a new job, so this thread will have to go wait. 
- threadqueue->queue_running--; - } - threadqueue->queue_waiting_dependency -= queue_waiting_dependency_decr; - threadqueue->queue_waiting_execution += queue_waiting_execution_incr; - - // Wake up enough threads to take care of the tasks now lacking dependancies. - for (int i = 0; i < queue_waiting_execution_incr; ++i) { - PTHREAD_COND_SIGNAL(&threadqueue->cond); - } - - // Signal main thread that a job has been completed. - pthread_cond_signal(&threadqueue->cb_cond); - - PTHREAD_UNLOCK(&threadqueue->lock); + // The current thread will process one of the new jobs so we wake up + // one thread fewer than the number of new jobs. + for (int i = 0; i < num_new_jobs - 1; i++) { + pthread_cond_signal(&threadqueue->job_available); } } - // We got out of the loop because threadqueue->stop is true. The queue is locked. - assert(threadqueue->stop); - --threadqueue->threads_running; - + threadqueue->thread_running_count--; PTHREAD_UNLOCK(&threadqueue->lock); - - free(threadqueue_worker_spec_opaque); - - pthread_exit(NULL); - return NULL; } -int kvz_threadqueue_init(threadqueue_queue_t * const threadqueue, int thread_count, int fifo) { - int i; +/** + * \brief Initialize the queue.
+ * + * \return pointer to the new thread queue, or NULL on failure + */ +threadqueue_queue_t * kvz_threadqueue_init(int thread_count) +{ + threadqueue_queue_t *threadqueue = MALLOC(threadqueue_queue_t, 1); + if (!threadqueue) { + goto failed; + } + if (pthread_mutex_init(&threadqueue->lock, NULL) != 0) { fprintf(stderr, "pthread_mutex_init failed!\n"); - assert(0); - return 0; + goto failed; } - if (pthread_cond_init(&threadqueue->cond, NULL) != 0) { + + if (pthread_cond_init(&threadqueue->job_available, NULL) != 0) { fprintf(stderr, "pthread_cond_init failed!\n"); - assert(0); - return 0; + goto failed; } - - if (pthread_cond_init(&threadqueue->cb_cond, NULL) != 0) { + + if (pthread_cond_init(&threadqueue->job_done, NULL) != 0) { fprintf(stderr, "pthread_cond_init failed!\n"); - assert(0); - return 0; + goto failed; } - - threadqueue->stop = false; - threadqueue->fifo = !!fifo; - threadqueue->threads_running = 0; - threadqueue->threads_count = thread_count; - + threadqueue->threads = MALLOC(pthread_t, thread_count); if (!threadqueue->threads) { fprintf(stderr, "Could not malloc threadqueue->threads!\n"); - return 0; + goto failed; + } + threadqueue->thread_count = 0; + threadqueue->thread_running_count = 0; + + threadqueue->stop = false; + + threadqueue->first = NULL; + threadqueue->last = NULL; + + // Lock the queue before creating threads, to ensure they all have correct information. + PTHREAD_LOCK(&threadqueue->lock); + for (int i = 0; i < thread_count; i++) { + if (pthread_create(&threadqueue->threads[i], NULL, threadqueue_worker, threadqueue) != 0) { + fprintf(stderr, "pthread_create failed!\n"); + goto failed; + } + threadqueue->thread_count++; + threadqueue->thread_running_count++; + } + PTHREAD_UNLOCK(&threadqueue->lock); + + return threadqueue; + +failed: + kvz_threadqueue_free(threadqueue); + return NULL; +} + + +/** + * \brief Create a job and return a pointer to it. + * + * The job is created in a paused state.
Function kvz_threadqueue_submit + * must be called on the job in order to have it run. + * + * \return pointer to the job, or NULL on failure + */ +threadqueue_job_t * kvz_threadqueue_job_create(void (*fptr)(void *arg), void *arg) +{ + threadqueue_job_t *job = MALLOC(threadqueue_job_t, 1); + if (!job) { + fprintf(stderr, "Could not alloc job!\n"); + return NULL; } - threadqueue->queue = NULL; - threadqueue->queue_size = 0; - threadqueue->queue_count = 0; - threadqueue->queue_start = 0; - threadqueue->queue_waiting_execution = 0; - threadqueue->queue_waiting_dependency = 0; - threadqueue->queue_running = 0; - - //Lock the queue before creating threads, to ensure they all have correct information - PTHREAD_LOCK(&threadqueue->lock); - - for(i = 0; i < thread_count; i++) { - threadqueue_worker_spec *tqws = MALLOC(threadqueue_worker_spec, 1); - if (tqws) { - tqws->threadqueue = threadqueue; - tqws->worker_id = i; - if(pthread_create(&(threadqueue->threads[i]), NULL, threadqueue_worker, (void*)tqws) != 0) { - fprintf(stderr, "pthread_create failed!\n"); - assert(0); - return 0; - } - threadqueue->threads_running++; - } else { - fprintf(stderr, "Could not allocate threadqueue_worker_spec structure!\n"); - PTHREAD_UNLOCK(&threadqueue->lock); - return 0; - } + if (pthread_mutex_init(&job->lock, NULL) != 0) { + fprintf(stderr, "pthread_mutex_init(job) failed!\n"); + return NULL; } - + + job->state = THREADQUEUE_JOB_STATE_PAUSED; + job->ndepends = 0; + job->rdepends = NULL; + job->rdepends_count = 0; + job->rdepends_size = 0; + job->refcount = 1; + job->fptr = fptr; + job->arg = arg; + + return job; +} + + +int kvz_threadqueue_submit(threadqueue_queue_t * const threadqueue, threadqueue_job_t *job) +{ + PTHREAD_LOCK(&threadqueue->lock); + PTHREAD_LOCK(&job->lock); + assert(job->state == THREADQUEUE_JOB_STATE_PAUSED); + + if (threadqueue->thread_count == 0) { + // When not using threads, run the job immediately. 
+ job->fptr(job->arg); + job->state = THREADQUEUE_JOB_STATE_DONE; + } else if (job->ndepends == 0) { + threadqueue_push_job(threadqueue, kvz_threadqueue_copy_ref(job)); + pthread_cond_signal(&threadqueue->job_available); + } else { + job->state = THREADQUEUE_JOB_STATE_WAITING; + } + PTHREAD_UNLOCK(&job->lock); PTHREAD_UNLOCK(&threadqueue->lock); return 1; } + +/** + * \brief Add a dependency between two jobs. + * + * \param job job that should be executed after dependency + * \param dependency job that should be executed before job + * + * \return 1 on success, 0 on failure + * + */ +int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *dependency) +{ + // Lock the dependency first and then the job depending on it. + // This must be the same order as in threadqueue_worker. + PTHREAD_LOCK(&dependency->lock); + + if (dependency->state == THREADQUEUE_JOB_STATE_DONE) { + // The dependency has been completed already so there is nothing to do. + PTHREAD_UNLOCK(&dependency->lock); + return 1; + } + + PTHREAD_LOCK(&job->lock); + job->ndepends++; + PTHREAD_UNLOCK(&job->lock); + + // Add the reverse dependency + if (dependency->rdepends_count >= dependency->rdepends_size) { + dependency->rdepends_size += THREADQUEUE_LIST_REALLOC_SIZE; + size_t bytes = dependency->rdepends_size * sizeof(threadqueue_job_t*); + dependency->rdepends = realloc(dependency->rdepends, bytes); + } + dependency->rdepends[dependency->rdepends_count++] = kvz_threadqueue_copy_ref(job); + + PTHREAD_UNLOCK(&dependency->lock); + + return 1; +} + + /** * \brief Get a new pointer to a job. 
* @@ -329,31 +527,61 @@ void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr) return; } - assert(new_refcount >= 0); + assert(new_refcount == 0); + + for (int i = 0; i < job->rdepends_count; i++) { + kvz_threadqueue_free_job(&job->rdepends[i]); + } + job->rdepends_count = 0; FREE_POINTER(job->rdepends); pthread_mutex_destroy(&job->lock); FREE_POINTER(job); } + +/** + * \brief Wait for a job to be completed. + * + * \return 1 on success, 0 on failure + */ +int kvz_threadqueue_waitfor(threadqueue_queue_t * threadqueue, threadqueue_job_t * job) +{ + PTHREAD_LOCK(&job->lock); + while (job->state != THREADQUEUE_JOB_STATE_DONE) { + PTHREAD_COND_WAIT(&threadqueue->job_done, &job->lock); + } + PTHREAD_UNLOCK(&job->lock); + + return 1; +} + + +/** + * \brief Stop all threads after they finish the current jobs. + * + * Block until all threads have stopped. + * + * \return 1 on success, 0 on failure + */ int kvz_threadqueue_stop(threadqueue_queue_t * const threadqueue) { PTHREAD_LOCK(&threadqueue->lock); if (threadqueue->stop) { // The threadqueue should have stopped already. - assert(threadqueue->threads_running == 0); + assert(threadqueue->thread_running_count == 0); PTHREAD_UNLOCK(&threadqueue->lock); return 1; } // Tell all threads to stop. threadqueue->stop = true; - PTHREAD_COND_BROADCAST(&threadqueue->cond); + PTHREAD_COND_BROADCAST(&threadqueue->job_available); PTHREAD_UNLOCK(&threadqueue->lock); // Wait for them to stop. - for (int i = 0; i < threadqueue->threads_count; i++) { + for (int i = 0; i < threadqueue->thread_count; i++) { if (pthread_join(threadqueue->threads[i], NULL) != 0) { fprintf(stderr, "pthread_join failed!\n"); return 0; @@ -364,198 +592,39 @@ int kvz_threadqueue_stop(threadqueue_queue_t * const threadqueue) } -int kvz_threadqueue_finalize(threadqueue_queue_t * const threadqueue) +/** + * \brief Stop all threads and free allocated resources. 
+ * + * Does nothing if threadqueue is NULL. + */ +void kvz_threadqueue_free(threadqueue_queue_t *threadqueue) { - if (!kvz_threadqueue_stop(threadqueue)) return 0; + if (threadqueue == NULL) return; - // Free all jobs - PTHREAD_LOCK(&threadqueue->lock); - for (int i = 0; i < threadqueue->queue_count; ++i) { - kvz_threadqueue_free_job(&threadqueue->queue[i]); + kvz_threadqueue_stop(threadqueue); + + // Free all jobs. + while (threadqueue->first) { + threadqueue_job_t *next = threadqueue->first->next; + kvz_threadqueue_free_job(&threadqueue->first); + threadqueue->first = next; } - threadqueue->queue_count = 0; - threadqueue->queue_start = 0; - PTHREAD_UNLOCK(&threadqueue->lock); + threadqueue->last = NULL; - //Free allocated stuff - FREE_POINTER(threadqueue->queue); - threadqueue->queue_count = 0; - threadqueue->queue_size = 0; - threadqueue->queue_start = 0; - FREE_POINTER(threadqueue->threads); - threadqueue->threads_count = 0; - + threadqueue->thread_count = 0; + if (pthread_mutex_destroy(&threadqueue->lock) != 0) { fprintf(stderr, "pthread_mutex_destroy failed!\n"); - assert(0); - return 0; } - if (pthread_cond_destroy(&threadqueue->cond) != 0) { + + if (pthread_cond_destroy(&threadqueue->job_available) != 0) { fprintf(stderr, "pthread_cond_destroy failed!\n"); - assert(0); - return 0; } - - if (pthread_cond_destroy(&threadqueue->cb_cond) != 0) { + + if (pthread_cond_destroy(&threadqueue->job_done) != 0) { fprintf(stderr, "pthread_cond_destroy failed!\n"); - assert(0); - return 0; } - - return 1; -} - -int kvz_threadqueue_waitfor(threadqueue_queue_t * const threadqueue, threadqueue_job_t * const job) -{ - // NULL job is clearly OK. - if (!job) return 1; - - PTHREAD_LOCK(&job->lock); - while (job->state != THREADQUEUE_JOB_STATE_DONE) { - PTHREAD_COND_WAIT(&threadqueue->cb_cond, &job->lock); - } - PTHREAD_UNLOCK(&job->lock); - - PTHREAD_LOCK(&threadqueue->lock); - - // Free jobs submitted before this job.
- int i = 0; - for (; i < threadqueue->queue_count; ++i) { - if (threadqueue->queue[i] == job) break; - kvz_threadqueue_free_job(&threadqueue->queue[i]); - } - // Move remaining jobs to the beginning of the array. - if (i > 0) { - threadqueue->queue_count -= i; - threadqueue->queue_start = 0; - memmove(threadqueue->queue, &threadqueue->queue[i], threadqueue->queue_count * sizeof(*threadqueue->queue)); - FILL_ARRAY(&threadqueue->queue[threadqueue->queue_count], 0, i); - } - - PTHREAD_UNLOCK(&threadqueue->lock); - - return 1; -} - -threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * const threadqueue, void (*fptr)(void *arg), void *arg, int wait) { - threadqueue_job_t *job; - //No lock here... this should be constant - if (threadqueue->threads_count == 0) { - //FIXME: This should be improved in order to handle dependencies - fptr(arg); - return NULL; - } - - assert(wait == 0 || wait == 1); - - job = MALLOC(threadqueue_job_t, 1); - job->refcount = 1; - - if (!job) { - fprintf(stderr, "Could not alloc job!\n"); - assert(0); - return NULL; - } - - job->fptr = fptr; - job->arg = arg; - if (pthread_mutex_init(&job->lock, NULL) != 0) { - fprintf(stderr, "pthread_mutex_init(job) failed!\n"); - assert(0); - return NULL; - } - job->ndepends = wait; - job->rdepends = NULL; - job->rdepends_count = 0; - job->rdepends_size = 0; - job->state = THREADQUEUE_JOB_STATE_QUEUED; - - PTHREAD_LOCK(&threadqueue->lock); - - //Add the reverse dependency - if (threadqueue->queue_count >= threadqueue->queue_size) { - threadqueue->queue = realloc(threadqueue->queue, sizeof(threadqueue_job_t *) * (threadqueue->queue_size + THREADQUEUE_LIST_REALLOC_SIZE)); - if (!threadqueue->queue) { - fprintf(stderr, "Could not realloc queue!\n"); - assert(0); - return NULL; - } - threadqueue->queue_size += THREADQUEUE_LIST_REALLOC_SIZE; - } - threadqueue->queue[threadqueue->queue_count++] = job; - job->refcount++; - - if (job->ndepends == 0) { - ++threadqueue->queue_waiting_execution; - //Hope a 
thread can do it... - PTHREAD_COND_SIGNAL(&(threadqueue->cond)); - } else { - ++threadqueue->queue_waiting_dependency; - } - - PTHREAD_UNLOCK(&threadqueue->lock); - - return job; -} - -int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *dependency) { - //If we are not using threads, job are NULL pointers, so we can skip that - if (!job && !dependency) return 1; - - assert(job && dependency); - - // Lock first the dependency and then the job depending on it. - // This must be the same order as in threadqueue_worker. - PTHREAD_LOCK(&dependency->lock); - PTHREAD_LOCK(&job->lock); - - if (dependency->state != THREADQUEUE_JOB_STATE_DONE) { - job->ndepends++; - } - - PTHREAD_UNLOCK(&job->lock); - - //Add the reverse dependency (FIXME: this may be moved in the if above... but we would lose ability to track) - if (dependency->rdepends_count >= dependency->rdepends_size) { - size_t new_size = - sizeof(threadqueue_job_t*) * - (dependency->rdepends_size + THREADQUEUE_LIST_REALLOC_SIZE); - dependency->rdepends = realloc(dependency->rdepends, new_size); - if (!dependency->rdepends) { - fprintf(stderr, "Could not realloc rdepends!\n"); - assert(0); - return 0; - } - dependency->rdepends_size += THREADQUEUE_LIST_REALLOC_SIZE; - } - dependency->rdepends[dependency->rdepends_count++] = job; - - PTHREAD_UNLOCK(&dependency->lock); - - return 1; -} - -int kvz_threadqueue_job_unwait_job(threadqueue_queue_t * const threadqueue, threadqueue_job_t *job) { - int ndepends = 0; - - //NULL job => no threads, nothing to do - if (!job) return 1; - PTHREAD_LOCK(&job->lock); - job->ndepends--; - ndepends = job->ndepends; - PTHREAD_UNLOCK(&job->lock); - - if (ndepends == 0) { - PTHREAD_LOCK(&threadqueue->lock); - assert(threadqueue->queue_waiting_dependency > 0); - --threadqueue->queue_waiting_dependency; - ++threadqueue->queue_waiting_execution; - //Hope a thread can do it... 
- PTHREAD_COND_SIGNAL(&(threadqueue->cond)); - - PTHREAD_UNLOCK(&threadqueue->lock); - } - - return 1; + + FREE_POINTER(threadqueue); } diff --git a/src/threadqueue.h b/src/threadqueue.h index 090bc602..1b178548 100644 --- a/src/threadqueue.h +++ b/src/threadqueue.h @@ -30,91 +30,22 @@ #include "global.h" // IWYU pragma: keep -typedef enum { - THREADQUEUE_JOB_STATE_QUEUED = 0, - THREADQUEUE_JOB_STATE_RUNNING = 1, - THREADQUEUE_JOB_STATE_DONE = 2 -} threadqueue_job_state; +typedef struct threadqueue_job_t threadqueue_job_t; +typedef struct threadqueue_queue_t threadqueue_queue_t; -typedef struct threadqueue_job_t { - pthread_mutex_t lock; - - threadqueue_job_state state; - - unsigned int ndepends; //Number of active dependencies that this job wait for - - struct threadqueue_job_t **rdepends; //array of pointer to jobs that depend on this one. They have to exist when the thread finishes, because they cannot be run before. - unsigned int rdepends_count; //number of rdepends - unsigned int rdepends_size; //allocated size of rdepends +threadqueue_queue_t * kvz_threadqueue_init(int thread_count); - // Reference count - int refcount; +threadqueue_job_t * kvz_threadqueue_job_create(void (*fptr)(void *arg), void *arg); +int kvz_threadqueue_submit(threadqueue_queue_t * threadqueue, threadqueue_job_t *job); - //Job function and state to use - void (*fptr)(void *arg); - void *arg; -} threadqueue_job_t; - - - - -typedef struct { - pthread_mutex_t lock; - pthread_cond_t cond; - pthread_cond_t cb_cond; - - pthread_t *threads; - int threads_count; - int threads_running; - - bool stop; // if true, threads should stop asap - - int fifo; - - threadqueue_job_t **queue; - unsigned int queue_start; - unsigned int queue_count; - unsigned int queue_size; - unsigned int queue_waiting_execution; //Number of jobs without any dependency which could be run - unsigned int queue_waiting_dependency; //Number of jobs waiting for a dependency to complete - unsigned int queue_running; //Number of 
jobs running - -} threadqueue_queue_t; - -//Init a threadqueue (if fifo, then behave as a FIFO with dependencies, otherwise as a LIFO with dependencies) -int kvz_threadqueue_init(threadqueue_queue_t * threadqueue, int thread_count, int fifo); - -//Add a job to the queue, and returs a threadqueue_job handle. If wait == 1, one has to run kvz_threadqueue_job_unwait_job in order to have it run -threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * threadqueue, void (*fptr)(void *arg), void *arg, int wait); - -void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr); +int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *dependency); threadqueue_job_t *kvz_threadqueue_copy_ref(threadqueue_job_t *job); -int kvz_threadqueue_job_unwait_job(threadqueue_queue_t * threadqueue, threadqueue_job_t *job); +void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr); -//Add a dependency between two jobs. -int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *depends_on); - -/** - * \brief Stop all threads after they finish the current jobs. - * - * Blocks until all threads have stopped. - */ -int kvz_threadqueue_stop(threadqueue_queue_t * const threadqueue); - -//Blocking call until job is executed. Job handles submitted before job should not be used any more as they are removed from the queue. int kvz_threadqueue_waitfor(threadqueue_queue_t * threadqueue, threadqueue_job_t * job); +int kvz_threadqueue_stop(threadqueue_queue_t * threadqueue); +void kvz_threadqueue_free(threadqueue_queue_t * threadqueue); -//Free ressources in a threadqueue -int kvz_threadqueue_finalize(threadqueue_queue_t * threadqueue); - -/* Constraints: - * - * - Always first lock threadqueue, than a job inside it - * - When job A depends on job B, always lock first job B and then job A - * - Jobs should be submitted in an order which is compatible with serial execution. - * - * */ - -#endif //THREADQUEUE_H_ +#endif // THREADQUEUE_H_