diff --git a/src/encoder.c b/src/encoder.c index 3fdd3c47..3db5a55f 100644 --- a/src/encoder.c +++ b/src/encoder.c @@ -1324,7 +1324,7 @@ static void encoder_state_encode(encoder_state * const main_state) { for (i=0; main_state->children[i].encoder_control; ++i) { //If we don't have wavefronts, parallelize encoding of children. if (main_state->children[i].type != ENCODER_STATE_TYPE_WAVEFRONT_ROW) { - threadqueue_submit(main_state->encoder_control->threadqueue, worker_encoder_state_encode_children, &(main_state->children[i])); + threadqueue_submit(main_state->encoder_control->threadqueue, worker_encoder_state_encode_children, &(main_state->children[i]), 0); } else { //Wavefront rows have parallelism at LCU level, so we should not launch multiple threads here! //FIXME: add an assert: we can only have wavefront children diff --git a/src/threadqueue.c b/src/threadqueue.c index 5c2f9108..9bb9f755 100644 --- a/src/threadqueue.c +++ b/src/threadqueue.c @@ -285,7 +285,7 @@ int threadqueue_flush(threadqueue_queue * const threadqueue) { return 1; } -threadqueue_job * threadqueue_submit(threadqueue_queue * const threadqueue, void (*fptr)(void *arg), void *arg) { +threadqueue_job * threadqueue_submit(threadqueue_queue * const threadqueue, void (*fptr)(void *arg), void *arg, int wait) { threadqueue_job *job; //No lock here... this should be constant if (threadqueue->threads_count == 0) { @@ -293,6 +293,8 @@ threadqueue_job * threadqueue_submit(threadqueue_queue * const threadqueue, void return NULL; } + assert(wait == 0 || wait == 1); + job = MALLOC(threadqueue_job, 1); if (!job) { @@ -308,7 +310,7 @@ threadqueue_job * threadqueue_submit(threadqueue_queue * const threadqueue, void assert(0); return NULL; } - job->ndepends = 0; + job->ndepends = wait; job->rdepends = NULL; job->rdepends_count = 0; job->rdepends_size = 0; @@ -367,4 +369,21 @@ int threadqueue_job_dep_add(threadqueue_job *job, threadqueue_job *depends_on) { PTHREAD_UNLOCK(&job->lock); return 1; -} \ No newline at end of file +} + +int threadqueue_job_unwait_job(threadqueue_queue * const threadqueue, threadqueue_job *job) { + PTHREAD_LOCK(&job->lock); + job->ndepends--; + PTHREAD_UNLOCK(&job->lock); + + PTHREAD_LOCK(&threadqueue->lock); + //Hope a thread can do it... + if(pthread_cond_signal(&(threadqueue->cond)) != 0) { + fprintf(stderr, "pthread_cond_signal failed!\n"); + assert(0); + return 0; + } + PTHREAD_UNLOCK(&threadqueue->lock); + + return 1; +} diff --git a/src/threadqueue.h b/src/threadqueue.h index b0becc52..dde8bbd2 100644 --- a/src/threadqueue.h +++ b/src/threadqueue.h @@ -66,8 +66,10 @@ typedef struct { //Init a threadqueue int threadqueue_init(threadqueue_queue * threadqueue, int thread_count); -//Add a job to the queue, and returs a threadqueue_job handle -threadqueue_job * threadqueue_submit(threadqueue_queue * threadqueue, void (*fptr)(void *arg), void *arg); +//Add a job to the queue, and returs a threadqueue_job handle. If wait == 1, one has to run threadqueue_job_unwait_job in order to have it run +threadqueue_job * threadqueue_submit(threadqueue_queue * threadqueue, void (*fptr)(void *arg), void *arg, int wait); + +int threadqueue_job_unwait_job(threadqueue_queue * threadqueue, threadqueue_job *job); //Add a dependency between two jobs. int threadqueue_job_dep_add(threadqueue_job *job, threadqueue_job *depends_on);