diff --git a/src/threadqueue.c b/src/threadqueue.c index e4b5deb5..9038c64e 100644 --- a/src/threadqueue.c +++ b/src/threadqueue.c @@ -74,36 +74,42 @@ typedef struct { } while (0); #endif //PTHREAD_DUMP + const struct timespec kvz_time_to_wait = {1, 0}; -static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) { + +static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) +{ threadqueue_worker_spec * const threadqueue_worker_spec = threadqueue_worker_spec_opaque; threadqueue_queue_t * const threadqueue = threadqueue_worker_spec->threadqueue; threadqueue_job_t * next_job = NULL; - + #ifdef KVZ_DEBUG KVZ_GET_TIME(&threadqueue->debug_clock_thread_start[threadqueue_worker_spec->worker_id]); #endif //KVZ_DEBUG for(;;) { - int i = 0; threadqueue_job_t * job = NULL; - + PTHREAD_LOCK(&threadqueue->lock); while(!threadqueue->stop && threadqueue->queue_waiting_execution == 0 && !next_job) { + // Wait until there is something to do in the queue. PTHREAD_COND_WAIT(&threadqueue->cond, &threadqueue->lock); } - + if(threadqueue->stop) { if (next_job) { + // Put a job we had already reserved back into the queue. + // FIXME: This lock should be unnecessary, as nobody else is allowed + // to touch this job when it's running. PTHREAD_LOCK(&next_job->lock); next_job->state = THREADQUEUE_JOB_STATE_QUEUED; PTHREAD_UNLOCK(&next_job->lock); } break; } - + //Find a task (should be fast enough) job = NULL; if (next_job) { @@ -113,13 +119,15 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) { //FIXME: if not using OWF, the first is better than the second, otherwise we should use the second order //for (i = threadqueue->queue_count - 1; i >= threadqueue->queue_start; --i) { //for (i = threadqueue->queue_start; i < threadqueue->queue_count; ++i) { - - for (i = (threadqueue->fifo ? threadqueue->queue_start : threadqueue->queue_count - 1); + + for (int i = (threadqueue->fifo ? threadqueue->queue_start : threadqueue->queue_count - 1); (threadqueue->fifo ? i < threadqueue->queue_count : i >= threadqueue->queue_start); (threadqueue->fifo ? ++i : --i)) { threadqueue_job_t * const i_job = threadqueue->queue[i]; - + if (i_job->state == THREADQUEUE_JOB_STATE_QUEUED && i_job->ndepends == 0) { + // Once we found the job with no dependancies, lock it and change + // its state to running, so nobody else can claim it. PTHREAD_LOCK(&i_job->lock); if (i_job->state == THREADQUEUE_JOB_STATE_QUEUED && i_job->ndepends == 0) { job = i_job; @@ -130,58 +138,69 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) { } } } - - //Ok we got a job (and we have a lock on it) - if (job) { - int queue_waiting_dependency_decr, queue_waiting_execution_incr; + if (!job) { + // We have no job. Probably because more threads were woken up than + // there were jobs to do. + PTHREAD_UNLOCK(&threadqueue->lock); + } else { + // We have a job with ndepends==0 and its state is running. assert(job->state == THREADQUEUE_JOB_STATE_RUNNING); - - //Move the queue_start "pointer" if needed - while (threadqueue->queue_start < threadqueue->queue_count && threadqueue->queue[threadqueue->queue_start]->state != THREADQUEUE_JOB_STATE_QUEUED) threadqueue->queue_start++; + + // Advance queue_start to skip all the running jobs. + while (threadqueue->queue_start < threadqueue->queue_count && + threadqueue->queue[threadqueue->queue_start]->state != THREADQUEUE_JOB_STATE_QUEUED) + { + threadqueue->queue_start++; + } if (!next_job) { --threadqueue->queue_waiting_execution; ++threadqueue->queue_running; } - - //Unlock the queue + PTHREAD_UNLOCK(&threadqueue->lock); - + #ifdef KVZ_DEBUG job->debug_worker_id = threadqueue_worker_spec->worker_id; KVZ_GET_TIME(&job->debug_clock_start); #endif //KVZ_DEBUG - + job->fptr(job->arg); - + #ifdef KVZ_DEBUG job->debug_worker_id = threadqueue_worker_spec->worker_id; KVZ_GET_TIME(&job->debug_clock_stop); #endif //KVZ_DEBUG - - //Re-lock the job to update its status and treat its dependencies + + // FIXME: This lock should be unnecessary, as nobody else is allowed + // to touch this job when it's running. PTHREAD_LOCK(&job->lock); assert(job->state == THREADQUEUE_JOB_STATE_RUNNING); - + job->state = THREADQUEUE_JOB_STATE_DONE; - + next_job = NULL; - - queue_waiting_dependency_decr = 0; - queue_waiting_execution_incr = 0; - //Decrease counter of dependencies - for (i = 0; i < job->rdepends_count; ++i) { + + int queue_waiting_dependency_decr = 0; + int queue_waiting_execution_incr = 0; + + // Go throught all the jobs that depend on this one, decresing their ndepends. + for (int i = 0; i < job->rdepends_count; ++i) { threadqueue_job_t * const depjob = job->rdepends[i]; - //Note that we lock the dependency AFTER locking the source. This avoids a deadlock in dep_add + // Note that we lock the dependency AFTER locking the source. This avoids a deadlock in dep_add. PTHREAD_LOCK(&depjob->lock); - + assert(depjob->state == THREADQUEUE_JOB_STATE_QUEUED); assert(depjob->ndepends > 0); --depjob->ndepends; - + + // Count how many jobs can now start executing so we know how many + // threads to wake up. if (depjob->ndepends == 0) { if (!next_job) { + // Avoid having to find a new job for this worker through the + // queue by taking one of the jobs that depended on current job. next_job = depjob; depjob->state = THREADQUEUE_JOB_STATE_RUNNING; } else { @@ -189,30 +208,37 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) { } ++queue_waiting_dependency_decr; } - + PTHREAD_UNLOCK(&depjob->lock); } - //Unlock the job - PTHREAD_UNLOCK(&job->lock); - //Signal the queue that we've done a job + PTHREAD_UNLOCK(&job->lock); + PTHREAD_LOCK(&threadqueue->lock); - if (!next_job) threadqueue->queue_running--; + assert(threadqueue->queue_waiting_dependency >= queue_waiting_dependency_decr); + + // This thread will + if (!next_job) { + // We didn't find a new job, so this thread will have to go wait. + threadqueue->queue_running--; + } threadqueue->queue_waiting_dependency -= queue_waiting_dependency_decr; threadqueue->queue_waiting_execution += queue_waiting_execution_incr; - for (i = 0; i < queue_waiting_execution_incr; ++i) { + + // Wake up enough threads to take care of the tasks now lacking dependancies. + for (int i = 0; i < queue_waiting_execution_incr; ++i) { PTHREAD_COND_SIGNAL(&threadqueue->cond); } - //We only signal cb_cond since we finished a job + + // Signal main thread that a job has been completed. pthread_cond_signal(&threadqueue->cb_cond); - PTHREAD_UNLOCK(&threadqueue->lock); - } else { + PTHREAD_UNLOCK(&threadqueue->lock); } } - //We got out of the loop because threadqueue->stop == 1. The queue is locked. + // We got out of the loop because threadqueue->stop == 1. The queue is locked. assert(threadqueue->stop); --threadqueue->threads_running; @@ -230,6 +256,8 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) { return NULL; } + + int kvz_threadqueue_init(threadqueue_queue_t * const threadqueue, int thread_count, int fifo) { int i; if (pthread_mutex_init(&threadqueue->lock, NULL) != 0) {