diff --git a/src/threadqueue.c b/src/threadqueue.c index b336868b..7949b89a 100644 --- a/src/threadqueue.c +++ b/src/threadqueue.c @@ -95,8 +95,6 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) if(threadqueue->stop) { if (next_job) { // Put a job we had already reserved back into the queue. - // FIXME: This lock should be unnecessary, as nobody else is allowed - // to touch this job when it's running. PTHREAD_LOCK(&next_job->lock); next_job->state = THREADQUEUE_JOB_STATE_QUEUED; PTHREAD_UNLOCK(&next_job->lock); @@ -167,8 +165,8 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) KVZ_GET_TIME(&job->debug_clock_stop); #endif //KVZ_DEBUG - // FIXME: This lock should be unnecessary, as nobody else is allowed - // to touch this job when it's running. + // This lock is necessary because the main thread may try to add + // reverse dependencies to the job while running. PTHREAD_LOCK(&job->lock); assert(job->state == THREADQUEUE_JOB_STATE_RUNNING); @@ -182,7 +180,8 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) // Go throught all the jobs that depend on this one, decresing their ndepends. for (int i = 0; i < job->rdepends_count; ++i) { threadqueue_job_t * const depjob = job->rdepends[i]; - // Note that we lock the dependency AFTER locking the source. This avoids a deadlock in dep_add. + // The dependency (job) is locked before the job depending on it. + // This must be the same order as in kvz_threadqueue_job_dep_add. PTHREAD_LOCK(&depjob->lock); assert(depjob->state == THREADQUEUE_JOB_STATE_QUEUED); @@ -603,35 +602,40 @@ threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * const threadque return job; } -int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *depends_on) { +int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *dependency) { //If we are not using threads, job are NULL pointers, so we can skip that - if (!job && !depends_on) return 1; - - assert(job && depends_on); - - //Lock first the job, and then the dependency + if (!job && !dependency) return 1; + + assert(job && dependency); + + // Lock first the dependency and then the job depending on it. + // This must be the same order as in threadqueue_worker. + PTHREAD_LOCK(&dependency->lock); PTHREAD_LOCK(&job->lock); - PTHREAD_LOCK(&depends_on->lock); - - if (depends_on->state != THREADQUEUE_JOB_STATE_DONE) { + + if (dependency->state != THREADQUEUE_JOB_STATE_DONE) { job->ndepends++; } - + + PTHREAD_UNLOCK(&job->lock); + //Add the reverse dependency (FIXME: this may be moved in the if above... but we would lose ability to track) - if (depends_on->rdepends_count >= depends_on->rdepends_size) { - depends_on->rdepends = realloc(depends_on->rdepends, sizeof(threadqueue_job_t *) * (depends_on->rdepends_size + THREADQUEUE_LIST_REALLOC_SIZE)); - if (!depends_on->rdepends) { + if (dependency->rdepends_count >= dependency->rdepends_size) { + size_t new_size = + sizeof(threadqueue_job_t*) * + (dependency->rdepends_size + THREADQUEUE_LIST_REALLOC_SIZE); + dependency->rdepends = realloc(dependency->rdepends, new_size); + if (!dependency->rdepends) { fprintf(stderr, "Could not realloc rdepends!\n"); assert(0); return 0; } - depends_on->rdepends_size += THREADQUEUE_LIST_REALLOC_SIZE; + dependency->rdepends_size += THREADQUEUE_LIST_REALLOC_SIZE; } - depends_on->rdepends[depends_on->rdepends_count++] = job; - - PTHREAD_UNLOCK(&depends_on->lock); - PTHREAD_UNLOCK(&job->lock); - + dependency->rdepends[dependency->rdepends_count++] = job; + + PTHREAD_UNLOCK(&dependency->lock); + return 1; }