mirror of
https://github.com/ultravideo/uvg266.git
synced 2024-11-24 10:34:05 +00:00
Free threadqueue jobs when they are not needed.
- Also add destroying the mutex when the job is freed. - This makes Kvazaar no longer acquire thousands of OS handles on Windows.
This commit is contained in:
parent
5d0df56c94
commit
5662621b3c
|
@ -3,6 +3,7 @@
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <errno.h> //ETIMEDOUT
|
#include <errno.h> //ETIMEDOUT
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
#ifdef _DEBUG
|
#ifdef _DEBUG
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
@ -283,22 +284,33 @@ int threadqueue_init(threadqueue_queue * const threadqueue, int thread_count, in
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* \brief Free a single job from the threadqueue index i, destroying it.
|
||||||
|
*/
|
||||||
|
static void threadqueue_free_job(threadqueue_queue * const threadqueue, int i)
|
||||||
|
{
|
||||||
|
#ifdef _DEBUG
|
||||||
|
int j;
|
||||||
|
GET_TIME(&threadqueue->queue[i]->debug_clock_dequeue);
|
||||||
|
fprintf(threadqueue->debug_log, "%p\t%d\t%lf\t+%lf\t+%lf\t+%lf\t%s\n", threadqueue->queue[i], threadqueue->queue[i]->debug_worker_id, CLOCK_T_AS_DOUBLE(threadqueue->queue[i]->debug_clock_enqueue), CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_enqueue, threadqueue->queue[i]->debug_clock_start), CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_start, threadqueue->queue[i]->debug_clock_stop), CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_stop, threadqueue->queue[i]->debug_clock_dequeue), threadqueue->queue[i]->debug_description);
|
||||||
|
|
||||||
|
for (j = 0; j < threadqueue->queue[i]->rdepends_count; ++j) {
|
||||||
|
fprintf(threadqueue->debug_log, "%p->%p\n", threadqueue->queue[i], threadqueue->queue[i]->rdepends[j]);
|
||||||
|
}
|
||||||
|
|
||||||
|
FREE_POINTER(threadqueue->queue[i]->debug_description);
|
||||||
|
#endif
|
||||||
|
FREE_POINTER(threadqueue->queue[i]->rdepends);
|
||||||
|
|
||||||
|
pthread_mutex_destroy(&threadqueue->queue[i]->lock);
|
||||||
|
|
||||||
|
FREE_POINTER(threadqueue->queue[i]);
|
||||||
|
}
|
||||||
|
|
||||||
static void threadqueue_free_jobs(threadqueue_queue * const threadqueue) {
|
static void threadqueue_free_jobs(threadqueue_queue * const threadqueue) {
|
||||||
int i;
|
int i;
|
||||||
for (i=0; i < threadqueue->queue_count; ++i) {
|
for (i=0; i < threadqueue->queue_count; ++i) {
|
||||||
#ifdef _DEBUG
|
threadqueue_free_job(threadqueue, i);
|
||||||
int j;
|
|
||||||
GET_TIME(&threadqueue->queue[i]->debug_clock_dequeue);
|
|
||||||
fprintf(threadqueue->debug_log, "%p\t%d\t%lf\t+%lf\t+%lf\t+%lf\t%s\n", threadqueue->queue[i], threadqueue->queue[i]->debug_worker_id, CLOCK_T_AS_DOUBLE(threadqueue->queue[i]->debug_clock_enqueue), CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_enqueue, threadqueue->queue[i]->debug_clock_start), CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_start, threadqueue->queue[i]->debug_clock_stop), CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_stop, threadqueue->queue[i]->debug_clock_dequeue), threadqueue->queue[i]->debug_description);
|
|
||||||
|
|
||||||
for (j = 0; j < threadqueue->queue[i]->rdepends_count; ++j) {
|
|
||||||
fprintf(threadqueue->debug_log, "%p->%p\n", threadqueue->queue[i], threadqueue->queue[i]->rdepends[j]);
|
|
||||||
}
|
|
||||||
|
|
||||||
FREE_POINTER(threadqueue->queue[i]->debug_description);
|
|
||||||
#endif
|
|
||||||
FREE_POINTER(threadqueue->queue[i]->rdepends);
|
|
||||||
FREE_POINTER(threadqueue->queue[i]);
|
|
||||||
}
|
}
|
||||||
threadqueue->queue_count = 0;
|
threadqueue->queue_count = 0;
|
||||||
threadqueue->queue_start = 0;
|
threadqueue->queue_start = 0;
|
||||||
|
@ -455,6 +467,20 @@ int threadqueue_waitfor(threadqueue_queue * const threadqueue, threadqueue_job *
|
||||||
}
|
}
|
||||||
} while (!job_done);
|
} while (!job_done);
|
||||||
|
|
||||||
|
// Free jobs submitted before this job.
|
||||||
|
int i;
|
||||||
|
for (i = 0; i < threadqueue->queue_count; ++i) {
|
||||||
|
if (threadqueue->queue[i] == job) break;
|
||||||
|
threadqueue_free_job(threadqueue, i);
|
||||||
|
}
|
||||||
|
// Move remaining jobs to the beginning of the array.
|
||||||
|
if (i > 0) {
|
||||||
|
threadqueue->queue_count -= i;
|
||||||
|
threadqueue->queue_start = 0;
|
||||||
|
memmove(threadqueue->queue, &threadqueue->queue[i], threadqueue->queue_count * sizeof(*threadqueue->queue));
|
||||||
|
memset(&threadqueue->queue[threadqueue->queue_count], 0, i * sizeof(*threadqueue->queue));
|
||||||
|
}
|
||||||
|
|
||||||
PTHREAD_UNLOCK(&threadqueue->lock);
|
PTHREAD_UNLOCK(&threadqueue->lock);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
|
|
|
@ -109,7 +109,7 @@ int threadqueue_job_dep_add(threadqueue_job *job, threadqueue_job *depends_on);
|
||||||
//Blocking call until the queue is empty. Previously set threadqueue_job handles should not be used anymore
|
//Blocking call until the queue is empty. Previously set threadqueue_job handles should not be used anymore
|
||||||
int threadqueue_flush(threadqueue_queue * threadqueue);
|
int threadqueue_flush(threadqueue_queue * threadqueue);
|
||||||
|
|
||||||
//Blocking call until job is executed. Job handles submitted before job should not be used any more.
|
//Blocking call until job is executed. Job handles submitted before job should not be used any more as they are removed from the queue.
|
||||||
int threadqueue_waitfor(threadqueue_queue * threadqueue, threadqueue_job * job);
|
int threadqueue_waitfor(threadqueue_queue * threadqueue, threadqueue_job * job);
|
||||||
|
|
||||||
//Free ressources in a threadqueue
|
//Free ressources in a threadqueue
|
||||||
|
|
Loading…
Reference in a new issue