mirror of
https://github.com/ultravideo/uvg266.git
synced 2024-11-30 12:44:07 +00:00
Instrument threads in order to be able to do some visualization
This commit is contained in:
parent
a7cd31d87b
commit
4f73a7fc91
|
@ -1287,7 +1287,13 @@ static void encoder_state_encode_leaf(encoder_state * const encoder_state) {
|
|||
} else {
|
||||
for (i = 0; i < encoder_state->lcu_order_count; ++i) {
|
||||
const lcu_order_element * const lcu = &encoder_state->lcu_order[i];
|
||||
encoder_state->tile->wf_jobs[lcu->id] = threadqueue_submit(encoder_state->encoder_control->threadqueue, worker_encoder_state_search_lcu, (void*)lcu, 1);
|
||||
#ifdef _DEBUG
|
||||
char job_description[256];
|
||||
sprintf(job_description, "frame=%d,tile=%d,slice=%d,row=%d,position_x=%d,position_y=%d", encoder_state->global->frame, encoder_state->tile->id, encoder_state->slice->id, encoder_state->wfrow->lcu_offset_y, lcu->position.x + encoder_state->tile->lcu_offset_x, lcu->position.y + encoder_state->tile->lcu_offset_y);
|
||||
#else
|
||||
char* job_description = NULL;
|
||||
#endif
|
||||
encoder_state->tile->wf_jobs[lcu->id] = threadqueue_submit(encoder_state->encoder_control->threadqueue, worker_encoder_state_search_lcu, (void*)lcu, 1, job_description);
|
||||
if (encoder_state->tile->wf_jobs[lcu->id]) {
|
||||
if (lcu->position.x > 0) {
|
||||
threadqueue_job_dep_add(encoder_state->tile->wf_jobs[lcu->id], encoder_state->tile->wf_jobs[lcu->id - 1]);
|
||||
|
@ -1349,7 +1355,23 @@ static void encoder_state_encode(encoder_state * const main_state) {
|
|||
for (i=0; main_state->children[i].encoder_control; ++i) {
|
||||
//If we don't have wavefronts, parallelize encoding of children.
|
||||
if (main_state->children[i].type != ENCODER_STATE_TYPE_WAVEFRONT_ROW) {
|
||||
threadqueue_submit(main_state->encoder_control->threadqueue, worker_encoder_state_encode_children, &(main_state->children[i]), 0);
|
||||
#ifdef _DEBUG
|
||||
char job_description[256];
|
||||
switch (main_state->children[i].type) {
|
||||
case ENCODER_STATE_TYPE_TILE:
|
||||
sprintf(job_description, "frame=%d,tile=%d,position_x=%d,position_y=%d", main_state->children[i].global->frame, main_state->children[i].tile->id, main_state->children[i].tile->lcu_offset_x, main_state->children[i].tile->lcu_offset_y);
|
||||
break;
|
||||
case ENCODER_STATE_TYPE_SLICE:
|
||||
sprintf(job_description, "frame=%d,slice=%d,start_in_ts=%d", main_state->children[i].global->frame, main_state->children[i].slice->id, main_state->children[i].slice->start_in_ts);
|
||||
break;
|
||||
default:
|
||||
sprintf(job_description, "frame=%d,invalid", main_state->children[i].global->frame);
|
||||
break;
|
||||
}
|
||||
#else
|
||||
char* job_description = NULL;
|
||||
#endif
|
||||
threadqueue_submit(main_state->encoder_control->threadqueue, worker_encoder_state_encode_children, &(main_state->children[i]), 0, job_description);
|
||||
} else {
|
||||
//Wavefront rows have parallelism at LCU level, so we should not launch multiple threads here!
|
||||
//FIXME: add an assert: we can only have wavefront children
|
||||
|
|
|
@ -2,11 +2,23 @@
|
|||
#include <assert.h>
|
||||
#include <pthread.h>
|
||||
#include <stdlib.h>
|
||||
#ifdef _DEBUG
|
||||
#include <string.h>
|
||||
|
||||
#define TIMESPEC_AS_DOUBLE(ts) ((double)((ts).tv_sec) + (double)((ts).tv_nsec) / (double)1000000000L)
|
||||
#define TIMESPEC_DIFF(start, stop) ((double)((stop).tv_sec - (start).tv_sec) + (double)((stop).tv_nsec - (start).tv_nsec) / (double)1000000000L)
|
||||
|
||||
#endif //_DEBUG
|
||||
|
||||
#include "global.h"
|
||||
#include "threadqueue.h"
|
||||
#include "threads.h"
|
||||
|
||||
typedef struct {
|
||||
threadqueue_queue * threadqueue;
|
||||
int worker_id;
|
||||
} threadqueue_worker_spec;
|
||||
|
||||
#define THREADQUEUE_LIST_REALLOC_SIZE 32
|
||||
|
||||
//#define PTHREAD_LOCK(l) fprintf(stderr, "%s:%d pthread_mutex_lock(%s=%p) (try)\n", __FUNCTION__, __LINE__, #l, l); if (pthread_mutex_lock((l)) != 0) { fprintf(stderr, "pthread_mutex_lock(%s=%p) failed!\n", #l, l); assert(0); return 0; } else {fprintf(stderr, "%s:%d pthread_mutex_lock(%s=%p)\n", __FUNCTION__, __LINE__, #l, l);}
|
||||
|
@ -15,8 +27,13 @@
|
|||
#define PTHREAD_LOCK(l) if (pthread_mutex_lock((l)) != 0) { fprintf(stderr, "pthread_mutex_lock(%s) failed!\n", #l); assert(0); return 0; }
|
||||
#define PTHREAD_UNLOCK(l) if (pthread_mutex_unlock((l)) != 0) { fprintf(stderr, "pthread_mutex_unlock(%s) failed!\n", #l); assert(0); return 0; }
|
||||
|
||||
static void* threadqueue_worker(void* threadqueue_opaque) {
|
||||
threadqueue_queue * const threadqueue = threadqueue_opaque;
|
||||
static void* threadqueue_worker(void* threadqueue_worker_spec_opaque) {
|
||||
threadqueue_worker_spec * const threadqueue_worker_spec = threadqueue_worker_spec_opaque;
|
||||
threadqueue_queue * const threadqueue = threadqueue_worker_spec->threadqueue;
|
||||
|
||||
#ifdef _DEBUG
|
||||
clock_gettime(CLOCK_MONOTONIC, &threadqueue->debug_clock_thread_start[threadqueue_worker_spec->worker_id]);
|
||||
#endif //_DEBUG
|
||||
|
||||
for(;;) {
|
||||
int task_id = -1, i = 0;
|
||||
|
@ -64,8 +81,18 @@ static void* threadqueue_worker(void* threadqueue_opaque) {
|
|||
//Unlock the queue
|
||||
PTHREAD_UNLOCK(&threadqueue->lock);
|
||||
|
||||
#ifdef _DEBUG
|
||||
job->debug_worker_id = threadqueue_worker_spec->worker_id;
|
||||
clock_gettime(CLOCK_MONOTONIC, &job->debug_clock_start);
|
||||
#endif //_DEBUG
|
||||
|
||||
job->fptr(job->arg);
|
||||
|
||||
#ifdef _DEBUG
|
||||
job->debug_worker_id = threadqueue_worker_spec->worker_id;
|
||||
clock_gettime(CLOCK_MONOTONIC, &job->debug_clock_stop);
|
||||
#endif //_DEBUG
|
||||
|
||||
//Re-lock the job to update its status and treat its dependencies
|
||||
PTHREAD_LOCK(&job->lock);
|
||||
assert(job->state == THREADQUEUE_JOB_STATE_RUNNING);
|
||||
|
@ -100,6 +127,12 @@ static void* threadqueue_worker(void* threadqueue_opaque) {
|
|||
assert(threadqueue->stop);
|
||||
--threadqueue->threads_running;
|
||||
|
||||
#ifdef _DEBUG
|
||||
clock_gettime(CLOCK_MONOTONIC, &threadqueue->debug_clock_thread_end[threadqueue_worker_spec->worker_id]);
|
||||
|
||||
fprintf(threadqueue->debug_log, "\t%d\t-\t%lf\t+%lf\t-\tthread\n", threadqueue_worker_spec->worker_id, TIMESPEC_AS_DOUBLE(threadqueue->debug_clock_thread_start[threadqueue_worker_spec->worker_id]), TIMESPEC_DIFF(threadqueue->debug_clock_thread_start[threadqueue_worker_spec->worker_id], threadqueue->debug_clock_thread_end[threadqueue_worker_spec->worker_id]));
|
||||
#endif //_DEBUG
|
||||
|
||||
PTHREAD_UNLOCK(&threadqueue->lock);
|
||||
|
||||
pthread_exit(NULL);
|
||||
|
@ -134,6 +167,13 @@ int threadqueue_init(threadqueue_queue * const threadqueue, int thread_count) {
|
|||
fprintf(stderr, "Could not malloc threadqueue->threads!\n");
|
||||
return 0;
|
||||
}
|
||||
#ifdef _DEBUG
|
||||
threadqueue->debug_clock_thread_start = MALLOC(struct timespec, thread_count);
|
||||
assert(threadqueue->debug_clock_thread_start);
|
||||
threadqueue->debug_clock_thread_end = MALLOC(struct timespec, thread_count);
|
||||
assert(threadqueue->debug_clock_thread_end);
|
||||
threadqueue->debug_log = fopen("threadqueue.log", "w");
|
||||
#endif //_DEBUG
|
||||
|
||||
threadqueue->queue = NULL;
|
||||
threadqueue->queue_size = 0;
|
||||
|
@ -144,12 +184,20 @@ int threadqueue_init(threadqueue_queue * const threadqueue, int thread_count) {
|
|||
PTHREAD_LOCK(&threadqueue->lock);
|
||||
|
||||
for(i = 0; i < thread_count; i++) {
|
||||
if(pthread_create(&(threadqueue->threads[i]), NULL, threadqueue_worker, (void*)threadqueue) != 0) {
|
||||
threadqueue_worker_spec *tqws = MALLOC(threadqueue_worker_spec, 1);
|
||||
if (tqws) {
|
||||
tqws->threadqueue = threadqueue;
|
||||
tqws->worker_id = i;
|
||||
if(pthread_create(&(threadqueue->threads[i]), NULL, threadqueue_worker, (void*)tqws) != 0) {
|
||||
fprintf(stderr, "pthread_create failed!\n");
|
||||
assert(0);
|
||||
return 0;
|
||||
}
|
||||
threadqueue->threads_running++;
|
||||
} else {
|
||||
fprintf(stderr, "Could not allocate threadqueue_worker_spec structure!\n");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
PTHREAD_UNLOCK(&threadqueue->lock);
|
||||
|
@ -157,6 +205,33 @@ int threadqueue_init(threadqueue_queue * const threadqueue, int thread_count) {
|
|||
return 1;
|
||||
}
|
||||
|
||||
static void threadqueue_free_jobs(threadqueue_queue * const threadqueue) {
|
||||
int i;
|
||||
for (i=0; i < threadqueue->queue_count; ++i) {
|
||||
#ifdef _DEBUG
|
||||
int j;
|
||||
clock_gettime(CLOCK_MONOTONIC, &threadqueue->queue[i]->debug_clock_dequeue);
|
||||
fprintf(threadqueue->debug_log, "%p\t%d\t%lf\t+%lf\t+%lf\t+%lf\t%s\n", threadqueue->queue[i], threadqueue->queue[i]->debug_worker_id, TIMESPEC_AS_DOUBLE(threadqueue->queue[i]->debug_clock_enqueue), TIMESPEC_DIFF(threadqueue->queue[i]->debug_clock_enqueue, threadqueue->queue[i]->debug_clock_start), TIMESPEC_DIFF(threadqueue->queue[i]->debug_clock_start, threadqueue->queue[i]->debug_clock_stop), TIMESPEC_DIFF(threadqueue->queue[i]->debug_clock_stop, threadqueue->queue[i]->debug_clock_dequeue), threadqueue->queue[i]->debug_description);
|
||||
|
||||
for (j = 0; j < threadqueue->queue[i]->rdepends_count; ++j) {
|
||||
fprintf(threadqueue->debug_log, "%p->%p\n", threadqueue->queue[i], threadqueue->queue[i]->rdepends[j]);
|
||||
}
|
||||
|
||||
FREE_POINTER(threadqueue->queue[i]->debug_description);
|
||||
#endif
|
||||
FREE_POINTER(threadqueue->queue[i]);
|
||||
}
|
||||
threadqueue->queue_count = 0;
|
||||
#ifdef _DEBUG
|
||||
{
|
||||
struct timespec time;
|
||||
clock_gettime(CLOCK_MONOTONIC, &time);
|
||||
|
||||
fprintf(threadqueue->debug_log, "\t\t-\t-\t%lf\t-\tFLUSH\n", TIMESPEC_AS_DOUBLE(time));
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
int threadqueue_finalize(threadqueue_queue * const threadqueue) {
|
||||
int i;
|
||||
|
||||
|
@ -170,10 +245,7 @@ int threadqueue_finalize(threadqueue_queue * const threadqueue) {
|
|||
PTHREAD_LOCK(&threadqueue->lock);
|
||||
|
||||
//Free job memory
|
||||
for (i=0; i < threadqueue->queue_count; ++i) {
|
||||
FREE_POINTER(threadqueue->queue[i]);
|
||||
}
|
||||
threadqueue->queue_count = 0;
|
||||
threadqueue_free_jobs(threadqueue);
|
||||
|
||||
if (threadqueue->stop) {
|
||||
fprintf(stderr, "threadqueue already stopping\n");
|
||||
|
@ -206,6 +278,10 @@ int threadqueue_finalize(threadqueue_queue * const threadqueue) {
|
|||
}
|
||||
}
|
||||
|
||||
#ifdef _DEBUG
|
||||
fclose(threadqueue->debug_log);
|
||||
#endif
|
||||
|
||||
//Free allocated stuff
|
||||
FREE_POINTER(threadqueue->queue);
|
||||
threadqueue->queue_count = 0;
|
||||
|
@ -271,13 +347,8 @@ int threadqueue_flush(threadqueue_queue * const threadqueue) {
|
|||
}
|
||||
} while (notdone > 0);
|
||||
|
||||
#if 1
|
||||
//technically not needed, but spares memory. On the other hand, it makes debugging harder.
|
||||
for (i=0; i < threadqueue->queue_count; ++i) {
|
||||
FREE_POINTER(threadqueue->queue[i]);
|
||||
}
|
||||
threadqueue->queue_count = 0;
|
||||
#endif
|
||||
threadqueue_free_jobs(threadqueue);
|
||||
|
||||
assert(threadqueue->queue_waiting == 0);
|
||||
|
||||
PTHREAD_UNLOCK(&threadqueue->lock);
|
||||
|
@ -285,7 +356,7 @@ int threadqueue_flush(threadqueue_queue * const threadqueue) {
|
|||
return 1;
|
||||
}
|
||||
|
||||
threadqueue_job * threadqueue_submit(threadqueue_queue * const threadqueue, void (*fptr)(void *arg), void *arg, int wait) {
|
||||
threadqueue_job * threadqueue_submit(threadqueue_queue * const threadqueue, void (*fptr)(void *arg), void *arg, int wait, const char* const debug_description) {
|
||||
threadqueue_job *job;
|
||||
//No lock here... this should be constant
|
||||
if (threadqueue->threads_count == 0) {
|
||||
|
@ -297,6 +368,28 @@ threadqueue_job * threadqueue_submit(threadqueue_queue * const threadqueue, void
|
|||
|
||||
job = MALLOC(threadqueue_job, 1);
|
||||
|
||||
#ifdef _DEBUG
|
||||
if (debug_description) {
|
||||
int desc_len = MIN(255, strlen(debug_description));
|
||||
char* desc;
|
||||
|
||||
//Copy description
|
||||
desc = MALLOC(char, desc_len + 1);
|
||||
assert(desc);
|
||||
memcpy(desc, debug_description, desc_len);
|
||||
desc[desc_len] = 0;
|
||||
|
||||
job->debug_description = desc;
|
||||
} else {
|
||||
char* desc;
|
||||
desc = MALLOC(char, 255);
|
||||
sprintf(desc, "(*%p)(%p)", fptr, arg);
|
||||
|
||||
job->debug_description = desc;
|
||||
}
|
||||
clock_gettime(CLOCK_MONOTONIC, &job->debug_clock_enqueue);
|
||||
#endif //_DEBUG
|
||||
|
||||
if (!job) {
|
||||
fprintf(stderr, "Could not alloc job!\n");
|
||||
assert(0);
|
||||
|
|
|
@ -20,6 +20,9 @@
|
|||
****************************************************************************/
|
||||
|
||||
#include <pthread.h>
|
||||
#ifdef _DEBUG
|
||||
#include <time.h>
|
||||
#endif
|
||||
|
||||
typedef enum {
|
||||
THREADQUEUE_JOB_STATE_QUEUED = 0,
|
||||
|
@ -41,6 +44,17 @@ typedef struct threadqueue_job {
|
|||
//Job function and state to use
|
||||
void (*fptr)(void *arg);
|
||||
void *arg;
|
||||
|
||||
#ifdef _DEBUG
|
||||
const char* debug_description;
|
||||
|
||||
int debug_worker_id;
|
||||
|
||||
struct timespec debug_clock_enqueue;
|
||||
struct timespec debug_clock_start;
|
||||
struct timespec debug_clock_stop;
|
||||
struct timespec debug_clock_dequeue;
|
||||
#endif
|
||||
} threadqueue_job;
|
||||
|
||||
|
||||
|
@ -61,13 +75,26 @@ typedef struct {
|
|||
unsigned int queue_count;
|
||||
unsigned int queue_size;
|
||||
unsigned int queue_waiting;
|
||||
|
||||
#ifdef _DEBUG
|
||||
//Format: pointer <tab> worker id <tab> time enqueued <tab> time started <tab> time stopped <tab> time dequeued <tab> job description
|
||||
//For threads, pointer = "" and job description == "thread", time enqueued and time dequeued are equal to "-"
|
||||
//For flush, pointer = "" and job description == "FLUSH", time enqueued, time dequeued and time started are equal to "-"
|
||||
//Each time field, except the first one in the line be expressed in a relative way, by prepending the number of seconds by +.
|
||||
//Dependencies: pointer -> pointer
|
||||
|
||||
FILE *debug_log;
|
||||
|
||||
struct timespec *debug_clock_thread_start;
|
||||
struct timespec *debug_clock_thread_end;
|
||||
#endif
|
||||
} threadqueue_queue;
|
||||
|
||||
//Init a threadqueue
|
||||
int threadqueue_init(threadqueue_queue * threadqueue, int thread_count);
|
||||
|
||||
//Add a job to the queue, and returs a threadqueue_job handle. If wait == 1, one has to run threadqueue_job_unwait_job in order to have it run
|
||||
threadqueue_job * threadqueue_submit(threadqueue_queue * threadqueue, void (*fptr)(void *arg), void *arg, int wait);
|
||||
threadqueue_job * threadqueue_submit(threadqueue_queue * threadqueue, void (*fptr)(void *arg), void *arg, int wait, const char* debug_description);
|
||||
|
||||
int threadqueue_job_unwait_job(threadqueue_queue * threadqueue, threadqueue_job *job);
|
||||
|
||||
|
|
Loading…
Reference in a new issue