Better thread scheduling

This commit is contained in:
Laurent Fasnacht 2014-06-03 10:26:15 +02:00
parent 0811dbcfbe
commit 9bdecbe071
2 changed files with 73 additions and 40 deletions

View file

@ -18,64 +18,84 @@ typedef struct {
#define THREADQUEUE_LIST_REALLOC_SIZE 32
//#define PTHREAD_COND_SIGNAL(c) fprintf(stderr, "%s:%d pthread_cond_signal(%s=%p)\n", __FUNCTION__, __LINE__, #c, c); if (pthread_cond_signal((c)) != 0) { fprintf(stderr, "pthread_cond_signal(%s=%p) failed!\n", #c, c); assert(0); return 0; }
//#define PTHREAD_COND_BROADCAST(c) fprintf(stderr, "%s:%d pthread_cond_broadcast(%s=%p)\n", __FUNCTION__, __LINE__, #c, c); if (pthread_cond_broadcast((c)) != 0) { fprintf(stderr, "pthread_cond_broadcast(%s=%p) failed!\n", #c, c); assert(0); return 0; }
//#define PTHREAD_COND_WAIT(c,l) fprintf(stderr, "%s:%d pthread_cond_wait(%s=%p, %s=%p)\n", __FUNCTION__, __LINE__, #c, c, #l, l); if (pthread_cond_wait((c),(l)) != 0) { fprintf(stderr, "pthread_cond_wait(%s=%p, %s=%p) failed!\n", #c, c, #l, l); assert(0); return 0; } else {fprintf(stderr, "%s:%d pthread_cond_wait(%s=%p, %s=%p) (done)\n", __FUNCTION__, __LINE__, #c, c, #l, l);}
//#define PTHREAD_LOCK(l) fprintf(stderr, "%s:%d pthread_mutex_lock(%s=%p) (try)\n", __FUNCTION__, __LINE__, #l, l); if (pthread_mutex_lock((l)) != 0) { fprintf(stderr, "pthread_mutex_lock(%s=%p) failed!\n", #l, l); assert(0); return 0; } else {fprintf(stderr, "%s:%d pthread_mutex_lock(%s=%p)\n", __FUNCTION__, __LINE__, #l, l);}
//#define PTHREAD_UNLOCK(l) if (pthread_mutex_unlock((l)) != 0) { fprintf(stderr, "pthread_mutex_unlock(%s=%p) failed!\n", #l, l); assert(0); return 0; } else {fprintf(stderr, "%s:%d pthread_mutex_unlock(%s=%p)\n", __FUNCTION__, __LINE__, #l, l);}
#define PTHREAD_COND_SIGNAL(c) if (pthread_cond_signal((c)) != 0) { fprintf(stderr, "pthread_cond_signal(%s=%p) failed!\n", #c, c); assert(0); return 0; }
#define PTHREAD_COND_BROADCAST(c) if (pthread_cond_broadcast((c)) != 0) { fprintf(stderr, "pthread_cond_broadcast(%s=%p) failed!\n", #c, c); assert(0); return 0; }
#define PTHREAD_COND_WAIT(c,l) if (pthread_cond_wait((c),(l)) != 0) { fprintf(stderr, "pthread_cond_wait(%s=%p, %s=%p) failed!\n", #c, c, #l, l); assert(0); return 0; }
#define PTHREAD_LOCK(l) if (pthread_mutex_lock((l)) != 0) { fprintf(stderr, "pthread_mutex_lock(%s) failed!\n", #l); assert(0); return 0; }
#define PTHREAD_UNLOCK(l) if (pthread_mutex_unlock((l)) != 0) { fprintf(stderr, "pthread_mutex_unlock(%s) failed!\n", #l); assert(0); return 0; }
static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
threadqueue_worker_spec * const threadqueue_worker_spec = threadqueue_worker_spec_opaque;
threadqueue_queue * const threadqueue = threadqueue_worker_spec->threadqueue;
threadqueue_job * next_job = NULL;
#ifdef _DEBUG
GET_TIME(&threadqueue->debug_clock_thread_start[threadqueue_worker_spec->worker_id]);
#endif //_DEBUG
for(;;) {
int task_id = -1, i = 0;
int i = 0;
int signal_count = 0;
threadqueue_job * job_to_run = NULL;
PTHREAD_LOCK(&threadqueue->lock);
while(!threadqueue->stop && threadqueue->queue_waiting == 0) {
if (pthread_cond_wait(&threadqueue->cond, &threadqueue->lock) != 0) {
while(!threadqueue->stop && threadqueue->queue_waiting_execution == 0 && !next_job) {
PTHREAD_COND_WAIT(&threadqueue->cond, &threadqueue->lock);
/*if (pthread_cond_wait(&threadqueue->cond, &threadqueue->lock) != 0) {
fprintf(stderr, "pthread_cond_wait failed!\n");
assert(0);
return 0;
}
}*/
}
if(threadqueue->stop) {
if (next_job) {
PTHREAD_LOCK(&next_job->lock);
next_job->state = THREADQUEUE_JOB_STATE_QUEUED;
PTHREAD_UNLOCK(&next_job->lock);
}
break;
}
//Find a task (should be fast enough)
task_id = -1;
job_to_run = NULL;
if (next_job) {
PTHREAD_LOCK(&next_job->lock);
assert(next_job->ndepends == 0);
job_to_run = next_job;
} else {
for (i = threadqueue->queue_start; i < threadqueue->queue_count; ++i) {
threadqueue_job * const job = threadqueue->queue[i];
PTHREAD_LOCK(&job->lock);
threadqueue_job * const i_job = threadqueue->queue[i];
PTHREAD_LOCK(&i_job->lock);
if (job->state == THREADQUEUE_JOB_STATE_QUEUED && job->ndepends == 0) {
task_id = i;
if (i_job->state == THREADQUEUE_JOB_STATE_QUEUED && i_job->ndepends == 0) {
job_to_run = i_job;
break; //Task remains locked
}
//Not this task, so unlock it
PTHREAD_UNLOCK(&job->lock);
PTHREAD_UNLOCK(&i_job->lock);
}
}
//Ok we got a job (and we have a lock on it)
if (task_id != -1) {
threadqueue_job * const job = threadqueue->queue[i];
if (job_to_run) {
threadqueue_job * const job = job_to_run;
assert(job->state == THREADQUEUE_JOB_STATE_QUEUED);
assert(job->state == THREADQUEUE_JOB_STATE_QUEUED || (job == next_job && job->state == THREADQUEUE_JOB_STATE_RUNNING));
job->state = THREADQUEUE_JOB_STATE_RUNNING;
//Move the queue_start "pointer" if needed
while (threadqueue->queue_start < threadqueue->queue_count && threadqueue->queue[threadqueue->queue_start]->state != THREADQUEUE_JOB_STATE_QUEUED) threadqueue->queue_start++;
--threadqueue->queue_waiting;
if (!next_job) --threadqueue->queue_waiting_execution;
//We can unlock the job here, since fptr and arg are constant
PTHREAD_UNLOCK(&job->lock);
@ -101,6 +121,7 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
job->state = THREADQUEUE_JOB_STATE_DONE;
signal_count = 0;
next_job = NULL;
//Decrease counter of dependencies
for (i = 0; i < job->rdepends_count; ++i) {
@ -113,7 +134,15 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
--depjob->ndepends;
if (depjob->ndepends == 0) {
if (!next_job) {
next_job = depjob;
depjob->state = THREADQUEUE_JOB_STATE_RUNNING;
} else {
++signal_count;
++threadqueue->queue_waiting_execution;
}
assert(threadqueue->queue_waiting_dependency > 0);
--threadqueue->queue_waiting_dependency;
}
PTHREAD_UNLOCK(&depjob->lock);
@ -123,6 +152,12 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
//Signal the queue that we've done a job
PTHREAD_LOCK(&threadqueue->lock);
for (i = 0; i < signal_count; ++i) {
PTHREAD_COND_SIGNAL(&threadqueue->cond);
}
//PTHREAD_COND_BROADCAST(&threadqueue->cond);
//Don't log this one
//PTHREAD_COND_SIGNAL(&threadqueue->cb_cond);
pthread_cond_signal(&threadqueue->cb_cond);
PTHREAD_UNLOCK(&threadqueue->lock);
} else {
@ -188,7 +223,8 @@ int threadqueue_init(threadqueue_queue * const threadqueue, int thread_count) {
threadqueue->queue_size = 0;
threadqueue->queue_count = 0;
threadqueue->queue_start = 0;
threadqueue->queue_waiting = 0;
threadqueue->queue_waiting_execution = 0;
threadqueue->queue_waiting_dependency = 0;
//Lock the queue before creating threads, to ensure they all have correct information
PTHREAD_LOCK(&threadqueue->lock);
@ -333,8 +369,8 @@ int threadqueue_flush(threadqueue_queue * const threadqueue) {
PTHREAD_LOCK(&threadqueue->lock);
do {
if (threadqueue->queue_waiting > 0) {
notdone = threadqueue->queue_waiting;
if (threadqueue->queue_waiting_execution + threadqueue->queue_waiting_dependency > 0) {
notdone = threadqueue->queue_waiting_execution + threadqueue->queue_waiting_dependency;
} else {
notdone = 0;
for (i = 0; i < threadqueue->queue_count; ++i) {
@ -364,7 +400,7 @@ int threadqueue_flush(threadqueue_queue * const threadqueue) {
threadqueue_free_jobs(threadqueue);
assert(threadqueue->queue_waiting == 0);
assert(threadqueue->queue_waiting_dependency == 0 && threadqueue->queue_waiting_execution == 0);
PTHREAD_UNLOCK(&threadqueue->lock);
@ -441,16 +477,12 @@ threadqueue_job * threadqueue_submit(threadqueue_queue * const threadqueue, void
}
threadqueue->queue[threadqueue->queue_count++] = job;
++threadqueue->queue_waiting;
if (job->ndepends == 0) {
++threadqueue->queue_waiting_execution;
//Hope a thread can do it...
fprintf(stderr, "Sent signal from submit\n");
if(pthread_cond_signal(&(threadqueue->cond)) != 0) {
fprintf(stderr, "pthread_cond_signal failed!\n");
assert(0);
return NULL;
}
PTHREAD_COND_SIGNAL(&(threadqueue->cond));
} else {
++threadqueue->queue_waiting_dependency;
}
PTHREAD_UNLOCK(&threadqueue->lock);
@ -499,12 +531,12 @@ int threadqueue_job_unwait_job(threadqueue_queue * const threadqueue, threadqueu
if (ndepends == 0) {
PTHREAD_LOCK(&threadqueue->lock);
assert(threadqueue->queue_waiting_dependency > 0);
--threadqueue->queue_waiting_dependency;
++threadqueue->queue_waiting_execution;
//Hope a thread can do it...
if(pthread_cond_signal(&(threadqueue->cond)) != 0) {
fprintf(stderr, "pthread_cond_signal failed!\n");
assert(0);
return 0;
}
PTHREAD_COND_SIGNAL(&(threadqueue->cond));
PTHREAD_UNLOCK(&threadqueue->lock);
}

View file

@ -75,7 +75,8 @@ typedef struct {
unsigned int queue_start;
unsigned int queue_count;
unsigned int queue_size;
unsigned int queue_waiting;
unsigned int queue_waiting_execution; //Number of jobs without any dependency which could be run
unsigned int queue_waiting_dependency; //Number of jobs waiting for a dependency to complete
#ifdef _DEBUG
//Format: pointer <tab> worker id <tab> time enqueued <tab> time started <tab> time stopped <tab> time dequeued <tab> job description