Merge branch 'fix-encoder-close'

This commit is contained in:
Arttu Ylä-Outinen 2017-06-05 11:34:03 +03:00
commit c1c7b8d8cc
3 changed files with 59 additions and 76 deletions

View file

@ -43,7 +43,21 @@
static void kvazaar_close(kvz_encoder *encoder) static void kvazaar_close(kvz_encoder *encoder)
{ {
if (encoder) { if (encoder) {
// The threadqueue must be stopped before freeing states.
if (encoder->control) {
kvz_threadqueue_stop(encoder->control->threadqueue);
}
if (encoder->states) { if (encoder->states) {
// Flush input frame buffer.
kvz_picture *pic = NULL;
while ((pic = kvz_encoder_feed_frame(&encoder->input_buffer,
&encoder->states[0],
NULL)) != NULL) {
kvz_image_free(pic);
pic = NULL;
}
for (unsigned i = 0; i < encoder->num_encoder_states; ++i) { for (unsigned i = 0; i < encoder->num_encoder_states; ++i) {
kvz_encoder_state_finalize(&encoder->states[i]); kvz_encoder_state_finalize(&encoder->states[i]);
} }

View file

@ -231,7 +231,7 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque)
} }
} }
// We got out of the loop because threadqueue->stop == 1. The queue is locked. // We got out of the loop because threadqueue->stop is true. The queue is locked.
assert(threadqueue->stop); assert(threadqueue->stop);
--threadqueue->threads_running; --threadqueue->threads_running;
@ -270,7 +270,7 @@ int kvz_threadqueue_init(threadqueue_queue_t * const threadqueue, int thread_cou
return 0; return 0;
} }
threadqueue->stop = 0; threadqueue->stop = false;
threadqueue->fifo = !!fifo; threadqueue->fifo = !!fifo;
threadqueue->threads_running = 0; threadqueue->threads_running = 0;
threadqueue->threads_count = thread_count; threadqueue->threads_count = thread_count;
@ -367,51 +367,23 @@ void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr)
FREE_POINTER(job); FREE_POINTER(job);
} }
static void threadqueue_free_jobs(threadqueue_queue_t * const threadqueue) { int kvz_threadqueue_stop(threadqueue_queue_t * const threadqueue)
for (int i = 0; i < threadqueue->queue_count; ++i) { {
kvz_threadqueue_free_job(&threadqueue->queue[i]);
}
threadqueue->queue_count = 0;
threadqueue->queue_start = 0;
}
int kvz_threadqueue_finalize(threadqueue_queue_t * const threadqueue) {
//Flush the queue
if (!kvz_threadqueue_flush(threadqueue)) {
fprintf(stderr, "Unable to flush threadqueue!\n");
return 0;
}
//Lock threadqueue
PTHREAD_LOCK(&threadqueue->lock); PTHREAD_LOCK(&threadqueue->lock);
//Free job memory
threadqueue_free_jobs(threadqueue);
if (threadqueue->stop) { if (threadqueue->stop) {
fprintf(stderr, "threadqueue already stopping\n"); // The threadqueue should have stopped already.
assert(threadqueue->threads_running == 0);
if (pthread_mutex_unlock(&threadqueue->lock) != 0) {
fprintf(stderr, "pthread_mutex_unlock failed!\n");
assert(0);
return 0;
}
assert(0); //We should get here...
return 0;
}
threadqueue->stop = 1;
if (pthread_cond_broadcast(&(threadqueue->cond)) != 0) {
fprintf(stderr, "pthread_cond_broadcast failed!\n");
PTHREAD_UNLOCK(&threadqueue->lock); PTHREAD_UNLOCK(&threadqueue->lock);
assert(0); return 1;
return 0;
} }
//Unlock it now, since all jobs have to stpo
// Tell all threads to stop.
threadqueue->stop = true;
PTHREAD_COND_BROADCAST(&threadqueue->cond);
PTHREAD_UNLOCK(&threadqueue->lock); PTHREAD_UNLOCK(&threadqueue->lock);
//Join threads // Wait for them to stop.
for (int i = 0; i < threadqueue->threads_count; i++) { for (int i = 0; i < threadqueue->threads_count; i++) {
if (pthread_join(threadqueue->threads[i], NULL) != 0) { if (pthread_join(threadqueue->threads[i], NULL) != 0) {
fprintf(stderr, "pthread_join failed!\n"); fprintf(stderr, "pthread_join failed!\n");
@ -419,6 +391,23 @@ int kvz_threadqueue_finalize(threadqueue_queue_t * const threadqueue) {
} }
} }
return 1;
}
int kvz_threadqueue_finalize(threadqueue_queue_t * const threadqueue)
{
if (!kvz_threadqueue_stop(threadqueue)) return 0;
// Free all jobs
PTHREAD_LOCK(&threadqueue->lock);
for (int i = 0; i < threadqueue->queue_count; ++i) {
kvz_threadqueue_free_job(&threadqueue->queue[i]);
}
threadqueue->queue_count = 0;
threadqueue->queue_start = 0;
PTHREAD_UNLOCK(&threadqueue->lock);
#ifdef KVZ_DEBUG #ifdef KVZ_DEBUG
FREE_POINTER(threadqueue->debug_clock_thread_start); FREE_POINTER(threadqueue->debug_clock_thread_start);
FREE_POINTER(threadqueue->debug_clock_thread_end); FREE_POINTER(threadqueue->debug_clock_thread_end);
@ -454,30 +443,6 @@ int kvz_threadqueue_finalize(threadqueue_queue_t * const threadqueue) {
return 1; return 1;
} }
int kvz_threadqueue_flush(threadqueue_queue_t * const threadqueue) {
int notdone = 1;
//Lock the queue
PTHREAD_LOCK(&threadqueue->lock);
do {
notdone = threadqueue->queue_waiting_execution + threadqueue->queue_waiting_dependency + threadqueue->queue_running;
if (notdone > 0) {
PTHREAD_COND_BROADCAST(&(threadqueue->cond));
PTHREAD_COND_WAIT(&threadqueue->cb_cond, &threadqueue->lock);
}
} while (notdone > 0);
threadqueue_free_jobs(threadqueue);
assert(threadqueue->queue_waiting_dependency == 0 && threadqueue->queue_waiting_execution == 0 && threadqueue->queue_running == 0);
PTHREAD_UNLOCK(&threadqueue->lock);
return 1;
}
int kvz_threadqueue_waitfor(threadqueue_queue_t * const threadqueue, threadqueue_job_t * const job) { int kvz_threadqueue_waitfor(threadqueue_queue_t * const threadqueue, threadqueue_job_t * const job) {
int job_done = 0; int job_done = 0;

View file

@ -78,7 +78,7 @@ typedef struct {
int threads_count; int threads_count;
int threads_running; int threads_running;
int stop; //=>1: threads should stop asap bool stop; // if true, threads should stop asap
int fifo; int fifo;
@ -119,8 +119,12 @@ int kvz_threadqueue_job_unwait_job(threadqueue_queue_t * threadqueue, threadqueu
//Add a dependency between two jobs. //Add a dependency between two jobs.
int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *depends_on); int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *depends_on);
//Blocking call until the queue is empty. Previously set threadqueue_job handles should not be used anymore /**
int kvz_threadqueue_flush(threadqueue_queue_t * threadqueue); * \brief Stop all threads after they finish the current jobs.
*
* Blocks until all threads have stopped.
*/
int kvz_threadqueue_stop(threadqueue_queue_t * const threadqueue);
//Blocking call until job is executed. Job handles submitted before job should not be used any more as they are removed from the queue. //Blocking call until job is executed. Job handles submitted before job should not be used any more as they are removed from the queue.
int kvz_threadqueue_waitfor(threadqueue_queue_t * threadqueue, threadqueue_job_t * job); int kvz_threadqueue_waitfor(threadqueue_queue_t * threadqueue, threadqueue_job_t * job);