uvg266/src/threadqueue.c
2022-04-28 14:18:09 +03:00

644 lines
16 KiB
C

/*****************************************************************************
* This file is part of uvg266 VVC encoder.
*
* Copyright (c) 2021, Tampere University, ITU/ISO/IEC, project contributors
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice, this
* list of conditions and the following disclaimer in the documentation and/or
* other materials provided with the distribution.
*
* * Neither the name of the Tampere University or ITU/ISO/IEC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
****************************************************************************/
#include "global.h"
#include "threadqueue.h"
#include <errno.h> // ETIMEDOUT
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "threads.h"
/**
* \file
*
* Lock acquisition order:
*
* 1. When locking a job and its dependency, the dependecy must be locked
* first and then the job depending on it.
*
* 2. When locking a job and the thread queue, the thread queue must be
* locked first and then the job.
*
* 3. When accessing threadqueue_job_t.next, the thread queue must be
* locked.
*/
#define THREADQUEUE_LIST_REALLOC_SIZE 32
#define PTHREAD_COND_SIGNAL(c) \
if (pthread_cond_signal((c)) != 0) { \
fprintf(stderr, "pthread_cond_signal(%s=%p) failed!\n", #c, c); \
assert(0); \
return 0; \
}
#define PTHREAD_COND_BROADCAST(c) \
if (pthread_cond_broadcast((c)) != 0) { \
fprintf(stderr, "pthread_cond_broadcast(%s=%p) failed!\n", #c, c); \
assert(0); \
return 0; \
}
#define PTHREAD_COND_WAIT(c,l) \
if (pthread_cond_wait((c),(l)) != 0) { \
fprintf(stderr, "pthread_cond_wait(%s=%p, %s=%p) failed!\n", #c, c, #l, l); \
assert(0); \
return 0; \
}
#define PTHREAD_LOCK(l) \
if (pthread_mutex_lock((l)) != 0) { \
fprintf(stderr, "pthread_mutex_lock(%s) failed!\n", #l); \
assert(0); \
return 0; \
}
#define PTHREAD_UNLOCK(l) \
if (pthread_mutex_unlock((l)) != 0) { \
fprintf(stderr, "pthread_mutex_unlock(%s) failed!\n", #l); \
assert(0); \
return 0; \
}
typedef enum {
/**
* \brief Job has been submitted, but is not allowed to run yet.
*/
THREADQUEUE_JOB_STATE_PAUSED,
/**
* \brief Job is waiting for dependencies.
*/
THREADQUEUE_JOB_STATE_WAITING,
/**
* \brief Job is ready to run.
*/
THREADQUEUE_JOB_STATE_READY,
/**
* \brief Job is running.
*/
THREADQUEUE_JOB_STATE_RUNNING,
/**
* \brief Job is completed.
*/
THREADQUEUE_JOB_STATE_DONE,
} threadqueue_job_state;
struct threadqueue_job_t {
pthread_mutex_t lock;
threadqueue_job_state state;
/**
* \brief Number of dependencies that have not been completed yet.
*/
int ndepends;
/**
* \brief Reverse dependencies.
*
* Array of pointers to jobs that depend on this one. They have to exist
* when the thread finishes, because they cannot be run before.
*/
struct threadqueue_job_t **rdepends;
/**
* \brief Number of elements in rdepends.
*/
int rdepends_count;
/**
* \brief Allocated size of rdepends.
*/
int rdepends_size;
/**
* \brief Reference count
*/
int refcount;
/**
* \brief Pointer to the function to execute.
*/
void (*fptr)(void *arg);
/**
* \brief Argument for fptr.
*/
void *arg;
/**
* \brief Pointer to the next job in the queue.
*/
struct threadqueue_job_t *next;
};
struct threadqueue_queue_t {
pthread_mutex_t lock;
/**
* \brief Job available condition variable
*
* Signalled when there is a new job to do.
*/
pthread_cond_t job_available;
/**
* \brief Job done condition variable
*
* Signalled when a job has been completed.
*/
pthread_cond_t job_done;
/**
* Array containing spawned threads
*/
pthread_t *threads;
/**
* \brief Number of threads spawned
*/
int thread_count;
/**
* \brief Number of threads running
*/
int thread_running_count;
/**
* \brief If true, threads should stop ASAP.
*/
bool stop;
/**
* \brief Pointer to the first ready job
*/
threadqueue_job_t *first;
/**
* \brief Pointer to the last ready job
*/
threadqueue_job_t *last;
};
/**
* \brief Add a job to the queue of jobs ready to run.
*
* The caller must have locked the thread queue and the job. This function
* takes the ownership of the job.
*/
static void threadqueue_push_job(threadqueue_queue_t * threadqueue,
threadqueue_job_t *job)
{
assert(job->ndepends == 0);
job->state = THREADQUEUE_JOB_STATE_READY;
if (threadqueue->first == NULL) {
threadqueue->first = job;
} else {
threadqueue->last->next = job;
}
threadqueue->last = job;
job->next = NULL;
}
/**
* \brief Retrieve a job from the queue of jobs ready to run.
*
* The caller must have locked the thread queue. The calling function
* receives the ownership of the job.
*/
static threadqueue_job_t * threadqueue_pop_job(threadqueue_queue_t * threadqueue)
{
assert(threadqueue->first != NULL);
threadqueue_job_t *job = threadqueue->first;
threadqueue->first = job->next;
job->next = NULL;
if (threadqueue->first == NULL) {
threadqueue->last = NULL;
}
return job;
}
/**
* \brief Function executed by worker threads.
*/
static void* threadqueue_worker(void* threadqueue_opaque)
{
threadqueue_queue_t * const threadqueue = (threadqueue_queue_t *) threadqueue_opaque;
PTHREAD_LOCK(&threadqueue->lock);
for (;;) {
while (!threadqueue->stop && threadqueue->first == NULL) {
// Wait until there is something to do in the queue.
PTHREAD_COND_WAIT(&threadqueue->job_available, &threadqueue->lock);
}
if (threadqueue->stop) {
break;
}
// Get a job and remove it from the queue.
threadqueue_job_t *job = threadqueue_pop_job(threadqueue);
PTHREAD_LOCK(&job->lock);
assert(job->state == THREADQUEUE_JOB_STATE_READY);
job->state = THREADQUEUE_JOB_STATE_RUNNING;
PTHREAD_UNLOCK(&job->lock);
PTHREAD_UNLOCK(&threadqueue->lock);
job->fptr(job->arg);
PTHREAD_LOCK(&threadqueue->lock);
PTHREAD_LOCK(&job->lock);
assert(job->state == THREADQUEUE_JOB_STATE_RUNNING);
job->state = THREADQUEUE_JOB_STATE_DONE;
PTHREAD_COND_SIGNAL(&threadqueue->job_done);
// Go through all the jobs that depend on this one, decreasing their
// ndepends. Count how many jobs can now start executing so we know how
// many threads to wake up.
int num_new_jobs = 0;
for (int i = 0; i < job->rdepends_count; ++i) {
threadqueue_job_t * const depjob = job->rdepends[i];
// The dependency (job) is locked before the job depending on it.
// This must be the same order as in uvg_threadqueue_job_dep_add.
PTHREAD_LOCK(&depjob->lock);
assert(depjob->state == THREADQUEUE_JOB_STATE_WAITING ||
depjob->state == THREADQUEUE_JOB_STATE_PAUSED);
assert(depjob->ndepends > 0);
depjob->ndepends--;
if (depjob->ndepends == 0 && depjob->state == THREADQUEUE_JOB_STATE_WAITING) {
// Move the job to ready jobs.
threadqueue_push_job(threadqueue, uvg_threadqueue_copy_ref(depjob));
num_new_jobs++;
}
// Clear this reference to the job.
PTHREAD_UNLOCK(&depjob->lock);
uvg_threadqueue_free_job(&job->rdepends[i]);
}
job->rdepends_count = 0;
PTHREAD_UNLOCK(&job->lock);
uvg_threadqueue_free_job(&job);
// The current thread will process one of the new jobs so we wake up
// one threads less than the the number of new jobs.
for (int i = 0; i < num_new_jobs - 1; i++) {
pthread_cond_signal(&threadqueue->job_available);
}
}
threadqueue->thread_running_count--;
PTHREAD_UNLOCK(&threadqueue->lock);
return NULL;
}
/**
* \brief Initialize the queue.
*
* \return 1 on success, 0 on failure
*/
threadqueue_queue_t * uvg_threadqueue_init(int thread_count)
{
threadqueue_queue_t *threadqueue = MALLOC(threadqueue_queue_t, 1);
if (!threadqueue) {
goto failed;
}
if (pthread_mutex_init(&threadqueue->lock, NULL) != 0) {
fprintf(stderr, "pthread_mutex_init failed!\n");
goto failed;
}
if (pthread_cond_init(&threadqueue->job_available, NULL) != 0) {
fprintf(stderr, "pthread_cond_init failed!\n");
goto failed;
}
if (pthread_cond_init(&threadqueue->job_done, NULL) != 0) {
fprintf(stderr, "pthread_cond_init failed!\n");
goto failed;
}
threadqueue->threads = MALLOC(pthread_t, thread_count);
if (!threadqueue->threads) {
fprintf(stderr, "Could not malloc threadqueue->threads!\n");
goto failed;
}
threadqueue->thread_count = 0;
threadqueue->thread_running_count = 0;
threadqueue->stop = false;
threadqueue->first = NULL;
threadqueue->last = NULL;
// Lock the queue before creating threads, to ensure they all have correct information.
PTHREAD_LOCK(&threadqueue->lock);
for (int i = 0; i < thread_count; i++) {
if (pthread_create(&threadqueue->threads[i], NULL, threadqueue_worker, threadqueue) != 0) {
fprintf(stderr, "pthread_create failed!\n");
goto failed;
}
threadqueue->thread_count++;
threadqueue->thread_running_count++;
}
PTHREAD_UNLOCK(&threadqueue->lock);
return threadqueue;
failed:
uvg_threadqueue_free(threadqueue);
return NULL;
}
/**
* \brief Create a job and return a pointer to it.
*
* The job is g_created in a paused state. Function uvg_threadqueue_submit
* must be called on the job in order to have it run.
*
* \return pointer to the job, or NULL on failure
*/
threadqueue_job_t * uvg_threadqueue_job_create(void (*fptr)(void *arg), void *arg)
{
threadqueue_job_t *job = MALLOC(threadqueue_job_t, 1);
if (!job) {
fprintf(stderr, "Could not alloc job!\n");
return NULL;
}
if (pthread_mutex_init(&job->lock, NULL) != 0) {
fprintf(stderr, "pthread_mutex_init(job) failed!\n");
return NULL;
}
job->state = THREADQUEUE_JOB_STATE_PAUSED;
job->ndepends = 0;
job->rdepends = NULL;
job->rdepends_count = 0;
job->rdepends_size = 0;
job->refcount = 1;
job->fptr = fptr;
job->arg = arg;
return job;
}
int uvg_threadqueue_submit(threadqueue_queue_t * const threadqueue, threadqueue_job_t *job)
{
PTHREAD_LOCK(&threadqueue->lock);
PTHREAD_LOCK(&job->lock);
assert(job->state == THREADQUEUE_JOB_STATE_PAUSED);
if (threadqueue->thread_count == 0) {
// When not using threads, run the job immediately.
job->fptr(job->arg);
job->state = THREADQUEUE_JOB_STATE_DONE;
} else if (job->ndepends == 0) {
threadqueue_push_job(threadqueue, uvg_threadqueue_copy_ref(job));
pthread_cond_signal(&threadqueue->job_available);
} else {
job->state = THREADQUEUE_JOB_STATE_WAITING;
}
PTHREAD_UNLOCK(&job->lock);
PTHREAD_UNLOCK(&threadqueue->lock);
return 1;
}
/**
* \brief Add a dependency between two jobs.
*
* \param job job that should be executed after dependency
* \param dependency job that should be executed before job
*
* \return 1 on success, 0 on failure
*
*/
int uvg_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *dependency)
{
// Lock the dependency first and then the job depending on it.
// This must be the same order as in threadqueue_worker.
PTHREAD_LOCK(&dependency->lock);
if (dependency->state == THREADQUEUE_JOB_STATE_DONE) {
// The dependency has been completed already so there is nothing to do.
PTHREAD_UNLOCK(&dependency->lock);
return 1;
}
PTHREAD_LOCK(&job->lock);
job->ndepends++;
PTHREAD_UNLOCK(&job->lock);
// Add the reverse dependency
if (dependency->rdepends_count >= dependency->rdepends_size) {
dependency->rdepends_size += THREADQUEUE_LIST_REALLOC_SIZE;
size_t bytes = dependency->rdepends_size * sizeof(threadqueue_job_t*);
dependency->rdepends = realloc(dependency->rdepends, bytes);
}
dependency->rdepends[dependency->rdepends_count++] = uvg_threadqueue_copy_ref(job);
PTHREAD_UNLOCK(&dependency->lock);
return 1;
}
/**
* \brief Get a new pointer to a job.
*
* Increment reference count and return the job.
*/
threadqueue_job_t *uvg_threadqueue_copy_ref(threadqueue_job_t *job)
{
int32_t new_refcount = UVG_ATOMIC_INC(&job->refcount);
// The caller should have had another reference and we added one
// reference so refcount should be at least 2.
assert(new_refcount >= 2);
return job;
}
/**
* \brief Free a job.
*
* Decrement reference count of the job. If no references exist any more,
* deallocate associated memory and destroy mutexes.
*
* Sets the job pointer to NULL.
*/
void uvg_threadqueue_free_job(threadqueue_job_t **job_ptr)
{
threadqueue_job_t *job = *job_ptr;
if (job == NULL) return;
*job_ptr = NULL;
int new_refcount = UVG_ATOMIC_DEC(&job->refcount);
if (new_refcount > 0) {
// There are still references so we don't free the data yet.
return;
}
assert(new_refcount == 0);
for (int i = 0; i < job->rdepends_count; i++) {
uvg_threadqueue_free_job(&job->rdepends[i]);
}
job->rdepends_count = 0;
FREE_POINTER(job->rdepends);
pthread_mutex_destroy(&job->lock);
FREE_POINTER(job);
}
/**
* \brief Wait for a job to be completed.
*
* \return 1 on success, 0 on failure
*/
int uvg_threadqueue_waitfor(threadqueue_queue_t * threadqueue, threadqueue_job_t * job)
{
PTHREAD_LOCK(&job->lock);
while (job->state != THREADQUEUE_JOB_STATE_DONE) {
PTHREAD_COND_WAIT(&threadqueue->job_done, &job->lock);
}
PTHREAD_UNLOCK(&job->lock);
return 1;
}
/**
* \brief Stop all threads after they finish the current jobs.
*
* Block until all threads have stopped.
*
* \return 1 on success, 0 on failure
*/
int uvg_threadqueue_stop(threadqueue_queue_t * const threadqueue)
{
PTHREAD_LOCK(&threadqueue->lock);
if (threadqueue->stop) {
// The threadqueue should have stopped already.
assert(threadqueue->thread_running_count == 0);
PTHREAD_UNLOCK(&threadqueue->lock);
return 1;
}
// Tell all threads to stop.
threadqueue->stop = true;
PTHREAD_COND_BROADCAST(&threadqueue->job_available);
PTHREAD_UNLOCK(&threadqueue->lock);
// Wait for them to stop.
for (int i = 0; i < threadqueue->thread_count; i++) {
if (pthread_join(threadqueue->threads[i], NULL) != 0) {
fprintf(stderr, "pthread_join failed!\n");
return 0;
}
}
return 1;
}
/**
* \brief Stop all threads and free allocated resources.
*
* \return 1 on success, 0 on failure
*/
void uvg_threadqueue_free(threadqueue_queue_t *threadqueue)
{
if (threadqueue == NULL) return;
uvg_threadqueue_stop(threadqueue);
// Free all jobs.
while (threadqueue->first) {
threadqueue_job_t *next = threadqueue->first->next;
uvg_threadqueue_free_job(&threadqueue->first);
threadqueue->first = next;
}
threadqueue->last = NULL;
FREE_POINTER(threadqueue->threads);
threadqueue->thread_count = 0;
if (pthread_mutex_destroy(&threadqueue->lock) != 0) {
fprintf(stderr, "pthread_mutex_destroy failed!\n");
}
if (pthread_cond_destroy(&threadqueue->job_available) != 0) {
fprintf(stderr, "pthread_cond_destroy failed!\n");
}
if (pthread_cond_destroy(&threadqueue->job_done) != 0) {
fprintf(stderr, "pthread_cond_destroy failed!\n");
}
FREE_POINTER(threadqueue);
}