Eliminate a race condition in threadqueue

Fixes the order of acquiring locks for the job and its dependency in
kvz_threadqueue_job_dep_add. The dependency is locked before the job
that depends on it. This is the same order as in threadqueue_worker.

Acquiring the locks in different order in kvz_threadqueue_job_dep_add
and threadqueue_worker would sometimes result in a deadlock.
This commit is contained in:
Arttu Ylä-Outinen 2017-05-18 12:25:53 +03:00
parent 4b213477f0
commit 5f8e17d4ba

View file

@ -95,8 +95,6 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque)
if(threadqueue->stop) { if(threadqueue->stop) {
if (next_job) { if (next_job) {
// Put a job we had already reserved back into the queue. // Put a job we had already reserved back into the queue.
// FIXME: This lock should be unnecessary, as nobody else is allowed
// to touch this job when it's running.
PTHREAD_LOCK(&next_job->lock); PTHREAD_LOCK(&next_job->lock);
next_job->state = THREADQUEUE_JOB_STATE_QUEUED; next_job->state = THREADQUEUE_JOB_STATE_QUEUED;
PTHREAD_UNLOCK(&next_job->lock); PTHREAD_UNLOCK(&next_job->lock);
@ -167,8 +165,8 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque)
KVZ_GET_TIME(&job->debug_clock_stop); KVZ_GET_TIME(&job->debug_clock_stop);
#endif //KVZ_DEBUG #endif //KVZ_DEBUG
// FIXME: This lock should be unnecessary, as nobody else is allowed // This lock is necessary because the main thread may try to add
// to touch this job when it's running. // reverse dependencies to the job while running.
PTHREAD_LOCK(&job->lock); PTHREAD_LOCK(&job->lock);
assert(job->state == THREADQUEUE_JOB_STATE_RUNNING); assert(job->state == THREADQUEUE_JOB_STATE_RUNNING);
@ -182,7 +180,8 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque)
// Go throught all the jobs that depend on this one, decresing their ndepends. // Go throught all the jobs that depend on this one, decresing their ndepends.
for (int i = 0; i < job->rdepends_count; ++i) { for (int i = 0; i < job->rdepends_count; ++i) {
threadqueue_job_t * const depjob = job->rdepends[i]; threadqueue_job_t * const depjob = job->rdepends[i];
// Note that we lock the dependency AFTER locking the source. This avoids a deadlock in dep_add. // The dependency (job) is locked before the job depending on it.
// This must be the same order as in kvz_threadqueue_job_dep_add.
PTHREAD_LOCK(&depjob->lock); PTHREAD_LOCK(&depjob->lock);
assert(depjob->state == THREADQUEUE_JOB_STATE_QUEUED); assert(depjob->state == THREADQUEUE_JOB_STATE_QUEUED);
@ -603,34 +602,39 @@ threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * const threadque
return job; return job;
} }
int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *depends_on) { int kvz_threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *dependency) {
//If we are not using threads, job are NULL pointers, so we can skip that //If we are not using threads, job are NULL pointers, so we can skip that
if (!job && !depends_on) return 1; if (!job && !dependency) return 1;
assert(job && depends_on); assert(job && dependency);
//Lock first the job, and then the dependency // Lock first the dependency and then the job depending on it.
// This must be the same order as in threadqueue_worker.
PTHREAD_LOCK(&dependency->lock);
PTHREAD_LOCK(&job->lock); PTHREAD_LOCK(&job->lock);
PTHREAD_LOCK(&depends_on->lock);
if (depends_on->state != THREADQUEUE_JOB_STATE_DONE) { if (dependency->state != THREADQUEUE_JOB_STATE_DONE) {
job->ndepends++; job->ndepends++;
} }
PTHREAD_UNLOCK(&job->lock);
//Add the reverse dependency (FIXME: this may be moved in the if above... but we would lose ability to track) //Add the reverse dependency (FIXME: this may be moved in the if above... but we would lose ability to track)
if (depends_on->rdepends_count >= depends_on->rdepends_size) { if (dependency->rdepends_count >= dependency->rdepends_size) {
depends_on->rdepends = realloc(depends_on->rdepends, sizeof(threadqueue_job_t *) * (depends_on->rdepends_size + THREADQUEUE_LIST_REALLOC_SIZE)); size_t new_size =
if (!depends_on->rdepends) { sizeof(threadqueue_job_t*) *
(dependency->rdepends_size + THREADQUEUE_LIST_REALLOC_SIZE);
dependency->rdepends = realloc(dependency->rdepends, new_size);
if (!dependency->rdepends) {
fprintf(stderr, "Could not realloc rdepends!\n"); fprintf(stderr, "Could not realloc rdepends!\n");
assert(0); assert(0);
return 0; return 0;
} }
depends_on->rdepends_size += THREADQUEUE_LIST_REALLOC_SIZE; dependency->rdepends_size += THREADQUEUE_LIST_REALLOC_SIZE;
} }
depends_on->rdepends[depends_on->rdepends_count++] = job; dependency->rdepends[dependency->rdepends_count++] = job;
PTHREAD_UNLOCK(&depends_on->lock); PTHREAD_UNLOCK(&dependency->lock);
PTHREAD_UNLOCK(&job->lock);
return 1; return 1;
} }