diff --git a/src/kvazaar.c b/src/kvazaar.c index a512da93..0ba2c5e0 100644 --- a/src/kvazaar.c +++ b/src/kvazaar.c @@ -43,7 +43,21 @@ static void kvazaar_close(kvz_encoder *encoder) { if (encoder) { + // The threadqueue must be stopped before freeing states. + if (encoder->control) { + kvz_threadqueue_stop(encoder->control->threadqueue); + } + if (encoder->states) { + // Flush input frame buffer. + kvz_picture *pic = NULL; + while ((pic = kvz_encoder_feed_frame(&encoder->input_buffer, + &encoder->states[0], + NULL)) != NULL) { + kvz_image_free(pic); + pic = NULL; + } + for (unsigned i = 0; i < encoder->num_encoder_states; ++i) { kvz_encoder_state_finalize(&encoder->states[i]); } diff --git a/src/threadqueue.c b/src/threadqueue.c index 7949b89a..63406651 100644 --- a/src/threadqueue.c +++ b/src/threadqueue.c @@ -231,7 +231,7 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) } } - // We got out of the loop because threadqueue->stop == 1. The queue is locked. + // We got out of the loop because threadqueue->stop is true. The queue is locked. assert(threadqueue->stop); --threadqueue->threads_running; @@ -270,7 +270,7 @@ int kvz_threadqueue_init(threadqueue_queue_t * const threadqueue, int thread_cou return 0; } - threadqueue->stop = 0; + threadqueue->stop = false; threadqueue->fifo = !!fifo; threadqueue->threads_running = 0; threadqueue->threads_count = thread_count; @@ -367,58 +367,47 @@ void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr) FREE_POINTER(job); } -static void threadqueue_free_jobs(threadqueue_queue_t * const threadqueue) { +int kvz_threadqueue_stop(threadqueue_queue_t * const threadqueue) +{ + PTHREAD_LOCK(&threadqueue->lock); + + if (threadqueue->stop) { + // The threadqueue should have stopped already. + assert(threadqueue->threads_running == 0); + PTHREAD_UNLOCK(&threadqueue->lock); + return 1; + } + + // Tell all threads to stop. + threadqueue->stop = true; + PTHREAD_COND_BROADCAST(&threadqueue->cond); + PTHREAD_UNLOCK(&threadqueue->lock); + + // Wait for them to stop. + for (int i = 0; i < threadqueue->threads_count; i++) { + if (pthread_join(threadqueue->threads[i], NULL) != 0) { + fprintf(stderr, "pthread_join failed!\n"); + return 0; + } + } + + return 1; +} + + +int kvz_threadqueue_finalize(threadqueue_queue_t * const threadqueue) +{ + if (!kvz_threadqueue_stop(threadqueue)) return 0; + + // Free all jobs + PTHREAD_LOCK(&threadqueue->lock); for (int i = 0; i < threadqueue->queue_count; ++i) { kvz_threadqueue_free_job(&threadqueue->queue[i]); } threadqueue->queue_count = 0; threadqueue->queue_start = 0; -} - -int kvz_threadqueue_finalize(threadqueue_queue_t * const threadqueue) { - //Flush the queue - if (!kvz_threadqueue_flush(threadqueue)) { - fprintf(stderr, "Unable to flush threadqueue!\n"); - return 0; - } - - //Lock threadqueue - PTHREAD_LOCK(&threadqueue->lock); - - //Free job memory - threadqueue_free_jobs(threadqueue); - - if (threadqueue->stop) { - fprintf(stderr, "threadqueue already stopping\n"); - - if (pthread_mutex_unlock(&threadqueue->lock) != 0) { - fprintf(stderr, "pthread_mutex_unlock failed!\n"); - assert(0); - return 0; - } - assert(0); //We should get here... - return 0; - } - - threadqueue->stop = 1; - - if (pthread_cond_broadcast(&(threadqueue->cond)) != 0) { - fprintf(stderr, "pthread_cond_broadcast failed!\n"); - PTHREAD_UNLOCK(&threadqueue->lock); - assert(0); - return 0; - } - //Unlock it now, since all jobs have to stpo PTHREAD_UNLOCK(&threadqueue->lock); - - //Join threads - for(int i = 0; i < threadqueue->threads_count; i++) { - if(pthread_join(threadqueue->threads[i], NULL) != 0) { - fprintf(stderr, "pthread_join failed!\n"); - return 0; - } - } - + #ifdef KVZ_DEBUG FREE_POINTER(threadqueue->debug_clock_thread_start); FREE_POINTER(threadqueue->debug_clock_thread_end); @@ -454,30 +443,6 @@ int kvz_threadqueue_finalize(threadqueue_queue_t * const threadqueue) { return 1; } -int kvz_threadqueue_flush(threadqueue_queue_t * const threadqueue) { - int notdone = 1; - - //Lock the queue - PTHREAD_LOCK(&threadqueue->lock); - - do { - notdone = threadqueue->queue_waiting_execution + threadqueue->queue_waiting_dependency + threadqueue->queue_running; - - if (notdone > 0) { - PTHREAD_COND_BROADCAST(&(threadqueue->cond)); - PTHREAD_COND_WAIT(&threadqueue->cb_cond, &threadqueue->lock); - } - } while (notdone > 0); - - threadqueue_free_jobs(threadqueue); - - assert(threadqueue->queue_waiting_dependency == 0 && threadqueue->queue_waiting_execution == 0 && threadqueue->queue_running == 0); - - PTHREAD_UNLOCK(&threadqueue->lock); - - return 1; -} - int kvz_threadqueue_waitfor(threadqueue_queue_t * const threadqueue, threadqueue_job_t * const job) { int job_done = 0; diff --git a/src/threadqueue.h b/src/threadqueue.h index 37b07701..b7527075 100644 --- a/src/threadqueue.h +++ b/src/threadqueue.h @@ -78,8 +78,8 @@ typedef struct { int threads_count; int threads_running; - int stop; //=>1: threads should stop asap - + bool stop; // if true, threads should stop asap + int fifo; threadqueue_job_t **queue; @@ -119,8 +119,12 @@ int kvz_threadqueue_job_unwait_job(threadqueue_queue_t * threadqueue, threadqueu //Add a dependency between two jobs. int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *depends_on); -//Blocking call until the queue is empty. Previously set threadqueue_job handles should not be used anymore -int kvz_threadqueue_flush(threadqueue_queue_t * threadqueue); +/** + * \brief Stop all threads after they finish the current jobs. + * + * Blocks until all threads have stopped. + */ +int kvz_threadqueue_stop(threadqueue_queue_t * const threadqueue); //Blocking call until job is executed. Job handles submitted before job should not be used any more as they are removed from the queue. int kvz_threadqueue_waitfor(threadqueue_queue_t * threadqueue, threadqueue_job_t * job);