Rename struct threadqueue_job to threadqueue_job_t.

This commit is contained in:
Ari Koivula 2015-03-04 16:28:56 +02:00
parent cf5f240604
commit b7fcb800b2
5 changed files with 30 additions and 29 deletions

View file

@ -86,7 +86,8 @@ static int encoder_state_config_tile_init(encoder_state_t * const encoder_state,
}
if (encoder->wpp) {
encoder_state->tile->wf_jobs = MALLOC(threadqueue_job*, encoder_state->tile->frame->width_in_lcu * encoder_state->tile->frame->height_in_lcu);
int num_jobs = encoder_state->tile->frame->width_in_lcu * encoder_state->tile->frame->height_in_lcu;
encoder_state->tile->wf_jobs = MALLOC(threadqueue_job_t*, num_jobs);
if (!encoder_state->tile->wf_jobs) {
printf("Error allocating wf_jobs array!\n");
return 0;

View file

@ -426,7 +426,7 @@ static void encoder_state_worker_encode_children(void * opaque) {
encoder_state_write_bitstream_leaf(sub_state);
PERFORMANCE_MEASURE_END(_DEBUG_PERF_WRITE_BITSTREAM_LEAF, sub_state->encoder_control->threadqueue, "type=encoder_state_write_bitstream_leaf,frame=%d,tile=%d,slice=%d,px_x=%d-%d,px_y=%d-%d", sub_state->global->frame, sub_state->tile->id, sub_state->slice->id, sub_state->lcu_order[0].position_px.x + sub_state->tile->lcu_offset_x * LCU_WIDTH, sub_state->lcu_order[sub_state->lcu_order_count-1].position_px.x + sub_state->lcu_order[sub_state->lcu_order_count-1].size.x + sub_state->tile->lcu_offset_x * LCU_WIDTH - 1, sub_state->lcu_order[0].position_px.y + sub_state->tile->lcu_offset_y * LCU_WIDTH, sub_state->lcu_order[sub_state->lcu_order_count-1].position_px.y + sub_state->lcu_order[sub_state->lcu_order_count-1].size.y + sub_state->tile->lcu_offset_y * LCU_WIDTH - 1);
} else {
threadqueue_job *job;
threadqueue_job_t *job;
#ifdef _DEBUG
char job_description[256];
sprintf(job_description, "type=encoder_state_write_bitstream_leaf,frame=%d,tile=%d,slice=%d,px_x=%d-%d,px_y=%d-%d", sub_state->global->frame, sub_state->tile->id, sub_state->slice->id, sub_state->lcu_order[0].position_px.x + sub_state->tile->lcu_offset_x * LCU_WIDTH, sub_state->lcu_order[sub_state->lcu_order_count-1].position_px.x + sub_state->lcu_order[sub_state->lcu_order_count-1].size.x + sub_state->tile->lcu_offset_x * LCU_WIDTH - 1, sub_state->lcu_order[0].position_px.y + sub_state->tile->lcu_offset_y * LCU_WIDTH, sub_state->lcu_order[sub_state->lcu_order_count-1].position_px.y + sub_state->lcu_order[sub_state->lcu_order_count-1].size.y + sub_state->tile->lcu_offset_y * LCU_WIDTH - 1);
@ -579,11 +579,11 @@ static void encoder_state_encode(encoder_state_t * const main_state) {
if (main_state->encoder_control->sao_enable && main_state->children[0].type == ENCODER_STATE_TYPE_WAVEFRONT_ROW) {
int y;
videoframe_t * const frame = main_state->tile->frame;
threadqueue_job *previous_job = NULL;
threadqueue_job_t *previous_job = NULL;
for (y = 0; y < frame->height_in_lcu; ++y) {
worker_sao_reconstruct_lcu_data *data = MALLOC(worker_sao_reconstruct_lcu_data, 1);
threadqueue_job *job;
threadqueue_job_t *job;
#ifdef _DEBUG
char job_description[256];
sprintf(job_description, "type=sao,frame=%d,tile=%d,px_x=%d-%d,px_y=%d-%d", main_state->global->frame, main_state->tile->id, main_state->tile->lcu_offset_x * LCU_WIDTH, main_state->tile->lcu_offset_x * LCU_WIDTH + main_state->tile->frame->width - 1, (main_state->tile->lcu_offset_y + y) * LCU_WIDTH, MIN(main_state->tile->lcu_offset_y * LCU_WIDTH + main_state->tile->frame->height, (main_state->tile->lcu_offset_y + y + 1) * LCU_WIDTH)-1);
@ -698,7 +698,7 @@ static void encoder_state_new_frame(encoder_state_t * const main_state) {
}
static void _encode_one_frame_add_bitstream_deps(const encoder_state_t * const encoder_state, threadqueue_job * const job) {
static void _encode_one_frame_add_bitstream_deps(const encoder_state_t * const encoder_state, threadqueue_job_t * const job) {
int i;
for (i = 0; encoder_state->children[i].encoder_control; ++i) {
_encode_one_frame_add_bitstream_deps(&encoder_state->children[i], job);
@ -726,7 +726,7 @@ void encode_one_frame(encoder_state_t * const main_state)
}
//threadqueue_flush(main_state->encoder_control->threadqueue);
{
threadqueue_job *job;
threadqueue_job_t *job;
#ifdef _DEBUG
char job_description[256];
sprintf(job_description, "type=write_bitstream,frame=%d", main_state->global->frame);

View file

@ -101,7 +101,7 @@ typedef struct {
yuv_t *ver_buf_before_sao;
//Job pointers for wavefronts
threadqueue_job **wf_jobs;
threadqueue_job_t **wf_jobs;
} encoder_state_config_tile_t;
typedef struct {
@ -168,8 +168,8 @@ typedef struct encoder_state_t {
uint32_t stats_bitstream_length; //Bitstream length written in bytes
//Jobs to wait for
threadqueue_job * tqj_recon_done; //Reconstruction is done
threadqueue_job * tqj_bitstream_written; //Bitstream is written
threadqueue_job_t * tqj_recon_done; //Reconstruction is done
threadqueue_job_t * tqj_bitstream_written; //Bitstream is written
} encoder_state_t;

View file

@ -79,7 +79,7 @@ const struct timespec time_to_wait = {1, 0};
static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
threadqueue_worker_spec * const threadqueue_worker_spec = threadqueue_worker_spec_opaque;
threadqueue_queue * const threadqueue = threadqueue_worker_spec->threadqueue;
threadqueue_job * next_job = NULL;
threadqueue_job_t * next_job = NULL;
#ifdef _DEBUG
GET_TIME(&threadqueue->debug_clock_thread_start[threadqueue_worker_spec->worker_id]);
@ -87,7 +87,7 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
for(;;) {
int i = 0;
threadqueue_job * job = NULL;
threadqueue_job_t * job = NULL;
PTHREAD_LOCK(&threadqueue->lock);
@ -118,7 +118,7 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
for (i = (threadqueue->fifo ? threadqueue->queue_start : threadqueue->queue_count - 1);
(threadqueue->fifo ? i < threadqueue->queue_count : i >= threadqueue->queue_start);
(threadqueue->fifo ? ++i : --i)) {
threadqueue_job * const i_job = threadqueue->queue[i];
threadqueue_job_t * const i_job = threadqueue->queue[i];
if (i_job->state == THREADQUEUE_JOB_STATE_QUEUED && i_job->ndepends == 0) {
PTHREAD_LOCK(&i_job->lock);
@ -175,7 +175,7 @@ static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
queue_waiting_execution_incr = 0;
//Decrease counter of dependencies
for (i = 0; i < job->rdepends_count; ++i) {
threadqueue_job * const depjob = job->rdepends[i];
threadqueue_job_t * const depjob = job->rdepends[i];
//Note that we lock the dependency AFTER locking the source. This avoids a deadlock in dep_add
PTHREAD_LOCK(&depjob->lock);
@ -461,7 +461,7 @@ int threadqueue_flush(threadqueue_queue * const threadqueue) {
return 1;
}
int threadqueue_waitfor(threadqueue_queue * const threadqueue, threadqueue_job * const job) {
int threadqueue_waitfor(threadqueue_queue * const threadqueue, threadqueue_job_t * const job) {
int job_done = 0;
//NULL job is clearly OK :-)
@ -509,8 +509,8 @@ int threadqueue_waitfor(threadqueue_queue * const threadqueue, threadqueue_job *
return 1;
}
threadqueue_job * threadqueue_submit(threadqueue_queue * const threadqueue, void (*fptr)(void *arg), void *arg, int wait, const char* const debug_description) {
threadqueue_job *job;
threadqueue_job_t * threadqueue_submit(threadqueue_queue * const threadqueue, void (*fptr)(void *arg), void *arg, int wait, const char* const debug_description) {
threadqueue_job_t *job;
//No lock here... this should be constant
if (threadqueue->threads_count == 0) {
//FIXME: This should be improved in order to handle dependencies
@ -522,7 +522,7 @@ threadqueue_job * threadqueue_submit(threadqueue_queue * const threadqueue, void
assert(wait == 0 || wait == 1);
job = MALLOC(threadqueue_job, 1);
job = MALLOC(threadqueue_job_t, 1);
#ifdef _DEBUG
if (debug_description) {
@ -569,7 +569,7 @@ threadqueue_job * threadqueue_submit(threadqueue_queue * const threadqueue, void
//Add the reverse dependency
if (threadqueue->queue_count >= threadqueue->queue_size) {
threadqueue->queue = realloc(threadqueue->queue, sizeof(threadqueue_job *) * (threadqueue->queue_size + THREADQUEUE_LIST_REALLOC_SIZE));
threadqueue->queue = realloc(threadqueue->queue, sizeof(threadqueue_job_t *) * (threadqueue->queue_size + THREADQUEUE_LIST_REALLOC_SIZE));
if (!threadqueue->queue) {
fprintf(stderr, "Could not realloc queue!\n");
assert(0);
@ -592,7 +592,7 @@ threadqueue_job * threadqueue_submit(threadqueue_queue * const threadqueue, void
return job;
}
int threadqueue_job_dep_add(threadqueue_job *job, threadqueue_job *depends_on) {
int threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *depends_on) {
//If we are not using threads, job are NULL pointers, so we can skip that
if (!job && !depends_on) return 1;
@ -608,7 +608,7 @@ int threadqueue_job_dep_add(threadqueue_job *job, threadqueue_job *depends_on) {
//Add the reverse dependency (FIXME: this may be moved in the if above... but we would lose ability to track)
if (depends_on->rdepends_count >= depends_on->rdepends_size) {
depends_on->rdepends = realloc(depends_on->rdepends, sizeof(threadqueue_job *) * (depends_on->rdepends_size + THREADQUEUE_LIST_REALLOC_SIZE));
depends_on->rdepends = realloc(depends_on->rdepends, sizeof(threadqueue_job_t *) * (depends_on->rdepends_size + THREADQUEUE_LIST_REALLOC_SIZE));
if (!depends_on->rdepends) {
fprintf(stderr, "Could not realloc rdepends!\n");
assert(0);
@ -624,7 +624,7 @@ int threadqueue_job_dep_add(threadqueue_job *job, threadqueue_job *depends_on) {
return 1;
}
int threadqueue_job_unwait_job(threadqueue_queue * const threadqueue, threadqueue_job *job) {
int threadqueue_job_unwait_job(threadqueue_queue * const threadqueue, threadqueue_job_t *job) {
int ndepends = 0;
//NULL job => no threads, nothing to do

View file

@ -31,14 +31,14 @@ typedef enum {
THREADQUEUE_JOB_STATE_DONE = 2
} threadqueue_job_state;
typedef struct threadqueue_job {
typedef struct threadqueue_job_t {
pthread_mutex_t lock;
threadqueue_job_state state;
unsigned int ndepends; //Number of active dependencies that this job wait for
struct threadqueue_job **rdepends; //array of pointer to jobs that depend on this one. They have to exist when the thread finishes, because they cannot be run before.
struct threadqueue_job_t **rdepends; //array of pointer to jobs that depend on this one. They have to exist when the thread finishes, because they cannot be run before.
unsigned int rdepends_count; //number of rdepends
unsigned int rdepends_size; //allocated size of rdepends
@ -56,7 +56,7 @@ typedef struct threadqueue_job {
CLOCK_T debug_clock_stop;
CLOCK_T debug_clock_dequeue;
#endif
} threadqueue_job;
} threadqueue_job_t;
@ -74,7 +74,7 @@ typedef struct {
int fifo;
threadqueue_job **queue;
threadqueue_job_t **queue;
unsigned int queue_start;
unsigned int queue_count;
unsigned int queue_size;
@ -100,18 +100,18 @@ typedef struct {
int threadqueue_init(threadqueue_queue * threadqueue, int thread_count, int fifo);
//Add a job to the queue, and returs a threadqueue_job handle. If wait == 1, one has to run threadqueue_job_unwait_job in order to have it run
threadqueue_job * threadqueue_submit(threadqueue_queue * threadqueue, void (*fptr)(void *arg), void *arg, int wait, const char* debug_description);
threadqueue_job_t * threadqueue_submit(threadqueue_queue * threadqueue, void (*fptr)(void *arg), void *arg, int wait, const char* debug_description);
int threadqueue_job_unwait_job(threadqueue_queue * threadqueue, threadqueue_job *job);
int threadqueue_job_unwait_job(threadqueue_queue * threadqueue, threadqueue_job_t *job);
//Add a dependency between two jobs.
int threadqueue_job_dep_add(threadqueue_job *job, threadqueue_job *depends_on);
int threadqueue_job_dep_add(threadqueue_job_t *job, threadqueue_job_t *depends_on);
//Blocking call until the queue is empty. Previously set threadqueue_job handles should not be used anymore
int threadqueue_flush(threadqueue_queue * threadqueue);
//Blocking call until job is executed. Job handles submitted before job should not be used any more as they are removed from the queue.
int threadqueue_waitfor(threadqueue_queue * threadqueue, threadqueue_job * job);
int threadqueue_waitfor(threadqueue_queue * threadqueue, threadqueue_job_t * job);
//Free ressources in a threadqueue
int threadqueue_finalize(threadqueue_queue * threadqueue);