diff --git a/src/README-rdcost-thingy.txt b/src/README-rdcost-thingy.txt
new file mode 100644
index 00000000..55e9392c
--- /dev/null
+++ b/src/README-rdcost-thingy.txt
@@ -0,0 +1,33 @@
+Build Kvazaar as usual with make, then edit extract_rdcosts.py so that the
+parameters suit your setup (the directories, the number of threads and the
+Kvazaar parameters), and then run extract_rdcosts.py. It will run a lot of
+Kvazaar instances in parallel to encode a lot of videos and collect all the
+coeff groups for which they measure RD cost. The coeff groups are written
+to the relevant data file, gzip-compressed, in the following format:
+
+Size (B)  | Description
+----------+------------
+4         | size: Coeff group size, in int16's
+4         | ccc: Coeff group's coding cost
+size * 2  | coeffs: Coeff group data
+
+You can roll your own filter_rdcosts.c program to analyze the data the way
+you want, and run it like:
+
+$ gzip -d < /path/to/compressed_datafile.gz | ./filter_rdcosts | less
+
+Maybe one day, there'll be a multithreaded script like extract_rdcosts.py to
+automate and parallelize processing of a massive heap of data files.
+
+EDIT:
+It is now possible to do OLS regression by streaming the source data twice:
+the first pass builds a matrix that Octave then inverts, and the second pass
+uses it to compute the coefficients. That is what run_filter.py does, in
+parallel, on data you've gathered with extract_rdcosts.py:
+
+$ gcc filter_rdcosts.c -o frcosts_matrix
+$ gcc ols_2ndpart.c -o ols_2ndpart
+$ ./run_filter.py
+
+You should probably adjust the run_filter.py parameters before actually
+running it, though.
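A quick way to sanity-check a data file against the format above is to read
it back in Python. This is only an illustrative sketch, not part of the
patch: it assumes the little-endian layout that save_ccc()'s raw fwrite()
calls produce on x86, and the file name is just an example of what
extract_rdcosts.py generates.

#!/usr/bin/env python3
# Illustrative reader for the gzipped (size, ccc, coeffs) record stream.
import gzip
import struct

with gzip.open("/tmp/rdcost/data/video.yuv-qp12.gz", "rb") as f:
    while True:
        header = f.read(8)
        if len(header) < 8:
            break                                  # clean EOF
        size, ccc = struct.unpack("<II", header)   # uint32 size, uint32 ccc
        coeffs = struct.unpack("<%dh" % size, f.read(2 * size))  # int16 data
        print(size, ccc, coeffs)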
diff --git a/src/estimate.m b/src/estimate.m
new file mode 100644
index 00000000..3dbd62ee
--- /dev/null
+++ b/src/estimate.m
@@ -0,0 +1,5 @@
+data = dlmread("/dev/stdin", " ");
+coeffs = data(1:end, 1:5);
+costs = data(1:end, 6);
+[beta, sigma, r] = ols(costs, coeffs);
+disp(beta)
diff --git a/src/extract_rdcosts.py b/src/extract_rdcosts.py
new file mode 100755
index 00000000..02145ede
--- /dev/null
+++ b/src/extract_rdcosts.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python3
+
+import glob
+import os
+import subprocess
+import threading
+import time
+
+logdir = os.path.join("/tmp", "rdcost", "logs")
+ofdir = os.path.join("/tmp", "rdcost", "data")
+
+n_threads = 8
+home_rel = lambda path: os.path.join(os.environ["HOME"], path)
+
+qps = range(12, 14)
+sequences = ("/opt/test_seqs/hevc-B/*.yuv",)
+
+kvzargs = [home_rel("kvazaar/src/kvazaar"), "--threads", "1", "--preset=ultrafast"]
+kvzenv = {"LD_LIBRARY_PATH": home_rel("kvazaar/src/.libs/")}
+
+gzargs = ["gzip"]
+
+class MTSafeIterable:
+    def __init__(self, iterable):
+        self.lock = threading.Lock()
+        self.iterable = iterable
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        with self.lock:
+            return next(self.iterable)
+
+def combinations(xi, yi):
+    for x in xi:
+        for y in yi:
+            yield (x, y)
+
+def chain(lol):
+    for l in lol:
+        for i in l:
+            yield i
+
+def run_job(job):
+    ifpath, qp = job
+    ifname = os.path.basename(ifpath)
+
+    jobname = "%s-qp%i" % (ifname, qp)
+    hevcname = "%s.hevc" % jobname
+    logname = "%s.log" % jobname
+    ofname = "%s.gz" % jobname
+
+    hevcpath = os.path.join("/tmp", hevcname)
+    logpath = os.path.join(logdir, logname)
+    ofpath = os.path.join(ofdir, ofname)
+
+    my_kvzargs = kvzargs + ["-i", ifpath,
+                            "--qp", str(qp),
+                            "-o", hevcpath]
+
+    with open(logpath, "w") as lf:
+        with open(ofpath, "wb") as of:
+            kvz = subprocess.Popen(my_kvzargs, env=kvzenv, stdout=subprocess.PIPE, stderr=lf)
+            gzip = subprocess.Popen(gzargs, stdin=kvz.stdout, stdout=of)
+            kvz.stdout.close()  # let kvz receive SIGPIPE if gzip exits early
+
+            gzip.communicate()
+            kvz.communicate()
+
+def threadfunc(joblist):
+    for job in joblist:
+        run_job(job)
+
+def main():
+    # Make sure the output directories exist before starting any jobs.
+    os.makedirs(logdir, exist_ok=True)
+    os.makedirs(ofdir, exist_ok=True)
+
+    jobs = combinations(chain(map(glob.glob, sequences)), qps)
+    joblist = MTSafeIterable(jobs)
+
+    threads = [threading.Thread(target=threadfunc, args=(joblist,)) for _ in range(n_threads)]
+    for thread in threads:
+        thread.start()
+
+    for thread in threads:
+        thread.join()
+
+if __name__ == "__main__":
+    main()
diff --git a/src/filter_rdcosts.c b/src/filter_rdcosts.c
new file mode 100644
index 00000000..100682e5
--- /dev/null
+++ b/src/filter_rdcosts.c
@@ -0,0 +1,104 @@
+#include <stdio.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define BUFSZ (64 * 64 * sizeof(uint16_t))
+#define NUM_COEFF_BUCKETS (4)
+#define NUM_OTHER_BUCKETS (0)
+#define NUM_TOTAL_BUCKETS ((NUM_COEFF_BUCKETS) + (NUM_OTHER_BUCKETS))
+#define MAX_COEFF_BUCKET ((NUM_COEFF_BUCKETS) - 1)
+
+#define clz(x) __builtin_clz(x)
+#define ilog2(x) (sizeof(x) * 8 - clz(x) - 1)
+
+void print_coeffs(const int16_t *buf, uint32_t size, uint32_t ccc)
+{
+  uint32_t i;
+  printf("Buf size %u, ccc %u\n", size, ccc);
+  for (i = 0; i < size; i++)
+    printf("%i ", buf[i]);
+  printf("\n");
+}
+
+void count_coeffs(const int16_t *buf, uint32_t size, uint64_t *buckets, uint64_t *num_signs, uint16_t *excess)
+{
+  *excess = 0;
+  uint32_t i;
+
+  for (i = 0; i < size; i++) {
+    int16_t curr = buf[i];
+    int16_t is_signed = curr >> 15;
+    *num_signs += (is_signed & 1);
+
+    uint16_t abs = (curr ^ is_signed) - is_signed;
+    if (abs > MAX_COEFF_BUCKET) {
+      *excess += abs - MAX_COEFF_BUCKET;
+      abs = MAX_COEFF_BUCKET;
+    }
+
+    buckets[abs]++;
+  }
+}
+
+void print_buckets(const uint64_t *buckets, uint64_t num_signs)
+{
+  uint32_t i;
+  for (i = 0; i < NUM_COEFF_BUCKETS; i++)
+    printf("%3u: %lu\n", i, buckets[i]);
+  printf("Signs: %lu\n", num_signs);
+}
+
+void update_matrix(const uint64_t *buckets, uint64_t *mat)
+{
+  for (int y = 0; y < NUM_TOTAL_BUCKETS; y++) {
+    for (int x = 0; x < NUM_TOTAL_BUCKETS; x++) {
+      int curr_pos = y * NUM_TOTAL_BUCKETS + x;
+      mat[curr_pos] += buckets[x] * buckets[y];
+    }
+  }
+}
+
+void process_rdcosts(FILE *in, FILE *out)
+{
+  void *buf = malloc(BUFSZ);
+  uint32_t *u32buf = (uint32_t *)buf;
+  int16_t *i16buf = (int16_t *)buf;
+
+  float weights[NUM_TOTAL_BUCKETS] = {0.0f};
+
+  uint64_t mat[NUM_TOTAL_BUCKETS * NUM_TOTAL_BUCKETS] = {0};
+
+  while (!feof(in)) {
+    uint32_t size, ccc, size_sqrt;
+    uint64_t cg_buckets[NUM_TOTAL_BUCKETS] = {0};
+    uint64_t cg_num_signs = 0;
+    uint16_t excess = 0;
+
+    if (fread(buf, sizeof(uint32_t), 2, in) != 2) break; // stop at EOF instead of reusing stale data
+    size = u32buf[0];
+    ccc = u32buf[1];
+
+    size_sqrt = 1 << (ilog2(size) >> 1);
+
+    if (fread(buf, sizeof(int16_t), size, in) != size) break;
+
+    count_coeffs(i16buf, size, cg_buckets, &cg_num_signs, &excess);
+    update_matrix(cg_buckets, mat);
+  }
+  for (int y = 0; y < NUM_TOTAL_BUCKETS; y++) {
+    for (int x = 0; x < NUM_TOTAL_BUCKETS; x++) {
+      int curr_pos = y * NUM_TOTAL_BUCKETS + x;
+      printf("%lu ", mat[curr_pos]);
+    }
+    printf("\n");
+  }
+
+
+  free(buf);
+}
+
+int main(int ar, char **av)
+{
+  process_rdcosts(stdin, stdout);
+}
diff --git a/src/invert_matrix.m b/src/invert_matrix.m
new file mode 100644
index 00000000..ce127d31
--- /dev/null
+++ b/src/invert_matrix.m
@@ -0,0 +1,3 @@
+A = dlmread("/dev/stdin");
+B = inv(A);
+dlmwrite("/dev/stdout", B, " ");
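For reference, the two-pass scheme is just ordinary least squares:
frcosts_matrix accumulates X'X (each row of X holds the four bucket counts of
one coeff group), invert_matrix.m inverts that matrix, and ols_2ndpart
accumulates its product with X'y (the ccc values), giving the weights
beta = inv(X'X) * X'y. A hypothetical NumPy sketch of the same estimate,
useful for cross-checking the C tools on a small data set (NumPy is not
otherwise used by these scripts):

# Hypothetical cross-check of the two-pass OLS pipeline. `records` is a list
# of (bucket_counts, ccc) pairs with the same bucketing as count_coeffs(),
# i.e. absolute coeff values clamped to 3.
import numpy as np

def ols_weights(records):
    X = np.array([b for b, _ in records], dtype=np.float64)  # N x 4 counts
    y = np.array([c for _, c in records], dtype=np.float64)  # N coding costs
    return np.linalg.inv(X.T @ X) @ (X.T @ y)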
diff --git a/src/ols_2ndpart.c b/src/ols_2ndpart.c
new file mode 100644
index 00000000..3aa09ce5
--- /dev/null
+++ b/src/ols_2ndpart.c
@@ -0,0 +1,102 @@
+#include <stdio.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define BUFSZ (64 * 64 * sizeof(uint16_t))
+#define NUM_COEFF_BUCKETS (4)
+#define NUM_OTHER_BUCKETS (0)
+#define NUM_TOTAL_BUCKETS ((NUM_COEFF_BUCKETS) + (NUM_OTHER_BUCKETS))
+#ifdef ERR_SQUARED
+#define STEPSIZE (0.00000001f * 0.000001f)
+#else
+#define STEPSIZE (0.00000001f)
+#endif
+
+#define clz(x) __builtin_clz(x)
+#define ilog2(x) (sizeof(x) * 8 - clz(x) - 1)
+#define coord(x,y,w) ((x)+((y)*(w)))
+
+void update_result(const uint64_t *buckets, uint64_t ccc, const double *mat, double *res)
+{
+  for (int y = 0; y < NUM_TOTAL_BUCKETS; y++) {
+    double addend = 0.0;
+    for (int x = 0; x < NUM_TOTAL_BUCKETS; x++) {
+      addend += mat[coord(x, y, NUM_TOTAL_BUCKETS)] * (double)buckets[x];
+    }
+    addend *= (double)ccc;
+    res[y] += addend;
+  }
+}
+
+void read_matrix(const char *fn, double *mat)
+{
+  FILE *f = fopen(fn, "r");
+  for (int y = 0; y < NUM_TOTAL_BUCKETS; y++) {
+    for (int x = 0; x < NUM_TOTAL_BUCKETS; x++) {
+      float curr;
+      fscanf(f, "%f", &curr);
+      mat[x + y * NUM_TOTAL_BUCKETS] = curr;
+    }
+  }
+  fclose(f);
+}
+
+void count_coeffs(const int16_t *buf, uint32_t size, uint64_t *buckets, uint64_t *num_signs)
+{
+  uint32_t i;
+  for (i = 0; i < size; i++) {
+    int16_t curr = buf[i];
+    int16_t is_signed = curr >> 15;
+    *num_signs += (is_signed & 1);
+
+    uint16_t abs = (curr ^ is_signed) - is_signed;
+    if (abs >= NUM_COEFF_BUCKETS)
+      abs = NUM_COEFF_BUCKETS - 1;
+
+    buckets[abs]++;
+  }
+}
+
+void process_rdcosts(FILE *in, FILE *out, const double *mat)
+{
+  void *buf = malloc(BUFSZ);
+  uint32_t *u32buf = (uint32_t *)buf;
+  int16_t *i16buf = (int16_t *)buf;
+
+  double res[NUM_TOTAL_BUCKETS] = {0.0};
+
+  while (!feof(in)) {
+    uint32_t size, ccc, size_sqrt;
+    uint64_t cg_buckets[NUM_TOTAL_BUCKETS] = {0};
+    uint64_t cg_num_signs = 0;
+
+    if (fread(buf, sizeof(uint32_t), 2, in) != 2) break; // stop at EOF instead of reusing stale data
+    size = u32buf[0];
+    ccc = u32buf[1];
+
+    size_sqrt = 1 << (ilog2(size) >> 1);
+
+    if (fread(buf, sizeof(int16_t), size, in) != size) break;
+
+    count_coeffs(i16buf, size, cg_buckets, &cg_num_signs);
+    update_result(cg_buckets, ccc, mat, res);
+  }
+
+  for (int y = 0; y < NUM_TOTAL_BUCKETS; y++)
+    fprintf(out, "%g\n", (float)(res[y]));
+
+  free(buf);
+}
+
+int main(int ar, char **av)
+{
+  double mat[NUM_TOTAL_BUCKETS * NUM_TOTAL_BUCKETS] = {0.0};
+  if (ar != 2) {
+    fprintf(stderr, "gib matrix plz\n");
+    return 1;
+  }
+  read_matrix(av[1], mat);
+  process_rdcosts(stdin, stdout, mat);
+}
+
diff --git a/src/rdo.c b/src/rdo.c
index c7dcce76..242f6868 100644
--- a/src/rdo.c
+++ b/src/rdo.c
@@ -22,6 +22,7 @@
 
 #include <math.h>
 #include <stdlib.h>
+#include <pthread.h>
 
 #include "cabac.h"
 #include "context.h"
@@ -194,6 +195,30 @@ static INLINE uint32_t get_coeff_cabac_cost(
   return (23 - cabac_copy.bits_left) + (cabac_copy.num_buffered_bytes << 3);
 }
 
+/**
+ * \brief Dump a coeff group and its coding cost to stdout in the format
+ *        described in README-rdcost-thingy.txt, serialized by a mutex.
+ */
+static INLINE void save_ccc(const coeff_t *coeff, int32_t size, uint32_t ccc)
+{
+  const uint64_t flush_count = 4096;
+
+  static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
+  static uint64_t count = 0;
+  pthread_mutex_lock(&mtx);
+
+  assert(sizeof(coeff_t) == sizeof(int16_t));
+
+  fwrite(&size,  sizeof(size),    1,    stdout);
+  fwrite(&ccc,   sizeof(ccc),     1,    stdout);
+  fwrite( coeff, sizeof(coeff_t), size, stdout);
+
+  if (((++count) % flush_count) == 0)
+    fflush(stdout);
+
+  pthread_mutex_unlock(&mtx);
+}
+
 /**
  * \brief Estimate bitcost for coding coefficients.
  *
@@ -209,13 +234,21 @@ uint32_t kvz_get_coeff_cost(const encoder_state_t * const state,
                             int32_t type,
                             int8_t scan_mode)
 {
+  int save_cccs = 1; // TODO!
   if (state->qp < state->encoder_control->cfg.fast_residual_cost_limit &&
       state->qp < MAX_FAST_COEFF_COST_QP) {
-
-    uint64_t weights = kvz_fast_coeff_get_weights(state);
-    return kvz_fast_coeff_cost(coeff, width, weights);
+    if (save_cccs) {
+      assert(0 && "Plz no fast-residual-cost");
+    } else {
+      uint64_t weights = kvz_fast_coeff_get_weights(state);
+      return kvz_fast_coeff_cost(coeff, width, weights);
+    }
   } else {
-    return get_coeff_cabac_cost(state, coeff, width, type, scan_mode);
+    uint32_t ccc = get_coeff_cabac_cost(state, coeff, width, type, scan_mode);
+    if (save_cccs) {
+      save_ccc(coeff, width * width, ccc);
+    }
+    return ccc;
   }
 }
 
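With this patch applied, each kvazaar process dumps the binary records to
stdout, which is why extract_rdcosts.py directs the bitstream to a file with
-o and pipes stdout through gzip. Done by hand for a single sequence, the
equivalent would be roughly the following (paths and QP are placeholders; the
options are the same ones the script uses):

$ kvazaar --threads 1 --preset=ultrafast -i input.yuv --qp 22 -o /tmp/out.hevc | gzip > input.yuv-qp22.gz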
diff --git a/src/run_filter.py b/src/run_filter.py
new file mode 100755
index 00000000..784533ee
--- /dev/null
+++ b/src/run_filter.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python3
+
+import glob
+import os
+import subprocess
+import tempfile
+import threading
+import time
+
+n_threads = 8
+data = "/home/moptim/rdcost/data/*.gz"
+gzargs = ["gzip", "-d"]
+filtargs = ["./frcosts_matrix"]
+octargs = ["octave-cli", "invert_matrix.m"]
+filt2args = ["./ols_2ndpart"]
+resultdir = os.path.join("/tmp", "rdcost", "coeff_buckets")
+
+class MTSafeIterable:
+    def __init__(self, iterable):
+        self.lock = threading.Lock()
+        self.iterable = iterable
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        with self.lock:
+            return next(self.iterable)
+
+def run_job(job):
+    datapath = job
+    resultpath = os.path.join(resultdir, os.path.basename(job) + ".result")
+
+    print("Running job %s" % datapath)
+
+    with open(resultpath, "w") as rf:
+        with tempfile.NamedTemporaryFile() as tf:
+            with open(datapath, "rb") as df:
+                f2a = list(filt2args)
+                f2a.append(tf.name)
+                gzip = subprocess.Popen(gzargs, stdin=df, stdout=subprocess.PIPE)
+                filt = subprocess.Popen(filtargs, stdin=gzip.stdout, stdout=subprocess.PIPE)
+                octa = subprocess.Popen(octargs, stdin=filt.stdout, stdout=tf)
+
+                octa.communicate()
+                filt.communicate()
+                gzip.communicate()
+
+            with open(datapath, "rb") as df:
+                gz2 = subprocess.Popen(gzargs, stdin=df, stdout=subprocess.PIPE)
+                f2 = subprocess.Popen(f2a, stdin=gz2.stdout, stdout=rf)
+
+                f2.communicate()
+                gz2.communicate()
+
+    print("Job %s done" % datapath)
+
+def threadfunc(joblist):
+    for job in joblist:
+        run_job(job)
+
+def main():
+    # Make sure the result directory exists before starting any jobs.
+    os.makedirs(resultdir, exist_ok=True)
+
+    jobs = glob.glob(data)
+    joblist = MTSafeIterable(iter(jobs))
+
+    threads = [threading.Thread(target=threadfunc, args=(joblist,)) for _ in range(n_threads)]
+    for thread in threads:
+        thread.start()
+
+    for thread in threads:
+        thread.join()
+
+if __name__ == "__main__":
+    main()
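Each finished job leaves a small text file in /tmp/rdcost/coeff_buckets,
named after the data file with a .result suffix and containing one fitted
weight per line (as printed by ols_2ndpart). A hypothetical helper for
eyeballing how well such a fit predicts the cost of a coeff group;
load_weights() and predict_cost() are not part of the patch, and the
clamping mirrors count_coeffs():

# Hypothetical: load a .result file written by run_filter.py and predict the
# coding cost of one coeff group from its bucket counts.
def load_weights(path):
    with open(path) as f:
        return [float(line) for line in f]            # one weight per bucket

def predict_cost(coeffs, weights):
    buckets = [0] * len(weights)
    for c in coeffs:
        buckets[min(abs(c), len(weights) - 1)] += 1   # clamp like count_coeffs()
    return sum(w * n for w, n in zip(weights, buckets))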