diff --git a/src/threadqueue.c b/src/threadqueue.c index 00d50c4e..8c5086ba 100644 --- a/src/threadqueue.c +++ b/src/threadqueue.c @@ -3,6 +3,7 @@ #include #include //ETIMEDOUT #include +#include #ifdef _DEBUG #include @@ -283,22 +284,33 @@ int threadqueue_init(threadqueue_queue * const threadqueue, int thread_count, in return 1; } +/** + * \brief Free a single job from the threadqueue index i, destroying it. + */ +static void threadqueue_free_job(threadqueue_queue * const threadqueue, int i) +{ +#ifdef _DEBUG + int j; + GET_TIME(&threadqueue->queue[i]->debug_clock_dequeue); + fprintf(threadqueue->debug_log, "%p\t%d\t%lf\t+%lf\t+%lf\t+%lf\t%s\n", threadqueue->queue[i], threadqueue->queue[i]->debug_worker_id, CLOCK_T_AS_DOUBLE(threadqueue->queue[i]->debug_clock_enqueue), CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_enqueue, threadqueue->queue[i]->debug_clock_start), CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_start, threadqueue->queue[i]->debug_clock_stop), CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_stop, threadqueue->queue[i]->debug_clock_dequeue), threadqueue->queue[i]->debug_description); + + for (j = 0; j < threadqueue->queue[i]->rdepends_count; ++j) { + fprintf(threadqueue->debug_log, "%p->%p\n", threadqueue->queue[i], threadqueue->queue[i]->rdepends[j]); + } + + FREE_POINTER(threadqueue->queue[i]->debug_description); +#endif + FREE_POINTER(threadqueue->queue[i]->rdepends); + + pthread_mutex_destroy(&threadqueue->queue[i]->lock); + + FREE_POINTER(threadqueue->queue[i]); +} + static void threadqueue_free_jobs(threadqueue_queue * const threadqueue) { int i; for (i=0; i < threadqueue->queue_count; ++i) { -#ifdef _DEBUG - int j; - GET_TIME(&threadqueue->queue[i]->debug_clock_dequeue); - fprintf(threadqueue->debug_log, "%p\t%d\t%lf\t+%lf\t+%lf\t+%lf\t%s\n", threadqueue->queue[i], threadqueue->queue[i]->debug_worker_id, CLOCK_T_AS_DOUBLE(threadqueue->queue[i]->debug_clock_enqueue), CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_enqueue, threadqueue->queue[i]->debug_clock_start), CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_start, threadqueue->queue[i]->debug_clock_stop), CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_stop, threadqueue->queue[i]->debug_clock_dequeue), threadqueue->queue[i]->debug_description); - - for (j = 0; j < threadqueue->queue[i]->rdepends_count; ++j) { - fprintf(threadqueue->debug_log, "%p->%p\n", threadqueue->queue[i], threadqueue->queue[i]->rdepends[j]); - } - - FREE_POINTER(threadqueue->queue[i]->debug_description); -#endif - FREE_POINTER(threadqueue->queue[i]->rdepends); - FREE_POINTER(threadqueue->queue[i]); + threadqueue_free_job(threadqueue, i); } threadqueue->queue_count = 0; threadqueue->queue_start = 0; @@ -455,6 +467,20 @@ int threadqueue_waitfor(threadqueue_queue * const threadqueue, threadqueue_job * } } while (!job_done); + // Free jobs submitted before this job. + int i; + for (i = 0; i < threadqueue->queue_count; ++i) { + if (threadqueue->queue[i] == job) break; + threadqueue_free_job(threadqueue, i); + } + // Move remaining jobs to the beginning of the array. + if (i > 0) { + threadqueue->queue_count -= i; + threadqueue->queue_start = 0; + memmove(threadqueue->queue, &threadqueue->queue[i], threadqueue->queue_count * sizeof(*threadqueue->queue)); + memset(&threadqueue->queue[threadqueue->queue_count], 0, i * sizeof(*threadqueue->queue)); + } + PTHREAD_UNLOCK(&threadqueue->lock); return 1; diff --git a/src/threadqueue.h b/src/threadqueue.h index 172bc89a..36e9917c 100644 --- a/src/threadqueue.h +++ b/src/threadqueue.h @@ -109,7 +109,7 @@ int threadqueue_job_dep_add(threadqueue_job *job, threadqueue_job *depends_on); //Blocking call until the queue is empty. Previously set threadqueue_job handles should not be used anymore int threadqueue_flush(threadqueue_queue * threadqueue); -//Blocking call until job is executed. Job handles submitted before job should not be used any more. +//Blocking call until job is executed. Job handles submitted before job should not be used any more as they are removed from the queue. int threadqueue_waitfor(threadqueue_queue * threadqueue, threadqueue_job * job); //Free ressources in a threadqueue