Comment and tidy threadqueue_worker

Carefully avoided making any changes to the logic.
This commit is contained in:
Ari Koivula 2016-03-14 19:59:21 +02:00
parent 1165ae2e1f
commit 6f431e510c

View file

@ -74,9 +74,12 @@ typedef struct {
} while (0); } while (0);
#endif //PTHREAD_DUMP #endif //PTHREAD_DUMP
const struct timespec kvz_time_to_wait = {1, 0}; const struct timespec kvz_time_to_wait = {1, 0};
static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
static void* threadqueue_worker(void* threadqueue_worker_spec_opaque)
{
threadqueue_worker_spec * const threadqueue_worker_spec = threadqueue_worker_spec_opaque; threadqueue_worker_spec * const threadqueue_worker_spec = threadqueue_worker_spec_opaque;
threadqueue_queue_t * const threadqueue = threadqueue_worker_spec->threadqueue; threadqueue_queue_t * const threadqueue = threadqueue_worker_spec->threadqueue;
threadqueue_job_t * next_job = NULL; threadqueue_job_t * next_job = NULL;
@ -86,17 +89,20 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
#endif //KVZ_DEBUG #endif //KVZ_DEBUG
for(;;) { for(;;) {
int i = 0;
threadqueue_job_t * job = NULL; threadqueue_job_t * job = NULL;
PTHREAD_LOCK(&threadqueue->lock); PTHREAD_LOCK(&threadqueue->lock);
while(!threadqueue->stop && threadqueue->queue_waiting_execution == 0 && !next_job) { while(!threadqueue->stop && threadqueue->queue_waiting_execution == 0 && !next_job) {
// Wait until there is something to do in the queue.
PTHREAD_COND_WAIT(&threadqueue->cond, &threadqueue->lock); PTHREAD_COND_WAIT(&threadqueue->cond, &threadqueue->lock);
} }
if(threadqueue->stop) { if(threadqueue->stop) {
if (next_job) { if (next_job) {
// Put a job we had already reserved back into the queue.
// FIXME: This lock should be unnecessary, as nobody else is allowed
// to touch this job when it's running.
PTHREAD_LOCK(&next_job->lock); PTHREAD_LOCK(&next_job->lock);
next_job->state = THREADQUEUE_JOB_STATE_QUEUED; next_job->state = THREADQUEUE_JOB_STATE_QUEUED;
PTHREAD_UNLOCK(&next_job->lock); PTHREAD_UNLOCK(&next_job->lock);
@ -114,12 +120,14 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
//for (i = threadqueue->queue_count - 1; i >= threadqueue->queue_start; --i) { //for (i = threadqueue->queue_count - 1; i >= threadqueue->queue_start; --i) {
//for (i = threadqueue->queue_start; i < threadqueue->queue_count; ++i) { //for (i = threadqueue->queue_start; i < threadqueue->queue_count; ++i) {
for (i = (threadqueue->fifo ? threadqueue->queue_start : threadqueue->queue_count - 1); for (int i = (threadqueue->fifo ? threadqueue->queue_start : threadqueue->queue_count - 1);
(threadqueue->fifo ? i < threadqueue->queue_count : i >= threadqueue->queue_start); (threadqueue->fifo ? i < threadqueue->queue_count : i >= threadqueue->queue_start);
(threadqueue->fifo ? ++i : --i)) { (threadqueue->fifo ? ++i : --i)) {
threadqueue_job_t * const i_job = threadqueue->queue[i]; threadqueue_job_t * const i_job = threadqueue->queue[i];
if (i_job->state == THREADQUEUE_JOB_STATE_QUEUED && i_job->ndepends == 0) { if (i_job->state == THREADQUEUE_JOB_STATE_QUEUED && i_job->ndepends == 0) {
// Once we found the job with no dependancies, lock it and change
// its state to running, so nobody else can claim it.
PTHREAD_LOCK(&i_job->lock); PTHREAD_LOCK(&i_job->lock);
if (i_job->state == THREADQUEUE_JOB_STATE_QUEUED && i_job->ndepends == 0) { if (i_job->state == THREADQUEUE_JOB_STATE_QUEUED && i_job->ndepends == 0) {
job = i_job; job = i_job;
@ -131,21 +139,26 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
} }
} }
//Ok we got a job (and we have a lock on it) if (!job) {
if (job) { // We have no job. Probably because more threads were woken up than
int queue_waiting_dependency_decr, queue_waiting_execution_incr; // there were jobs to do.
PTHREAD_UNLOCK(&threadqueue->lock);
} else {
// We have a job with ndepends==0 and its state is running.
assert(job->state == THREADQUEUE_JOB_STATE_RUNNING); assert(job->state == THREADQUEUE_JOB_STATE_RUNNING);
//Move the queue_start "pointer" if needed // Advance queue_start to skip all the running jobs.
while (threadqueue->queue_start < threadqueue->queue_count && threadqueue->queue[threadqueue->queue_start]->state != THREADQUEUE_JOB_STATE_QUEUED) threadqueue->queue_start++; while (threadqueue->queue_start < threadqueue->queue_count &&
threadqueue->queue[threadqueue->queue_start]->state != THREADQUEUE_JOB_STATE_QUEUED)
{
threadqueue->queue_start++;
}
if (!next_job) { if (!next_job) {
--threadqueue->queue_waiting_execution; --threadqueue->queue_waiting_execution;
++threadqueue->queue_running; ++threadqueue->queue_running;
} }
//Unlock the queue
PTHREAD_UNLOCK(&threadqueue->lock); PTHREAD_UNLOCK(&threadqueue->lock);
#ifdef KVZ_DEBUG #ifdef KVZ_DEBUG
@ -160,7 +173,8 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
KVZ_GET_TIME(&job->debug_clock_stop); KVZ_GET_TIME(&job->debug_clock_stop);
#endif //KVZ_DEBUG #endif //KVZ_DEBUG
//Re-lock the job to update its status and treat its dependencies // FIXME: This lock should be unnecessary, as nobody else is allowed
// to touch this job when it's running.
PTHREAD_LOCK(&job->lock); PTHREAD_LOCK(&job->lock);
assert(job->state == THREADQUEUE_JOB_STATE_RUNNING); assert(job->state == THREADQUEUE_JOB_STATE_RUNNING);
@ -168,20 +182,25 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
next_job = NULL; next_job = NULL;
queue_waiting_dependency_decr = 0; int queue_waiting_dependency_decr = 0;
queue_waiting_execution_incr = 0; int queue_waiting_execution_incr = 0;
//Decrease counter of dependencies
for (i = 0; i < job->rdepends_count; ++i) { // Go throught all the jobs that depend on this one, decresing their ndepends.
for (int i = 0; i < job->rdepends_count; ++i) {
threadqueue_job_t * const depjob = job->rdepends[i]; threadqueue_job_t * const depjob = job->rdepends[i];
//Note that we lock the dependency AFTER locking the source. This avoids a deadlock in dep_add // Note that we lock the dependency AFTER locking the source. This avoids a deadlock in dep_add.
PTHREAD_LOCK(&depjob->lock); PTHREAD_LOCK(&depjob->lock);
assert(depjob->state == THREADQUEUE_JOB_STATE_QUEUED); assert(depjob->state == THREADQUEUE_JOB_STATE_QUEUED);
assert(depjob->ndepends > 0); assert(depjob->ndepends > 0);
--depjob->ndepends; --depjob->ndepends;
// Count how many jobs can now start executing so we know how many
// threads to wake up.
if (depjob->ndepends == 0) { if (depjob->ndepends == 0) {
if (!next_job) { if (!next_job) {
// Avoid having to find a new job for this worker through the
// queue by taking one of the jobs that depended on current job.
next_job = depjob; next_job = depjob;
depjob->state = THREADQUEUE_JOB_STATE_RUNNING; depjob->state = THREADQUEUE_JOB_STATE_RUNNING;
} else { } else {
@ -192,22 +211,29 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
PTHREAD_UNLOCK(&depjob->lock); PTHREAD_UNLOCK(&depjob->lock);
} }
//Unlock the job
PTHREAD_UNLOCK(&job->lock); PTHREAD_UNLOCK(&job->lock);
//Signal the queue that we've done a job
PTHREAD_LOCK(&threadqueue->lock); PTHREAD_LOCK(&threadqueue->lock);
if (!next_job) threadqueue->queue_running--;
assert(threadqueue->queue_waiting_dependency >= queue_waiting_dependency_decr); assert(threadqueue->queue_waiting_dependency >= queue_waiting_dependency_decr);
// This thread will
if (!next_job) {
// We didn't find a new job, so this thread will have to go wait.
threadqueue->queue_running--;
}
threadqueue->queue_waiting_dependency -= queue_waiting_dependency_decr; threadqueue->queue_waiting_dependency -= queue_waiting_dependency_decr;
threadqueue->queue_waiting_execution += queue_waiting_execution_incr; threadqueue->queue_waiting_execution += queue_waiting_execution_incr;
for (i = 0; i < queue_waiting_execution_incr; ++i) {
// Wake up enough threads to take care of the tasks now lacking dependancies.
for (int i = 0; i < queue_waiting_execution_incr; ++i) {
PTHREAD_COND_SIGNAL(&threadqueue->cond); PTHREAD_COND_SIGNAL(&threadqueue->cond);
} }
//We only signal cb_cond since we finished a job
// Signal main thread that a job has been completed.
pthread_cond_signal(&threadqueue->cb_cond); pthread_cond_signal(&threadqueue->cb_cond);
PTHREAD_UNLOCK(&threadqueue->lock);
} else {
PTHREAD_UNLOCK(&threadqueue->lock); PTHREAD_UNLOCK(&threadqueue->lock);
} }
} }
@ -230,6 +256,8 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
return NULL; return NULL;
} }
int kvz_threadqueue_init(threadqueue_queue_t * const threadqueue, int thread_count, int fifo) { int kvz_threadqueue_init(threadqueue_queue_t * const threadqueue, int thread_count, int fifo) {
int i; int i;
if (pthread_mutex_init(&threadqueue->lock, NULL) != 0) { if (pthread_mutex_init(&threadqueue->lock, NULL) != 0) {