diff --git a/src/encoder.c b/src/encoder.c index cad51b98..a1f87126 100644 --- a/src/encoder.c +++ b/src/encoder.c @@ -286,11 +286,8 @@ encoder_control_t* kvz_encoder_control_init(const kvz_config *const cfg) } } - encoder->threadqueue = MALLOC(threadqueue_queue_t, 1); - if (!encoder->threadqueue || - !kvz_threadqueue_init(encoder->threadqueue, - encoder->cfg.threads, - encoder->cfg.owf > 0)) { + encoder->threadqueue = kvz_threadqueue_init(encoder->cfg.threads); + if (!encoder->threadqueue) { fprintf(stderr, "Could not initialize threadqueue.\n"); goto init_failed; } @@ -653,10 +650,8 @@ void kvz_encoder_control_free(encoder_control_t *const encoder) kvz_scalinglist_destroy(&encoder->scaling_list); - if (encoder->threadqueue) { - kvz_threadqueue_finalize(encoder->threadqueue); - } - FREE_POINTER(encoder->threadqueue); + kvz_threadqueue_free(encoder->threadqueue); + encoder->threadqueue = NULL; free(encoder); } diff --git a/src/encoder_state-bitstream.c b/src/encoder_state-bitstream.c index bbb9407f..67bdbcc8 100644 --- a/src/encoder_state-bitstream.c +++ b/src/encoder_state-bitstream.c @@ -1018,19 +1018,13 @@ static void encoder_state_write_bitstream_main(encoder_state_t * const state) kvz_bitstream_add_rbsp_trailing_bits(stream); } - { - PERFORMANCE_MEASURE_START(KVZ_PERF_FRAME); - encoder_state_write_bitstream_children(state); - PERFORMANCE_MEASURE_END(KVZ_PERF_FRAME, encoder->threadqueue, "type=write_bitstream_append,frame=%d,encoder_type=%c", state->frame->num, state->type); - } - + encoder_state_write_bitstream_children(state); + if (state->encoder_control->cfg.hash != KVZ_HASH_NONE) { - PERFORMANCE_MEASURE_START(KVZ_PERF_FRAME); // Calculate checksum add_checksum(state); - PERFORMANCE_MEASURE_END(KVZ_PERF_FRAME, encoder->threadqueue, "type=write_bitstream_checksum,frame=%d,encoder_type=%c", state->frame->num, state->type); } - + //Get bitstream length for stats uint64_t newpos = kvz_bitstream_tell(stream); state->stats_bitstream_length = (newpos >> 3) - 
(curpos >> 3); diff --git a/src/encoderstate.c b/src/encoderstate.c index d9c42e94..7fcee3ee 100644 --- a/src/encoderstate.c +++ b/src/encoderstate.c @@ -725,16 +725,7 @@ static void encoder_state_encode_leaf(encoder_state_t * const state) // frame is encoded. Deblocking and SAO search is done during LCU encoding. for (int i = 0; i < state->lcu_order_count; ++i) { - PERFORMANCE_MEASURE_START(KVZ_PERF_LCU); - encoder_state_worker_encode_lcu(&state->lcu_order[i]); - -#ifdef KVZ_DEBUG - { - const lcu_order_element_t * const lcu = &state->lcu_order[i]; - PERFORMANCE_MEASURE_END(KVZ_PERF_LCU, ctrl->threadqueue, "type=encode_lcu,frame=%d,tile=%d,slice=%d,px_x=%d-%d,px_y=%d-%d", state->frame->num, state->tile->id, state->slice->id, lcu->position_px.x + state->tile->lcu_offset_x * LCU_WIDTH, lcu->position_px.x + state->tile->lcu_offset_x * LCU_WIDTH + lcu->size.x - 1, lcu->position_px.y + state->tile->lcu_offset_y * LCU_WIDTH, lcu->position_px.y + state->tile->lcu_offset_y * LCU_WIDTH + lcu->size.y - 1); - } -#endif //KVZ_DEBUG } } else { // Add each LCU in the wavefront row as it's own job to the queue. 
@@ -769,14 +760,8 @@ static void encoder_state_encode_leaf(encoder_state_t * const state) for (int i = 0; i < state->lcu_order_count; ++i) { const lcu_order_element_t * const lcu = &state->lcu_order[i]; -#ifdef KVZ_DEBUG - char job_description[256]; - sprintf(job_description, "type=encode_lcu,frame=%d,tile=%d,slice=%d,px_x=%d-%d,px_y=%d-%d", state->frame->num, state->tile->id, state->slice->id, lcu->position_px.x + state->tile->lcu_offset_x * LCU_WIDTH, lcu->position_px.x + state->tile->lcu_offset_x * LCU_WIDTH + lcu->size.x - 1, lcu->position_px.y + state->tile->lcu_offset_y * LCU_WIDTH, lcu->position_px.y + state->tile->lcu_offset_y * LCU_WIDTH + lcu->size.y - 1); -#else - char* job_description = NULL; -#endif kvz_threadqueue_free_job(&state->tile->wf_jobs[lcu->id]); - state->tile->wf_jobs[lcu->id] = kvz_threadqueue_submit(ctrl->threadqueue, encoder_state_worker_encode_lcu, (void*)lcu, 1, job_description); + state->tile->wf_jobs[lcu->id] = kvz_threadqueue_job_create(encoder_state_worker_encode_lcu, (void*)lcu); threadqueue_job_t **job = &state->tile->wf_jobs[lcu->id]; // If job object was returned, add dependancies and allow it to run. @@ -814,7 +799,7 @@ static void encoder_state_encode_leaf(encoder_state_t * const state) } } - kvz_threadqueue_job_unwait_job(state->encoder_control->threadqueue, state->tile->wf_jobs[lcu->id]); + kvz_threadqueue_submit(state->encoder_control->threadqueue, state->tile->wf_jobs[lcu->id]); // The wavefront row is done when the last LCU in the row is done. if (i + 1 == state->lcu_order_count) { @@ -911,27 +896,13 @@ static void encoder_state_encode(encoder_state_t * const main_state) { for (int i = 0; main_state->children[i].encoder_control; ++i) { //If we don't have wavefronts, parallelize encoding of children. 
if (main_state->children[i].type != ENCODER_STATE_TYPE_WAVEFRONT_ROW) { -#ifdef KVZ_DEBUG - char job_description[256]; - switch (main_state->children[i].type) { - case ENCODER_STATE_TYPE_TILE: - sprintf(job_description, "type=encode_child,frame=%d,tile=%d,row=%d-%d,px_x=%d-%d,px_y=%d-%d", main_state->children[i].frame->num, main_state->children[i].tile->id, main_state->children[i].lcu_order[0].position.y + main_state->children[i].tile->lcu_offset_y, main_state->children[i].lcu_order[0].position.y + main_state->children[i].tile->lcu_offset_y, - main_state->children[i].lcu_order[0].position_px.x + main_state->children[i].tile->lcu_offset_x * LCU_WIDTH, main_state->children[i].lcu_order[main_state->children[i].lcu_order_count-1].position_px.x + main_state->children[i].lcu_order[main_state->children[i].lcu_order_count-1].size.x + main_state->children[i].tile->lcu_offset_x * LCU_WIDTH - 1, - main_state->children[i].lcu_order[0].position_px.y + main_state->children[i].tile->lcu_offset_y * LCU_WIDTH, main_state->children[i].lcu_order[main_state->children[i].lcu_order_count-1].position_px.y + main_state->children[i].lcu_order[main_state->children[i].lcu_order_count-1].size.y + main_state->children[i].tile->lcu_offset_y * LCU_WIDTH - 1); - break; - case ENCODER_STATE_TYPE_SLICE: - sprintf(job_description, "type=encode_child,frame=%d,slice=%d,start_in_ts=%d", main_state->children[i].frame->num, main_state->children[i].slice->id, main_state->children[i].slice->start_in_ts); - break; - default: - sprintf(job_description, "type=encode_child,frame=%d,invalid", main_state->children[i].frame->num); - break; - } -#else - char* job_description = NULL; -#endif kvz_threadqueue_free_job(&main_state->children[i].tqj_recon_done); - main_state->children[i].tqj_recon_done = kvz_threadqueue_submit(main_state->encoder_control->threadqueue, encoder_state_worker_encode_children, &(main_state->children[i]), 1, job_description); - if (main_state->children[i].previous_encoder_state != 
&main_state->children[i] && main_state->children[i].previous_encoder_state->tqj_recon_done && !main_state->children[i].frame->is_idr_frame) { + main_state->children[i].tqj_recon_done = + kvz_threadqueue_job_create(encoder_state_worker_encode_children, &main_state->children[i]); + if (main_state->children[i].previous_encoder_state != &main_state->children[i] && + main_state->children[i].previous_encoder_state->tqj_recon_done && + !main_state->children[i].frame->is_idr_frame) + { #if 0 // Disabled due to non-determinism. if (main_state->encoder_control->cfg->mv_constraint == KVZ_MV_CONSTRAIN_FRAME_AND_TILE_MARGIN) @@ -947,7 +918,7 @@ static void encoder_state_encode(encoder_state_t * const main_state) { } } } - kvz_threadqueue_job_unwait_job(main_state->encoder_control->threadqueue, main_state->children[i].tqj_recon_done); + kvz_threadqueue_submit(main_state->encoder_control->threadqueue, main_state->children[i].tqj_recon_done); } else { //Wavefront rows have parallelism at LCU level, so we should not launch multiple threads here! 
//FIXME: add an assert: we can only have wavefront children @@ -1256,39 +1227,22 @@ static void _encode_one_frame_add_bitstream_deps(const encoder_state_t * const s void kvz_encode_one_frame(encoder_state_t * const state, kvz_picture* frame) { - { - PERFORMANCE_MEASURE_START(KVZ_PERF_FRAME); - encoder_state_init_new_frame(state, frame); - PERFORMANCE_MEASURE_END(KVZ_PERF_FRAME, state->encoder_control->threadqueue, "type=init_new_frame,frame=%d,poc=%d", state->frame->num, state->frame->poc); - } - { - PERFORMANCE_MEASURE_START(KVZ_PERF_FRAME); - encoder_state_encode(state); - PERFORMANCE_MEASURE_END(KVZ_PERF_FRAME, state->encoder_control->threadqueue, "type=encode,frame=%d", state->frame->num); - } - //kvz_threadqueue_flush(main_state->encoder_control->threadqueue); - { - threadqueue_job_t *job; -#ifdef KVZ_DEBUG - char job_description[256]; - sprintf(job_description, "type=write_bitstream,frame=%d", state->frame->num); -#else - char* job_description = NULL; -#endif + encoder_state_init_new_frame(state, frame); + encoder_state_encode(state); - job = kvz_threadqueue_submit(state->encoder_control->threadqueue, kvz_encoder_state_worker_write_bitstream, (void*) state, 1, job_description); - - _encode_one_frame_add_bitstream_deps(state, job); - if (state->previous_encoder_state != state && state->previous_encoder_state->tqj_bitstream_written) { - //We need to depend on previous bitstream generation - kvz_threadqueue_job_dep_add(job, state->previous_encoder_state->tqj_bitstream_written); - } - kvz_threadqueue_job_unwait_job(state->encoder_control->threadqueue, job); - assert(!state->tqj_bitstream_written); - state->tqj_bitstream_written = job; + threadqueue_job_t *job = + kvz_threadqueue_job_create(kvz_encoder_state_worker_write_bitstream, state); + + _encode_one_frame_add_bitstream_deps(state, job); + if (state->previous_encoder_state != state && state->previous_encoder_state->tqj_bitstream_written) { + //We need to depend on previous bitstream generation + 
kvz_threadqueue_job_dep_add(job, state->previous_encoder_state->tqj_bitstream_written); } + kvz_threadqueue_submit(state->encoder_control->threadqueue, job); + assert(!state->tqj_bitstream_written); + state->tqj_bitstream_written = job; + state->frame->done = 0; - //kvz_threadqueue_flush(main_state->encoder_control->threadqueue); } diff --git a/src/search.c b/src/search.c index 0f2ae83c..d081015e 100644 --- a/src/search.c +++ b/src/search.c @@ -399,10 +399,6 @@ static double search_cu(encoder_state_t * const state, int x, int y, int depth, int x_local = SUB_SCU(x); int y_local = SUB_SCU(y); -#ifdef KVZ_DEBUG - int debug_split = 0; -#endif - PERFORMANCE_MEASURE_START(KVZ_PERF_SEARCHCU); // Stop recursion if the CU is completely outside the frame. if (x >= frame->width || y >= frame->height) { @@ -713,13 +709,6 @@ static double search_cu(encoder_state_t * const state, int x, int y, int depth, work_tree_copy_down(x_local, y_local, depth, work_tree); } - PERFORMANCE_MEASURE_END(KVZ_PERF_SEARCHCU, state->encoder_control->threadqueue, "type=search_cu,frame=%d,tile=%d,slice=%d,px_x=%d-%d,px_y=%d-%d,depth=%d,split=%d,cur_cu_is_intra=%d", state->frame->num, state->tile->id, state->slice->id, - state->tile->offset_x + x, - state->tile->offset_x + x + cu_width, - state->tile->offset_y + y, - state->tile->offset_y + y + cu_width, - depth, debug_split, (cur_cu->type == CU_INTRA) ? 1 : 0); - assert(cur_cu->type != CU_NOTSET); return cost; diff --git a/src/threadqueue.c b/src/threadqueue.c index 881c4a5a..3b0a69cd 100644 --- a/src/threadqueue.c +++ b/src/threadqueue.c @@ -30,298 +30,469 @@ #include "threads.h" -typedef struct { - threadqueue_queue_t * threadqueue; - int worker_id; -} threadqueue_worker_spec; +/** + * \file + * + * Lock acquisition order: + * + * 1. When locking a job and its dependency, the dependency must be locked + * first and then the job depending on it. + * + * 2. 
When locking a job and the thread queue, the thread queue must be + * locked first and then the job. + * + * 3. When accessing threadqueue_job_t.next, the thread queue must be + * locked. + */ #define THREADQUEUE_LIST_REALLOC_SIZE 32 -//#define PTHREAD_COND_SIGNAL(c) fprintf(stderr, "%s:%d pthread_cond_signal(%s=%p)\n", __FUNCTION__, __LINE__, #c, c); if (pthread_cond_signal((c)) != 0) { fprintf(stderr, "pthread_cond_signal(%s=%p) failed!\n", #c, c); assert(0); return 0; } -//#define PTHREAD_COND_BROADCAST(c) fprintf(stderr, "%s:%d pthread_cond_broadcast(%s=%p)\n", __FUNCTION__, __LINE__, #c, c); if (pthread_cond_broadcast((c)) != 0) { fprintf(stderr, "pthread_cond_broadcast(%s=%p) failed!\n", #c, c); assert(0); return 0; } -//#define PTHREAD_COND_WAIT(c,l) fprintf(stderr, "%s:%d pthread_cond_wait(%s=%p, %s=%p)\n", __FUNCTION__, __LINE__, #c, c, #l, l); if (pthread_cond_wait((c),(l)) != 0) { fprintf(stderr, "pthread_cond_wait(%s=%p, %s=%p) failed!\n", #c, c, #l, l); assert(0); return 0; } else {fprintf(stderr, "%s:%d pthread_cond_wait(%s=%p, %s=%p) (done)\n", __FUNCTION__, __LINE__, #c, c, #l, l);} -//#define PTHREAD_LOCK(l) fprintf(stderr, "%s:%d pthread_mutex_lock(%s=%p) (try)\n", __FUNCTION__, __LINE__, #l, l); if (pthread_mutex_lock((l)) != 0) { fprintf(stderr, "pthread_mutex_lock(%s=%p) failed!\n", #l, l); assert(0); return 0; } else {fprintf(stderr, "%s:%d pthread_mutex_lock(%s=%p)\n", __FUNCTION__, __LINE__, #l, l);} -//#define PTHREAD_UNLOCK(l) if (pthread_mutex_unlock((l)) != 0) { fprintf(stderr, "pthread_mutex_unlock(%s=%p) failed!\n", #l, l); assert(0); return 0; } else {fprintf(stderr, "%s:%d pthread_mutex_unlock(%s=%p)\n", __FUNCTION__, __LINE__, #l, l);} +#define PTHREAD_COND_SIGNAL(c) \ + if (pthread_cond_signal((c)) != 0) { \ + fprintf(stderr, "pthread_cond_signal(%s=%p) failed!\n", #c, c); \ + assert(0); \ + return 0; \ + } + +#define PTHREAD_COND_BROADCAST(c) \ + if (pthread_cond_broadcast((c)) != 0) { \ + fprintf(stderr, 
"pthread_cond_broadcast(%s=%p) failed!\n", #c, c); \ + assert(0); \ + return 0; \ + } + +#define PTHREAD_COND_WAIT(c,l) \ + if (pthread_cond_wait((c),(l)) != 0) { \ + fprintf(stderr, "pthread_cond_wait(%s=%p, %s=%p) failed!\n", #c, c, #l, l); \ + assert(0); \ + return 0; \ + } + +#define PTHREAD_LOCK(l) \ + if (pthread_mutex_lock((l)) != 0) { \ + fprintf(stderr, "pthread_mutex_lock(%s) failed!\n", #l); \ + assert(0); \ + return 0; \ + } + +#define PTHREAD_UNLOCK(l) \ + if (pthread_mutex_unlock((l)) != 0) { \ + fprintf(stderr, "pthread_mutex_unlock(%s) failed!\n", #l); \ + assert(0); \ + return 0; \ + } -#define PTHREAD_COND_SIGNAL(c) if (pthread_cond_signal((c)) != 0) { fprintf(stderr, "pthread_cond_signal(%s=%p) failed!\n", #c, c); assert(0); return 0; } -#define PTHREAD_COND_BROADCAST(c) if (pthread_cond_broadcast((c)) != 0) { fprintf(stderr, "pthread_cond_broadcast(%s=%p) failed!\n", #c, c); assert(0); return 0; } +typedef enum { + /** + * \brief Job has been submitted, but is not allowed to run yet. + */ + THREADQUEUE_JOB_STATE_PAUSED, -#ifndef _PTHREAD_DUMP -#define PTHREAD_COND_WAIT(c,l) if (pthread_cond_wait((c),(l)) != 0) { fprintf(stderr, "pthread_cond_wait(%s=%p, %s=%p) failed!\n", #c, c, #l, l); assert(0); return 0; } -#define PTHREAD_LOCK(l) if (pthread_mutex_lock((l)) != 0) { fprintf(stderr, "pthread_mutex_lock(%s) failed!\n", #l); assert(0); return 0; } -#define PTHREAD_UNLOCK(l) if (pthread_mutex_unlock((l)) != 0) { fprintf(stderr, "pthread_mutex_unlock(%s) failed!\n", #l); assert(0); return 0; } + /** + * \brief Job is waiting for dependencies. + */ + THREADQUEUE_JOB_STATE_WAITING, -#else //PTHREAD_DUMP -#define PTHREAD_LOCK(l) do { \ - PERFORMANCE_MEASURE_START(); \ - if (pthread_mutex_lock((l)) != 0) { fprintf(stderr, "pthread_mutex_lock(%s) failed!\n", #l); assert(0); return 0; } \ - PERFORMANCE_MEASURE_END(NULL, "pthread_mutex_lock(%s=%p)@%s:%d",#l,l,__FUNCTION__, __LINE__); \ -} while (0); + /** + * \brief Job is ready to run. 
+ */ + THREADQUEUE_JOB_STATE_READY, -#define PTHREAD_UNLOCK(l) do { \ - PERFORMANCE_MEASURE_START(); \ - if (pthread_mutex_unlock((l)) != 0) { fprintf(stderr, "pthread_mutex_unlock(%s) failed!\n", #l); assert(0); return 0; } \ - PERFORMANCE_MEASURE_END(NULL, "pthread_mutex_unlock(%s=%p)@%s:%d",#l,l,__FUNCTION__, __LINE__); \ -} while (0); + /** + * \brief Job is running. + */ + THREADQUEUE_JOB_STATE_RUNNING, -#define PTHREAD_COND_WAIT(c,l) do { \ - PERFORMANCE_MEASURE_START(); \ - if (pthread_cond_wait((c),(l)) != 0) { fprintf(stderr, "pthread_cond_wait(%s=%p, %s=%p) failed!\n", #c, c, #l, l); assert(0); return 0;} \ - PERFORMANCE_MEASURE_END(NULL, "pthread_cond_wait(%s=%p, %s=%p)@%s:%d",#c, c, #l, l,__FUNCTION__, __LINE__); \ -} while (0); -#endif //PTHREAD_DUMP + /** + * \brief Job is completed. + */ + THREADQUEUE_JOB_STATE_DONE, -static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) +} threadqueue_job_state; + + +struct threadqueue_job_t { + pthread_mutex_t lock; + + threadqueue_job_state state; + + /** + * \brief Number of dependencies that have not been completed yet. + */ + int ndepends; + + /** + * \brief Reverse dependencies. + * + * Array of pointers to jobs that depend on this one. They have to exist + * when the thread finishes, because they cannot be run before. + */ + struct threadqueue_job_t **rdepends; + + /** + * \brief Number of elements in rdepends. + */ + int rdepends_count; + + /** + * \brief Allocated size of rdepends. + */ + int rdepends_size; + + /** + * \brief Reference count + */ + int refcount; + + /** + * \brief Pointer to the function to execute. + */ + void (*fptr)(void *arg); + + /** + * \brief Argument for fptr. + */ + void *arg; + + /** + * \brief Pointer to the next job in the queue. + */ + struct threadqueue_job_t *next; + +}; + + +struct threadqueue_queue_t { + pthread_mutex_t lock; + + /** + * \brief Job available condition variable + * + * Signalled when there is a new job to do. 
+ */ + pthread_cond_t job_available; + + /** + * \brief Job done condition variable + * + * Signalled when a job has been completed. + */ + pthread_cond_t job_done; + + /** + * Array containing spawned threads + */ + pthread_t *threads; + + /** + * \brief Number of threads spawned + */ + int thread_count; + + /** + * \brief Number of threads running + */ + int thread_running_count; + + /** + * \brief If true, threads should stop ASAP. + */ + bool stop; + + /** + * \brief Pointer to the first ready job + */ + threadqueue_job_t *first; + + /** + * \brief Pointer to the last ready job + */ + threadqueue_job_t *last; +}; + + +/** + * \brief Add a job to the queue of jobs ready to run. + * + * The caller must have locked the thread queue and the job. This function + * takes the ownership of the job. + */ +static void threadqueue_push_job(threadqueue_queue_t * threadqueue, + threadqueue_job_t *job) { - threadqueue_worker_spec * const threadqueue_worker_spec = threadqueue_worker_spec_opaque; - threadqueue_queue_t * const threadqueue = threadqueue_worker_spec->threadqueue; - threadqueue_job_t * next_job = NULL; + assert(job->ndepends == 0); + job->state = THREADQUEUE_JOB_STATE_READY; -#ifdef KVZ_DEBUG - KVZ_GET_TIME(&threadqueue->debug_clock_thread_start[threadqueue_worker_spec->worker_id]); -#endif //KVZ_DEBUG + if (threadqueue->first == NULL) { + threadqueue->first = job; + } else { + threadqueue->last->next = job; + } - for(;;) { - threadqueue_job_t * job = NULL; + threadqueue->last = job; + job->next = NULL; +} - PTHREAD_LOCK(&threadqueue->lock); - while(!threadqueue->stop && threadqueue->queue_waiting_execution == 0 && !next_job) { +/** + * \brief Retrieve a job from the queue of jobs ready to run. + * + * The caller must have locked the thread queue. The calling function + * receives the ownership of the job. 
+ */ +static threadqueue_job_t * threadqueue_pop_job(threadqueue_queue_t * threadqueue) +{ + assert(threadqueue->first != NULL); + + threadqueue_job_t *job = threadqueue->first; + threadqueue->first = job->next; + job->next = NULL; + + if (threadqueue->first == NULL) { + threadqueue->last = NULL; + } + + return job; +} + + +/** + * \brief Function executed by worker threads. + */ +static void* threadqueue_worker(void* threadqueue_opaque) +{ + threadqueue_queue_t * const threadqueue = (threadqueue_queue_t *) threadqueue_opaque; + + PTHREAD_LOCK(&threadqueue->lock); + + for (;;) { + while (!threadqueue->stop && threadqueue->first == NULL) { // Wait until there is something to do in the queue. - PTHREAD_COND_WAIT(&threadqueue->cond, &threadqueue->lock); + PTHREAD_COND_WAIT(&threadqueue->job_available, &threadqueue->lock); } - if(threadqueue->stop) { - if (next_job) { - // Put a job we had already reserved back into the queue. - PTHREAD_LOCK(&next_job->lock); - next_job->state = THREADQUEUE_JOB_STATE_QUEUED; - PTHREAD_UNLOCK(&next_job->lock); - } + if (threadqueue->stop) { break; } - //Find a task (should be fast enough) - job = NULL; - if (next_job) { - assert(next_job->ndepends == 0); - job = next_job; - } else { - //FIXME: if not using OWF, the first is better than the second, otherwise we should use the second order - //for (i = threadqueue->queue_count - 1; i >= threadqueue->queue_start; --i) { - //for (i = threadqueue->queue_start; i < threadqueue->queue_count; ++i) { + // Get a job and remove it from the queue. + threadqueue_job_t *job = threadqueue_pop_job(threadqueue); - for (int i = (threadqueue->fifo ? threadqueue->queue_start : threadqueue->queue_count - 1); - (threadqueue->fifo ? i < threadqueue->queue_count : i >= threadqueue->queue_start); - (threadqueue->fifo ? 
++i : --i)) { - threadqueue_job_t * const i_job = threadqueue->queue[i]; + PTHREAD_LOCK(&job->lock); + assert(job->state == THREADQUEUE_JOB_STATE_READY); + job->state = THREADQUEUE_JOB_STATE_RUNNING; + PTHREAD_UNLOCK(&job->lock); + PTHREAD_UNLOCK(&threadqueue->lock); - if (i_job->state == THREADQUEUE_JOB_STATE_QUEUED && i_job->ndepends == 0) { - // Once we found the job with no dependancies, lock it and change - // its state to running, so nobody else can claim it. - PTHREAD_LOCK(&i_job->lock); - if (i_job->state == THREADQUEUE_JOB_STATE_QUEUED && i_job->ndepends == 0) { - job = i_job; - job->state = THREADQUEUE_JOB_STATE_RUNNING; - } - PTHREAD_UNLOCK(&i_job->lock); - if (job) break; - } + job->fptr(job->arg); + + PTHREAD_LOCK(&threadqueue->lock); + PTHREAD_LOCK(&job->lock); + assert(job->state == THREADQUEUE_JOB_STATE_RUNNING); + job->state = THREADQUEUE_JOB_STATE_DONE; + + PTHREAD_COND_SIGNAL(&threadqueue->job_done); + + // Go through all the jobs that depend on this one, decreasing their + // ndepends. Count how many jobs can now start executing so we know how + // many threads to wake up. + int num_new_jobs = 0; + for (int i = 0; i < job->rdepends_count; ++i) { + threadqueue_job_t * const depjob = job->rdepends[i]; + // The dependency (job) is locked before the job depending on it. + // This must be the same order as in kvz_threadqueue_job_dep_add. + PTHREAD_LOCK(&depjob->lock); + + assert(depjob->state == THREADQUEUE_JOB_STATE_WAITING || + depjob->state == THREADQUEUE_JOB_STATE_PAUSED); + assert(depjob->ndepends > 0); + depjob->ndepends--; + + if (depjob->ndepends == 0 && depjob->state == THREADQUEUE_JOB_STATE_WAITING) { + // Move the job to ready jobs. + threadqueue_push_job(threadqueue, kvz_threadqueue_copy_ref(depjob)); + num_new_jobs++; } + + // Clear this reference to the job. + PTHREAD_UNLOCK(&depjob->lock); + kvz_threadqueue_free_job(&job->rdepends[i]); } + job->rdepends_count = 0; - if (!job) { - // We have no job. 
Probably because more threads were woken up than - // there were jobs to do. - PTHREAD_UNLOCK(&threadqueue->lock); - } else { - // We have a job with ndepends==0 and its state is running. - assert(job->state == THREADQUEUE_JOB_STATE_RUNNING); + PTHREAD_UNLOCK(&job->lock); + kvz_threadqueue_free_job(&job); - // Advance queue_start to skip all the running jobs. - while (threadqueue->queue_start < threadqueue->queue_count && - threadqueue->queue[threadqueue->queue_start]->state != THREADQUEUE_JOB_STATE_QUEUED) - { - threadqueue->queue_start++; - } - - if (!next_job) { - --threadqueue->queue_waiting_execution; - ++threadqueue->queue_running; - } - - PTHREAD_UNLOCK(&threadqueue->lock); - -#ifdef KVZ_DEBUG - job->debug_worker_id = threadqueue_worker_spec->worker_id; - KVZ_GET_TIME(&job->debug_clock_start); -#endif //KVZ_DEBUG - - job->fptr(job->arg); - -#ifdef KVZ_DEBUG - job->debug_worker_id = threadqueue_worker_spec->worker_id; - KVZ_GET_TIME(&job->debug_clock_stop); -#endif //KVZ_DEBUG - - // This lock is necessary because the main thread may try to add - // reverse dependencies to the job while running. - PTHREAD_LOCK(&job->lock); - assert(job->state == THREADQUEUE_JOB_STATE_RUNNING); - - job->state = THREADQUEUE_JOB_STATE_DONE; - - next_job = NULL; - - int queue_waiting_dependency_decr = 0; - int queue_waiting_execution_incr = 0; - - // Go throught all the jobs that depend on this one, decresing their ndepends. - for (int i = 0; i < job->rdepends_count; ++i) { - threadqueue_job_t * const depjob = job->rdepends[i]; - // The dependency (job) is locked before the job depending on it. - // This must be the same order as in kvz_threadqueue_job_dep_add. - PTHREAD_LOCK(&depjob->lock); - - assert(depjob->state == THREADQUEUE_JOB_STATE_QUEUED); - assert(depjob->ndepends > 0); - --depjob->ndepends; - - // Count how many jobs can now start executing so we know how many - // threads to wake up. 
- if (depjob->ndepends == 0) { - if (!next_job) { - // Avoid having to find a new job for this worker through the - // queue by taking one of the jobs that depended on current job. - next_job = depjob; - depjob->state = THREADQUEUE_JOB_STATE_RUNNING; - } else { - ++queue_waiting_execution_incr; - } - ++queue_waiting_dependency_decr; - } - - PTHREAD_UNLOCK(&depjob->lock); - } - - PTHREAD_UNLOCK(&job->lock); - - PTHREAD_LOCK(&threadqueue->lock); - - assert(threadqueue->queue_waiting_dependency >= queue_waiting_dependency_decr); - - // This thread will - if (!next_job) { - // We didn't find a new job, so this thread will have to go wait. - threadqueue->queue_running--; - } - threadqueue->queue_waiting_dependency -= queue_waiting_dependency_decr; - threadqueue->queue_waiting_execution += queue_waiting_execution_incr; - - // Wake up enough threads to take care of the tasks now lacking dependancies. - for (int i = 0; i < queue_waiting_execution_incr; ++i) { - PTHREAD_COND_SIGNAL(&threadqueue->cond); - } - - // Signal main thread that a job has been completed. - pthread_cond_signal(&threadqueue->cb_cond); - - PTHREAD_UNLOCK(&threadqueue->lock); + // The current thread will process one of the new jobs so we wake up + // one thread fewer than the number of new jobs. + for (int i = 0; i < num_new_jobs - 1; i++) { + pthread_cond_signal(&threadqueue->job_available); } } - // We got out of the loop because threadqueue->stop is true. The queue is locked. 
- assert(threadqueue->stop); - --threadqueue->threads_running; - -#ifdef KVZ_DEBUG - KVZ_GET_TIME(&threadqueue->debug_clock_thread_end[threadqueue_worker_spec->worker_id]); - - fprintf(threadqueue->debug_log, "\t%d\t-\t%lf\t+%lf\t-\tthread\n", threadqueue_worker_spec->worker_id, KVZ_CLOCK_T_AS_DOUBLE(threadqueue->debug_clock_thread_start[threadqueue_worker_spec->worker_id]), KVZ_CLOCK_T_DIFF(threadqueue->debug_clock_thread_start[threadqueue_worker_spec->worker_id], threadqueue->debug_clock_thread_end[threadqueue_worker_spec->worker_id])); -#endif //KVZ_DEBUG - + threadqueue->thread_running_count--; PTHREAD_UNLOCK(&threadqueue->lock); - - free(threadqueue_worker_spec_opaque); - - pthread_exit(NULL); - return NULL; } -int kvz_threadqueue_init(threadqueue_queue_t * const threadqueue, int thread_count, int fifo) { - int i; +/** + * \brief Initialize the queue. + * + * \return pointer to the queue, or NULL on failure + */ +threadqueue_queue_t * kvz_threadqueue_init(int thread_count) +{ + threadqueue_queue_t *threadqueue = MALLOC(threadqueue_queue_t, 1); + if (!threadqueue) { + goto failed; + } + if (pthread_mutex_init(&threadqueue->lock, NULL) != 0) { fprintf(stderr, "pthread_mutex_init failed!\n"); - assert(0); - return 0; + goto failed; } - if (pthread_cond_init(&threadqueue->cond, NULL) != 0) { + + if (pthread_cond_init(&threadqueue->job_available, NULL) != 0) { fprintf(stderr, "pthread_cond_init failed!\n"); - assert(0); - return 0; + goto failed; } - - if (pthread_cond_init(&threadqueue->cb_cond, NULL) != 0) { + + if (pthread_cond_init(&threadqueue->job_done, NULL) != 0) { fprintf(stderr, "pthread_cond_init failed!\n"); - assert(0); - return 0; + goto failed; } - - threadqueue->stop = false; - threadqueue->fifo = !!fifo; - threadqueue->threads_running = 0; - threadqueue->threads_count = thread_count; - + threadqueue->threads = MALLOC(pthread_t, thread_count); if (!threadqueue->threads) { fprintf(stderr, "Could not malloc threadqueue->threads!\n"); - return 0; + goto failed; } 
-#ifdef KVZ_DEBUG - threadqueue->debug_clock_thread_start = MALLOC(KVZ_CLOCK_T, thread_count); - assert(threadqueue->debug_clock_thread_start); - threadqueue->debug_clock_thread_end = MALLOC(KVZ_CLOCK_T, thread_count); - assert(threadqueue->debug_clock_thread_end); - threadqueue->debug_log = fopen("threadqueue.log", "w"); -#endif //KVZ_DEBUG - - threadqueue->queue = NULL; - threadqueue->queue_size = 0; - threadqueue->queue_count = 0; - threadqueue->queue_start = 0; - threadqueue->queue_waiting_execution = 0; - threadqueue->queue_waiting_dependency = 0; - threadqueue->queue_running = 0; - - //Lock the queue before creating threads, to ensure they all have correct information + threadqueue->thread_count = 0; + threadqueue->thread_running_count = 0; + + threadqueue->stop = false; + + threadqueue->first = NULL; + threadqueue->last = NULL; + + // Lock the queue before creating threads, to ensure they all have correct information. PTHREAD_LOCK(&threadqueue->lock); - - for(i = 0; i < thread_count; i++) { - threadqueue_worker_spec *tqws = MALLOC(threadqueue_worker_spec, 1); - if (tqws) { - tqws->threadqueue = threadqueue; - tqws->worker_id = i; - if(pthread_create(&(threadqueue->threads[i]), NULL, threadqueue_worker, (void*)tqws) != 0) { - fprintf(stderr, "pthread_create failed!\n"); - assert(0); - return 0; - } - threadqueue->threads_running++; - } else { - fprintf(stderr, "Could not allocate threadqueue_worker_spec structure!\n"); - PTHREAD_UNLOCK(&threadqueue->lock); - return 0; + for (int i = 0; i < thread_count; i++) { + if (pthread_create(&threadqueue->threads[i], NULL, threadqueue_worker, threadqueue) != 0) { + fprintf(stderr, "pthread_create failed!\n"); + goto failed; } + threadqueue->thread_count++; + threadqueue->thread_running_count++; } - + PTHREAD_UNLOCK(&threadqueue->lock); + + return threadqueue; + +failed: + kvz_threadqueue_free(threadqueue); + return NULL; +} + + +/** + * \brief Create a job and return a pointer to it. 
+ * + * The job is created in a paused state. Function kvz_threadqueue_submit + * must be called on the job in order to have it run. + * + * \return pointer to the job, or NULL on failure + */ +threadqueue_job_t * kvz_threadqueue_job_create(void (*fptr)(void *arg), void *arg) +{ + threadqueue_job_t *job = MALLOC(threadqueue_job_t, 1); + if (!job) { + fprintf(stderr, "Could not alloc job!\n"); + return NULL; + } + + if (pthread_mutex_init(&job->lock, NULL) != 0) { + fprintf(stderr, "pthread_mutex_init(job) failed!\n"); + return NULL; + } + + job->state = THREADQUEUE_JOB_STATE_PAUSED; + job->ndepends = 0; + job->rdepends = NULL; + job->rdepends_count = 0; + job->rdepends_size = 0; + job->refcount = 1; + job->fptr = fptr; + job->arg = arg; + + return job; +} + + +int kvz_threadqueue_submit(threadqueue_queue_t * const threadqueue, threadqueue_job_t *job) +{ + PTHREAD_LOCK(&threadqueue->lock); + PTHREAD_LOCK(&job->lock); + assert(job->state == THREADQUEUE_JOB_STATE_PAUSED); + + if (threadqueue->thread_count == 0) { + // When not using threads, run the job immediately. + job->fptr(job->arg); + job->state = THREADQUEUE_JOB_STATE_DONE; + } else if (job->ndepends == 0) { + threadqueue_push_job(threadqueue, kvz_threadqueue_copy_ref(job)); + pthread_cond_signal(&threadqueue->job_available); + } else { + job->state = THREADQUEUE_JOB_STATE_WAITING; + } + PTHREAD_UNLOCK(&job->lock); PTHREAD_UNLOCK(&threadqueue->lock); return 1; } + +/** + * \brief Add a dependency between two jobs. + * + * \param job job that should be executed after dependency + * \param dependency job that should be executed before job + * + * \return 1 on success, 0 on failure + * + */ +int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *dependency) +{ + // Lock the dependency first and then the job depending on it. + // This must be the same order as in threadqueue_worker. 
+ PTHREAD_LOCK(&dependency->lock); + + if (dependency->state == THREADQUEUE_JOB_STATE_DONE) { + // The dependency has been completed already so there is nothing to do. + PTHREAD_UNLOCK(&dependency->lock); + return 1; + } + + PTHREAD_LOCK(&job->lock); + job->ndepends++; + PTHREAD_UNLOCK(&job->lock); + + // Add the reverse dependency + if (dependency->rdepends_count >= dependency->rdepends_size) { + dependency->rdepends_size += THREADQUEUE_LIST_REALLOC_SIZE; + size_t bytes = dependency->rdepends_size * sizeof(threadqueue_job_t*); + dependency->rdepends = realloc(dependency->rdepends, bytes); + } + dependency->rdepends[dependency->rdepends_count++] = kvz_threadqueue_copy_ref(job); + + PTHREAD_UNLOCK(&dependency->lock); + + return 1; +} + + /** * \brief Get a new pointer to a job. * @@ -356,35 +527,61 @@ void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr) return; } - assert(new_refcount >= 0); + assert(new_refcount == 0); -#ifdef KVZ_DEBUG - FREE_POINTER(job->debug_description); -#endif + for (int i = 0; i < job->rdepends_count; i++) { + kvz_threadqueue_free_job(&job->rdepends[i]); + } + job->rdepends_count = 0; FREE_POINTER(job->rdepends); pthread_mutex_destroy(&job->lock); FREE_POINTER(job); } + +/** + * \brief Wait for a job to be completed. + * + * \return 1 on success, 0 on failure + */ +int kvz_threadqueue_waitfor(threadqueue_queue_t * threadqueue, threadqueue_job_t * job) +{ + PTHREAD_LOCK(&job->lock); + while (job->state != THREADQUEUE_JOB_STATE_DONE) { + PTHREAD_COND_WAIT(&threadqueue->job_done, &job->lock); + } + PTHREAD_UNLOCK(&job->lock); + + return 1; +} + + +/** + * \brief Stop all threads after they finish the current jobs. + * + * Block until all threads have stopped. + * + * \return 1 on success, 0 on failure + */ int kvz_threadqueue_stop(threadqueue_queue_t * const threadqueue) { PTHREAD_LOCK(&threadqueue->lock); if (threadqueue->stop) { // The threadqueue should have stopped already. 
- assert(threadqueue->threads_running == 0); + assert(threadqueue->thread_running_count == 0); PTHREAD_UNLOCK(&threadqueue->lock); return 1; } // Tell all threads to stop. threadqueue->stop = true; - PTHREAD_COND_BROADCAST(&threadqueue->cond); + PTHREAD_COND_BROADCAST(&threadqueue->job_available); PTHREAD_UNLOCK(&threadqueue->lock); // Wait for them to stop. - for (int i = 0; i < threadqueue->threads_count; i++) { + for (int i = 0; i < threadqueue->thread_count; i++) { if (pthread_join(threadqueue->threads[i], NULL) != 0) { fprintf(stderr, "pthread_join failed!\n"); return 0; @@ -395,274 +592,39 @@ int kvz_threadqueue_stop(threadqueue_queue_t * const threadqueue) } -int kvz_threadqueue_finalize(threadqueue_queue_t * const threadqueue) +/** + * \brief Stop all threads and free allocated resources. + * + * Does nothing if threadqueue is NULL. + */ +void kvz_threadqueue_free(threadqueue_queue_t *threadqueue) { - if (!kvz_threadqueue_stop(threadqueue)) return 0; + if (threadqueue == NULL) return; - // Free all jobs - PTHREAD_LOCK(&threadqueue->lock); - for (int i = 0; i < threadqueue->queue_count; ++i) { - kvz_threadqueue_free_job(&threadqueue->queue[i]); + kvz_threadqueue_stop(threadqueue); + + // Free all jobs.
+ while (threadqueue->first) { + threadqueue_job_t *next = threadqueue->first->next; + kvz_threadqueue_free_job(&threadqueue->first); + threadqueue->first = next; } - threadqueue->queue_count = 0; - threadqueue->queue_start = 0; - PTHREAD_UNLOCK(&threadqueue->lock); + threadqueue->last = NULL; -#ifdef KVZ_DEBUG - FREE_POINTER(threadqueue->debug_clock_thread_start); - FREE_POINTER(threadqueue->debug_clock_thread_end); - fclose(threadqueue->debug_log); -#endif - - //Free allocated stuff - FREE_POINTER(threadqueue->queue); - threadqueue->queue_count = 0; - threadqueue->queue_size = 0; - threadqueue->queue_start = 0; - FREE_POINTER(threadqueue->threads); - threadqueue->threads_count = 0; - + threadqueue->thread_count = 0; + if (pthread_mutex_destroy(&threadqueue->lock) != 0) { fprintf(stderr, "pthread_mutex_destroy failed!\n"); - assert(0); - return 0; } - if (pthread_cond_destroy(&threadqueue->cond) != 0) { + + if (pthread_cond_destroy(&threadqueue->job_available) != 0) { fprintf(stderr, "pthread_cond_destroy failed!\n"); - assert(0); - return 0; } - - if (pthread_cond_destroy(&threadqueue->cb_cond) != 0) { + + if (pthread_cond_destroy(&threadqueue->job_done) != 0) { fprintf(stderr, "pthread_cond_destroy failed!\n"); - assert(0); - return 0; } - - return 1; + + FREE_POINTER(threadqueue); } - -int kvz_threadqueue_waitfor(threadqueue_queue_t * const threadqueue, threadqueue_job_t * const job) -{ - // NULL job is clearly OK. - if (!job) return 1; - - PTHREAD_LOCK(&job->lock); - while (job->state != THREADQUEUE_JOB_STATE_DONE) { - PTHREAD_COND_WAIT(&threadqueue->cb_cond, &job->lock); - } - PTHREAD_UNLOCK(&job->lock); - - PTHREAD_LOCK(&threadqueue->lock); - - // Free jobs submitted before this job. - int i = 0; - for (; i < threadqueue->queue_count; ++i) { - if (threadqueue->queue[i] == job) break; - kvz_threadqueue_free_job(&threadqueue->queue[i]); - } - // Move remaining jobs to the beginning of the array. 
- if (i > 0) { - threadqueue->queue_count -= i; - threadqueue->queue_start = 0; - memmove(threadqueue->queue, &threadqueue->queue[i], threadqueue->queue_count * sizeof(*threadqueue->queue)); - FILL_ARRAY(&threadqueue->queue[threadqueue->queue_count], 0, i); - } - - PTHREAD_UNLOCK(&threadqueue->lock); - - return 1; -} - -threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * const threadqueue, void (*fptr)(void *arg), void *arg, int wait, const char* const debug_description) { - threadqueue_job_t *job; - //No lock here... this should be constant - if (threadqueue->threads_count == 0) { - //FIXME: This should be improved in order to handle dependencies - PERFORMANCE_MEASURE_START(KVZ_PERF_JOB); - fptr(arg); - PERFORMANCE_MEASURE_END(KVZ_PERF_JOB, threadqueue, "%s", debug_description); - return NULL; - } - - assert(wait == 0 || wait == 1); - - job = MALLOC(threadqueue_job_t, 1); - job->refcount = 1; - -#ifdef KVZ_DEBUG - if (debug_description) { - size_t desc_len = MIN(255, strlen(debug_description)); - char* desc; - - //Copy description - desc = MALLOC(char, desc_len + 1); - assert(desc); - memcpy(desc, debug_description, desc_len); - desc[desc_len] = 0; - - job->debug_description = desc; - } else { - char* desc; - desc = MALLOC(char, 255); - sprintf(desc, "(*%p)(%p)", fptr, arg); - - job->debug_description = desc; - } - KVZ_GET_TIME(&job->debug_clock_enqueue); -#endif //KVZ_DEBUG - - if (!job) { - fprintf(stderr, "Could not alloc job!\n"); - assert(0); - return NULL; - } - - job->fptr = fptr; - job->arg = arg; - if (pthread_mutex_init(&job->lock, NULL) != 0) { - fprintf(stderr, "pthread_mutex_init(job) failed!\n"); - assert(0); - return NULL; - } - job->ndepends = wait; - job->rdepends = NULL; - job->rdepends_count = 0; - job->rdepends_size = 0; - job->state = THREADQUEUE_JOB_STATE_QUEUED; - - PTHREAD_LOCK(&threadqueue->lock); - - //Add the reverse dependency - if (threadqueue->queue_count >= threadqueue->queue_size) { - threadqueue->queue = 
realloc(threadqueue->queue, sizeof(threadqueue_job_t *) * (threadqueue->queue_size + THREADQUEUE_LIST_REALLOC_SIZE)); - if (!threadqueue->queue) { - fprintf(stderr, "Could not realloc queue!\n"); - assert(0); - return NULL; - } - threadqueue->queue_size += THREADQUEUE_LIST_REALLOC_SIZE; - } - threadqueue->queue[threadqueue->queue_count++] = job; - job->refcount++; - - if (job->ndepends == 0) { - ++threadqueue->queue_waiting_execution; - //Hope a thread can do it... - PTHREAD_COND_SIGNAL(&(threadqueue->cond)); - } else { - ++threadqueue->queue_waiting_dependency; - } - - PTHREAD_UNLOCK(&threadqueue->lock); - - return job; -} - -int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *dependency) { - //If we are not using threads, job are NULL pointers, so we can skip that - if (!job && !dependency) return 1; - - assert(job && dependency); - - // Lock first the dependency and then the job depending on it. - // This must be the same order as in threadqueue_worker. - PTHREAD_LOCK(&dependency->lock); - PTHREAD_LOCK(&job->lock); - - if (dependency->state != THREADQUEUE_JOB_STATE_DONE) { - job->ndepends++; - } - - PTHREAD_UNLOCK(&job->lock); - - //Add the reverse dependency (FIXME: this may be moved in the if above... 
but we would lose ability to track) - if (dependency->rdepends_count >= dependency->rdepends_size) { - size_t new_size = - sizeof(threadqueue_job_t*) * - (dependency->rdepends_size + THREADQUEUE_LIST_REALLOC_SIZE); - dependency->rdepends = realloc(dependency->rdepends, new_size); - if (!dependency->rdepends) { - fprintf(stderr, "Could not realloc rdepends!\n"); - assert(0); - return 0; - } - dependency->rdepends_size += THREADQUEUE_LIST_REALLOC_SIZE; - } - dependency->rdepends[dependency->rdepends_count++] = job; - - PTHREAD_UNLOCK(&dependency->lock); - - return 1; -} - -int kvz_threadqueue_job_unwait_job(threadqueue_queue_t * const threadqueue, threadqueue_job_t *job) { - int ndepends = 0; - - //NULL job => no threads, nothing to do - if (!job) return 1; - PTHREAD_LOCK(&job->lock); - job->ndepends--; - ndepends = job->ndepends; - PTHREAD_UNLOCK(&job->lock); - - if (ndepends == 0) { - PTHREAD_LOCK(&threadqueue->lock); - assert(threadqueue->queue_waiting_dependency > 0); - --threadqueue->queue_waiting_dependency; - ++threadqueue->queue_waiting_execution; - //Hope a thread can do it... 
- PTHREAD_COND_SIGNAL(&(threadqueue->cond)); - - PTHREAD_UNLOCK(&threadqueue->lock); - } - - return 1; -} - -#ifdef KVZ_DEBUG -int threadqueue_log(threadqueue_queue_t * threadqueue, const KVZ_CLOCK_T *start, const KVZ_CLOCK_T *stop, const char* debug_description) { - int i, thread_id = -1; - FILE* output; - - assert(start); - - if (threadqueue) { - //We need to lock to output safely - PTHREAD_LOCK(&threadqueue->lock); - - output = threadqueue->debug_log; - - //Find the thread - for(i = 0; i < threadqueue->threads_count; i++) { - if(pthread_equal(threadqueue->threads[i], pthread_self()) != 0) { - thread_id = i; - break; - } - } - } else { - thread_id = -1; - output = stderr; - } - - if (thread_id >= 0) { - if (stop) { - fprintf(output, "\t%d\t-\t%lf\t+%lf\t-\t%s\n", thread_id, KVZ_CLOCK_T_AS_DOUBLE(*start), KVZ_CLOCK_T_DIFF(*start, *stop), debug_description); - } else { - fprintf(output, "\t%d\t-\t%lf\t-\t-\t%s\n", thread_id, KVZ_CLOCK_T_AS_DOUBLE(*start), debug_description); - } - } else { - if (stop) { - fprintf(output, "\t\t-\t%lf\t+%lf\t-\t%s\n", KVZ_CLOCK_T_AS_DOUBLE(*start), KVZ_CLOCK_T_DIFF(*start, *stop), debug_description); - } else { - fprintf(output, "\t\t-\t%lf\t-\t-\t%s\n", KVZ_CLOCK_T_AS_DOUBLE(*start), debug_description); - } - } - - if (threadqueue) { - PTHREAD_UNLOCK(&threadqueue->lock); - } - return 1; -} -#endif //KVZ_DEBUG diff --git a/src/threadqueue.h b/src/threadqueue.h index b7527075..1b178548 100644 --- a/src/threadqueue.h +++ b/src/threadqueue.h @@ -30,151 +30,22 @@ #include "global.h" // IWYU pragma: keep -typedef enum { - THREADQUEUE_JOB_STATE_QUEUED = 0, - THREADQUEUE_JOB_STATE_RUNNING = 1, - THREADQUEUE_JOB_STATE_DONE = 2 -} threadqueue_job_state; +typedef struct threadqueue_job_t threadqueue_job_t; +typedef struct threadqueue_queue_t threadqueue_queue_t; -typedef struct threadqueue_job_t { - pthread_mutex_t lock; - - threadqueue_job_state state; - - unsigned int ndepends; //Number of active dependencies that this job wait for - - 
struct threadqueue_job_t **rdepends; //array of pointer to jobs that depend on this one. They have to exist when the thread finishes, because they cannot be run before. - unsigned int rdepends_count; //number of rdepends - unsigned int rdepends_size; //allocated size of rdepends +threadqueue_queue_t * kvz_threadqueue_init(int thread_count); - // Reference count - int refcount; +threadqueue_job_t * kvz_threadqueue_job_create(void (*fptr)(void *arg), void *arg); +int kvz_threadqueue_submit(threadqueue_queue_t * threadqueue, threadqueue_job_t *job); - //Job function and state to use - void (*fptr)(void *arg); - void *arg; - -#ifdef KVZ_DEBUG - const char* debug_description; - - int debug_worker_id; - - KVZ_CLOCK_T debug_clock_enqueue; - KVZ_CLOCK_T debug_clock_start; - KVZ_CLOCK_T debug_clock_stop; - KVZ_CLOCK_T debug_clock_dequeue; -#endif -} threadqueue_job_t; - - - - -typedef struct { - pthread_mutex_t lock; - pthread_cond_t cond; - pthread_cond_t cb_cond; - - pthread_t *threads; - int threads_count; - int threads_running; - - bool stop; // if true, threads should stop asap - - int fifo; - - threadqueue_job_t **queue; - unsigned int queue_start; - unsigned int queue_count; - unsigned int queue_size; - unsigned int queue_waiting_execution; //Number of jobs without any dependency which could be run - unsigned int queue_waiting_dependency; //Number of jobs waiting for a dependency to complete - unsigned int queue_running; //Number of jobs running - -#ifdef KVZ_DEBUG - //Format: pointer worker id time enqueued time started time stopped time dequeued job description - //For threads, pointer = "" and job description == "thread", time enqueued and time dequeued are equal to "-" - //For flush, pointer = "" and job description == "FLUSH", time enqueued, time dequeued and time started are equal to "-" - //Each time field, except the first one in the line be expressed in a relative way, by prepending the number of seconds by +. 
- //Dependencies: pointer -> pointer - - FILE *debug_log; - - KVZ_CLOCK_T *debug_clock_thread_start; - KVZ_CLOCK_T *debug_clock_thread_end; -#endif -} threadqueue_queue_t; - -//Init a threadqueue (if fifo, then behave as a FIFO with dependencies, otherwise as a LIFO with dependencies) -int kvz_threadqueue_init(threadqueue_queue_t * threadqueue, int thread_count, int fifo); - -//Add a job to the queue, and returs a threadqueue_job handle. If wait == 1, one has to run kvz_threadqueue_job_unwait_job in order to have it run -threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * threadqueue, void (*fptr)(void *arg), void *arg, int wait, const char* debug_description); - -void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr); +int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *dependency); threadqueue_job_t *kvz_threadqueue_copy_ref(threadqueue_job_t *job); -int kvz_threadqueue_job_unwait_job(threadqueue_queue_t * threadqueue, threadqueue_job_t *job); +void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr); -//Add a dependency between two jobs. -int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *depends_on); - -/** - * \brief Stop all threads after they finish the current jobs. - * - * Blocks until all threads have stopped. - */ -int kvz_threadqueue_stop(threadqueue_queue_t * const threadqueue); - -//Blocking call until job is executed. Job handles submitted before job should not be used any more as they are removed from the queue. 
int kvz_threadqueue_waitfor(threadqueue_queue_t * threadqueue, threadqueue_job_t * job); +int kvz_threadqueue_stop(threadqueue_queue_t * threadqueue); +void kvz_threadqueue_free(threadqueue_queue_t * threadqueue); -//Free ressources in a threadqueue -int kvz_threadqueue_finalize(threadqueue_queue_t * threadqueue); - -#ifdef KVZ_DEBUG -int threadqueue_log(threadqueue_queue_t * threadqueue, const KVZ_CLOCK_T *start, const KVZ_CLOCK_T *stop, const char* debug_description); - -// Bitmasks for PERFORMANCE_MEASURE_START and PERFORMANCE_MEASURE_END. -#define KVZ_PERF_FRAME (1 << 0) -#define KVZ_PERF_JOB (1 << 1) -#define KVZ_PERF_LCU (1 << 2) -#define KVZ_PERF_SAOREC (1 << 3) -#define KVZ_PERF_BSLEAF (1 << 4) -#define KVZ_PERF_SEARCHCU (1 << 5) - -#define IMPL_PERFORMANCE_MEASURE_START(mask) KVZ_CLOCK_T start, stop; if ((KVZ_DEBUG) & mask) { KVZ_GET_TIME(&start); } -#define IMPL_PERFORMANCE_MEASURE_END(mask, threadqueue, str, ...) { if ((KVZ_DEBUG) & mask) { KVZ_GET_TIME(&stop); {char job_description[256]; sprintf(job_description, (str), __VA_ARGS__); threadqueue_log((threadqueue), &start, &stop, job_description);}} } \ - -#ifdef _MSC_VER -// Disable VS conditional expression warning from debug code. -# define WITHOUT_CONSTANT_EXP_WARNING(macro) \ - __pragma(warning(push)) \ - __pragma(warning(disable:4127)) \ - macro \ - __pragma(warning(pop)) -# define PERFORMANCE_MEASURE_START(mask) \ - WITHOUT_CONSTANT_EXP_WARNING(IMPL_PERFORMANCE_MEASURE_START(mask)) -# define PERFORMANCE_MEASURE_END(mask, threadqueue, str, ...) \ - WITHOUT_CONSTANT_EXP_WARNING(IMPL_PERFORMANCE_MEASURE_END(mask, threadqueue, str, ##__VA_ARGS__)) -#else -# define PERFORMANCE_MEASURE_START(mask) \ - IMPL_PERFORMANCE_MEASURE_START(mask) -# define PERFORMANCE_MEASURE_END(mask, threadqueue, str, ...) \ - IMPL_PERFORMANCE_MEASURE_END(mask, threadqueue, str, ##__VA_ARGS__) -#endif - -#else -#define PERFORMANCE_MEASURE_START(mask) -#define PERFORMANCE_MEASURE_END(mask, threadqueue, str, ...) 
-#endif - -/* Constraints: - * - * - Always first lock threadqueue, than a job inside it - * - When job A depends on job B, always lock first job B and then job A - * - Jobs should be submitted in an order which is compatible with serial execution. - * - * */ - -#endif //THREADQUEUE_H_ +#endif // THREADQUEUE_H_ diff --git a/tests/coeff_sum_tests.c b/tests/coeff_sum_tests.c index e1530057..7b33e1cb 100644 --- a/tests/coeff_sum_tests.c +++ b/tests/coeff_sum_tests.c @@ -52,7 +52,7 @@ SUITE(coeff_sum_tests) { setup(); - for (int i = 0; i < strategies.count; ++i) { + for (volatile int i = 0; i < strategies.count; ++i) { if (strcmp(strategies.strategies[i].type, "coeff_abs_sum") != 0) { continue; } diff --git a/tests/intra_sad_tests.c b/tests/intra_sad_tests.c index 1497beb4..d0416cd7 100644 --- a/tests/intra_sad_tests.c +++ b/tests/intra_sad_tests.c @@ -177,7 +177,7 @@ SUITE(intra_sad_tests) // Loop through all strategies picking out the intra sad ones and run // selectec strategies though all tests. 
- for (unsigned i = 0; i < strategies.count; ++i) { + for (volatile unsigned i = 0; i < strategies.count; ++i) { const char * type = strategies.strategies[i].type; if (strcmp(type, "sad_4x4") == 0) { diff --git a/tests/sad_tests.c b/tests/sad_tests.c index 2053ecfe..031ece47 100644 --- a/tests/sad_tests.c +++ b/tests/sad_tests.c @@ -378,7 +378,7 @@ SUITE(sad_tests) sad_test_env.tested_func = strategies.strategies[i].fptr; sad_test_env.strategy = &strategies.strategies[i]; int num_dim_tests = sizeof(tested_dims) / sizeof(tested_dims[0]); - for (int dim_test = 0; dim_test < num_dim_tests; ++dim_test) { + for (volatile int dim_test = 0; dim_test < num_dim_tests; ++dim_test) { sad_test_env.width = tested_dims[dim_test].width; sad_test_env.height = tested_dims[dim_test].height; RUN_TEST(test_reg_sad); diff --git a/tests/satd_tests.c b/tests/satd_tests.c index efdcda08..5f0cf93b 100644 --- a/tests/satd_tests.c +++ b/tests/satd_tests.c @@ -167,7 +167,7 @@ SUITE(satd_tests) // Loop through all strategies picking out the intra sad ones and run // selectec strategies though all tests. - for (unsigned i = 0; i < strategies.count; ++i) { + for (volatile unsigned i = 0; i < strategies.count; ++i) { const char * type = strategies.strategies[i].type; if (strcmp(type, "satd_4x4") == 0) { diff --git a/tests/speed_tests.c b/tests/speed_tests.c index f2cd0742..2a0d245c 100644 --- a/tests/speed_tests.c +++ b/tests/speed_tests.c @@ -405,7 +405,7 @@ SUITE(speed_tests) int num_tested_dims = sizeof(tested_dims) / sizeof(*tested_dims); // Call reg_sad with all the sizes it is actually called with. - for (int dim_i = 0; dim_i < num_tested_dims; ++dim_i) { + for (volatile int dim_i = 0; dim_i < num_tested_dims; ++dim_i) { test_env.width = tested_dims[dim_i].x; test_env.height = tested_dims[dim_i].y; RUN_TEST(inter_sad);